YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/ts_itest-base.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/integration-tests/ts_itest-base.h"

#include <algorithm>
#include <random>

#include "yb/client/client.h"
#include "yb/client/schema.h"
#include "yb/client/table.h"

#include "yb/gutil/strings/split.h"

#include "yb/integration-tests/cluster_verifier.h"
#include "yb/integration-tests/external_mini_cluster.h"
#include "yb/integration-tests/external_mini_cluster_fs_inspector.h"

#include "yb/master/master_client.proxy.h"
#include "yb/master/master_cluster.proxy.h"

#include "yb/rpc/rpc_controller.h"

#include "yb/server/server_base.proxy.h"

#include "yb/util/opid.h"
#include "yb/util/status_log.h"
35
36
// Extra command-line flags appended to every tablet server / master started by CreateCluster().
// The value is split on single spaces (see AddExtraFlags()).
DEFINE_string(ts_flags, "", "Flags to pass through to tablet servers");
DEFINE_string(master_flags, "", "Flags to pass through to masters");

// Cluster shape used by CreateCluster(): how many tablet servers to launch, and how many replicas
// each tablet should have (also passed to the masters as --replication_factor).
DEFINE_int32(num_tablet_servers, 3, "Number of tablet servers to start");
DEFINE_int32(num_replicas, 3,
             "Number of replicas per tablet (passed to the masters as --replication_factor)");
41
42
namespace yb {
43
namespace tserver {
44
45
// Initializes 'random_' with the seed returned by SeedRandom().
TabletServerIntegrationTestBase::TabletServerIntegrationTestBase()
46
50
    : random_(SeedRandom()) {}
47
48
31
// Defaulted out of line. NOTE(review): presumably kept in the .cc so the header can hold members
// whose types are only forward-declared there — confirm against ts_itest-base.h.
TabletServerIntegrationTestBase::~TabletServerIntegrationTestBase() = default;
49
50
void TabletServerIntegrationTestBase::AddExtraFlags(
51
94
    const std::string& flags_str, std::vector<std::string>* flags) {
52
94
  if (flags_str.empty()) {
53
93
    return;
54
93
  }
55
1
  std::vector<std::string> split_flags = strings::Split(flags_str, " ");
56
2
  for (const std::string& flag : split_flags) {
57
2
    flags->push_back(flag);
58
2
  }
59
1
}
60
61
void TabletServerIntegrationTestBase::CreateCluster(
62
    const std::string& data_root_path,
63
    const std::vector<std::string>& non_default_ts_flags,
64
47
    const std::vector<std::string>& non_default_master_flags) {
65
47
  LOG(INFO) << "Starting cluster with:";
66
47
  LOG(INFO) << "--------------";
67
47
  LOG(INFO) << FLAGS_num_tablet_servers << " tablet servers";
68
47
  LOG(INFO) << FLAGS_num_replicas << " replicas per TS";
69
47
  LOG(INFO) << "--------------";
70
71
47
  ExternalMiniClusterOptions opts;
72
47
  opts.num_tablet_servers = FLAGS_num_tablet_servers;
73
47
  opts.data_root = GetTestPath(data_root_path);
74
75
  // If the caller passed no flags use the default ones, where we stress consensus by setting
76
  // low timeouts and frequent cache misses.
77
47
  if (non_default_ts_flags.empty()) {
78
14
    opts.extra_tserver_flags.push_back("--log_cache_size_limit_mb=10");
79
14
    opts.extra_tserver_flags.push_back(strings::Substitute("--consensus_rpc_timeout_ms=$0",
80
14
                                                           FLAGS_consensus_rpc_timeout_ms));
81
33
  } else {
82
92
    for (const std::string& flag : non_default_ts_flags) {
83
92
      opts.extra_tserver_flags.push_back(flag);
84
92
    }
85
33
  }
86
  // Disable load balancer for master by default for these tests. You can override this through
87
  // setting flags in the passed in non_default_master_flags argument.
88
47
  opts.extra_master_flags.push_back("--enable_load_balancing=false");
89
47
  opts.extra_master_flags.push_back(yb::Format("--replication_factor=$0", FLAGS_num_replicas));
90
48
  for (const std::string& flag : non_default_master_flags) {
91
48
    opts.extra_master_flags.push_back(flag);
92
48
  }
93
94
47
  AddExtraFlags(FLAGS_ts_flags, &opts.extra_tserver_flags);
95
47
  AddExtraFlags(FLAGS_master_flags, &opts.extra_master_flags);
96
97
47
  UpdateMiniClusterOptions(&opts);
98
99
47
  cluster_.reset(new ExternalMiniCluster(opts));
100
47
  ASSERT_OK(cluster_->Start());
101
47
  inspect_.reset(new itest::ExternalMiniClusterFsInspector(cluster_.get()));
102
47
  CreateTSProxies();
103
47
}
104
105
// Creates TSServerDetails instance for each TabletServer and stores them
106
// in 'tablet_servers_'.
107
47
void TabletServerIntegrationTestBase::CreateTSProxies() {
108
47
  CHECK(tablet_servers_.empty());
109
47
  tablet_servers_ = CHECK_RESULT(itest::CreateTabletServerMap(cluster_.get()));
110
47
}
111
112
// Waits that all replicas for a all tablets of 'kTableName' table are online
113
// and creates the tablet_replicas_ map.
114
43
void TabletServerIntegrationTestBase::WaitForReplicasAndUpdateLocations() {
115
43
  int num_retries = 0;
116
117
43
  bool replicas_missing = true;
118
43
  do {
119
43
    std::unordered_multimap<std::string, itest::TServerDetails*> tablet_replicas;
120
43
    master::GetTableLocationsRequestPB req;
121
43
    master::GetTableLocationsResponsePB resp;
122
43
    rpc::RpcController controller;
123
43
    kTableName.SetIntoTableIdentifierPB(req.mutable_table());
124
43
    controller.set_timeout(MonoDelta::FromSeconds(1));
125
43
    CHECK_OK(cluster_->GetLeaderMasterProxy<master::MasterClientProxy>().GetTableLocations(
126
43
        req, &resp, &controller));
127
43
    CHECK_OK(controller.status());
128
0
    CHECK(!resp.has_error()) << "Response had an error: " << resp.error().ShortDebugString();
129
130
47
    for (const master::TabletLocationsPB& location : resp.tablet_locations()) {
131
139
      for (const master::TabletLocationsPB_ReplicaPB& replica : location.replicas()) {
132
139
        auto server = FindOrDie(tablet_servers_,
133
139
                                replica.ts_info().permanent_uuid()).get();
134
139
        tablet_replicas.emplace(location.tablet_id(), server);
135
139
      }
136
137
47
      if (tablet_replicas.count(location.tablet_id()) < implicit_cast<size_t>(FLAGS_num_replicas)) {
138
0
        LOG(WARNING)<< "Couldn't find the leader and/or replicas. Location: "
139
0
            << location.ShortDebugString();
140
0
        replicas_missing = true;
141
0
        SleepFor(MonoDelta::FromSeconds(1));
142
0
        num_retries++;
143
0
        break;
144
0
      }
145
146
47
      replicas_missing = false;
147
47
    }
148
43
    if (!replicas_missing) {
149
43
      tablet_replicas_ = tablet_replicas;
150
43
    }
151
43
  } while (replicas_missing && num_retries < kMaxRetries);
152
43
}
153
154
// Returns the last committed leader of the consensus configuration. Tries to get it from master
155
// but then actually tries to the get the committed consensus configuration to make sure.
156
itest::TServerDetails* TabletServerIntegrationTestBase::GetLeaderReplicaOrNull(
157
85
    const std::string& tablet_id) {
158
85
  std::string leader_uuid;
159
85
  Status master_found_leader_result = GetTabletLeaderUUIDFromMaster(tablet_id, &leader_uuid);
160
161
  // See if the master is up to date. I.e. if it does report a leader and if the
162
  // replica it reports as leader is still alive and (at least thinks) its still
163
  // the leader.
164
85
  itest::TServerDetails* leader;
165
85
  if (master_found_leader_result.ok()) {
166
85
    leader = GetReplicaWithUuidOrNull(tablet_id, leader_uuid);
167
85
    if (leader && GetReplicaStatusAndCheckIfLeader(leader, tablet_id,
168
32
                                                   MonoDelta::FromMilliseconds(100)).ok()) {
169
32
      return leader;
170
32
    }
171
53
  }
172
173
  // The replica we got from the master (if any) is either dead or not the leader.
174
  // Find the actual leader.
175
53
  pair<itest::TabletReplicaMap::iterator, itest::TabletReplicaMap::iterator> range =
176
53
      tablet_replicas_.equal_range(tablet_id);
177
53
  std::vector<itest::TServerDetails*> replicas_copy;
178
212
  for (; range.first != range.second; ++range.first) {
179
159
    replicas_copy.push_back((*range.first).second);
180
159
  }
181
182
53
  std::random_shuffle(replicas_copy.begin(), replicas_copy.end());
183
159
  for (itest::TServerDetails* replica : replicas_copy) {
184
159
    if (GetReplicaStatusAndCheckIfLeader(replica, tablet_id,
185
1
                                         MonoDelta::FromMilliseconds(100)).ok()) {
186
1
      return replica;
187
1
    }
188
159
  }
189
52
  return NULL;
190
53
}
191
192
// Calls GetLeaderReplicaOrNull() up to 'max_attempts' times, storing each result in '*leader'.
// After the k-th unsuccessful attempt it sleeps k * 100 ms (including after the final one).
// Returns OK as soon as a leader is found, otherwise NotFound.
Status TabletServerIntegrationTestBase::GetLeaderReplicaWithRetries(
    const std::string& tablet_id,
    itest::TServerDetails** leader,
    int max_attempts) {
  for (int attempts = 0; attempts < max_attempts;) {
    *leader = GetLeaderReplicaOrNull(tablet_id);
    if (*leader != nullptr) {
      return Status::OK();
    }
    ++attempts;
    // Linearly growing backoff between attempts.
    SleepFor(MonoDelta::FromMilliseconds(100 * attempts));
  }
  return STATUS(NotFound, "Leader replica not found");
}
207
208
// Asks the master (100 ms RPC timeout) for the locations of 'kTableName'. Stores in
// '*leader_uuid' the permanent UUID of the replica of 'tablet_id' that the master reports with
// role LEADER.
// Returns:
// - the RPC error, if the call fails;
// - RemoteError, if the master put an error in the response;
// - NotFound, if the tablet is absent or none of its replicas is marked as leader.
Status TabletServerIntegrationTestBase::GetTabletLeaderUUIDFromMaster(const std::string& tablet_id,
                                                                      std::string* leader_uuid) {
  master::GetTableLocationsRequestPB req;
  master::GetTableLocationsResponsePB resp;
  rpc::RpcController controller;
  controller.set_timeout(MonoDelta::FromMilliseconds(100));
  kTableName.SetIntoTableIdentifierPB(req.mutable_table());

  RETURN_NOT_OK(cluster_->GetMasterProxy<master::MasterClientProxy>().GetTableLocations(
      req, &resp, &controller));
  // Surface an error reported by the master instead of misreporting it as "no leader found".
  if (resp.has_error()) {
    return STATUS(RemoteError, "Master returned an error", resp.error().ShortDebugString());
  }
  for (const master::TabletLocationsPB& loc : resp.tablet_locations()) {
    if (loc.tablet_id() == tablet_id) {
      for (const master::TabletLocationsPB::ReplicaPB& replica : loc.replicas()) {
        if (replica.role() == PeerRole::LEADER) {
          *leader_uuid = replica.ts_info().permanent_uuid();
          return Status::OK();
        }
      }
    }
  }
  return STATUS(NotFound, "Unable to find leader for tablet", tablet_id);
}
230
231
// Looks up, among the replicas of 'tablet_id' recorded in 'tablet_replicas_', the one whose
// permanent UUID equals 'uuid'. Returns nullptr when there is no such replica.
itest::TServerDetails* TabletServerIntegrationTestBase::GetReplicaWithUuidOrNull(
    const std::string& tablet_id,
    const std::string& uuid) {
  const auto replicas = tablet_replicas_.equal_range(tablet_id);
  for (auto it = replicas.first; it != replicas.second; ++it) {
    itest::TServerDetails* const candidate = it->second;
    if (candidate->instance_id.permanent_uuid() == uuid) {
      return candidate;
    }
  }
  return nullptr;
}
243
244
// Gets the locations of the consensus configuration and waits until all replicas
245
// are available for all tablets.
246
43
void TabletServerIntegrationTestBase::WaitForTSAndReplicas() {
247
43
  int num_retries = 0;
248
  // make sure the replicas are up and find the leader
249
43
  while (true) {
250
43
    if (num_retries >= kMaxRetries) {
251
0
      FAIL() << " Reached max. retries while looking up the config.";
252
0
    }
253
254
43
    Status status = cluster_->WaitForTabletServerCount(FLAGS_num_tablet_servers,
255
43
                                                       MonoDelta::FromSeconds(5));
256
43
    if (status.IsTimedOut()) {
257
0
      LOG(WARNING)<< "Timeout waiting for all replicas to be online, retrying...";
258
0
      num_retries++;
259
0
      continue;
260
0
    }
261
43
    break;
262
43
  }
263
43
  WaitForReplicasAndUpdateLocations();
264
43
}
265
266
// Removes a set of servers from the replicas_ list.
267
// Handy for controlling who to validate against after killing servers.
268
void TabletServerIntegrationTestBase::PruneFromReplicas(
269
0
    const std::unordered_set<std::string>& uuids) {
270
0
  auto iter = tablet_replicas_.begin();
271
0
  while (iter != tablet_replicas_.end()) {
272
0
    if (uuids.count((*iter).second->instance_id.permanent_uuid()) != 0) {
273
0
      iter = tablet_replicas_.erase(iter);
274
0
      continue;
275
0
    }
276
0
    ++iter;
277
0
  }
278
279
0
  for (const std::string& uuid : uuids) {
280
0
    tablet_servers_.erase(uuid);
281
0
  }
282
0
}
283
284
void TabletServerIntegrationTestBase::GetOnlyLiveFollowerReplicas(
285
    const std::string& tablet_id,
286
7
    std::vector<itest::TServerDetails*>* followers) {
287
7
  followers->clear();
288
7
  itest::TServerDetails* leader;
289
7
  CHECK_OK(GetLeaderReplicaWithRetries(tablet_id, &leader));
290
291
7
  std::vector<itest::TServerDetails*> replicas;
292
7
  pair<itest::TabletReplicaMap::iterator, itest::TabletReplicaMap::iterator> range =
293
7
      tablet_replicas_.equal_range(tablet_id);
294
28
  for (; range.first != range.second; ++range.first) {
295
21
    replicas.push_back((*range.first).second);
296
21
  }
297
298
21
  for (itest::TServerDetails* replica : replicas) {
299
21
    if (leader != NULL &&
300
21
        replica->instance_id.permanent_uuid() == leader->instance_id.permanent_uuid()) {
301
7
      continue;
302
7
    }
303
14
    Status s = GetReplicaStatusAndCheckIfLeader(replica, tablet_id,
304
14
                                                MonoDelta::FromMilliseconds(100));
305
14
    if (s.IsIllegalState()) {
306
14
      followers->push_back(replica);
307
14
    }
308
14
  }
309
7
}
310
311
// Return the index within 'replicas' for the replica which is farthest ahead.
312
int64_t TabletServerIntegrationTestBase::GetFurthestAheadReplicaIdx(
313
    const std::string& tablet_id,
314
0
    const std::vector<itest::TServerDetails*>& replicas) {
315
0
  auto op_ids = CHECK_RESULT(GetLastOpIdForEachReplica(
316
0
      tablet_id, replicas, consensus::RECEIVED_OPID, MonoDelta::FromSeconds(10)));
317
318
0
  int64 max_index = 0;
319
0
  ssize_t max_replica_index = -1;
320
0
  for (size_t i = 0; i < op_ids.size(); i++) {
321
0
    if (op_ids[i].index > max_index) {
322
0
      max_index = op_ids[i].index;
323
0
      max_replica_index = i;
324
0
    }
325
0
  }
326
327
0
  CHECK_NE(max_replica_index, -1);
328
329
0
  return max_replica_index;
330
0
}
331
332
0
Status TabletServerIntegrationTestBase::ShutdownServerWithUUID(const std::string& uuid) {
333
0
  for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) {
334
0
    ExternalTabletServer* ts = cluster_->tablet_server(i);
335
0
    if (ts->instance_id().permanent_uuid() == uuid) {
336
0
      ts->Shutdown();
337
0
      return Status::OK();
338
0
    }
339
0
  }
340
0
  return STATUS(NotFound, "Unable to find server with UUID", uuid);
341
0
}
342
343
0
Status TabletServerIntegrationTestBase::RestartServerWithUUID(const std::string& uuid) {
344
0
  for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) {
345
0
    ExternalTabletServer* ts = cluster_->tablet_server(i);
346
0
    if (ts->instance_id().permanent_uuid() == uuid) {
347
0
      ts->Shutdown();
348
0
      RETURN_NOT_OK(CheckTabletServersAreAlive(tablet_servers_.size()-1));
349
0
      RETURN_NOT_OK(ts->Restart());
350
0
      RETURN_NOT_OK(CheckTabletServersAreAlive(tablet_servers_.size()));
351
0
      return Status::OK();
352
0
    }
353
0
  }
354
0
  return STATUS(NotFound, "Unable to find server with UUID", uuid);
355
0
}
356
357
// Since we're fault-tolerant we might mask when a tablet server is
358
// dead. This returns Status::IllegalState() if fewer than 'num_tablet_servers'
359
// are alive.
360
2
Status TabletServerIntegrationTestBase::CheckTabletServersAreAlive(size_t num_tablet_servers) {
361
2
  size_t live_count = 0;
362
2
  std::string error = strings::Substitute("Fewer than $0 TabletServers were alive. Dead TSs: ",
363
2
                                          num_tablet_servers);
364
2
  rpc::RpcController controller;
365
6
  for (const itest::TabletServerMap::value_type& entry : tablet_servers_) {
366
6
    controller.Reset();
367
6
    controller.set_timeout(MonoDelta::FromSeconds(10));
368
6
    server::PingRequestPB req;
369
6
    server::PingResponsePB resp;
370
6
    Status s = entry.second->generic_proxy->Ping(req, &resp, &controller);
371
6
    if (!s.ok()) {
372
0
      error += "\n" + entry.second->ToString() +  " (" + s.ToString() + ")";
373
0
      continue;
374
0
    }
375
6
    live_count++;
376
6
  }
377
2
  if (live_count < num_tablet_servers) {
378
0
    return STATUS(IllegalState, error);
379
0
  }
380
2
  return Status::OK();
381
2
}
382
383
31
void TabletServerIntegrationTestBase::TearDown() {
384
31
  client_.reset();
385
31
  if (cluster_) {
386
28
    for (const auto* daemon : cluster_->master_daemons()) {
387
56
      EXPECT_TRUE(daemon->IsShutdown() || daemon->IsProcessAlive()) << "Daemon: " << daemon->id();
388
28
    }
389
85
    for (const auto* daemon : cluster_->tserver_daemons()) {
390
170
      EXPECT_TRUE(daemon->IsShutdown() || daemon->IsProcessAlive()) << "Daemon: " << daemon->id();
391
85
    }
392
28
    cluster_->Shutdown();
393
28
  }
394
31
  tablet_servers_.clear();
395
31
  TabletServerTestBase::TearDown();
396
31
}
397
398
41
// Builds a YBClient that knows the RPC address of every master in the cluster.
Result<std::unique_ptr<client::YBClient>> TabletServerIntegrationTestBase::CreateClient() {
  client::YBClientBuilder builder;
  const auto& masters = cluster_->master_daemons();
  for (const auto* master : masters) {
    const auto master_addr = AsString(master->bound_rpc_addr());
    builder.add_master_server_addr(master_addr);
  }
  return builder.Build();
}
406
407
// Create a table with a single tablet.
408
41
void TabletServerIntegrationTestBase::CreateTable() {
409
41
  ASSERT_OK(client_->CreateNamespaceIfNotExists(kTableName.namespace_name(),
410
41
                                                kTableName.namespace_type()));
411
412
41
  ASSERT_OK(table_.Create(kTableName, 1, client::YBSchema(schema_), client_.get()));
413
41
}
414
415
// Starts an external cluster with a single tablet and a number of replicas equal
416
// to 'FLAGS_num_replicas'. The caller can pass 'ts_flags' to specify non-default
417
// flags to pass to the tablet servers.
418
void TabletServerIntegrationTestBase::BuildAndStart(
419
    const std::vector<std::string>& ts_flags,
420
41
    const std::vector<std::string>& master_flags) {
421
41
  CreateCluster("raft_consensus-itest-cluster", ts_flags, master_flags);
422
41
  client_ = ASSERT_RESULT(CreateClient());
423
41
  ASSERT_NO_FATALS(CreateTable());
424
41
  WaitForTSAndReplicas();
425
41
  CHECK_GT(tablet_replicas_.size(), 0);
426
41
  tablet_id_ = (*tablet_replicas_.begin()).first;
427
41
}
428
429
11
void TabletServerIntegrationTestBase::AssertAllReplicasAgree(size_t expected_result_count) {
430
11
  ClusterVerifier cluster_verifier(cluster_.get());
431
11
  ASSERT_NO_FATALS(cluster_verifier.CheckCluster());
432
6
  ASSERT_NO_FATALS(cluster_verifier.CheckRowCount(
433
6
      kTableName, ClusterVerifier::EXACTLY, expected_result_count));
434
6
}
435
436
0
// Table type used by tests built on this base: always YQL_TABLE_TYPE.
client::YBTableType TabletServerIntegrationTestBase::table_type() {
  using client::YBTableType;
  return YBTableType::YQL_TABLE_TYPE;
}
439
440
}  // namespace tserver
441
}  // namespace yb