/Users/deen/code/yugabyte-db/src/yb/integration-tests/test_workload.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/integration-tests/test_workload.h" |
34 | | |
35 | | #include <memory> |
36 | | #include <string> |
37 | | #include <unordered_set> |
38 | | #include <vector> |
39 | | |
40 | | #include "yb/client/client-test-util.h" |
41 | | #include "yb/client/client.h" |
42 | | #include "yb/client/error.h" |
43 | | #include "yb/client/schema.h" |
44 | | #include "yb/client/session.h" |
45 | | #include "yb/client/table_creator.h" |
46 | | #include "yb/client/table_handle.h" |
47 | | #include "yb/client/table_info.h" |
48 | | #include "yb/client/transaction.h" |
49 | | #include "yb/client/transaction_pool.h" |
50 | | #include "yb/client/yb_op.h" |
51 | | |
52 | | #include "yb/common/wire_protocol-test-util.h" |
53 | | |
54 | | #include "yb/gutil/casts.h" |
55 | | #include "yb/gutil/stl_util.h" |
56 | | #include "yb/gutil/strings/substitute.h" |
57 | | |
58 | | #include "yb/integration-tests/mini_cluster_base.h" |
59 | | |
60 | | #include "yb/master/master_util.h" |
61 | | |
62 | | #include "yb/util/env.h" |
63 | | #include "yb/util/monotime.h" |
64 | | #include "yb/util/random.h" |
65 | | #include "yb/util/random_util.h" |
66 | | #include "yb/util/status_log.h" |
67 | | #include "yb/util/thread.h" |
68 | | #include "yb/util/tsan_util.h" |
69 | | |
70 | | #include "yb/yql/cql/ql/util/statement_result.h" |
71 | | |
72 | | using namespace std::literals; |
73 | | |
74 | | namespace yb { |
75 | | |
76 | | using client::YBClient; |
77 | | using client::YBClientBuilder; |
78 | | using client::YBSchema; |
79 | | using client::YBSchemaBuilder; |
80 | | using client::YBSchemaFromSchema; |
81 | | using client::YBSession; |
82 | | using client::YBTable; |
83 | | using client::YBTableCreator; |
84 | | using client::YBTableType; |
85 | | using client::YBTableName; |
86 | | using std::shared_ptr; |
87 | | |
88 | | const YBTableName TestWorkloadOptions::kDefaultTableName( |
89 | | YQL_DATABASE_CQL, "my_keyspace", "test-workload"); |
90 | | |
91 | | class TestWorkload::State { |
92 | | public: |
93 | 57 | explicit State(MiniClusterBase* cluster) : cluster_(cluster) {} |
94 | | |
95 | | void Start(const TestWorkloadOptions& options); |
96 | | void Setup(YBTableType table_type, const TestWorkloadOptions& options); |
97 | | |
98 | 20 | void Stop() { |
99 | 20 | should_run_.store(false, std::memory_order_release); |
100 | 20 | start_latch_.Reset(0); |
101 | 20 | } |
102 | | |
103 | 20 | void Join() { |
104 | 161 | for (const auto& thr : threads_) { |
105 | 161 | CHECK_OK(ThreadJoiner(thr.get()).Join()); |
106 | 161 | } |
107 | 20 | threads_.clear(); |
108 | 20 | } |
109 | | |
110 | 0 | void set_transaction_pool(client::TransactionPool* pool) { |
111 | 0 | transaction_pool_ = pool; |
112 | 0 | } |
113 | | |
114 | 195 | int64_t rows_inserted() const { |
115 | 195 | return rows_inserted_.load(std::memory_order_acquire); |
116 | 195 | } |
117 | | |
118 | 0 | int64_t rows_insert_failed() const { |
119 | 0 | return rows_insert_failed_.load(std::memory_order_acquire); |
120 | 0 | } |
121 | | |
122 | 0 | int64_t rows_read_ok() const { |
123 | 0 | return rows_read_ok_.load(std::memory_order_acquire); |
124 | 0 | } |
125 | | |
126 | 0 | int64_t rows_read_empty() const { |
127 | 0 | return rows_read_empty_.load(std::memory_order_acquire); |
128 | 0 | } |
129 | | |
130 | 0 | int64_t rows_read_error() const { |
131 | 0 | return rows_read_error_.load(std::memory_order_acquire); |
132 | 0 | } |
133 | | |
134 | 0 | int64_t rows_read_try_again() const { |
135 | 0 | return rows_read_try_again_.load(std::memory_order_acquire); |
136 | 0 | } |
137 | | |
138 | 0 | int64_t batches_completed() const { |
139 | 0 | return batches_completed_.load(std::memory_order_acquire); |
140 | 0 | } |
141 | | |
142 | 1 | client::YBClient& client() const { |
143 | 1 | return *client_; |
144 | 1 | } |
145 | | |
146 | | private: |
147 | | CHECKED_STATUS Flush(client::YBSession* session, const TestWorkloadOptions& options); |
148 | | Result<client::YBTransactionPtr> MayBeStartNewTransaction( |
149 | | client::YBSession* session, const TestWorkloadOptions& options); |
150 | | Result<client::TableHandle> OpenTable(const TestWorkloadOptions& options); |
151 | | void WaitAllThreads(); |
152 | | void WriteThread(const TestWorkloadOptions& options); |
153 | | void ReadThread(const TestWorkloadOptions& options); |
154 | | |
155 | | MiniClusterBase* cluster_; |
156 | | std::unique_ptr<client::YBClient> client_; |
157 | | client::TransactionPool* transaction_pool_; |
158 | | CountDownLatch start_latch_{0}; |
159 | | std::atomic<bool> should_run_{false}; |
160 | | std::atomic<int64_t> pathological_one_row_counter_{0}; |
161 | | std::atomic<bool> pathological_one_row_inserted_{false}; |
162 | | std::atomic<int64_t> rows_inserted_{0}; |
163 | | std::atomic<int64_t> rows_insert_failed_{0}; |
164 | | std::atomic<int64_t> batches_completed_{0}; |
165 | | std::atomic<int32_t> next_key_{0}; |
166 | | std::atomic<int64_t> rows_read_ok_{0}; |
167 | | std::atomic<int64_t> rows_read_empty_{0}; |
168 | | std::atomic<int64_t> rows_read_error_{0}; |
169 | | std::atomic<int64_t> rows_read_try_again_{0}; |
170 | | |
171 | | // Invariant: if sequential_write and read_only_written_keys are set then |
172 | | // keys in [1 ... next_key_] and not in keys_in_write_progress_ are guaranteed to be written. |
173 | | std::mutex keys_in_write_progress_mutex_; |
174 | | std::set<int32_t> keys_in_write_progress_ GUARDED_BY(keys_in_write_progress_mutex_); |
175 | | |
176 | | std::vector<scoped_refptr<Thread> > threads_; |
177 | | }; |
178 | | |
179 | | TestWorkload::TestWorkload(MiniClusterBase* cluster) |
180 | 57 | : state_(new State(cluster)) {} |
181 | | |
182 | 12 | TestWorkload::~TestWorkload() { |
183 | 12 | StopAndJoin(); |
184 | 12 | } |
185 | | |
186 | | TestWorkload::TestWorkload(TestWorkload&& rhs) |
187 | 0 | : options_(rhs.options_), state_(std::move(rhs.state_)) {} |
188 | | |
189 | 0 | void TestWorkload::operator=(TestWorkload&& rhs) { |
190 | 0 | options_ = rhs.options_; |
191 | 0 | state_ = std::move(rhs.state_); |
192 | 0 | } |
193 | | |
194 | | Result<client::YBTransactionPtr> TestWorkload::State::MayBeStartNewTransaction( |
195 | 7.47k | client::YBSession* session, const TestWorkloadOptions& options) { |
196 | 7.47k | client::YBTransactionPtr txn; |
197 | 7.47k | if (options.is_transactional()) { |
198 | 0 | txn = VERIFY_RESULT(transaction_pool_->TakeAndInit( |
199 | 0 | options.isolation_level, TransactionRpcDeadline())); |
200 | 0 | session->SetTransaction(txn); |
201 | 0 | } |
202 | 7.47k | return txn; |
203 | 7.47k | } |
204 | | |
205 | 162 | Result<client::TableHandle> TestWorkload::State::OpenTable(const TestWorkloadOptions& options) { |
206 | 162 | client::TableHandle table; |
207 | | |
208 | | // Loop trying to open up the table. In some tests we set up very |
209 | | // low RPC timeouts to test those behaviors, so this might fail and |
210 | | // need retrying. |
211 | 162 | for(;;) { |
212 | 161 | auto s = table.Open(options.table_name, client_.get()); |
213 | 161 | if (s.ok()) { |
214 | 158 | return table; |
215 | 158 | } |
216 | 3 | if (!should_run_.load(std::memory_order_acquire)) { |
217 | 0 | LOG(ERROR) << "Failed to open table: " << s; |
218 | 0 | return s; |
219 | 0 | } |
220 | 3 | if (options.timeout_allowed && s.IsTimedOut()0 ) { |
221 | 0 | SleepFor(MonoDelta::FromMilliseconds(50)); |
222 | 0 | continue; |
223 | 0 | } |
224 | 3 | LOG(FATAL) << "Failed to open table: " << s; |
225 | 3 | return s; |
226 | 3 | } |
227 | 162 | } |
228 | | |
229 | 158 | void TestWorkload::State::WaitAllThreads() { |
230 | | // Wait for all of the workload threads to be ready to go. This maximizes the chance |
231 | | // that they all send a flood of requests at exactly the same time. |
232 | | // |
233 | | // This also minimizes the chance that we see failures to call OpenTable() if |
234 | | // a late-starting thread overlaps with the flood of outbound traffic from the |
235 | | // ones that are already writing data. |
236 | 158 | start_latch_.CountDown(); |
237 | 158 | start_latch_.Wait(); |
238 | 158 | } |
239 | | |
240 | 165 | void TestWorkload::State::WriteThread(const TestWorkloadOptions& options) { |
241 | 3.44M | auto next_random = [rng = &ThreadLocalRandom()] { |
242 | 3.44M | return RandomUniformInt<int32_t>(rng); |
243 | 3.44M | }; |
244 | | |
245 | 165 | auto table_result = OpenTable(options); |
246 | 165 | if (!table_result.ok()) { |
247 | 0 | return; |
248 | 0 | } |
249 | 165 | auto table = *table_result; |
250 | | |
251 | 165 | shared_ptr<YBSession> session = client_->NewSession(); |
252 | 165 | session->SetTimeout(options.write_timeout); |
253 | | |
254 | 165 | WaitAllThreads(); |
255 | | |
256 | 165 | std::string test_payload("hello world"); |
257 | 165 | if (options.payload_bytes != 11) { |
258 | | // We fill with zeros if you change the default. |
259 | 8 | test_payload.assign(options.payload_bytes, '0'); |
260 | 8 | } |
261 | | |
262 | 165 | bool inserting_one_row = false; |
263 | 165 | std::vector<client::YBqlWriteOpPtr> retry_ops; |
264 | 7.63k | for (;;) { |
265 | 7.63k | const auto should_run = should_run_.load(std::memory_order_acquire); |
266 | 7.63k | if (!should_run) { |
267 | 161 | if (options.sequential_write && options.read_only_written_keys12 ) { |
268 | | // In this case we want to complete writing of keys_in_write_progress_, so we don't have |
269 | | // gaps after workload is stopped. |
270 | 0 | std::lock_guard<std::mutex> lock(keys_in_write_progress_mutex_); |
271 | 0 | if (keys_in_write_progress_.empty()) { |
272 | 0 | break; |
273 | 0 | } |
274 | 161 | } else { |
275 | 161 | break; |
276 | 161 | } |
277 | 161 | } |
278 | 7.47k | auto txn = CHECK_RESULT(MayBeStartNewTransaction(session.get(), options)); |
279 | 7.47k | std::vector<client::YBqlWriteOpPtr> ops; |
280 | 7.47k | ops.swap(retry_ops); |
281 | 7.47k | const auto num_more_keys_to_insert = should_run ? options.write_batch_size - ops.size()7.46k : 07 ; |
282 | 1.72M | for (size_t i = 0; i < num_more_keys_to_insert; i++1.72M ) { |
283 | 1.72M | if (options.pathological_one_row_enabled) { |
284 | 0 | if (!pathological_one_row_inserted_) { |
285 | 0 | if (++pathological_one_row_counter_ != 1) { |
286 | 0 | continue; |
287 | 0 | } |
288 | 0 | } else { |
289 | 0 | inserting_one_row = true; |
290 | 0 | auto update = table.NewUpdateOp(); |
291 | 0 | auto req = update->mutable_request(); |
292 | 0 | QLAddInt32HashValue(req, 0); |
293 | 0 | table.AddInt32ColumnValue(req, table.schema().columns()[1].name(), next_random()); |
294 | 0 | if (options.ttl >= 0) { |
295 | 0 | req->set_ttl(options.ttl * MonoTime::kMillisecondsPerSecond); |
296 | 0 | } |
297 | 0 | ops.push_back(update); |
298 | 0 | session->Apply(update); |
299 | 0 | break; |
300 | 0 | } |
301 | 0 | } |
302 | 1.72M | auto insert = table.NewInsertOp(); |
303 | 1.72M | auto req = insert->mutable_request(); |
304 | 1.72M | int32_t key; |
305 | 1.72M | if (options.sequential_write) { |
306 | 1.14k | if (options.read_only_written_keys) { |
307 | 0 | std::lock_guard<std::mutex> lock(keys_in_write_progress_mutex_); |
308 | 0 | key = ++next_key_; |
309 | 0 | keys_in_write_progress_.insert(key); |
310 | 1.14k | } else { |
311 | 1.14k | key = ++next_key_; |
312 | 1.14k | } |
313 | 1.72M | } else { |
314 | 1.72M | key = options.pathological_one_row_enabled ? 00 : next_random(); |
315 | 1.72M | } |
316 | 1.72M | QLAddInt32HashValue(req, key); |
317 | 1.72M | table.AddInt32ColumnValue(req, table.schema().columns()[1].name(), next_random()); |
318 | 1.72M | table.AddStringColumnValue(req, table.schema().columns()[2].name(), test_payload); |
319 | 1.72M | if (options.ttl >= 0) { |
320 | 0 | req->set_ttl(options.ttl); |
321 | 0 | } |
322 | 1.72M | ops.push_back(insert); |
323 | 1.72M | } |
324 | | |
325 | 1.72M | for (const auto& op : ops) { |
326 | 1.72M | session->Apply(op); |
327 | 1.72M | } |
328 | | |
329 | 7.47k | const auto flush_status = session->FlushAndGetOpsErrors(); |
330 | 7.47k | if (!flush_status.status.ok()) { |
331 | 239 | VLOG(1) << "Flush error: " << AsString(flush_status.status)0 ; |
332 | 10.3k | for (const auto& error : flush_status.errors) { |
333 | 10.3k | auto* resp = down_cast<client::YBqlOp*>(&error->failed_op())->mutable_response(); |
334 | 10.3k | resp->Clear(); |
335 | 10.3k | resp->set_status( |
336 | 10.3k | error->status().IsTryAgain() ? QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR0 |
337 | 10.3k | : QLResponsePB::YQL_STATUS_RUNTIME_ERROR); |
338 | 10.3k | resp->set_error_message(error->status().message().ToBuffer()); |
339 | 10.3k | } |
340 | 239 | } |
341 | 7.47k | if (txn) { |
342 | 0 | CHECK_OK(txn->CommitFuture().get()); |
343 | 0 | } |
344 | | |
345 | 7.47k | int inserted = 0; |
346 | 1.72M | for (const auto& op : ops) { |
347 | 1.72M | if (op->response().status() == QLResponsePB::YQL_STATUS_OK) { |
348 | 1.71M | VLOG(2) << "Op succeeded: " << op->ToString()3 ; |
349 | 1.71M | if (options.read_only_written_keys) { |
350 | 0 | std::lock_guard<std::mutex> lock(keys_in_write_progress_mutex_); |
351 | 0 | keys_in_write_progress_.erase( |
352 | 0 | op->request().hashed_column_values(0).value().int32_value()); |
353 | 0 | } |
354 | 1.71M | ++inserted; |
355 | 1.71M | } else if ( |
356 | 10.5k | options.retry_on_restart_required_error && |
357 | 10.5k | op->response().status() == QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR0 ) { |
358 | 0 | VLOG(1) << "Op restart required: " << op->ToString() << ": " |
359 | 0 | << op->response().ShortDebugString(); |
360 | 0 | auto retry_op = table.NewInsertOp(); |
361 | 0 | *retry_op->mutable_request() = op->request(); |
362 | 0 | retry_ops.push_back(retry_op); |
363 | 10.5k | } else if (10.5k options.insert_failures_allowed10.5k ) { |
364 | 10.5k | VLOG(1) << "Op failed: " << op->ToString() << ": " << op->response().ShortDebugString()1 ; |
365 | 10.5k | ++rows_insert_failed_; |
366 | 18.4E | } else { |
367 | 18.4E | LOG(FATAL) << "Op failed: " << op->ToString() << ": " << op->response().ShortDebugString(); |
368 | 18.4E | } |
369 | 1.72M | } |
370 | | |
371 | 7.47k | rows_inserted_.fetch_add(inserted, std::memory_order_acq_rel); |
372 | 7.47k | if (inserted > 0) { |
373 | 7.23k | batches_completed_.fetch_add(1, std::memory_order_acq_rel); |
374 | 7.23k | } |
375 | 7.47k | if (inserting_one_row && inserted <= 00 ) { |
376 | 0 | pathological_one_row_counter_ = 0; |
377 | 0 | } |
378 | 7.47k | if (PREDICT_FALSE(options.write_interval_millis > 0)) { |
379 | 0 | SleepFor(MonoDelta::FromMilliseconds(options.write_interval_millis)); |
380 | 0 | } |
381 | 7.47k | } |
382 | 165 | } |
383 | | |
384 | 0 | void TestWorkload::State::ReadThread(const TestWorkloadOptions& options) { |
385 | 0 | Random r(narrow_cast<uint32_t>(Env::Default()->gettid())); |
386 | |
|
387 | 0 | auto table_result = OpenTable(options); |
388 | 0 | if (!table_result.ok()) { |
389 | 0 | return; |
390 | 0 | } |
391 | 0 | auto table = *table_result; |
392 | |
|
393 | 0 | shared_ptr<YBSession> session = client_->NewSession(); |
394 | 0 | session->SetTimeout(options.default_rpc_timeout); |
395 | |
|
396 | 0 | WaitAllThreads(); |
397 | |
|
398 | 0 | while (should_run_.load(std::memory_order_acquire)) { |
399 | 0 | auto txn = CHECK_RESULT(MayBeStartNewTransaction(session.get(), options)); |
400 | 0 | auto op = table.NewReadOp(); |
401 | 0 | auto req = op->mutable_request(); |
402 | 0 | const int32_t next_key = next_key_; |
403 | 0 | int32_t key; |
404 | 0 | if (options.sequential_write) { |
405 | 0 | if (next_key == 0) { |
406 | 0 | std::this_thread::sleep_for(100ms); |
407 | 0 | continue; |
408 | 0 | } |
409 | 0 | for (;;) { |
410 | 0 | key = 1 + r.Uniform(next_key); |
411 | 0 | if (!options.read_only_written_keys) { |
412 | 0 | break; |
413 | 0 | } |
414 | 0 | std::lock_guard<std::mutex> lock(keys_in_write_progress_mutex_); |
415 | 0 | if (keys_in_write_progress_.count(key) == 0) { |
416 | 0 | break; |
417 | 0 | } |
418 | 0 | } |
419 | 0 | } else { |
420 | 0 | key = r.Next(); |
421 | 0 | } |
422 | 0 | QLAddInt32HashValue(req, key); |
423 | 0 | session->Apply(op); |
424 | 0 | const auto flush_status = session->FlushAndGetOpsErrors(); |
425 | 0 | const auto& s = flush_status.status; |
426 | 0 | if (s.ok()) { |
427 | 0 | if (op->response().status() == QLResponsePB::YQL_STATUS_OK) { |
428 | 0 | ++rows_read_ok_; |
429 | 0 | if (ql::RowsResult(op.get()).GetRowBlock()->row_count() == 0) { |
430 | 0 | ++rows_read_empty_; |
431 | 0 | if (options.read_only_written_keys) { |
432 | 0 | LOG(ERROR) << "Got empty result for key: " << key << " next_key: " << next_key; |
433 | 0 | } |
434 | 0 | } |
435 | 0 | } else { |
436 | 0 | ++rows_read_error_; |
437 | 0 | } |
438 | 0 | } else { |
439 | 0 | if (s.IsTryAgain()) { |
440 | 0 | ++rows_read_try_again_; |
441 | 0 | LOG(INFO) << s; |
442 | 0 | } else { |
443 | 0 | LOG(FATAL) << s; |
444 | 0 | } |
445 | 0 | } |
446 | 0 | if (txn) { |
447 | 0 | CHECK_OK(txn->CommitFuture().get()); |
448 | 0 | } |
449 | 0 | } |
450 | 0 | } |
451 | | |
452 | 57 | void TestWorkload::Setup(YBTableType table_type) { |
453 | 57 | state_->Setup(table_type, options_); |
454 | 57 | } |
455 | | |
456 | | void TestWorkload::set_transactional( |
457 | 0 | IsolationLevel isolation_level, client::TransactionPool* pool) { |
458 | 0 | options_.isolation_level = isolation_level; |
459 | 0 | state_->set_transaction_pool(pool); |
460 | 0 | } |
461 | | |
462 | | |
463 | 57 | void TestWorkload::State::Setup(YBTableType table_type, const TestWorkloadOptions& options) { |
464 | 57 | client::YBClientBuilder client_builder; |
465 | 57 | client_builder.default_rpc_timeout(options.default_rpc_timeout); |
466 | 57 | client_ = CHECK_RESULT(cluster_->CreateClient(&client_builder)); |
467 | 57 | CHECK_OK(client_->CreateNamespaceIfNotExists( |
468 | 57 | options.table_name.namespace_name(), |
469 | 57 | master::GetDatabaseTypeForTable(client::ClientToPBTableType(table_type)))); |
470 | | |
471 | | // Retry YBClient::TableExists() until we make that call retry reliably. |
472 | | // See KUDU-1074. |
473 | 57 | MonoTime deadline(MonoTime::Now()); |
474 | 57 | deadline.AddDelta(MonoDelta::FromSeconds(10)); |
475 | 57 | Result<bool> table_exists(false); |
476 | 57 | while (true) { |
477 | 15 | table_exists = client_->TableExists(options.table_name); |
478 | 15 | if (table_exists.ok() || deadline.ComesBefore(MonoTime::Now())0 ) break; |
479 | 0 | SleepFor(MonoDelta::FromMilliseconds(10)); |
480 | 0 | } |
481 | 57 | CHECK_OK(table_exists); |
482 | | |
483 | 57 | if (!table_exists.get()) { |
484 | 15 | auto schema = GetSimpleTestSchema(); |
485 | 15 | schema.SetTransactional(options.is_transactional()); |
486 | 15 | if (options.has_table_ttl()) { |
487 | 0 | schema.SetDefaultTimeToLive( |
488 | 0 | options.table_ttl * MonoTime::kMillisecondsPerSecond); |
489 | 0 | } |
490 | 15 | YBSchema client_schema(YBSchemaFromSchema(schema)); |
491 | | |
492 | 15 | std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator()); |
493 | 15 | CHECK_OK(table_creator->table_name(options.table_name) |
494 | 15 | .schema(&client_schema) |
495 | 15 | .num_tablets(options.num_tablets) |
496 | | // NOTE: this is quite high as a timeout, but the default (5 sec) does not |
497 | | // seem to be high enough in some cases (see KUDU-550). We should remove |
498 | | // this once that ticket is addressed. |
499 | 15 | .timeout(MonoDelta::FromSeconds(NonTsanVsTsan(20, 60))) |
500 | 15 | .table_type(table_type) |
501 | 15 | .Create()); |
502 | 42 | } else { |
503 | 42 | LOG(INFO) << "TestWorkload: Skipping table creation because table " |
504 | 42 | << options.table_name.ToString() << " already exists"; |
505 | 42 | } |
506 | 57 | } |
507 | | |
508 | 11 | void TestWorkload::Start() { |
509 | 11 | state_->Start(options_); |
510 | 11 | } |
511 | | |
512 | 11 | void TestWorkload::State::Start(const TestWorkloadOptions& options) { |
513 | 11 | bool expected = false; |
514 | 11 | should_run_.compare_exchange_strong(expected, true, std::memory_order_acq_rel); |
515 | 11 | CHECK(!expected) << "Already started"0 ; |
516 | 11 | should_run_.store(true, std::memory_order_release); |
517 | 11 | start_latch_.Reset(options.num_write_threads + options.num_read_threads); |
518 | 176 | for (int i = 0; i < options.num_write_threads; ++i165 ) { |
519 | 165 | scoped_refptr<yb::Thread> new_thread; |
520 | 165 | CHECK_OK(yb::Thread::Create("test", strings::Substitute("test-writer-$0", i), |
521 | 165 | &State::WriteThread, this, options, &new_thread)); |
522 | 165 | threads_.push_back(new_thread); |
523 | 165 | } |
524 | 11 | for (int i = 0; i < options.num_read_threads; ++i0 ) { |
525 | 0 | scoped_refptr<yb::Thread> new_thread; |
526 | 0 | CHECK_OK(yb::Thread::Create("test", strings::Substitute("test-reader-$0", i), |
527 | 0 | &State::ReadThread, this, options, &new_thread)); |
528 | 0 | threads_.push_back(new_thread); |
529 | 0 | } |
530 | 11 | } |
531 | | |
532 | 35 | void TestWorkload::Stop() { |
533 | 35 | state_->Stop(); |
534 | 35 | } |
535 | | |
536 | 20 | void TestWorkload::Join() { |
537 | 20 | state_->Join(); |
538 | 20 | } |
539 | | |
540 | 35 | void TestWorkload::StopAndJoin() { |
541 | 35 | Stop(); |
542 | 35 | Join(); |
543 | 35 | } |
544 | | |
545 | 1 | void TestWorkload::WaitInserted(int64_t required) { |
546 | 2 | while (rows_inserted() < required) { |
547 | 1 | std::this_thread::sleep_for(100ms); |
548 | 1 | } |
549 | 1 | } |
550 | | |
551 | 195 | int64_t TestWorkload::rows_inserted() const { |
552 | 195 | return state_->rows_inserted(); |
553 | 195 | } |
554 | | |
555 | 0 | int64_t TestWorkload::rows_insert_failed() const { |
556 | 0 | return state_->rows_insert_failed(); |
557 | 0 | } |
558 | | |
559 | 0 | int64_t TestWorkload::rows_read_ok() const { |
560 | 0 | return state_->rows_read_ok(); |
561 | 0 | } |
562 | | |
563 | 0 | int64_t TestWorkload::rows_read_empty() const { |
564 | 0 | return state_->rows_read_empty(); |
565 | 0 | } |
566 | | |
567 | 0 | int64_t TestWorkload::rows_read_error() const { |
568 | 0 | return state_->rows_read_error(); |
569 | 0 | } |
570 | | |
571 | 0 | int64_t TestWorkload::rows_read_try_again() const { |
572 | 0 | return state_->rows_read_try_again(); |
573 | 0 | } |
574 | | |
575 | 0 | int64_t TestWorkload::batches_completed() const { |
576 | 0 | return state_->batches_completed(); |
577 | 0 | } |
578 | | |
579 | 1 | client::YBClient& TestWorkload::client() const { |
580 | 1 | return state_->client(); |
581 | 1 | } |
582 | | |
583 | | |
584 | | } // namespace yb |