YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tserver/mini_tablet_server.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tserver/mini_tablet_server.h"
34
35
#include <functional>
36
#include <memory>
37
#include <string>
38
#include <utility>
39
40
#include <glog/logging.h>
41
42
#include "yb/common/index.h"
43
#include "yb/common/partition.h"
44
#include "yb/common/schema.h"
45
46
#include "yb/consensus/consensus.pb.h"
47
48
#include "yb/encryption/encrypted_file_factory.h"
49
#include "yb/encryption/header_manager_impl.h"
50
#include "yb/encryption/universe_key_manager.h"
51
52
#include "yb/rocksutil/rocksdb_encrypted_file_factory.h"
53
54
#include "yb/rpc/messenger.h"
55
56
#include "yb/server/rpc_server.h"
57
58
#include "yb/tablet/tablet-harness.h"
59
#include "yb/tablet/tablet.h"
60
#include "yb/tablet/tablet_metadata.h"
61
#include "yb/tablet/tablet_peer.h"
62
63
#include "yb/tserver/tablet_server.h"
64
#include "yb/tserver/ts_tablet_manager.h"
65
66
#include "yb/util/flag_tags.h"
67
#include "yb/util/net/sockaddr.h"
68
#include "yb/util/net/tunnel.h"
69
#include "yb/util/scope_exit.h"
70
#include "yb/util/status.h"
71
72
using std::pair;
73
74
using yb::consensus::Consensus;
75
using yb::consensus::ConsensusOptions;
76
using yb::consensus::RaftPeerPB;
77
using yb::consensus::RaftConfigPB;
78
using yb::log::Log;
79
using strings::Substitute;
80
using yb::tablet::TabletPeer;
81
82
DECLARE_bool(rpc_server_allow_ephemeral_ports);
83
DECLARE_double(leader_failure_max_missed_heartbeat_periods);
84
DECLARE_int32(TEST_nodes_per_cloud);
85
86
DEFINE_test_flag(bool, private_broadcast_address, false,
87
                 "Use private address for broadcast address in tests.");
88
89
namespace yb {
90
namespace tserver {
91
92
// Prepares the options for an in-process tablet server used by tests; no server object is created
// here (started_ is initialized to false and server_ is left untouched).
//
// wal_paths / data_paths are copied into opts_.fs_opts. rpc_port is used both for the RPC bind
// endpoint and for the broadcast address. extra_opts is copied into opts_ and then adjusted below.
// Note that index_ stores index + 1, and it is index_ (not index) that is handed to the
// server::TEST_Rpc* helpers and used in the placement names.
MiniTabletServer::MiniTabletServer(const std::vector<std::string>& wal_paths,
93
                                   const std::vector<std::string>& data_paths,
94
                                   uint16_t rpc_port,
95
                                   const TabletServerOptions& extra_opts, int index)
96
  : started_(false),
97
    opts_(extra_opts),
98
    index_(index + 1),
99
    universe_key_manager_(new encryption::UniverseKeyManager()),
100
    encrypted_env_(NewEncryptedEnv(encryption::DefaultHeaderManager(universe_key_manager_.get()))),
101
    rocksdb_encrypted_env_(
102
1.52k
      NewRocksDBEncryptedEnv(encryption::DefaultHeaderManager(universe_key_manager_.get()))) {
103
104
  // Start RPC server on loopback.
  // The assignment below writes a global gflag, so it is not scoped to this instance.
105
1.52k
  FLAGS_rpc_server_allow_ephemeral_ports = true;
106
1.52k
  opts_.rpc_opts.rpc_bind_addresses = server::TEST_RpcBindEndpoint(index_, rpc_port);
107
  // A.B.C.D.xip.io resolves to A.B.C.D so it is very useful for testing.
  // NOTE(review): nothing visible in this function refers to xip.io; the remark above may be
  // stale -- confirm against server::TEST_RpcAddress.
108
1.52k
  opts_.broadcast_addresses = {
109
1.52k
    HostPort(server::TEST_RpcAddress(index_,
110
1.52k
                                     server::Private(FLAGS_TEST_private_broadcast_address)),
111
1.52k
    rpc_port) };
  // Port 0 at construction time; Shutdown() later overwrites it with the port that was bound.
112
1.52k
  opts_.webserver_opts.port = 0;
113
1.52k
  opts_.webserver_opts.bind_interface = opts_.broadcast_addresses.front().host();
  // Fill in a default placement only when the supplied options do not already carry a cloud.
  // NOTE(review): the cloud name divides by FLAGS_TEST_nodes_per_cloud; confirm the flag can
  // never be zero.
114
1.52k
  if (!opts_.has_placement_cloud()) {
115
1.52k
    opts_.SetPlacement(Format("cloud$0", (index_ + 1) / FLAGS_TEST_nodes_per_cloud),
116
1.52k
                       Format("rack$0", index_), "zone");
117
1.52k
  }
118
1.52k
  opts_.fs_opts.wal_paths = wal_paths;
119
1.52k
  opts_.fs_opts.data_paths = data_paths;
  // opts_ receives the raw pointers obtained through .get(); the members initialized in the
  // initializer list above keep holding the objects themselves.
120
1.52k
  opts_.universe_key_manager = universe_key_manager_.get();
121
1.52k
  opts_.env = encrypted_env_.get();
122
1.52k
  opts_.rocksdb_env = rocksdb_encrypted_env_.get();
123
1.52k
}
124
125
// Convenience constructor: delegates to the multi-path constructor, passing fs_root as the only
// WAL path and as the only data path. All other arguments are forwarded unchanged.
MiniTabletServer::MiniTabletServer(const string& fs_root,
126
                                   uint16_t rpc_port,
127
                                   const TabletServerOptions& extra_opts,
128
                                   int index)
129
1.49k
  : MiniTabletServer({ fs_root }, { fs_root }, rpc_port, extra_opts, index) {
130
1.49k
}
Unexecuted instantiation: yb::tserver::MiniTabletServer::MiniTabletServer(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned short, yb::tserver::TabletServerOptions const&, int)
yb::tserver::MiniTabletServer::MiniTabletServer(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned short, yb::tserver::TabletServerOptions const&, int)
Line
Count
Source
129
1.49k
  : MiniTabletServer({ fs_root }, { fs_root }, rpc_port, extra_opts, index) {
130
1.49k
}
131
132
210
// The destructor body is empty: Shutdown() is not invoked from here.
// NOTE(review): presumably callers are expected to call Shutdown() before destruction -- confirm.
MiniTabletServer::~MiniTabletServer() {
133
210
}
134
135
// Factory: obtains options from TabletServerOptions::CreateTabletServerOptions() and constructs a
// MiniTabletServer rooted at fs_root with them.
//
// Returns the non-OK status of the options result if creating the options failed; otherwise the
// newly constructed (not yet started) MiniTabletServer.
Result<std::unique_ptr<MiniTabletServer>> MiniTabletServer::CreateMiniTabletServer(
136
35
    const string& fs_root, uint16_t rpc_port, int index) {
137
35
  auto options_result = TabletServerOptions::CreateTabletServerOptions();
138
35
  RETURN_NOT_OK(options_result);
139
35
  return std::make_unique<MiniTabletServer>(fs_root, rpc_port, *options_result, index);
140
35
}
141
142
1.52k
// Creates an enterprise::TabletServer from opts_, runs Init() and Start() on it, stores it in
// server_ and then sets up connectivity through Reconnect().
//
// CHECK-fails when the server is already marked as started. Returns the first non-OK status
// produced by Init(), Start() or Reconnect(); started_ becomes true only if all of them succeed.
Status MiniTabletServer::Start() {
143
1.52k
  CHECK(!started_);
144
145
1.52k
  std::unique_ptr<TabletServer> server(new enterprise::TabletServer(opts_));
146
1.52k
  RETURN_NOT_OK(server->Init());
147
148
1.52k
  RETURN_NOT_OK(server->Start());
149
  // server_ is assigned only after both Init() and Start() succeeded; on the early returns above
  // the local unique_ptr releases the half-built server.
150
1.43k
  server_.swap(server);
151
  // NOTE(review): if Reconnect() fails, server_ stays set while started_ is still false, so a
  // later Shutdown() skips server_->Shutdown() -- confirm this is acceptable.
152
1.43k
  RETURN_NOT_OK(Reconnect());
153
154
1.43k
  started_ = true;
155
1.43k
  return Status::OK();
156
1.43k
}
157
158
0
void MiniTabletServer::Isolate() {
159
0
  server::TEST_Isolate(server_->messenger());
160
0
  tunnel_->Shutdown();
161
0
}
162
163
838
// Sets up test connectivity for this server's messenger and, unless
// FLAGS_TEST_private_broadcast_address is set, creates and starts a Tunnel between the first
// resolved broadcast address and the endpoint parsed from the RPC bind addresses.
//
// Returns OK right after the connectivity setup when the private-broadcast flag is set (no tunnel
// is created in that case). Otherwise returns the first failure from address resolution, endpoint
// parsing or Tunnel::Start(), or OK once the tunnel has started.
Status MiniTabletServer::Reconnect() {
164
838
  server::TEST_SetupConnectivity(server_->messenger(), index_);
165
166
838
  if (FLAGS_TEST_private_broadcast_address) {
167
8
    return Status::OK();
168
8
  }
169
170
830
  tunnel_ = std::make_unique<Tunnel>(&server_->messenger()->io_service());
171
830
  auto started_tunnel = false;
  // Cleanup guard: on any early return below the freshly created tunnel is shut down.
  // started_tunnel is flipped only after Start() has succeeded, which disarms the guard.
172
830
  auto se = ScopeExit([this, &started_tunnel] {
173
829
    if (!started_tunnel) {
174
1
      tunnel_->Shutdown();
175
1
    }
176
829
  });
177
178
830
  std::vector<Endpoint> local;
179
830
  RETURN_NOT_OK(opts_.broadcast_addresses[0].ResolveAddresses(&local));
180
830
  Endpoint remote = VERIFY_RESULT(ParseEndpoint(opts_.rpc_opts.rpc_bind_addresses, 0));
  // NOTE(review): local.front() assumes a successful ResolveAddresses() yields at least one
  // endpoint -- confirm.
  // The callback answers true for addresses the messenger does not artificially reject;
  // presumably the tunnel uses it to filter incoming connections -- confirm against Tunnel.
181
830
  RETURN_NOT_OK(tunnel_->Start(
182
830
      local.front(), remote, [messenger = server_->messenger()](const IpAddress& address) {
183
830
    return !messenger->TEST_ShouldArtificiallyRejectIncomingCallsFrom(address);
184
830
  }));
185
829
  started_tunnel = true;
186
829
  return Status::OK();
187
830
}
188
189
14
// Forwards to server_->WaitInited() and returns its status.
// server_ is dereferenced without a check, so this must only be called once server_ is set.
Status MiniTabletServer::WaitStarted() {
190
14
  return server_->WaitInited();
191
14
}
192
193
99
// Shuts down the tunnel (if one exists) and, when the server is marked as started, records the
// bound RPC and HTTP ports in opts_, shuts the server down and releases both tunnel_ and server_.
//
// When the server is not marked as started, only the tunnel shutdown (if a tunnel exists) and the
// final started_ = false take effect.
void MiniTabletServer::Shutdown() {
194
99
  if (tunnel_) {
195
99
    tunnel_->Shutdown();
196
99
  }
197
99
  if (started_) {
198
    // Save bind address and port so we can later restart the server.
    // bound_rpc_addr() / bound_http_addr() CHECK started_, so they have to be queried before
    // started_ is cleared at the end of this function.
199
99
    opts_.rpc_opts.rpc_bind_addresses = server::TEST_RpcBindEndpoint(
200
99
        index_, bound_rpc_addr().port());
201
99
    opts_.webserver_opts.port = bound_http_addr().port();
202
99
    server_->Shutdown();
203
99
    tunnel_.reset();
204
99
    server_.reset();
205
99
  }
206
99
  started_ = false;
207
99
}
208
209
namespace {
210
211
// Invokes `action` on every tablet peer returned by the tablet manager of mts's server.
//
// Returns IllegalState when mts has no server, the first non-OK status returned by `action`
// (remaining peers are then not visited), or OK when every call succeeded.
CHECKED_STATUS ForAllTablets(
212
    MiniTabletServer* mts,
213
0
    std::function<Status(TabletPeer* tablet_peer)> action) {
214
0
  if (!mts->server()) {
215
0
    return STATUS(IllegalState, "Server is not running");
216
0
  }
217
0
  auto tablets = mts->server()->tablet_manager()->GetTabletPeers();
218
0
  for (const auto& tablet : tablets) {
219
0
    RETURN_NOT_OK(action(tablet.get()));
220
0
  }
221
0
  return Status::OK();
222
0
}
223
224
}  // namespace
225
226
0
// Calls Flush(mode, flags) on the tablet of every tablet peer of this server.
//
// Returns OK without doing anything when server_ is not set. Peers whose tablet() is null are
// skipped. Otherwise returns the first non-OK status from Flush(), or OK.
Status MiniTabletServer::FlushTablets(tablet::FlushMode mode, tablet::FlushFlags flags) {
227
0
  if (!server_) {
228
0
    return Status::OK();
229
0
  }
230
0
  return ForAllTablets(this, [mode, flags](TabletPeer* tablet_peer) -> Status {
231
0
    if (!tablet_peer->tablet()) {
232
0
      return Status::OK();
233
0
    }
234
0
    return tablet_peer->tablet()->Flush(mode, flags);
235
0
  });
236
0
}
237
238
0
// Calls ForceRocksDBCompactInTest() on the tablet of every tablet peer of this server.
//
// Returns OK without doing anything when server_ is not set. Peers whose tablet() is null are
// skipped. The per-peer callback always reports OK; whatever ForceRocksDBCompactInTest() returns
// (if anything) is not inspected here.
Status MiniTabletServer::CompactTablets() {
239
0
  if (!server_) {
240
0
    return Status::OK();
241
0
  }
242
0
  return ForAllTablets(this, [](TabletPeer* tablet_peer) {
243
0
    if (tablet_peer->tablet()) {
244
0
      tablet_peer->tablet()->ForceRocksDBCompactInTest();
245
0
    }
246
0
    return Status::OK();
247
0
  });
248
0
}
249
250
0
// Calls TEST_SwitchMemtable() on the tablet of every tablet peer of this server and returns the
// first non-OK status, or OK.
//
// Peers whose tablet() is null are skipped instead of being dereferenced, matching what
// FlushTablets() and CompactTablets() do for the same peers. Unlike those two methods there is no
// early OK return for a missing server: ForAllTablets() reports IllegalState in that case.
Status MiniTabletServer::SwitchMemtables() {
  return ForAllTablets(this, [](TabletPeer* tablet_peer) -> Status {
    auto tablet = tablet_peer->tablet();
    if (!tablet) {
      return Status::OK();
    }
    return tablet->TEST_SwitchMemtable();
  });
}
255
256
0
// Calls RunLogGC() on every tablet peer of this server.
//
// Returns OK without doing anything when server_ is not set; otherwise the first non-OK status
// from RunLogGC(), or OK.
Status MiniTabletServer::CleanTabletLogs() {
257
0
  if (!server_) {
258
    // Nothing to clean.
259
0
    return Status::OK();
260
0
  }
261
0
  return ForAllTablets(this, [](TabletPeer* tablet_peer) {
262
0
    return tablet_peer->RunLogGC();
263
0
  });
264
0
}
265
266
0
// Shuts the server down and starts it again, returning the status of Start().
// CHECK-fails unless the server is currently marked as started. Shutdown() stores the bound RPC
// and HTTP ports in opts_ before stopping, and Start() builds the new server from opts_.
Status MiniTabletServer::Restart() {
267
0
  CHECK(started_);
268
0
  Shutdown();
269
0
  return Start();
270
0
}
271
272
0
// Same Shutdown() + Start() sequence as Restart(), but without the CHECK on started_, so it can
// be used when the server is not currently marked as started. Returns the status of Start().
Status MiniTabletServer::RestartStoppedServer() {
273
0
  Shutdown();
274
0
  return Start();
275
0
}
276
277
40
// Builds a RaftConfigPB that contains exactly one peer describing this server: its permanent
// uuid (from server_->instance_pb()), member type VOTER, and a single last_known_private_addr
// entry filled with the host and port of the bound RPC address.
//
// CHECK-fails with "Must Start()" when the server is not marked as started.
RaftConfigPB MiniTabletServer::CreateLocalConfig() const {
278
40
  CHECK
(started_) << "Must Start()"0
;
279
40
  RaftConfigPB config;
280
40
  RaftPeerPB* peer = config.add_peers();
281
40
  peer->set_permanent_uuid(server_->instance_pb().permanent_uuid());
282
40
  peer->set_member_type(consensus::PeerMemberType::VOTER);
283
40
  auto host_port = peer->mutable_last_known_private_addr()->Add();
284
40
  host_port->set_host(bound_rpc_addr().address().to_string());
285
40
  host_port->set_port(bound_rpc_addr().port());
286
40
  return config;
287
40
}
288
289
// Overload without an explicit Raft config: forwards to the six-argument AddTestTablet(), using
// CreateLocalConfig() (a config whose only peer is this server) as the config.
// CreateLocalConfig() CHECKs that the server is started.
Status MiniTabletServer::AddTestTablet(const std::string& ns_id,
290
                                       const std::string& table_id,
291
                                       const std::string& tablet_id,
292
                                       const Schema& schema,
293
31
                                       TableType table_type) {
294
31
  return AddTestTablet(ns_id, table_id, tablet_id, schema, CreateLocalConfig(), table_type);
295
31
}
296
297
// Creates a new tablet named tablet_id on this server through the tablet manager, using the given
// Raft config.
//
// The schema is first passed through SchemaBuilder(schema).Build(), and a default partition is
// created from the rebuilt schema. Returns the status of CreateNewTablet() (its result value is
// dropped by ResultToStatus). CHECK-fails with "Must Start()" when the server is not started.
Status MiniTabletServer::AddTestTablet(const std::string& ns_id,
298
                                       const std::string& table_id,
299
                                       const std::string& tablet_id,
300
                                       const Schema& schema,
301
                                       const RaftConfigPB& config,
302
31
                                       TableType table_type) {
303
31
  CHECK
(started_) << "Must Start()"0
;
  // NOTE(review): the local name suggests Build() assigns column ids -- confirm in SchemaBuilder.
304
31
  Schema schema_with_ids = SchemaBuilder(schema).Build();
305
31
  pair<PartitionSchema, Partition> partition = tablet::CreateDefaultPartition(schema_with_ids);
306
  // table_id is passed as both the first and the third TableInfo argument.
  // NOTE(review): the third argument is presumably the table name -- confirm in tablet::TableInfo.
307
31
  auto table_info = std::make_shared<tablet::TableInfo>(
308
31
      table_id, ns_id, table_id, table_type, schema_with_ids, IndexMap(),
309
31
      boost::none /* index_info */, 0 /* schema_version */, partition.first);
310
311
31
  return ResultToStatus(server_->tablet_manager()->CreateNewTablet(
312
31
      table_info, tablet_id, partition.second, config));
313
31
}
314
315
7
// Forwards fail_heartbeats_for_tests to server_->set_fail_heartbeats_for_tests().
// NOTE(review): server_ is dereferenced without the started_ CHECK that the bound_*_addr()
// accessors perform; callers must ensure the server exists.
void MiniTabletServer::FailHeartbeats(bool fail_heartbeats_for_tests) {
316
7
  server_->set_fail_heartbeats_for_tests(fail_heartbeats_for_tests);
317
7
}
318
319
227
// Returns server_->first_rpc_address(). CHECK-fails when the server is not marked as started.
Endpoint MiniTabletServer::bound_rpc_addr() const {
320
227
  CHECK(started_);
321
227
  return server_->first_rpc_address();
322
227
}
323
324
104
// Returns server_->first_http_address(). CHECK-fails when the server is not marked as started.
Endpoint MiniTabletServer::bound_http_addr() const {
325
104
  CHECK(started_);
326
104
  return server_->first_http_address();
327
104
}
328
329
0
// Returns the bound HTTP address as a string, produced by converting bound_http_addr() with
// HostPort::FromBoundEndpoint() and calling ToString(). Inherits the started_ CHECK of
// bound_http_addr().
std::string MiniTabletServer::bound_http_addr_str() const {
330
0
  return HostPort::FromBoundEndpoint(bound_http_addr()).ToString();
331
0
}
332
333
0
// Returns the bound RPC address as a string, produced by converting bound_rpc_addr() with
// HostPort::FromBoundEndpoint() and calling ToString(). Inherits the started_ CHECK of
// bound_rpc_addr().
std::string MiniTabletServer::bound_rpc_addr_str() const {
334
0
  return HostPort::FromBoundEndpoint(bound_rpc_addr()).ToString();
335
0
}
336
337
0
// Returns a reference to the object that server_->fs_manager() points to.
// CHECK-fails when the server is not marked as started.
FsManager& MiniTabletServer::fs_manager() const {
338
0
  CHECK(started_);
339
0
  return *server_->fs_manager();
340
0
}
341
342
} // namespace tserver
343
} // namespace yb