YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/test_workload.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/integration-tests/test_workload.h"
34
35
#include <memory>
36
#include <string>
37
#include <unordered_set>
38
#include <vector>
39
40
#include "yb/client/client-test-util.h"
41
#include "yb/client/client.h"
42
#include "yb/client/error.h"
43
#include "yb/client/schema.h"
44
#include "yb/client/session.h"
45
#include "yb/client/table_creator.h"
46
#include "yb/client/table_handle.h"
47
#include "yb/client/table_info.h"
48
#include "yb/client/transaction.h"
49
#include "yb/client/transaction_pool.h"
50
#include "yb/client/yb_op.h"
51
52
#include "yb/common/wire_protocol-test-util.h"
53
54
#include "yb/gutil/casts.h"
55
#include "yb/gutil/stl_util.h"
56
#include "yb/gutil/strings/substitute.h"
57
58
#include "yb/integration-tests/mini_cluster_base.h"
59
60
#include "yb/master/master_util.h"
61
62
#include "yb/util/env.h"
63
#include "yb/util/monotime.h"
64
#include "yb/util/random.h"
65
#include "yb/util/random_util.h"
66
#include "yb/util/status_log.h"
67
#include "yb/util/thread.h"
68
#include "yb/util/tsan_util.h"
69
70
#include "yb/yql/cql/ql/util/statement_result.h"
71
72
using namespace std::literals;
73
74
namespace yb {
75
76
using client::YBClient;
77
using client::YBClientBuilder;
78
using client::YBSchema;
79
using client::YBSchemaBuilder;
80
using client::YBSchemaFromSchema;
81
using client::YBSession;
82
using client::YBTable;
83
using client::YBTableCreator;
84
using client::YBTableType;
85
using client::YBTableName;
86
using std::shared_ptr;
87
88
const YBTableName TestWorkloadOptions::kDefaultTableName(
89
    YQL_DATABASE_CQL, "my_keyspace", "test-workload");
90
91
class TestWorkload::State {
92
 public:
93
53
  explicit State(MiniClusterBase* cluster) : cluster_(cluster) {}
94
95
  void Start(const TestWorkloadOptions& options);
96
  void Setup(YBTableType table_type, const TestWorkloadOptions& options);
97
98
18
  void Stop() {
99
18
    should_run_.store(false, std::memory_order_release);
100
18
    start_latch_.Reset(0);
101
18
  }
102
103
18
  void Join() {
104
161
    for (const auto& thr : threads_) {
105
161
      CHECK_OK(ThreadJoiner(thr.get()).Join());
106
161
    }
107
18
    threads_.clear();
108
18
  }
109
110
0
  void set_transaction_pool(client::TransactionPool* pool) {
111
0
    transaction_pool_ = pool;
112
0
  }
113
114
106
  int64_t rows_inserted() const {
115
106
    return rows_inserted_.load(std::memory_order_acquire);
116
106
  }
117
118
0
  int64_t rows_insert_failed() const {
119
0
    return rows_insert_failed_.load(std::memory_order_acquire);
120
0
  }
121
122
0
  int64_t rows_read_ok() const {
123
0
    return rows_read_ok_.load(std::memory_order_acquire);
124
0
  }
125
126
0
  int64_t rows_read_empty() const {
127
0
    return rows_read_empty_.load(std::memory_order_acquire);
128
0
  }
129
130
0
  int64_t rows_read_error() const {
131
0
    return rows_read_error_.load(std::memory_order_acquire);
132
0
  }
133
134
0
  int64_t rows_read_try_again() const {
135
0
    return rows_read_try_again_.load(std::memory_order_acquire);
136
0
  }
137
138
0
  int64_t batches_completed() const {
139
0
    return batches_completed_.load(std::memory_order_acquire);
140
0
  }
141
142
1
  client::YBClient& client() const {
143
1
    return *client_;
144
1
  }
145
146
 private:
147
  CHECKED_STATUS Flush(client::YBSession* session, const TestWorkloadOptions& options);
148
  Result<client::YBTransactionPtr> MayBeStartNewTransaction(
149
      client::YBSession* session, const TestWorkloadOptions& options);
150
  Result<client::TableHandle> OpenTable(const TestWorkloadOptions& options);
151
  void WaitAllThreads();
152
  void WriteThread(const TestWorkloadOptions& options);
153
  void ReadThread(const TestWorkloadOptions& options);
154
155
  MiniClusterBase* cluster_;
156
  std::unique_ptr<client::YBClient> client_;
157
  client::TransactionPool* transaction_pool_;
158
  CountDownLatch start_latch_{0};
159
  std::atomic<bool> should_run_{false};
160
  std::atomic<int64_t> pathological_one_row_counter_{0};
161
  std::atomic<bool> pathological_one_row_inserted_{false};
162
  std::atomic<int64_t> rows_inserted_{0};
163
  std::atomic<int64_t> rows_insert_failed_{0};
164
  std::atomic<int64_t> batches_completed_{0};
165
  std::atomic<int32_t> next_key_{0};
166
  std::atomic<int64_t> rows_read_ok_{0};
167
  std::atomic<int64_t> rows_read_empty_{0};
168
  std::atomic<int64_t> rows_read_error_{0};
169
  std::atomic<int64_t> rows_read_try_again_{0};
170
171
  // Invariant: if sequential_write and read_only_written_keys are set then
172
  // keys in [1 ... next_key_] and not in keys_in_write_progress_ are guaranteed to be written.
173
  std::mutex keys_in_write_progress_mutex_;
174
  std::set<int32_t> keys_in_write_progress_ GUARDED_BY(keys_in_write_progress_mutex_);
175
176
  std::vector<scoped_refptr<Thread> > threads_;
177
};
178
179
TestWorkload::TestWorkload(MiniClusterBase* cluster)
180
53
  : state_(new State(cluster)) {}
181
182
10
TestWorkload::~TestWorkload() {
183
10
  StopAndJoin();
184
10
}
185
186
TestWorkload::TestWorkload(TestWorkload&& rhs)
187
0
    : options_(rhs.options_), state_(std::move(rhs.state_)) {}
188
189
0
void TestWorkload::operator=(TestWorkload&& rhs) {
190
0
  options_ = rhs.options_;
191
0
  state_ = std::move(rhs.state_);
192
0
}
193
194
Result<client::YBTransactionPtr> TestWorkload::State::MayBeStartNewTransaction(
195
7.11k
    client::YBSession* session, const TestWorkloadOptions& options) {
196
7.11k
  client::YBTransactionPtr txn;
197
7.11k
  if (options.is_transactional()) {
198
0
    txn = VERIFY_RESULT(transaction_pool_->TakeAndInit(options.isolation_level));
199
0
    session->SetTransaction(txn);
200
0
  }
201
7.11k
  return txn;
202
7.11k
}
203
204
162
Result<client::TableHandle> TestWorkload::State::OpenTable(const TestWorkloadOptions& options) {
205
162
  client::TableHandle table;
206
207
  // Loop trying to open up the table. In some tests we set up very
208
  // low RPC timeouts to test those behaviors, so this might fail and
209
  // need retrying.
210
161
  for(;;) {
211
161
    auto s = table.Open(options.table_name, client_.get());
212
161
    if (s.ok()) {
213
161
      return table;
214
161
    }
215
0
    if (!should_run_.load(std::memory_order_acquire)) {
216
0
      LOG(ERROR) << "Failed to open table: " << s;
217
0
      return s;
218
0
    }
219
0
    if (options.timeout_allowed && s.IsTimedOut()) {
220
0
      SleepFor(MonoDelta::FromMilliseconds(50));
221
0
      continue;
222
0
    }
223
0
    LOG(FATAL) << "Failed to open table: " << s;
224
0
    return s;
225
0
  }
226
162
}
227
228
161
void TestWorkload::State::WaitAllThreads() {
229
  // Wait for all of the workload threads to be ready to go. This maximizes the chance
230
  // that they all send a flood of requests at exactly the same time.
231
  //
232
  // This also minimizes the chance that we see failures to call OpenTable() if
233
  // a late-starting thread overlaps with the flood of outbound traffic from the
234
  // ones that are already writing data.
235
161
  start_latch_.CountDown();
236
161
  start_latch_.Wait();
237
161
}
238
239
165
void TestWorkload::State::WriteThread(const TestWorkloadOptions& options) {
240
2.95M
  auto next_random = [rng = &ThreadLocalRandom()] {
241
2.95M
    return RandomUniformInt<int32_t>(rng);
242
2.95M
  };
243
244
165
  auto table_result = OpenTable(options);
245
165
  if (!table_result.ok()) {
246
0
    return;
247
0
  }
248
165
  auto table = *table_result;
249
250
165
  shared_ptr<YBSession> session = client_->NewSession();
251
165
  session->SetTimeout(options.write_timeout);
252
253
165
  WaitAllThreads();
254
255
165
  std::string test_payload("hello world");
256
165
  if (options.payload_bytes != 11) {
257
    // We fill with zeros if you change the default.
258
8
    test_payload.assign(options.payload_bytes, '0');
259
8
  }
260
261
165
  bool inserting_one_row = false;
262
165
  std::vector<client::YBqlWriteOpPtr> retry_ops;
263
7.27k
  for (;;) {
264
7.27k
    const auto should_run = should_run_.load(std::memory_order_acquire);
265
7.27k
    if (!should_run) {
266
161
      if (options.sequential_write && options.read_only_written_keys) {
267
        // In this case we want to complete writing of keys_in_write_progress_, so we don't have
268
        // gaps after workload is stopped.
269
0
        std::lock_guard<std::mutex> lock(keys_in_write_progress_mutex_);
270
0
        if (keys_in_write_progress_.empty()) {
271
0
          break;
272
0
        }
273
161
      } else {
274
161
        break;
275
161
      }
276
7.11k
    }
277
7.11k
    auto txn = CHECK_RESULT(MayBeStartNewTransaction(session.get(), options));
278
7.11k
    std::vector<client::YBqlWriteOpPtr> ops;
279
7.11k
    ops.swap(retry_ops);
280
7.11k
    const auto num_more_keys_to_insert = should_run ? options.write_batch_size - ops.size() : 0;
281
1.48M
    for (size_t i = 0; i < num_more_keys_to_insert; i++) {
282
1.47M
      if (options.pathological_one_row_enabled) {
283
0
        if (!pathological_one_row_inserted_) {
284
0
          if (++pathological_one_row_counter_ != 1) {
285
0
            continue;
286
0
          }
287
0
        } else {
288
0
          inserting_one_row = true;
289
0
          auto update = table.NewUpdateOp();
290
0
          auto req = update->mutable_request();
291
0
          QLAddInt32HashValue(req, 0);
292
0
          table.AddInt32ColumnValue(req, table.schema().columns()[1].name(), next_random());
293
0
          if (options.ttl >= 0) {
294
0
            req->set_ttl(options.ttl * MonoTime::kMillisecondsPerSecond);
295
0
          }
296
0
          ops.push_back(update);
297
0
          session->Apply(update);
298
0
          break;
299
0
        }
300
1.47M
      }
301
1.47M
      auto insert = table.NewInsertOp();
302
1.47M
      auto req = insert->mutable_request();
303
1.47M
      int32_t key;
304
1.47M
      if (options.sequential_write) {
305
1.11k
        if (options.read_only_written_keys) {
306
0
          std::lock_guard<std::mutex> lock(keys_in_write_progress_mutex_);
307
0
          key = ++next_key_;
308
0
          keys_in_write_progress_.insert(key);
309
1.11k
        } else {
310
1.11k
          key = ++next_key_;
311
1.11k
        }
312
1.47M
      } else {
313
1.47M
        key = options.pathological_one_row_enabled ? 0 : next_random();
314
1.47M
      }
315
1.47M
      QLAddInt32HashValue(req, key);
316
1.47M
      table.AddInt32ColumnValue(req, table.schema().columns()[1].name(), next_random());
317
1.47M
      table.AddStringColumnValue(req, table.schema().columns()[2].name(), test_payload);
318
1.47M
      if (options.ttl >= 0) {
319
0
        req->set_ttl(options.ttl);
320
0
      }
321
1.47M
      ops.push_back(insert);
322
1.47M
    }
323
324
1.47M
    for (const auto& op : ops) {
325
1.47M
      session->Apply(op);
326
1.47M
    }
327
328
7.11k
    const auto flush_status = session->FlushAndGetOpsErrors();
329
7.11k
    if (!flush_status.status.ok()) {
330
0
      VLOG(1) << "Flush error: " << AsString(flush_status.status);
331
11.3k
      for (const auto& error : flush_status.errors) {
332
11.3k
        auto* resp = down_cast<client::YBqlOp*>(&error->failed_op())->mutable_response();
333
11.3k
        resp->Clear();
334
11.3k
        resp->set_status(
335
0
            error->status().IsTryAgain() ? QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR
336
11.3k
                                         : QLResponsePB::YQL_STATUS_RUNTIME_ERROR);
337
11.3k
        resp->set_error_message(error->status().message().ToBuffer());
338
11.3k
      }
339
279
    }
340
7.11k
    if (txn) {
341
0
      CHECK_OK(txn->CommitFuture().get());
342
0
    }
343
344
7.11k
    int inserted = 0;
345
1.47M
    for (const auto& op : ops) {
346
1.47M
      if (op->response().status() == QLResponsePB::YQL_STATUS_OK) {
347
18.4E
        VLOG(2) << "Op succeeded: " << op->ToString();
348
1.46M
        if (options.read_only_written_keys) {
349
0
          std::lock_guard<std::mutex> lock(keys_in_write_progress_mutex_);
350
0
          keys_in_write_progress_.erase(
351
0
              op->request().hashed_column_values(0).value().int32_value());
352
0
        }
353
1.46M
        ++inserted;
354
11.4k
      } else if (
355
11.4k
          options.retry_on_restart_required_error &&
356
0
          op->response().status() == QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR) {
357
0
        VLOG(1) << "Op restart required: " << op->ToString() << ": "
358
0
                << op->response().ShortDebugString();
359
0
        auto retry_op = table.NewInsertOp();
360
0
        *retry_op->mutable_request() = op->request();
361
0
        retry_ops.push_back(retry_op);
362
11.4k
      } else if (options.insert_failures_allowed) {
363
2
        VLOG(1) << "Op failed: " << op->ToString() << ": " << op->response().ShortDebugString();
364
11.4k
        ++rows_insert_failed_;
365
16
      } else {
366
16
        LOG(FATAL) << "Op failed: " << op->ToString() << ": " << op->response().ShortDebugString();
367
16
      }
368
1.47M
    }
369
370
7.11k
    rows_inserted_.fetch_add(inserted, std::memory_order_acq_rel);
371
7.11k
    if (inserted > 0) {
372
6.84k
      batches_completed_.fetch_add(1, std::memory_order_acq_rel);
373
6.84k
    }
374
7.11k
    if (inserting_one_row && inserted <= 0) {
375
0
      pathological_one_row_counter_ = 0;
376
0
    }
377
7.11k
    if (PREDICT_FALSE(options.write_interval_millis > 0)) {
378
0
      SleepFor(MonoDelta::FromMilliseconds(options.write_interval_millis));
379
0
    }
380
7.11k
  }
381
165
}
382
383
0
void TestWorkload::State::ReadThread(const TestWorkloadOptions& options) {
384
0
  Random r(narrow_cast<uint32_t>(Env::Default()->gettid()));
385
386
0
  auto table_result = OpenTable(options);
387
0
  if (!table_result.ok()) {
388
0
    return;
389
0
  }
390
0
  auto table = *table_result;
391
392
0
  shared_ptr<YBSession> session = client_->NewSession();
393
0
  session->SetTimeout(options.default_rpc_timeout);
394
395
0
  WaitAllThreads();
396
397
0
  while (should_run_.load(std::memory_order_acquire)) {
398
0
    auto txn = CHECK_RESULT(MayBeStartNewTransaction(session.get(), options));
399
0
    auto op = table.NewReadOp();
400
0
    auto req = op->mutable_request();
401
0
    const int32_t next_key = next_key_;
402
0
    int32_t key;
403
0
    if (options.sequential_write) {
404
0
      if (next_key == 0) {
405
0
        std::this_thread::sleep_for(100ms);
406
0
        continue;
407
0
      }
408
0
      for (;;) {
409
0
        key = 1 + r.Uniform(next_key);
410
0
        if (!options.read_only_written_keys) {
411
0
          break;
412
0
        }
413
0
        std::lock_guard<std::mutex> lock(keys_in_write_progress_mutex_);
414
0
        if (keys_in_write_progress_.count(key) == 0) {
415
0
          break;
416
0
        }
417
0
      }
418
0
    } else {
419
0
      key = r.Next();
420
0
    }
421
0
    QLAddInt32HashValue(req, key);
422
0
    session->Apply(op);
423
0
    const auto flush_status = session->FlushAndGetOpsErrors();
424
0
    const auto& s = flush_status.status;
425
0
    if (s.ok()) {
426
0
      if (op->response().status() == QLResponsePB::YQL_STATUS_OK) {
427
0
        ++rows_read_ok_;
428
0
        if (ql::RowsResult(op.get()).GetRowBlock()->row_count() == 0) {
429
0
          ++rows_read_empty_;
430
0
          if (options.read_only_written_keys) {
431
0
            LOG(ERROR) << "Got empty result for key: " << key << " next_key: " << next_key;
432
0
          }
433
0
        }
434
0
      } else {
435
0
        ++rows_read_error_;
436
0
      }
437
0
    } else {
438
0
      if (s.IsTryAgain()) {
439
0
        ++rows_read_try_again_;
440
0
        LOG(INFO) << s;
441
0
      } else {
442
0
        LOG(FATAL) << s;
443
0
      }
444
0
    }
445
0
    if (txn) {
446
0
      CHECK_OK(txn->CommitFuture().get());
447
0
    }
448
0
  }
449
0
}
450
451
53
void TestWorkload::Setup(YBTableType table_type) {
452
53
  state_->Setup(table_type, options_);
453
53
}
454
455
void TestWorkload::set_transactional(
456
0
    IsolationLevel isolation_level, client::TransactionPool* pool) {
457
0
  options_.isolation_level = isolation_level;
458
0
  state_->set_transaction_pool(pool);
459
0
}
460
461
462
53
void TestWorkload::State::Setup(YBTableType table_type, const TestWorkloadOptions& options) {
463
53
  client::YBClientBuilder client_builder;
464
53
  client_builder.default_rpc_timeout(options.default_rpc_timeout);
465
53
  client_ = CHECK_RESULT(cluster_->CreateClient(&client_builder));
466
53
  CHECK_OK(client_->CreateNamespaceIfNotExists(
467
53
      options.table_name.namespace_name(),
468
53
      master::GetDatabaseTypeForTable(client::ClientToPBTableType(table_type))));
469
470
  // Retry YBClient::TableExists() until we make that call retry reliably.
471
  // See KUDU-1074.
472
53
  MonoTime deadline(MonoTime::Now());
473
53
  deadline.AddDelta(MonoDelta::FromSeconds(10));
474
53
  Result<bool> table_exists(false);
475
53
  while (true) {
476
15
    table_exists = client_->TableExists(options.table_name);
477
15
    if (table_exists.ok() || deadline.ComesBefore(MonoTime::Now())) break;
478
0
    SleepFor(MonoDelta::FromMilliseconds(10));
479
0
  }
480
53
  CHECK_OK(table_exists);
481
482
53
  if (!table_exists.get()) {
483
15
    auto schema = GetSimpleTestSchema();
484
15
    schema.SetTransactional(options.is_transactional());
485
15
    if (options.has_table_ttl()) {
486
0
      schema.SetDefaultTimeToLive(
487
0
          options.table_ttl * MonoTime::kMillisecondsPerSecond);
488
0
    }
489
15
    YBSchema client_schema(YBSchemaFromSchema(schema));
490
491
15
    std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator());
492
15
    CHECK_OK(table_creator->table_name(options.table_name)
493
15
             .schema(&client_schema)
494
15
             .num_tablets(options.num_tablets)
495
             // NOTE: this is quite high as a timeout, but the default (5 sec) does not
496
             // seem to be high enough in some cases (see KUDU-550). We should remove
497
             // this once that ticket is addressed.
498
15
             .timeout(MonoDelta::FromSeconds(NonTsanVsTsan(20, 60)))
499
15
             .table_type(table_type)
500
15
             .Create());
501
38
  } else {
502
38
    LOG(INFO) << "TestWorkload: Skipping table creation because table "
503
38
              << options.table_name.ToString() << " already exists";
504
38
  }
505
53
}
506
507
11
void TestWorkload::Start() {
508
11
  state_->Start(options_);
509
11
}
510
511
11
void TestWorkload::State::Start(const TestWorkloadOptions& options) {
512
11
  bool expected = false;
513
11
  should_run_.compare_exchange_strong(expected, true, std::memory_order_acq_rel);
514
0
  CHECK(!expected) << "Already started";
515
11
  should_run_.store(true, std::memory_order_release);
516
11
  start_latch_.Reset(options.num_write_threads + options.num_read_threads);
517
176
  for (int i = 0; i < options.num_write_threads; ++i) {
518
165
    scoped_refptr<yb::Thread> new_thread;
519
165
    CHECK_OK(yb::Thread::Create("test", strings::Substitute("test-writer-$0", i),
520
165
                                &State::WriteThread, this, options, &new_thread));
521
165
    threads_.push_back(new_thread);
522
165
  }
523
11
  for (int i = 0; i < options.num_read_threads; ++i) {
524
0
    scoped_refptr<yb::Thread> new_thread;
525
0
    CHECK_OK(yb::Thread::Create("test", strings::Substitute("test-reader-$0", i),
526
0
                                &State::ReadThread, this, options, &new_thread));
527
0
    threads_.push_back(new_thread);
528
0
  }
529
11
}
530
531
31
void TestWorkload::Stop() {
532
31
  state_->Stop();
533
31
}
534
535
18
void TestWorkload::Join() {
536
18
  state_->Join();
537
18
}
538
539
31
void TestWorkload::StopAndJoin() {
540
31
  Stop();
541
31
  Join();
542
31
}
543
544
1
void TestWorkload::WaitInserted(int64_t required) {
545
2
  while (rows_inserted() < required) {
546
1
    std::this_thread::sleep_for(100ms);
547
1
  }
548
1
}
549
550
106
int64_t TestWorkload::rows_inserted() const {
551
106
  return state_->rows_inserted();
552
106
}
553
554
0
int64_t TestWorkload::rows_insert_failed() const {
555
0
  return state_->rows_insert_failed();
556
0
}
557
558
0
int64_t TestWorkload::rows_read_ok() const {
559
0
  return state_->rows_read_ok();
560
0
}
561
562
0
int64_t TestWorkload::rows_read_empty() const {
563
0
  return state_->rows_read_empty();
564
0
}
565
566
0
int64_t TestWorkload::rows_read_error() const {
567
0
  return state_->rows_read_error();
568
0
}
569
570
0
int64_t TestWorkload::rows_read_try_again() const {
571
0
  return state_->rows_read_try_again();
572
0
}
573
574
0
int64_t TestWorkload::batches_completed() const {
575
0
  return state_->batches_completed();
576
0
}
577
578
1
client::YBClient& TestWorkload::client() const {
579
1
  return state_->client();
580
1
}
581
582
583
}  // namespace yb