YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/merge_test.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#include <assert.h>
22
23
#include <memory>
24
25
#include <gtest/gtest.h>
26
27
#include "yb/rocksdb/cache.h"
28
#include "yb/rocksdb/db.h"
29
#include "yb/rocksdb/env.h"
30
#include "yb/rocksdb/merge_operator.h"
31
#include "yb/rocksdb/port/stack_trace.h"
32
#include "yb/rocksdb/util/coding.h"
33
#include "yb/rocksdb/util/testharness.h"
34
#include "yb/rocksdb/util/testutil.h"
35
#include "yb/rocksdb/utilities/db_ttl.h"
36
#include "yb/rocksdb/utilities/merge_operators.h"
37
38
#include "yb/util/test_macros.h"
39
40
DECLARE_bool(never_fsync);
41
42
namespace rocksdb {
43
44
namespace {

// Incremented by CountMergeOperator::Merge; tests reset it before the
// operation whose merge-operator invocations they want to count.
size_t num_merge_operator_calls = 0;

// Clears the Merge call counter.
void resetNumMergeOperatorCalls() {
  num_merge_operator_calls = 0;
}

// Incremented by CountMergeOperator::PartialMergeMulti.
size_t num_partial_merge_calls = 0;

// Clears the PartialMergeMulti call counter.
void resetNumPartialMergeCalls() {
  num_partial_merge_calls = 0;
}

}  // namespace
51
52
class CountMergeOperator: public AssociativeMergeOperator {
53
 public:
54
4
  CountMergeOperator() {
55
4
    mergeOperator_ = MergeOperators::CreateUInt64AddOperator();
56
4
  }
57
58
  virtual bool Merge(const Slice& key,
59
                     const Slice* existing_value,
60
                     const Slice& value,
61
                     std::string* new_value,
62
384
                     Logger* logger) const override {
63
384
    assert(new_value->empty());
64
384
    ++num_merge_operator_calls;
65
384
    if (existing_value == nullptr) {
66
20
      new_value->assign(value.cdata(), value.size());
67
20
      return true;
68
20
    }
69
70
364
    return mergeOperator_->PartialMerge(
71
364
        key,
72
364
        *existing_value,
73
364
        value,
74
364
        new_value,
75
364
        logger);
76
364
  }
77
78
  virtual bool PartialMergeMulti(const Slice &key,
79
                                 const std::deque<Slice> &operand_list,
80
                                 std::string *new_value,
81
0
                                 Logger *logger) const override {
82
0
    assert(new_value->empty());
83
0
    ++num_partial_merge_calls;
84
0
    return mergeOperator_->PartialMergeMulti(key, operand_list, new_value,
85
0
        logger);
86
0
  }
87
88
6
  const char *Name() const override {
89
6
    return "UInt64AddOperator";
90
6
  }
91
92
 private:
93
  std::shared_ptr<MergeOperator> mergeOperator_;
94
};
95
96
namespace {
97
std::shared_ptr<DB> OpenDb(const std::string &dbname, const bool ttl = false,
98
                           const size_t max_successive_merges = 0,
99
4
                           const uint32_t min_partial_merge_operands = 2) {
100
4
  DB *db;
101
4
  Options options;
102
4
  options.create_if_missing = true;
103
4
  options.merge_operator = std::make_shared<CountMergeOperator>();
104
4
  options.max_successive_merges = max_successive_merges;
105
4
  options.min_partial_merge_operands = min_partial_merge_operands;
106
4
  Status s;
107
4
  CHECK_OK(DestroyDB(dbname, Options()));
108
// DBWithTTL is not supported in ROCKSDB_LITE
109
4
#ifndef ROCKSDB_LITE
110
4
  if (ttl) {
111
2
    std::cout << "Opening database with TTL\n";
112
2
    DBWithTTL *db_with_ttl;
113
2
    s = DBWithTTL::Open(options, dbname, &db_with_ttl);
114
2
    db = db_with_ttl;
115
2
  } else {
116
2
    s = DB::Open(options, dbname, &db);
117
2
  }
118
#else
119
  assert(!ttl);
120
  s = DB::Open(options, dbname, &db);
121
#endif  // !ROCKSDB_LITE
122
4
  if (!s.ok()) {
123
0
    std::cerr << s.ToString() << std::endl;
124
0
    assert(false);
125
0
  }
126
4
  return std::shared_ptr<DB>(db);
127
4
}
128
}  // namespace
129
130
// Imagine we are maintaining a set of uint64 counters.
131
// Each counter has a distinct name. And we would like
132
// to support four high level operations:
133
// set, add, get and remove
134
// This is a quick implementation without a Merge operation.
135
class Counters {
136
137
 protected:
138
  std::shared_ptr<DB> db_;
139
140
  WriteOptions put_option_;
141
  ReadOptions get_option_;
142
  WriteOptions delete_option_;
143
144
  uint64_t default_;
145
146
 public:
147
  explicit Counters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
148
      : db_(db),
149
        put_option_(),
150
        get_option_(),
151
        delete_option_(),
152
6
        default_(defaultCount) {
153
6
    assert(db_);
154
6
  }
155
156
6
  virtual ~Counters() {}
157
158
  // public interface of Counters.
159
  // All four functions return false
160
  // if the underlying level db operation failed.
161
162
  // mapped to a levedb Put
163
106
  bool set(const std::string &key, uint64_t value) {
164
    // just treat the internal rep of int64 as the string
165
106
    Slice slice(reinterpret_cast<char *>(&value), sizeof(value));
166
106
    auto s = db_->Put(put_option_, key, slice);
167
168
106
    if (s.ok()) {
169
106
      return true;
170
0
    } else {
171
0
      std::cerr << s.ToString() << std::endl;
172
0
      return false;
173
0
    }
174
106
  }
175
176
  // mapped to a rocksdb Delete
177
8
  bool remove(const std::string &key) {
178
8
    auto s = db_->Delete(delete_option_, key);
179
180
8
    if (s.ok()) {
181
8
      return true;
182
0
    } else {
183
0
      std::cerr << s.ToString() << std::endl;
184
0
      return false;
185
0
    }
186
8
  }
187
188
  // mapped to a rocksdb Get
189
148
  bool get(const std::string &key, uint64_t *value) {
190
148
    std::string str;
191
148
    auto s = db_->Get(get_option_, key, &str);
192
193
148
    if (s.IsNotFound()) {
194
      // return default value if not found;
195
8
      *value = default_;
196
8
      return true;
197
140
    } else if (s.ok()) {
198
      // deserialization
199
140
      if (str.size() != sizeof(uint64_t)) {
200
0
        std::cerr << "value corruption\n";
201
0
        return false;
202
0
      }
203
140
      *value = DecodeFixed64(&str[0]);
204
140
      return true;
205
0
    } else {
206
0
      std::cerr << s.ToString() << std::endl;
207
0
      return false;
208
0
    }
209
148
  }
210
211
  // 'add' is implemented as get -> modify -> set
212
  // An alternative is a single merge operation, see MergeBasedCounters
213
100
  virtual bool add(const std::string &key, uint64_t value) {
214
100
    uint64_t base = default_;
215
100
    return get(key, &base) && set(key, base + value);
216
100
  }
217
218
219
  // convenience functions for testing
220
6
  void assert_set(const std::string &key, uint64_t value) {
221
6
    assert(set(key, value));
222
6
  }
223
224
8
  void assert_remove(const std::string &key) { assert(remove(key)); }
225
226
48
  uint64_t assert_get(const std::string &key) {
227
48
    uint64_t value = default_;
228
48
    int result = get(key, &value);
229
48
    assert(result);
230
48
    if (result == 0) exit(1); // Disable unused variable warning.
231
48
    return value;
232
48
  }
233
234
320
  void assert_add(const std::string &key, uint64_t value) {
235
320
    int result = add(key, value);
236
320
    assert(result);
237
320
    if (result == 0) exit(1); // Disable unused variable warning.
238
320
  }
239
};
240
241
// Implement 'add' directly with the new Merge operation
242
class MergeBasedCounters: public Counters {
243
 private:
244
  WriteOptions merge_option_; // for merge
245
246
 public:
247
  explicit MergeBasedCounters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
248
      : Counters(db, defaultCount),
249
4
        merge_option_() {
250
4
  }
251
252
  // mapped to a rocksdb Merge operation
253
220
  bool add(const std::string &key, uint64_t value) override {
254
220
    char encoded[sizeof(uint64_t)];
255
220
    EncodeFixed64(encoded, value);
256
220
    Slice slice(encoded, sizeof(uint64_t));
257
220
    auto s = db_->Merge(merge_option_, key, slice);
258
259
220
    if (s.ok()) {
260
220
      return true;
261
0
    } else {
262
0
      std::cerr << s.ToString() << std::endl;
263
0
      return false;
264
0
    }
265
220
  }
266
};
267
268
namespace {
269
14
// Prints every entry of `db` to stdout as "<key>: <value>", decoding each
// value with DecodeFixed64.
void dumpDb(DB *db) {
  std::unique_ptr<Iterator> iter(db->NewIterator(ReadOptions()));

  iter->SeekToFirst();
  while (iter->Valid()) {
    const uint64_t decoded = DecodeFixed64(iter->value().data());
    std::cout << iter->key().ToString() << ": " << decoded << std::endl;
    iter->Next();
  }

  assert(iter->status().ok());  // Check for any errors found during the scan
}
277
278
6
// Exercises set / get / remove / add on `counters_ptr` using keys "a" and "b".
// When `test_compaction` is set, the memtable is flushed between steps and the
// whole key range is compacted at the end, after which the values are
// re-checked.
void testCounters(Counters* counters_ptr, DB *db, bool test_compaction) {
  FlushOptions flush_opts;
  flush_opts.wait = true;

  counters_ptr->assert_set("a", 1);

  if (test_compaction) {
    ASSERT_OK(db->Flush(flush_opts));
  }

  assert(counters_ptr->assert_get("a") == 1);

  counters_ptr->assert_remove("b");

  // default value is 0 if non-existent
  assert(counters_ptr->assert_get("b") == 0);

  counters_ptr->assert_add("a", 2);

  if (test_compaction) {
    ASSERT_OK(db->Flush(flush_opts));
  }

  // 1+2 = 3
  assert(counters_ptr->assert_get("a") == 3);

  dumpDb(db);

  std::cout << "1\n";

  // Accumulate 1+...+49 into "b" and track the expected total alongside.
  uint64_t expected_b = 0;
  for (int step = 1; step < 50; step++) {
    counters_ptr->assert_add("b", step);
    expected_b += step;
  }
  assert(counters_ptr->assert_get("b") == expected_b);

  std::cout << "2\n";
  dumpDb(db);

  std::cout << "3\n";

  if (!test_compaction) {
    return;
  }

  ASSERT_OK(db->Flush(flush_opts));

  std::cout << "Compaction started ...\n";
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  std::cout << "Compaction ended\n";

  dumpDb(db);

  // The values must be unchanged by the compaction.
  assert(counters_ptr->assert_get("a") == 3);
  assert(counters_ptr->assert_get("b") == expected_b);
}
336
337
void testSuccessiveMerge(Counters *counters_ptr, size_t max_num_merges,
338
2
                         size_t num_merges) {
339
2
  Counters& counters = *counters_ptr;
340
341
2
  counters.assert_remove("z");
342
2
  uint64_t sum = 0;
343
344
22
  for (size_t i = 1; i <= num_merges; ++i) {
345
20
    resetNumMergeOperatorCalls();
346
20
    counters.assert_add("z", i);
347
20
    sum += i;
348
349
20
    if (i % (max_num_merges + 1) == 0) {
350
2
      assert(num_merge_operator_calls == max_num_merges + 1);
351
18
    } else {
352
18
      assert(num_merge_operator_calls == 0);
353
18
    }
354
355
20
    resetNumMergeOperatorCalls();
356
20
    assert(counters.assert_get("z") == sum);
357
20
    assert(num_merge_operator_calls == i % (max_num_merges + 1));
358
20
  }
359
2
}
360
361
void testPartialMerge(Counters *counters, DB *db, size_t max_merge,
362
0
                      size_t min_merge, size_t count) {
363
0
  FlushOptions o;
364
0
  o.wait = true;
365
366
  // Test case 1: partial merge should be called when the number of merge
367
  //              operands exceeds the threshold.
368
0
  uint64_t tmp_sum = 0;
369
0
  resetNumPartialMergeCalls();
370
0
  for (size_t i = 1; i <= count; i++) {
371
0
    counters->assert_add("b", i);
372
0
    tmp_sum += i;
373
0
  }
374
0
  ASSERT_OK(db->Flush(o));
375
0
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
376
0
  ASSERT_EQ(tmp_sum, counters->assert_get("b"));
377
0
  if (count > max_merge) {
378
    // in this case, FullMerge should be called instead.
379
0
    ASSERT_EQ(num_partial_merge_calls, 0U);
380
0
  } else {
381
    // if count >= min_merge, then partial merge should be called once.
382
0
    ASSERT_EQ((count >= min_merge), (num_partial_merge_calls == 1));
383
0
  }
384
385
  // Test case 2: partial merge should not be called when a put is found.
386
0
  resetNumPartialMergeCalls();
387
0
  tmp_sum = 0;
388
0
  ASSERT_OK(db->Put(rocksdb::WriteOptions(), "c", "10"));
389
0
  for (size_t i = 1; i <= count; i++) {
390
0
    counters->assert_add("c", i);
391
0
    tmp_sum += i;
392
0
  }
393
0
  ASSERT_OK(db->Flush(o));
394
0
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
395
0
  ASSERT_EQ(tmp_sum, counters->assert_get("c"));
396
0
  ASSERT_EQ(num_partial_merge_calls, 0U);
397
0
}
398
399
void testSingleBatchSuccessiveMerge(DB *db, size_t max_num_merges,
400
2
                                    size_t num_merges) {
401
2
  assert(num_merges > max_num_merges);
402
403
2
  Slice key("BatchSuccessiveMerge");
404
2
  uint64_t merge_value = 1;
405
2
  Slice merge_value_slice(reinterpret_cast<char*>(&merge_value), sizeof(merge_value));
406
407
  // Create the batch
408
2
  WriteBatch batch;
409
16
  for (size_t i = 0; i < num_merges; ++i) {
410
14
    batch.Merge(key, merge_value_slice);
411
14
  }
412
413
  // Apply to memtable and count the number of merges
414
2
  resetNumMergeOperatorCalls();
415
2
  {
416
2
    Status s = db->Write(WriteOptions(), &batch);
417
2
    assert(s.ok());
418
2
  }
419
2
  ASSERT_EQ(
420
2
      num_merge_operator_calls,
421
2
      static_cast<size_t>(num_merges - (num_merges % (max_num_merges + 1))));
422
423
  // Get the value
424
2
  resetNumMergeOperatorCalls();
425
2
  std::string get_value_str;
426
2
  {
427
2
    Status s = db->Get(ReadOptions(), key, &get_value_str);
428
2
    assert(s.ok());
429
2
  }
430
2
  assert(get_value_str.size() == sizeof(uint64_t));
431
2
  uint64_t get_value = DecodeFixed64(&get_value_str[0]);
432
2
  ASSERT_EQ(get_value, num_merges * merge_value);
433
2
  ASSERT_EQ(num_merge_operator_calls,
434
2
      static_cast<size_t>((num_merges % (max_num_merges + 1))));
435
2
}
436
437
2
void runTest(int argc, const std::string &dbname, const bool use_ttl = false) {
438
2
  bool compact = false;
439
2
  if (argc > 1) {
440
0
    compact = true;
441
0
    std::cout << "Turn on Compaction\n";
442
0
  }
443
444
2
  {
445
2
    auto db = OpenDb(dbname, use_ttl);
446
447
2
    {
448
2
      std::cout << "Test read-modify-write counters... \n";
449
2
      Counters counters(db, 0);
450
2
      testCounters(&counters, db.get(), true);
451
2
    }
452
453
2
    {
454
2
      std::cout << "Test merge-based counters... \n";
455
2
      MergeBasedCounters counters(db, 0);
456
2
      testCounters(&counters, db.get(), compact);
457
2
    }
458
2
  }
459
460
2
  ASSERT_OK(DestroyDB(dbname, Options()));
461
462
2
  {
463
2
    std::cout << "Test merge in memtable... \n";
464
2
    size_t max_merge = 5;
465
2
    auto db = OpenDb(dbname, use_ttl, max_merge);
466
2
    MergeBasedCounters counters(db, 0);
467
2
    testCounters(&counters, db.get(), compact);
468
2
    testSuccessiveMerge(&counters, max_merge, max_merge * 2);
469
2
    testSingleBatchSuccessiveMerge(db.get(), 5, 7);
470
2
    ASSERT_OK(DestroyDB(dbname, Options()));
471
2
  }
472
473
0
  {
474
0
    std::cout << "Test Partial-Merge\n";
475
0
    size_t max_merge = 100;
476
0
    for (uint32_t min_merge = 5; min_merge < 25; min_merge += 5) {
477
0
      for (uint32_t count = min_merge - 1; count <= min_merge + 1; count++) {
478
0
        auto db = OpenDb(dbname, use_ttl, max_merge, min_merge);
479
0
        MergeBasedCounters counters(db, 0);
480
0
        testPartialMerge(&counters, db.get(), max_merge, min_merge, count);
481
0
        ASSERT_OK(DestroyDB(dbname, Options()));
482
0
      }
483
0
      {
484
0
        auto db = OpenDb(dbname, use_ttl, max_merge, min_merge);
485
0
        MergeBasedCounters counters(db, 0);
486
0
        testPartialMerge(&counters, db.get(), max_merge, min_merge,
487
0
            min_merge * 10);
488
0
        ASSERT_OK(DestroyDB(dbname, Options()));
489
0
      }
490
0
    }
491
0
  }
492
493
0
  {
494
0
    std::cout << "Test merge-operator not set after reopen\n";
495
0
    {
496
0
      auto db = OpenDb(dbname);
497
0
      MergeBasedCounters counters(db, 0);
498
0
      counters.add("test-key", 1);
499
0
      counters.add("test-key", 1);
500
0
      counters.add("test-key", 1);
501
0
      ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
502
0
    }
503
504
0
    DB *reopen_db;
505
0
    ASSERT_OK(DB::Open(Options(), dbname, &reopen_db));
506
0
    std::string value;
507
0
    ASSERT_TRUE(!(reopen_db->Get(ReadOptions(), "test-key", &value).ok()));
508
0
    delete reopen_db;
509
0
    ASSERT_OK(DestroyDB(dbname, Options()));
510
0
  }
511
512
  /* Temporary remove this test
513
  {
514
    std::cout << "Test merge-operator not set after reopen (recovery case)\n";
515
    {
516
      auto db = OpenDb(dbname);
517
      MergeBasedCounters counters(db, 0);
518
      counters.add("test-key", 1);
519
      counters.add("test-key", 1);
520
      counters.add("test-key", 1);
521
    }
522
523
    DB* reopen_db;
524
    ASSERT_TRUE(DB::Open(Options(), dbname, &reopen_db).IsInvalidArgument());
525
  }
526
  */
527
0
}
528
}  // namespace
529
} // namespace rocksdb
530
531
13.2k
int main(int argc, char *argv[]) {
532
  // TODO: Make this test like a general rocksdb unit-test
533
  // This is set by default in RocksDBTest and YBTest, and can be removed once this test is a
534
  // general rocksdb unit-test
535
13.2k
  FLAGS_never_fsync = true;
536
13.2k
  rocksdb::port::InstallStackTraceHandler();
537
13.2k
  rocksdb::runTest(argc, rocksdb::test::TmpDir() + "/merge_testdb");
538
// DBWithTTL is not supported in ROCKSDB_LITE
539
13.2k
#ifndef ROCKSDB_LITE
540
13.2k
  rocksdb::runTest(argc,
541
13.2k
                   rocksdb::test::TmpDir() + "/merge_testdbttl",
542
13.2k
                   true); // Run test on TTL database
543
13.2k
#endif  // !ROCKSDB_LITE
544
13.2k
  printf("Passed all tests!\n");
545
13.2k
  return 0;
546
13.2k
}