/Users/deen/code/yugabyte-db/src/yb/integration-tests/update_scan_delta_compact-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <memory> |
34 | | #include <string> |
35 | | #include <vector> |
36 | | |
37 | | #include <boost/range/iterator_range.hpp> |
38 | | |
39 | | #include "yb/client/callbacks.h" |
40 | | #include "yb/client/client.h" |
41 | | #include "yb/client/error.h" |
42 | | #include "yb/client/schema.h" |
43 | | #include "yb/client/session.h" |
44 | | #include "yb/client/table_handle.h" |
45 | | #include "yb/client/yb_op.h" |
46 | | #include "yb/client/yb_table_name.h" |
47 | | |
48 | | #include "yb/gutil/strings/strcat.h" |
49 | | |
50 | | #include "yb/integration-tests/mini_cluster.h" |
51 | | #include "yb/integration-tests/yb_mini_cluster_test_base.h" |
52 | | |
53 | | #include "yb/tserver/mini_tablet_server.h" |
54 | | |
55 | | #include "yb/util/countdown_latch.h" |
56 | | #include "yb/util/curl_util.h" |
57 | | #include "yb/util/monotime.h" |
58 | | #include "yb/util/status_log.h" |
59 | | #include "yb/util/stopwatch.h" |
60 | | #include "yb/util/test_macros.h" |
61 | | #include "yb/util/test_util.h" |
62 | | #include "yb/util/thread.h" |
63 | | |
64 | | using namespace std::literals; |
65 | | |
66 | | DECLARE_int32(log_segment_size_mb); |
67 | | DECLARE_int32(maintenance_manager_polling_interval_ms); |
68 | | DEFINE_int32(mbs_for_flushes_and_rolls, 1, "How many MBs are needed to flush and roll"); |
69 | | DEFINE_int32(row_count, 2000, "How many rows will be used in this test for the base data"); |
70 | | DEFINE_int32(seconds_to_run, 4, |
71 | | "How long this test runs for, after inserting the base data, in seconds"); |
72 | | |
73 | | namespace yb { |
74 | | namespace tablet { |
75 | | |
76 | | using client::YBClient; |
77 | | using client::YBClientBuilder; |
78 | | using client::YBColumnSchema; |
79 | | using client::YBSchema; |
80 | | using client::YBSchemaBuilder; |
81 | | using client::YBSession; |
82 | | using client::YBStatusCallback; |
83 | | using client::YBStatusMemberCallback; |
84 | | using client::YBTable; |
85 | | using client::YBTableCreator; |
86 | | using client::YBTableName; |
87 | | using std::shared_ptr; |
88 | | |
89 | | // This integration test tries to trigger all the update-related bits while also serving as a |
90 | | // foundation for benchmarking. It first inserts 'row_count' rows and then starts two threads, |
91 | | // one that continuously updates all the rows sequentially and one that scans them all, until |
92 | | // it's been running for 'seconds_to_run'. It doesn't test for correctness, unless something |
93 | | // FATALs. |
94 | | class UpdateScanDeltaCompactionTest : public YBMiniClusterTestBase<MiniCluster> { |
95 | | protected: |
96 | 1 | UpdateScanDeltaCompactionTest() { |
97 | 1 | YBSchemaBuilder b; |
98 | 1 | b.AddColumn("key")->Type(INT64)->NotNull()->HashPrimaryKey(); |
99 | 1 | b.AddColumn("string")->Type(STRING)->NotNull(); |
100 | 1 | b.AddColumn("int64")->Type(INT64)->NotNull(); |
101 | 1 | CHECK_OK(b.Build(&schema_)); |
102 | 1 | } |
103 | | |
104 | 1 | void SetUp() override { |
105 | 1 | HybridTime::TEST_SetPrettyToString(true); |
106 | 1 | YBMiniClusterTestBase::SetUp(); |
107 | 1 | } |
108 | | |
109 | 1 | void CreateTable() { |
110 | 1 | ASSERT_NO_FATALS(InitCluster()); |
111 | 0 | ASSERT_OK(client_->CreateNamespaceIfNotExists(kTableName.namespace_name(), |
112 | 0 | kTableName.namespace_type())); |
113 | 0 | ASSERT_OK(table_.Create(kTableName, CalcNumTablets(1), schema_, client_.get())); |
114 | 0 | } |
115 | | |
116 | 0 | void DoTearDown() override { |
117 | 0 | if (cluster_) { |
118 | 0 | cluster_->Shutdown(); |
119 | 0 | } |
120 | 0 | YBMiniClusterTestBase::DoTearDown(); |
121 | 0 | } |
122 | | |
123 | | // Inserts row_count rows sequentially. |
124 | | void InsertBaseData(); |
125 | | |
126 | | // Starts the update and scan threads then stops them after seconds_to_run. |
127 | | void RunThreads(); |
128 | | |
129 | | private: |
130 | | static const YBTableName kTableName; |
131 | | |
132 | 1 | void InitCluster() { |
133 | | // Start mini-cluster with 1 tserver. |
134 | 1 | cluster_.reset(new MiniCluster(MiniClusterOptions())); |
135 | 1 | ASSERT_OK(cluster_->Start()); |
136 | 1 | client_ = ASSERT_RESULT(cluster_->CreateClient()); |
137 | 0 | } |
138 | | |
139 | 0 | shared_ptr<YBSession> CreateSession() { |
140 | 0 | shared_ptr<YBSession> session = client_->NewSession(); |
141 | | // Bumped this up from 5 sec to 30 sec in hope to fix the flakiness in this test. |
142 | 0 | session->SetTimeout(30s); |
143 | 0 | return session; |
144 | 0 | } |
145 | | |
146 | | // Continuously updates the existing data until 'stop_latch' drops to 0. |
147 | | void UpdateRows(CountDownLatch* stop_latch); |
148 | | |
149 | | // Continuously scans the data until 'stop_latch' drops to 0. |
150 | | void ScanRows(CountDownLatch* stop_latch) const; |
151 | | |
152 | | // Continuously fetch various web pages on the TS. |
153 | | void CurlWebPages(CountDownLatch* stop_latch) const; |
154 | | |
155 | | // Sets the passed values on the row. |
156 | | // TODO randomize the string column. |
157 | | void MakeRow(int64_t key, int64_t val, QLWriteRequestPB* req) const; |
158 | | |
159 | | // If 'key' is a multiple of kSessionBatchSize, it uses 'flush_future' to wait for the previous |
160 | | // batch to finish and then flushes the current one. |
161 | | Status WaitForLastBatchAndFlush(int64_t key, |
162 | | std::future<client::FlushStatus>* flush_future, |
163 | | shared_ptr<YBSession> session); |
164 | | |
165 | | YBSchema schema_; |
166 | | client::TableHandle table_; |
167 | | std::unique_ptr<YBClient> client_; |
168 | | }; |
169 | | |
170 | | const YBTableName UpdateScanDeltaCompactionTest::kTableName( |
171 | | YQL_DATABASE_CQL, "my_keyspace", "update-scan-delta-compact-tbl"); |
172 | | const int kSessionBatchSize = 1000; |
173 | | |
174 | 1 | TEST_F(UpdateScanDeltaCompactionTest, TestAll) { |
175 | 1 | OverrideFlagForSlowTests("seconds_to_run", "100"); |
176 | 1 | OverrideFlagForSlowTests("row_count", "1000000"); |
177 | 1 | OverrideFlagForSlowTests("mbs_for_flushes_and_rolls", "8"); |
178 | | // Setting this high enough that we see the effects of flushes and compactions. |
179 | 1 | OverrideFlagForSlowTests("maintenance_manager_polling_interval_ms", "2000"); |
180 | 1 | FLAGS_log_segment_size_mb = FLAGS_mbs_for_flushes_and_rolls; |
181 | 1 | if (!AllowSlowTests()) { |
182 | | // Make it run more often since it's not a long test. |
183 | 1 | FLAGS_maintenance_manager_polling_interval_ms = 50; |
184 | 1 | } |
185 | | |
186 | 1 | ASSERT_NO_FATALS(CreateTable()); |
187 | 0 | ASSERT_NO_FATALS(InsertBaseData()); |
188 | 0 | ASSERT_NO_FATALS(RunThreads()); |
189 | 0 | } |
190 | | |
191 | 0 | void UpdateScanDeltaCompactionTest::InsertBaseData() { |
192 | 0 | shared_ptr<YBSession> session = CreateSession(); |
193 | 0 | std::promise<client::FlushStatus> promise; |
194 | 0 | auto flush_future = promise.get_future(); |
195 | 0 | promise.set_value(client::FlushStatus()); |
196 | |
|
197 | 0 | LOG_TIMING(INFO, "Insert") { |
198 | 0 | for (int64_t key = 0; key < FLAGS_row_count; key++) { |
199 | 0 | auto insert = table_.NewInsertOp(); |
200 | 0 | MakeRow(key, 0, insert->mutable_request()); |
201 | 0 | session->Apply(insert); |
202 | 0 | ASSERT_OK(WaitForLastBatchAndFlush(key, &flush_future, session)); |
203 | 0 | } |
204 | 0 | ASSERT_OK(WaitForLastBatchAndFlush(kSessionBatchSize, &flush_future, session)); |
205 | 0 | if (flush_future.valid()) { |
206 | 0 | ASSERT_OK(flush_future.get().status); |
207 | 0 | } |
208 | 0 | } |
209 | 0 | } |
210 | | |
211 | 0 | void UpdateScanDeltaCompactionTest::RunThreads() { |
212 | 0 | vector<scoped_refptr<Thread> > threads; |
213 | |
|
214 | 0 | CountDownLatch stop_latch(1); |
215 | |
|
216 | 0 | { |
217 | 0 | scoped_refptr<Thread> t; |
218 | 0 | ASSERT_OK(Thread::Create(CURRENT_TEST_NAME(), |
219 | 0 | StrCat(CURRENT_TEST_CASE_NAME(), "-update"), |
220 | 0 | &UpdateScanDeltaCompactionTest::UpdateRows, this, |
221 | 0 | &stop_latch, &t)); |
222 | 0 | threads.push_back(t); |
223 | 0 | } |
224 | |
|
225 | 0 | { |
226 | 0 | scoped_refptr<Thread> t; |
227 | 0 | ASSERT_OK(Thread::Create(CURRENT_TEST_NAME(), |
228 | 0 | StrCat(CURRENT_TEST_CASE_NAME(), "-scan"), |
229 | 0 | &UpdateScanDeltaCompactionTest::ScanRows, this, |
230 | 0 | &stop_latch, &t)); |
231 | 0 | threads.push_back(t); |
232 | 0 | } |
233 | |
|
234 | 0 | { |
235 | 0 | scoped_refptr<Thread> t; |
236 | 0 | ASSERT_OK(Thread::Create(CURRENT_TEST_NAME(), |
237 | 0 | StrCat(CURRENT_TEST_CASE_NAME(), "-curl"), |
238 | 0 | &UpdateScanDeltaCompactionTest::CurlWebPages, this, |
239 | 0 | &stop_latch, &t)); |
240 | 0 | threads.push_back(t); |
241 | 0 | } |
242 | |
|
243 | 0 | SleepFor(MonoDelta::FromSeconds(FLAGS_seconds_to_run * 1.0)); |
244 | 0 | stop_latch.CountDown(); |
245 | |
|
246 | 0 | for (const scoped_refptr<Thread>& thread : threads) { |
247 | 0 | ASSERT_OK(ThreadJoiner(thread.get()).warn_every(500ms).Join()); |
248 | 0 | } |
249 | 0 | } |
250 | | |
251 | 0 | void UpdateScanDeltaCompactionTest::UpdateRows(CountDownLatch* stop_latch) { |
252 | 0 | shared_ptr<YBSession> session = CreateSession(); |
253 | |
|
254 | 0 | for (int64_t iteration = 1; stop_latch->count() > 0; iteration++) { |
255 | 0 | LOG_TIMING(INFO, "Update") { |
256 | 0 | std::promise<client::FlushStatus> promise; |
257 | 0 | auto flush_future = promise.get_future(); |
258 | 0 | promise.set_value(client::FlushStatus()); |
259 | 0 | for (int64_t key = 0; key < FLAGS_row_count && stop_latch->count() > 0; key++) { |
260 | 0 | auto update = table_.NewUpdateOp(); |
261 | 0 | MakeRow(key, iteration, update->mutable_request()); |
262 | 0 | session->Apply(update); |
263 | 0 | CHECK_OK(WaitForLastBatchAndFlush(key, &flush_future, session)); |
264 | 0 | } |
265 | 0 | CHECK_OK(WaitForLastBatchAndFlush(kSessionBatchSize, &flush_future, session)); |
266 | 0 | if (flush_future.valid()) { |
267 | 0 | CHECK_OK(flush_future.get().status); |
268 | 0 | } |
269 | 0 | } |
270 | 0 | } |
271 | 0 | } |
272 | | |
273 | 0 | void UpdateScanDeltaCompactionTest::ScanRows(CountDownLatch* stop_latch) const { |
274 | 0 | while (stop_latch->count() > 0) { |
275 | 0 | LOG_TIMING(INFO, "Scan") { |
276 | 0 | boost::size(client::TableRange(table_)); |
277 | 0 | } |
278 | 0 | } |
279 | 0 | } |
280 | | |
281 | 0 | void UpdateScanDeltaCompactionTest::CurlWebPages(CountDownLatch* stop_latch) const { |
282 | 0 | vector<string> urls; |
283 | 0 | string base_url = yb::ToString(cluster_->mini_tablet_server(0)->bound_http_addr()); |
284 | 0 | urls.push_back(base_url + "/scans"); |
285 | 0 | urls.push_back(base_url + "/transactions"); |
286 | |
|
287 | 0 | EasyCurl curl; |
288 | 0 | faststring dst; |
289 | 0 | while (stop_latch->count() > 0) { |
290 | 0 | for (const string& url : urls) { |
291 | 0 | VLOG(1) << "Curling URL " << url; |
292 | 0 | Status status = curl.FetchURL(url, &dst); |
293 | 0 | if (status.ok()) { |
294 | 0 | CHECK_GT(dst.length(), 0); |
295 | 0 | } |
296 | 0 | } |
297 | 0 | } |
298 | 0 | } |
299 | | |
300 | | void UpdateScanDeltaCompactionTest::MakeRow(int64_t key, |
301 | | int64_t val, |
302 | 0 | QLWriteRequestPB* req) const { |
303 | 0 | QLAddInt64HashValue(req, key); |
304 | 0 | table_.AddStringColumnValue(req, "string", "TODO random string"); |
305 | 0 | table_.AddInt64ColumnValue(req, "int64", val); |
306 | 0 | } |
307 | | |
308 | | Status UpdateScanDeltaCompactionTest::WaitForLastBatchAndFlush( |
309 | 0 | int64_t key, std::future<client::FlushStatus>* flush_future, shared_ptr<YBSession> session) { |
310 | 0 | if (key % kSessionBatchSize == 0) { |
311 | 0 | RETURN_NOT_OK(flush_future->get().status); |
312 | 0 | *flush_future = session->FlushFuture(); |
313 | 0 | } |
314 | 0 | return Status::OK(); |
315 | 0 | } |
316 | | |
317 | | } // namespace tablet |
318 | | } // namespace yb |