/Users/deen/code/yugabyte-db/src/yb/rocksdb/tools/db_repl_stress.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
21 | | #ifndef ROCKSDB_LITE |
22 | | #ifndef GFLAGS |
23 | | #include <cstdio> |
24 | | int main() { |
25 | | fprintf(stderr, "Please install gflags to run rocksdb tools\n"); |
26 | | return 1; |
27 | | } |
28 | | #else |
29 | | |
30 | | #include <cstdio> |
31 | | #include <atomic> |
32 | | |
33 | | #include <gflags/gflags.h> |
34 | | |
35 | | #include "yb/rocksdb/db/write_batch_internal.h" |
36 | | #include "yb/rocksdb/db.h" |
37 | | #include "yb/rocksdb/types.h" |
38 | | #include "yb/rocksdb/util/testutil.h" |
39 | | |
40 | | #include "yb/util/status_log.h" |
41 | | |
42 | | // Run a thread to perform Put's. |
43 | | // Another thread uses GetUpdatesSince API to keep getting the updates. |
44 | | // options : |
45 | | // --num_inserts = the num of inserts the first thread should perform. |
46 | | // --wal_ttl = the wal ttl for the run. |
47 | | |
48 | | using GFLAGS::ParseCommandLineFlags; |
49 | | using GFLAGS::SetUsageMessage; |
50 | | |
51 | | namespace rocksdb { |
52 | | |
53 | | struct DataPumpThread { |
54 | | size_t no_records; |
55 | | DB* db; // Assumption DB is Open'ed already. |
56 | | }; |
57 | | |
58 | 0 | static void DataPumpThreadBody(void* arg) { |
59 | 0 | DataPumpThread* t = reinterpret_cast<DataPumpThread*>(arg); |
60 | 0 | DB* db = t->db; |
61 | 0 | Random rnd(301); |
62 | 0 | size_t i = 0; |
63 | 0 | while (i++ < t->no_records) { |
64 | 0 | if (!db->Put( |
65 | 0 | WriteOptions(), Slice(RandomString(&rnd, 500)), |
66 | 0 | Slice(RandomString(&rnd, 500))).ok()) { |
67 | 0 | fprintf(stderr, "Error in put\n"); |
68 | 0 | exit(1); |
69 | 0 | } |
70 | 0 | } |
71 | 0 | } |
72 | | |
73 | | struct ReplicationThread { |
74 | | std::atomic<bool> stop; |
75 | | DB* db; |
76 | | volatile size_t no_read; |
77 | | }; |
78 | | |
79 | 0 | static void ReplicationThreadBody(void* arg) { |
80 | 0 | ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg); |
81 | 0 | DB* db = t->db; |
82 | 0 | unique_ptr<TransactionLogIterator> iter; |
83 | 0 | SequenceNumber currentSeqNum = 1; |
84 | 0 | while (!t->stop.load(std::memory_order_acquire)) { |
85 | 0 | iter.reset(); |
86 | 0 | Status s; |
87 | 0 | while (!db->GetUpdatesSince(currentSeqNum, &iter).ok()) { |
88 | 0 | if (t->stop.load(std::memory_order_acquire)) { |
89 | 0 | return; |
90 | 0 | } |
91 | 0 | } |
92 | 0 | fprintf(stderr, "Refreshing iterator\n"); |
93 | 0 | for (; iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) { |
94 | 0 | BatchResult res = iter->GetBatch(); |
95 | 0 | if (res.sequence != currentSeqNum) { |
96 | 0 | fprintf( |
97 | 0 | stderr, |
98 | 0 | "Missed a seq no. b/w %" PRISN " and %" PRISN "\n", |
99 | 0 | currentSeqNum, |
100 | 0 | res.sequence); |
101 | 0 | exit(1); |
102 | 0 | } |
103 | 0 | } |
104 | 0 | } |
105 | 0 | } |
106 | | |
107 | | DEFINE_uint64(num_inserts, 1000, "the num of inserts the first thread should" |
108 | | " perform."); |
109 | | DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)"); |
110 | | DEFINE_uint64(wal_size_limit_MB, 10, "the wal size limit for the run" |
111 | | "(in MB)"); |
112 | | |
113 | 0 | int db_repl_stress(int argc, const char** argv) { |
114 | 0 | SetUsageMessage( |
115 | 0 | std::string("\nUSAGE:\n") + std::string(argv[0]) + |
116 | 0 | " --num_inserts=<num_inserts> --wal_ttl_seconds=<WAL_ttl_seconds>" + |
117 | 0 | " --wal_size_limit_MB=<WAL_size_limit_MB>"); |
118 | 0 | ParseCommandLineFlags(&argc, const_cast<char***>(&argv), true); |
119 | |
|
120 | 0 | Env* env = Env::Default(); |
121 | 0 | std::string default_db_path; |
122 | 0 | CHECK_OK(env->GetTestDirectory(&default_db_path)); |
123 | 0 | default_db_path += "db_repl_stress"; |
124 | 0 | Options options; |
125 | 0 | options.create_if_missing = true; |
126 | 0 | options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds; |
127 | 0 | options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB; |
128 | 0 | DB* db; |
129 | 0 | CHECK_OK(DestroyDB(default_db_path, options)); |
130 | |
|
131 | 0 | Status s = DB::Open(options, default_db_path, &db); |
132 | |
|
133 | 0 | if (!s.ok()) { |
134 | 0 | fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str()); |
135 | 0 | exit(1); |
136 | 0 | } |
137 | | |
138 | 0 | DataPumpThread dataPump; |
139 | 0 | dataPump.no_records = FLAGS_num_inserts; |
140 | 0 | dataPump.db = db; |
141 | 0 | env->StartThread(DataPumpThreadBody, &dataPump); |
142 | |
|
143 | 0 | ReplicationThread replThread; |
144 | 0 | replThread.db = db; |
145 | 0 | replThread.no_read = 0; |
146 | 0 | replThread.stop.store(false, std::memory_order_release); |
147 | |
|
148 | 0 | env->StartThread(ReplicationThreadBody, &replThread); |
149 | 0 | while (replThread.no_read < FLAGS_num_inserts) {} |
150 | 0 | replThread.stop.store(true, std::memory_order_release); |
151 | 0 | if (replThread.no_read < dataPump.no_records) { |
152 | | // no. read should be => than inserted. |
153 | 0 | fprintf( |
154 | 0 | stderr, |
155 | 0 | "No. of Record's written and read not same\nRead : %" ROCKSDB_PRIszt |
156 | 0 | " Written : %" ROCKSDB_PRIszt "\n", |
157 | 0 | replThread.no_read, dataPump.no_records); |
158 | 0 | exit(1); |
159 | 0 | } |
160 | 0 | fprintf(stderr, "Successful!\n"); |
161 | 0 | exit(0); |
162 | 0 | } |
163 | | |
164 | | } // namespace rocksdb |
165 | | |
166 | | int main(int argc, const char** argv) { rocksdb::db_repl_stress(argc, argv); } |
167 | | |
168 | | #endif // GFLAGS |
169 | | |
170 | | #else // ROCKSDB_LITE |
171 | | #include <stdio.h> |
172 | | int main(int argc, char** argv) { |
173 | | fprintf(stderr, "Not supported in lite mode.\n"); |
174 | | return 1; |
175 | | } |
176 | | #endif // ROCKSDB_LITE |