/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/write_callback_test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
21 | | #ifndef ROCKSDB_LITE |
22 | | |
23 | | #include <atomic> |
24 | | #include <string> |
25 | | #include <utility> |
26 | | #include <vector> |
27 | | |
28 | | #include "yb/rocksdb/db/db_impl.h" |
29 | | #include "yb/rocksdb/db/write_callback.h" |
30 | | #include "yb/rocksdb/db.h" |
31 | | #include "yb/rocksdb/write_batch.h" |
32 | | #include "yb/rocksdb/util/logging.h" |
33 | | #include "yb/rocksdb/util/random.h" |
34 | | #include "yb/rocksdb/util/sync_point.h" |
35 | | #include "yb/rocksdb/util/testharness.h" |
36 | | #include "yb/rocksdb/util/testutil.h" |
37 | | |
38 | | #include "yb/util/test_util.h" |
39 | | |
40 | | using std::atomic; |
41 | | using std::string; |
42 | | |
43 | | namespace rocksdb { |
44 | | |
// Test fixture shared by the TEST_F cases in this file. It derives from RocksDBTest and
// carries the path of the database that each test opens and later destroys.
45 | | class WriteCallbackTest : public RocksDBTest { |
46 | | public: |
// Path of the test database; assigned in the constructor below.
47 | | string dbname; |
48 | | |
// Sets dbname to test::TmpDir() + "/write_callback_testdb".
49 | 2 | WriteCallbackTest() { |
50 | 2 | dbname = test::TmpDir() + "/write_callback_testdb"; |
51 | 2 | } |
52 | | }; |
53 | | |
// WriteCallback that records that it was invoked and succeeds when the DB handed to it
// is a DBImpl.
54 | | class WriteCallbackTestWriteCallback1 : public WriteCallback { |
55 | | public: |
// Set to true by Callback(); tests read it to confirm the callback ran.
56 | | bool was_called = false; |
57 | | |
// Marks the callback as called, then returns InvalidArgument if `db` cannot be
// dynamic_cast to DBImpl*, and Status::OK() otherwise.
58 | 1 | Status Callback(DB *db) override { |
59 | 1 | was_called = true; |
60 | | |
61 | | // Make sure db is a DBImpl |
62 | 1 | DBImpl* db_impl = dynamic_cast<DBImpl*> (db); |
63 | 1 | if (db_impl == nullptr) { |
64 | 0 | return STATUS(InvalidArgument, ""); |
65 | 0 | } |
66 | | |
67 | 1 | return Status::OK(); |
68 | 1 | } |
69 | | |
// Always reports that this write may be batched with others.
70 | 0 | bool AllowWriteBatching() override { return true; } |
71 | | }; |
72 | | |
// WriteCallback whose Callback() unconditionally fails; used to check that a write guarded
// by a failing callback is rejected.
73 | | class WriteCallbackTestWriteCallback2 : public WriteCallback { |
74 | | public: |
// Ignores `db` and always returns a Busy status with an empty message.
75 | 1 | Status Callback(DB *db) override { |
76 | 1 | return STATUS(Busy, ""); |
77 | 1 | } |
// Always reports that this write may be batched with others.
78 | 0 | bool AllowWriteBatching() override { return true; } |
79 | | }; |
80 | | |
// Configurable WriteCallback used by WriteWithCallbackTest: the test chooses whether the
// callback fails and whether it allows batching, and later checks whether it was invoked.
81 | | class MockWriteCallback : public WriteCallback { |
82 | | public: |
// When true, Callback() returns Busy instead of OK.
83 | | bool should_fail_ = false; |
// Set to true by Callback(). It has no in-class initializer; both constructors store a
// value into it explicitly.
84 | | atomic<bool> was_called_; |
// Value returned by AllowWriteBatching().
85 | | bool allow_batching_ = false; |
86 | | |
// Default constructor: starts with was_called_ == false.
87 | 37 | MockWriteCallback() { |
88 | 37 | was_called_.store(false); |
89 | 37 | } |
90 | | |
// Copy constructor: copies the two plain flags and takes a snapshot of the other object's
// was_called_ via load()/store() (std::atomic itself is not copyable).
91 | | MockWriteCallback(const MockWriteCallback& other) |
92 | | : should_fail_(other.should_fail_), |
93 | 74 | allow_batching_(other.allow_batching_) { |
94 | 74 | was_called_.store(other.was_called_.load()); |
95 | 74 | } |
96 | | |
// Records the invocation, then returns Busy when should_fail_ is set and OK otherwise.
// The `db` argument is not used.
97 | 488 | Status Callback(DB* db) override { |
98 | 488 | was_called_.store(true); |
99 | 488 | if (should_fail_) { |
100 | 216 | return STATUS(Busy, ""); |
101 | 272 | } else { |
102 | 272 | return Status::OK(); |
103 | 272 | } |
104 | 488 | } |
105 | | |
// Returns the batching preference configured through allow_batching_.
106 | 192 | bool AllowWriteBatching() override { return allow_batching_; } |
107 | | }; |
108 | | |
// Runs groups of concurrent DBImpl::WriteWithCallback calls, one thread per writer, for every
// combination of allow_parallel x allow_batching x enable_WAL x write scenario. For each
// combination it checks the status returned to every writer, that every callback was
// invoked, that only the keys of non-failing writers are readable, and that the latest
// sequence number equals the number of keys put by non-failing writers.
// NOTE(review): this block uses std::thread and std::function, but <thread> and <functional>
// are not among the includes visible at the top of the file - presumably they arrive
// transitively; confirm.
109 | 1 | TEST_F(WriteCallbackTest, WriteWithCallbackTest) { |
// One writer in a write group: its callback, the batch it submits, and a record of the
// key/value pairs put into that batch (used for verification afterwards).
110 | 1 | struct WriteOP { |
// Intentionally non-explicit so the scenario table below can be written as lists of bools.
111 | 37 | WriteOP(bool should_fail = false) { callback_.should_fail_ = should_fail; } // NOLINT |
112 | | |
// Adds the pair to the batch and remembers it in kvs_.
113 | 2.95k | void Put(const string& key, const string& val) { |
114 | 2.95k | kvs_.push_back(std::make_pair(key, val)); |
115 | 2.95k | write_batch_.Put(key, val); |
116 | 2.95k | } |
117 | | |
// Empties the recorded pairs and the batch and resets the callback's was_called_ flag;
// should_fail_ is left untouched.
118 | 294 | void Clear() { |
119 | 294 | kvs_.clear(); |
120 | 294 | write_batch_.Clear(); |
121 | 294 | callback_.was_called_.store(false); |
122 | 294 | } |
123 | | |
124 | 1 | MockWriteCallback callback_; |
125 | 1 | WriteBatch write_batch_; |
126 | 1 | std::vector<std::pair<string, string>> kvs_; |
127 | 1 | }; |
128 | | |
// Each inner list is one write group; each bool becomes a WriteOP whose callback fails
// (true) or succeeds (false). Position in the list is the writer's index within the group.
129 | 1 | std::vector<std::vector<WriteOP>> write_scenarios = { |
130 | 1 | {true}, |
131 | 1 | {false}, |
132 | 1 | {false, false}, |
133 | 1 | {true, true}, |
134 | 1 | {true, false}, |
135 | 1 | {false, true}, |
136 | 1 | {false, false, false}, |
137 | 1 | {true, true, true}, |
138 | 1 | {false, true, false}, |
139 | 1 | {true, false, true}, |
140 | 1 | {true, false, false, false, false}, |
141 | 1 | {false, false, false, false, true}, |
142 | 1 | {false, false, true, false, true}, |
143 | 1 | }; |
144 | | |
145 | 2 | for (auto& allow_parallel : {true, false}) { |
146 | 4 | for (auto& allow_batching : {true, false}) { |
147 | 8 | for (auto& enable_WAL : {true, false}) { |
148 | 104 | for (auto& write_group : write_scenarios) { |
// A database is opened for every combination and destroyed at the end of the iteration;
// the sequence-number assertion below requires it to start at 0.
149 | 104 | Options options; |
150 | 104 | options.create_if_missing = true; |
151 | 104 | options.allow_concurrent_memtable_write = allow_parallel; |
152 | | |
153 | 104 | ReadOptions read_options; |
154 | 104 | DB* db; |
155 | 104 | DBImpl* db_impl; |
156 | | |
157 | 104 | ASSERT_OK(DB::Open(options, dbname, &db)); |
158 | | |
159 | 104 | db_impl = dynamic_cast<DBImpl*>(db); |
160 | 104 | ASSERT_TRUE(db_impl); |
161 | | |
// threads_waiting counts callers that have entered the "Wait" callback below; seq tracks
// the sequence number the test expects after all writes.
162 | 104 | std::atomic<uint64_t> threads_waiting(0); |
163 | 104 | std::atomic<uint64_t> seq(db_impl->GetLatestSequenceNumber()); |
164 | 104 | ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0); |
165 | | |
// Callback for the "WriteThread::JoinBatchGroup:Wait" sync point. `arg` is treated as a
// WriteThread::Writer*. The callback captures the loop locals by reference.
166 | 104 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
167 | 296 | "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { |
168 | 296 | uint64_t cur_threads_waiting = 0; |
169 | 296 | bool is_leader = false; |
170 | 296 | bool is_last = false; |
171 | | |
// Atomically take the next arrival number with a CAS loop: arrival 0 is the leader and
// arrival size-1 is the last writer.
172 | | // who am i |
173 | 296 | do { |
174 | 296 | cur_threads_waiting = threads_waiting.load(); |
175 | 296 | is_leader = (cur_threads_waiting == 0); |
176 | 296 | is_last = (cur_threads_waiting == write_group.size() - 1); |
177 | 296 | } while (!threads_waiting.compare_exchange_strong( |
178 | 296 | cur_threads_waiting, cur_threads_waiting + 1)); |
179 | | |
180 | | // check my state |
181 | 296 | auto* writer = reinterpret_cast<WriteThread::Writer*>(arg); |
182 | | |
183 | 296 | if (is_leader) { |
184 | 104 | ASSERT_TRUE(writer->state == |
185 | 104 | WriteThread::State::STATE_GROUP_LEADER); |
186 | 192 | } else { |
187 | 192 | ASSERT_TRUE(writer->state == WriteThread::State::STATE_INIT); |
188 | 192 | } |
189 | | |
// The checks below invoke the writer's callback directly with a null DB and compare its
// outcome with the should_fail_ flag of the first / last WriteOP of the group. As a side
// effect this sets that callback's was_called_ flag.
190 | | // (meta test) the first WriteOP should indeed be the first |
191 | | // and the last should be the last (all others can be out of |
192 | | // order) |
193 | 296 | if (is_leader) { |
194 | 104 | ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == |
195 | 104 | !write_group.front().callback_.should_fail_); |
196 | 192 | } else if (is_last) { |
197 | 88 | ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == |
198 | 88 | !write_group.back().callback_.should_fail_); |
199 | 88 | } |
200 | | |
// Busy-wait until every writer of the group has reached this callback.
201 | | // wait for friends |
202 | 243k | while (threads_waiting.load() < write_group.size()) { |
203 | 243k | } |
204 | 296 | }); |
205 | | |
// Callback for the "WriteThread::JoinBatchGroup:DoneWaiting" sync point. When both batching
// and parallel writes are allowed, no state is asserted.
206 | 104 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
207 | 191 | "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) { |
208 | | // check my state |
209 | 191 | auto* writer = reinterpret_cast<WriteThread::Writer*>(arg); |
210 | | |
211 | 191 | if (!allow_batching) { |
212 | | // no batching so everyone should be a leader |
213 | 96 | ASSERT_TRUE(writer->state == |
214 | 96 | WriteThread::State::STATE_GROUP_LEADER); |
215 | 95 | } else if (!allow_parallel) { |
216 | 48 | ASSERT_TRUE(writer->state == |
217 | 48 | WriteThread::State::STATE_COMPLETED); |
218 | 48 | } |
219 | 191 | }); |
220 | | |
// thread_num hands out writer indices; dummy_key hands out a distinct char per key.
221 | 104 | std::atomic<uint32_t> thread_num(0); |
222 | 104 | std::atomic<char> dummy_key(0); |
// Body run by each writer thread.
223 | 295 | std::function<void()> write_with_callback_func = [&]() { |
// Claim this thread's index in write_group and seed its Random with that index.
224 | 295 | uint32_t i = thread_num.fetch_add(1); |
225 | 295 | Random rnd(i); |
226 | | |
// Every thread except index 0 spins until at least one thread has entered the "Wait"
// callback, so index 0 gets there first.
227 | | // leaders gotta lead |
228 | 38.9k | while (i > 0 && threads_waiting.load() < 1) { |
229 | 38.6k | } |
230 | | |
// The thread with the highest index spins until all other writers have entered the "Wait"
// callback, so it gets there last.
231 | | // loser has to lose |
232 | 12.4k | while (i == write_group.size() - 1 && |
233 | 12.3k | threads_waiting.load() < write_group.size() - 1) { |
234 | 12.2k | } |
235 | | |
// Reset the op (it is reused across the outer option loops) and apply the batching setting
// of the current combination.
236 | 295 | auto& write_op = write_group.at(i); |
237 | 295 | write_op.Clear(); |
238 | 295 | write_op.callback_.allow_batching_ = allow_batching; |
239 | | |
// NOTE(review): rnd.Next() % 50 is evaluated again on every iteration, so the loop ends the
// first time a fresh draw is <= j rather than after one pre-drawn count - presumably
// acceptable for "some keys"; confirm this is intended.
240 | | // insert some keys |
241 | 3.24k | for (uint32_t j = 0; j < rnd.Next() % 50; j++) { |
242 | | // grab unique key |
243 | 2.94k | char my_key = 0; |
244 | 2.98k | do { |
245 | 2.98k | my_key = dummy_key.load(); |
246 | 2.98k | } while (!dummy_key.compare_exchange_strong(my_key, my_key + 1)); |
247 | | |
// Key is the claimed char repeated 5 times, value is the same char repeated 10 times.
248 | 2.94k | string skey(5, my_key); |
249 | 2.94k | string sval(10, my_key); |
250 | 2.94k | write_op.Put(skey, sval); |
251 | | |
// Only puts from writers expected to succeed count towards the expected sequence number.
252 | 2.94k | if (!write_op.callback_.should_fail_) { |
253 | 1.83k | seq.fetch_add(1); |
254 | 1.83k | } |
255 | 2.94k | } |
256 | | |
// WAL enabled implies synced writes; WAL disabled implies unsynced writes.
257 | 295 | WriteOptions woptions; |
258 | 295 | woptions.disableWAL = !enable_WAL; |
259 | 295 | woptions.sync = enable_WAL; |
260 | 295 | Status s = db_impl->WriteWithCallback( |
261 | 295 | woptions, &write_op.write_batch_, &write_op.callback_); |
262 | | |
// A writer whose callback fails must see Busy; every other writer must see OK.
263 | 295 | if (write_op.callback_.should_fail_) { |
264 | 120 | ASSERT_TRUE(s.IsBusy()); |
265 | 175 | } else { |
266 | 175 | ASSERT_OK(s); |
267 | 175 | } |
268 | 295 | }; |
269 | | |
270 | 104 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); |
271 | | |
// Start one thread per WriteOP in the group and wait for all of them.
272 | | // do all the writes |
273 | 104 | std::vector<std::thread> threads; |
274 | 400 | for (uint32_t i = 0; i < write_group.size(); i++) { |
275 | 296 | threads.emplace_back(write_with_callback_func); |
276 | 296 | } |
277 | 296 | for (auto& t : threads) { |
278 | 296 | t.join(); |
279 | 296 | } |
280 | | |
// NOTE(review): the two callbacks registered above capture this iteration's locals by
// reference and are not explicitly removed in this block; only processing is disabled.
// Presumably they are replaced by the next iteration's SetCallBack calls - confirm nothing
// can invoke them after these locals go out of scope.
281 | 104 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); |
282 | | |
// Keys written by failing writers must be absent; keys written by succeeding writers must
// read back with the recorded value. Every callback must have been invoked.
283 | | // check for keys |
284 | 104 | string value; |
285 | 296 | for (auto& w : write_group) { |
286 | 296 | ASSERT_TRUE(w.callback_.was_called_.load()); |
287 | 2.95k | for (auto& kvp : w.kvs_) { |
288 | 2.95k | if (w.callback_.should_fail_) { |
289 | 1.11k | ASSERT_TRUE( |
290 | 1.11k | db->Get(read_options, kvp.first, &value).IsNotFound()); |
291 | 1.84k | } else { |
292 | 1.84k | ASSERT_OK(db->Get(read_options, kvp.first, &value)); |
293 | 1.84k | ASSERT_EQ(value, kvp.second); |
294 | 1.84k | } |
295 | 2.95k | } |
296 | 296 | } |
297 | | |
298 | 104 | ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber()); |
299 | | |
// Close and remove the database so the next combination starts from an empty one.
// NOTE(review): db is a raw pointer, so any ASSERT_* that returns early above skips this
// delete and the DestroyDB call.
300 | 104 | delete db; |
301 | 104 | ASSERT_OK(DestroyDB(dbname, options)); |
302 | 104 | } |
303 | 8 | } |
304 | 4 | } |
305 | 2 | } |
306 | 1 | } |
307 | | |
// Single-threaded check of DBImpl::WriteWithCallback: a plain Write, then a write whose
// callback succeeds (value is updated and the callback is observed to have run), then a
// write whose callback fails (write is rejected and the previous value is still returned).
308 | 1 | TEST_F(WriteCallbackTest, WriteCallBackTest) { |
309 | 1 | Options options; |
310 | 1 | WriteOptions write_options; |
311 | 1 | ReadOptions read_options; |
312 | 1 | string value; |
313 | 1 | DB* db; |
314 | 1 | DBImpl* db_impl; |
315 | | |
316 | 1 | options.create_if_missing = true; |
317 | 1 | Status s = DB::Open(options, dbname, &db); |
318 | 1 | ASSERT_OK(s); |
319 | | |
// WriteWithCallback is called through DBImpl, so the handle must really be a DBImpl.
320 | 1 | db_impl = dynamic_cast<DBImpl*> (db); |
321 | 1 | ASSERT_TRUE(db_impl); |
322 | | |
323 | 1 | WriteBatch wb; |
324 | | |
325 | 1 | wb.Put("a", "value.a"); |
326 | 1 | wb.Delete("x"); |
327 | | |
328 | | // Test a simple Write |
329 | 1 | s = db->Write(write_options, &wb); |
330 | 1 | ASSERT_OK(s); |
331 | | |
332 | 1 | s = db->Get(read_options, "a", &value); |
333 | 1 | ASSERT_OK(s); |
334 | 1 | ASSERT_EQ("value.a", value); |
335 | | |
336 | | // Test WriteWithCallback |
337 | 1 | WriteCallbackTestWriteCallback1 callback1; |
338 | 1 | WriteBatch wb2; |
339 | | |
340 | 1 | wb2.Put("a", "value.a2"); |
341 | | |
342 | 1 | s = db_impl->WriteWithCallback(write_options, &wb2, &callback1); |
343 | 1 | ASSERT_OK(s); |
344 | 1 | ASSERT_TRUE(callback1.was_called); |
345 | | |
346 | 1 | s = db->Get(read_options, "a", &value); |
347 | 1 | ASSERT_OK(s); |
348 | 1 | ASSERT_EQ("value.a2", value); |
349 | | |
350 | | // Test WriteWithCallback for a callback that fails |
351 | 1 | WriteCallbackTestWriteCallback2 callback2; |
352 | 1 | WriteBatch wb3; |
353 | | |
354 | 1 | wb3.Put("a", "value.a3"); |
355 | | |
356 | 1 | s = db_impl->WriteWithCallback(write_options, &wb3, &callback2); |
357 | 1 | ASSERT_NOK(s); |
358 | | |
// The rejected batch must not have been applied: "a" still maps to the value written by wb2.
359 | 1 | s = db->Get(read_options, "a", &value); |
360 | 1 | ASSERT_OK(s); |
361 | 1 | ASSERT_EQ("value.a2", value); |
362 | | |
// Close the database and remove its files.
363 | 1 | delete db; |
364 | 1 | ASSERT_OK(DestroyDB(dbname, options)); |
365 | 1 | } |
366 | | |
367 | | } // namespace rocksdb |
368 | | |
// Entry point of the test binary when ROCKSDB_LITE is not defined: hands the command-line
// arguments to GoogleTest and returns the result of RUN_ALL_TESTS() as the exit code.
369 | 13.2k | int main(int argc, char** argv) { |
370 | 13.2k | ::testing::InitGoogleTest(&argc, argv); |
371 | 13.2k | return RUN_ALL_TESTS(); |
372 | 13.2k | } |
373 | | |
374 | | #else |
375 | | #include <stdio.h> |
376 | | |
// Entry point compiled in the #else branch of `#ifndef ROCKSDB_LITE`: no tests are run;
// a skip notice is written to stderr and the process returns 0.
377 | | int main(int argc, char** argv) { |
378 | | fprintf(stderr, |
379 | | "SKIPPED as WriteWithCallback is not supported in ROCKSDB_LITE\n"); |
380 | | return 0; |
381 | | } |
382 | | |
383 | | #endif // !ROCKSDB_LITE |