YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/tools/write_stress.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
// This source code is licensed under the BSD-style license found in the
3
// LICENSE file in the root directory of this source tree. An additional grant
4
// of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
//
21
// The goal of this tool is to be a simple stress test with focus on catching:
22
// * bugs in compaction/flush processes, especially the ones that cause
23
// assertion errors
24
// * bugs in the code that deletes obsolete files
25
//
26
// There are two parts of the test:
27
// * write_stress, a binary that writes to the database
28
// * write_stress_runner.py, a script that invokes and kills write_stress
29
//
30
// Here are some interesting parts of write_stress:
31
// * Runs with very high concurrency of compactions and flushes (32 threads
32
// total) and tries to create a huge amount of small files
33
// * The keys written to the database are not uniformly distributed -- there is
34
// a 3-character prefix that mutates occasionally (in prefix mutator thread), in
35
// such a way that the first character mutates slower than second, which mutates
36
// slower than third character. That way, the compaction stress tests some
37
// interesting compaction features like trivial moves and bottommost level
38
// calculation
39
// * There is a thread that creates an iterator, holds it for couple of seconds
40
// and then iterates over all keys. This is supposed to test RocksDB's abilities
41
// to keep the files alive when there are references to them.
42
// * Some writes trigger WAL sync. This is stress testing our WAL sync code.
43
// * At the end of the run, we make sure that we didn't leak any of the sst
44
// files
45
//
46
// write_stress_runner.py changes the mode in which we run write_stress and also
47
// kills and restarts it. There are some interesting characteristics:
48
// * At the beginning we divide the full test runtime into smaller parts --
49
// shorter runtimes (couple of seconds) and longer runtimes (100, 1000) seconds
50
// * The first time we run write_stress, we destroy the old DB. Every next time
51
// during the test, we use the same DB.
52
// * We can run in kill mode or clean-restart mode. Kill mode kills the
53
// write_stress violently.
54
// * We can run in mode where delete_obsolete_files_with_fullscan is true or
55
// false
56
// * We can run with low_open_files mode turned on or off. When it's turned on,
57
// we configure table cache to only hold a couple of files -- that way we need
58
// to reopen files every time we access them.
59
//
60
// Another goal was to create a stress test without a lot of parameters. So
61
// tools/write_stress_runner.py should only take one parameter -- runtime_sec
62
// and it should figure out everything else on its own.
63
64
65
#ifndef GFLAGS
66
int main() {
67
  fprintf(stderr, "Please install gflags to run rocksdb tools\n");
68
  return 1;
69
}
70
#else
71
72
#include <gflags/gflags.h>
73
74
#ifndef __STDC_FORMAT_MACROS
75
#define __STDC_FORMAT_MACROS
76
#endif
77
78
#include <inttypes.h>
79
#include <atomic>
80
#include <random>
81
#include <set>
82
#include <string>
83
#include <thread>
84
85
#include "yb/rocksdb/db.h"
86
#include "yb/rocksdb/env.h"
87
#include "yb/rocksdb/options.h"
88
#include "yb/util/slice.h"
89
90
#include "yb/rocksdb/db/filename.h"
91
92
using GFLAGS::ParseCommandLineFlags;
93
using GFLAGS::RegisterFlagValidator;
94
using GFLAGS::SetUsageMessage;
95
96
DEFINE_int32(key_size, 10, "Key size");
97
DEFINE_int32(value_size, 100, "Value size");
98
DEFINE_string(db, "", "Use the db with the following name.");
99
DEFINE_bool(destroy_db, true,
100
            "Destory the existing DB before running the test");
101
102
DEFINE_int32(runtime_sec, 10 * 60, "How long are we running for, in seconds");
103
DEFINE_int32(seed, 139, "Random seed");
104
105
DEFINE_double(prefix_mutate_period_sec, 1.0,
106
              "How often are we going to mutate the prefix");
107
DEFINE_double(first_char_mutate_probability, 0.1,
108
              "How likely are we to mutate the first char every period");
109
DEFINE_double(second_char_mutate_probability, 0.2,
110
              "How likely are we to mutate the second char every period");
111
DEFINE_double(third_char_mutate_probability, 0.5,
112
              "How likely are we to mutate the third char every period");
113
114
DEFINE_int32(iterator_hold_sec, 5,
115
             "How long will the iterator hold files before it gets destroyed");
116
117
DEFINE_double(sync_probability, 0.01, "How often are we syncing writes");
118
DEFINE_bool(delete_obsolete_files_with_fullscan, false,
119
            "If true, we delete obsolete files after each compaction/flush "
120
            "using GetChildren() API");
121
DEFINE_bool(low_open_files_mode, false,
122
            "If true, we set max_open_files to 20, so that every file access "
123
            "needs to reopen it");
124
125
namespace rocksdb {
126
127
static const int kPrefixSize = 3;
128
129
class WriteStress {
130
 public:
131
0
  WriteStress() : stop_(false) {
132
    // initialize key_prefix
133
0
    for (int i = 0; i < kPrefixSize; ++i) {
134
0
      key_prefix_[i].store('a');
135
0
    }
136
137
    // Choose a location for the test database if none given with --db=<path>
138
0
    if (FLAGS_db.empty()) {
139
0
      std::string default_db_path;
140
0
      CHECK_OK(Env::Default()->GetTestDirectory(&default_db_path));
141
0
      default_db_path += "/write_stress";
142
0
      FLAGS_db = default_db_path;
143
0
    }
144
145
0
    Options options;
146
0
    if (FLAGS_destroy_db) {
147
0
      CHECK_OK(DestroyDB(FLAGS_db, options));
148
0
    }
149
150
    // make the LSM tree deep, so that we have many concurrent flushes and
151
    // compactions
152
0
    options.create_if_missing = true;
153
0
    options.write_buffer_size = 256 * 1024;              // 256k
154
0
    options.max_bytes_for_level_base = 1 * 1024 * 1204;  // 1MB
155
0
    options.target_file_size_base = 100 * 1204;          // 100k
156
0
    options.max_write_buffer_number = 16;
157
0
    options.max_background_compactions = 16;
158
0
    options.max_background_flushes = 16;
159
0
    options.max_open_files = FLAGS_low_open_files_mode ? 20 : -1;
160
0
    if (FLAGS_delete_obsolete_files_with_fullscan) {
161
0
      options.delete_obsolete_files_period_micros = 0;
162
0
    }
163
164
    // open DB
165
0
    DB* db;
166
0
    Status s = DB::Open(options, FLAGS_db, &db);
167
0
    if (!s.ok()) {
168
0
      fprintf(stderr, "Can't open database: %s\n", s.ToString().c_str());
169
0
      std::abort();
170
0
    }
171
0
    db_.reset(db);
172
0
  }
173
174
0
  void WriteThread() {
175
0
    std::mt19937 rng(static_cast<unsigned int>(FLAGS_seed));
176
0
    std::uniform_real_distribution<double> dist(0, 1);
177
178
0
    auto random_string = [](std::mt19937& r, int len) {
179
0
      std::uniform_int_distribution<int> char_dist('a', 'z');
180
0
      std::string ret;
181
0
      for (int i = 0; i < len; ++i) {
182
0
        ret += char_dist(r);
183
0
      }
184
0
      return ret;
185
0
    };
186
187
0
    while (!stop_.load(std::memory_order_relaxed)) {
188
0
      std::string prefix;
189
0
      prefix.resize(kPrefixSize);
190
0
      for (int i = 0; i < kPrefixSize; ++i) {
191
0
        prefix[i] = key_prefix_[i].load(std::memory_order_relaxed);
192
0
      }
193
0
      auto key = prefix + random_string(rng, FLAGS_key_size - kPrefixSize);
194
0
      auto value = random_string(rng, FLAGS_value_size);
195
0
      WriteOptions woptions;
196
0
      woptions.sync = dist(rng) < FLAGS_sync_probability;
197
0
      auto s = db_->Put(woptions, key, value);
198
0
      if (!s.ok()) {
199
0
        fprintf(stderr, "Write to DB failed: %s\n", s.ToString().c_str());
200
0
        std::abort();
201
0
      }
202
0
    }
203
0
  }
204
205
0
  void IteratorHoldThread() {
206
0
    while (!stop_.load(std::memory_order_relaxed)) {
207
0
      std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
208
0
      Env::Default()->SleepForMicroseconds(FLAGS_iterator_hold_sec * 1000 *
209
0
                                           1000LL);
210
0
      for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
211
0
      }
212
0
      if (!iterator->status().ok()) {
213
0
        fprintf(stderr, "Iterator statuts not OK: %s\n",
214
0
                iterator->status().ToString().c_str());
215
0
        std::abort();
216
0
      }
217
0
    }
218
0
  }
219
220
0
  void PrefixMutatorThread() {
221
0
    std::mt19937 rng(static_cast<unsigned int>(FLAGS_seed));
222
0
    std::uniform_real_distribution<double> dist(0, 1);
223
0
    std::uniform_int_distribution<int> char_dist('a', 'z');
224
0
    while (!stop_.load(std::memory_order_relaxed)) {
225
0
      Env::Default()->SleepForMicroseconds(static_cast<int>(
226
0
                                           FLAGS_prefix_mutate_period_sec *
227
0
                                           1000 * 1000LL));
228
0
      if (dist(rng) < FLAGS_first_char_mutate_probability) {
229
0
        key_prefix_[0].store(char_dist(rng), std::memory_order_relaxed);
230
0
      }
231
0
      if (dist(rng) < FLAGS_second_char_mutate_probability) {
232
0
        key_prefix_[1].store(char_dist(rng), std::memory_order_relaxed);
233
0
      }
234
0
      if (dist(rng) < FLAGS_third_char_mutate_probability) {
235
0
        key_prefix_[2].store(char_dist(rng), std::memory_order_relaxed);
236
0
      }
237
0
    }
238
0
  }
239
240
0
  int Run() {
241
0
    threads_.emplace_back([&]() { WriteThread(); });
242
0
    threads_.emplace_back([&]() { PrefixMutatorThread(); });
243
0
    threads_.emplace_back([&]() { IteratorHoldThread(); });
244
245
0
    if (FLAGS_runtime_sec == -1) {
246
      // infinite runtime, until we get killed
247
0
      while (true) {
248
0
        Env::Default()->SleepForMicroseconds(1000 * 1000);
249
0
      }
250
0
    }
251
252
0
    Env::Default()->SleepForMicroseconds(FLAGS_runtime_sec * 1000 * 1000);
253
254
0
    stop_.store(true, std::memory_order_relaxed);
255
0
    for (auto& t : threads_) {
256
0
      t.join();
257
0
    }
258
0
    threads_.clear();
259
260
// Skip checking for leaked files in ROCKSDB_LITE since we don't have access to
261
// function GetLiveFilesMetaData
262
0
#ifndef ROCKSDB_LITE
263
    // let's see if we leaked some files
264
0
    CHECK_OK(db_->PauseBackgroundWork());
265
0
    std::vector<LiveFileMetaData> metadata;
266
0
    db_->GetLiveFilesMetaData(&metadata);
267
0
    std::set<uint64_t> sst_file_numbers;
268
0
    for (const auto& file : metadata) {
269
0
      uint64_t number;
270
0
      FileType type;
271
0
      if (!ParseFileName(file.name, &number, "LOG", &type)) {
272
0
        continue;
273
0
      }
274
0
      if (type == kTableFile) {
275
0
        sst_file_numbers.insert(number);
276
0
      }
277
0
    }
278
279
0
    std::vector<std::string> children;
280
0
    CHECK_OK(Env::Default()->GetChildren(FLAGS_db, &children));
281
0
    for (const auto& child : children) {
282
0
      uint64_t number;
283
0
      FileType type;
284
0
      if (!ParseFileName(child, &number, "LOG", &type)) {
285
0
        continue;
286
0
      }
287
0
      if (type == kTableFile) {
288
0
        if (sst_file_numbers.find(number) == sst_file_numbers.end()) {
289
0
          fprintf(stderr,
290
0
                  "Found a table file in DB path that should have been "
291
0
                  "deleted: %s\n",
292
0
                  child.c_str());
293
0
          std::abort();
294
0
        }
295
0
      }
296
0
    }
297
0
    CHECK_OK(db_->ContinueBackgroundWork());
298
0
#endif  // !ROCKSDB_LITE
299
300
0
    return 0;
301
0
  }
302
303
 private:
304
  // each key is prepended with this prefix. we occasionally change it. third
305
  // letter is changed more frequently than second, which is changed more
306
  // frequently than the first one.
307
  std::atomic<char> key_prefix_[kPrefixSize];
308
  std::atomic<bool> stop_;
309
  std::vector<std::thread> threads_;
310
  std::unique_ptr<DB> db_;
311
};
312
313
}  // namespace rocksdb
314
315
18.6k
int main(int argc, char** argv) {
316
18.6k
  SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
317
18.6k
                  " [OPTIONS]...");
318
18.6k
  ParseCommandLineFlags(&argc, &argv, true);
319
18.6k
  rocksdb::WriteStress write_stress;
320
18.6k
  return write_stress.Run();
321
18.6k
}
322
323
#endif  // GFLAGS