YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/load_generator.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/integration-tests/load_generator.h"
15
16
#include <memory>
17
#include <random>
18
#include <thread>
19
20
#include <boost/range/iterator_range.hpp>
21
22
#include "yb/client/client.h"
23
#include "yb/client/error.h"
24
#include "yb/client/schema.h"
25
#include "yb/client/session.h"
26
#include "yb/client/table_handle.h"
27
#include "yb/client/yb_op.h"
28
29
#include "yb/common/common.pb.h"
30
#include "yb/common/partial_row.h"
31
#include "yb/common/ql_value.h"
32
33
#include "yb/gutil/strings/split.h"
34
#include "yb/gutil/strings/substitute.h"
35
36
#include "yb/util/atomic.h"
37
#include "yb/util/debug/leakcheck_disabler.h"
38
#include "yb/util/net/sockaddr.h"
39
#include "yb/util/result.h"
40
#include "yb/util/status_log.h"
41
42
#include "yb/yql/redis/redisserver/redis_client.h"
43
44
using namespace std::literals;
45
46
using std::atomic;
47
using std::atomic_bool;
48
using std::unique_ptr;
49
50
using strings::Substitute;
51
52
using std::shared_ptr;
53
using yb::Status;
54
using yb::ThreadPool;
55
using yb::ThreadPoolBuilder;
56
using yb::MonoDelta;
57
using yb::MemoryOrder;
58
using yb::ConditionVariable;
59
using yb::Mutex;
60
using yb::MutexLock;
61
using yb::CountDownLatch;
62
using yb::Slice;
63
using yb::YBPartialRow;
64
using yb::TableType;
65
66
using yb::client::YBClient;
67
using yb::client::YBError;
68
using yb::client::YBNoOp;
69
using yb::client::YBSession;
70
using yb::client::YBTable;
71
using yb::redisserver::RedisReply;
72
73
// Command-line flags of the load generator.

// Toggle for extra, tool-specific debug logging.
DEFINE_bool(load_gen_verbose,
            false,
            "Custom verbose log messages for debugging the load test tool");

// Sleep interval between two passes of MultiThreadedWriter::RunInsertionTrackerThread.
DEFINE_int32(load_gen_insertion_tracker_delay_ms,
             50,
             "The interval (ms) at which the load generator's \"insertion tracker thread\" "
             "wakes up");

// Upper bound on read attempts in YBSingleThreadedReader::PerformRead before the failure becomes
// fatal.
DEFINE_int32(load_gen_scanner_open_retries,
             10,
             "Number of times to re-try when opening a scanner");

// The wait before retry number i is i times this value.
DEFINE_int32(load_gen_wait_time_increment_step_ms,
             100,
             "In retry loops used in the load test we increment the wait time by this number of "
             "milliseconds after every attempt.");
90
91
namespace {
92
93
0
// Applies the load generator's session settings: a 60 second timeout.
void ConfigureYBSession(YBSession* session) {
  constexpr auto kSessionTimeout = 60s;
  session->SetTimeout(kSessionTimeout);
}
96
97
0
// Renders a string as `'<contents>' (<length> bytes)` for use in log messages.
string FormatWithSize(const string& s) {
  const auto num_bytes = s.size();
  return strings::Substitute("'$0' ($1 bytes)", s, num_bytes);
}
100
101
}  // namespace
102
103
namespace yb {
104
namespace load_generator {
105
106
0
// Formats x as exactly 16 lower-case hexadecimal digits, zero-padded on the left.
string FormatHexForLoadTestKey(uint64_t x) {
  char hex_digits[64];
  const size_t capacity = sizeof(hex_digits) - 1;
  snprintf(hex_digits, capacity, "%016" PRIx64, x);
  return string(hex_digits);
}
111
112
0
size_t KeyIndexSet::NumElements() const {
113
0
  MutexLock l(mutex_);
114
0
  return set_.size();
115
0
}
116
117
0
void KeyIndexSet::Insert(int64_t key) {
118
0
  MutexLock l(mutex_);
119
0
  set_.insert(key);
120
0
}
121
122
0
bool KeyIndexSet::Contains(int64_t key) const {
123
0
  MutexLock l(mutex_);
124
0
  return set_.find(key) != set_.end();
125
0
}
126
127
0
bool KeyIndexSet::RemoveIfContains(int64_t key) {
128
0
  MutexLock l(mutex_);
129
0
  set<int64>::iterator it = set_.find(key);
130
0
  if (it == set_.end()) {
131
0
    return false;
132
0
  } else {
133
0
    set_.erase(it);
134
0
    return true;
135
0
  }
136
0
}
137
138
0
int64_t KeyIndexSet::GetRandomKey(std::mt19937_64* random_number_generator) const {
139
0
  MutexLock l(mutex_);
140
  // The set iterator does not support indexing, so we probabilistically choose a random element
141
  // by iterating the set.
142
0
  size_t n = set_.size();
143
0
  for (int64_t x : set_) {
144
0
    if ((*random_number_generator)() % n == 0) return x;
145
0
    --n;  // Decrement the number of remaining elements we are considering.
146
0
  }
147
  // This will only happen if the set is empty.
148
0
  return -1;
149
0
}
150
151
0
// Streams the set as "[k1, k2, ...]" in the set's iteration order while holding its mutex.
ostream& operator <<(ostream& out, const KeyIndexSet &key_index_set) {
  MutexLock guard(key_index_set.mutex_);
  const char* separator = "";
  out << "[";
  for (const auto key : key_index_set.set_) {
    out << separator << key;
    separator = ", ";
  }
  out << "]";
  return out;
}
165
166
// ------------------------------------------------------------------------------------------------
167
// SessionFactory
168
// ------------------------------------------------------------------------------------------------
169
170
// Stores the client and table handle pointers that are later handed to the readers and writers
// created by this factory.
YBSessionFactory::YBSessionFactory(client::YBClient* client, client::TableHandle* table)
    : client_(client),
      table_(table) {
}
172
173
0
// Returns the string form of the underlying client's id.
string YBSessionFactory::ClientId() {
  const auto& id = client_->id();
  return id.ToString();
}
174
175
0
// Allocates a YBSingleThreadedWriter for writer thread `idx`. The object is heap-allocated with
// `new`, so the caller is responsible for deleting it.
SingleThreadedWriter* YBSessionFactory::GetWriter(MultiThreadedWriter* writer, int idx) {
  SingleThreadedWriter* const created =
      new YBSingleThreadedWriter(writer, client_, table_, idx);
  return created;
}
178
179
0
// Allocates a YBSingleThreadedReader for reader thread `idx`. The object is heap-allocated with
// `new`, so the caller is responsible for deleting it.
SingleThreadedReader* YBSessionFactory::GetReader(MultiThreadedReader* reader, int idx) {
  SingleThreadedReader* const created =
      new YBSingleThreadedReader(reader, client_, table_, idx);
  return created;
}
182
183
0
// Allocates a NoopSingleThreadedWriter for writer thread `idx`. The object is heap-allocated with
// `new`, so the caller is responsible for deleting it.
SingleThreadedWriter* NoopSessionFactory::GetWriter(MultiThreadedWriter* writer, int idx) {
  SingleThreadedWriter* const created =
      new NoopSingleThreadedWriter(writer, client_, table_, idx);
  return created;
}
186
187
// Keeps a copy of the Redis server address list; it is passed on unchanged to every reader and
// writer this factory creates.
RedisSessionFactory::RedisSessionFactory(const string& redis_server_addresses)
    : redis_server_addresses_(redis_server_addresses) {
}
189
190
0
// All Redis-backed load generators share one fixed client id.
string RedisSessionFactory::ClientId() {
  static const char kRedisClientId[] = "redis_client";
  return kRedisClientId;
}
191
192
0
// Allocates a RedisSingleThreadedWriter for writer thread `idx`. The object is heap-allocated
// with `new`, so the caller is responsible for deleting it.
SingleThreadedWriter* RedisSessionFactory::GetWriter(MultiThreadedWriter* writer, int idx) {
  SingleThreadedWriter* const created =
      new RedisSingleThreadedWriter(writer, redis_server_addresses_, idx);
  return created;
}
195
196
0
// Allocates a RedisSingleThreadedReader for reader thread `idx`. The object is heap-allocated
// with `new`, so the caller is responsible for deleting it.
SingleThreadedReader* RedisSessionFactory::GetReader(MultiThreadedReader* reader, int idx) {
  SingleThreadedReader* const created =
      new RedisSingleThreadedReader(reader, redis_server_addresses_, idx);
  return created;
}
199
200
0
// Allocates a RedisNoopSingleThreadedWriter for writer thread `idx`. The object is heap-allocated
// with `new`, so the caller is responsible for deleting it.
SingleThreadedWriter* RedisNoopSessionFactory::GetWriter(MultiThreadedWriter* writer, int idx) {
  SingleThreadedWriter* const created =
      new RedisNoopSingleThreadedWriter(writer, redis_server_addresses_, idx);
  return created;
}
203
204
// ------------------------------------------------------------------------------------------------
205
// MultiThreadedAction
206
// ------------------------------------------------------------------------------------------------
207
208
// Records the action's parameters and builds its thread pool.
//
// The pool is named after `description` and allows up to `num_action_threads + num_extra_threads`
// threads. The running-threads latch starts at `num_action_threads`. Failure to build the pool is
// fatal (CHECK_OK).
MultiThreadedAction::MultiThreadedAction(
    const string& description, int64_t num_keys, int64_t start_key, int num_action_threads,
    int num_extra_threads, const string& client_id, atomic_bool* stop_requested_flag,
    int value_size)
    : description_(description),
      num_keys_(num_keys),
      start_key_(start_key),
      num_action_threads_(num_action_threads),
      client_id_(client_id),
      running_threads_latch_(num_action_threads),
      stop_requested_(stop_requested_flag),
      value_size_(value_size) {
  const int max_pool_threads = num_action_threads_ + num_extra_threads;
  ThreadPoolBuilder pool_builder(description);
  pool_builder.set_max_threads(max_pool_threads);
  CHECK_OK(pool_builder.Build(&thread_pool_));
}
225
226
0
// Nothing to release explicitly; members clean up after themselves.
MultiThreadedAction::~MultiThreadedAction() = default;
227
228
0
// Builds the key string for a key index: "<hash>_key<index>_<client id>", where <hash> is the
// 16-digit hex rendering of std::hash applied to "key<index>".
string MultiThreadedAction::GetKeyByIndex(int64_t key_index) {
  const string suffix = Substitute("key$0", key_index);
  const auto suffix_hash = std::hash<string>()(suffix);
  return Substitute("$0_$1_$2", FormatHexForLoadTestKey(suffix_hash), suffix, client_id_);
}
234
235
// Creates a human-readable string with hex characters to be used as a value in our test. This is
236
// deterministic based on key_index.
237
DISABLE_UBSAN
238
0
string MultiThreadedAction::GetValueByIndex(int64_t key_index) {
239
0
  string value;
240
0
  int64_t x = key_index;
241
0
  for (int i = 0; i < value_size_; ++i) {
242
0
    int val = static_cast<int>(x & 0xf);
243
0
    char c = static_cast<char>(val > 9 ? val - 10 + 'a' : val + '0');
244
0
    value.push_back(c);
245
    // Add pseudo-randomness by using the loop index.
246
0
    x = (x >> 4) * 31 + i;
247
0
  }
248
0
  return value;
249
0
}
250
251
0
void MultiThreadedAction::Start() {
252
0
  LOG(INFO) << "Starting " << num_action_threads_ << " " << description_ << " threads";
253
0
  CHECK_OK(thread_pool_->SubmitFunc(std::bind(&MultiThreadedAction::RunStatsThread, this)));
254
0
  for (int i = 0; i < num_action_threads_; i++) {
255
0
    CHECK_OK(thread_pool_->SubmitFunc(std::bind(&MultiThreadedAction::RunActionThread, this, i)));
256
0
  }
257
0
}
258
259
0
void MultiThreadedAction::WaitForCompletion() {
260
0
  thread_pool_->Wait();
261
0
}
262
263
// ------------------------------------------------------------------------------------------------
264
// MultiThreadedWriter
265
// ------------------------------------------------------------------------------------------------
266
267
// Sets up a "writers" action with two extra pool threads beyond the writer threads. The next key
// to write starts at `start_key`, and the contiguous insertion point starts at -1.
MultiThreadedWriter::MultiThreadedWriter(
    int64_t num_keys, int64_t start_key, int num_writer_threads, SessionFactory* session_factory,
    atomic_bool* stop_flag, int value_size, size_t max_num_write_errors)
    : MultiThreadedAction(
          "writers", num_keys, start_key, num_writer_threads, 2, session_factory->ClientId(),
          stop_flag, value_size),
      session_factory_(session_factory),
      next_key_(start_key),
      inserted_up_to_inclusive_(-1),
      max_num_write_errors_(max_num_write_errors) {
}
277
278
0
void MultiThreadedWriter::Start() {
279
0
  MultiThreadedAction::Start();
280
0
  CHECK_OK(
281
0
      thread_pool_->SubmitFunc(std::bind(&MultiThreadedWriter::RunInsertionTrackerThread, this)));
282
0
}
283
284
0
void MultiThreadedWriter::WaitForCompletion() {
285
0
  MultiThreadedAction::WaitForCompletion();
286
0
  LOG(INFO) << "Inserted up to and including " << inserted_up_to_inclusive_.load();
287
0
}
288
289
0
void MultiThreadedWriter::RunActionThread(int writer_index) {
290
0
  unique_ptr<SingleThreadedWriter> writer(session_factory_->GetWriter(this, writer_index));
291
0
  writer->set_pause_flag(pause_flag_);
292
0
  writer->Run();
293
294
0
  LOG(INFO) << "Writer thread " << writer_index << " finished";
295
0
  running_threads_latch_.CountDown();
296
0
}
297
298
0
void SingleThreadedWriter::Run() {
299
0
  LOG(INFO) << "Writer thread " << writer_index_ << " started";
300
0
  ConfigureSession();
301
0
  while (!multi_threaded_writer_->IsStopRequested()) {
302
0
    if (pause_flag_ && pause_flag_->load(std::memory_order_acquire)) {
303
0
      std::this_thread::sleep_for(10ms);
304
0
      continue;
305
0
    }
306
0
    int64_t key_index = multi_threaded_writer_->next_key_++;
307
0
    if (key_index >= multi_threaded_writer_->num_keys_) {
308
0
      break;
309
0
    }
310
311
0
    string key_str(multi_threaded_writer_->GetKeyByIndex(key_index));
312
0
    string value_str(multi_threaded_writer_->GetValueByIndex(key_index));
313
314
0
    if (Write(key_index, key_str, value_str)) {
315
0
      multi_threaded_writer_->inserted_keys_.Insert(key_index);
316
0
    } else {
317
0
      multi_threaded_writer_->failed_keys_.Insert(key_index);
318
0
      HandleInsertionFailure(key_index, key_str);
319
0
      if (multi_threaded_writer_->num_write_errors() >
320
0
          multi_threaded_writer_->max_num_write_errors_) {
321
0
        LOG(ERROR) << "Exceeded the maximum number of write errors "
322
0
                   << multi_threaded_writer_->max_num_write_errors_ << ", stopping the test.";
323
0
        multi_threaded_writer_->Stop();
324
0
        break;
325
0
      }
326
0
    }
327
0
  }
328
0
  CloseSession();
329
0
}
330
331
void ConfigureRedisSessions(
332
0
    const string& redis_server_addresses, vector<shared_ptr<RedisClient> >* clients) {
333
0
  std::vector<string> addresses;
334
0
  SplitStringUsing(redis_server_addresses, ",", &addresses);
335
0
  for (auto& addr : addresses) {
336
0
    auto remote = CHECK_RESULT(ParseEndpoint(addr, 6379));
337
0
    clients->push_back(std::make_shared<RedisClient>(
338
0
        remote.address().to_string(), remote.port()));
339
0
  }
340
0
}
341
342
0
void RedisSingleThreadedWriter::ConfigureSession() {
343
0
  ConfigureRedisSessions(redis_server_addresses_, &clients_);
344
0
}
345
346
bool RedisSingleThreadedWriter::Write(
347
0
    int64_t key_index, const string& key_str, const string& value_str) {
348
0
  bool success = false;
349
0
  auto writer_index = writer_index_;
350
0
  int64_t idx = key_index % clients_.size();
351
0
  clients_[idx]->Send(
352
0
      {"SET", key_str, value_str}, [&success, key_index, writer_index](const RedisReply& reply) {
353
0
        if ("OK" == reply.as_string()) {
354
0
          VLOG(2) << "Writer " << writer_index << " Successfully inserted key #" << key_index
355
0
                  << " into redis ";
356
0
          success = true;
357
0
        } else {
358
0
          VLOG(1) << "Failed Insersion key #" << key_index << reply.as_string();
359
0
          success = false;
360
0
        }
361
0
      });
362
0
  clients_[idx]->Commit();
363
364
0
  return success;
365
0
}
366
367
0
// Hook called after a failed write. Intentionally empty: Redis failures need no extra handling
// here.
void RedisSingleThreadedWriter::HandleInsertionFailure(
    int64_t key_index, const string& key_str) {
}
370
371
0
void RedisSingleThreadedWriter::CloseSession() {
372
0
  for (auto client : clients_) {
373
0
    client->Disconnect();
374
0
  }
375
0
}
376
377
bool RedisNoopSingleThreadedWriter::Write(
378
0
    int64_t key_index, const string& key_str, const string& value_str) {
379
0
  bool success = false;
380
0
  auto writer_index = writer_index_;
381
0
  int64_t idx = key_index % clients_.size();
382
0
  clients_[idx]->Send({"ECHO", "OK"}, [&success, key_index, writer_index](const RedisReply& reply) {
383
0
    if ("OK" == reply.as_string()) {
384
0
      VLOG(2) << "Writer " << writer_index << " Successfully inserted key #" << key_index
385
0
              << " into redis ";
386
0
      success = true;
387
0
    } else {
388
0
      VLOG(1) << "Failed Insersion key #" << key_index << reply.as_string();
389
0
      success = false;
390
0
    }
391
0
  });
392
0
  clients_[idx]->Commit();
393
394
0
  return success;
395
0
}
396
397
0
void YBSingleThreadedWriter::ConfigureSession() {
398
0
  session_ = client_->NewSession();
399
0
  ConfigureYBSession(session_.get());
400
0
}
401
402
bool YBSingleThreadedWriter::Write(
403
0
    int64_t key_index, const string& key_str, const string& value_str) {
404
0
  auto insert = table_->NewInsertOp();
405
  // Generate a Put for key_str, value_str
406
0
  QLAddStringHashValue(insert->mutable_request(), key_str);
407
0
  table_->AddStringColumnValue(insert->mutable_request(), "v", value_str);
408
  // submit a the put to apply.
409
  // If successful, add to inserted
410
0
  session_->Apply(insert);
411
0
  const auto flush_status = session_->FlushAndGetOpsErrors();
412
0
  const auto& status = flush_status.status;
413
0
  if (!status.ok()) {
414
0
    for (const auto& error : flush_status.errors) {
415
      // It means that key was actually written successfully, but our retry failed because
416
      // it was detected as duplicate request.
417
0
      if (error->status().IsAlreadyPresent()) {
418
0
        return true;
419
0
      }
420
0
      LOG(WARNING) << "Error inserting key '" << key_str << "': " << error->status();
421
0
    }
422
423
0
    LOG(WARNING) << "Error inserting key '" << key_str << "': "
424
0
                 << "Flush() failed (" << status << ")";
425
0
    return false;
426
0
  }
427
0
  if (insert->response().status() != QLResponsePB::YQL_STATUS_OK) {
428
0
    LOG(WARNING) << "Error inserting key '" << key_str << "': "
429
0
                 << insert->response().error_message();
430
0
    return false;
431
0
  }
432
433
0
  multi_threaded_writer_->inserted_keys_.Insert(key_index);
434
0
  VLOG(2) << "Successfully inserted key #" << key_index << " at hybrid_time "
435
0
          << client_->GetLatestObservedHybridTime() << " or earlier";
436
437
0
  return true;
438
0
}
439
440
0
// Hook called after a failed write. Intentionally empty: YBSingleThreadedWriter::Write already
// handles (logs) the failure.
void YBSingleThreadedWriter::HandleInsertionFailure(
    int64_t key_index, const string& key_str) {
}
443
444
0
// Closes the writer's session; a failure to close is fatal (CHECK_OK).
void YBSingleThreadedWriter::CloseSession() {
  const auto close_status = session_->Close();
  CHECK_OK(close_status);
}
445
446
0
void MultiThreadedWriter::RunStatsThread() {
447
0
  MicrosecondsInt64 prev_time = GetMonoTimeMicros();
448
0
  int64_t prev_writes = 0;
449
0
  while (!IsStopRequested() && running_threads_latch_.count() > 0) {
450
0
    running_threads_latch_.WaitFor(MonoDelta::FromSeconds(5));
451
0
    int64_t num_writes = this->num_writes();
452
0
    MicrosecondsInt64 current_time = GetMonoTimeMicros();
453
0
    LOG(INFO) << "Wrote " << num_writes << " rows ("
454
0
              << (num_writes - prev_writes) * 1000000.0 / (current_time - prev_time)
455
0
              << " writes/sec), contiguous insertion point: " << inserted_up_to_inclusive_.load()
456
0
              << ", write errors: " << failed_keys_.NumElements();
457
0
    prev_writes = num_writes;
458
0
    prev_time = current_time;
459
0
  }
460
0
}
461
462
0
void MultiThreadedWriter::RunInsertionTrackerThread() {
463
0
  LOG(INFO) << "Insertion tracker thread started";
464
0
  int64_t current_key = 0;  // the first key to be inserted
465
0
  while (!IsStopRequested() && running_threads_latch_.count() > 0) {
466
0
    while (failed_keys_.Contains(current_key) || inserted_keys_.RemoveIfContains(current_key)) {
467
0
      VLOG(2) << "Advancing insertion tracker to key #" << current_key;
468
0
      inserted_up_to_inclusive_.store(current_key);
469
0
      current_key++;
470
0
    }
471
0
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_load_gen_insertion_tracker_delay_ms));
472
0
  }
473
0
  LOG(INFO) << "Insertion tracker thread stopped";
474
0
}
475
476
// ------------------------------------------------------------------------------------------------
477
// SingleThreadedScanner
478
// ------------------------------------------------------------------------------------------------
479
480
0
// Stores the table handle pointer used by CountRows.
SingleThreadedScanner::SingleThreadedScanner(client::TableHandle* table)
    : table_(table) {
}
481
482
0
int64_t SingleThreadedScanner::CountRows() {
483
0
  auto result = boost::size(client::TableRange(*table_));
484
485
0
  LOG(INFO) << " num read rows = " << result;
486
0
  return result;
487
0
}
488
489
// ------------------------------------------------------------------------------------------------
490
// MultiThreadedReader
491
// ------------------------------------------------------------------------------------------------
492
493
// Sets up a "readers" action that starts at key 0 and has one extra pool thread beyond the
// reader threads. The read and read-error counters start at zero. The insertion point and the
// inserted/failed key sets are stored as pointers, not copied.
MultiThreadedReader::MultiThreadedReader(int64_t num_keys, int num_reader_threads,
                                         SessionFactory* session_factory,
                                         atomic<int64_t>* insertion_point,
                                         const KeyIndexSet* inserted_keys,
                                         const KeyIndexSet* failed_keys, atomic_bool* stop_flag,
                                         int value_size, int max_num_read_errors,
                                         MultiThreadedReaderOptions options)
    : MultiThreadedAction(
          "readers", num_keys, 0, num_reader_threads, 1, session_factory->ClientId(),
          stop_flag, value_size),
      session_factory_(session_factory),
      insertion_point_(insertion_point),
      inserted_keys_(inserted_keys),
      failed_keys_(failed_keys),
      num_reads_(0),
      num_read_errors_(0),
      max_num_read_errors_(max_num_read_errors),
      options_(options) {
}
511
512
0
void MultiThreadedReader::RunActionThread(int reader_index) {
513
0
  unique_ptr<SingleThreadedReader> reader_loop(session_factory_->GetReader(this, reader_index));
514
0
  reader_loop->Run();
515
516
0
  LOG(INFO) << "Reader thread " << reader_index << " finished";
517
0
  running_threads_latch_.CountDown();
518
0
}
519
520
0
void MultiThreadedReader::RunStatsThread() {
521
0
  MicrosecondsInt64 prev_time = GetMonoTimeMicros();
522
0
  int64_t prev_rows_read = 0;
523
0
  while (!IsStopRequested() && running_threads_latch_.count() > 0) {
524
0
    running_threads_latch_.WaitFor(MonoDelta::FromSeconds(5));
525
0
    MicrosecondsInt64 current_time = GetMonoTimeMicros();
526
0
    int64_t num_rows_read = num_reads_.load();
527
0
    LOG(INFO) << "Read " << num_rows_read << " rows ("
528
0
              << (num_rows_read - prev_rows_read) * 1000000.0 / (current_time - prev_time)
529
0
              << " reads/sec), read errors: " << num_read_errors_.load();
530
0
    prev_rows_read = num_rows_read;
531
0
    prev_time = current_time;
532
0
  }
533
0
}
534
535
0
// Registers one read error of the given kind (must not be kOk).
//
// The reader is stopped, and the triggering status is remembered in read_status_stopped_, in
// two cases: when the error count exceeds max_num_read_errors_, or when the status is one the
// configured options forbid (empty read, invalid read, extra rows).
void MultiThreadedReader::IncrementReadErrorCount(ReadStatus read_status) {
  DCHECK(read_status != ReadStatus::kOk);

  const bool too_many_errors = ++num_read_errors_ > max_num_read_errors_;
  if (too_many_errors) {
    LOG(ERROR) << "Exceeded the maximum number of read errors (" << max_num_read_errors_ << ")!";
    read_status_stopped_ = read_status;
    Stop();
  }

  // Each entry pairs a "stop on ..." option with the read status it forbids.
  const std::pair<MultiThreadedReaderOption, ReadStatus> stop_rules[] = {
      {MultiThreadedReaderOption::kStopOnEmptyRead, ReadStatus::kNoRows},
      {MultiThreadedReaderOption::kStopOnInvalidRead, ReadStatus::kInvalidRead},
      {MultiThreadedReaderOption::kStopOnExtraRead, ReadStatus::kExtraRows},
  };
  for (const auto& rule : stop_rules) {
    if (!(options_.Test(rule.first) && read_status == rule.second)) {
      continue;
    }
    LOG(ERROR) << "Stopping due to not allowed read status: " << AsString(read_status);
    read_status_stopped_ = read_status;
    Stop();
    return;
  }
}
555
556
0
void RedisSingleThreadedReader::ConfigureSession() {
557
0
  ConfigureRedisSessions(redis_server_addresses_, &clients_);
558
0
}
559
560
0
void RedisSingleThreadedReader::CloseSession() {
561
0
  for (auto client : clients_) {
562
0
    client->Disconnect();
563
0
  }
564
0
}
565
566
0
void YBSingleThreadedReader::ConfigureSession() {
567
0
  session_ = client_->NewSession();
568
0
  ConfigureYBSession(session_.get());
569
0
}
570
571
bool NoopSingleThreadedWriter::Write(
572
0
    int64_t key_index, const string& key_str, const string& value_str) {
573
0
  YBNoOp noop(table_->table());
574
0
  std::unique_ptr<YBPartialRow> row(table_->schema().NewRow());
575
0
  CHECK_OK(row->SetBinary("k", key_str));
576
0
  Status s = noop.Execute(client_, *row);
577
0
  if (s.ok()) {
578
0
    return true;
579
0
  }
580
0
  LOG(ERROR) << "NoOp failed" << s.CodeAsString();
581
0
  return false;
582
0
}
583
584
// Reads columns "k" and "v" for key_str and validates the result.
//
// A failed read (apply/flush error, or failure to build the row block) is retried after a sleep
// that grows by FLAGS_load_gen_wait_time_increment_step_ms per attempt. Once the attempt number
// reaches FLAGS_load_gen_scanner_open_retries the failure is fatal (CHECK_OK).
//
// Returns:
//   kNoRows     - the read returned no rows (the error is NOT counted here),
//   kOtherError - more than one row, a wrong key or a wrong value (counted via
//                 IncrementReadErrorCount before returning),
//   kOk         - exactly one row with the expected key and value.
ReadStatus YBSingleThreadedReader::PerformRead(
    int64_t key_index, const string& key_str, const string& expected_value_str) {
  const uint64_t read_ts = client_->GetLatestObservedHybridTime();

  // Counts the error on the parent reader and yields the status to return.
  const auto count_other_error = [this] {
    multi_threaded_reader_->IncrementReadErrorCount(ReadStatus::kOtherError);
    return ReadStatus::kOtherError;
  };

  int attempt = 1;
  while (true) {
    auto read_op = table_->NewReadOp();
    QLAddStringHashValue(read_op->mutable_request(), key_str);
    table_->AddColumns({"k", "v"}, read_op->mutable_request());
    auto status = session_->ApplyAndFlush(read_op);
    boost::optional<QLRowBlock> row_block;
    if (status.ok()) {
      auto result = read_op->MakeRowBlock();
      if (result.ok()) {
        row_block = std::move(*result);
      } else {
        status = std::move(result.status());
      }
    }
    if (!status.ok()) {
      LOG(ERROR) << "Failed to read: " << status << ", re-trying.";
      if (attempt >= FLAGS_load_gen_scanner_open_retries) {
        CHECK_OK(status);
      }
      SleepFor(MonoDelta::FromMilliseconds(FLAGS_load_gen_wait_time_increment_step_ms * attempt));
      ++attempt;
      continue;
    }

    if (row_block->row_count() == 0) {
      LOG(ERROR) << "No rows found for key #" << key_index
                 << " (read hybrid_time: " << read_ts << ")";
      return ReadStatus::kNoRows;
    }
    if (row_block->row_count() != 1) {
      LOG(ERROR) << "Found an invalid number of rows for key #" << key_index << ": "
                 << row_block->row_count() << " (expected to find 1 row), read hybrid_time: "
                 << read_ts;
      return count_other_error();
    }

    auto row = row_block->rows()[0];
    if (row.column(0).binary_value() != key_str) {
      LOG(ERROR) << "Invalid key returned by the read operation: '" << row.column(0).binary_value()
                 << "', expected: '" << key_str << "', read hybrid_time: " << read_ts;
      return count_other_error();
    }

    auto returned_value = row.column(1).binary_value();
    if (returned_value != expected_value_str) {
      LOG(ERROR) << "Invalid value returned by the read operation for key '" << key_str
                 << "': " << FormatWithSize(returned_value)
                 << ", expected: " << FormatWithSize(expected_value_str)
                 << ", read hybrid_time: " << read_ts;
      return count_other_error();
    }

    return ReadStatus::kOk;
  }
}
643
644
0
// Closes the reader's session; a failure to close is fatal (CHECK_OK).
void YBSingleThreadedReader::CloseSession() {
  const auto close_status = session_->Close();
  CHECK_OK(close_status);
}
645
646
// Reads key_str with a Redis GET through the client selected by
// `key_index % number of clients` and compares the reply with expected_value_str.
//
// Returns kOk on a match. On a mismatch, or when no client is configured (the client-selection
// modulo would otherwise divide by zero), the error is counted through IncrementReadErrorCount
// and kOtherError is returned. This matches how YBSingleThreadedReader::PerformRead reports
// kOtherError, so max_num_read_errors is honoured for Redis reads too.
//
// NOTE(review): `value_str` is captured by reference, so this relies on Commit() having invoked
// the reply callback by the time it returns - confirm against RedisClient.
ReadStatus RedisSingleThreadedReader::PerformRead(
    int64_t key_index, const string& key_str, const string& expected_value_str) {
  if (clients_.empty()) {
    LOG(ERROR) << "No redis clients configured, cannot read key #" << key_index;
    multi_threaded_reader_->IncrementReadErrorCount(ReadStatus::kOtherError);
    return ReadStatus::kOtherError;
  }

  string value_str;
  const size_t idx = static_cast<size_t>(key_index) % clients_.size();
  clients_[idx]->Send({"GET", key_str}, [&value_str](const RedisReply& reply) {
    value_str = reply.as_string();
  });
  VLOG(3) << "Trying to read key #" << key_index << " from redis "
          << " key : " << key_str;
  clients_[idx]->Commit();

  if (expected_value_str != value_str) {
    LOG(INFO) << "Read the wrong value for #" << key_index << " from redis "
              << " key : " << key_str << " value : " << value_str
              << " expected : " << expected_value_str;
    multi_threaded_reader_->IncrementReadErrorCount(ReadStatus::kOtherError);
    return ReadStatus::kOtherError;
  }

  VLOG(2) << "Reader " << reader_index_ << " Successfully read key #" << key_index << " from redis "
          << " key : " << key_str << " value : " << value_str;
  return ReadStatus::kOk;
}
668
669
0
void SingleThreadedReader::Run() {
670
0
  std::mt19937_64 random_number_generator(reader_index_);
671
672
0
  LOG(INFO) << "Reader thread " << reader_index_ << " started";
673
0
  ConfigureSession();
674
675
  // Wait until at least one row has been inserted (keys are numbered starting from 1).
676
0
  while (!multi_threaded_reader_->IsStopRequested() &&
677
0
         multi_threaded_reader_->insertion_point_->load() < 0) {
678
0
    VLOG(1) << "Reader thread " << reader_index_ << " Sleeping until load() >= 0";
679
0
    SleepFor(MonoDelta::FromMilliseconds(10));
680
0
  }
681
682
0
  while (!multi_threaded_reader_->IsStopRequested()) {
683
0
    const int64_t key_index = NextKeyIndexToRead(&random_number_generator);
684
685
0
    ++multi_threaded_reader_->num_reads_;
686
0
    const string key_str(multi_threaded_reader_->GetKeyByIndex(key_index));
687
0
    const string expected_value_str(multi_threaded_reader_->GetValueByIndex(key_index));
688
0
    const ReadStatus read_status = PerformRead(key_index, key_str, expected_value_str);
689
690
    // Read operation returning zero rows is treated as a read error.
691
    // See: https://yugabyte.atlassian.net/browse/ENG-1272
692
0
    if (read_status == ReadStatus::kNoRows) {
693
0
      multi_threaded_reader_->IncrementReadErrorCount(read_status);
694
0
    }
695
0
  }
696
697
0
  CloseSession();
698
0
}
699
700
0
int64_t SingleThreadedReader::NextKeyIndexToRead(std::mt19937_64* random_number_generator) const {
701
0
  int64_t key_index = 0;
702
0
  VLOG(3) << "Reader thread " << reader_index_ << " waiting to load insertion point";
703
0
  int64_t written_up_to = multi_threaded_reader_->insertion_point_->load();
704
0
  do {
705
0
    VLOG(3) << "Reader thread " << reader_index_ << " coin toss";
706
0
    switch ((*random_number_generator)() % 3) {
707
0
      case 0:
708
        // Read the latest value that the insertion tracker knows we've written up to.
709
0
        key_index = written_up_to;
710
0
        break;
711
0
      case 1:
712
        // Read one of the keys that have been successfully inserted but have not been processed
713
        // by the insertion tracker thread yet.
714
0
        key_index =
715
0
            multi_threaded_reader_->inserted_keys_->GetRandomKey(random_number_generator);
716
0
        if (key_index == -1) {
717
          // The set is empty.
718
0
          key_index = written_up_to;
719
0
        }
720
0
        break;
721
722
0
      default:
723
        // We're assuming the total number of keys is < RAND_MAX (~2 billion) here.
724
0
        key_index = (*random_number_generator)() % (written_up_to + 1);
725
0
        break;
726
0
    }
727
    // Ensure we don't try to read a key for which a write failed.
728
0
  } while (multi_threaded_reader_->failed_keys_->Contains(key_index) &&
729
0
           !multi_threaded_reader_->IsStopRequested());
730
731
0
  VLOG(1) << "Reader thread " << reader_index_ << " saw written_up_to=" << written_up_to
732
0
          << " and picked key #" << key_index;
733
0
  return key_index;
734
0
}
735
736
}  // namespace load_generator
737
}  // namespace yb