YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/stepdown_under_load-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include <atomic>
15
#include <memory>
16
#include <vector>
17
18
#include "yb/client/client.h"
19
#include "yb/client/table.h"
20
21
#include "yb/common/entity_ids.h"
22
23
#include "yb/integration-tests/cluster_itest_util.h"
24
#include "yb/integration-tests/cluster_verifier.h"
25
#include "yb/integration-tests/external_mini_cluster.h"
26
#include "yb/integration-tests/load_generator.h"
27
#include "yb/integration-tests/yb_table_test_base.h"
28
29
#include "yb/master/master_cluster.proxy.h"
30
31
#include "yb/util/test_util.h"
32
33
namespace yb {
34
namespace itest {
35
36
using client::YBClient;
37
using client::YBClientBuilder;
38
using client::YBTable;
39
using integration_tests::YBTableTestBase;
40
using std::shared_ptr;
41
using std::string;
42
using std::unique_ptr;
43
using std::vector;
44
45
namespace {
46
47
const auto kDefaultTimeout = MonoDelta::FromSeconds(30);
48
49
}  // anonymous namespace
50
51
52
class StepDownUnderLoadTest : public YBTableTestBase {
53
 public:
54
3
  bool use_external_mini_cluster() override { return true; }
55
2
  int num_tablets() override { return 1; }
56
1
  bool enable_ysql() override { return false; }
57
};
58
59
1
// Runs a concurrent read/write workload against a single-tablet table while, ten times in a row,
// a non-leader tablet server is restarted and the current leader is asked to step down in favor
// of that freshly restarted server. Afterwards the test requires zero read/write errors and
// verifies the cluster and the row count.
TEST_F(StepDownUnderLoadTest, TestStepDownUnderLoad) {
  std::atomic_bool stop_requested_flag(false);
  static constexpr int kRows = 1000000;
  static constexpr int kStartKey = 0;
  static constexpr int kWriterThreads = 4;
  static constexpr int kReaderThreads = 4;
  static constexpr int kValueSizeBytes = 16;

  // Error limits handed to the load generator so that temporary unavailability does not end the
  // workload prematurely (presumably the generator stops once a limit is exceeded -- confirm in
  // load_generator.h). Note that the final assertions below still require zero errors.
  static constexpr int kMaxWriteErrors = 1000;
  static constexpr int kMaxReadErrors = 1000;

  // Upper bound on waiting for a restarted tablet server process: 600 polls * 50 ms = 30 seconds.
  static constexpr int kAlivePollIntervalMs = 50;
  static constexpr int kMaxAlivePolls = 600;

  // Create two separate clients for reads and writes.
  auto write_client = CreateYBClient();
  auto read_client = CreateYBClient();
  yb::load_generator::YBSessionFactory write_session_factory(write_client.get(), &table_);
  yb::load_generator::YBSessionFactory read_session_factory(read_client.get(), &table_);

  yb::load_generator::MultiThreadedWriter writer(kRows, kStartKey, kWriterThreads,
                                                 &write_session_factory, &stop_requested_flag,
                                                 kValueSizeBytes, kMaxWriteErrors);

  yb::load_generator::MultiThreadedReader reader(kRows, kReaderThreads, &read_session_factory,
                                                 writer.InsertionPoint(), writer.InsertedKeys(),
                                                 writer.FailedKeys(), &stop_requested_flag,
                                                 kValueSizeBytes, kMaxReadErrors);

  auto* const emc = external_mini_cluster();
  TabletServerMap ts_map = ASSERT_RESULT(itest::CreateTabletServerMap(emc));
  // Guard the begin() dereference below.
  ASSERT_FALSE(ts_map.empty());

  // Ask an arbitrary tablet server for its running tablets; exactly one is expected.
  vector<TabletId> tablet_ids;
  {
    auto* const ts0_details = ts_map.begin()->second.get();
    ASSERT_OK(ListRunningTabletIds(ts0_details, kDefaultTimeout, &tablet_ids));
    ASSERT_EQ(1, tablet_ids.size());
  }
  const TabletId tablet_id(tablet_ids.front());

  writer.Start();

  reader.set_client_id(write_session_factory.ClientId());
  reader.Start();

  for (int i = 0; i < 10 && !stop_requested_flag; ++i) {
    TServerDetails* leader = nullptr;
    ASSERT_OK(FindTabletLeader(ts_map, tablet_id, kDefaultTimeout, &leader));
    CHECK_NOTNULL(leader);

    // Find a non-leader tablet server and restart it.
    const TServerDetails* non_leader = nullptr;
    for (const auto& ts_map_entry : ts_map) {
      const TServerDetails* ts_details = ts_map_entry.second.get();
      if (ts_details->uuid() != leader->uuid()) {
        non_leader = ts_details;
        break;
      }
    }
    // Fail cleanly instead of dereferencing a null pointer when the leader is the only server.
    ASSERT_TRUE(non_leader != nullptr) << "No tablet server other than the leader was found";
    ASSERT_NE(non_leader->uuid(), leader->uuid());

    auto* const external_ts = emc->tablet_server_by_uuid(non_leader->uuid());
    ASSERT_TRUE(external_ts != nullptr)
        << "No external tablet server with uuid " << non_leader->uuid();
    external_ts->Shutdown();
    SleepFor(MonoDelta::FromSeconds(3));
    ASSERT_OK(external_ts->Restart());

    // Wait for the restarted process to come up, but give up after a bounded number of polls so
    // that a server that never starts produces a test failure rather than a hang.
    for (int polls = 0; !external_ts->IsProcessAlive(); ++polls) {
      ASSERT_LT(polls, kMaxAlivePolls)
          << "Tablet server " << non_leader->uuid() << " is not alive after restart";
      SleepFor(MonoDelta::FromMilliseconds(kAlivePollIntervalMs));
    }

    // Step down in favor of the server that was just restarted and therefore has a high chance
    // of not being caught up. This stepdown will most likely be unsuccessful, but might uncover
    // bugs in commit index handling on the new leader.
    ASSERT_OK(FindTabletLeader(ts_map, tablet_id, kDefaultTimeout, &leader));
    CHECK_NOTNULL(leader);
    auto s = LeaderStepDown(leader, tablet_id, non_leader, kDefaultTimeout);
    // Only success or IllegalState is an acceptable outcome of the stepdown request.
    ASSERT_TRUE(s.ok() || s.IsIllegalState()) << "Unexpected stepdown status: " << s.ToString();
  }

  stop_requested_flag = true;  // stop both reader and writer
  writer.WaitForCompletion();
  LOG(INFO) << "Writing complete";

  reader.WaitForCompletion();
  LOG(INFO) << "Reading complete";

  ASSERT_EQ(0, writer.num_write_errors());
  ASSERT_EQ(0, reader.num_read_errors());

  ASSERT_GE(writer.num_writes(), kWriterThreads);

  // Assuming reads are at least as fast as writes.
  ASSERT_GE(reader.num_reads(), kReaderThreads);

  ClusterVerifier cluster_verifier(external_mini_cluster());
  ASSERT_NO_FATALS(cluster_verifier.CheckCluster());
  ASSERT_NO_FATALS(cluster_verifier.CheckRowCount(table_->name(), ClusterVerifier::EXACTLY,
      writer.num_writes()));
}
157
158
}  // namespace itest
159
}  // namespace yb