/Users/deen/code/yugabyte-db/src/yb/integration-tests/stepdown_under_load-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include <atomic> |
15 | | #include <memory> |
16 | | #include <vector> |
17 | | |
18 | | #include "yb/client/client.h" |
19 | | #include "yb/client/table.h" |
20 | | |
21 | | #include "yb/common/entity_ids.h" |
22 | | |
23 | | #include "yb/integration-tests/cluster_itest_util.h" |
24 | | #include "yb/integration-tests/cluster_verifier.h" |
25 | | #include "yb/integration-tests/external_mini_cluster.h" |
26 | | #include "yb/integration-tests/load_generator.h" |
27 | | #include "yb/integration-tests/yb_table_test_base.h" |
28 | | |
29 | | #include "yb/master/master_cluster.proxy.h" |
30 | | |
31 | | #include "yb/util/test_util.h" |
32 | | |
33 | | namespace yb { |
34 | | namespace itest { |
35 | | |
36 | | using client::YBClient; |
37 | | using client::YBClientBuilder; |
38 | | using client::YBTable; |
39 | | using integration_tests::YBTableTestBase; |
40 | | using std::shared_ptr; |
41 | | using std::string; |
42 | | using std::unique_ptr; |
43 | | using std::vector; |
44 | | |
45 | | namespace { |
46 | | |
47 | | const auto kDefaultTimeout = MonoDelta::FromSeconds(30); |
48 | | |
49 | | } // anonymous namespace |
50 | | |
51 | | |
52 | | class StepDownUnderLoadTest : public YBTableTestBase { |
53 | | public: |
54 | 3 | bool use_external_mini_cluster() override { return true; } |
55 | 2 | int num_tablets() override { return 1; } |
56 | 1 | bool enable_ysql() override { return false; } |
57 | | }; |
58 | | |
59 | 1 | TEST_F(StepDownUnderLoadTest, TestStepDownUnderLoad) { |
60 | 1 | std::atomic_bool stop_requested_flag(false); |
61 | 1 | static constexpr int kRows = 1000000; |
62 | 1 | static constexpr int kStartKey = 0; |
63 | 1 | static constexpr int kWriterThreads = 4; |
64 | 1 | static constexpr int kReaderThreads = 4; |
65 | 1 | static constexpr int kValueSizeBytes = 16; |
66 | | |
67 | | // Tolerate some errors in the load test due to temporary unavailability. |
68 | 1 | static constexpr int kMaxWriteErrors = 1000; |
69 | 1 | static constexpr int kMaxReadErrors = 1000; |
70 | | |
71 | | // Create two separate clients for read and writes. |
72 | 1 | auto write_client = CreateYBClient(); |
73 | 1 | auto read_client = CreateYBClient(); |
74 | 1 | yb::load_generator::YBSessionFactory write_session_factory(write_client.get(), &table_); |
75 | 1 | yb::load_generator::YBSessionFactory read_session_factory(read_client.get(), &table_); |
76 | | |
77 | 1 | yb::load_generator::MultiThreadedWriter writer(kRows, kStartKey, kWriterThreads, |
78 | 1 | &write_session_factory, &stop_requested_flag, |
79 | 1 | kValueSizeBytes, kMaxWriteErrors); |
80 | | |
81 | 1 | yb::load_generator::MultiThreadedReader reader(kRows, kReaderThreads, &read_session_factory, |
82 | 1 | writer.InsertionPoint(), writer.InsertedKeys(), |
83 | 1 | writer.FailedKeys(), &stop_requested_flag, |
84 | 1 | kValueSizeBytes, kMaxReadErrors); |
85 | | |
86 | 1 | auto* const emc = external_mini_cluster(); |
87 | 0 | TabletServerMap ts_map = ASSERT_RESULT(itest::CreateTabletServerMap(emc)); |
88 | |
|
89 | 0 | vector<TabletId> tablet_ids; |
90 | 0 | { |
91 | 0 | const auto ts0_uuid = ts_map.begin()->first; |
92 | 0 | auto* const ts0_details = ts_map[ts0_uuid].get(); |
93 | 0 | ASSERT_OK(ListRunningTabletIds(ts0_details, kDefaultTimeout, &tablet_ids)); |
94 | 0 | ASSERT_EQ(1, tablet_ids.size()); |
95 | 0 | } |
96 | 0 | const TabletId tablet_id(tablet_ids.front()); |
97 | |
|
98 | 0 | writer.Start(); |
99 | |
|
100 | 0 | reader.set_client_id(write_session_factory.ClientId()); |
101 | 0 | reader.Start(); |
102 | |
|
103 | 0 | for (int i = 0; i < 10 && !stop_requested_flag; ++i) { |
104 | 0 | TServerDetails* leader = nullptr; |
105 | 0 | ASSERT_OK(FindTabletLeader(ts_map, tablet_id, kDefaultTimeout, &leader)); |
106 | 0 | CHECK_NOTNULL(leader); |
107 | | |
108 | | // Find a non-leader tablet and restart it. |
109 | 0 | const TServerDetails* non_leader = nullptr; |
110 | 0 | for (const auto& ts_map_entry : ts_map) { |
111 | 0 | const TServerDetails* ts_details = ts_map_entry.second.get(); |
112 | 0 | if (ts_details->uuid() != leader->uuid()) { |
113 | 0 | non_leader = ts_details; |
114 | 0 | break; |
115 | 0 | } |
116 | 0 | } |
117 | 0 | ASSERT_NE(non_leader->uuid(), leader->uuid()); |
118 | |
|
119 | 0 | auto *const external_ts = emc->tablet_server_by_uuid(non_leader->uuid()); |
120 | 0 | external_ts->Shutdown(); |
121 | 0 | SleepFor(MonoDelta::FromSeconds(3)); |
122 | 0 | ASSERT_OK(external_ts->Restart()); |
123 | |
|
124 | 0 | while (!emc->tablet_server_by_uuid(non_leader->uuid())->IsProcessAlive()) { |
125 | 0 | SleepFor(MonoDelta::FromMilliseconds(50)); |
126 | 0 | } |
127 | | |
128 | | // Step down in favor of the server that was stopped at the previous iteration and therefore |
129 | | // has a high chance of not being caught up. This stepdown will most likely be unsuccessful, |
130 | | // but might uncover bugs in commit index handling on the new leader. |
131 | 0 | ASSERT_OK(FindTabletLeader(ts_map, tablet_id, kDefaultTimeout, &leader)); |
132 | 0 | CHECK_NOTNULL(leader); |
133 | 0 | auto s = LeaderStepDown(leader, tablet_id, non_leader, kDefaultTimeout); |
134 | 0 | ASSERT_TRUE(s.ok() || s.IsIllegalState()); |
135 | 0 | } |
136 | |
|
137 | 0 | stop_requested_flag = true; // stop both reader and writer |
138 | 0 | writer.WaitForCompletion(); |
139 | 0 | LOG(INFO) << "Writing complete"; |
140 | |
|
141 | 0 | reader.WaitForCompletion(); |
142 | 0 | LOG(INFO) << "Reading complete"; |
143 | |
|
144 | 0 | ASSERT_EQ(0, writer.num_write_errors()); |
145 | 0 | ASSERT_EQ(0, reader.num_read_errors()); |
146 | |
|
147 | 0 | ASSERT_GE(writer.num_writes(), kWriterThreads); |
148 | | |
149 | | // Assuming reads are at least as fast as writes. |
150 | 0 | ASSERT_GE(reader.num_reads(), kReaderThreads); |
151 | |
|
152 | 0 | ClusterVerifier cluster_verifier(external_mini_cluster()); |
153 | 0 | ASSERT_NO_FATALS(cluster_verifier.CheckCluster()); |
154 | 0 | ASSERT_NO_FATALS(cluster_verifier.CheckRowCount(table_->name(), ClusterVerifier::EXACTLY, |
155 | 0 | writer.num_writes())); |
156 | 0 | } |
157 | | |
158 | | } // namespace itest |
159 | | } // namespace yb |