YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/integration-tests/test_workload.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/integration-tests/test_workload.h"
34
35
#include <memory>
36
#include <string>
37
#include <unordered_set>
38
#include <vector>
39
40
#include "yb/client/client-test-util.h"
41
#include "yb/client/client.h"
42
#include "yb/client/error.h"
43
#include "yb/client/schema.h"
44
#include "yb/client/session.h"
45
#include "yb/client/table_creator.h"
46
#include "yb/client/table_handle.h"
47
#include "yb/client/table_info.h"
48
#include "yb/client/transaction.h"
49
#include "yb/client/transaction_pool.h"
50
#include "yb/client/yb_op.h"
51
52
#include "yb/common/wire_protocol-test-util.h"
53
54
#include "yb/gutil/casts.h"
55
#include "yb/gutil/stl_util.h"
56
#include "yb/gutil/strings/substitute.h"
57
58
#include "yb/integration-tests/mini_cluster_base.h"
59
60
#include "yb/master/master_util.h"
61
62
#include "yb/util/env.h"
63
#include "yb/util/monotime.h"
64
#include "yb/util/random.h"
65
#include "yb/util/random_util.h"
66
#include "yb/util/status_log.h"
67
#include "yb/util/thread.h"
68
#include "yb/util/tsan_util.h"
69
70
#include "yb/yql/cql/ql/util/statement_result.h"
71
72
using namespace std::literals;
73
74
namespace yb {
75
76
using client::YBClient;
77
using client::YBClientBuilder;
78
using client::YBSchema;
79
using client::YBSchemaBuilder;
80
using client::YBSchemaFromSchema;
81
using client::YBSession;
82
using client::YBTable;
83
using client::YBTableCreator;
84
using client::YBTableType;
85
using client::YBTableName;
86
using std::shared_ptr;
87
88
const YBTableName TestWorkloadOptions::kDefaultTableName(
89
    YQL_DATABASE_CQL, "my_keyspace", "test-workload");
90
91
class TestWorkload::State {
92
 public:
93
57
  explicit State(MiniClusterBase* cluster) : cluster_(cluster) {}
94
95
  void Start(const TestWorkloadOptions& options);
96
  void Setup(YBTableType table_type, const TestWorkloadOptions& options);
97
98
20
  void Stop() {
99
20
    should_run_.store(false, std::memory_order_release);
100
20
    start_latch_.Reset(0);
101
20
  }
102
103
20
  void Join() {
104
161
    for (const auto& thr : threads_) {
105
161
      CHECK_OK(ThreadJoiner(thr.get()).Join());
106
161
    }
107
20
    threads_.clear();
108
20
  }
109
110
0
  void set_transaction_pool(client::TransactionPool* pool) {
111
0
    transaction_pool_ = pool;
112
0
  }
113
114
195
  int64_t rows_inserted() const {
115
195
    return rows_inserted_.load(std::memory_order_acquire);
116
195
  }
117
118
0
  int64_t rows_insert_failed() const {
119
0
    return rows_insert_failed_.load(std::memory_order_acquire);
120
0
  }
121
122
0
  int64_t rows_read_ok() const {
123
0
    return rows_read_ok_.load(std::memory_order_acquire);
124
0
  }
125
126
0
  int64_t rows_read_empty() const {
127
0
    return rows_read_empty_.load(std::memory_order_acquire);
128
0
  }
129
130
0
  int64_t rows_read_error() const {
131
0
    return rows_read_error_.load(std::memory_order_acquire);
132
0
  }
133
134
0
  int64_t rows_read_try_again() const {
135
0
    return rows_read_try_again_.load(std::memory_order_acquire);
136
0
  }
137
138
0
  int64_t batches_completed() const {
139
0
    return batches_completed_.load(std::memory_order_acquire);
140
0
  }
141
142
1
  client::YBClient& client() const {
143
1
    return *client_;
144
1
  }
145
146
 private:
147
  CHECKED_STATUS Flush(client::YBSession* session, const TestWorkloadOptions& options);
148
  Result<client::YBTransactionPtr> MayBeStartNewTransaction(
149
      client::YBSession* session, const TestWorkloadOptions& options);
150
  Result<client::TableHandle> OpenTable(const TestWorkloadOptions& options);
151
  void WaitAllThreads();
152
  void WriteThread(const TestWorkloadOptions& options);
153
  void ReadThread(const TestWorkloadOptions& options);
154
155
  MiniClusterBase* cluster_;
156
  std::unique_ptr<client::YBClient> client_;
157
  client::TransactionPool* transaction_pool_;
158
  CountDownLatch start_latch_{0};
159
  std::atomic<bool> should_run_{false};
160
  std::atomic<int64_t> pathological_one_row_counter_{0};
161
  std::atomic<bool> pathological_one_row_inserted_{false};
162
  std::atomic<int64_t> rows_inserted_{0};
163
  std::atomic<int64_t> rows_insert_failed_{0};
164
  std::atomic<int64_t> batches_completed_{0};
165
  std::atomic<int32_t> next_key_{0};
166
  std::atomic<int64_t> rows_read_ok_{0};
167
  std::atomic<int64_t> rows_read_empty_{0};
168
  std::atomic<int64_t> rows_read_error_{0};
169
  std::atomic<int64_t> rows_read_try_again_{0};
170
171
  // Invariant: if sequential_write and read_only_written_keys are set then
172
  // keys in [1 ... next_key_] and not in keys_in_write_progress_ are guaranteed to be written.
173
  std::mutex keys_in_write_progress_mutex_;
174
  std::set<int32_t> keys_in_write_progress_ GUARDED_BY(keys_in_write_progress_mutex_);
175
176
  std::vector<scoped_refptr<Thread> > threads_;
177
};
178
179
TestWorkload::TestWorkload(MiniClusterBase* cluster)
180
57
  : state_(new State(cluster)) {}
181
182
12
TestWorkload::~TestWorkload() {
183
12
  StopAndJoin();
184
12
}
185
186
TestWorkload::TestWorkload(TestWorkload&& rhs)
187
0
    : options_(rhs.options_), state_(std::move(rhs.state_)) {}
188
189
0
void TestWorkload::operator=(TestWorkload&& rhs) {
190
0
  options_ = rhs.options_;
191
0
  state_ = std::move(rhs.state_);
192
0
}
193
194
Result<client::YBTransactionPtr> TestWorkload::State::MayBeStartNewTransaction(
195
7.47k
    client::YBSession* session, const TestWorkloadOptions& options) {
196
7.47k
  client::YBTransactionPtr txn;
197
7.47k
  if (options.is_transactional()) {
198
0
    txn = VERIFY_RESULT(transaction_pool_->TakeAndInit(
199
0
        options.isolation_level, TransactionRpcDeadline()));
200
0
    session->SetTransaction(txn);
201
0
  }
202
7.47k
  return txn;
203
7.47k
}
204
205
162
Result<client::TableHandle> TestWorkload::State::OpenTable(const TestWorkloadOptions& options) {
206
162
  client::TableHandle table;
207
208
  // Loop trying to open up the table. In some tests we set up very
209
  // low RPC timeouts to test those behaviors, so this might fail and
210
  // need retrying.
211
162
  for(;;) {
212
161
    auto s = table.Open(options.table_name, client_.get());
213
161
    if (s.ok()) {
214
158
      return table;
215
158
    }
216
3
    if (!should_run_.load(std::memory_order_acquire)) {
217
0
      LOG(ERROR) << "Failed to open table: " << s;
218
0
      return s;
219
0
    }
220
3
    if (options.timeout_allowed && 
s.IsTimedOut()0
) {
221
0
      SleepFor(MonoDelta::FromMilliseconds(50));
222
0
      continue;
223
0
    }
224
3
    LOG(FATAL) << "Failed to open table: " << s;
225
3
    return s;
226
3
  }
227
162
}
228
229
158
void TestWorkload::State::WaitAllThreads() {
230
  // Wait for all of the workload threads to be ready to go. This maximizes the chance
231
  // that they all send a flood of requests at exactly the same time.
232
  //
233
  // This also minimizes the chance that we see failures to call OpenTable() if
234
  // a late-starting thread overlaps with the flood of outbound traffic from the
235
  // ones that are already writing data.
236
158
  start_latch_.CountDown();
237
158
  start_latch_.Wait();
238
158
}
239
240
165
void TestWorkload::State::WriteThread(const TestWorkloadOptions& options) {
241
3.44M
  auto next_random = [rng = &ThreadLocalRandom()] {
242
3.44M
    return RandomUniformInt<int32_t>(rng);
243
3.44M
  };
244
245
165
  auto table_result = OpenTable(options);
246
165
  if (!table_result.ok()) {
247
0
    return;
248
0
  }
249
165
  auto table = *table_result;
250
251
165
  shared_ptr<YBSession> session = client_->NewSession();
252
165
  session->SetTimeout(options.write_timeout);
253
254
165
  WaitAllThreads();
255
256
165
  std::string test_payload("hello world");
257
165
  if (options.payload_bytes != 11) {
258
    // We fill with zeros if you change the default.
259
8
    test_payload.assign(options.payload_bytes, '0');
260
8
  }
261
262
165
  bool inserting_one_row = false;
263
165
  std::vector<client::YBqlWriteOpPtr> retry_ops;
264
7.63k
  for (;;) {
265
7.63k
    const auto should_run = should_run_.load(std::memory_order_acquire);
266
7.63k
    if (!should_run) {
267
161
      if (options.sequential_write && 
options.read_only_written_keys12
) {
268
        // In this case we want to complete writing of keys_in_write_progress_, so we don't have
269
        // gaps after workload is stopped.
270
0
        std::lock_guard<std::mutex> lock(keys_in_write_progress_mutex_);
271
0
        if (keys_in_write_progress_.empty()) {
272
0
          break;
273
0
        }
274
161
      } else {
275
161
        break;
276
161
      }
277
161
    }
278
7.47k
    auto txn = CHECK_RESULT(MayBeStartNewTransaction(session.get(), options));
279
7.47k
    std::vector<client::YBqlWriteOpPtr> ops;
280
7.47k
    ops.swap(retry_ops);
281
7.47k
    const auto num_more_keys_to_insert = should_run ? 
options.write_batch_size - ops.size()7.46k
:
07
;
282
1.72M
    for (size_t i = 0; i < num_more_keys_to_insert; 
i++1.72M
) {
283
1.72M
      if (options.pathological_one_row_enabled) {
284
0
        if (!pathological_one_row_inserted_) {
285
0
          if (++pathological_one_row_counter_ != 1) {
286
0
            continue;
287
0
          }
288
0
        } else {
289
0
          inserting_one_row = true;
290
0
          auto update = table.NewUpdateOp();
291
0
          auto req = update->mutable_request();
292
0
          QLAddInt32HashValue(req, 0);
293
0
          table.AddInt32ColumnValue(req, table.schema().columns()[1].name(), next_random());
294
0
          if (options.ttl >= 0) {
295
0
            req->set_ttl(options.ttl * MonoTime::kMillisecondsPerSecond);
296
0
          }
297
0
          ops.push_back(update);
298
0
          session->Apply(update);
299
0
          break;
300
0
        }
301
0
      }
302
1.72M
      auto insert = table.NewInsertOp();
303
1.72M
      auto req = insert->mutable_request();
304
1.72M
      int32_t key;
305
1.72M
      if (options.sequential_write) {
306
1.14k
        if (options.read_only_written_keys) {
307
0
          std::lock_guard<std::mutex> lock(keys_in_write_progress_mutex_);
308
0
          key = ++next_key_;
309
0
          keys_in_write_progress_.insert(key);
310
1.14k
        } else {
311
1.14k
          key = ++next_key_;
312
1.14k
        }
313
1.72M
      } else {
314
1.72M
        key = options.pathological_one_row_enabled ? 
00
: next_random();
315
1.72M
      }
316
1.72M
      QLAddInt32HashValue(req, key);
317
1.72M
      table.AddInt32ColumnValue(req, table.schema().columns()[1].name(), next_random());
318
1.72M
      table.AddStringColumnValue(req, table.schema().columns()[2].name(), test_payload);
319
1.72M
      if (options.ttl >= 0) {
320
0
        req->set_ttl(options.ttl);
321
0
      }
322
1.72M
      ops.push_back(insert);
323
1.72M
    }
324
325
1.72M
    for (const auto& op : ops) {
326
1.72M
      session->Apply(op);
327
1.72M
    }
328
329
7.47k
    const auto flush_status = session->FlushAndGetOpsErrors();
330
7.47k
    if (!flush_status.status.ok()) {
331
239
      VLOG
(1) << "Flush error: " << AsString(flush_status.status)0
;
332
10.3k
      for (const auto& error : flush_status.errors) {
333
10.3k
        auto* resp = down_cast<client::YBqlOp*>(&error->failed_op())->mutable_response();
334
10.3k
        resp->Clear();
335
10.3k
        resp->set_status(
336
10.3k
            error->status().IsTryAgain() ? 
QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR0
337
10.3k
                                         : QLResponsePB::YQL_STATUS_RUNTIME_ERROR);
338
10.3k
        resp->set_error_message(error->status().message().ToBuffer());
339
10.3k
      }
340
239
    }
341
7.47k
    if (txn) {
342
0
      CHECK_OK(txn->CommitFuture().get());
343
0
    }
344
345
7.47k
    int inserted = 0;
346
1.72M
    for (const auto& op : ops) {
347
1.72M
      if (op->response().status() == QLResponsePB::YQL_STATUS_OK) {
348
1.71M
        VLOG
(2) << "Op succeeded: " << op->ToString()3
;
349
1.71M
        if (options.read_only_written_keys) {
350
0
          std::lock_guard<std::mutex> lock(keys_in_write_progress_mutex_);
351
0
          keys_in_write_progress_.erase(
352
0
              op->request().hashed_column_values(0).value().int32_value());
353
0
        }
354
1.71M
        ++inserted;
355
1.71M
      } else if (
356
10.5k
          options.retry_on_restart_required_error &&
357
10.5k
          
op->response().status() == QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR0
) {
358
0
        VLOG(1) << "Op restart required: " << op->ToString() << ": "
359
0
                << op->response().ShortDebugString();
360
0
        auto retry_op = table.NewInsertOp();
361
0
        *retry_op->mutable_request() = op->request();
362
0
        retry_ops.push_back(retry_op);
363
10.5k
      } else 
if (10.5k
options.insert_failures_allowed10.5k
) {
364
10.5k
        VLOG
(1) << "Op failed: " << op->ToString() << ": " << op->response().ShortDebugString()1
;
365
10.5k
        ++rows_insert_failed_;
366
18.4E
      } else {
367
18.4E
        LOG(FATAL) << "Op failed: " << op->ToString() << ": " << op->response().ShortDebugString();
368
18.4E
      }
369
1.72M
    }
370
371
7.47k
    rows_inserted_.fetch_add(inserted, std::memory_order_acq_rel);
372
7.47k
    if (inserted > 0) {
373
7.23k
      batches_completed_.fetch_add(1, std::memory_order_acq_rel);
374
7.23k
    }
375
7.47k
    if (inserting_one_row && 
inserted <= 00
) {
376
0
      pathological_one_row_counter_ = 0;
377
0
    }
378
7.47k
    if (PREDICT_FALSE(options.write_interval_millis > 0)) {
379
0
      SleepFor(MonoDelta::FromMilliseconds(options.write_interval_millis));
380
0
    }
381
7.47k
  }
382
165
}
383
384
0
void TestWorkload::State::ReadThread(const TestWorkloadOptions& options) {
385
0
  Random r(narrow_cast<uint32_t>(Env::Default()->gettid()));
386
387
0
  auto table_result = OpenTable(options);
388
0
  if (!table_result.ok()) {
389
0
    return;
390
0
  }
391
0
  auto table = *table_result;
392
393
0
  shared_ptr<YBSession> session = client_->NewSession();
394
0
  session->SetTimeout(options.default_rpc_timeout);
395
396
0
  WaitAllThreads();
397
398
0
  while (should_run_.load(std::memory_order_acquire)) {
399
0
    auto txn = CHECK_RESULT(MayBeStartNewTransaction(session.get(), options));
400
0
    auto op = table.NewReadOp();
401
0
    auto req = op->mutable_request();
402
0
    const int32_t next_key = next_key_;
403
0
    int32_t key;
404
0
    if (options.sequential_write) {
405
0
      if (next_key == 0) {
406
0
        std::this_thread::sleep_for(100ms);
407
0
        continue;
408
0
      }
409
0
      for (;;) {
410
0
        key = 1 + r.Uniform(next_key);
411
0
        if (!options.read_only_written_keys) {
412
0
          break;
413
0
        }
414
0
        std::lock_guard<std::mutex> lock(keys_in_write_progress_mutex_);
415
0
        if (keys_in_write_progress_.count(key) == 0) {
416
0
          break;
417
0
        }
418
0
      }
419
0
    } else {
420
0
      key = r.Next();
421
0
    }
422
0
    QLAddInt32HashValue(req, key);
423
0
    session->Apply(op);
424
0
    const auto flush_status = session->FlushAndGetOpsErrors();
425
0
    const auto& s = flush_status.status;
426
0
    if (s.ok()) {
427
0
      if (op->response().status() == QLResponsePB::YQL_STATUS_OK) {
428
0
        ++rows_read_ok_;
429
0
        if (ql::RowsResult(op.get()).GetRowBlock()->row_count() == 0) {
430
0
          ++rows_read_empty_;
431
0
          if (options.read_only_written_keys) {
432
0
            LOG(ERROR) << "Got empty result for key: " << key << " next_key: " << next_key;
433
0
          }
434
0
        }
435
0
      } else {
436
0
        ++rows_read_error_;
437
0
      }
438
0
    } else {
439
0
      if (s.IsTryAgain()) {
440
0
        ++rows_read_try_again_;
441
0
        LOG(INFO) << s;
442
0
      } else {
443
0
        LOG(FATAL) << s;
444
0
      }
445
0
    }
446
0
    if (txn) {
447
0
      CHECK_OK(txn->CommitFuture().get());
448
0
    }
449
0
  }
450
0
}
451
452
57
void TestWorkload::Setup(YBTableType table_type) {
453
57
  state_->Setup(table_type, options_);
454
57
}
455
456
void TestWorkload::set_transactional(
457
0
    IsolationLevel isolation_level, client::TransactionPool* pool) {
458
0
  options_.isolation_level = isolation_level;
459
0
  state_->set_transaction_pool(pool);
460
0
}
461
462
463
57
void TestWorkload::State::Setup(YBTableType table_type, const TestWorkloadOptions& options) {
464
57
  client::YBClientBuilder client_builder;
465
57
  client_builder.default_rpc_timeout(options.default_rpc_timeout);
466
57
  client_ = CHECK_RESULT(cluster_->CreateClient(&client_builder));
467
57
  CHECK_OK(client_->CreateNamespaceIfNotExists(
468
57
      options.table_name.namespace_name(),
469
57
      master::GetDatabaseTypeForTable(client::ClientToPBTableType(table_type))));
470
471
  // Retry YBClient::TableExists() until we make that call retry reliably.
472
  // See KUDU-1074.
473
57
  MonoTime deadline(MonoTime::Now());
474
57
  deadline.AddDelta(MonoDelta::FromSeconds(10));
475
57
  Result<bool> table_exists(false);
476
57
  while (true) {
477
15
    table_exists = client_->TableExists(options.table_name);
478
15
    if (table_exists.ok() || 
deadline.ComesBefore(MonoTime::Now())0
) break;
479
0
    SleepFor(MonoDelta::FromMilliseconds(10));
480
0
  }
481
57
  CHECK_OK(table_exists);
482
483
57
  if (!table_exists.get()) {
484
15
    auto schema = GetSimpleTestSchema();
485
15
    schema.SetTransactional(options.is_transactional());
486
15
    if (options.has_table_ttl()) {
487
0
      schema.SetDefaultTimeToLive(
488
0
          options.table_ttl * MonoTime::kMillisecondsPerSecond);
489
0
    }
490
15
    YBSchema client_schema(YBSchemaFromSchema(schema));
491
492
15
    std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator());
493
15
    CHECK_OK(table_creator->table_name(options.table_name)
494
15
             .schema(&client_schema)
495
15
             .num_tablets(options.num_tablets)
496
             // NOTE: this is quite high as a timeout, but the default (5 sec) does not
497
             // seem to be high enough in some cases (see KUDU-550). We should remove
498
             // this once that ticket is addressed.
499
15
             .timeout(MonoDelta::FromSeconds(NonTsanVsTsan(20, 60)))
500
15
             .table_type(table_type)
501
15
             .Create());
502
42
  } else {
503
42
    LOG(INFO) << "TestWorkload: Skipping table creation because table "
504
42
              << options.table_name.ToString() << " already exists";
505
42
  }
506
57
}
507
508
11
void TestWorkload::Start() {
509
11
  state_->Start(options_);
510
11
}
511
512
11
void TestWorkload::State::Start(const TestWorkloadOptions& options) {
513
11
  bool expected = false;
514
11
  should_run_.compare_exchange_strong(expected, true, std::memory_order_acq_rel);
515
11
  CHECK
(!expected) << "Already started"0
;
516
11
  should_run_.store(true, std::memory_order_release);
517
11
  start_latch_.Reset(options.num_write_threads + options.num_read_threads);
518
176
  for (int i = 0; i < options.num_write_threads; 
++i165
) {
519
165
    scoped_refptr<yb::Thread> new_thread;
520
165
    CHECK_OK(yb::Thread::Create("test", strings::Substitute("test-writer-$0", i),
521
165
                                &State::WriteThread, this, options, &new_thread));
522
165
    threads_.push_back(new_thread);
523
165
  }
524
11
  for (int i = 0; i < options.num_read_threads; 
++i0
) {
525
0
    scoped_refptr<yb::Thread> new_thread;
526
0
    CHECK_OK(yb::Thread::Create("test", strings::Substitute("test-reader-$0", i),
527
0
                                &State::ReadThread, this, options, &new_thread));
528
0
    threads_.push_back(new_thread);
529
0
  }
530
11
}
531
532
35
void TestWorkload::Stop() {
533
35
  state_->Stop();
534
35
}
535
536
20
void TestWorkload::Join() {
537
20
  state_->Join();
538
20
}
539
540
35
void TestWorkload::StopAndJoin() {
541
35
  Stop();
542
35
  Join();
543
35
}
544
545
1
void TestWorkload::WaitInserted(int64_t required) {
546
2
  while (rows_inserted() < required) {
547
1
    std::this_thread::sleep_for(100ms);
548
1
  }
549
1
}
550
551
195
int64_t TestWorkload::rows_inserted() const {
552
195
  return state_->rows_inserted();
553
195
}
554
555
0
int64_t TestWorkload::rows_insert_failed() const {
556
0
  return state_->rows_insert_failed();
557
0
}
558
559
0
int64_t TestWorkload::rows_read_ok() const {
560
0
  return state_->rows_read_ok();
561
0
}
562
563
0
int64_t TestWorkload::rows_read_empty() const {
564
0
  return state_->rows_read_empty();
565
0
}
566
567
0
int64_t TestWorkload::rows_read_error() const {
568
0
  return state_->rows_read_error();
569
0
}
570
571
0
int64_t TestWorkload::rows_read_try_again() const {
572
0
  return state_->rows_read_try_again();
573
0
}
574
575
0
int64_t TestWorkload::batches_completed() const {
576
0
  return state_->batches_completed();
577
0
}
578
579
1
client::YBClient& TestWorkload::client() const {
580
1
  return state_->client();
581
1
}
582
583
584
}  // namespace yb