YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/write_callback_test.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#ifndef ROCKSDB_LITE
22
23
#include <atomic>
24
#include <string>
25
#include <utility>
26
#include <vector>
27
28
#include "yb/rocksdb/db/db_impl.h"
29
#include "yb/rocksdb/db/write_callback.h"
30
#include "yb/rocksdb/db.h"
31
#include "yb/rocksdb/write_batch.h"
32
#include "yb/rocksdb/util/logging.h"
33
#include "yb/rocksdb/util/random.h"
34
#include "yb/rocksdb/util/sync_point.h"
35
#include "yb/rocksdb/util/testharness.h"
36
#include "yb/rocksdb/util/testutil.h"
37
38
#include "yb/util/test_util.h"
39
40
using std::atomic;
41
using std::string;
42
43
namespace rocksdb {
44
45
class WriteCallbackTest : public RocksDBTest {
46
 public:
47
  string dbname;
48
49
2
  WriteCallbackTest() {
50
2
    dbname = test::TmpDir() + "/write_callback_testdb";
51
2
  }
52
};
53
54
class WriteCallbackTestWriteCallback1 : public WriteCallback {
55
 public:
56
  bool was_called = false;
57
58
1
  Status Callback(DB *db) override {
59
1
    was_called = true;
60
61
    // Make sure db is a DBImpl
62
1
    DBImpl* db_impl = dynamic_cast<DBImpl*> (db);
63
1
    if (db_impl == nullptr) {
64
0
      return STATUS(InvalidArgument, "");
65
0
    }
66
67
1
    return Status::OK();
68
1
  }
69
70
0
  bool AllowWriteBatching() override { return true; }
71
};
72
73
class WriteCallbackTestWriteCallback2 : public WriteCallback {
74
 public:
75
1
  Status Callback(DB *db) override {
76
1
    return STATUS(Busy, "");
77
1
  }
78
0
  bool AllowWriteBatching() override { return true; }
79
};
80
81
class MockWriteCallback : public WriteCallback {
82
 public:
83
  bool should_fail_ = false;
84
  atomic<bool> was_called_;
85
  bool allow_batching_ = false;
86
87
37
  MockWriteCallback() {
88
37
    was_called_.store(false);
89
37
  }
90
91
  MockWriteCallback(const MockWriteCallback& other)
92
      : should_fail_(other.should_fail_),
93
74
        allow_batching_(other.allow_batching_) {
94
74
    was_called_.store(other.was_called_.load());
95
74
  }
96
97
488
  Status Callback(DB* db) override {
98
488
    was_called_.store(true);
99
488
    if (should_fail_) {
100
216
      return STATUS(Busy, "");
101
272
    } else {
102
272
      return Status::OK();
103
272
    }
104
488
  }
105
106
192
  bool AllowWriteBatching() override { return allow_batching_; }
107
};
108
109
1
TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
110
1
  struct WriteOP {
111
37
    WriteOP(bool should_fail = false) { callback_.should_fail_ = should_fail; } // NOLINT
112
113
2.95k
    void Put(const string& key, const string& val) {
114
2.95k
      kvs_.push_back(std::make_pair(key, val));
115
2.95k
      write_batch_.Put(key, val);
116
2.95k
    }
117
118
294
    void Clear() {
119
294
      kvs_.clear();
120
294
      write_batch_.Clear();
121
294
      callback_.was_called_.store(false);
122
294
    }
123
124
1
    MockWriteCallback callback_;
125
1
    WriteBatch write_batch_;
126
1
    std::vector<std::pair<string, string>> kvs_;
127
1
  };
128
129
1
  std::vector<std::vector<WriteOP>> write_scenarios = {
130
1
      {true},
131
1
      {false},
132
1
      {false, false},
133
1
      {true, true},
134
1
      {true, false},
135
1
      {false, true},
136
1
      {false, false, false},
137
1
      {true, true, true},
138
1
      {false, true, false},
139
1
      {true, false, true},
140
1
      {true, false, false, false, false},
141
1
      {false, false, false, false, true},
142
1
      {false, false, true, false, true},
143
1
  };
144
145
2
  for (auto& allow_parallel : {true, false}) {
146
4
    for (auto& allow_batching : {true, false}) {
147
8
      for (auto& enable_WAL : {true, false}) {
148
104
        for (auto& write_group : write_scenarios) {
149
104
          Options options;
150
104
          options.create_if_missing = true;
151
104
          options.allow_concurrent_memtable_write = allow_parallel;
152
153
104
          ReadOptions read_options;
154
104
          DB* db;
155
104
          DBImpl* db_impl;
156
157
104
          ASSERT_OK(DB::Open(options, dbname, &db));
158
159
104
          db_impl = dynamic_cast<DBImpl*>(db);
160
104
          ASSERT_TRUE(db_impl);
161
162
104
          std::atomic<uint64_t> threads_waiting(0);
163
104
          std::atomic<uint64_t> seq(db_impl->GetLatestSequenceNumber());
164
104
          ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0);
165
166
104
          rocksdb::SyncPoint::GetInstance()->SetCallBack(
167
296
              "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
168
296
                uint64_t cur_threads_waiting = 0;
169
296
                bool is_leader = false;
170
296
                bool is_last = false;
171
172
                // who am i
173
296
                do {
174
296
                  cur_threads_waiting = threads_waiting.load();
175
296
                  is_leader = (cur_threads_waiting == 0);
176
296
                  is_last = (cur_threads_waiting == write_group.size() - 1);
177
296
                } while (!threads_waiting.compare_exchange_strong(
178
296
                    cur_threads_waiting, cur_threads_waiting + 1));
179
180
                // check my state
181
296
                auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
182
183
296
                if (is_leader) {
184
104
                  ASSERT_TRUE(writer->state ==
185
104
                              WriteThread::State::STATE_GROUP_LEADER);
186
192
                } else {
187
192
                  ASSERT_TRUE(writer->state == WriteThread::State::STATE_INIT);
188
192
                }
189
190
                // (meta test) the first WriteOP should indeed be the first
191
                // and the last should be the last (all others can be out of
192
                // order)
193
296
                if (is_leader) {
194
104
                  ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
195
104
                              !write_group.front().callback_.should_fail_);
196
192
                } else if (is_last) {
197
88
                  ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
198
88
                              !write_group.back().callback_.should_fail_);
199
88
                }
200
201
                // wait for friends
202
243k
                while (threads_waiting.load() < write_group.size()) {
203
243k
                }
204
296
              });
205
206
104
          rocksdb::SyncPoint::GetInstance()->SetCallBack(
207
191
              "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) {
208
                // check my state
209
191
                auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
210
211
191
                if (!allow_batching) {
212
                  // no batching so everyone should be a leader
213
96
                  ASSERT_TRUE(writer->state ==
214
96
                              WriteThread::State::STATE_GROUP_LEADER);
215
95
                } else if (!allow_parallel) {
216
48
                  ASSERT_TRUE(writer->state ==
217
48
                              WriteThread::State::STATE_COMPLETED);
218
48
                }
219
191
              });
220
221
104
          std::atomic<uint32_t> thread_num(0);
222
104
          std::atomic<char> dummy_key(0);
223
295
          std::function<void()> write_with_callback_func = [&]() {
224
295
            uint32_t i = thread_num.fetch_add(1);
225
295
            Random rnd(i);
226
227
            // leaders gotta lead
228
38.9k
            while (i > 0 && threads_waiting.load() < 1) {
229
38.6k
            }
230
231
            // loser has to lose
232
12.4k
            while (i == write_group.size() - 1 &&
233
12.3k
                   threads_waiting.load() < write_group.size() - 1) {
234
12.2k
            }
235
236
295
            auto& write_op = write_group.at(i);
237
295
            write_op.Clear();
238
295
            write_op.callback_.allow_batching_ = allow_batching;
239
240
            // insert some keys
241
3.24k
            for (uint32_t j = 0; j < rnd.Next() % 50; j++) {
242
              // grab unique key
243
2.94k
              char my_key = 0;
244
2.98k
              do {
245
2.98k
                my_key = dummy_key.load();
246
2.98k
              } while (!dummy_key.compare_exchange_strong(my_key, my_key + 1));
247
248
2.94k
              string skey(5, my_key);
249
2.94k
              string sval(10, my_key);
250
2.94k
              write_op.Put(skey, sval);
251
252
2.94k
              if (!write_op.callback_.should_fail_) {
253
1.83k
                seq.fetch_add(1);
254
1.83k
              }
255
2.94k
            }
256
257
295
            WriteOptions woptions;
258
295
            woptions.disableWAL = !enable_WAL;
259
295
            woptions.sync = enable_WAL;
260
295
            Status s = db_impl->WriteWithCallback(
261
295
                woptions, &write_op.write_batch_, &write_op.callback_);
262
263
295
            if (write_op.callback_.should_fail_) {
264
120
              ASSERT_TRUE(s.IsBusy());
265
175
            } else {
266
175
              ASSERT_OK(s);
267
175
            }
268
295
          };
269
270
104
          rocksdb::SyncPoint::GetInstance()->EnableProcessing();
271
272
          // do all the writes
273
104
          std::vector<std::thread> threads;
274
400
          for (uint32_t i = 0; i < write_group.size(); i++) {
275
296
            threads.emplace_back(write_with_callback_func);
276
296
          }
277
296
          for (auto& t : threads) {
278
296
            t.join();
279
296
          }
280
281
104
          rocksdb::SyncPoint::GetInstance()->DisableProcessing();
282
283
          // check for keys
284
104
          string value;
285
296
          for (auto& w : write_group) {
286
296
            ASSERT_TRUE(w.callback_.was_called_.load());
287
2.95k
            for (auto& kvp : w.kvs_) {
288
2.95k
              if (w.callback_.should_fail_) {
289
1.11k
                ASSERT_TRUE(
290
1.11k
                    db->Get(read_options, kvp.first, &value).IsNotFound());
291
1.84k
              } else {
292
1.84k
                ASSERT_OK(db->Get(read_options, kvp.first, &value));
293
1.84k
                ASSERT_EQ(value, kvp.second);
294
1.84k
              }
295
2.95k
            }
296
296
          }
297
298
104
          ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber());
299
300
104
          delete db;
301
104
          ASSERT_OK(DestroyDB(dbname, options));
302
104
        }
303
8
      }
304
4
    }
305
2
  }
306
1
}
307
308
1
TEST_F(WriteCallbackTest, WriteCallBackTest) {
309
1
  Options options;
310
1
  WriteOptions write_options;
311
1
  ReadOptions read_options;
312
1
  string value;
313
1
  DB* db;
314
1
  DBImpl* db_impl;
315
316
1
  options.create_if_missing = true;
317
1
  Status s = DB::Open(options, dbname, &db);
318
1
  ASSERT_OK(s);
319
320
1
  db_impl = dynamic_cast<DBImpl*> (db);
321
1
  ASSERT_TRUE(db_impl);
322
323
1
  WriteBatch wb;
324
325
1
  wb.Put("a", "value.a");
326
1
  wb.Delete("x");
327
328
  // Test a simple Write
329
1
  s = db->Write(write_options, &wb);
330
1
  ASSERT_OK(s);
331
332
1
  s = db->Get(read_options, "a", &value);
333
1
  ASSERT_OK(s);
334
1
  ASSERT_EQ("value.a", value);
335
336
  // Test WriteWithCallback
337
1
  WriteCallbackTestWriteCallback1 callback1;
338
1
  WriteBatch wb2;
339
340
1
  wb2.Put("a", "value.a2");
341
342
1
  s = db_impl->WriteWithCallback(write_options, &wb2, &callback1);
343
1
  ASSERT_OK(s);
344
1
  ASSERT_TRUE(callback1.was_called);
345
346
1
  s = db->Get(read_options, "a", &value);
347
1
  ASSERT_OK(s);
348
1
  ASSERT_EQ("value.a2", value);
349
350
  // Test WriteWithCallback for a callback that fails
351
1
  WriteCallbackTestWriteCallback2 callback2;
352
1
  WriteBatch wb3;
353
354
1
  wb3.Put("a", "value.a3");
355
356
1
  s = db_impl->WriteWithCallback(write_options, &wb3, &callback2);
357
1
  ASSERT_NOK(s);
358
359
1
  s = db->Get(read_options, "a", &value);
360
1
  ASSERT_OK(s);
361
1
  ASSERT_EQ("value.a2", value);
362
363
1
  delete db;
364
1
  ASSERT_OK(DestroyDB(dbname, options));
365
1
}
366
367
}  // namespace rocksdb
368
369
13.2k
int main(int argc, char** argv) {
370
13.2k
  ::testing::InitGoogleTest(&argc, argv);
371
13.2k
  return RUN_ALL_TESTS();
372
13.2k
}
373
374
#else
375
#include <stdio.h>
376
377
int main(int argc, char** argv) {
378
  fprintf(stderr,
379
          "SKIPPED as WriteWithCallback is not supported in ROCKSDB_LITE\n");
380
  return 0;
381
}
382
383
#endif  // !ROCKSDB_LITE