YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/tools/db_stress.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
//
24
// The test uses an array to compare against values written to the database.
25
// Keys written to the array are in 1:1 correspondence to the actual values in
26
// the database according to the formula in the function GenerateValue.
27
28
// Space is reserved in the array from 0 to FLAGS_max_key and values are
29
// randomly written/deleted/read from those positions. During verification we
30
// compare all the positions in the array. To shorten/elongate the running
31
// time, you could change the settings: FLAGS_max_key, FLAGS_ops_per_thread,
32
// (sometimes also FLAGS_threads).
33
//
34
// NOTE that if FLAGS_test_batches_snapshots is set, the test will have
35
// different behavior. See comment of the flag for details.
36
37
#ifndef GFLAGS
38
#include <cstdio>
39
// Entry point compiled only when GFLAGS is not defined. Without gflags the
// tool cannot parse its options, so it just reports that on stderr and exits
// with a failure status.
int main() {
  static const char kMissingGflagsMessage[] =
      "Please install gflags to run rocksdb tools\n";
  fputs(kMissingGflagsMessage, stderr);
  return 1;
}
43
#else
44
45
#ifndef __STDC_FORMAT_MACROS
46
#define __STDC_FORMAT_MACROS
47
#endif
48
49
#include <inttypes.h>
50
#include <stdio.h>
51
#include <stdlib.h>
52
#include <sys/types.h>
53
#include <chrono>
54
#include <exception>
55
#include <thread>
56
57
#include <gflags/gflags.h>
58
59
#include "yb/rocksdb/db/db_impl.h"
60
#include "yb/rocksdb/db/filename.h"
61
#include "yb/rocksdb/db/version_set.h"
62
#include "yb/rocksdb/hdfs/env_hdfs.h"
63
#include "yb/rocksdb/port/port.h"
64
#include "yb/rocksdb/cache.h"
65
#include "yb/rocksdb/env.h"
66
#include "yb/rocksdb/filter_policy.h"
67
#include "yb/rocksdb/slice_transform.h"
68
#include "yb/rocksdb/statistics.h"
69
#include "yb/rocksdb/utilities/db_ttl.h"
70
#include "yb/rocksdb/write_batch.h"
71
#include "yb/rocksdb/util/coding.h"
72
#include "yb/rocksdb/util/compression.h"
73
#include "yb/rocksdb/util/crc32c.h"
74
#include "yb/rocksdb/util/histogram.h"
75
#include "yb/rocksdb/util/logging.h"
76
#include "yb/rocksdb/util/mutexlock.h"
77
#include "yb/rocksdb/util/random.h"
78
#include "yb/rocksdb/util/testutil.h"
79
#include "yb/rocksdb/utilities/merge_operators.h"
80
81
#include "yb/util/slice.h"
82
#include "yb/util/string_util.h"
83
84
using GFLAGS::ParseCommandLineFlags;
85
using GFLAGS::RegisterFlagValidator;
86
using GFLAGS::SetUsageMessage;
87
88
static const int64_t KB = 1024;
89
90
0
// Flag validator for 64-bit flags whose value is later narrowed to 32 bits.
//
// flagname: name of the flag being checked (used only in the diagnostic).
// value:    value supplied for the flag.
// Returns true when `value` fits in a uint32_t; otherwise prints a message to
// stderr and returns false.
static bool ValidateUint32Range(const char* flagname, uint64_t value) {
  const uint64_t largest_allowed = std::numeric_limits<uint32_t>::max();
  if (value <= largest_allowed) {
    return true;
  }
  fprintf(stderr, "Invalid value for --%s: %" PRIu64 ", overflow\n", flagname, value);
  return false;
}
100
101
DEFINE_uint64(seed, 2341234, "Seed for PRNG");
102
static const bool FLAGS_seed_dummy __attribute__((unused)) =
103
    RegisterFlagValidator(&FLAGS_seed, &ValidateUint32Range);
104
105
DEFINE_int64(max_key, 1 * KB* KB,
106
             "Max number of key/values to place in database");
107
108
DEFINE_int32(column_families, 10, "Number of column families");
109
110
// TODO(noetzli) Add support for single deletes
111
DEFINE_bool(test_batches_snapshots, false,
112
            "If set, the test uses MultiGet(), MultiPut() and MultiDelete()"
113
            " which read/write/delete multiple keys in a batch. In this mode,"
114
            " we do not verify db content by comparing the content with the "
115
            "pre-allocated array. Instead, we do partial verification inside"
116
            " MultiGet() by checking various values in a batch. Benefit of"
117
            " this mode:\n"
118
            "\t(a) No need to acquire mutexes during writes (less cache "
119
            "flushes in multi-core leading to speed up)\n"
120
            "\t(b) No long validation at the end (more speed up)\n"
121
            "\t(c) Test snapshot and atomicity of batch writes");
122
123
DEFINE_int32(threads, 32, "Number of concurrent threads to run.");
124
125
DEFINE_int32(ttl, -1,
126
             "Opens the db with this ttl value if this is not -1. "
127
             "Carefully specify a large value such that verifications on "
128
             "deleted values don't fail");
129
130
DEFINE_int32(value_size_mult, 8,
131
             "Size of value will be this number times rand_int(1,3) bytes");
132
133
DEFINE_bool(verify_before_write, false, "Verify before write");
134
135
DEFINE_bool(histogram, false, "Print histogram of operation timings");
136
137
DEFINE_bool(destroy_db_initially, true,
138
            "Destroys the database dir before start if this is true");
139
140
DEFINE_bool(verbose, false, "Verbose");
141
142
DEFINE_bool(progress_reports, true,
143
            "If true, db_stress will report number of finished operations");
144
145
DEFINE_uint64(db_write_buffer_size, rocksdb::Options().db_write_buffer_size,
146
              "Number of bytes to buffer in all memtables before compacting");
147
148
DEFINE_int32(write_buffer_size,
149
             static_cast<int32_t>(rocksdb::Options().write_buffer_size),
150
             "Number of bytes to buffer in memtable before compacting");
151
152
DEFINE_int32(max_write_buffer_number,
153
             rocksdb::Options().max_write_buffer_number,
154
             "The number of in-memory memtables. "
155
             "Each memtable is of size FLAGS_write_buffer_size.");
156
157
DEFINE_int32(min_write_buffer_number_to_merge,
158
             rocksdb::Options().min_write_buffer_number_to_merge,
159
             "The minimum number of write buffers that will be merged together "
160
             "before writing to storage. This is cheap because it is an "
161
             "in-memory merge. If this feature is not enabled, then all these "
162
             "write buffers are flushed to L0 as separate files and this "
163
             "increases read amplification because a get request has to check "
164
             "in all of these files. Also, an in-memory merge may result in "
165
             "writing less data to storage if there are duplicate records in"
166
             " each of these individual write buffers.");
167
168
DEFINE_int32(max_write_buffer_number_to_maintain,
169
             rocksdb::Options().max_write_buffer_number_to_maintain,
170
             "The total maximum number of write buffers to maintain in memory "
171
             "including copies of buffers that have already been flushed. "
172
             "Unlike max_write_buffer_number, this parameter does not affect "
173
             "flushing. This controls the minimum amount of write history "
174
             "that will be available in memory for conflict checking when "
175
             "Transactions are used. If this value is too low, some "
176
             "transactions may fail at commit time due to not being able to "
177
             "determine whether there were any write conflicts. Setting this "
178
             "value to 0 will cause write buffers to be freed immediately "
179
             "after they are flushed.  If this value is set to -1, "
180
             "'max_write_buffer_number' will be used.");
181
182
DEFINE_int32(open_files, rocksdb::Options().max_open_files,
183
             "Maximum number of files to keep open at the same time "
184
             "(use default if == 0)");
185
186
DEFINE_int64(compressed_cache_size, -1,
187
             "Number of bytes to use as a cache of compressed data."
188
             " Negative means use default settings.");
189
190
DEFINE_int32(compaction_style, rocksdb::Options().compaction_style, "");
191
192
DEFINE_int32(level0_file_num_compaction_trigger,
193
             rocksdb::Options().level0_file_num_compaction_trigger,
194
             "Level0 compaction start trigger");
195
196
DEFINE_int32(level0_slowdown_writes_trigger,
197
             rocksdb::Options().level0_slowdown_writes_trigger,
198
             "Number of files in level-0 that will slow down writes");
199
200
DEFINE_int32(level0_stop_writes_trigger,
201
             rocksdb::Options().level0_stop_writes_trigger,
202
             "Number of files in level-0 that will trigger put stop.");
203
204
DEFINE_int32(block_size,
205
             static_cast<int32_t>(rocksdb::BlockBasedTableOptions().block_size),
206
             "Number of bytes in a block.");
207
208
DEFINE_int32(max_background_compactions,
209
             rocksdb::Options().max_background_compactions,
210
             "The maximum number of concurrent background compactions "
211
             "that can occur in parallel.");
212
213
DEFINE_int32(compaction_thread_pool_adjust_interval, 0,
214
             "The interval (in milliseconds) to adjust compaction thread pool "
215
             "size. Don't change it periodically if the value is 0.");
216
217
DEFINE_int32(compaction_thread_pool_variations, 2,
218
             "Range of background thread pool size variations when adjusted "
219
             "periodically.");
220
221
DEFINE_int32(max_background_flushes, rocksdb::Options().max_background_flushes,
222
             "The maximum number of concurrent background flushes "
223
             "that can occur in parallel.");
224
225
DEFINE_int32(universal_size_ratio, 0, "The ratio of file sizes that trigger"
226
             " compaction in universal style");
227
228
DEFINE_int32(universal_min_merge_width, 0, "The minimum number of files to "
229
             "compact in universal style compaction");
230
231
DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact"
232
             " in universal style compaction");
233
234
DEFINE_int32(universal_max_size_amplification_percent, 0,
235
             "The max size amplification for universal style compaction");
236
237
DEFINE_int32(clear_column_family_one_in, 1000000,
238
             "With a chance of 1/N, delete a column family and then recreate "
239
             "it again. If N == 0, never drop/create column families. "
240
             "When test_batches_snapshots is true, this flag has no effect");
241
242
DEFINE_int32(set_options_one_in, 0,
243
             "With a chance of 1/N, change some random options");
244
245
DEFINE_int32(set_in_place_one_in, 0,
246
             "With a chance of 1/N, toggle in place support option");
247
248
DEFINE_int64(cache_size, 2LL * KB * KB * KB,
249
             "Number of bytes to use as a cache of uncompressed data.");
250
251
DEFINE_uint64(subcompactions, 1,
252
             "Maximum number of subcompactions to divide L0-L1 compactions "
253
             "into.");
254
255
DEFINE_bool(allow_concurrent_memtable_write, true,
256
            "Allow multi-writers to update mem tables in parallel.");
257
258
DEFINE_bool(enable_write_thread_adaptive_yield, true,
259
            "Use a yielding spin loop for brief writer thread waits.");
260
261
static const bool FLAGS_subcompactions_dummy __attribute__((unused)) =
262
    RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range);
263
264
0
// Flag validator that rejects negative values.
//
// Despite the name, zero is accepted: the check (and the diagnostic) is
// "must be >=0".
//
// flagname: name of the flag being checked (used only in the diagnostic).
// value:    value supplied for the flag.
// Returns true for value >= 0; otherwise prints a message to stderr and
// returns false.
static bool ValidateInt32Positive(const char* flagname, int32_t value) {
  const bool non_negative = (value >= 0);
  if (!non_negative) {
    fprintf(stderr, "Invalid value for --%s: %d, must be >=0\n", flagname, value);
  }
  return non_negative;
}
272
DEFINE_int32(reopen, 10, "Number of times database reopens");
273
static const bool FLAGS_reopen_dummy __attribute__((unused)) =
274
    RegisterFlagValidator(&FLAGS_reopen, &ValidateInt32Positive);
275
276
DEFINE_int32(bloom_bits, 10, "Bloom filter bits per key. "
277
             "Negative means use default settings.");
278
279
// Chooses between the block based filter and the full filter for block based
// tables. The help text is built from adjacent string literals, so the
// separating space has to be part of one of them.
DEFINE_bool(use_block_based_filter, false, "use block based filter "
              "instead of full filter for block based table");
281
282
DEFINE_string(db, "", "Use the db with the following name.");
283
284
DEFINE_bool(verify_checksum, false,
285
            "Verify checksum for every block read from storage");
286
287
DEFINE_bool(mmap_read, rocksdb::EnvOptions().use_mmap_reads,
288
            "Allow reads to occur via mmap-ing files");
289
290
// Database statistics
291
static std::shared_ptr<rocksdb::Statistics> dbstats;
292
DEFINE_bool(statistics, false, "Create database statistics");
293
294
DEFINE_bool(sync, false, "Sync all writes to disk");
295
296
DEFINE_bool(disable_data_sync, false,
297
            "If true, do not wait until data is synced to disk.");
298
299
DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync");
300
301
DEFINE_int32(kill_random_test, 0,
302
             "If non-zero, kill at various points in source code with "
303
             "probability 1/this");
304
static const bool FLAGS_kill_random_test_dummy __attribute__((unused)) =
305
    RegisterFlagValidator(&FLAGS_kill_random_test, &ValidateInt32Positive);
306
extern int rocksdb_kill_odds;
307
308
DEFINE_string(kill_prefix_blacklist, "",
309
              "If non-empty, kill points with prefix in the list given will be"
310
              " skipped. Items are comma-separated.");
311
extern std::vector<std::string> rocksdb_kill_prefix_blacklist;
312
313
DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");
314
315
DEFINE_int32(target_file_size_base, 64 * KB,
316
             "Target level-1 file size for compaction");
317
318
DEFINE_int32(target_file_size_multiplier, 1,
319
             "A multiplier to compute target level-N file size (N >= 2)");
320
321
DEFINE_uint64(max_bytes_for_level_base, 256 * KB, "Max bytes for level-1");
322
323
DEFINE_int32(max_bytes_for_level_multiplier, 2,
324
             "A multiplier to compute max bytes for level-N (N >= 2)");
325
326
0
// Flag validator for percentage-valued flags.
//
// flagname: name of the flag being checked (used only in the diagnostic).
// value:    value supplied for the flag.
// Returns true when 0 <= value <= 100; otherwise prints a message to stderr
// and returns false.
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
  const bool in_range = (value >= 0) && (value <= 100);
  if (in_range) {
    return true;
  }
  fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n", flagname, value);
  return false;
}
334
DEFINE_int32(readpercent, 10,
335
             "Ratio of reads to total workload (expressed as a percentage)");
336
static const bool FLAGS_readpercent_dummy __attribute__((unused)) =
337
    RegisterFlagValidator(&FLAGS_readpercent, &ValidateInt32Percent);
338
339
DEFINE_int32(prefixpercent, 20,
340
             "Ratio of prefix iterators to total workload (expressed as a"
341
             " percentage)");
342
static const bool FLAGS_prefixpercent_dummy __attribute__((unused)) =
343
    RegisterFlagValidator(&FLAGS_prefixpercent, &ValidateInt32Percent);
344
345
DEFINE_int32(writepercent, 45,
346
             "Ratio of writes to total workload (expressed as a percentage)");
347
static const bool FLAGS_writepercent_dummy __attribute__((unused)) =
348
    RegisterFlagValidator(&FLAGS_writepercent, &ValidateInt32Percent);
349
350
DEFINE_int32(delpercent, 15,
351
             "Ratio of deletes to total workload (expressed as a percentage)");
352
static const bool FLAGS_delpercent_dummy __attribute__((unused)) =
353
    RegisterFlagValidator(&FLAGS_delpercent, &ValidateInt32Percent);
354
355
DEFINE_int32(nooverwritepercent, 60,
356
             "Ratio of keys without overwrite to total workload (expressed as "
357
             " a percentage)");
358
static const bool FLAGS_nooverwritepercent_dummy __attribute__((__unused__)) =
359
    RegisterFlagValidator(&FLAGS_nooverwritepercent, &ValidateInt32Percent);
360
361
DEFINE_int32(iterpercent, 10, "Ratio of iterations to total workload"
362
             " (expressed as a percentage)");
363
static const bool FLAGS_iterpercent_dummy __attribute__((unused)) =
364
    RegisterFlagValidator(&FLAGS_iterpercent, &ValidateInt32Percent);
365
366
DEFINE_uint64(num_iterations, 10, "Number of iterations per MultiIterate run");
367
static const bool FLAGS_num_iterations_dummy __attribute__((unused)) =
368
    RegisterFlagValidator(&FLAGS_num_iterations, &ValidateUint32Range);
369
370
namespace {
371
0
enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
372
0
  assert(ctype);
373
374
0
  if (!strcasecmp(ctype, "none"))
375
0
    return rocksdb::kNoCompression;
376
0
  else if (!strcasecmp(ctype, "snappy"))
377
0
    return rocksdb::kSnappyCompression;
378
0
  else if (!strcasecmp(ctype, "zlib"))
379
0
    return rocksdb::kZlibCompression;
380
0
  else if (!strcasecmp(ctype, "bzip2"))
381
0
    return rocksdb::kBZip2Compression;
382
0
  else if (!strcasecmp(ctype, "lz4"))
383
0
    return rocksdb::kLZ4Compression;
384
0
  else if (!strcasecmp(ctype, "lz4hc"))
385
0
    return rocksdb::kLZ4HCCompression;
386
0
  else if (!strcasecmp(ctype, "zstd"))
387
0
    return rocksdb::kZSTDNotFinalCompression;
388
389
0
  fprintf(stdout, "Cannot parse compression type '%s'\n", ctype);
390
0
  return rocksdb::kSnappyCompression; // default value
391
0
}
392
393
0
// Splits a comma-separated list into its items.
//
// src: the list to split. Taken by const reference so that calling the helper
//      does not copy the whole input string.
// Returns the items in order of appearance. An empty input yields an empty
// vector; otherwise empty items (e.g. from "a,,b" or a trailing comma) are
// kept as empty strings.
std::vector<std::string> SplitString(const std::string& src) {
  std::vector<std::string> items;
  if (src.empty()) {
    return items;
  }

  size_t item_begin = 0;
  size_t comma_pos;
  while ((comma_pos = src.find(',', item_begin)) != std::string::npos) {
    items.push_back(src.substr(item_begin, comma_pos - item_begin));
    item_begin = comma_pos + 1;
  }
  // Whatever follows the last comma (possibly nothing) is the final item.
  items.push_back(src.substr(item_begin));
  return items;
}
407
}  // namespace
408
409
DEFINE_string(compression_type, "snappy",
410
              "Algorithm to use to compress the database");
411
static enum rocksdb::CompressionType FLAGS_compression_type_e =
412
    rocksdb::kSnappyCompression;
413
414
DEFINE_string(hdfs, "", "Name of hdfs environment");
415
// posix or hdfs environment
416
static rocksdb::Env* FLAGS_env = rocksdb::Env::Default();
417
418
DEFINE_uint64(ops_per_thread, 1200000, "Number of operations per thread.");
419
static const bool FLAGS_ops_per_thread_dummy __attribute__((unused)) =
420
    RegisterFlagValidator(&FLAGS_ops_per_thread, &ValidateUint32Range);
421
422
DEFINE_uint64(log2_keys_per_lock, 2, "Log2 of number of keys per lock");
423
static const bool FLAGS_log2_keys_per_lock_dummy __attribute__((unused)) =
424
    RegisterFlagValidator(&FLAGS_log2_keys_per_lock, &ValidateUint32Range);
425
426
DEFINE_bool(filter_deletes, false, "On true, deletes use KeyMayExist to drop"
427
            " the delete if key not present");
428
429
DEFINE_bool(in_place_update, false, "On true, does inplace update in memtable");
430
431
// Memtable representation kinds that StringToRepFactory can produce.
enum RepFactory {
  kSkipList,      // spelled "skip_list"; also the fallback for unknown names
  kHashSkipList,  // spelled "prefix_hash"
  kVectorRep      // spelled "vector"
};

namespace {
// Maps a memtable representation name to its RepFactory value. The comparison
// ignores case.
//
// ctype: NUL-terminated name; must not be null.
// Returns the matching value. For an unrecognised name a message is printed to
// stdout and kSkipList is returned.
enum RepFactory StringToRepFactory(const char* ctype) {
  assert(ctype);

  struct NamedFactory {
    const char* name;
    RepFactory factory;
  };
  static const NamedFactory kKnownFactories[] = {
      {"skip_list", kSkipList},
      {"prefix_hash", kHashSkipList},
      {"vector", kVectorRep},
  };

  for (const NamedFactory& candidate : kKnownFactories) {
    if (strcasecmp(ctype, candidate.name) == 0) {
      return candidate.factory;
    }
  }

  fprintf(stdout, "Cannot parse memreptable %s\n", ctype);
  return kSkipList;
}
}  // namespace
452
453
static enum RepFactory FLAGS_rep_factory;
454
DEFINE_string(memtablerep, "prefix_hash", "");
455
456
0
// Flag validator for --prefix_size.
//
// flagname: name of the flag being checked (used only in the diagnostic).
// value:    value supplied for the flag.
// Returns true when 0 <= value <= 8; otherwise prints a message to stderr and
// returns false.
static bool ValidatePrefixSize(const char* flagname, int32_t value) {
  const bool in_range = (value >= 0) && (value <= 8);
  if (in_range) {
    return true;
  }
  fprintf(stderr, "Invalid value for --%s: %d. 0 <= PrefixSize <= 8\n", flagname, value);
  return false;
}
464
DEFINE_int32(prefix_size, 7, "Control the prefix size for HashSkipListRep");
465
static const bool FLAGS_prefix_size_dummy __attribute__((unused)) =
466
    RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize);
467
468
DEFINE_bool(use_merge, false, "On true, replaces all writes with a Merge "
469
            "that behaves like a Put");
470
471
472
namespace rocksdb {
473
474
// convert long to a big-endian slice key
475
0
// Encodes `val` as an 8-byte key with the most significant byte first, so
// that byte-wise ordering of the keys follows the numeric ordering of the
// (unsigned) values.
//
// val: the key index to encode.
// Returns a string of exactly sizeof(val) bytes.
static std::string Key(int64_t val) {
  const uint64_t bits = static_cast<uint64_t>(val);
  std::string big_endian_key(sizeof(val), '\0');
  for (size_t i = 0; i < sizeof(val); ++i) {
    // Byte 0 receives the top 8 bits, the last byte the bottom 8 bits.
    const size_t shift = 8 * (sizeof(val) - 1 - i);
    big_endian_key[i] = static_cast<char>((bits >> shift) & 0xff);
  }
  return big_endian_key;
}
486
487
0
// Renders the bytes of `str` as upper-case hexadecimal, two digits per byte,
// prefixed with "0x".
//
// str: arbitrary bytes (embedded NULs are handled like any other byte).
// Returns "0x" followed by 2 * str.length() hex digits.
static std::string StringToHex(const std::string& str) {
  static const char kHexDigits[] = "0123456789ABCDEF";

  std::string result = "0x";
  result.reserve(result.size() + 2 * str.length());
  for (const char ch : str) {
    // Go through unsigned char so bytes >= 0x80 index the table correctly.
    const unsigned char byte = static_cast<unsigned char>(ch);
    result += kHexDigits[byte >> 4];
    result += kHexDigits[byte & 0x0f];
  }
  return result;
}
496
497
498
class StressTest;
499
namespace {
500
501
class Stats {
502
 private:
503
  uint64_t start_;
504
  uint64_t finish_;
505
  double  seconds_;
506
  int64_t done_;
507
  int64_t gets_;
508
  int64_t prefixes_;
509
  int64_t writes_;
510
  int64_t deletes_;
511
  size_t single_deletes_;
512
  int64_t iterator_size_sums_;
513
  int64_t founds_;
514
  int64_t iterations_;
515
  int64_t errors_;
516
  int next_report_;
517
  size_t bytes_;
518
  uint64_t last_op_finish_;
519
  HistogramImpl hist_;
520
521
 public:
522
0
  Stats() { }
523
524
0
  void Start() {
525
0
    next_report_ = 100;
526
0
    hist_.Clear();
527
0
    done_ = 0;
528
0
    gets_ = 0;
529
0
    prefixes_ = 0;
530
0
    writes_ = 0;
531
0
    deletes_ = 0;
532
0
    single_deletes_ = 0;
533
0
    iterator_size_sums_ = 0;
534
0
    founds_ = 0;
535
0
    iterations_ = 0;
536
0
    errors_ = 0;
537
0
    bytes_ = 0;
538
0
    seconds_ = 0;
539
0
    start_ = FLAGS_env->NowMicros();
540
0
    last_op_finish_ = start_;
541
0
    finish_ = start_;
542
0
  }
543
544
0
  void Merge(const Stats& other) {
545
0
    hist_.Merge(other.hist_);
546
0
    done_ += other.done_;
547
0
    gets_ += other.gets_;
548
0
    prefixes_ += other.prefixes_;
549
0
    writes_ += other.writes_;
550
0
    deletes_ += other.deletes_;
551
0
    single_deletes_ += other.single_deletes_;
552
0
    iterator_size_sums_ += other.iterator_size_sums_;
553
0
    founds_ += other.founds_;
554
0
    iterations_ += other.iterations_;
555
0
    errors_ += other.errors_;
556
0
    bytes_ += other.bytes_;
557
0
    seconds_ += other.seconds_;
558
0
    if (other.start_ < start_) start_ = other.start_;
559
0
    if (other.finish_ > finish_) finish_ = other.finish_;
560
0
  }
561
562
0
  void Stop() {
563
0
    finish_ = FLAGS_env->NowMicros();
564
0
    seconds_ = (finish_ - start_) * 1e-6;
565
0
  }
566
567
0
  void FinishedSingleOp() {
568
0
    if (FLAGS_histogram) {
569
0
      auto now = FLAGS_env->NowMicros();
570
0
      auto micros = now - last_op_finish_;
571
0
      hist_.Add(micros);
572
0
      if (micros > 20000) {
573
0
        fprintf(stdout, "long op: %" PRIu64 " micros%30s\r", micros, "");
574
0
      }
575
0
      last_op_finish_ = now;
576
0
    }
577
578
0
      done_++;
579
0
    if (FLAGS_progress_reports) {
580
0
      if (done_ >= next_report_) {
581
0
        if      (next_report_ < 1000)   next_report_ += 100;
582
0
        else if (next_report_ < 5000)   next_report_ += 500;
583
0
        else if (next_report_ < 10000)  next_report_ += 1000;
584
0
        else if (next_report_ < 50000)  next_report_ += 5000;
585
0
        else if (next_report_ < 100000) next_report_ += 10000;
586
0
        else if (next_report_ < 500000) next_report_ += 50000;
587
0
        else                            next_report_ += 100000;
588
0
        fprintf(stdout, "... finished %" PRId64 " ops%30s\r", done_, "");
589
0
      }
590
0
    }
591
0
  }
592
593
0
  void AddBytesForWrites(int nwrites, size_t nbytes) {
594
0
    writes_ += nwrites;
595
0
    bytes_ += nbytes;
596
0
  }
597
598
0
  void AddGets(int ngets, int nfounds) {
599
0
    founds_ += nfounds;
600
0
    gets_ += ngets;
601
0
  }
602
603
0
  void AddPrefixes(int nprefixes, int count) {
604
0
    prefixes_ += nprefixes;
605
0
    iterator_size_sums_ += count;
606
0
  }
607
608
0
  void AddIterations(int n) {
609
0
    iterations_ += n;
610
0
  }
611
612
0
  void AddDeletes(int n) {
613
0
    deletes_ += n;
614
0
  }
615
616
0
  void AddSingleDeletes(size_t n) { single_deletes_ += n; }
617
618
0
  void AddErrors(int n) {
619
0
    errors_ += n;
620
0
  }
621
622
0
  void Report(const char* name) {
623
0
    std::string extra;
624
0
    if (bytes_ < 1 || done_ < 1) {
625
0
      fprintf(stderr, "No writes or ops?\n");
626
0
      return;
627
0
    }
628
629
0
    double elapsed = (finish_ - start_) * 1e-6;
630
0
    double bytes_mb = bytes_ / 1048576.0;
631
0
    double rate = bytes_mb / elapsed;
632
0
    int64_t throughput = static_cast<int64_t>(done_ / elapsed);
633
634
0
    fprintf(stdout, "%-12s: ", name);
635
0
    fprintf(stdout, "%.3f micros/op %" PRId64 " ops/sec\n",
636
0
            seconds_ * 1e6 / done_, throughput);
637
0
    fprintf(stdout, "%-12s: Wrote %.2f MB (%.2f MB/sec) (%" PRId64 "%% of %" PRId64 " ops)\n",
638
0
            "", bytes_mb, rate, (100 * writes_) / done_, done_);
639
0
    fprintf(stdout, "%-12s: Wrote %" PRId64 " times\n", "", writes_);
640
0
    fprintf(stdout, "%-12s: Deleted %" PRId64 " times\n", "", deletes_);
641
0
    fprintf(stdout, "%-12s: Single deleted %" ROCKSDB_PRIszt " times\n", "",
642
0
           single_deletes_);
643
0
    fprintf(stdout, "%-12s: %" PRId64 " read and %" PRId64 " found the key\n", "",
644
0
            gets_, founds_);
645
0
    fprintf(stdout, "%-12s: Prefix scanned %" PRId64 " times\n", "", prefixes_);
646
0
    fprintf(stdout, "%-12s: Iterator size sum is %" PRId64 "\n", "", iterator_size_sums_);
647
0
    fprintf(stdout, "%-12s: Iterated %" PRId64 " times\n", "", iterations_);
648
0
    fprintf(stdout, "%-12s: Got errors %" PRId64 " times\n", "", errors_);
649
650
0
    if (FLAGS_histogram) {
651
0
      fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
652
0
    }
653
0
    fflush(stdout);
654
0
  }
655
};
656
657
// State shared by all concurrent executions of the same benchmark.
658
class SharedState {
659
 public:
660
  static const uint32_t SENTINEL;
661
662
  explicit SharedState(StressTest* stress_test)
663
      : cv_(&mu_),
664
        seed_(static_cast<uint32_t>(FLAGS_seed)),
665
        max_key_(FLAGS_max_key),
666
        log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock)),
667
        num_threads_(FLAGS_threads),
668
        num_initialized_(0),
669
        num_populated_(0),
670
        vote_reopen_(0),
671
        num_done_(0),
672
        start_(false),
673
        start_verify_(false),
674
        should_stop_bg_thread_(false),
675
        bg_thread_finished_(false),
676
        stress_test_(stress_test),
677
        verification_failure_(false),
678
0
        no_overwrite_ids_(FLAGS_column_families) {
679
    // Pick random keys in each column family that will not experience
680
    // overwrite
681
682
0
    printf("Choosing random keys with no overwrite\n");
683
0
    Random rnd(seed_);
684
0
    size_t num_no_overwrite_keys = (max_key_ * FLAGS_nooverwritepercent) / 100;
685
0
    for (auto& cf_ids : no_overwrite_ids_) {
686
0
      for (size_t i = 0; i < num_no_overwrite_keys; i++) {
687
0
        size_t rand_key;
688
0
        do {
689
0
          rand_key = rnd.Next() % max_key_;
690
0
        } while (cf_ids.find(rand_key) != cf_ids.end());
691
0
        cf_ids.insert(rand_key);
692
0
      }
693
0
      assert(cf_ids.size() == num_no_overwrite_keys);
694
0
    }
695
696
0
    if (FLAGS_test_batches_snapshots) {
697
0
      fprintf(stdout, "No lock creation because test_batches_snapshots set\n");
698
0
      return;
699
0
    }
700
0
    values_.resize(FLAGS_column_families);
701
702
0
    for (int i = 0; i < FLAGS_column_families; ++i) {
703
0
      values_[i] = std::vector<uint32_t>(max_key_, SENTINEL);
704
0
    }
705
706
0
    auto num_locks = max_key_ >> log2_keys_per_lock_;
707
0
    if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) {
708
0
      num_locks++;
709
0
    }
710
0
    fprintf(stdout, "Creating %" PRId64 " locks\n", num_locks * FLAGS_column_families);
711
0
    key_locks_.resize(FLAGS_column_families);
712
713
0
    for (int i = 0; i < FLAGS_column_families; ++i) {
714
0
      key_locks_[i].resize(num_locks);
715
0
      for (auto& ptr : key_locks_[i]) {
716
0
        ptr.reset(new port::Mutex);
717
0
      }
718
0
    }
719
0
  }
720
721
0
  ~SharedState() {}
722
723
0
  port::Mutex* GetMutex() {
724
0
    return &mu_;
725
0
  }
726
727
0
  port::CondVar* GetCondVar() {
728
0
    return &cv_;
729
0
  }
730
731
0
  StressTest* GetStressTest() const {
732
0
    return stress_test_;
733
0
  }
734
735
0
  int64_t GetMaxKey() const {
736
0
    return max_key_;
737
0
  }
738
739
0
  uint32_t GetNumThreads() const {
740
0
    return num_threads_;
741
0
  }
742
743
0
  void IncInitialized() {
744
0
    num_initialized_++;
745
0
  }
746
747
0
  void IncOperated() {
748
0
    num_populated_++;
749
0
  }
750
751
0
  void IncDone() {
752
0
    num_done_++;
753
0
  }
754
755
0
  void IncVotedReopen() {
756
0
    vote_reopen_ = (vote_reopen_ + 1) % num_threads_;
757
0
  }
758
759
0
  bool AllInitialized() const {
760
0
    return num_initialized_ >= num_threads_;
761
0
  }
762
763
0
  bool AllOperated() const {
764
0
    return num_populated_ >= num_threads_;
765
0
  }
766
767
0
  bool AllDone() const {
768
0
    return num_done_ >= num_threads_;
769
0
  }
770
771
0
  bool AllVotedReopen() {
772
0
    return (vote_reopen_ == 0);
773
0
  }
774
775
0
  void SetStart() {
776
0
    start_ = true;
777
0
  }
778
779
0
  void SetStartVerify() {
780
0
    start_verify_ = true;
781
0
  }
782
783
0
  bool Started() const {
784
0
    return start_;
785
0
  }
786
787
0
  bool VerifyStarted() const {
788
0
    return start_verify_;
789
0
  }
790
791
0
  void SetVerificationFailure() { verification_failure_.store(true); }
792
793
0
  bool HasVerificationFailedYet() { return verification_failure_.load(); }
794
795
0
  port::Mutex* GetMutexForKey(int cf, size_t key) {
796
0
    return key_locks_[cf][key >> log2_keys_per_lock_].get();
797
0
  }
798
799
0
  void LockColumnFamily(int cf) {
800
0
    for (auto& mutex : key_locks_[cf]) {
801
0
      mutex->Lock();
802
0
    }
803
0
  }
804
805
0
  void UnlockColumnFamily(int cf) {
806
0
    for (auto& mutex : key_locks_[cf]) {
807
0
      mutex->Unlock();
808
0
    }
809
0
  }
810
811
0
  void ClearColumnFamily(int cf) {
812
0
    std::fill(values_[cf].begin(), values_[cf].end(), SENTINEL);
813
0
  }
814
815
0
  void Put(int cf, int64_t key, uint32_t value_base) {
816
0
    values_[cf][key] = value_base;
817
0
  }
818
819
0
  uint32_t Get(int cf, int64_t key) const { return values_[cf][key]; }
820
821
0
  void Delete(int cf, int64_t key) { values_[cf][key] = SENTINEL; }
822
823
0
  void SingleDelete(int cf, int64_t key) { values_[cf][key] = SENTINEL; }
824
825
0
  bool AllowsOverwrite(int cf, int64_t key) {
826
0
    return no_overwrite_ids_[cf].find(key) == no_overwrite_ids_[cf].end();
827
0
  }
828
829
0
  bool Exists(int cf, int64_t key) { return values_[cf][key] != SENTINEL; }
830
831
0
  uint32_t GetSeed() const { return seed_; }
832
833
0
  void SetShouldStopBgThread() { should_stop_bg_thread_ = true; }
834
835
0
  bool ShoudStopBgThread() { return should_stop_bg_thread_; }
836
837
0
  void SetBgThreadFinish() { bg_thread_finished_ = true; }
838
839
0
  bool BgThreadFinished() const { return bg_thread_finished_; }
840
841
 private:
842
  port::Mutex mu_;
843
  port::CondVar cv_;
844
  const uint32_t seed_;
845
  const int64_t max_key_;
846
  const uint32_t log2_keys_per_lock_;
847
  const int num_threads_;
848
  int64_t num_initialized_;
849
  int64_t num_populated_;
850
  int64_t vote_reopen_;
851
  int64_t num_done_;
852
  bool start_;
853
  bool start_verify_;
854
  bool should_stop_bg_thread_;
855
  bool bg_thread_finished_;
856
  StressTest* stress_test_;
857
  std::atomic<bool> verification_failure_;
858
859
  // Keys that should not be overwritten
860
  std::vector<std::set<size_t> > no_overwrite_ids_;
861
862
  std::vector<std::vector<uint32_t>> values_;
863
  // Has to make it owned by a smart ptr as port::Mutex is not copyable
864
  // and storing it in the container may require copying depending on the impl.
865
  std::vector<std::vector<std::unique_ptr<port::Mutex> > > key_locks_;
866
};
867
868
// Marker stored in SharedState::values_ for a key that holds no value:
// Delete()/SingleDelete() write it and Exists() compares against it.
const uint32_t SharedState::SENTINEL = 0xffffffff;
869
870
// Per-thread state for concurrent executions of the same benchmark.
871
struct ThreadState {
872
  uint32_t tid; // 0..n-1
873
  Random rand;  // Has different seeds for different threads
874
  SharedState* shared;
875
  Stats stats;
876
877
  ThreadState(uint32_t index, SharedState* _shared)
878
0
      : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {}
879
};
880
881
class DbStressListener : public EventListener {
882
 public:
883
  DbStressListener(
884
      const std::string& db_name,
885
      const std::vector<DbPath>& db_paths) :
886
      db_name_(db_name),
887
      db_paths_(db_paths),
888
0
      rand_(301) {}
889
0
  virtual ~DbStressListener() {}
890
#ifndef ROCKSDB_LITE
891
  virtual void OnFlushCompleted(
892
0
      DB* db, const FlushJobInfo& info) override {
893
0
    assert(db);
894
0
    assert(db->GetName() == db_name_);
895
0
    assert(IsValidColumnFamilyName(info.cf_name));
896
0
    VerifyFilePath(info.file_path);
897
    // pretending doing some work here
898
0
    std::this_thread::sleep_for(
899
0
        std::chrono::microseconds(rand_.Uniform(5000)));
900
0
  }
901
902
  virtual void OnCompactionCompleted(
903
0
      DB *db, const CompactionJobInfo& ci) override {
904
0
    assert(db);
905
0
    assert(db->GetName() == db_name_);
906
0
    assert(IsValidColumnFamilyName(ci.cf_name));
907
0
    assert(ci.input_files.size() + ci.output_files.size() > 0U);
908
0
    for (const auto& file_path : ci.input_files) {
909
0
      VerifyFilePath(file_path);
910
0
    }
911
0
    for (const auto& file_path : ci.output_files) {
912
0
      VerifyFilePath(file_path);
913
0
    }
914
    // pretending doing some work here
915
0
    std::this_thread::sleep_for(
916
0
        std::chrono::microseconds(rand_.Uniform(5000)));
917
0
  }
918
919
  virtual void OnTableFileCreated(
920
0
      const TableFileCreationInfo& info) override {
921
0
    assert(info.db_name == db_name_);
922
0
    assert(IsValidColumnFamilyName(info.cf_name));
923
0
    VerifyFilePath(info.file_path);
924
0
    assert(info.file_size > 0);
925
0
    assert(info.job_id > 0);
926
0
    assert(info.table_properties.data_size > 0);
927
0
    assert(info.table_properties.raw_key_size > 0);
928
0
    assert(info.table_properties.num_entries > 0);
929
0
  }
930
931
 protected:
932
0
  bool IsValidColumnFamilyName(const std::string& cf_name) const {
933
0
    if (cf_name == kDefaultColumnFamilyName) {
934
0
      return true;
935
0
    }
936
    // The column family names in the stress tests are numbers.
937
0
    for (size_t i = 0; i < cf_name.size(); ++i) {
938
0
      if (cf_name[i] < '0' || cf_name[i] > '9') {
939
0
        return false;
940
0
      }
941
0
    }
942
0
    return true;
943
0
  }
944
945
0
  void VerifyFileDir(const std::string& file_dir) {
946
0
#ifndef NDEBUG
947
0
    if (db_name_ == file_dir) {
948
0
      return;
949
0
    }
950
0
    for (const auto& db_path : db_paths_) {
951
0
      if (db_path.path == file_dir) {
952
0
        return;
953
0
      }
954
0
    }
955
0
    assert(false);
956
0
#endif  // !NDEBUG
957
0
  }
958
959
0
  void VerifyFileName(const std::string& file_name) {
960
0
#ifndef NDEBUG
961
0
    uint64_t file_number;
962
0
    FileType file_type;
963
0
    bool result = ParseFileName(file_name, &file_number, &file_type);
964
0
    assert(result);
965
0
    assert(file_type == kTableFile);
966
0
#endif  // !NDEBUG
967
0
  }
968
969
0
  void VerifyFilePath(const std::string& file_path) {
970
0
#ifndef NDEBUG
971
0
    size_t pos = file_path.find_last_of("/");
972
0
    if (pos == std::string::npos) {
973
0
      VerifyFileName(file_path);
974
0
    } else {
975
0
      if (pos > 0) {
976
0
        VerifyFileDir(file_path.substr(0, pos));
977
0
      }
978
0
      VerifyFileName(file_path.substr(pos));
979
0
    }
980
0
#endif  // !NDEBUG
981
0
  }
982
#endif  // !ROCKSDB_LITE
983
984
 private:
985
  std::string db_name_;
986
  std::vector<DbPath> db_paths_;
987
  Random rand_;
988
};
989
990
}  // namespace
991
992
class StressTest {
993
 public:
994
  StressTest()
995
      : cache_(NewLRUCache(FLAGS_cache_size)),
996
        compressed_cache_(FLAGS_compressed_cache_size >= 0
997
                              ? NewLRUCache(FLAGS_compressed_cache_size)
998
                              : nullptr),
999
        filter_policy_(FLAGS_bloom_bits >= 0
1000
                   ? NewBloomFilterPolicy(FLAGS_bloom_bits, FLAGS_use_block_based_filter)
1001
                   : nullptr),
1002
        db_(nullptr),
1003
        new_column_family_name_(1),
1004
0
        num_times_reopened_(0) {
1005
0
    if (FLAGS_destroy_db_initially) {
1006
0
      std::vector<std::string> files;
1007
0
      CHECK_OK(FLAGS_env->GetChildren(FLAGS_db, &files));
1008
0
      for (unsigned int i = 0; i < files.size(); i++) {
1009
0
        if (Slice(files[i]).starts_with("heap-")) {
1010
0
          CHECK_OK(FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]));
1011
0
        }
1012
0
      }
1013
0
      CHECK_OK(DestroyDB(FLAGS_db, Options()));
1014
0
    }
1015
0
  }
1016
1017
0
  // Deletes all column family handles first and the database object last.
  ~StressTest() {
    for (auto* handle : column_families_) {
      delete handle;
    }
    column_families_.clear();
    delete db_;
  }
1024
1025
0
  bool BuildOptionsTable() {
1026
0
    if (FLAGS_set_options_one_in <= 0) {
1027
0
      return true;
1028
0
    }
1029
1030
0
    std::unordered_map<std::string, std::vector<std::string> > options_tbl = {
1031
0
        {"write_buffer_size",
1032
0
         {ToString(FLAGS_write_buffer_size),
1033
0
          ToString(FLAGS_write_buffer_size * 2),
1034
0
          ToString(FLAGS_write_buffer_size * 4)}},
1035
0
        {"max_write_buffer_number",
1036
0
         {ToString(FLAGS_max_write_buffer_number),
1037
0
          ToString(FLAGS_max_write_buffer_number * 2),
1038
0
          ToString(FLAGS_max_write_buffer_number * 4)}},
1039
0
        {"arena_block_size",
1040
0
         {
1041
0
             ToString(Options().arena_block_size),
1042
0
             ToString(FLAGS_write_buffer_size / 4),
1043
0
             ToString(FLAGS_write_buffer_size / 8),
1044
0
         }},
1045
0
        {"memtable_prefix_bloom_bits", {"0", "8", "10"}},
1046
0
        {"memtable_prefix_bloom_probes", {"4", "5", "6"}},
1047
0
        {"memtable_prefix_bloom_huge_page_tlb_size",
1048
0
         {"0", ToString(2 * 1024 * 1024)}},
1049
0
        {"max_successive_merges", {"0", "2", "4"}},
1050
0
        {"filter_deletes", {"0", "1"}},
1051
0
        {"inplace_update_num_locks", {"100", "200", "300"}},
1052
        // TODO(ljin): enable test for this option
1053
        // {"disable_auto_compactions", {"100", "200", "300"}},
1054
0
        {"soft_rate_limit", {"0", "0.5", "0.9"}},
1055
0
        {"hard_rate_limit", {"0", "1.1", "2.0"}},
1056
0
        {"level0_file_num_compaction_trigger",
1057
0
         {
1058
0
             ToString(FLAGS_level0_file_num_compaction_trigger),
1059
0
             ToString(FLAGS_level0_file_num_compaction_trigger + 2),
1060
0
             ToString(FLAGS_level0_file_num_compaction_trigger + 4),
1061
0
         }},
1062
0
        {"level0_slowdown_writes_trigger",
1063
0
         {
1064
0
             ToString(FLAGS_level0_slowdown_writes_trigger),
1065
0
             ToString(FLAGS_level0_slowdown_writes_trigger + 2),
1066
0
             ToString(FLAGS_level0_slowdown_writes_trigger + 4),
1067
0
         }},
1068
0
        {"level0_stop_writes_trigger",
1069
0
         {
1070
0
             ToString(FLAGS_level0_stop_writes_trigger),
1071
0
             ToString(FLAGS_level0_stop_writes_trigger + 2),
1072
0
             ToString(FLAGS_level0_stop_writes_trigger + 4),
1073
0
         }},
1074
0
        {"max_grandparent_overlap_factor",
1075
0
         {
1076
0
             ToString(Options().max_grandparent_overlap_factor - 5),
1077
0
             ToString(Options().max_grandparent_overlap_factor),
1078
0
             ToString(Options().max_grandparent_overlap_factor + 5),
1079
0
         }},
1080
0
        {"expanded_compaction_factor",
1081
0
         {
1082
0
             ToString(Options().expanded_compaction_factor - 5),
1083
0
             ToString(Options().expanded_compaction_factor),
1084
0
             ToString(Options().expanded_compaction_factor + 5),
1085
0
         }},
1086
0
        {"source_compaction_factor",
1087
0
         {
1088
0
             ToString(Options().source_compaction_factor),
1089
0
             ToString(Options().source_compaction_factor * 2),
1090
0
             ToString(Options().source_compaction_factor * 4),
1091
0
         }},
1092
0
        {"target_file_size_base",
1093
0
         {
1094
0
             ToString(FLAGS_target_file_size_base),
1095
0
             ToString(FLAGS_target_file_size_base * 2),
1096
0
             ToString(FLAGS_target_file_size_base * 4),
1097
0
         }},
1098
0
        {"target_file_size_multiplier",
1099
0
         {
1100
0
             ToString(FLAGS_target_file_size_multiplier), "1", "2",
1101
0
         }},
1102
0
        {"max_bytes_for_level_base",
1103
0
         {
1104
0
             ToString(FLAGS_max_bytes_for_level_base / 2),
1105
0
             ToString(FLAGS_max_bytes_for_level_base),
1106
0
             ToString(FLAGS_max_bytes_for_level_base * 2),
1107
0
         }},
1108
0
        {"max_bytes_for_level_multiplier",
1109
0
         {
1110
0
             ToString(FLAGS_max_bytes_for_level_multiplier), "1", "2",
1111
0
         }},
1112
0
        {"max_sequential_skip_in_iterations", {"4", "8", "12"}},
1113
0
    };
1114
1115
0
    options_table_ = std::move(options_tbl);
1116
1117
0
    for (const auto& iter : options_table_) {
1118
0
      options_index_.push_back(iter.first);
1119
0
    }
1120
0
    return true;
1121
0
  }
1122
1123
0
  bool Run() {
1124
0
    PrintEnv();
1125
0
    BuildOptionsTable();
1126
0
    Open();
1127
0
    SharedState shared(this);
1128
0
    uint32_t n = shared.GetNumThreads();
1129
1130
0
    std::vector<ThreadState*> threads(n);
1131
0
    for (uint32_t i = 0; i < n; i++) {
1132
0
      threads[i] = new ThreadState(i, &shared);
1133
0
      FLAGS_env->StartThread(ThreadBody, threads[i]);
1134
0
    }
1135
0
    ThreadState bg_thread(0, &shared);
1136
0
    if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
1137
0
      FLAGS_env->StartThread(PoolSizeChangeThread, &bg_thread);
1138
0
    }
1139
1140
    // Each thread goes through the following states:
1141
    // initializing -> wait for others to init -> read/populate/depopulate
1142
    // wait for others to operate -> verify -> done
1143
1144
0
    {
1145
0
      MutexLock l(shared.GetMutex());
1146
0
      while (!shared.AllInitialized()) {
1147
0
        shared.GetCondVar()->Wait();
1148
0
      }
1149
1150
0
      auto now = FLAGS_env->NowMicros();
1151
0
      fprintf(stdout, "%s Starting database operations\n",
1152
0
              FLAGS_env->TimeToString(now/1000000).c_str());
1153
1154
0
      shared.SetStart();
1155
0
      shared.GetCondVar()->SignalAll();
1156
0
      while (!shared.AllOperated()) {
1157
0
        shared.GetCondVar()->Wait();
1158
0
      }
1159
1160
0
      now = FLAGS_env->NowMicros();
1161
0
      if (FLAGS_test_batches_snapshots) {
1162
0
        fprintf(stdout, "%s Limited verification already done during gets\n",
1163
0
                FLAGS_env->TimeToString((uint64_t) now/1000000).c_str());
1164
0
      } else {
1165
0
        fprintf(stdout, "%s Starting verification\n",
1166
0
                FLAGS_env->TimeToString((uint64_t) now/1000000).c_str());
1167
0
      }
1168
1169
0
      shared.SetStartVerify();
1170
0
      shared.GetCondVar()->SignalAll();
1171
0
      while (!shared.AllDone()) {
1172
0
        shared.GetCondVar()->Wait();
1173
0
      }
1174
0
    }
1175
1176
0
    for (unsigned int i = 1; i < n; i++) {
1177
0
      threads[0]->stats.Merge(threads[i]->stats);
1178
0
    }
1179
0
    threads[0]->stats.Report("Stress Test");
1180
1181
0
    for (unsigned int i = 0; i < n; i++) {
1182
0
      delete threads[i];
1183
0
      threads[i] = nullptr;
1184
0
    }
1185
0
    auto now = FLAGS_env->NowMicros();
1186
0
    if (!FLAGS_test_batches_snapshots) {
1187
0
      fprintf(stdout, "%s Verification successful\n",
1188
0
              FLAGS_env->TimeToString(now/1000000).c_str());
1189
0
    }
1190
0
    PrintStatistics();
1191
1192
0
    if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
1193
0
      MutexLock l(shared.GetMutex());
1194
0
      shared.SetShouldStopBgThread();
1195
0
      while (!shared.BgThreadFinished()) {
1196
0
        shared.GetCondVar()->Wait();
1197
0
      }
1198
0
    }
1199
1200
0
    if (shared.HasVerificationFailedYet()) {
1201
0
      printf("Verification failed :(\n");
1202
0
      return false;
1203
0
    }
1204
0
    return true;
1205
0
  }
1206
1207
 private:
1208
1209
0
  static void ThreadBody(void* v) {
1210
0
    ThreadState* thread = reinterpret_cast<ThreadState*>(v);
1211
0
    SharedState* shared = thread->shared;
1212
1213
0
    {
1214
0
      MutexLock l(shared->GetMutex());
1215
0
      shared->IncInitialized();
1216
0
      if (shared->AllInitialized()) {
1217
0
        shared->GetCondVar()->SignalAll();
1218
0
      }
1219
0
      while (!shared->Started()) {
1220
0
        shared->GetCondVar()->Wait();
1221
0
      }
1222
0
    }
1223
0
    thread->shared->GetStressTest()->OperateDb(thread);
1224
1225
0
    {
1226
0
      MutexLock l(shared->GetMutex());
1227
0
      shared->IncOperated();
1228
0
      if (shared->AllOperated()) {
1229
0
        shared->GetCondVar()->SignalAll();
1230
0
      }
1231
0
      while (!shared->VerifyStarted()) {
1232
0
        shared->GetCondVar()->Wait();
1233
0
      }
1234
0
    }
1235
1236
0
    if (!FLAGS_test_batches_snapshots) {
1237
0
      thread->shared->GetStressTest()->VerifyDb(thread);
1238
0
    }
1239
1240
0
    {
1241
0
      MutexLock l(shared->GetMutex());
1242
0
      shared->IncDone();
1243
0
      if (shared->AllDone()) {
1244
0
        shared->GetCondVar()->SignalAll();
1245
0
      }
1246
0
    }
1247
1248
0
  }
1249
1250
0
  static void PoolSizeChangeThread(void* v) {
1251
0
    assert(FLAGS_compaction_thread_pool_adjust_interval > 0);
1252
0
    ThreadState* thread = reinterpret_cast<ThreadState*>(v);
1253
0
    SharedState* shared = thread->shared;
1254
1255
0
    while (true) {
1256
0
      {
1257
0
        MutexLock l(shared->GetMutex());
1258
0
        if (shared->ShoudStopBgThread()) {
1259
0
          shared->SetBgThreadFinish();
1260
0
          shared->GetCondVar()->SignalAll();
1261
0
          return;
1262
0
        }
1263
0
      }
1264
1265
0
      auto thread_pool_size_base = FLAGS_max_background_compactions;
1266
0
      auto thread_pool_size_var = FLAGS_compaction_thread_pool_variations;
1267
0
      int new_thread_pool_size =
1268
0
          thread_pool_size_base - thread_pool_size_var +
1269
0
          thread->rand.Next() % (thread_pool_size_var * 2 + 1);
1270
0
      if (new_thread_pool_size < 1) {
1271
0
        new_thread_pool_size = 1;
1272
0
      }
1273
0
      FLAGS_env->SetBackgroundThreads(new_thread_pool_size);
1274
      // Sleep up to 3 seconds
1275
0
      FLAGS_env->SleepForMicroseconds(
1276
0
          thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval *
1277
0
              1000 +
1278
0
          1);
1279
0
    }
1280
0
  }
1281
1282
  // Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ...
1283
  // ("9"+K, "9"+V) in DB atomically i.e in a single batch.
1284
  // Also refer MultiGet.
1285
  Status MultiPut(ThreadState* thread, const WriteOptions& writeoptions,
1286
                  ColumnFamilyHandle* column_family, const Slice& key,
1287
0
                  const Slice& value, size_t sz) {
1288
0
    std::string keys[10] = {"9", "8", "7", "6", "5",
1289
0
                            "4", "3", "2", "1", "0"};
1290
0
    std::string values[10] = {"9", "8", "7", "6", "5",
1291
0
                              "4", "3", "2", "1", "0"};
1292
0
    Slice value_slices[10];
1293
0
    WriteBatch batch;
1294
0
    Status s;
1295
0
    for (int i = 0; i < 10; i++) {
1296
0
      keys[i] += key.ToString();
1297
0
      values[i] += value.ToString();
1298
0
      value_slices[i] = values[i];
1299
0
      if (FLAGS_use_merge) {
1300
0
        batch.Merge(column_family, keys[i], value_slices[i]);
1301
0
      } else {
1302
0
        batch.Put(column_family, keys[i], value_slices[i]);
1303
0
      }
1304
0
    }
1305
1306
0
    s = db_->Write(writeoptions, &batch);
1307
0
    if (!s.ok()) {
1308
0
      fprintf(stderr, "multiput error: %s\n", s.ToString().c_str());
1309
0
      thread->stats.AddErrors(1);
1310
0
    } else {
1311
      // we did 10 writes each of size sz + 1
1312
0
      thread->stats.AddBytesForWrites(10, (sz + 1) * 10);
1313
0
    }
1314
1315
0
    return s;
1316
0
  }
1317
1318
  // Given a key K, this deletes ("0"+K), ("1"+K),... ("9"+K)
1319
  // in DB atomically i.e in a single batch. Also refer MultiGet.
1320
  Status MultiDelete(ThreadState* thread, const WriteOptions& writeoptions,
1321
0
                     ColumnFamilyHandle* column_family, const Slice& key) {
1322
0
    std::string keys[10] = {"9", "7", "5", "3", "1",
1323
0
                            "8", "6", "4", "2", "0"};
1324
1325
0
    WriteBatch batch;
1326
0
    Status s;
1327
0
    for (int i = 0; i < 10; i++) {
1328
0
      keys[i] += key.ToString();
1329
0
      batch.Delete(column_family, keys[i]);
1330
0
    }
1331
1332
0
    s = db_->Write(writeoptions, &batch);
1333
0
    if (!s.ok()) {
1334
0
      fprintf(stderr, "multidelete error: %s\n", s.ToString().c_str());
1335
0
      thread->stats.AddErrors(1);
1336
0
    } else {
1337
0
      thread->stats.AddDeletes(10);
1338
0
    }
1339
1340
0
    return s;
1341
0
  }
1342
1343
  // Given a key K, this gets values for "0"+K, "1"+K,..."9"+K
1344
  // in the same snapshot, and verifies that all the values are of the form
1345
  // "0"+V, "1"+V,..."9"+V.
1346
  // ASSUMES that MultiPut was used to put (K, V) into the DB.
1347
  Status MultiGet(ThreadState* thread, const ReadOptions& readoptions,
1348
                  ColumnFamilyHandle* column_family, const Slice& key,
1349
0
                  std::string* value) {
1350
0
    std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
1351
0
    Slice key_slices[10];
1352
0
    std::string values[10];
1353
0
    ReadOptions readoptionscopy = readoptions;
1354
0
    readoptionscopy.snapshot = db_->GetSnapshot();
1355
0
    Status s;
1356
0
    for (int i = 0; i < 10; i++) {
1357
0
      keys[i] += key.ToString();
1358
0
      key_slices[i] = keys[i];
1359
0
      s = db_->Get(readoptionscopy, column_family, key_slices[i], value);
1360
0
      if (!s.ok() && !s.IsNotFound()) {
1361
0
        fprintf(stderr, "get error: %s\n", s.ToString().c_str());
1362
0
        values[i] = "";
1363
0
        thread->stats.AddErrors(1);
1364
        // we continue after error rather than exiting so that we can
1365
        // find more errors if any
1366
0
      } else if (s.IsNotFound()) {
1367
0
        values[i] = "";
1368
0
        thread->stats.AddGets(1, 0);
1369
0
      } else {
1370
0
        values[i] = *value;
1371
1372
0
        char expected_prefix = (keys[i])[0];
1373
0
        char actual_prefix = (values[i])[0];
1374
0
        if (actual_prefix != expected_prefix) {
1375
0
          fprintf(stderr, "error expected prefix = %c actual = %c\n",
1376
0
                  expected_prefix, actual_prefix);
1377
0
        }
1378
0
        (values[i])[0] = ' '; // blank out the differing character
1379
0
        thread->stats.AddGets(1, 1);
1380
0
      }
1381
0
    }
1382
0
    db_->ReleaseSnapshot(readoptionscopy.snapshot);
1383
1384
    // Now that we retrieved all values, check that they all match
1385
0
    for (int i = 1; i < 10; i++) {
1386
0
      if (values[i] != values[0]) {
1387
0
        fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n",
1388
0
                key.ToString(true).c_str(), StringToHex(values[0]).c_str(),
1389
0
                StringToHex(values[i]).c_str());
1390
      // we continue after error rather than exiting so that we can
1391
      // find more errors if any
1392
0
      }
1393
0
    }
1394
1395
0
    return s;
1396
0
  }
1397
1398
  // Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P
1399
  // in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
1400
  // of the key. Each of these 10 scans returns a series of values;
1401
  // each series should be the same length, and it is verified for each
1402
  // index i that all the i'th values are of the form "0"+V, "1"+V,..."9"+V.
1403
  // ASSUMES that MultiPut was used to put (K, V)
1404
  Status MultiPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
1405
                         ColumnFamilyHandle* column_family,
1406
0
                         const Slice& key) {
1407
0
    std::string prefixes[10] = {"0", "1", "2", "3", "4",
1408
0
                                "5", "6", "7", "8", "9"};
1409
0
    Slice prefix_slices[10];
1410
0
    ReadOptions readoptionscopy[10];
1411
0
    const Snapshot* snapshot = db_->GetSnapshot();
1412
0
    Iterator* iters[10];
1413
0
    Status s = Status::OK();
1414
0
    for (int i = 0; i < 10; i++) {
1415
0
      prefixes[i] += key.ToString();
1416
0
      prefixes[i].resize(FLAGS_prefix_size);
1417
0
      prefix_slices[i] = Slice(prefixes[i]);
1418
0
      readoptionscopy[i] = readoptions;
1419
0
      readoptionscopy[i].snapshot = snapshot;
1420
0
      iters[i] = db_->NewIterator(readoptionscopy[i], column_family);
1421
0
      iters[i]->Seek(prefix_slices[i]);
1422
0
    }
1423
1424
0
    int count = 0;
1425
0
    while (iters[0]->Valid() && iters[0]->key().starts_with(prefix_slices[0])) {
1426
0
      count++;
1427
0
      std::string values[10];
1428
      // get list of all values for this iteration
1429
0
      for (int i = 0; i < 10; i++) {
1430
        // no iterator should finish before the first one
1431
0
        assert(iters[i]->Valid() &&
1432
0
               iters[i]->key().starts_with(prefix_slices[i]));
1433
0
        values[i] = iters[i]->value().ToString();
1434
1435
0
        char expected_first = (prefixes[i])[0];
1436
0
        char actual_first = (values[i])[0];
1437
1438
0
        if (actual_first != expected_first) {
1439
0
          fprintf(stderr, "error expected first = %c actual = %c\n",
1440
0
                  expected_first, actual_first);
1441
0
        }
1442
0
        (values[i])[0] = ' '; // blank out the differing character
1443
0
      }
1444
      // make sure all values are equivalent
1445
0
      for (int i = 0; i < 10; i++) {
1446
0
        if (values[i] != values[0]) {
1447
0
          fprintf(stderr, "error : %d, inconsistent values for prefix %s: %s, %s\n",
1448
0
                  i, prefixes[i].c_str(), StringToHex(values[0]).c_str(),
1449
0
                  StringToHex(values[i]).c_str());
1450
          // we continue after error rather than exiting so that we can
1451
          // find more errors if any
1452
0
        }
1453
0
        iters[i]->Next();
1454
0
      }
1455
0
    }
1456
1457
    // cleanup iterators and snapshot
1458
0
    for (int i = 0; i < 10; i++) {
1459
      // if the first iterator finished, they should have all finished
1460
0
      assert(!iters[i]->Valid() ||
1461
0
             !iters[i]->key().starts_with(prefix_slices[i]));
1462
0
      assert(iters[i]->status().ok());
1463
0
      delete iters[i];
1464
0
    }
1465
0
    db_->ReleaseSnapshot(snapshot);
1466
1467
0
    if (s.ok()) {
1468
0
      thread->stats.AddPrefixes(1, count);
1469
0
    } else {
1470
0
      thread->stats.AddErrors(1);
1471
0
    }
1472
1473
0
    return s;
1474
0
  }
1475
1476
  // Given a key K, this creates an iterator which scans to K and then
1477
  // does a random sequence of Next/Prev operations.
1478
  Status MultiIterate(ThreadState* thread, const ReadOptions& readoptions,
1479
0
                      ColumnFamilyHandle* column_family, const Slice& key) {
1480
0
    Status s;
1481
0
    const Snapshot* snapshot = db_->GetSnapshot();
1482
0
    ReadOptions readoptionscopy = readoptions;
1483
0
    readoptionscopy.snapshot = snapshot;
1484
0
    unique_ptr<Iterator> iter(db_->NewIterator(readoptionscopy, column_family));
1485
1486
0
    iter->Seek(key);
1487
0
    for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); i++) {
1488
0
      if (thread->rand.OneIn(2)) {
1489
0
        iter->Next();
1490
0
      } else {
1491
0
        iter->Prev();
1492
0
      }
1493
0
    }
1494
1495
0
    if (s.ok()) {
1496
0
      thread->stats.AddIterations(1);
1497
0
    } else {
1498
0
      thread->stats.AddErrors(1);
1499
0
    }
1500
1501
0
    db_->ReleaseSnapshot(snapshot);
1502
1503
0
    return s;
1504
0
  }
1505
1506
0
  // Picks a random option from options_index_ and a random candidate index,
  // then applies it to a randomly chosen column family via DB::SetOptions.
  // The two rate limits are always set together, as are the three level0
  // triggers, each group using the same candidate index.
  // Requires FLAGS_set_options_one_in > 0.
  Status SetOptions(ThreadState* thread) {
    assert(FLAGS_set_options_one_in > 0);
    std::unordered_map<std::string, std::string> opts;
    std::string name = options_index_[
      thread->rand.Next() % options_index_.size()];
    int value_idx = thread->rand.Next() % options_table_[name].size();

    // Copies candidate number `value_idx` of `option` into `opts`.
    auto select = [&](const std::string& option) {
      opts[option] = options_table_[option][value_idx];
    };

    const bool is_rate_limit =
        (name == "soft_rate_limit" || name == "hard_rate_limit");
    const bool is_level0_trigger =
        (name == "level0_file_num_compaction_trigger" ||
         name == "level0_slowdown_writes_trigger" ||
         name == "level0_stop_writes_trigger");

    if (is_rate_limit) {
      select("soft_rate_limit");
      select("hard_rate_limit");
    } else if (is_level0_trigger) {
      select("level0_file_num_compaction_trigger");
      select("level0_slowdown_writes_trigger");
      select("level0_stop_writes_trigger");
    } else {
      select(name);
    }

    int rand_cf_idx = thread->rand.Next() % FLAGS_column_families;
    auto cfh = column_families_[rand_cf_idx];
    return db_->SetOptions(cfh, opts);
  }
1532
1533
0
  void OperateDb(ThreadState* thread) {
1534
0
    ReadOptions read_opts(FLAGS_verify_checksum, true);
1535
0
    WriteOptions write_opts;
1536
0
    auto shared = thread->shared;
1537
0
    char value[100];
1538
0
    auto max_key = thread->shared->GetMaxKey();
1539
0
    std::string from_db;
1540
0
    if (FLAGS_sync) {
1541
0
      write_opts.sync = true;
1542
0
    }
1543
0
    write_opts.disableWAL = FLAGS_disable_wal;
1544
0
    const int prefixBound = FLAGS_readpercent + FLAGS_prefixpercent;
1545
0
    const int writeBound = prefixBound + FLAGS_writepercent;
1546
0
    const int delBound = writeBound + FLAGS_delpercent;
1547
1548
0
    thread->stats.Start();
1549
0
    for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) {
1550
0
      if (thread->shared->HasVerificationFailedYet()) {
1551
0
        break;
1552
0
      }
1553
0
      if (i != 0 && (i % (FLAGS_ops_per_thread / (FLAGS_reopen + 1))) == 0) {
1554
0
        {
1555
0
          thread->stats.FinishedSingleOp();
1556
0
          MutexLock l(thread->shared->GetMutex());
1557
0
          thread->shared->IncVotedReopen();
1558
0
          if (thread->shared->AllVotedReopen()) {
1559
0
            thread->shared->GetStressTest()->Reopen();
1560
0
            thread->shared->GetCondVar()->SignalAll();
1561
0
          } else {
1562
0
            thread->shared->GetCondVar()->Wait();
1563
0
          }
1564
          // Commenting this out as we don't want to reset stats on each open.
1565
          // thread->stats.Start();
1566
0
        }
1567
0
      }
1568
1569
      // Change Options
1570
0
      if (FLAGS_set_options_one_in > 0 &&
1571
0
          thread->rand.OneIn(FLAGS_set_options_one_in)) {
1572
0
        CHECK_OK(SetOptions(thread));
1573
0
      }
1574
1575
0
      if (FLAGS_set_in_place_one_in > 0 &&
1576
0
          thread->rand.OneIn(FLAGS_set_in_place_one_in)) {
1577
0
        options_.inplace_update_support ^= options_.inplace_update_support;
1578
0
      }
1579
1580
0
      if (!FLAGS_test_batches_snapshots &&
1581
0
          FLAGS_clear_column_family_one_in != 0 && FLAGS_column_families > 1) {
1582
0
        if (thread->rand.OneIn(FLAGS_clear_column_family_one_in)) {
1583
          // drop column family and then create it again (can't drop default)
1584
0
          int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1;
1585
0
          std::string new_name =
1586
0
              ToString(new_column_family_name_.fetch_add(1));
1587
0
          {
1588
0
            MutexLock l(thread->shared->GetMutex());
1589
0
            fprintf(
1590
0
                stdout,
1591
0
                "[CF %d] Dropping and recreating column family. new name: %s\n",
1592
0
                cf, new_name.c_str());
1593
0
          }
1594
0
          thread->shared->LockColumnFamily(cf);
1595
0
          Status s __attribute__((unused));
1596
0
          s = db_->DropColumnFamily(column_families_[cf]);
1597
0
          delete column_families_[cf];
1598
0
          if (!s.ok()) {
1599
0
            fprintf(stderr, "dropping column family error: %s\n",
1600
0
                s.ToString().c_str());
1601
0
            std::terminate();
1602
0
          }
1603
0
          s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name,
1604
0
                                      &column_families_[cf]);
1605
0
          column_family_names_[cf] = new_name;
1606
0
          thread->shared->ClearColumnFamily(cf);
1607
0
          if (!s.ok()) {
1608
0
            fprintf(stderr, "creating column family error: %s\n",
1609
0
                s.ToString().c_str());
1610
0
            std::terminate();
1611
0
          }
1612
0
          thread->shared->UnlockColumnFamily(cf);
1613
0
        }
1614
0
      }
1615
1616
0
      size_t rand_key = thread->rand.Next() % max_key;
1617
0
      int rand_column_family = thread->rand.Next() % FLAGS_column_families;
1618
0
      std::string keystr = Key(rand_key);
1619
0
      Slice key = keystr;
1620
0
      std::unique_ptr<MutexLock> l;
1621
0
      if (!FLAGS_test_batches_snapshots) {
1622
0
        l.reset(new MutexLock(
1623
0
            shared->GetMutexForKey(rand_column_family, rand_key)));
1624
0
      }
1625
0
      auto column_family = column_families_[rand_column_family];
1626
1627
0
      int prob_op = thread->rand.Uniform(100);
1628
0
      if (prob_op >= 0 && prob_op < FLAGS_readpercent) {
1629
        // OPERATION read
1630
0
        if (!FLAGS_test_batches_snapshots) {
1631
0
          Status s = db_->Get(read_opts, column_family, key, &from_db);
1632
0
          if (s.ok()) {
1633
            // found case
1634
0
            thread->stats.AddGets(1, 1);
1635
0
          } else if (s.IsNotFound()) {
1636
            // not found case
1637
0
            thread->stats.AddGets(1, 0);
1638
0
          } else {
1639
            // errors case
1640
0
            thread->stats.AddErrors(1);
1641
0
          }
1642
0
        } else {
1643
0
          CHECK_OK(MultiGet(thread, read_opts, column_family, key, &from_db));
1644
0
        }
1645
0
      } else if (FLAGS_readpercent <= prob_op && prob_op < prefixBound) {
1646
        // OPERATION prefix scan
1647
        // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
1648
        // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will
1649
        // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
1650
        // prefix
1651
0
        if (!FLAGS_test_batches_snapshots) {
1652
0
          Slice prefix = Slice(key.data(), FLAGS_prefix_size);
1653
0
          Iterator* iter = db_->NewIterator(read_opts, column_family);
1654
0
          int64_t count = 0;
1655
0
          for (iter->Seek(prefix);
1656
0
               iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
1657
0
            ++count;
1658
0
          }
1659
0
          assert(count <=
1660
0
                 (static_cast<int64_t>(1) << ((8 - FLAGS_prefix_size) * 8)));
1661
0
          if (iter->status().ok()) {
1662
0
            thread->stats.AddPrefixes(1, static_cast<int>(count));
1663
0
          } else {
1664
0
            thread->stats.AddErrors(1);
1665
0
          }
1666
0
          delete iter;
1667
0
        } else {
1668
0
          CHECK_OK(MultiPrefixScan(thread, read_opts, column_family, key));
1669
0
        }
1670
0
      } else if (prefixBound <= prob_op && prob_op < writeBound) {
1671
        // OPERATION write
1672
0
        uint32_t value_base = thread->rand.Next();
1673
0
        size_t sz = GenerateValue(value_base, value, sizeof(value));
1674
0
        Slice v(value, sz);
1675
0
        if (!FLAGS_test_batches_snapshots) {
1676
          // If the chosen key does not allow overwrite and it already
1677
          // exists, choose another key.
1678
0
          while (!shared->AllowsOverwrite(rand_column_family, rand_key) &&
1679
0
                 shared->Exists(rand_column_family, rand_key)) {
1680
0
            l.reset();
1681
0
            rand_key = thread->rand.Next() % max_key;
1682
0
            rand_column_family = thread->rand.Next() % FLAGS_column_families;
1683
0
            l.reset(new MutexLock(
1684
0
                shared->GetMutexForKey(rand_column_family, rand_key)));
1685
0
          }
1686
1687
0
          keystr = Key(rand_key);
1688
0
          key = keystr;
1689
0
          column_family = column_families_[rand_column_family];
1690
1691
0
          if (FLAGS_verify_before_write) {
1692
0
            std::string keystr2 = Key(rand_key);
1693
0
            Slice k = keystr2;
1694
0
            Status s = db_->Get(read_opts, column_family, k, &from_db);
1695
0
            if (!VerifyValue(rand_column_family, rand_key, read_opts,
1696
0
                             thread->shared, from_db, s, true)) {
1697
0
              break;
1698
0
            }
1699
0
          }
1700
0
          shared->Put(rand_column_family, rand_key, value_base);
1701
0
          Status s;
1702
0
          if (FLAGS_use_merge) {
1703
0
            s = db_->Merge(write_opts, column_family, key, v);
1704
0
          } else {
1705
0
            s = db_->Put(write_opts, column_family, key, v);
1706
0
          }
1707
0
          if (!s.ok()) {
1708
0
            fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
1709
0
            std::terminate();
1710
0
          }
1711
0
          thread->stats.AddBytesForWrites(1, sz);
1712
0
        } else {
1713
0
          CHECK_OK(MultiPut(thread, write_opts, column_family, key, v, sz));
1714
0
        }
1715
0
        PrintKeyValue(rand_column_family, static_cast<uint32_t>(rand_key),
1716
0
                      value, sz);
1717
0
      } else if (writeBound <= prob_op && prob_op < delBound) {
1718
        // OPERATION delete
1719
0
        if (!FLAGS_test_batches_snapshots) {
1720
          // If the chosen key does not allow overwrite and it does not exist,
1721
          // choose another key.
1722
0
          while (!shared->AllowsOverwrite(rand_column_family, rand_key) &&
1723
0
                 !shared->Exists(rand_column_family, rand_key)) {
1724
0
            l.reset();
1725
0
            rand_key = thread->rand.Next() % max_key;
1726
0
            rand_column_family = thread->rand.Next() % FLAGS_column_families;
1727
0
            l.reset(new MutexLock(
1728
0
                shared->GetMutexForKey(rand_column_family, rand_key)));
1729
0
          }
1730
1731
0
          keystr = Key(rand_key);
1732
0
          key = keystr;
1733
0
          column_family = column_families_[rand_column_family];
1734
1735
          // Use delete if the key may be overwritten and a single deletion
1736
          // otherwise.
1737
0
          if (shared->AllowsOverwrite(rand_column_family, rand_key)) {
1738
0
            shared->Delete(rand_column_family, rand_key);
1739
0
            Status s = db_->Delete(write_opts, column_family, key);
1740
0
            thread->stats.AddDeletes(1);
1741
0
            if (!s.ok()) {
1742
0
              fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
1743
0
              std::terminate();
1744
0
            }
1745
0
          } else {
1746
0
            shared->SingleDelete(rand_column_family, rand_key);
1747
0
            Status s = db_->SingleDelete(write_opts, column_family, key);
1748
0
            thread->stats.AddSingleDeletes(1);
1749
0
            if (!s.ok()) {
1750
0
              fprintf(stderr, "single delete error: %s\n",
1751
0
                      s.ToString().c_str());
1752
0
              std::terminate();
1753
0
            }
1754
0
          }
1755
0
        } else {
1756
0
          CHECK_OK(MultiDelete(thread, write_opts, column_family, key));
1757
0
        }
1758
0
      } else {
1759
        // OPERATION iterate
1760
0
        CHECK_OK(MultiIterate(thread, read_opts, column_family, key));
1761
0
      }
1762
0
      thread->stats.FinishedSingleOp();
1763
0
    }
1764
1765
0
    thread->stats.Stop();
1766
0
  }
1767
1768
0
  // Verifies this thread's share of the key space against the expected values
  // recorded in the shared state.
  //
  // The range [0, max_key) is divided into GetNumThreads() equal slices and
  // the slice is selected by thread->tid; the last thread additionally takes
  // whatever remainder is left. Each column family is checked in turn, either
  // through an iterator or through point Gets (picked at random per column
  // family). Checking stops as soon as a verification failure has been
  // recorded in the shared state.
  void VerifyDb(ThreadState* thread) const {
    ReadOptions options(FLAGS_verify_checksum, true);
    auto shared = thread->shared;
    const int64_t max_key = shared->GetMaxKey();
    const int64_t keys_per_thread = max_key / shared->GetNumThreads();
    const int64_t start = keys_per_thread * thread->tid;
    int64_t end = start + keys_per_thread;
    if (thread->tid == shared->GetNumThreads() - 1) {
      end = max_key;
    }

    // Keys are 8 bytes long, so 8 * (8 - prefix_size) bits follow the prefix
    // and that many consecutive keys share one prefix. Shifting by a negative
    // amount or by >= 63 bits is not well defined for int64_t (this happens
    // for prefix_size == 0 or prefix_size > 8), so in those cases fall back to
    // re-seeking for every key, which is always correct, only slower.
    const int suffix_bits = 8 * (8 - FLAGS_prefix_size);
    const int64_t keys_per_prefix =
        (suffix_bits > 0 && suffix_bits < 63)
            ? (static_cast<int64_t>(1) << suffix_bits)
            : 1;

    for (size_t cf = 0; cf < column_families_.size(); ++cf) {
      if (shared->HasVerificationFailedYet()) {
        break;
      }
      const int cf_id = static_cast<int>(cf);
      if (!thread->rand.OneIn(2)) {
        // Use iterator to verify this range
        std::unique_ptr<Iterator> iter(
            db_->NewIterator(options, column_families_[cf]));
        iter->Seek(Key(start));
        for (auto i = start; i < end; i++) {
          if (shared->HasVerificationFailedYet()) {
            break;
          }
          // Reseek when the prefix changes
          if (i % keys_per_prefix == 0) {
            iter->Seek(Key(i));
          }
          std::string from_db;
          std::string keystr = Key(i);
          Slice k = keystr;
          Status s = iter->status();
          if (iter->Valid()) {
            const int cmp = iter->key().compare(k);
            if (cmp > 0) {
              // The iterator is already past this key: it is not in the DB.
              s = STATUS(NotFound, Slice());
            } else if (cmp == 0) {
              from_db = iter->value().ToString();
              iter->Next();
            } else {
              VerificationAbort(shared, "An out of range key was found",
                                cf_id, i);
            }
          } else {
            // The iterator found no value for the key in question, so do not
            // move to the next item in the iterator
            s = STATUS(NotFound, Slice());
          }
          VerifyValue(cf_id, i, options, shared, from_db, s, true);
          if (from_db.length()) {
            // PrintKeyValue takes an int64_t key; pass it untruncated.
            PrintKeyValue(cf_id, i, from_db.data(), from_db.length());
          }
        }
      } else {
        // Use Get to verify this range
        for (auto i = start; i < end; i++) {
          if (shared->HasVerificationFailedYet()) {
            break;
          }
          std::string from_db;
          std::string keystr = Key(i);
          Slice k = keystr;
          Status s = db_->Get(options, column_families_[cf], k, &from_db);
          VerifyValue(cf_id, i, options, shared, from_db, s, true);
          if (from_db.length()) {
            PrintKeyValue(cf_id, i, from_db.data(), from_db.length());
          }
        }
      }
    }
  }
1843
1844
  void VerificationAbort(SharedState* shared, std::string msg, int cf,
1845
0
                         int64_t key) const {
1846
0
    printf("Verification failed for column family %d key %" PRIi64 ": %s\n", cf, key,
1847
0
           msg.c_str());
1848
0
    shared->SetVerificationFailure();
1849
0
  }
1850
1851
  bool VerifyValue(int cf, int64_t key, const ReadOptions& opts,
1852
                   SharedState* shared, const std::string& value_from_db,
1853
0
                   Status s, bool strict = false) const {
1854
0
    if (shared->HasVerificationFailedYet()) {
1855
0
      return false;
1856
0
    }
1857
    // compare value_from_db with the value in the shared state
1858
0
    char value[100];
1859
0
    uint32_t value_base = shared->Get(cf, key);
1860
0
    if (value_base == SharedState::SENTINEL && !strict) {
1861
0
      return true;
1862
0
    }
1863
1864
0
    if (s.ok()) {
1865
0
      if (value_base == SharedState::SENTINEL) {
1866
0
        VerificationAbort(shared, "Unexpected value found", cf, key);
1867
0
        return false;
1868
0
      }
1869
0
      size_t sz = GenerateValue(value_base, value, sizeof(value));
1870
0
      if (value_from_db.length() != sz) {
1871
0
        VerificationAbort(shared, "Length of value read is not equal", cf, key);
1872
0
        return false;
1873
0
      }
1874
0
      if (memcmp(value_from_db.data(), value, sz) != 0) {
1875
0
        VerificationAbort(shared, "Contents of value read don't match", cf,
1876
0
                          key);
1877
0
        return false;
1878
0
      }
1879
0
    } else {
1880
0
      if (value_base != SharedState::SENTINEL) {
1881
0
        VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key);
1882
0
        return false;
1883
0
      }
1884
0
    }
1885
0
    return true;
1886
0
  }
1887
1888
  static void PrintKeyValue(int cf, int64_t key, const char* value,
1889
0
                            size_t sz) {
1890
0
    if (!FLAGS_verbose) {
1891
0
      return;
1892
0
    }
1893
0
    fprintf(stdout, "[CF %d] %" PRIi64 " == > (%" ROCKSDB_PRIszt ") ", cf, key, sz);
1894
0
    for (size_t i = 0; i < sz; i++) {
1895
0
      fprintf(stdout, "%X", value[i]);
1896
0
    }
1897
0
    fprintf(stdout, "\n");
1898
0
  }
1899
1900
0
  static size_t GenerateValue(uint32_t rand, char *v, size_t max_sz) {
1901
0
    size_t value_sz = ((rand % 3) + 1) * FLAGS_value_size_mult;
1902
0
    assert(value_sz <= max_sz && value_sz >= sizeof(uint32_t));
1903
0
    memcpy(v, &rand, sizeof(rand));
1904
0
    for (size_t i=sizeof(uint32_t); i < value_sz; i++) {
1905
0
      v[i] = static_cast<char>(rand ^ i);
1906
0
    }
1907
0
    v[value_sz] = '\0';
1908
0
    return value_sz; // the size of the value set.
1909
0
  }
1910
1911
0
  void PrintEnv() const {
1912
0
    fprintf(stdout, "Column families           : %d\n", FLAGS_column_families);
1913
0
    if (!FLAGS_test_batches_snapshots) {
1914
0
      fprintf(stdout, "Clear CFs one in          : %d\n",
1915
0
              FLAGS_clear_column_family_one_in);
1916
0
    }
1917
0
    fprintf(stdout, "Number of threads         : %d\n", FLAGS_threads);
1918
0
    fprintf(stdout, "Ops per thread            : %" PRIu64 "\n", FLAGS_ops_per_thread);
1919
0
    std::string ttl_state("unused");
1920
0
    if (FLAGS_ttl > 0) {
1921
0
      ttl_state = NumberToString(FLAGS_ttl);
1922
0
    }
1923
0
    fprintf(stdout, "Time to live(sec)         : %s\n", ttl_state.c_str());
1924
0
    fprintf(stdout, "Read percentage           : %d%%\n", FLAGS_readpercent);
1925
0
    fprintf(stdout, "Prefix percentage         : %d%%\n", FLAGS_prefixpercent);
1926
0
    fprintf(stdout, "Write percentage          : %d%%\n", FLAGS_writepercent);
1927
0
    fprintf(stdout, "Delete percentage         : %d%%\n", FLAGS_delpercent);
1928
0
    fprintf(stdout, "No overwrite percentage   : %d%%\n",
1929
0
            FLAGS_nooverwritepercent);
1930
0
    fprintf(stdout, "Iterate percentage        : %d%%\n", FLAGS_iterpercent);
1931
0
    fprintf(stdout, "DB-write-buffer-size      : %" PRIu64 "\n",
1932
0
            FLAGS_db_write_buffer_size);
1933
0
    fprintf(stdout, "Write-buffer-size         : %d\n",
1934
0
            FLAGS_write_buffer_size);
1935
0
    fprintf(stdout, "Iterations                : %" PRIu64 "\n", FLAGS_num_iterations);
1936
0
    fprintf(stdout, "Max key                   : %" PRId64 "\n", FLAGS_max_key);
1937
0
    fprintf(stdout, "Ratio #ops/#keys          : %f\n",
1938
0
            (1.0 * FLAGS_ops_per_thread * FLAGS_threads) / FLAGS_max_key);
1939
0
    fprintf(stdout, "Num times DB reopens      : %d\n", FLAGS_reopen);
1940
0
    fprintf(stdout, "Batches/snapshots         : %d\n",
1941
0
            FLAGS_test_batches_snapshots);
1942
0
    fprintf(stdout, "Deletes use filter        : %d\n", FLAGS_filter_deletes);
1943
0
    fprintf(stdout, "Do update in place        : %d\n", FLAGS_in_place_update);
1944
0
    fprintf(stdout, "Num keys per lock         : %d\n",
1945
0
            1 << FLAGS_log2_keys_per_lock);
1946
0
    std::string compression = CompressionTypeToString(FLAGS_compression_type_e);
1947
0
    fprintf(stdout, "Compression               : %s\n", compression.c_str());
1948
1949
0
    const char* memtablerep = "";
1950
0
    switch (FLAGS_rep_factory) {
1951
0
      case kSkipList:
1952
0
        memtablerep = "skip_list";
1953
0
        break;
1954
0
      case kHashSkipList:
1955
0
        memtablerep = "prefix_hash";
1956
0
        break;
1957
0
      case kVectorRep:
1958
0
        memtablerep = "vector";
1959
0
        break;
1960
0
    }
1961
1962
0
    fprintf(stdout, "Memtablerep               : %s\n", memtablerep);
1963
1964
0
    fprintf(stdout, "Test kill odd             : %d\n", rocksdb_kill_odds);
1965
0
    if (!rocksdb_kill_prefix_blacklist.empty()) {
1966
0
      fprintf(stdout, "Skipping kill points prefixes:\n");
1967
0
      for (auto& p : rocksdb_kill_prefix_blacklist) {
1968
0
        fprintf(stdout, "  %s\n", p.c_str());
1969
0
      }
1970
0
    }
1971
1972
0
    fprintf(stdout, "------------------------------------------------\n");
1973
0
  }
1974
1975
0
  void Open() {
1976
0
    assert(db_ == nullptr);
1977
0
    BlockBasedTableOptions block_based_options;
1978
0
    block_based_options.block_cache = cache_;
1979
0
    block_based_options.block_cache_compressed = compressed_cache_;
1980
0
    block_based_options.block_size = FLAGS_block_size;
1981
0
    block_based_options.format_version = 2;
1982
0
    block_based_options.filter_policy = filter_policy_;
1983
0
    options_.table_factory.reset(
1984
0
        NewBlockBasedTableFactory(block_based_options));
1985
0
    options_.db_write_buffer_size = FLAGS_db_write_buffer_size;
1986
0
    options_.write_buffer_size = FLAGS_write_buffer_size;
1987
0
    options_.max_write_buffer_number = FLAGS_max_write_buffer_number;
1988
0
    options_.min_write_buffer_number_to_merge =
1989
0
        FLAGS_min_write_buffer_number_to_merge;
1990
0
    options_.max_write_buffer_number_to_maintain =
1991
0
        FLAGS_max_write_buffer_number_to_maintain;
1992
0
    options_.max_background_compactions = FLAGS_max_background_compactions;
1993
0
    options_.max_background_flushes = FLAGS_max_background_flushes;
1994
0
    options_.compaction_style =
1995
0
        static_cast<rocksdb::CompactionStyle>(FLAGS_compaction_style);
1996
0
    options_.prefix_extractor.reset(NewFixedPrefixTransform(FLAGS_prefix_size));
1997
0
    options_.max_open_files = FLAGS_open_files;
1998
0
    options_.statistics = dbstats;
1999
0
    options_.env = FLAGS_env;
2000
0
    options_.disableDataSync = FLAGS_disable_data_sync;
2001
0
    options_.use_fsync = FLAGS_use_fsync;
2002
0
    options_.allow_mmap_reads = FLAGS_mmap_read;
2003
0
    options_.target_file_size_base = FLAGS_target_file_size_base;
2004
0
    options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
2005
0
    options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
2006
0
    options_.max_bytes_for_level_multiplier =
2007
0
        FLAGS_max_bytes_for_level_multiplier;
2008
0
    options_.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
2009
0
    options_.level0_slowdown_writes_trigger =
2010
0
        FLAGS_level0_slowdown_writes_trigger;
2011
0
    options_.level0_file_num_compaction_trigger =
2012
0
        FLAGS_level0_file_num_compaction_trigger;
2013
0
    options_.compression = FLAGS_compression_type_e;
2014
0
    options_.create_if_missing = true;
2015
0
    options_.max_manifest_file_size = 10 * 1024;
2016
0
    options_.filter_deletes = FLAGS_filter_deletes;
2017
0
    options_.inplace_update_support = FLAGS_in_place_update;
2018
0
    options_.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
2019
0
    options_.allow_concurrent_memtable_write =
2020
0
        FLAGS_allow_concurrent_memtable_write;
2021
0
    options_.enable_write_thread_adaptive_yield =
2022
0
        FLAGS_enable_write_thread_adaptive_yield;
2023
2024
0
    if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) {
2025
0
      fprintf(stderr,
2026
0
              "prefeix_size cannot be zero if memtablerep == prefix_hash\n");
2027
0
      exit(1);
2028
0
    }
2029
0
    if (FLAGS_prefix_size != 0 && FLAGS_rep_factory != kHashSkipList) {
2030
0
      fprintf(stderr,
2031
0
              "WARNING: prefix_size is non-zero but "
2032
0
              "memtablerep != prefix_hash\n");
2033
0
    }
2034
0
    switch (FLAGS_rep_factory) {
2035
0
      case kSkipList:
2036
        // no need to do anything
2037
0
        break;
2038
0
#ifndef ROCKSDB_LITE
2039
0
      case kHashSkipList:
2040
0
        options_.memtable_factory.reset(NewHashSkipListRepFactory(10000));
2041
0
        break;
2042
0
      case kVectorRep:
2043
0
        options_.memtable_factory.reset(new VectorRepFactory());
2044
0
        break;
2045
#else
2046
      default:
2047
        fprintf(stderr,
2048
                "RocksdbLite only supports skip list mem table. Skip "
2049
                "--rep_factory\n");
2050
#endif  // ROCKSDB_LITE
2051
0
    }
2052
2053
0
    if (FLAGS_use_merge) {
2054
0
      options_.merge_operator = MergeOperators::CreatePutOperator();
2055
0
    }
2056
2057
    // set universal style compaction configurations, if applicable
2058
0
    if (FLAGS_universal_size_ratio != 0) {
2059
0
      options_.compaction_options_universal.size_ratio =
2060
0
          FLAGS_universal_size_ratio;
2061
0
    }
2062
0
    if (FLAGS_universal_min_merge_width != 0) {
2063
0
      options_.compaction_options_universal.min_merge_width =
2064
0
          FLAGS_universal_min_merge_width;
2065
0
    }
2066
0
    if (FLAGS_universal_max_merge_width != 0) {
2067
0
      options_.compaction_options_universal.max_merge_width =
2068
0
          FLAGS_universal_max_merge_width;
2069
0
    }
2070
0
    if (FLAGS_universal_max_size_amplification_percent != 0) {
2071
0
      options_.compaction_options_universal.max_size_amplification_percent =
2072
0
          FLAGS_universal_max_size_amplification_percent;
2073
0
    }
2074
2075
0
    fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
2076
2077
0
    Status s;
2078
0
    if (FLAGS_ttl == -1) {
2079
0
      std::vector<std::string> existing_column_families;
2080
0
      s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db,
2081
0
                                 &existing_column_families);  // ignore errors
2082
0
      if (!s.ok()) {
2083
        // DB doesn't exist
2084
0
        assert(existing_column_families.empty());
2085
0
        assert(column_family_names_.empty());
2086
0
        column_family_names_.push_back(kDefaultColumnFamilyName);
2087
0
      } else if (column_family_names_.empty()) {
2088
        // this is the first call to the function Open()
2089
0
        column_family_names_ = existing_column_families;
2090
0
      } else {
2091
        // this is a reopen. just assert that existing column_family_names are
2092
        // equivalent to what we remember
2093
0
        auto sorted_cfn = column_family_names_;
2094
0
        sort(sorted_cfn.begin(), sorted_cfn.end());
2095
0
        sort(existing_column_families.begin(), existing_column_families.end());
2096
0
        if (sorted_cfn != existing_column_families) {
2097
0
          fprintf(stderr,
2098
0
                  "Expected column families differ from the existing:\n");
2099
0
          printf("Expected: {");
2100
0
          for (auto cf : sorted_cfn) {
2101
0
            printf("%s ", cf.c_str());
2102
0
          }
2103
0
          printf("}\n");
2104
0
          printf("Existing: {");
2105
0
          for (auto cf : existing_column_families) {
2106
0
            printf("%s ", cf.c_str());
2107
0
          }
2108
0
          printf("}\n");
2109
0
        }
2110
0
        assert(sorted_cfn == existing_column_families);
2111
0
      }
2112
0
      std::vector<ColumnFamilyDescriptor> cf_descriptors;
2113
0
      for (auto name : column_family_names_) {
2114
0
        if (name != kDefaultColumnFamilyName) {
2115
0
          new_column_family_name_ =
2116
0
              std::max(new_column_family_name_.load(), std::stoi(name) + 1);
2117
0
        }
2118
0
        cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
2119
0
      }
2120
0
      while (cf_descriptors.size() < (size_t)FLAGS_column_families) {
2121
0
        std::string name = ToString(new_column_family_name_.load());
2122
0
        new_column_family_name_++;
2123
0
        cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
2124
0
        column_family_names_.push_back(name);
2125
0
      }
2126
0
      options_.listeners.clear();
2127
0
      options_.listeners.emplace_back(
2128
0
          new DbStressListener(FLAGS_db, options_.db_paths));
2129
0
      options_.create_missing_column_families = true;
2130
0
      s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
2131
0
                   &column_families_, &db_);
2132
0
      assert(!s.ok() || column_families_.size() ==
2133
0
                            static_cast<size_t>(FLAGS_column_families));
2134
0
    } else {
2135
0
#ifndef ROCKSDB_LITE
2136
0
      DBWithTTL* db_with_ttl;
2137
0
      s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
2138
0
      db_ = db_with_ttl;
2139
#else
2140
      fprintf(stderr, "TTL is not supported in RocksDBLite\n");
2141
      exit(1);
2142
#endif
2143
0
    }
2144
0
    if (!s.ok()) {
2145
0
      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
2146
0
      exit(1);
2147
0
    }
2148
0
  }
2149
2150
0
  void Reopen() {
2151
0
    for (auto cf : column_families_) {
2152
0
      delete cf;
2153
0
    }
2154
0
    column_families_.clear();
2155
0
    delete db_;
2156
0
    db_ = nullptr;
2157
2158
0
    num_times_reopened_++;
2159
0
    auto now = FLAGS_env->NowMicros();
2160
0
    fprintf(stdout, "%s Reopening database for the %dth time\n",
2161
0
            FLAGS_env->TimeToString(now/1000000).c_str(),
2162
0
            num_times_reopened_);
2163
0
    Open();
2164
0
  }
2165
2166
0
  void PrintStatistics() {
2167
0
    if (dbstats) {
2168
0
      fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
2169
0
    }
2170
0
  }
2171
2172
 private:
2173
  // Block cache installed into the block-based table options by Open().
  std::shared_ptr<Cache> cache_;
2174
  // Compressed block cache installed into the table options by Open().
  std::shared_ptr<Cache> compressed_cache_;
2175
  // Filter policy installed into the table options by Open().
  std::shared_ptr<const FilterPolicy> filter_policy_;
2176
  // Database under test: set by Open(); deleted and reset to nullptr by
  // Reopen() before it calls Open() again.
  DB* db_;
2177
  // Options populated from the command-line flags at the start of Open().
  Options options_;
2178
  // Column family handles filled in by DB::Open() in the non-TTL path;
  // deleted and cleared by Reopen().
  std::vector<ColumnFamilyHandle*> column_families_;
2179
  // Column family names remembered across reopens; on a reopen Open() checks
  // them against the families that exist on disk.
  std::vector<std::string> column_family_names_;
2180
  // Next integer to use as the name of a newly created column family; Open()
  // keeps it above every numeric name already in use.
  std::atomic<int> new_column_family_name_;
2181
  // Number of times Reopen() has been called.
  int num_times_reopened_;
2182
  // NOTE(review): not referenced in the visible part of the class; by name
  // this looks like a table of option names to candidate values - confirm.
  std::unordered_map<std::string, std::vector<std::string>> options_table_;
2183
  // NOTE(review): not referenced in the visible part of the class; presumably
  // an ordered list of keys into options_table_ - confirm.
  std::vector<std::string> options_index_;
2184
};
2185
2186
}  // namespace rocksdb
2187
2188
int main(int argc, char** argv) {
2189
  SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
2190
                  " [OPTIONS]...");
2191
  ParseCommandLineFlags(&argc, &argv, true);
2192
2193
  if (FLAGS_statistics) {
2194
    dbstats = rocksdb::CreateDBStatisticsForTests();
2195
  }
2196
  FLAGS_compression_type_e =
2197
    StringToCompressionType(FLAGS_compression_type.c_str());
2198
  if (!FLAGS_hdfs.empty()) {
2199
    FLAGS_env  = new rocksdb::HdfsEnv(FLAGS_hdfs);
2200
  }
2201
  FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());
2202
2203
  // The number of background threads should be at least as much the
2204
  // max number of concurrent compactions.
2205
  FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);
2206
2207
  if (FLAGS_prefixpercent > 0 && FLAGS_prefix_size <= 0) {
2208
    fprintf(stderr,
2209
            "Error: prefixpercent is non-zero while prefix_size is "
2210
            "not positive!\n");
2211
    exit(1);
2212
  }
2213
  if (FLAGS_test_batches_snapshots && FLAGS_prefix_size <= 0) {
2214
    fprintf(stderr,
2215
            "Error: please specify prefix_size for "
2216
            "test_batches_snapshots test!\n");
2217
    exit(1);
2218
  }
2219
  if ((FLAGS_readpercent + FLAGS_prefixpercent +
2220
       FLAGS_writepercent + FLAGS_delpercent + FLAGS_iterpercent) != 100) {
2221
      fprintf(stderr,
2222
              "Error: Read+Prefix+Write+Delete+Iterate percents != 100!\n");
2223
      exit(1);
2224
  }
2225
  if (FLAGS_disable_wal == 1 && FLAGS_reopen > 0) {
2226
      fprintf(stderr, "Error: Db cannot reopen safely with disable_wal set!\n");
2227
      exit(1);
2228
  }
2229
  if ((unsigned)FLAGS_reopen >= FLAGS_ops_per_thread) {
2230
      fprintf(stderr,
2231
              "Error: #DB-reopens should be < ops_per_thread\n"
2232
              "Provided reopens = %d and ops_per_thread = %" PRIu64 "\n",
2233
              FLAGS_reopen,
2234
              FLAGS_ops_per_thread);
2235
      exit(1);
2236
  }
2237
2238
  // Choose a location for the test database if none given with --db=<path>
2239
  if (FLAGS_db.empty()) {
2240
      std::string default_db_path;
2241
      CHECK_OK(rocksdb::Env::Default()->GetTestDirectory(&default_db_path));
2242
      default_db_path += "/dbstress";
2243
      FLAGS_db = default_db_path;
2244
  }
2245
2246
  rocksdb_kill_odds = FLAGS_kill_random_test;
2247
  rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist);
2248
2249
  rocksdb::StressTest stress;
2250
  if (stress.Run()) {
2251
    return 0;
2252
  } else {
2253
    return 1;
2254
  }
2255
}
2256
2257
#endif  // GFLAGS