/Users/deen/code/yugabyte-db/src/yb/integration-tests/full_stack-insert-scan-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <cmath> |
34 | | #include <cstdlib> |
35 | | #include <memory> |
36 | | #include <string> |
37 | | #include <vector> |
38 | | |
39 | | #include <gflags/gflags.h> |
40 | | #include <glog/logging.h> |
41 | | |
42 | | #include "yb/gutil/casts.h" |
43 | | |
44 | | #include "yb/client/callbacks.h" |
45 | | #include "yb/client/client-test-util.h" |
46 | | #include "yb/client/client.h" |
47 | | #include "yb/client/error.h" |
48 | | #include "yb/client/schema.h" |
49 | | #include "yb/client/session.h" |
50 | | #include "yb/client/table_handle.h" |
51 | | #include "yb/client/yb_op.h" |
52 | | #include "yb/client/yb_table_name.h" |
53 | | |
54 | | #include "yb/gutil/ref_counted.h" |
55 | | #include "yb/gutil/strings/split.h" |
56 | | #include "yb/gutil/strings/substitute.h" |
57 | | |
58 | | #include "yb/integration-tests/mini_cluster.h" |
59 | | #include "yb/integration-tests/yb_mini_cluster_test_base.h" |
60 | | |
61 | | #include "yb/master/mini_master.h" |
62 | | |
63 | | #include "yb/tablet/maintenance_manager.h" |
64 | | #include "yb/tablet/tablet.h" |
65 | | #include "yb/tablet/tablet_peer.h" |
66 | | |
67 | | #include "yb/tserver/mini_tablet_server.h" |
68 | | #include "yb/tserver/tablet_server.h" |
69 | | #include "yb/tserver/ts_tablet_manager.h" |
70 | | |
71 | | #include "yb/util/async_util.h" |
72 | | #include "yb/util/countdown_latch.h" |
73 | | #include "yb/util/errno.h" |
74 | | #include "yb/util/random.h" |
75 | | #include "yb/util/random_util.h" |
76 | | #include "yb/util/status.h" |
77 | | #include "yb/util/status_log.h" |
78 | | #include "yb/util/stopwatch.h" |
79 | | #include "yb/util/subprocess.h" |
80 | | #include "yb/util/test_macros.h" |
81 | | #include "yb/util/test_util.h" |
82 | | #include "yb/util/thread.h" |
83 | | |
84 | | using namespace std::literals; |
85 | | |
86 | | // Test size parameters |
87 | | DEFINE_int32(concurrent_inserts, -1, "Number of inserting clients to launch"); |
88 | | DEFINE_int32(inserts_per_client, -1, |
89 | | "Number of rows inserted by each inserter client"); |
90 | | DEFINE_int32(rows_per_batch, -1, "Number of rows per client batch"); |
91 | | |
92 | | // Perf-related FLAGS_perf_stat |
93 | | DEFINE_bool(perf_record_scan, false, "Call \"perf record --call-graph\" " |
94 | | "for the duration of the scan, disabled by default"); |
95 | | DEFINE_bool(perf_stat_scan, false, "Print \"perf stat\" results during" |
96 | | "scan to stdout, disabled by default"); |
97 | | DEFINE_bool(perf_fp_flag, false, "Only applicable with --perf_record_scan," |
98 | | " provides argument \"fp\" to the --call-graph flag"); |
99 | | DECLARE_bool(enable_maintenance_manager); |
100 | | |
101 | | using std::string; |
102 | | using std::vector; |
103 | | using std::shared_ptr; |
104 | | |
105 | | namespace yb { |
106 | | namespace tablet { |
107 | | |
108 | | using client::YBClient; |
109 | | using client::YBClientBuilder; |
110 | | using client::YBColumnSchema; |
111 | | using client::YBSchema; |
112 | | using client::YBSchemaBuilder; |
113 | | using client::YBSession; |
114 | | using client::YBStatusMemberCallback; |
115 | | using client::YBTable; |
116 | | using client::YBTableCreator; |
117 | | using client::YBTableName; |
118 | | using strings::Split; |
119 | | using strings::Substitute; |
120 | | |
121 | | namespace { |
122 | | |
123 | | const auto kSessionTimeout = 60s; |
124 | | |
125 | | } |
126 | | |
127 | | class FullStackInsertScanTest : public YBMiniClusterTestBase<MiniCluster> { |
128 | | protected: |
129 | | FullStackInsertScanTest() |
130 | | : // Set the default value depending on whether slow tests are allowed |
131 | | kNumInsertClients(DefaultFlag(FLAGS_concurrent_inserts, 3, 10)), |
132 | | kNumInsertsPerClient(DefaultFlag(FLAGS_inserts_per_client, 500, 50000)), |
133 | | kNumRows(kNumInsertClients*kNumInsertsPerClient), |
134 | | kFlushEveryN(DefaultFlag(FLAGS_rows_per_batch, 125, 5000)), |
135 | | random_(SeedRandom()), |
136 | 2 | sessions_(kNumInsertClients) { |
137 | 2 | tables_.reserve(kNumInsertClients); |
138 | 2 | } |
139 | | |
140 | | const int kNumInsertClients; |
141 | | const int kNumInsertsPerClient; |
142 | | const int kNumRows; |
143 | | |
144 | 2 | void SetUp() override { |
145 | 2 | YBMiniClusterTestBase::SetUp(); |
146 | 2 | } |
147 | | |
148 | | void CreateTable(); |
149 | | |
150 | 1 | void DoTearDown() override { |
151 | 1 | client_.reset(); |
152 | 1 | if (cluster_) { |
153 | 1 | cluster_->Shutdown(); |
154 | 1 | } |
155 | 1 | YBMiniClusterTestBase::DoTearDown(); |
156 | 1 | } |
157 | | |
158 | | void DoConcurrentClientInserts(); |
159 | | void DoTestScans(); |
160 | | void FlushToDisk(); |
161 | | |
162 | | private: |
163 | 6 | int DefaultFlag(int flag, int fast, int slow) { |
164 | 6 | if (flag != -1) return flag; |
165 | 6 | if (AllowSlowTests()) return slow; |
166 | 6 | return fast; |
167 | 6 | } |
168 | | |
169 | 2 | void InitCluster() { |
170 | | // Start mini-cluster with 1 tserver, config client options |
171 | 2 | cluster_.reset(new MiniCluster(MiniClusterOptions())); |
172 | 2 | ASSERT_OK(cluster_->Start()); |
173 | 2 | YBClientBuilder builder; |
174 | 2 | builder.add_master_server_addr( |
175 | 2 | cluster_->mini_master()->bound_rpc_addr_str()); |
176 | 2 | builder.default_rpc_timeout(MonoDelta::FromSeconds(30)); |
177 | 0 | client_ = ASSERT_RESULT(builder.Build()); |
178 | 0 | } |
179 | | |
180 | | // Adds newly generated client's session and table pointers to arrays at id |
181 | 0 | void CreateNewClient(int id) { |
182 | 0 | while (tables_.size() <= implicit_cast<size_t>(id)) { |
183 | 0 | auto table = std::make_unique<client::TableHandle>(); |
184 | 0 | ASSERT_OK(table->Open(kTableName, client_.get())); |
185 | 0 | tables_.push_back(std::move(table)); |
186 | 0 | } |
187 | 0 | std::shared_ptr<YBSession> session = client_->NewSession(); |
188 | 0 | session->SetTimeout(kSessionTimeout); |
189 | 0 | sessions_[id] = session; |
190 | 0 | } |
191 | | |
192 | | // Insert the rows that are associated with that ID. |
193 | | void InsertRows(CountDownLatch* start_latch, int id, uint32_t seed); |
194 | | |
195 | | // Run a scan from the reader_client_ with the projection schema schema |
196 | | // and LOG_TIMING message msg. |
197 | | void ScanProjection(const vector<string>& cols, const string& msg); |
198 | | |
199 | | vector<string> AllColumnNames() const; |
200 | | |
201 | | static const YBTableName kTableName; |
202 | | const int kFlushEveryN; |
203 | | |
204 | | Random random_; |
205 | | |
206 | | YBSchema schema_; |
207 | | std::unique_ptr<YBClient> client_; |
208 | | client::TableHandle reader_table_; |
209 | | // Concurrent client insertion test variables |
210 | | vector<std::shared_ptr<YBSession> > sessions_; |
211 | | std::vector<std::unique_ptr<client::TableHandle>> tables_; |
212 | | }; |
213 | | |
214 | | namespace { |
215 | | |
216 | 0 | std::unique_ptr<Subprocess> MakePerfStat() { |
217 | 0 | if (!FLAGS_perf_stat_scan) return std::unique_ptr<Subprocess>(); |
218 | | // No output flag for perf-stat 2.x, just print to output |
219 | 0 | string cmd = Substitute("perf stat --pid=$0", getpid()); |
220 | 0 | LOG(INFO) << "Calling: \"" << cmd << "\""; |
221 | 0 | return std::unique_ptr<Subprocess>(new Subprocess("perf", Split(cmd, " "))); |
222 | 0 | } |
223 | | |
224 | 0 | std::unique_ptr<Subprocess> MakePerfRecord() { |
225 | 0 | if (!FLAGS_perf_record_scan) return std::unique_ptr<Subprocess>(); |
226 | 0 | string cmd = Substitute("perf record --pid=$0 --call-graph", getpid()); |
227 | 0 | if (FLAGS_perf_fp_flag) cmd += " fp"; |
228 | 0 | LOG(INFO) << "Calling: \"" << cmd << "\""; |
229 | 0 | return std::unique_ptr<Subprocess>(new Subprocess("perf", Split(cmd, " "))); |
230 | 0 | } |
231 | | |
232 | 0 | void InterruptNotNull(std::unique_ptr<Subprocess> sub) { |
233 | 0 | if (!sub) return; |
234 | 0 | ASSERT_OK(sub->Kill(SIGINT)); |
235 | 0 | int exit_status = 0; |
236 | 0 | ASSERT_OK(sub->Wait(&exit_status)); |
237 | 0 | if (!exit_status) { |
238 | 0 | LOG(WARNING) << "Subprocess returned " << exit_status |
239 | 0 | << ": " << ErrnoToString(exit_status); |
240 | 0 | } |
241 | 0 | } |
242 | | |
243 | | // If key is approximately at an even multiple of 1/10 of the way between |
244 | | // start and end, then a % completion update is printed to LOG(INFO) |
245 | | // Assumes that end - start + 1 fits into an int |
246 | | void ReportTenthDone(int64_t key, int64_t start, int64_t end, |
247 | 0 | int id, int numids) { |
248 | 0 | auto done = key - start + 1; |
249 | 0 | auto total = end - start + 1; |
250 | 0 | if (total < 10) return; |
251 | 0 | if (done % (total / 10) == 0) { |
252 | 0 | auto percent = done * 100 / total; |
253 | 0 | LOG(INFO) << "Insertion thread " << id << " of " |
254 | 0 | << numids << " is "<< percent << "% done."; |
255 | 0 | } |
256 | 0 | } |
257 | | |
258 | 0 | void ReportAllDone(int id, int numids) { |
259 | 0 | LOG(INFO) << "Insertion thread " << id << " of " |
260 | 0 | << numids << " is 100% done."; |
261 | 0 | } |
262 | | |
263 | | } // anonymous namespace |
264 | | |
265 | | const YBTableName FullStackInsertScanTest::kTableName( |
266 | | YQL_DATABASE_CQL, "my_keyspace", "full-stack-mrs-test-tbl"); |
267 | | |
268 | 1 | TEST_F(FullStackInsertScanTest, MRSOnlyStressTest) { |
269 | 1 | FLAGS_enable_maintenance_manager = false; |
270 | 1 | ASSERT_NO_FATALS(CreateTable()); |
271 | 0 | ASSERT_NO_FATALS(DoConcurrentClientInserts()); |
272 | 0 | ASSERT_NO_FATALS(DoTestScans()); |
273 | 0 | } |
274 | | |
275 | 1 | TEST_F(FullStackInsertScanTest, WithDiskStressTest) { |
276 | 1 | ASSERT_NO_FATALS(CreateTable()); |
277 | 0 | ASSERT_NO_FATALS(DoConcurrentClientInserts()); |
278 | 0 | ASSERT_NO_FATALS(FlushToDisk()); |
279 | 0 | ASSERT_NO_FATALS(DoTestScans()); |
280 | 0 | } |
281 | | |
282 | 0 | void FullStackInsertScanTest::DoConcurrentClientInserts() { |
283 | 0 | vector<scoped_refptr<Thread> > threads(kNumInsertClients); |
284 | 0 | CountDownLatch start_latch(kNumInsertClients + 1); |
285 | 0 | for (int i = 0; i < kNumInsertClients; ++i) { |
286 | 0 | ASSERT_NO_FATALS(CreateNewClient(i)); |
287 | 0 | ASSERT_OK(Thread::Create(CURRENT_TEST_NAME(), |
288 | 0 | StrCat(CURRENT_TEST_CASE_NAME(), "-id", i), |
289 | 0 | &FullStackInsertScanTest::InsertRows, this, |
290 | 0 | &start_latch, i, random_.Next(), &threads[i])); |
291 | 0 | start_latch.CountDown(); |
292 | 0 | } |
293 | 0 | LOG_TIMING(INFO, |
294 | 0 | strings::Substitute("concurrent inserts ($0 rows, $1 threads)", |
295 | 0 | kNumRows, kNumInsertClients)) { |
296 | 0 | start_latch.CountDown(); |
297 | 0 | for (const scoped_refptr<Thread>& thread : threads) { |
298 | 0 | ASSERT_OK(ThreadJoiner(thread.get()).warn_every(15s).Join()); |
299 | 0 | } |
300 | 0 | } |
301 | 0 | } |
302 | | |
303 | | namespace { |
304 | | |
// Number of int32 columns, and also of int64 columns, filled per row by RandomRow.
constexpr int kNumIntCols = 4;
// Inclusive bounds on the length of the random string written to "string_val".
constexpr int kRandomStrMinLength = 16;
constexpr int kRandomStrMaxLength = 31;
308 | | |
// Names of the string-typed value columns of the test table (there is exactly one).
// The list is built once and shared by all callers.
const std::vector<std::string>& StringColumnNames() {
  static const std::vector<std::string> kNames{"string_val"};
  return kNames;
}
313 | | |
// Names of the int32-typed value columns of the test table, in schema order.
// The list is built once and shared by all callers.
const std::vector<std::string>& Int32ColumnNames() {
  static const std::vector<std::string> kNames{
      "int32_val1", "int32_val2", "int32_val3", "int32_val4"};
  return kNames;
}
322 | | |
// Names of the int64-typed value columns of the test table, in schema order.
// The list is built once and shared by all callers.
const std::vector<std::string>& Int64ColumnNames() {
  static const std::vector<std::string> kNames{
      "int64_val1", "int64_val2", "int64_val3", "int64_val4"};
  return kNames;
}
331 | | |
332 | | // Fills in the fields for a row as defined by the Schema below |
333 | | // name: (key, string_val, int32_val$, int64_val$) |
334 | | // type: (int64_t, string, int32_t x4, int64_t x4) |
335 | | // The first int32 gets the id and the first int64 gets the thread |
336 | | // id. The key is assigned to "key," and the other fields are random. |
337 | | void RandomRow(Random* rng, QLWriteRequestPB* req, char* buf, int64_t key, int id, |
338 | 0 | client::TableHandle* table) { |
339 | 0 | QLAddInt64HashValue(req, key); |
340 | 0 | int len = kRandomStrMinLength + rng->Uniform(kRandomStrMaxLength - kRandomStrMinLength + 1); |
341 | 0 | RandomString(buf, len, rng); |
342 | 0 | buf[len] = '\0'; |
343 | 0 | table->AddStringColumnValue(req, StringColumnNames()[0], buf); |
344 | 0 | for (int i = 0; i < kNumIntCols; ++i) { |
345 | 0 | table->AddInt32ColumnValue(req, Int32ColumnNames()[i], rng->Next32()); |
346 | 0 | table->AddInt64ColumnValue(req, Int64ColumnNames()[i], rng->Next64()); |
347 | 0 | } |
348 | 0 | } |
349 | | |
350 | | } // namespace |
351 | | |
352 | 2 | void FullStackInsertScanTest::CreateTable() { |
353 | 2 | ASSERT_GE(kNumInsertClients, 0); |
354 | 2 | ASSERT_GE(kNumInsertsPerClient, 0); |
355 | 2 | ASSERT_NO_FATALS(InitCluster()); |
356 | | |
357 | 0 | ASSERT_OK(client_->CreateNamespaceIfNotExists(kTableName.namespace_name(), |
358 | 0 | kTableName.namespace_type())); |
359 | |
|
360 | 0 | YBSchemaBuilder b; |
361 | 0 | b.AddColumn("key")->Type(INT64)->NotNull()->HashPrimaryKey(); |
362 | 0 | b.AddColumn("string_val")->Type(STRING)->NotNull(); |
363 | 0 | for (auto& col : Int32ColumnNames()) { |
364 | 0 | b.AddColumn(col)->Type(INT32)->NotNull(); |
365 | 0 | } |
366 | 0 | for (auto& col : Int64ColumnNames()) { |
367 | 0 | b.AddColumn(col)->Type(INT64)->NotNull(); |
368 | 0 | } |
369 | 0 | ASSERT_OK(reader_table_.Create(kTableName, CalcNumTablets(1), client_.get(), &b)); |
370 | 0 | schema_ = reader_table_.schema(); |
371 | 0 | } |
372 | | |
373 | 0 | void FullStackInsertScanTest::DoTestScans() { |
374 | 0 | LOG(INFO) << "Doing test scans on table of " << kNumRows << " rows."; |
375 | |
|
376 | 0 | auto stat = MakePerfStat(); |
377 | 0 | auto record = MakePerfRecord(); |
378 | 0 | if (stat) { |
379 | 0 | CHECK_OK(stat->Start()); |
380 | 0 | } |
381 | 0 | if (record) { |
382 | 0 | CHECK_OK(record->Start()); |
383 | 0 | } |
384 | 0 | ASSERT_NO_FATALS(ScanProjection(vector<string>(), "empty projection, 0 col")); |
385 | 0 | ASSERT_NO_FATALS(ScanProjection({ "key" }, "key scan, 1 col")); |
386 | 0 | ASSERT_NO_FATALS(ScanProjection(AllColumnNames(), "full schema scan, 10 col")); |
387 | 0 | ASSERT_NO_FATALS(ScanProjection(StringColumnNames(), "String projection, 1 col")); |
388 | 0 | ASSERT_NO_FATALS(ScanProjection(Int32ColumnNames(), "Int32 projection, 4 col")); |
389 | 0 | ASSERT_NO_FATALS(ScanProjection(Int64ColumnNames(), "Int64 projection, 4 col")); |
390 | |
|
391 | 0 | ASSERT_NO_FATALS(InterruptNotNull(std::move(record))); |
392 | 0 | ASSERT_NO_FATALS(InterruptNotNull(std::move(stat))); |
393 | 0 | } |
394 | | |
395 | 0 | void FullStackInsertScanTest::FlushToDisk() { |
396 | 0 | for (size_t i = 0; i < cluster_->num_tablet_servers(); ++i) { |
397 | 0 | tserver::TabletServer* ts = cluster_->mini_tablet_server(i)->server(); |
398 | 0 | ts->maintenance_manager()->Shutdown(); |
399 | 0 | auto peers = ts->tablet_manager()->GetTabletPeers(); |
400 | 0 | for (const std::shared_ptr<TabletPeer>& peer : peers) { |
401 | 0 | Tablet* tablet = peer->tablet(); |
402 | 0 | ASSERT_OK(tablet->Flush(tablet::FlushMode::kSync)); |
403 | 0 | } |
404 | 0 | } |
405 | 0 | } |
406 | | |
407 | | void FullStackInsertScanTest::InsertRows(CountDownLatch* start_latch, int id, |
408 | 0 | uint32_t seed) { |
409 | 0 | Random rng(seed + id); |
410 | |
|
411 | 0 | start_latch->Wait(); |
412 | | // Retrieve id's session and table |
413 | 0 | std::shared_ptr<YBSession> session = sessions_[id]; |
414 | 0 | client::TableHandle* table = tables_[id].get(); |
415 | | // Identify start and end of keyrange id is responsible for |
416 | 0 | int64_t start = kNumInsertsPerClient * id; |
417 | 0 | int64_t end = start + kNumInsertsPerClient; |
418 | | // Printed id value is in the range 1..kNumInsertClients inclusive |
419 | 0 | ++id; |
420 | | // Prime the future as if it was running a batch (for for-loop code) |
421 | 0 | std::promise<client::FlushStatus> promise; |
422 | 0 | auto flush_status_future = promise.get_future(); |
423 | 0 | promise.set_value(client::FlushStatus()); |
424 | | // Maintain buffer for random string generation |
425 | 0 | char randstr[kRandomStrMaxLength + 1]; |
426 | | // Insert in the id's key range |
427 | 0 | for (int64_t key = start; key < end; ++key) { |
428 | 0 | auto op = table->NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
429 | 0 | RandomRow(&rng, op->mutable_request(), randstr, key, id, table); |
430 | 0 | session->Apply(op); |
431 | | |
432 | | // Report updates or flush every so often, using the synchronizer to always |
433 | | // start filling up the next batch while previous one is sent out. |
434 | 0 | if (key % kFlushEveryN == 0 || key == end - 1) { |
435 | 0 | auto flush_status = flush_status_future.get(); |
436 | 0 | if (!flush_status.status.ok()) { |
437 | 0 | LogSessionErrorsAndDie(flush_status); |
438 | 0 | } |
439 | 0 | flush_status_future = session->FlushFuture(); |
440 | 0 | } |
441 | 0 | ReportTenthDone(key, start, end, id, kNumInsertClients); |
442 | 0 | } |
443 | 0 | auto flush_status = flush_status_future.get(); |
444 | 0 | if (!flush_status.status.ok()) { |
445 | 0 | LogSessionErrorsAndDie(flush_status); |
446 | 0 | } |
447 | 0 | ReportAllDone(id, kNumInsertClients); |
448 | 0 | FlushSessionOrDie(session); |
449 | 0 | } |
450 | | |
451 | | void FullStackInsertScanTest::ScanProjection(const vector<string>& cols, |
452 | 0 | const string& msg) { |
453 | 0 | client::TableIteratorOptions options; |
454 | 0 | options.columns = cols; |
455 | 0 | size_t nrows; |
456 | 0 | LOG_TIMING(INFO, msg) { |
457 | 0 | nrows = boost::size(client::TableRange(reader_table_)); |
458 | 0 | } |
459 | 0 | ASSERT_EQ(nrows, kNumRows); |
460 | 0 | } |
461 | | |
462 | 0 | vector<string> FullStackInsertScanTest::AllColumnNames() const { |
463 | 0 | vector<string> ret; |
464 | 0 | for (size_t i = 0; i < schema_.num_columns(); i++) { |
465 | 0 | ret.push_back(schema_.Column(i).name()); |
466 | 0 | } |
467 | 0 | return ret; |
468 | 0 | } |
469 | | |
470 | | } // namespace tablet |
471 | | } // namespace yb |