/Users/deen/code/yugabyte-db/src/yb/tools/ysck_remote-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <gtest/gtest.h> |
34 | | |
35 | | #include "yb/client/client.h" |
36 | | #include "yb/client/schema.h" |
37 | | #include "yb/client/session.h" |
38 | | #include "yb/client/table.h" |
39 | | #include "yb/client/table_creator.h" |
40 | | #include "yb/client/yb_op.h" |
41 | | |
42 | | #include "yb/gutil/strings/substitute.h" |
43 | | |
44 | | #include "yb/integration-tests/mini_cluster.h" |
45 | | |
46 | | #include "yb/tools/data_gen_util.h" |
47 | | #include "yb/tools/ysck_remote.h" |
48 | | |
49 | | #include "yb/util/monotime.h" |
50 | | #include "yb/util/promise.h" |
51 | | #include "yb/util/random.h" |
52 | | #include "yb/util/status_log.h" |
53 | | #include "yb/util/test_util.h" |
54 | | #include "yb/util/thread.h" |
55 | | |
56 | | using namespace std::literals; |
57 | | |
58 | | DECLARE_int32(heartbeat_interval_ms); |
59 | | |
60 | | namespace yb { |
61 | | namespace tools { |
62 | | |
63 | | using client::YBColumnSchema; |
64 | | using client::YBSchemaBuilder; |
65 | | using client::YBSession; |
66 | | using client::YBTable; |
67 | | using client::YBTableCreator; |
68 | | using client::YBTableName; |
69 | | using std::shared_ptr; |
70 | | using std::static_pointer_cast; |
71 | | using std::string; |
72 | | using std::vector; |
73 | | using strings::Substitute; |
74 | | |
75 | | static const YBTableName kTableName(YQL_DATABASE_CQL, "my_keyspace", "ysck-test-table"); |
76 | | // Test fixture: starts a MiniCluster with three tablet servers, creates kTableName with three tablets, and builds a Ysck against the leader master's RPC address. |
77 | | class RemoteYsckTest : public YBTest { |
78 | | public: |
79 | | RemoteYsckTest() |
80 | 0 | : random_(SeedRandom()) { |
81 | 0 | YBSchemaBuilder b; |
82 | 0 | b.AddColumn("key")->Type(INT32)->NotNull()->HashPrimaryKey(); |
83 | 0 | b.AddColumn("int_val")->Type(INT32)->NotNull(); |
84 | 0 | CHECK_OK(b.Build(&schema_)); |
85 | 0 | } |
86 | | // Starts the mini cluster, creates and opens the test table, then builds the master/cluster/Ysck objects used by the tests. |
87 | 0 | void SetUp() override { |
88 | 0 | YBTest::SetUp(); |
89 | | |
90 | | // Speed up testing, saves about 700ms per TEST_F. |
91 | 0 | FLAGS_heartbeat_interval_ms = 10; |
92 |

93 | 0 | MiniClusterOptions opts; |
94 | 0 | opts.num_tablet_servers = 3; |
95 | 0 | mini_cluster_.reset(new MiniCluster(opts)); |
96 | 0 | ASSERT_OK(mini_cluster_->Start()); |
97 |

98 | 0 | master_rpc_addr_ = ASSERT_RESULT(mini_cluster_->GetLeaderMasterBoundRpcAddr()); |
99 | | |
100 | | // Connect to the cluster. |
101 | 0 | client_ = ASSERT_RESULT(mini_cluster_->CreateClient()); |
102 | | |
103 | | // Create the namespace if it does not exist yet, then one table with three tablets. |
104 | 0 | ASSERT_OK(client_->CreateNamespaceIfNotExists(kTableName.namespace_name(), |
105 | 0 | kTableName.namespace_type())); |
106 | 0 | std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator()); |
107 | 0 | ASSERT_OK(table_creator->table_name(kTableName) |
108 | 0 | .schema(&schema_) |
109 | 0 | .num_tablets(3) |
110 | 0 | .Create()); |
111 | | // Make sure we can open the table. |
112 | 0 | ASSERT_OK(client_->OpenTable(kTableName, &client_table_)); |
113 |

114 | 0 | ASSERT_OK(RemoteYsckMaster::Build(HostPort(master_rpc_addr_), &master_)); |
115 | 0 | cluster_.reset(new YsckCluster(master_)); |
116 | 0 | ysck_.reset(new Ysck(cluster_)); |
117 | 0 | } |
118 | | // Releases the client first, then shuts down and releases the mini cluster (if one was created). |
119 | 0 | void TearDown() override { |
120 | 0 | client_.reset(); |
121 | 0 | if (mini_cluster_) { |
122 | 0 | mini_cluster_->Shutdown(); |
123 | 0 | mini_cluster_.reset(); |
124 | 0 | } |
125 | 0 | YBTest::TearDown(); |
126 | 0 | } |
127 | | |
128 | | // Inserts rows into the table, one ApplyAndFlush per row, until continue_writing reads false. |
129 | | // started_writing is counted down after every successful insert; promise receives OK on a clean stop, or the first failing Status. |
130 | | // Public so that tests can pass its address to Thread::Create. |
131 | | void GenerateRowWritesLoop(CountDownLatch* started_writing, |
132 | | const AtomicBool& continue_writing, |
133 | 0 | Promise<Status>* promise) { |
134 | 0 | shared_ptr<YBTable> table; |
135 | 0 | Status status = client_->OpenTable(kTableName, &table); |
136 | 0 | if (!status.ok()) { |
137 | 0 | promise->Set(status); |
138 | 0 | return; |
139 | 0 | } |
140 | 0 | shared_ptr<YBSession> session(client_->NewSession()); |
141 | 0 | session->SetTimeout(10s); |
142 |

143 | 0 | for (uint64_t i = 0; continue_writing.Load(); i++) { |
144 | 0 | std::shared_ptr<client::YBqlWriteOp> insert(table->NewQLInsert()); |
145 | 0 | GenerateDataForRow(table->schema(), i, &random_, insert->mutable_request()); |
146 | 0 | status = session->ApplyAndFlush(insert); |
147 | 0 | if (!status.ok()) { |
148 | 0 | promise->Set(status); |
149 | 0 | return; |
150 | 0 | } |
151 | 0 | started_writing->CountDown(1); |
152 | 0 | } |
153 | 0 | promise->Set(Status::OK()); |
154 | 0 | } |
155 | | |
156 | | protected: |
157 | | // Inserts num_rows generated rows into the test table, flushing the session whenever the row index is a non-zero multiple of 1000 and once more at the end. |
158 | 0 | Status GenerateRowWrites(uint64_t num_rows) { |
159 | 0 | shared_ptr<YBTable> table; |
160 | 0 | RETURN_NOT_OK(client_->OpenTable(kTableName, &table)); |
161 | 0 | shared_ptr<YBSession> session(client_->NewSession()); |
162 | 0 | session->SetTimeout(10s); |
163 | 0 | for (uint64_t i = 0; i < num_rows; i++) { |
164 | 0 | VLOG(1) << "Generating write for row id " << i; |
165 | 0 | std::shared_ptr<client::YBqlWriteOp> insert(table->NewQLInsert()); |
166 | 0 | GenerateDataForRow(table->schema(), i, &random_, insert->mutable_request()); |
167 | 0 | session->Apply(insert); |
168 |

169 | 0 | if (i > 0 && i % 1000 == 0) { |
170 | 0 | RETURN_NOT_OK(session->Flush()); |
171 | 0 | } |
172 | 0 | } |
173 | 0 | RETURN_NOT_OK(session->Flush()); |
174 | 0 | return Status::OK(); |
175 | 0 | } |
176 | | |
177 | | std::shared_ptr<Ysck> ysck_; |
178 | | std::unique_ptr<client::YBClient> client_; |
179 | | |
180 | | private: |
181 | | HostPort master_rpc_addr_; |
182 | | std::shared_ptr<MiniCluster> mini_cluster_; |
183 | | client::YBSchema schema_; |
184 | | shared_ptr<client::YBTable> client_table_; |
185 | | std::shared_ptr<YsckMaster> master_; |
186 | | std::shared_ptr<YsckCluster> cluster_; |
187 | | Random random_; |
188 | | }; |
189 | | // Asserts that Ysck::CheckMasterRunning() succeeds against the fixture's cluster. |
190 | 0 | TEST_F(RemoteYsckTest, TestMasterOk) { |
191 | 0 | ASSERT_OK(ysck_->CheckMasterRunning()); |
192 | 0 | } |
193 | | // Fetches table and tablet info, then asserts that CheckTabletServersRunning() succeeds. |
194 | 0 | TEST_F(RemoteYsckTest, TestTabletServersOk) { |
195 | 0 | LOG(INFO) << "Fetching table and tablet info..."; |
196 | 0 | ASSERT_OK(ysck_->FetchTableAndTabletInfo()); |
197 | 0 | LOG(INFO) << "Checking tablet servers are running..."; |
198 | 0 | ASSERT_OK(ysck_->CheckTabletServersRunning()); |
199 | 0 | } |
200 | | // Retries FetchTableAndTabletInfo() + CheckTablesConsistency() every 10ms for up to 30 seconds and asserts that the last consistency result is OK. |
201 | 0 | TEST_F(RemoteYsckTest, TestTableConsistency) { |
202 | 0 | MonoTime deadline = MonoTime::Now(); |
203 | 0 | deadline.AddDelta(MonoDelta::FromSeconds(30)); |
204 | 0 | Status s; |
205 | 0 | while (MonoTime::Now().ComesBefore(deadline)) { |
206 | 0 | ASSERT_OK(ysck_->FetchTableAndTabletInfo()); |
207 | 0 | s = ysck_->CheckTablesConsistency(); |
208 | 0 | if (s.ok()) { |
209 | 0 | break; |
210 | 0 | } |
211 | 0 | SleepFor(MonoDelta::FromMilliseconds(10)); |
212 | 0 | } |
213 | 0 | ASSERT_OK(s); |
214 | 0 | } |
215 | | // Writes 100 rows, then retries ChecksumData() (1 second timeout per attempt) every 10ms for up to 30 seconds and asserts that the last attempt succeeded. |
216 | 0 | TEST_F(RemoteYsckTest, TestChecksum) { |
217 | 0 | uint64_t num_writes = 100; |
218 | 0 | LOG(INFO) << "Generating row writes..."; |
219 | 0 | ASSERT_OK(GenerateRowWrites(num_writes)); |
220 |

221 | 0 | MonoTime deadline = MonoTime::Now(); |
222 | 0 | deadline.AddDelta(MonoDelta::FromSeconds(30)); |
223 | 0 | Status s; |
224 | 0 | while (MonoTime::Now().ComesBefore(deadline)) { |
225 | 0 | ASSERT_OK(ysck_->FetchTableAndTabletInfo()); |
226 | 0 | s = ysck_->ChecksumData(vector<string>(), |
227 | 0 | vector<string>(), |
228 | 0 | ChecksumOptions(MonoDelta::FromSeconds(1), 16)); |
229 | 0 | if (s.ok()) { |
230 | 0 | break; |
231 | 0 | } |
232 | 0 | SleepFor(MonoDelta::FromMilliseconds(10)); |
233 | 0 | } |
234 | 0 | ASSERT_OK(s); |
235 | 0 | } |
236 | | // Writes 10000 rows and expects ChecksumData() with a zero timeout to return a TimedOut status. |
237 | 0 | TEST_F(RemoteYsckTest, TestChecksumTimeout) { |
238 | 0 | uint64_t num_writes = 10000; |
239 | 0 | LOG(INFO) << "Generating row writes..."; |
240 | 0 | ASSERT_OK(GenerateRowWrites(num_writes)); |
241 | 0 | ASSERT_OK(ysck_->FetchTableAndTabletInfo()); |
242 | | // Use an impossibly low timeout value of zero! |
243 | 0 | Status s = ysck_->ChecksumData(vector<string>(), |
244 | 0 | vector<string>(), |
245 | 0 | ChecksumOptions(MonoDelta::FromNanoseconds(0), 16)); |
246 | 0 | ASSERT_TRUE(s.IsTimedOut()) << "Expected TimedOut Status, got: " << s.ToString(); |
247 | 0 | } |
248 | | // Runs ChecksumData() while a background thread keeps inserting rows, retrying every 10ms for up to 30 seconds, and reports the last checksum status if it never succeeds. |
249 | 0 | TEST_F(RemoteYsckTest, TestChecksumSnapshot) { |
250 | 0 | CountDownLatch started_writing(1); |
251 | 0 | AtomicBool continue_writing(true); |
252 | 0 | Promise<Status> promise; |
253 | 0 | scoped_refptr<Thread> writer_thread; |
254 |

255 | 0 | CHECK_OK( |
256 | 0 | Thread::Create("RemoteYsckTest", |
257 | 0 | "TestChecksumSnapshot", |
258 | 0 | &RemoteYsckTest::GenerateRowWritesLoop, |
259 | 0 | this, |
260 | 0 | &started_writing, |
261 | 0 | std::cref(continue_writing), |
262 | 0 | &promise, |
263 | 0 | &writer_thread)); |
264 | 0 | CHECK(started_writing.WaitFor(MonoDelta::FromSeconds(30))); |
265 |

266 | 0 | uint64_t ts = client_->GetLatestObservedHybridTime(); |
267 | 0 | MonoTime start(MonoTime::Now()); |
268 | 0 | MonoTime deadline = start; |
269 | 0 | deadline.AddDelta(MonoDelta::FromSeconds(30)); |
270 | 0 | Status s; // Last ChecksumData() result; assigned (not redeclared) in the loop so the check after the loop sees it. |
271 | | // TODO: We need to loop here because safe time is not yet implemented. |
272 | | // Remove this loop when that is done. See KUDU-1056. |
273 | 0 | while (true) { |
274 | 0 | ASSERT_OK(ysck_->FetchTableAndTabletInfo()); // NOTE(review): a failure here returns while the writer thread is still running. |
275 | 0 | s = ysck_->ChecksumData(vector<string>(), vector<string>(), |
276 | 0 | ChecksumOptions(MonoDelta::FromSeconds(10), 16)); |
277 | 0 | if (s.ok()) break; |
278 | 0 | if (deadline.ComesBefore(MonoTime::Now())) break; |
279 | 0 | SleepFor(MonoDelta::FromMilliseconds(10)); |
280 | 0 | } |
281 | 0 | if (!s.ok()) { |
282 | 0 | LOG(WARNING) << Substitute("Timed out after $0 waiting for ysck to become consistent on TS $1. " |
283 | 0 | "Status: $2", |
284 | 0 | MonoTime::Now().GetDeltaSince(start).ToString(), |
285 | 0 | ts, s.ToString()); |
286 | 0 | EXPECT_OK(s); // Non-fatal on purpose: the writer thread must still be stopped and joined below (avoids ASAN complaints). |
287 | 0 | } |
288 | 0 | continue_writing.Store(false); |
289 | 0 | writer_thread->Join(); // Join before asserting: a failing ASSERT_OK returns at once, and the writer holds pointers to the locals above. |
290 | 0 | ASSERT_OK(promise.Get()); |
291 | 0 | } |
292 | | |
293 | | // Test that followers & leader wait until safe time to respond to a snapshot scan at current hybrid_time: |
294 | | // runs a single ChecksumData() pass while a background thread is inserting rows. TODO: Safe time not yet implemented. See KUDU-1056. |
295 | 0 | TEST_F(RemoteYsckTest, DISABLED_TestChecksumSnapshotCurrentHybridTime) { |
296 | 0 | CountDownLatch started_writing(1); |
297 | 0 | AtomicBool continue_writing(true); |
298 | 0 | Promise<Status> promise; |
299 | 0 | scoped_refptr<Thread> writer_thread; |
300 |

301 | 0 | CHECK_OK( |
302 | 0 | Thread::Create("RemoteYsckTest", |
303 | 0 | "TestChecksumSnapshot", |
304 | 0 | &RemoteYsckTest::GenerateRowWritesLoop, |
305 | 0 | this, |
306 | 0 | &started_writing, |
307 | 0 | std::cref(continue_writing), |
308 | 0 | &promise, |
309 | 0 | &writer_thread)); |
310 | 0 | CHECK(started_writing.WaitFor(MonoDelta::FromSeconds(30))); |
311 |

312 | 0 | ASSERT_OK(ysck_->FetchTableAndTabletInfo()); // NOTE(review): a failure here or in the next assert returns while the writer thread is still running. |
313 | 0 | ASSERT_OK(ysck_->ChecksumData(vector<string>(), vector<string>(), |
314 | 0 | ChecksumOptions(MonoDelta::FromSeconds(10), 16))); |
315 | 0 | continue_writing.Store(false); |
316 | 0 | writer_thread->Join(); // Join before asserting: a failing ASSERT_OK returns at once, and the writer holds pointers to the locals above. |
317 | 0 | ASSERT_OK(promise.Get()); |
318 | 0 | } |
319 | | |
320 | | } // namespace tools |
321 | | } // namespace yb |