YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/load_generator.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_INTEGRATION_TESTS_LOAD_GENERATOR_H_
15
#define YB_INTEGRATION_TESTS_LOAD_GENERATOR_H_
16
17
#include <dirent.h>
18
#include <signal.h>
19
#include <spawn.h>
20
21
#include <atomic>
22
#include <iostream>
23
#include <map>
24
#include <memory>
25
#include <random>
26
#include <set>
27
#include <string>
28
#include <thread>
29
#include <unordered_set>
30
#include <vector>
31
32
#include <gtest/gtest.h>
33
34
#include "yb/client/client_fwd.h"
35
36
#include "yb/util/countdown_latch.h"
37
#include "yb/util/status.h"
38
#include "yb/util/threadpool.h"
39
#include "yb/util/tsan_util.h"
40
41
namespace yb {
42
43
// Forward declaration only: lets this header name redisserver::RedisClient (used below inside
// std::shared_ptr members) without including the header that defines it.
namespace redisserver {
44
class RedisClient;
45
}  // namespace redisserver
46
47
namespace load_generator {
48
49
// Make RedisClient usable unqualified by the declarations in this namespace.
using redisserver::RedisClient;
50
51
// Forward declarations. The classes below refer to one another (pointer parameters, pointer
// members and friend declarations) before their full definitions appear later in this header.
class SingleThreadedReader;
52
class SingleThreadedWriter;
53
class MultiThreadedReader;
54
class MultiThreadedWriter;
55
56
// Declared here and defined elsewhere. Takes a 64-bit unsigned value and returns a string.
// NOTE(review): presumably the hexadecimal rendering used when building load-test keys (inferred
// from the name only) -- confirm against the definition.
std::string FormatHexForLoadTestKey(uint64_t x);
57
58
// A collection of int64_t key indexes stored in an ordered std::set, together with a mutex.
// All member functions are defined outside this header.
// NOTE(review): mutex_ is presumably what guards set_ in every accessor -- confirm in the
// implementation file.
class KeyIndexSet {
59
 public:
60
  // Number of key indexes held.
  size_t NumElements() const;
61
  // Adds a key index.
  void Insert(int64_t key);
62
  // Membership test for a key index.
  bool Contains(int64_t key) const;
63
  // NOTE(review): by its name, removes the key when present and reports whether it did --
  // confirm the meaning of the returned bool against the definition.
  bool RemoveIfContains(int64_t key);
64
  // Picks a key index using the caller-supplied 64-bit Mersenne Twister engine.
  // NOTE(review): behaviour on an empty set is not visible here.
  int64_t GetRandomKey(std::mt19937_64* random_number_generator) const;
65
66
 private:
67
  std::set<int64_t> set_;
68
  // mutable so that the const member functions above can use it.
  mutable Mutex mutex_;
69
70
  // Stream output is a friend so it can read the private members.
  friend std::ostream& operator <<(std::ostream& out, const KeyIndexSet &key_index_set);
71
};
72
73
// Abstract interface that hands out per-thread writer and reader objects to the multi-threaded
// load generator classes, and reports a client id string.
// NOTE(review): GetWriter/GetReader return raw pointers; who owns the returned object is not
// visible in this header -- confirm at the call sites.
class SessionFactory {
74
 public:
75
0
  virtual ~SessionFactory() = default;
76
  // Identifier string for the client behind this factory.
  virtual std::string ClientId() = 0;
77
  // Creates the writer object for slot `idx` of the given multi-threaded writer.
  virtual SingleThreadedWriter* GetWriter(MultiThreadedWriter* writer, int idx) = 0;
78
  // Creates the reader object for slot `idx` of the given multi-threaded reader.
  virtual SingleThreadedReader* GetReader(MultiThreadedReader* reader, int idx) = 0;
79
};
80
81
// SessionFactory implementation that carries a YBClient and a TableHandle, both received as raw
// pointers in the constructor and stored as members.
// NOTE(review): the pointers are not owned by any smart pointer here, so presumably the caller
// must keep both objects alive for the lifetime of this factory -- confirm.
class YBSessionFactory : public SessionFactory {
82
 public:
83
  YBSessionFactory(yb::client::YBClient* client, yb::client::TableHandle* table);
84
85
  std::string ClientId() override;
86
  SingleThreadedWriter* GetWriter(MultiThreadedWriter* writer, int idx) override;
87
  SingleThreadedReader* GetReader(MultiThreadedReader* reader, int idx) override;
88
89
 protected:
90
  yb::client::YBClient* const client_;
91
  yb::client::TableHandle* table_;
92
};
93
94
// YBSessionFactory variant that overrides only GetWriter; ClientId and GetReader are inherited
// unchanged.
// NOTE(review): by its name the writer it returns is a no-op writer -- confirm in the definition.
class NoopSessionFactory : public YBSessionFactory {
95
 public:
96
  // Forwards both pointers to the YBSessionFactory constructor.
  NoopSessionFactory(yb::client::YBClient* client, yb::client::TableHandle* table)
97
0
      : YBSessionFactory(client, table) {}
98
99
  SingleThreadedWriter* GetWriter(MultiThreadedWriter* writer, int idx) override;
100
};
101
102
// SessionFactory implementation for the Redis path; it carries a server address string.
// NOTE(review): the member is named redis_server_addresses_ (plural) while the constructor
// parameter is singular; the expected format of the string is not visible here.
class RedisSessionFactory : public SessionFactory {
103
 public:
104
  explicit RedisSessionFactory(const std::string& redis_server_address);
105
106
  std::string ClientId() override;
107
  SingleThreadedWriter* GetWriter(MultiThreadedWriter* writer, int idx) override;
108
  SingleThreadedReader* GetReader(MultiThreadedReader* reader, int idx) override;
109
110
 protected:
111
  std::string redis_server_addresses_;
112
};
113
114
// RedisSessionFactory variant that overrides only GetWriter; ClientId and GetReader are
// inherited unchanged.
// NOTE(review): by its name the writer it returns is a no-op writer -- confirm in the definition.
class RedisNoopSessionFactory : public RedisSessionFactory {
115
 public:
116
  // Forwards the address string to the RedisSessionFactory constructor.
  explicit RedisNoopSessionFactory(const std::string& redis_server_address)
117
0
      : RedisSessionFactory(redis_server_address) {}
118
119
  SingleThreadedWriter* GetWriter(MultiThreadedWriter* writer, int idx) override;
120
};
121
122
// Common base for the multi-threaded writer and reader. It holds the key range parameters, a
// thread pool, a count-down latch used to tell whether threads are still running, and a pointer
// to an externally supplied stop flag. Subclasses provide the per-thread action and the stats
// thread bodies.
class MultiThreadedAction {
123
 public:
124
  // stop_requested_flag is stored as a raw pointer and dereferenced by Stop()/IsStopRequested().
  // NOTE(review): presumably it must be non-null and outlive this object -- confirm with callers.
  MultiThreadedAction(
125
      const std::string& description, int64_t num_keys, int64_t start_key, int num_action_threads,
126
      int num_extra_threads, const std::string& client_id, std::atomic_bool* stop_requested_flag,
127
      int value_size);
128
129
  virtual ~MultiThreadedAction();
130
131
  virtual void Start();
132
  virtual void WaitForCompletion();
133
134
0
  // Sets the shared stop flag to true.
  void Stop() { stop_requested_->store(true); }
135
0
  // Reads the shared stop flag; const because it does not modify this object.
  bool IsStopRequested() const { return stop_requested_->load(); }
136
0
  void set_client_id(const std::string& client_id) { client_id_ = client_id; }
137
0
  // True while the running-threads latch has a non-zero count.
  bool IsRunning() { return running_threads_latch_.count() > 0; }
138
139
 protected:
140
  friend class SingleThreadedReader;
141
  friend class SingleThreadedWriter;
142
143
  std::string GetKeyByIndex(int64_t key_index);
144
145
  // The returned value is compared as a string on read, so it must not contain a '\0' character
146
  // (the comparison would then use an incorrect size). The value is a human-readable string made
147
  // of hex characters in the range '0'-'f'.
  std::string GetValueByIndex(int64_t key_index);
148
149
  // Body of one action thread; implemented by subclasses.
  virtual void RunActionThread(int actionIndex) = 0;
150
  // Body of the statistics thread; implemented by subclasses.
  virtual void RunStatsThread() = 0;
151
152
  std::string description_;
153
  const int64_t num_keys_;  // Total number of keys in the table after successful end of this action
154
  const int64_t start_key_;  // First insertion key index of the write action
155
  const int num_action_threads_;
156
  std::string client_id_;
157
158
  std::unique_ptr<ThreadPool> thread_pool_;
159
  yb::CountDownLatch running_threads_latch_;
160
161
  // Points at a flag supplied through the constructor; the pointer itself never changes.
  std::atomic_bool* const stop_requested_;
162
  std::atomic<bool> paused_ { false };
163
164
  const int value_size_;
165
};
166
167
// ------------------------------------------------------------------------------------------------
168
// MultiThreadedWriter
169
// ------------------------------------------------------------------------------------------------
170
171
// Multi-threaded write action. It tracks which key indexes were inserted and which failed, and
// exposes the insertion point and both key sets so that a reader can be wired to them.
class MultiThreadedWriter : public MultiThreadedAction {
172
 public:
173
  void WaitForCompletion() override;
174
175
  // session_factory and stop_flag are received as raw pointers and are managed by the caller;
176
  // their lifetime should be a superset of this object's lifetime.
177
  MultiThreadedWriter(
178
      int64_t num_keys, int64_t start_key, int num_writer_threads, SessionFactory* session_factory,
179
      std::atomic_bool* stop_flag, int value_size, size_t max_num_write_errors);
180
181
  void Start() override;
182
0
  // Pointer to the atomic insertion point, for sharing with a reader.
  std::atomic<int64_t>* InsertionPoint() { return &inserted_up_to_inclusive_; }
183
0
  const KeyIndexSet* InsertedKeys() const { return &inserted_keys_; }
184
0
  const KeyIndexSet* FailedKeys() const { return &failed_keys_; }
185
186
0
  // Number of key indexes handed out so far: next key minus the starting key.
  int64_t num_writes() const { return next_key_.load() - start_key_; }
187
0
  // Number of key indexes currently recorded as failed.
  size_t num_write_errors() const { return failed_keys_.NumElements(); }
188
0
  // gtest assertion that no write errors were recorded. The unsigned literal keeps both sides of
  // the comparison unsigned.
  void AssertSucceeded() { ASSERT_EQ(num_write_errors(), 0U); }
189
190
0
  void set_pause_flag(std::atomic<bool>* pause_flag) { pause_flag_ = pause_flag; }
191
192
 private:
193
  friend class SingleThreadedWriter;
194
  friend class RedisSingleThreadedWriter;
195
  friend class YBSingleThreadedWriter;
196
197
  void RunActionThread(int writerIndex) override;
198
  void RunStatsThread() override;
199
  void RunInsertionTrackerThread();
200
201
  KeyIndexSet inserted_keys_;
202
  KeyIndexSet failed_keys_;
203
  SessionFactory* session_factory_;
204
205
  // This is the current key to be inserted by any thread. Each thread does an atomic get and
206
  // increment operation and inserts the current value.
207
  std::atomic<int64_t> next_key_;
208
  std::atomic<int64_t> inserted_up_to_inclusive_;
209
210
  size_t max_num_write_errors_ = 0;
211
  // Optional external pause flag; null until set_pause_flag() is called.
  std::atomic<bool>* pause_flag_ = nullptr;
212
};
213
214
// Base class for the object run by one writer thread. Run() is the public entry point; the
// private pure-virtual hooks are supplied by the backend-specific subclasses.
class SingleThreadedWriter {
215
 public:
216
  SingleThreadedWriter(MultiThreadedWriter* writer, int writer_index)
217
0
      : multi_threaded_writer_(writer), writer_index_(writer_index) {}
218
0
  virtual ~SingleThreadedWriter() = default;
219
0
  void set_pause_flag(std::atomic<bool>* pause_flag) { pause_flag_ = pause_flag; }
220
  void Run();
221
222
 protected:
223
  // The owning multi-threaded writer, as passed to the constructor.
  MultiThreadedWriter* multi_threaded_writer_;
224
  const int writer_index_;
225
226
 private:
227
  // Writes one key/value pair.
  // NOTE(review): the bool presumably reports success -- confirm against the implementations.
  virtual bool Write(
      int64_t key_index, const std::string& key_str, const std::string& value_str) = 0;
228
  virtual void ConfigureSession() = 0;
229
  virtual void CloseSession() = 0;
230
231
  // Invoked for a key whose insertion failed. Returns nothing: the previous comment here
232
  // described a bool result that this signature does not have.
  virtual void HandleInsertionFailure(int64_t key_index, const std::string& key_str) = 0;
233
234
  // Optional external pause flag; null until set_pause_flag() is called.
  std::atomic<bool>* pause_flag_ = nullptr;
235
};
236
237
// SingleThreadedWriter for the YB client path. It stores the client and table pointers given to
// the constructor and holds a shared_ptr to a YBSession.
// NOTE(review): session_ is presumably created in ConfigureSession() and released in
// CloseSession() -- confirm in the implementation file.
class YBSingleThreadedWriter : public SingleThreadedWriter {
238
 public:
239
  YBSingleThreadedWriter(
240
      MultiThreadedWriter* writer, client::YBClient* client, client::TableHandle* table,
241
      int writer_index)
242
0
      : SingleThreadedWriter(writer, writer_index), client_(client), table_(table) {}
243
244
 protected:
245
  client::YBClient* const client_;
246
  client::TableHandle* table_;
247
  std::shared_ptr<client::YBSession> session_;
248
249
 private:
250
  bool Write(
      int64_t key_index, const std::string& key_str, const std::string& value_str) override;
251
  void ConfigureSession() override;
252
  void CloseSession() override;
253
  void HandleInsertionFailure(int64_t key_index, const std::string& key_str) override;
254
};
255
256
// YBSingleThreadedWriter variant that overrides only Write().
// NOTE(review): by its name the override performs no actual write -- confirm in the definition.
class NoopSingleThreadedWriter : public YBSingleThreadedWriter {
257
 public:
258
  // Forwards every argument to the YBSingleThreadedWriter constructor. The parameters are named
  // for the writer they describe (they were previously called `reader` / `reader_index`).
  NoopSingleThreadedWriter(
259
      MultiThreadedWriter* writer, client::YBClient* client, client::TableHandle* table,
260
0
      int writer_index) : YBSingleThreadedWriter(writer, client, table, writer_index) {}
261
262
 private:
263
  bool Write(
      int64_t key_index, const std::string& key_str, const std::string& value_str) override;
264
};
265
266
// SingleThreadedWriter for the Redis path. It keeps a copy of the server address string and a
// vector of shared RedisClient pointers.
// NOTE(review): clients_ is presumably filled in ConfigureSession() -- confirm in the
// implementation file.
class RedisSingleThreadedWriter : public SingleThreadedWriter {
267
 public:
268
  // redis_server_addrs is taken by value and copied into the const member.
  RedisSingleThreadedWriter(
269
      MultiThreadedWriter* writer, std::string redis_server_addrs, int writer_index)
270
0
      : SingleThreadedWriter(writer, writer_index), redis_server_addresses_(redis_server_addrs) {}
271
272
 protected:
273
  bool Write(
      int64_t key_index, const std::string& key_str, const std::string& value_str) override;
274
  void ConfigureSession() override;
275
  void CloseSession() override;
276
  void HandleInsertionFailure(int64_t key_index, const std::string& key_str) override;
277
278
  std::vector<std::shared_ptr<RedisClient>> clients_;
279
  const std::string redis_server_addresses_;
280
};
281
282
// RedisSingleThreadedWriter variant that overrides only Write().
// NOTE(review): by its name the override performs no actual write -- confirm in the definition.
class RedisNoopSingleThreadedWriter : public RedisSingleThreadedWriter {
283
 public:
284
  // Forwards every argument to the RedisSingleThreadedWriter constructor.
  RedisNoopSingleThreadedWriter(
285
      MultiThreadedWriter* writer, std::string redis_server_addr, int writer_index)
286
0
      : RedisSingleThreadedWriter(writer, redis_server_addr, writer_index) {}
287
288
 protected:
289
  bool Write(
      int64_t key_index, const std::string& key_str, const std::string& value_str) override;
290
};
291
// ------------------------------------------------------------------------------------------------
292
// SingleThreadedScanner
293
// ------------------------------------------------------------------------------------------------
294
295
// TODO: create a multi-threaded version of this.
296
// Single-threaded helper bound to one table handle; its only operation is CountRows().
// Both members are defined outside this header.
class SingleThreadedScanner {
297
 public:
298
  // Stores the table handle pointer.
  // NOTE(review): presumably the handle must outlive this scanner -- confirm with callers.
  explicit SingleThreadedScanner(client::TableHandle* table);
299
300
  // Returns a row count as a signed 64-bit integer.
  // NOTE(review): presumably obtained by scanning the whole table -- confirm in the definition.
  int64_t CountRows();
301
302
 private:
303
  client::TableHandle* table_;
304
};
305
306
// ------------------------------------------------------------------------------------------------
307
// MultiThreadedReader
308
// ------------------------------------------------------------------------------------------------
309
310
// Outcome of a single read, as returned by the PerformRead() hooks and passed to
// MultiThreadedReader::IncrementReadErrorCount().
YB_DEFINE_ENUM(ReadStatus, (kOk)(kNoRows)(kInvalidRead)(kExtraRows)(kOtherError));
311
// Individual reader options; all three are enabled by the default argument of the
// MultiThreadedReader constructor.
// NOTE(review): by their names each option stops the reader on the matching ReadStatus --
// confirm in the reader implementation.
YB_DEFINE_ENUM(MultiThreadedReaderOption, (kStopOnEmptyRead)(kStopOnExtraRead)(kStopOnInvalidRead));
312
// Set of MultiThreadedReaderOption values.
using MultiThreadedReaderOptions = EnumBitSet<MultiThreadedReaderOption>;
313
314
// Multi-threaded read action. It is wired to a writer's state through three pointers (insertion
// point, inserted keys, failed keys) and keeps counters for reads and read errors.
class MultiThreadedReader : public MultiThreadedAction {
315
 public:
316
  // All pointer arguments are stored as raw pointers.
  // NOTE(review): presumably they must outlive this object -- confirm with callers.
  // By default all three stop options are enabled.
  MultiThreadedReader(int64_t num_keys, int num_reader_threads,
317
                      SessionFactory* session_factory,
318
                      std::atomic<int64_t>* insertion_point,
319
                      const KeyIndexSet* inserted_keys,
320
                      const KeyIndexSet* failed_keys,
321
                      std::atomic_bool* stop_flag, int value_size,
322
                      int max_num_read_errors,
323
                      MultiThreadedReaderOptions options = MultiThreadedReaderOptions{
324
                          MultiThreadedReaderOption::kStopOnEmptyRead,
325
                          MultiThreadedReaderOption::kStopOnExtraRead,
326
                          MultiThreadedReaderOption::kStopOnInvalidRead});
327
328
  void IncrementReadErrorCount(ReadStatus read_status);
329
330
0
  // Current value of the read counter.
  int64_t num_reads() const { return num_reads_.load(); }
331
0
  // Current value of the read-error counter.
  int64_t num_read_errors() const { return num_read_errors_.load(); }
332
333
  // Returns read status that caused stop of the reader.
334
0
  // NOTE(review): read_status_stopped_ is a plain (non-atomic) member; whether it can be read
  // while reader threads are still running is not visible here.
  ReadStatus read_status_stopped() const { return read_status_stopped_; }
335
336
0
  // gtest assertion that no read errors were recorded.
  void AssertSucceeded() { ASSERT_EQ(num_read_errors(), 0); }
337
338
 protected:
339
  void RunActionThread(int reader_index) override;
340
  void RunStatsThread() override;
341
342
 private:
343
  friend class SingleThreadedReader;
344
  friend class YBSingleThreadedReader;
345
346
  SessionFactory* session_factory_;
347
  // Read-only view of the writer's insertion point.
  const std::atomic<int64_t>* insertion_point_;
348
  const KeyIndexSet* inserted_keys_;
349
  const KeyIndexSet* failed_keys_;
350
351
  std::atomic<int64_t> num_reads_;
352
  std::atomic<int64_t> num_read_errors_;
353
  const int max_num_read_errors_;
354
  MultiThreadedReaderOptions options_;
355
  ReadStatus read_status_stopped_ = ReadStatus::kOk;
356
};
357
358
// Base class for the object run by one reader thread. Run() is the public entry point; the
// private pure-virtual hooks are supplied by the backend-specific subclasses.
class SingleThreadedReader {
359
 public:
360
  SingleThreadedReader(MultiThreadedReader* reader, int reader_index)
361
0
      : multi_threaded_reader_(reader), reader_index_(reader_index) {}
362
0
  virtual ~SingleThreadedReader() = default;
363
364
  void Run();
365
366
 protected:
367
  // The owning multi-threaded reader, as passed to the constructor.
  MultiThreadedReader* multi_threaded_reader_;
368
  const int reader_index_;
369
370
 private:
371
  // Reads one key and reports the outcome as a ReadStatus.
  // NOTE(review): presumably the value read is checked against expected_value -- confirm
  // against the implementations.
  virtual ReadStatus PerformRead(
372
      int64_t key_index, const std::string& key_str, const std::string& expected_value) = 0;
373
  virtual void ConfigureSession() = 0;
374
  virtual void CloseSession() = 0;
375
376
  // Chooses the next key index to read using the caller-supplied random engine.
  int64_t NextKeyIndexToRead(std::mt19937_64* random_number_generator) const;
377
};
378
379
// SingleThreadedReader for the YB client path. It stores the client and table pointers given to
// the constructor and holds a shared_ptr to a YBSession.
// NOTE(review): session_ is presumably created in ConfigureSession() and released in
// CloseSession() -- confirm in the implementation file.
class YBSingleThreadedReader : public SingleThreadedReader {
380
 public:
381
  YBSingleThreadedReader(
382
      MultiThreadedReader* reader, client::YBClient* client, client::TableHandle* table,
383
      int reader_index)
384
0
      : SingleThreadedReader(reader, reader_index), client_(client), table_(table) {}
385
386
 protected:
387
  client::YBClient* const client_;
388
  client::TableHandle* table_;
389
  std::shared_ptr<client::YBSession> session_;
390
391
 private:
392
  ReadStatus PerformRead(
393
      int64_t key_index, const std::string& key_str, const std::string& expected_value) override;
394
  void ConfigureSession() override;
395
  void CloseSession() override;
396
397
};
398
399
// SingleThreadedReader for the Redis path. It keeps a copy of the server address string and a
// vector of shared RedisClient pointers.
// NOTE(review): clients_ is presumably filled in ConfigureSession() -- confirm in the
// implementation file.
class RedisSingleThreadedReader : public SingleThreadedReader {
400
 public:
401
  // redis_server_addr is taken by value and copied into the const member.
  RedisSingleThreadedReader(
      MultiThreadedReader* reader, std::string redis_server_addr, int reader_index)
402
0
      : SingleThreadedReader(reader, reader_index), redis_server_addresses_(redis_server_addr) {}
403
404
 private:
405
  ReadStatus PerformRead(
406
      int64_t key_index, const std::string& key_str, const std::string& expected_value) override;
407
  void ConfigureSession() override;
408
  void CloseSession() override;
409
410
  std::vector<std::shared_ptr<RedisClient>> clients_;
411
  const std::string redis_server_addresses_;
412
};
413
414
}  // namespace load_generator
415
}  // namespace yb
416
417
#endif  // YB_INTEGRATION_TESTS_LOAD_GENERATOR_H_