/Users/deen/code/yugabyte-db/src/yb/rocksdb/tools/write_stress.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // |
21 | | // The goal of this tool is to be a simple stress test with focus on catching: |
22 | | // * bugs in compaction/flush processes, especially the ones that cause |
23 | | // assertion errors |
24 | | // * bugs in the code that deletes obsolete files |
25 | | // |
26 | | // There are two parts of the test: |
27 | | // * write_stress, a binary that writes to the database |
28 | | // * write_stress_runner.py, a script that invokes and kills write_stress |
29 | | // |
30 | | // Here are some interesting parts of write_stress: |
31 | | // * Runs with very high concurrency of compactions and flushes (32 threads |
32 | | // total) and tries to create a huge amount of small files |
33 | | // * The keys written to the database are not uniformly distributed -- there is |
34 | | // a 3-character prefix that mutates occasionally (in prefix mutator thread), in |
35 | | // such a way that the first character mutates slower than second, which mutates |
36 | | // slower than third character. That way, the compaction stress tests some |
37 | | // interesting compaction features like trivial moves and bottommost level |
38 | | // calculation |
39 | | // * There is a thread that creates an iterator, holds it for a couple of seconds |
40 | | // and then iterates over all keys. This is supposed to test RocksDB's abilities |
41 | | // to keep the files alive when there are references to them. |
42 | | // * Some writes trigger WAL sync. This is stress testing our WAL sync code. |
43 | | // * At the end of the run, we make sure that we didn't leak any of the sst |
44 | | // files |
45 | | // |
46 | | // write_stress_runner.py changes the mode in which we run write_stress and also |
47 | | // kills and restarts it. There are some interesting characteristics: |
48 | | // * At the beginning we divide the full test runtime into smaller parts -- |
49 | | // shorter runtimes (a couple of seconds) and longer runtimes (100 or 1000 seconds) |
50 | | // * The first time we run write_stress, we destroy the old DB. Every next time |
51 | | // during the test, we use the same DB. |
52 | | // * We can run in kill mode or clean-restart mode. Kill mode kills the |
53 | | // write_stress violently. |
54 | | // * We can run in a mode where delete_obsolete_files_with_fullscan is true or |
55 | | // false |
56 | | // * We can run with low_open_files mode turned on or off. When it's turned on, |
57 | | // we configure table cache to only hold a couple of files -- that way we need |
58 | | // to reopen files every time we access them. |
59 | | // |
60 | | // Another goal was to create a stress test without a lot of parameters. So |
61 | | // tools/write_stress_runner.py should only take one parameter -- runtime_sec |
62 | | // and it should figure out everything else on its own. |
63 | | |
64 | | |
65 | | #ifndef GFLAGS |
// Fallback entry point for builds without gflags: there is nothing to run, so
// print a hint on stderr and report failure to the caller.
int main() {
  fputs("Please install gflags to run rocksdb tools\n", stderr);
  return 1;
}
70 | | #else |
71 | | |
72 | | #include <gflags/gflags.h> |
73 | | |
74 | | #ifndef __STDC_FORMAT_MACROS |
75 | | #define __STDC_FORMAT_MACROS |
76 | | #endif |
77 | | |
78 | | #include <inttypes.h> |
79 | | #include <atomic> |
80 | | #include <random> |
81 | | #include <set> |
82 | | #include <string> |
83 | | #include <thread> |
84 | | |
85 | | #include "yb/rocksdb/db.h" |
86 | | #include "yb/rocksdb/env.h" |
87 | | #include "yb/rocksdb/options.h" |
88 | | #include "yb/util/slice.h" |
89 | | |
90 | | #include "yb/rocksdb/db/filename.h" |
91 | | |
92 | | using GFLAGS::ParseCommandLineFlags; |
93 | | using GFLAGS::RegisterFlagValidator; |
94 | | using GFLAGS::SetUsageMessage; |
95 | | |
// Command-line flags (gflags). Each DEFINE_* below creates a FLAGS_<name>
// variable that the rest of this file reads.

// Size of the generated data. Keys consist of the shared 3-character prefix
// followed by random lowercase letters, key_size characters in total; values
// are value_size random lowercase letters.
DEFINE_int32(key_size, 10, "Key size");
DEFINE_int32(value_size, 100, "Value size");
// Path of the database. When left empty, "<test directory>/write_stress" is
// used instead.
DEFINE_string(db, "", "Use the db with the following name.");
DEFINE_bool(destroy_db, true,
            "Destory the existing DB before running the test");

// Total runtime. The special value -1 means "run until the process is killed".
DEFINE_int32(runtime_sec, 10 * 60, "How long are we running for, in seconds");
// Seed for the random generators of both the writer and the prefix mutator.
DEFINE_int32(seed, 139, "Random seed");

// Key prefix mutation: once per period each of the three prefix characters is
// independently replaced by a random lowercase letter with its own
// probability.
DEFINE_double(prefix_mutate_period_sec, 1.0,
              "How often are we going to mutate the prefix");
DEFINE_double(first_char_mutate_probability, 0.1,
              "How likely are we to mutate the first char every period");
DEFINE_double(second_char_mutate_probability, 0.2,
              "How likely are we to mutate the second char every period");
DEFINE_double(third_char_mutate_probability, 0.5,
              "How likely are we to mutate the third char every period");

// Time between creating an iterator and scanning the whole DB with it.
DEFINE_int32(iterator_hold_sec, 5,
             "How long will the iterator hold files before it gets destroyed");

// Probability that an individual write is issued with WriteOptions::sync set.
DEFINE_double(sync_probability, 0.01, "How often are we syncing writes");
// When set, options.delete_obsolete_files_period_micros is forced to 0.
DEFINE_bool(delete_obsolete_files_with_fullscan, false,
            "If true, we delete obsolete files after each compaction/flush "
            "using GetChildren() API");
// Selects options.max_open_files: 20 when set, -1 otherwise.
DEFINE_bool(low_open_files_mode, false,
            "If true, we set max_open_files to 20, so that every file access "
            "needs to reopen it");
124 | | |
125 | | namespace rocksdb { |
126 | | |
// Number of leading key characters that come from the shared, occasionally
// mutated prefix instead of being generated at random.
static constexpr int kPrefixSize = 3;
128 | | |
129 | | class WriteStress { |
130 | | public: |
131 | 0 | WriteStress() : stop_(false) { |
132 | | // initialize key_prefix |
133 | 0 | for (int i = 0; i < kPrefixSize; ++i) { |
134 | 0 | key_prefix_[i].store('a'); |
135 | 0 | } |
136 | | |
137 | | // Choose a location for the test database if none given with --db=<path> |
138 | 0 | if (FLAGS_db.empty()) { |
139 | 0 | std::string default_db_path; |
140 | 0 | CHECK_OK(Env::Default()->GetTestDirectory(&default_db_path)); |
141 | 0 | default_db_path += "/write_stress"; |
142 | 0 | FLAGS_db = default_db_path; |
143 | 0 | } |
144 | |
|
145 | 0 | Options options; |
146 | 0 | if (FLAGS_destroy_db) { |
147 | 0 | CHECK_OK(DestroyDB(FLAGS_db, options)); |
148 | 0 | } |
149 | | |
150 | | // make the LSM tree deep, so that we have many concurrent flushes and |
151 | | // compactions |
152 | 0 | options.create_if_missing = true; |
153 | 0 | options.write_buffer_size = 256 * 1024; // 256k |
154 | 0 | options.max_bytes_for_level_base = 1 * 1024 * 1204; // 1MB |
155 | 0 | options.target_file_size_base = 100 * 1204; // 100k |
156 | 0 | options.max_write_buffer_number = 16; |
157 | 0 | options.max_background_compactions = 16; |
158 | 0 | options.max_background_flushes = 16; |
159 | 0 | options.max_open_files = FLAGS_low_open_files_mode ? 20 : -1; |
160 | 0 | if (FLAGS_delete_obsolete_files_with_fullscan) { |
161 | 0 | options.delete_obsolete_files_period_micros = 0; |
162 | 0 | } |
163 | | |
164 | | // open DB |
165 | 0 | DB* db; |
166 | 0 | Status s = DB::Open(options, FLAGS_db, &db); |
167 | 0 | if (!s.ok()) { |
168 | 0 | fprintf(stderr, "Can't open database: %s\n", s.ToString().c_str()); |
169 | 0 | std::abort(); |
170 | 0 | } |
171 | 0 | db_.reset(db); |
172 | 0 | } |
173 | | |
174 | 0 | void WriteThread() { |
175 | 0 | std::mt19937 rng(static_cast<unsigned int>(FLAGS_seed)); |
176 | 0 | std::uniform_real_distribution<double> dist(0, 1); |
177 | |
|
178 | 0 | auto random_string = [](std::mt19937& r, int len) { |
179 | 0 | std::uniform_int_distribution<int> char_dist('a', 'z'); |
180 | 0 | std::string ret; |
181 | 0 | for (int i = 0; i < len; ++i) { |
182 | 0 | ret += char_dist(r); |
183 | 0 | } |
184 | 0 | return ret; |
185 | 0 | }; |
186 | |
|
187 | 0 | while (!stop_.load(std::memory_order_relaxed)) { |
188 | 0 | std::string prefix; |
189 | 0 | prefix.resize(kPrefixSize); |
190 | 0 | for (int i = 0; i < kPrefixSize; ++i) { |
191 | 0 | prefix[i] = key_prefix_[i].load(std::memory_order_relaxed); |
192 | 0 | } |
193 | 0 | auto key = prefix + random_string(rng, FLAGS_key_size - kPrefixSize); |
194 | 0 | auto value = random_string(rng, FLAGS_value_size); |
195 | 0 | WriteOptions woptions; |
196 | 0 | woptions.sync = dist(rng) < FLAGS_sync_probability; |
197 | 0 | auto s = db_->Put(woptions, key, value); |
198 | 0 | if (!s.ok()) { |
199 | 0 | fprintf(stderr, "Write to DB failed: %s\n", s.ToString().c_str()); |
200 | 0 | std::abort(); |
201 | 0 | } |
202 | 0 | } |
203 | 0 | } |
204 | | |
205 | 0 | void IteratorHoldThread() { |
206 | 0 | while (!stop_.load(std::memory_order_relaxed)) { |
207 | 0 | std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions())); |
208 | 0 | Env::Default()->SleepForMicroseconds(FLAGS_iterator_hold_sec * 1000 * |
209 | 0 | 1000LL); |
210 | 0 | for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { |
211 | 0 | } |
212 | 0 | if (!iterator->status().ok()) { |
213 | 0 | fprintf(stderr, "Iterator statuts not OK: %s\n", |
214 | 0 | iterator->status().ToString().c_str()); |
215 | 0 | std::abort(); |
216 | 0 | } |
217 | 0 | } |
218 | 0 | } |
219 | | |
220 | 0 | void PrefixMutatorThread() { |
221 | 0 | std::mt19937 rng(static_cast<unsigned int>(FLAGS_seed)); |
222 | 0 | std::uniform_real_distribution<double> dist(0, 1); |
223 | 0 | std::uniform_int_distribution<int> char_dist('a', 'z'); |
224 | 0 | while (!stop_.load(std::memory_order_relaxed)) { |
225 | 0 | Env::Default()->SleepForMicroseconds(static_cast<int>( |
226 | 0 | FLAGS_prefix_mutate_period_sec * |
227 | 0 | 1000 * 1000LL)); |
228 | 0 | if (dist(rng) < FLAGS_first_char_mutate_probability) { |
229 | 0 | key_prefix_[0].store(char_dist(rng), std::memory_order_relaxed); |
230 | 0 | } |
231 | 0 | if (dist(rng) < FLAGS_second_char_mutate_probability) { |
232 | 0 | key_prefix_[1].store(char_dist(rng), std::memory_order_relaxed); |
233 | 0 | } |
234 | 0 | if (dist(rng) < FLAGS_third_char_mutate_probability) { |
235 | 0 | key_prefix_[2].store(char_dist(rng), std::memory_order_relaxed); |
236 | 0 | } |
237 | 0 | } |
238 | 0 | } |
239 | | |
240 | 0 | int Run() { |
241 | 0 | threads_.emplace_back([&]() { WriteThread(); }); |
242 | 0 | threads_.emplace_back([&]() { PrefixMutatorThread(); }); |
243 | 0 | threads_.emplace_back([&]() { IteratorHoldThread(); }); |
244 | |
|
245 | 0 | if (FLAGS_runtime_sec == -1) { |
246 | | // infinite runtime, until we get killed |
247 | 0 | while (true) { |
248 | 0 | Env::Default()->SleepForMicroseconds(1000 * 1000); |
249 | 0 | } |
250 | 0 | } |
251 | |
|
252 | 0 | Env::Default()->SleepForMicroseconds(FLAGS_runtime_sec * 1000 * 1000); |
253 | |
|
254 | 0 | stop_.store(true, std::memory_order_relaxed); |
255 | 0 | for (auto& t : threads_) { |
256 | 0 | t.join(); |
257 | 0 | } |
258 | 0 | threads_.clear(); |
259 | | |
260 | | // Skip checking for leaked files in ROCKSDB_LITE since we don't have access to |
261 | | // function GetLiveFilesMetaData |
262 | 0 | #ifndef ROCKSDB_LITE |
263 | | // let's see if we leaked some files |
264 | 0 | CHECK_OK(db_->PauseBackgroundWork()); |
265 | 0 | std::vector<LiveFileMetaData> metadata; |
266 | 0 | db_->GetLiveFilesMetaData(&metadata); |
267 | 0 | std::set<uint64_t> sst_file_numbers; |
268 | 0 | for (const auto& file : metadata) { |
269 | 0 | uint64_t number; |
270 | 0 | FileType type; |
271 | 0 | if (!ParseFileName(file.name, &number, "LOG", &type)) { |
272 | 0 | continue; |
273 | 0 | } |
274 | 0 | if (type == kTableFile) { |
275 | 0 | sst_file_numbers.insert(number); |
276 | 0 | } |
277 | 0 | } |
278 | |
|
279 | 0 | std::vector<std::string> children; |
280 | 0 | CHECK_OK(Env::Default()->GetChildren(FLAGS_db, &children)); |
281 | 0 | for (const auto& child : children) { |
282 | 0 | uint64_t number; |
283 | 0 | FileType type; |
284 | 0 | if (!ParseFileName(child, &number, "LOG", &type)) { |
285 | 0 | continue; |
286 | 0 | } |
287 | 0 | if (type == kTableFile) { |
288 | 0 | if (sst_file_numbers.find(number) == sst_file_numbers.end()) { |
289 | 0 | fprintf(stderr, |
290 | 0 | "Found a table file in DB path that should have been " |
291 | 0 | "deleted: %s\n", |
292 | 0 | child.c_str()); |
293 | 0 | std::abort(); |
294 | 0 | } |
295 | 0 | } |
296 | 0 | } |
297 | 0 | CHECK_OK(db_->ContinueBackgroundWork()); |
298 | 0 | #endif // !ROCKSDB_LITE |
299 | |
|
300 | 0 | return 0; |
301 | 0 | } |
302 | | |
303 | | private: |
304 | | // each key is prepended with this prefix. we occasionally change it. third |
305 | | // letter is changed more frequently than second, which is changed more |
306 | | // frequently than the first one. |
307 | | std::atomic<char> key_prefix_[kPrefixSize]; |
308 | | std::atomic<bool> stop_; |
309 | | std::vector<std::thread> threads_; |
310 | | std::unique_ptr<DB> db_; |
311 | | }; |
312 | | |
313 | | } // namespace rocksdb |
314 | | |
315 | 18.6k | int main(int argc, char** argv) { |
316 | 18.6k | SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) + |
317 | 18.6k | " [OPTIONS]..."); |
318 | 18.6k | ParseCommandLineFlags(&argc, &argv, true); |
319 | 18.6k | rocksdb::WriteStress write_stress; |
320 | 18.6k | return write_stress.Run(); |
321 | 18.6k | } |
322 | | |
323 | | #endif // GFLAGS |