YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/flush_job_test.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#include <algorithm>
22
#include <map>
23
#include <string>
24
25
#include <gtest/gtest.h>
26
27
#include "yb/rocksdb/db/file_numbers.h"
28
#include "yb/rocksdb/db/filename.h"
29
#include "yb/rocksdb/db/flush_job.h"
30
#include "yb/rocksdb/db/version_set.h"
31
#include "yb/rocksdb/db/writebuffer.h"
32
#include "yb/rocksdb/table/mock_table.h"
33
#include "yb/rocksdb/util/file_reader_writer.h"
34
#include "yb/rocksdb/util/testharness.h"
35
#include "yb/rocksdb/util/testutil.h"
36
37
#include "yb/util/string_util.h"
38
#include "yb/util/test_macros.h"
39
40
namespace rocksdb {
41
42
// TODO(icanadi) Mock out everything else:
43
// 1. VersionSet
44
// 2. Memtable
45
class FlushJobTest : public RocksDBTest {
46
 public:
47
  FlushJobTest()
48
      : env_(Env::Default()),
49
        dbname_(test::TmpDir() + "/flush_job_test"),
50
        table_cache_(NewLRUCache(50000, 16)),
51
        write_buffer_(db_options_.db_write_buffer_size),
52
        versions_(new VersionSet(dbname_, &db_options_, env_options_,
53
                                 table_cache_.get(), &write_buffer_,
54
                                 &write_controller_)),
55
        shutting_down_(false),
56
3
        mock_table_factory_(new mock::MockTableFactory()) {
57
3
    EXPECT_OK(env_->CreateDirIfMissing(dbname_));
58
3
    db_options_.db_paths.emplace_back(dbname_,
59
3
                                      std::numeric_limits<uint64_t>::max());
60
3
    db_options_.boundary_extractor = test::MakeBoundaryValuesExtractor();
61
    // TODO(icanadi) Remove this once we mock out VersionSet
62
3
    NewDB();
63
3
    std::vector<ColumnFamilyDescriptor> column_families;
64
3
    cf_options_.table_factory = mock_table_factory_;
65
3
    column_families.emplace_back(kDefaultColumnFamilyName, cf_options_);
66
67
3
    EXPECT_OK(versions_->Recover(column_families, false));
68
3
  }
69
70
3
  void NewDB() {
71
3
    VersionEdit new_db;
72
3
    new_db.InitNewDB();
73
74
3
    const std::string manifest = DescriptorFileName(dbname_, 1);
75
3
    unique_ptr<WritableFile> file;
76
3
    Status s = env_->NewWritableFile(
77
3
        manifest, &file, env_->OptimizeForManifestWrite(env_options_));
78
3
    ASSERT_OK(s);
79
3
    unique_ptr<WritableFileWriter> file_writer(
80
3
        new WritableFileWriter(std::move(file), EnvOptions()));
81
3
    {
82
3
      log::Writer log(std::move(file_writer), 0, false);
83
3
      std::string record;
84
3
      new_db.AppendEncodedTo(&record);
85
3
      s = log.AddRecord(record);
86
3
    }
87
3
    ASSERT_OK(s);
88
    // Make "CURRENT" file that points to the new manifest file.
89
3
    s = SetCurrentFile(env_, dbname_, 1, nullptr, db_options_.disableDataSync);
90
3
  }
91
92
  Env* env_;
93
  std::string dbname_;
94
  EnvOptions env_options_;
95
  std::shared_ptr<Cache> table_cache_;
96
  WriteController write_controller_;
97
  DBOptions db_options_;
98
  WriteBuffer write_buffer_;
99
  ColumnFamilyOptions cf_options_;
100
  std::unique_ptr<VersionSet> versions_;
101
  InstrumentedMutex mutex_;
102
  std::atomic<bool> shutting_down_;
103
  std::atomic<bool> disable_flush_on_shutdown_{false};
104
  std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
105
};
106
107
1
TEST_F(FlushJobTest, Empty) {
108
1
  JobContext job_context(0);
109
1
  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
110
1
  EventLogger event_logger(db_options_.info_log.get());
111
1
  FileNumbersProvider file_numbers_provider(versions_.get());
112
1
  FlushJob flush_job(
113
1
      dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
114
1
      *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_, &shutting_down_,
115
1
      &disable_flush_on_shutdown_, {}, kMaxSequenceNumber, MemTableFilter(), &file_numbers_provider,
116
1
      &job_context, nullptr, nullptr, nullptr, kNoCompression, nullptr, &event_logger);
117
1
  {
118
1
    InstrumentedMutexLock l(&mutex_);
119
1
    ASSERT_OK(yb::ResultToStatus(flush_job.Run()));
120
1
  }
121
1
  job_context.Clean();
122
1
}
123
124
1
TEST_F(FlushJobTest, NonEmpty) {
125
1
  JobContext job_context(0);
126
1
  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
127
1
  auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
128
1
                                           kMaxSequenceNumber);
129
1
  new_mem->Ref();
130
1
  auto inserted_keys = mock::MakeMockFile();
131
  // Test data:
132
  //   seqno [    1,    2 ... 8998, 8999, 9000, 9001, 9002 ... 9999 ]
133
  //   key   [ 1001, 1002 ... 9998, 9999,    0,    1,    2 ...  999 ]
134
  // Expected:
135
  //   smallest_key   = "0"
136
  //   largest_key    = "9999"
137
  //   smallest_seqno = 1
138
  //   smallest_seqno = 9999
139
140
1
  test::BoundaryTestValues values;
141
142
10.0k
  for (int i = 1; i < 10000; ++i) {
143
9.99k
    std::string key(ToString((i + 1000) % 10000));
144
9.99k
    std::string value("value" + key);
145
9.99k
    Slice key_slice(key);
146
9.99k
    Slice value_slice(value);
147
9.99k
    new_mem->Add(
148
9.99k
        SequenceNumber(i), kTypeValue, SliceParts(&key_slice, 1), SliceParts(&value_slice, 1));
149
9.99k
    InternalKey internal_key(key, SequenceNumber(i), kTypeValue);
150
9.99k
    inserted_keys.emplace(internal_key.Encode().ToBuffer(), value);
151
9.99k
    values.Feed(key);
152
9.99k
  }
153
1
  test::TestUserFrontiers frontiers(1, 12345);
154
1
  new_mem->UpdateFrontiers(frontiers);
155
156
1
  autovector<MemTable*> to_delete;
157
1
  cfd->imm()->Add(new_mem, &to_delete);
158
0
  for (auto& m : to_delete) {
159
0
    delete m;
160
0
  }
161
162
1
  EventLogger event_logger(db_options_.info_log.get());
163
1
  FileNumbersProvider file_numbers_provider(versions_.get());
164
1
  FlushJob flush_job(
165
1
      dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
166
1
      *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_, &shutting_down_,
167
1
      &disable_flush_on_shutdown_, {}, kMaxSequenceNumber, MemTableFilter(), &file_numbers_provider,
168
1
      &job_context, nullptr, nullptr, nullptr, kNoCompression, nullptr, &event_logger);
169
1
  FileMetaData fd;
170
1
  {
171
1
    InstrumentedMutexLock l(&mutex_);
172
1
    ASSERT_OK(yb::ResultToStatus(flush_job.Run(&fd)));
173
1
  }
174
1
  ASSERT_EQ(ToString(0), fd.smallest.key.user_key().ToString());
175
1
  ASSERT_EQ(ToString(9999), fd.largest.key.user_key().ToString());
176
1
  ASSERT_EQ(1, fd.smallest.seqno);
177
1
  ASSERT_EQ(9999, fd.largest.seqno);
178
1
  ASSERT_TRUE(frontiers.Smallest().Equals(*fd.smallest.user_frontier));
179
1
  ASSERT_TRUE(frontiers.Largest().Equals(*fd.largest.user_frontier));
180
1
  values.Check(fd.smallest, fd.largest);
181
1
  mock_table_factory_->AssertSingleFile(inserted_keys);
182
1
  job_context.Clean();
183
1
}
184
185
1
TEST_F(FlushJobTest, Snapshots) {
186
1
  JobContext job_context(0);
187
1
  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
188
1
  auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
189
1
                                           kMaxSequenceNumber);
190
191
1
  std::vector<SequenceNumber> snapshots;
192
1
  std::set<SequenceNumber> snapshots_set;
193
1
  int keys = 10000;
194
1
  int max_inserts_per_keys = 8;
195
196
1
  Random rnd(301);
197
5.00k
  for (int i = 0; i < keys / 2; ++i) {
198
5.00k
    snapshots.push_back(rnd.Uniform(keys * (max_inserts_per_keys / 2)) + 1);
199
5.00k
    snapshots_set.insert(snapshots.back());
200
5.00k
  }
201
1
  std::sort(snapshots.begin(), snapshots.end());
202
203
1
  new_mem->Ref();
204
1
  SequenceNumber current_seqno = 0;
205
1
  auto inserted_keys = mock::MakeMockFile();
206
10.0k
  for (int i = 1; i < keys; ++i) {
207
9.99k
    std::string key(ToString(i));
208
9.99k
    Slice key_slice(key);
209
9.99k
    int insertions = rnd.Uniform(max_inserts_per_keys);
210
45.1k
    for (int j = 0; j < insertions; ++j) {
211
35.1k
      std::string value(test::RandomHumanReadableString(&rnd, 10));
212
35.1k
      Slice value_slice(value);
213
35.1k
      auto seqno = ++current_seqno;
214
35.1k
      new_mem->Add(SequenceNumber(seqno), kTypeValue, SliceParts(&key_slice, 1),
215
35.1k
                   SliceParts(&value_slice, 1));
216
      // a key is visible only if:
217
      // 1. it's the last one written (j == insertions - 1)
218
      // 2. there's a snapshot pointing at it
219
35.1k
      bool visible = (j == insertions - 1) ||
220
26.3k
                     (snapshots_set.find(seqno) != snapshots_set.end());
221
35.1k
      if (visible) {
222
11.9k
        InternalKey internal_key(key, seqno, kTypeValue);
223
11.9k
        inserted_keys.insert({internal_key.Encode().ToString(), value});
224
11.9k
      }
225
35.1k
    }
226
9.99k
  }
227
228
1
  autovector<MemTable*> to_delete;
229
1
  cfd->imm()->Add(new_mem, &to_delete);
230
0
  for (auto& m : to_delete) {
231
0
    delete m;
232
0
  }
233
234
1
  EventLogger event_logger(db_options_.info_log.get());
235
1
  FileNumbersProvider file_numbers_provider(versions_.get());
236
1
  FlushJob flush_job(
237
1
      dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
238
1
      *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_, &shutting_down_,
239
1
      &disable_flush_on_shutdown_, snapshots, kMaxSequenceNumber, MemTableFilter(),
240
1
      &file_numbers_provider, &job_context, nullptr, nullptr, nullptr, kNoCompression, nullptr,
241
1
      &event_logger);
242
1
  {
243
1
    InstrumentedMutexLock l(&mutex_);
244
1
    ASSERT_OK(yb::ResultToStatus(flush_job.Run()));
245
1
  }
246
1
  mock_table_factory_->AssertSingleFile(inserted_keys);
247
1
  job_context.Clean();
248
1
}
249
250
}  // namespace rocksdb
251
252
13.2k
int main(int argc, char** argv) {
253
13.2k
  ::testing::InitGoogleTest(&argc, argv);
254
13.2k
  return RUN_ALL_TESTS();
255
13.2k
}