YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/compaction_job_stats_test.cc
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <algorithm>
#include <mutex>
#include <queue>
#include <set>
#include <thread>
#include <unordered_set>
#include <utility>

#include "yb/rocksdb/db/db_impl.h"
#include "yb/rocksdb/db/dbformat.h"
#include "yb/rocksdb/db/filename.h"
#include "yb/rocksdb/db/job_context.h"
#include "yb/rocksdb/db/version_set.h"
#include "yb/rocksdb/db/write_batch_internal.h"
#include "yb/rocksdb/port/stack_trace.h"
#include "yb/rocksdb/cache.h"
#include "yb/rocksdb/compaction_filter.h"
#include "yb/rocksdb/db.h"
#include "yb/rocksdb/env.h"
#include "yb/rocksdb/filter_policy.h"
#include "yb/rocksdb/options.h"
#include "yb/util/slice.h"
#include "yb/rocksdb/slice_transform.h"
#include "yb/rocksdb/table.h"
#include "yb/rocksdb/table_properties.h"
#include "yb/rocksdb/table/block_based_table_factory.h"
#include "yb/rocksdb/table/plain_table_factory.h"
#include "yb/rocksdb/table/scoped_arena_iterator.h"
#include "yb/rocksdb/util/compression.h"
#include "yb/rocksdb/util/hash.h"
#include "yb/rocksdb/util/logging.h"
#include "yb/rocksdb/util/mutexlock.h"
#include "yb/rocksdb/util/statistics.h"
#include "yb/rocksdb/util/sync_point.h"
#include "yb/rocksdb/util/testharness.h"
#include "yb/rocksdb/util/testutil.h"

#include "yb/util/string_util.h"
#include "yb/util/test_util.h"

#if !defined(IOS_CROSS_COMPILE)
#ifndef ROCKSDB_LITE
namespace rocksdb {

static std::string RandomString(Random* rnd, int len, double ratio) {
  std::string r;
  CompressibleString(rnd, ratio, len, &r);
  return r;
}

std::string Key(uint64_t key, int length) {
  const int kBufSize = 1000;
  char buf[kBufSize];
  if (length > kBufSize) {
    length = kBufSize;
  }
  snprintf(buf, kBufSize, "%0*" PRIu64, length, key);
  return std::string(buf);
}
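// A worked example of the zero-padded keys Key() produces (illustrative
// values, not taken from a specific test below): Key(123, 10) yields
// "0000000123" and Key(100000000, 10) yields "0100000000", so equal-length
// keys compare in numeric order under a bytewise comparator.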

class CompactionJobStatsTest : public RocksDBTest,
                               public testing::WithParamInterface<bool> {
 public:
  std::string dbname_;
  std::string alternative_wal_dir_;
  Env* env_;
  DB* db_;
  std::vector<ColumnFamilyHandle*> handles_;
  uint32_t max_subcompactions_;

  Options last_options_;

  CompactionJobStatsTest() : env_(Env::Default()) {
    env_->SetBackgroundThreads(1, Env::LOW);
    env_->SetBackgroundThreads(1, Env::HIGH);
    dbname_ = test::TmpDir(env_) + "/compaction_job_stats_test";
    alternative_wal_dir_ = dbname_ + "/wal";
    Options options;
    options.create_if_missing = true;
    max_subcompactions_ = GetParam();
    options.max_subcompactions = max_subcompactions_;
    auto delete_options = options;
    delete_options.wal_dir = alternative_wal_dir_;
    EXPECT_OK(DestroyDB(dbname_, delete_options));
    // Destroy again in case the alternative WAL dir was not used.
    EXPECT_OK(DestroyDB(dbname_, options));
    db_ = nullptr;
    Reopen(options);
  }

  ~CompactionJobStatsTest() {
    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
    rocksdb::SyncPoint::GetInstance()->LoadDependency({});
    rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
    Close();
    Options options;
    options.db_paths.emplace_back(dbname_, 0);
    options.db_paths.emplace_back(dbname_ + "_2", 0);
    options.db_paths.emplace_back(dbname_ + "_3", 0);
    options.db_paths.emplace_back(dbname_ + "_4", 0);
    EXPECT_OK(DestroyDB(dbname_, options));
  }

  // Required if inheriting from testing::WithParamInterface<>
  static void SetUpTestCase() {}
  static void TearDownTestCase() {}

  DBImpl* dbfull() {
    return reinterpret_cast<DBImpl*>(db_);
  }

  void CreateColumnFamilies(const std::vector<std::string>& cfs,
                            const Options& options) {
    ColumnFamilyOptions cf_opts(options);
    size_t cfi = handles_.size();
    handles_.resize(cfi + cfs.size());
    for (auto cf : cfs) {
      ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]));
    }
  }

  void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
                             const Options& options) {
    CreateColumnFamilies(cfs, options);
    std::vector<std::string> cfs_plus_default = cfs;
    cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
    ReopenWithColumnFamilies(cfs_plus_default, options);
  }

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const std::vector<Options>& options) {
    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
  }

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const Options& options) {
    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
  }

  Status TryReopenWithColumnFamilies(
      const std::vector<std::string>& cfs,
      const std::vector<Options>& options) {
    Close();
    EXPECT_EQ(cfs.size(), options.size());
    std::vector<ColumnFamilyDescriptor> column_families;
    for (size_t i = 0; i < cfs.size(); ++i) {
      column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i]));
    }
    DBOptions db_opts = DBOptions(options[0]);
    return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
  }

  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                     const Options& options) {
    Close();
    std::vector<Options> v_opts(cfs.size(), options);
    return TryReopenWithColumnFamilies(cfs, v_opts);
  }

  void Reopen(const Options& options) {
    ASSERT_OK(TryReopen(options));
  }

  void Close() {
    for (auto h : handles_) {
      delete h;
    }
    handles_.clear();
    delete db_;
    db_ = nullptr;
  }

  void DestroyAndReopen(const Options& options) {
    // Destroy using last options
    Destroy(last_options_);
    ASSERT_OK(TryReopen(options));
  }

  void Destroy(const Options& options) {
    Close();
    ASSERT_OK(DestroyDB(dbname_, options));
  }

  Status ReadOnlyReopen(const Options& options) {
    return DB::OpenForReadOnly(options, dbname_, &db_);
  }

  Status TryReopen(const Options& options) {
    Close();
    last_options_ = options;
    return DB::Open(options, dbname_, &db_);
  }

  Status Flush(int cf = 0) {
    if (cf == 0) {
      return db_->Flush(FlushOptions());
    } else {
      return db_->Flush(FlushOptions(), handles_[cf]);
    }
  }

  Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()) {
    return db_->Put(wo, k, v);
  }

  Status Put(int cf, const Slice& k, const Slice& v,
             WriteOptions wo = WriteOptions()) {
    return db_->Put(wo, handles_[cf], k, v);
  }

  Status Delete(const std::string& k) {
    return db_->Delete(WriteOptions(), k);
  }

  Status Delete(int cf, const std::string& k) {
    return db_->Delete(WriteOptions(), handles_[cf], k);
  }

  std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
    ReadOptions options;
    options.verify_checksums = true;
    options.snapshot = snapshot;
    std::string result;
    Status s = db_->Get(options, k, &result);
    if (s.IsNotFound()) {
      result = "NOT_FOUND";
    } else if (!s.ok()) {
      result = s.ToString();
    }
    return result;
  }

  std::string Get(int cf, const std::string& k,
                  const Snapshot* snapshot = nullptr) {
    ReadOptions options;
    options.verify_checksums = true;
    options.snapshot = snapshot;
    std::string result;
    Status s = db_->Get(options, handles_[cf], k, &result);
    if (s.IsNotFound()) {
      result = "NOT_FOUND";
    } else if (!s.ok()) {
      result = s.ToString();
    }
    return result;
  }

  int NumTableFilesAtLevel(int level, int cf = 0) {
    std::string property;
    if (cf == 0) {
      // default cfd
      EXPECT_TRUE(db_->GetProperty(
          "rocksdb.num-files-at-level" + NumberToString(level), &property));
    } else {
      EXPECT_TRUE(db_->GetProperty(
          handles_[cf], "rocksdb.num-files-at-level" + NumberToString(level),
          &property));
    }
    return atoi(property.c_str());
  }

  // Return spread of files per level
  std::string FilesPerLevel(int cf = 0) {
    int num_levels =
        (cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
    std::string result;
    size_t last_non_zero_offset = 0;
    for (int level = 0; level < num_levels; level++) {
      int f = NumTableFilesAtLevel(level, cf);
      char buf[100];
      snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
      result += buf;
      if (f > 0) {
        last_non_zero_offset = result.size();
      }
    }
    result.resize(last_non_zero_offset);
    return result;
  }
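  // For example (illustrative, not tied to a specific test below): with 3
  // files in L0, 1 in L1 and none in L2 of a 3-level DB, the loop builds
  // "3,1,0" and the final resize trims the trailing zero level, so the
  // caller sees "3,1".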

  uint64_t Size(const Slice& start, const Slice& limit, int cf = 0) {
    Range r(start, limit);
    uint64_t size;
    if (cf == 0) {
      db_->GetApproximateSizes(&r, 1, &size);
    } else {
      db_->GetApproximateSizes(handles_[1], &r, 1, &size);
    }
    return size;
  }

  void Compact(int cf, const Slice& start, const Slice& limit,
               uint32_t target_path_id) {
    CompactRangeOptions compact_options;
    compact_options.target_path_id = target_path_id;
    ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit));
  }

  void Compact(int cf, const Slice& start, const Slice& limit) {
    ASSERT_OK(
        db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
  }

  void Compact(const Slice& start, const Slice& limit) {
    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit));
  }

  void TEST_Compact(int level, int cf, const Slice& start, const Slice& limit) {
    ASSERT_OK(dbfull()->TEST_CompactRange(level, &start, &limit, handles_[cf],
                                          true /* disallow trivial move */));
  }

  // Do n memtable compactions, each of which produces an sstable
  // covering the range [small,large].
  void MakeTables(int n, const std::string& small, const std::string& large,
                  int cf = 0) {
    for (int i = 0; i < n; i++) {
      ASSERT_OK(Put(cf, small, "begin"));
      ASSERT_OK(Put(cf, large, "end"));
      ASSERT_OK(Flush(cf));
    }
  }

  static void SetDeletionCompactionStats(
      CompactionJobStats *stats, uint64_t input_deletions,
      uint64_t expired_deletions, uint64_t records_replaced) {
    stats->num_input_deletion_records = input_deletions;
    stats->num_expired_deletion_records = expired_deletions;
    stats->num_records_replaced = records_replaced;
  }

  void MakeTableWithKeyValues(
    Random* rnd, uint64_t smallest, uint64_t largest,
    int key_size, int value_size, uint64_t interval,
    double ratio, int cf = 0) {
    for (auto key = smallest; key < largest; key += interval) {
      ASSERT_OK(Put(cf, Slice(Key(key, key_size)),
                        Slice(RandomString(rnd, value_size, ratio))));
    }
    ASSERT_OK(Flush(cf));
  }

  // This function assumes that two rounds of keys have been inserted into
  // the database, matching the behavior of the DeletionStatsTest.
  void SelectivelyDeleteKeys(uint64_t smallest, uint64_t largest,
    uint64_t interval, int deletion_interval, int key_size,
    uint64_t cutoff_key_num, CompactionJobStats* stats, int cf = 0) {

    // interval needs to be >= 2 so that deletion entries can be inserted
    // at an offset of 1 from an existing key, i.e. deletions that are
    // intended to not result in an actual key deletion.
    ASSERT_GE(interval, 2);

    uint64_t ctr = 1;
    uint32_t deletions_made = 0;
    uint32_t num_deleted = 0;
    uint32_t num_expired = 0;
    for (auto key = smallest; key <= largest; key += interval, ctr++) {
      if (ctr % deletion_interval == 0) {
        ASSERT_OK(Delete(cf, Key(key, key_size)));
        deletions_made++;
        num_deleted++;

        if (key > cutoff_key_num) {
          num_expired++;
        }
      }
    }

    // Insert some deletions for keys that don't exist, both inside
    // and outside the key range.
    ASSERT_OK(Delete(cf, Key(smallest + 1, key_size)));
    deletions_made++;

    ASSERT_OK(Delete(cf, Key(smallest - 1, key_size)));
    deletions_made++;
    num_expired++;

    ASSERT_OK(Delete(cf, Key(smallest - 9, key_size)));
    deletions_made++;
    num_expired++;

    ASSERT_OK(Flush(cf));
    SetDeletionCompactionStats(stats, deletions_made, num_expired,
      num_deleted);
  }
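  // Accounting summary for the helper above (assuming the two insertion
  // rounds described in its comment): deletions_made counts every tombstone
  // written, including the three aimed at non-existent keys; num_deleted
  // counts only tombstones that cover an existing key; num_expired counts
  // tombstones past cutoff_key_num plus the two below the key range.
  // deletions_made becomes input_deletions, num_expired becomes
  // expired_deletions, and num_deleted becomes records_replaced.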
};

// An EventListener which helps verify the compaction results in
// test CompactionJobStatsTest.
class CompactionJobStatsChecker : public EventListener {
 public:
  CompactionJobStatsChecker()
      : compression_enabled_(false), verify_next_comp_io_stats_(false) {}

  size_t NumberOfUnverifiedStats() { return expected_stats_.size(); }

  void set_verify_next_comp_io_stats(bool v) { verify_next_comp_io_stats_ = v; }

  // Once a compaction completes, this function verifies the returned
  // CompactionJobInfo against the oldest expected CompactionJobStats added
  // earlier to "expected_stats_" that has not yet been used for verification.
  virtual void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) {
    if (verify_next_comp_io_stats_) {
      ASSERT_GT(ci.stats.file_write_nanos, 0);
      ASSERT_GT(ci.stats.file_range_sync_nanos, 0);
      ASSERT_GT(ci.stats.file_fsync_nanos, 0);
      ASSERT_GT(ci.stats.file_prepare_write_nanos, 0);
      verify_next_comp_io_stats_ = false;
    }

    std::lock_guard<std::mutex> lock(mutex_);
    if (expected_stats_.size()) {
      Verify(ci.stats, expected_stats_.front());
      expected_stats_.pop();
    }
  }

  // A helper function which verifies whether two CompactionJobStats
  // match.  The verification of all compaction stats is done by
  // ASSERT_EQ, except for the total input / output bytes, which are
  // checked with ASSERT_GE and ASSERT_LE under a reasonable bias ---
  // 10% in the uncompressed case and 20% when compression is used.
  virtual void Verify(const CompactionJobStats& current_stats,
              const CompactionJobStats& stats) {
    // time
    ASSERT_GT(current_stats.elapsed_micros, 0U);

    ASSERT_EQ(current_stats.num_input_records,
        stats.num_input_records);
    ASSERT_EQ(current_stats.num_input_files,
        stats.num_input_files);
    ASSERT_EQ(current_stats.num_input_files_at_output_level,
        stats.num_input_files_at_output_level);

    ASSERT_EQ(current_stats.num_output_records,
        stats.num_output_records);
    ASSERT_EQ(current_stats.num_output_files,
        stats.num_output_files);

    ASSERT_EQ(current_stats.is_manual_compaction,
        stats.is_manual_compaction);

    // file size
    double kFileSizeBias = compression_enabled_ ? 0.20 : 0.10;
    ASSERT_GE(current_stats.total_input_bytes * (1.00 + kFileSizeBias),
              stats.total_input_bytes);
    ASSERT_LE(current_stats.total_input_bytes,
              stats.total_input_bytes * (1.00 + kFileSizeBias));
    ASSERT_GE(current_stats.total_output_bytes * (1.00 + kFileSizeBias),
              stats.total_output_bytes);
    ASSERT_LE(current_stats.total_output_bytes,
              stats.total_output_bytes * (1.00 + kFileSizeBias));
    ASSERT_EQ(current_stats.total_input_raw_key_bytes,
              stats.total_input_raw_key_bytes);
    ASSERT_EQ(current_stats.total_input_raw_value_bytes,
              stats.total_input_raw_value_bytes);

    ASSERT_EQ(current_stats.num_records_replaced,
        stats.num_records_replaced);

    ASSERT_EQ(current_stats.num_corrupt_keys,
        stats.num_corrupt_keys);

    ASSERT_EQ(
        std::string(current_stats.smallest_output_key_prefix),
        std::string(stats.smallest_output_key_prefix));
    ASSERT_EQ(
        std::string(current_stats.largest_output_key_prefix),
        std::string(stats.largest_output_key_prefix));
  }
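  // A concrete reading of the bias check above (illustrative numbers): with
  // compression disabled, kFileSizeBias is 0.10, so an expected
  // total_input_bytes of 1000 accepts measured values from about 909
  // (1000 / 1.10) up to 1100 (1000 * 1.10).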

  // Add expected compaction stats, which will be used to
  // verify the CompactionJobStats returned by the OnCompactionCompleted()
  // callback.
  void AddExpectedStats(const CompactionJobStats& stats) {
    std::lock_guard<std::mutex> lock(mutex_);
    expected_stats_.push(stats);
  }

  void EnableCompression(bool flag) {
    compression_enabled_ = flag;
  }

  bool verify_next_comp_io_stats() const { return verify_next_comp_io_stats_; }

 private:
  std::mutex mutex_;
  std::queue<CompactionJobStats> expected_stats_;
  bool compression_enabled_;
  bool verify_next_comp_io_stats_;
};

// An EventListener which helps verify the compaction statistics in
// the test DeletionStatsTest.
class CompactionJobDeletionStatsChecker : public CompactionJobStatsChecker {
 public:
  // Verifies whether two CompactionJobStats match.
  void Verify(const CompactionJobStats& current_stats,
              const CompactionJobStats& stats) {
    ASSERT_EQ(
      current_stats.num_input_deletion_records,
      stats.num_input_deletion_records);
    ASSERT_EQ(
        current_stats.num_expired_deletion_records,
        stats.num_expired_deletion_records);
    ASSERT_EQ(
        current_stats.num_records_replaced,
        stats.num_records_replaced);

    ASSERT_EQ(current_stats.num_corrupt_keys,
        stats.num_corrupt_keys);
  }
};

namespace {

uint64_t EstimatedFileSize(
    uint64_t num_records, size_t key_size, size_t value_size,
    double compression_ratio = 1.0,
    size_t block_size = 4096,
    int bloom_bits_per_key = 10) {
  const size_t kPerKeyOverhead = 8;
  const size_t kFooterSize = 512;

  uint64_t data_size =
    static_cast<uint64_t>(
      num_records * (key_size + value_size * compression_ratio +
                     kPerKeyOverhead));

  return data_size + kFooterSize
         + num_records * bloom_bits_per_key / 8      // filter block
         + data_size * (key_size + 8) / block_size;  // index block
}
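// A worked example with the defaults above (illustrative numbers): 100
// records with key_size = 10 and value_size = 1000 give
// data_size = 100 * (10 + 1000 + 8) = 101800 bytes; adding the 512-byte
// footer, 100 * 10 / 8 = 125 bytes of filter block, and
// 101800 * 18 / 4096 = 447 bytes of index block yields an estimate of
// roughly 102884 bytes for the file.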

namespace {

void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
  assert(prefix_length > 0);
  size_t length = src.size() > prefix_length ? prefix_length : src.size();
  dst->assign(src.cdata(), length);
}

}  // namespace

CompactionJobStats NewManualCompactionJobStats(
    const std::string& smallest_key, const std::string& largest_key,
    size_t num_input_files, size_t num_input_files_at_output_level,
    uint64_t num_input_records, size_t key_size, size_t value_size,
    size_t num_output_files, uint64_t num_output_records,
    double compression_ratio, uint64_t num_records_replaced,
    bool is_manual = true) {
  CompactionJobStats stats;
  stats.Reset();

  stats.num_input_records = num_input_records;
  stats.num_input_files = num_input_files;
  stats.num_input_files_at_output_level = num_input_files_at_output_level;

  stats.num_output_records = num_output_records;
  stats.num_output_files = num_output_files;

  stats.total_input_bytes =
      EstimatedFileSize(
          num_input_records / num_input_files,
          key_size, value_size, compression_ratio) * num_input_files;
  stats.total_output_bytes =
      EstimatedFileSize(
          num_output_records / num_output_files,
          key_size, value_size, compression_ratio) * num_output_files;
  stats.total_input_raw_key_bytes =
      num_input_records * (key_size + 8);
  stats.total_input_raw_value_bytes =
      num_input_records * value_size;

  stats.is_manual_compaction = is_manual;

  stats.num_records_replaced = num_records_replaced;

  CopyPrefix(smallest_key,
             CompactionJobStats::kMaxPrefixLength,
             &stats.smallest_output_key_prefix);
  CopyPrefix(largest_key,
             CompactionJobStats::kMaxPrefixLength,
             &stats.largest_output_key_prefix);

  return stats;
}

CompressionType GetAnyCompression() {
  if (Snappy_Supported()) {
    return kSnappyCompression;
  } else if (Zlib_Supported()) {
    return kZlibCompression;
  } else if (BZip2_Supported()) {
    return kBZip2Compression;
  } else if (LZ4_Supported()) {
    return kLZ4Compression;
  }
  return kNoCompression;
}

}  // namespace

TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
  Random rnd(301);
  const int kBufSize = 100;
  char buf[kBufSize];
  uint64_t key_base = 100000000l;
  // Note: key_base must be a multiple of num_keys_per_L0_file
  int num_keys_per_L0_file = 100;
  const int kTestScale = 8;
  const int kKeySize = 10;
  const int kValueSize = 1000;
  const double kCompressionRatio = 0.5;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_L0_file;

  // Whenever a compaction completes, this listener will try to
  // verify whether the returned CompactionJobStats matches
  // what we expect.  The expected CompactionJobStats is added
  // via AddExpectedStats().
  auto* stats_checker = new CompactionJobStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  options.max_background_flushes = 0;
  // just enough settings to hold off auto-compaction.
  options.level0_file_num_compaction_trigger = kTestScale + 1;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.max_subcompactions = max_subcompactions_;
  options.bytes_per_sync = 512 * 1024;

  options.compaction_measure_io_stats = true;
  for (int test = 0; test < 2; ++test) {
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);

    // 1st Phase: generate "num_L0_files" L0 files.
    int num_L0_files = 0;
    for (uint64_t start_key = key_base;
                  start_key <= key_base * kTestScale;
                  start_key += key_base) {
      MakeTableWithKeyValues(
          &rnd, start_key, start_key + key_base - 1,
          kKeySize, kValueSize, key_interval,
          compression_ratio, 1);
      snprintf(buf, kBufSize, "%d", ++num_L0_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }
    ASSERT_EQ(ToString(num_L0_files), FilesPerLevel(1));

    // 2nd Phase: perform L0 -> L1 compaction.
    int L0_compaction_count = 6;
    int count = 1;
    std::string smallest_key;
    std::string largest_key;
    for (uint64_t start_key = key_base;
         start_key <= key_base * L0_compaction_count;
         start_key += key_base, count++) {
      smallest_key = Key(start_key, 10);
      largest_key = Key(start_key + key_base - key_interval, 10);
      stats_checker->AddExpectedStats(
          NewManualCompactionJobStats(
              smallest_key, largest_key,
              1, 0, num_keys_per_L0_file,
              kKeySize, kValueSize,
              1, num_keys_per_L0_file,
              compression_ratio, 0));
      ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
      TEST_Compact(0, 1, smallest_key, largest_key);
      snprintf(buf, kBufSize, "%d,%d", num_L0_files - count, count);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // compact two files into one in the last L0 -> L1 compaction
    int num_remaining_L0 = num_L0_files - L0_compaction_count;
    smallest_key = Key(key_base * (L0_compaction_count + 1), 10);
    largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
    stats_checker->AddExpectedStats(
        NewManualCompactionJobStats(
            smallest_key, largest_key,
            num_remaining_L0,
            0, num_keys_per_L0_file * num_remaining_L0,
            kKeySize, kValueSize,
            1, num_keys_per_L0_file * num_remaining_L0,
            compression_ratio, 0));
    ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
    TEST_Compact(0, 1, smallest_key, largest_key);

    int num_L1_files = num_L0_files - num_remaining_L0 + 1;
    num_L0_files = 0;
    snprintf(buf, kBufSize, "%d,%d", num_L0_files, num_L1_files);
    ASSERT_EQ(std::string(buf), FilesPerLevel(1));

    // 3rd Phase: generate sparse L0 files (wider key-range, same num of keys)
    int sparseness = 2;
    for (uint64_t start_key = key_base;
                  start_key <= key_base * kTestScale;
                  start_key += key_base * sparseness) {
      MakeTableWithKeyValues(
          &rnd, start_key, start_key + key_base * sparseness - 1,
          kKeySize, kValueSize,
          key_base * sparseness / num_keys_per_L0_file,
          compression_ratio, 1);
      snprintf(buf, kBufSize, "%d,%d", ++num_L0_files, num_L1_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // 4th Phase: perform L0 -> L1 compaction again, expect higher write amp.
    // When subcompactions are enabled, the number of output files increases
    // by 1 because multiple threads are consuming the input and generating
    // output files without coordinating to see if the output could fit into
    // a smaller number of files, like it does when it runs sequentially.
    int num_output_files = options.max_subcompactions > 1 ? 2 : 1;
    for (uint64_t start_key = key_base;
         num_L0_files > 1;
         start_key += key_base * sparseness) {
      smallest_key = Key(start_key, 10);
      largest_key =
          Key(start_key + key_base * sparseness - key_interval, 10);
      stats_checker->AddExpectedStats(
          NewManualCompactionJobStats(
              smallest_key, largest_key,
              3, 2, num_keys_per_L0_file * 3,
              kKeySize, kValueSize,
              num_output_files,
              num_keys_per_L0_file * 2,  // 1/3 of the data will be updated.
              compression_ratio,
              num_keys_per_L0_file));
      ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
      Compact(1, smallest_key, largest_key);
      if (options.max_subcompactions == 1) {
        --num_L1_files;
      }
      snprintf(buf, kBufSize, "%d,%d", --num_L0_files, num_L1_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // 5th Phase: Do a full compaction, which involves two subcompactions.
    // Here we expect to have 1 L0 file and 4 L1 files.
    // In the first subcompaction, we expect an L0 compaction.
    smallest_key = Key(key_base, 10);
    largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
    stats_checker->AddExpectedStats(
        NewManualCompactionJobStats(
            Key(key_base * (kTestScale + 1 - sparseness), 10), largest_key,
            2, 1, num_keys_per_L0_file * 3,
            kKeySize, kValueSize,
            1, num_keys_per_L0_file * 2,
            compression_ratio,
            num_keys_per_L0_file));
    ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
    Compact(1, smallest_key, largest_key);

    num_L1_files = options.max_subcompactions > 1 ? 7 : 4;
    char L1_buf[4];
    snprintf(L1_buf, sizeof(L1_buf), "0,%d", num_L1_files);
    std::string L1_files(L1_buf);
    ASSERT_EQ(L1_files, FilesPerLevel(1));
    options.compression = GetAnyCompression();
    if (options.compression == kNoCompression) {
      break;
    }
    stats_checker->EnableCompression(true);
    compression_ratio = kCompressionRatio;

    for (int i = 0; i < 5; i++) {
      ASSERT_OK(Put(1, Slice(Key(key_base + i, 10)),
                    Slice(RandomString(&rnd, 512 * 1024, 1))));
    }

    ASSERT_OK(Flush(1));
    ASSERT_OK(reinterpret_cast<DBImpl*>(db_)->TEST_WaitForCompact());

    stats_checker->set_verify_next_comp_io_stats(true);
    std::atomic<bool> first_prepare_write(true);
    rocksdb::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void* arg) {
          if (first_prepare_write.load()) {
            options.env->SleepForMicroseconds(3);
            first_prepare_write.store(false);
          }
        });

    std::atomic<bool> first_flush(true);
    rocksdb::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Flush:BeforeAppend", [&](void* arg) {
          if (first_flush.load()) {
            options.env->SleepForMicroseconds(3);
            first_flush.store(false);
          }
        });

    std::atomic<bool> first_sync(true);
    rocksdb::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::SyncInternal:0", [&](void* arg) {
          if (first_sync.load()) {
            options.env->SleepForMicroseconds(3);
            first_sync.store(false);
          }
        });

    std::atomic<bool> first_range_sync(true);
    rocksdb::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::RangeSync:0", [&](void* arg) {
          if (first_range_sync.load()) {
            options.env->SleepForMicroseconds(3);
            first_range_sync.store(false);
          }
        });
    rocksdb::SyncPoint::GetInstance()->EnableProcessing();

    Compact(1, smallest_key, largest_key);

    ASSERT_TRUE(!stats_checker->verify_next_comp_io_stats());
    ASSERT_TRUE(!first_prepare_write.load());
    ASSERT_TRUE(!first_flush.load());
    ASSERT_TRUE(!first_sync.load());
    ASSERT_TRUE(!first_range_sync.load());
    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}

TEST_P(CompactionJobStatsTest, DeletionStatsTest) {
  Random rnd(301);
  uint64_t key_base = 100000l;
  // Note: key_base must be a multiple of num_keys_per_L0_file
  int num_keys_per_L0_file = 20;
  const int kTestScale = 8;  // make sure this is even
  const int kKeySize = 10;
  const int kValueSize = 100;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_L0_file;
  uint64_t largest_key_num = key_base * (kTestScale + 1) - key_interval;
  uint64_t cutoff_key_num = key_base * (kTestScale / 2 + 1) - key_interval;
  const std::string smallest_key = Key(key_base - 10, kKeySize);
  const std::string largest_key = Key(largest_key_num + 10, kKeySize);

  // Whenever a compaction completes, this listener will try to
  // verify whether the returned CompactionJobStats matches
  // what we expect.
  auto* stats_checker = new CompactionJobDeletionStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  options.max_background_flushes = 0;
  options.level0_file_num_compaction_trigger = kTestScale + 1;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.max_bytes_for_level_multiplier = 2;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Stage 1: Generate several L0 files and then send them to L2 by
  // using CompactRangeOptions and CompactRange(). These files will
  // have a strict subset of the keys from the full key-range.
  for (uint64_t start_key = key_base;
                start_key <= key_base * kTestScale / 2;
                start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
  }

  CompactRangeOptions cr_options;
  cr_options.change_level = true;
  cr_options.target_level = 2;
  ASSERT_OK(db_->CompactRange(cr_options, handles_[1], nullptr, nullptr));
  ASSERT_GT(NumTableFilesAtLevel(2, 1), 0);

  // Stage 2: Generate files including keys from the entire key range
  for (uint64_t start_key = key_base;
                start_key <= key_base * kTestScale;
                start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
  }

  // Send these L0 files to L1
  TEST_Compact(0, 1, smallest_key, largest_key);
  ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);

  // Add a new record and flush so now there is an L0 file
  // with a value too (not just deletions from the next step)
  ASSERT_OK(Put(1, Key(key_base - 6, kKeySize), "test"));
  ASSERT_OK(Flush(1));

  // Stage 3: Generate L0 files with some deletions, so now
  // there are files with the same key range in L0, L1, and L2
  int deletion_interval = 3;
  CompactionJobStats first_compaction_stats;
  SelectivelyDeleteKeys(key_base, largest_key_num,
      key_interval, deletion_interval, kKeySize, cutoff_key_num,
      &first_compaction_stats, 1);

  stats_checker->AddExpectedStats(first_compaction_stats);

  // Stage 4: Trigger compaction and verify the stats
  TEST_Compact(0, 1, smallest_key, largest_key);
}

namespace {
int GetUniversalCompactionInputUnits(uint32_t num_flushes) {
  uint32_t compaction_input_units;
  for (compaction_input_units = 1;
       num_flushes >= compaction_input_units;
       compaction_input_units *= 2) {
    if ((num_flushes & compaction_input_units) != 0) {
      return compaction_input_units > 1 ? compaction_input_units : 0;
    }
  }
  return 0;
}
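// In effect this returns the lowest set bit of num_flushes, treating a lone
// flush (lowest set bit of 1) as "no compaction". A few illustrative values:
// 2 -> 2, 3 -> 0, 4 -> 4, 6 -> 2, 8 -> 8, which matches the compaction
// cadence the size_ratio = 1 universal setup below is expected to produce.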
}  // namespace

TEST_P(CompactionJobStatsTest, UniversalCompactionTest) {
  Random rnd(301);
  uint64_t key_base = 100000000l;
  // Note: key_base must be a multiple of num_keys_per_table
  int num_keys_per_table = 100;
  const uint32_t kTestScale = 8;
  const int kKeySize = 10;
  const int kValueSize = 900;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_table;

  auto* stats_checker = new CompactionJobStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 2;
  options.target_file_size_base = num_keys_per_table * 1000;
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 1;
  options.compaction_options_universal.max_size_amplification_percent = 1000;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Generates the expected CompactionJobStats for each compaction
  for (uint32_t num_flushes = 2; num_flushes <= kTestScale; num_flushes++) {
    // Here we treat one newly flushed file as a unit.
    //
    // For example, if a newly flushed file is 100k, and a compaction has
    // 4 input units, then this compaction inputs 400k.
    uint32_t num_input_units = GetUniversalCompactionInputUnits(num_flushes);
    if (num_input_units == 0) {
      continue;
    }
    // The following statement determines the expected smallest key
    // based on whether it is a full compaction.  A full compaction only
    // happens when the number of flushes equals the number of compaction
    // input runs.
    uint64_t smallest_key =
        (num_flushes == num_input_units) ?
            key_base : key_base * (num_flushes - 1);

    stats_checker->AddExpectedStats(
        NewManualCompactionJobStats(
            Key(smallest_key, 10),
            Key(smallest_key + key_base * num_input_units - key_interval, 10),
            num_input_units,
            num_input_units > 2 ? num_input_units / 2 : 0,
            num_keys_per_table * num_input_units,
            kKeySize, kValueSize,
            num_input_units,
            num_keys_per_table * num_input_units,
            1.0, 0, false));
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 4U);

  for (uint64_t start_key = key_base;
                start_key <= key_base * kTestScale;
                start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
    ASSERT_OK(reinterpret_cast<DBImpl*>(db_)->TEST_WaitForCompact());
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}

INSTANTIATE_TEST_CASE_P(CompactionJobStatsTest, CompactionJobStatsTest,
                        ::testing::Values(1, 4));
}  // namespace rocksdb

int main(int argc, char** argv) {
  rocksdb::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

#else
#include <stdio.h>

int main(int argc, char** argv) {
  fprintf(stderr, "SKIPPED, not supported in ROCKSDB_LITE\n");
  return 0;
}

#endif  // !ROCKSDB_LITE

#else

int main(int argc, char** argv) { return 0; }
#endif  // !defined(IOS_CROSS_COMPILE)