YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/wal_manager_test.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#ifndef ROCKSDB_LITE
22
23
#include <map>
24
#include <string>
25
26
#include "yb/rocksdb/cache.h"
27
#include "yb/rocksdb/write_batch.h"
28
29
#include "yb/rocksdb/db/column_family.h"
30
#include "yb/rocksdb/db/filename.h"
31
#include "yb/rocksdb/db/log_writer.h"
32
#include "yb/rocksdb/db/version_set.h"
33
#include "yb/rocksdb/db/wal_manager.h"
34
#include "yb/rocksdb/db/writebuffer.h"
35
#include "yb/rocksdb/util/file_reader_writer.h"
36
#include "yb/rocksdb/util/mock_env.h"
37
#include "yb/rocksdb/util/testharness.h"
38
#include "yb/rocksdb/util/testutil.h"
39
#include "yb/rocksdb/table/mock_table.h"
40
#include "yb/rocksdb/db/db_impl.h"
41
42
#include "yb/util/status_log.h"
43
#include "yb/util/string_util.h"
44
#include "yb/util/test_util.h"
45
46
namespace rocksdb {
47
48
// TODO(icanadi) mock out VersionSet
49
// TODO(icanadi) move other WalManager-specific tests from db_test here
50
class WalManagerTest : public RocksDBTest {
51
 public:
52
  WalManagerTest()
53
      : env_(new MockEnv(Env::Default())),
54
        dbname_(test::TmpDir() + "/wal_manager_test"),
55
        table_cache_(NewLRUCache(50000, 16)),
56
        write_buffer_(db_options_.db_write_buffer_size),
57
5
        current_log_number_(0) {
58
5
    CHECK_OK(DestroyDB(dbname_, Options()));
59
5
  }
60
61
5
  void Init() {
62
5
    ASSERT_OK(env_->CreateDirIfMissing(dbname_));
63
5
    ASSERT_OK(env_->CreateDirIfMissing(ArchivalDirectory(dbname_)));
64
5
    db_options_.db_paths.emplace_back(dbname_,
65
5
                                      std::numeric_limits<uint64_t>::max());
66
5
    db_options_.wal_dir = dbname_;
67
5
    db_options_.env = env_.get();
68
69
5
    versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
70
5
                                   table_cache_.get(), &write_buffer_,
71
5
                                   &write_controller_));
72
73
5
    wal_manager_.reset(new WalManager(db_options_, env_options_));
74
5
  }
75
76
3
  void Reopen() {
77
3
    wal_manager_.reset(new WalManager(db_options_, env_options_));
78
3
  }
79
80
  // NOT thread safe
81
200k
  void Put(const std::string& key, const std::string& value) {
82
200k
    assert(current_log_writer_.get() != nullptr);
83
200k
    uint64_t seq =  versions_->LastSequence() + 1;
84
200k
    WriteBatch batch;
85
200k
    batch.Put(key, value);
86
200k
    WriteBatchInternal::SetSequence(&batch, seq);
87
200k
    ASSERT_OK(current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch)));
88
200k
    versions_->SetLastSequence(seq);
89
200k
  }
90
91
  // NOT thread safe
92
44
  void RollTheLog(bool archived) {
93
44
    current_log_number_++;
94
44
    std::string fname = ArchivedLogFileName(dbname_, current_log_number_);
95
44
    unique_ptr<WritableFile> file;
96
44
    ASSERT_OK(env_->NewWritableFile(fname, &file, env_options_));
97
44
    unique_ptr<WritableFileWriter> file_writer(
98
44
        new WritableFileWriter(std::move(file), env_options_));
99
44
    current_log_writer_.reset(new log::Writer(std::move(file_writer), 0, false));
100
44
  }
101
102
2
  void CreateArchiveLogs(int num_logs, int entries_per_log) {
103
42
    for (int i = 1; i <= num_logs; ++i) {
104
40
      RollTheLog(true);
105
200k
      for (int k = 0; k < entries_per_log; ++k) {
106
200k
        Put(ToString(k), std::string(1024, 'a'));
107
200k
      }
108
40
    }
109
2
  }
110
111
  std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
112
2
      const SequenceNumber seq) {
113
2
    unique_ptr<TransactionLogIterator> iter;
114
2
    Status status = wal_manager_->GetUpdatesSince(
115
2
        seq, &iter, TransactionLogIterator::ReadOptions(), versions_.get());
116
2
    EXPECT_OK(status);
117
2
    return iter;
118
2
  }
119
120
  std::unique_ptr<MockEnv> env_;
121
  std::string dbname_;
122
  WriteController write_controller_;
123
  EnvOptions env_options_;
124
  std::shared_ptr<Cache> table_cache_;
125
  DBOptions db_options_;
126
  WriteBuffer write_buffer_;
127
  std::unique_ptr<VersionSet> versions_;
128
  std::unique_ptr<WalManager> wal_manager_;
129
130
  std::unique_ptr<log::Writer> current_log_writer_;
131
  uint64_t current_log_number_;
132
};
133
134
1
// Reads the first record of a live log file before and after a record is
// written to it, then once more to exercise the repeated lookup.
TEST_F(WalManagerTest, ReadFirstRecordCache) {
  Init();
  const std::string log_path = dbname_ + "/000001.log";
  std::unique_ptr<WritableFile> wal_file;
  ASSERT_OK(env_->NewWritableFile(log_path, &wal_file, EnvOptions()));

  // Left uninitialized on purpose: the callee has to set it for the checks
  // below to pass.
  SequenceNumber first_seq;

  // The file is still empty, so both lookups report sequence number 0.
  ASSERT_OK(wal_manager_->TEST_ReadFirstLine(log_path, &first_seq));
  ASSERT_EQ(first_seq, 0U);

  ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &first_seq));
  ASSERT_EQ(first_seq, 0U);

  // Write one batch with sequence number 10 into the log.
  std::unique_ptr<WritableFileWriter> wal_writer(
      new WritableFileWriter(std::move(wal_file), EnvOptions()));
  const bool recycle_logs = db_options_.recycle_log_file_num > 0;
  log::Writer writer(std::move(wal_writer), 1, recycle_logs);
  WriteBatch batch;
  batch.Put("foo", "bar");
  WriteBatchInternal::SetSequence(&batch, 10);
  ASSERT_OK(writer.AddRecord(WriteBatchInternal::Contents(&batch)));

  // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here.
  // Waiting for lei to finish with db_test
  // env_->count_sequential_reads_ = true;
  // sequential_read_counter_ sanity test
  // ASSERT_EQ(env_->sequential_read_counter_.Read(), 0);

  ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &first_seq));
  ASSERT_EQ(first_seq, 10U);
  // did a read
  // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here
  // ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);

  ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &first_seq));
  ASSERT_EQ(first_seq, 10U);
  // no new reads since the value is cached
  // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here
  // ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);
}
173
174
namespace {
175
1
uint64_t GetLogDirSize(std::string dir_path, Env* env) {
176
1
  uint64_t dir_size = 0;
177
1
  std::vector<std::string> files;
178
1
  EXPECT_OK(env->GetChildren(dir_path, &files));
179
1
  for (auto& f : files) {
180
1
    uint64_t number;
181
1
    FileType type;
182
1
    if (ParseFileName(f, &number, &type) && type == kLogFile) {
183
1
      std::string const file_path = dir_path + "/" + f;
184
1
      uint64_t file_size;
185
1
      EXPECT_OK(env->GetFileSize(file_path, &file_size));
186
1
      dir_size += file_size;
187
1
    }
188
1
  }
189
1
  return dir_size;
190
1
}
191
std::vector<std::uint64_t> ListSpecificFiles(
192
4
    Env* env, const std::string& path, const FileType expected_file_type) {
193
4
  std::vector<std::string> files;
194
4
  std::vector<uint64_t> file_numbers;
195
4
  EXPECT_OK(env->GetChildren(path, &files));
196
4
  uint64_t number;
197
4
  FileType type;
198
44
  for (size_t i = 0; i < files.size(); ++i) {
199
40
    if (ParseFileName(files[i], &number, &type)) {
200
40
      if (type == expected_file_type) {
201
40
        file_numbers.push_back(number);
202
40
      }
203
40
    }
204
40
  }
205
4
  return file_numbers;
206
4
}
207
208
1
// Walks `iter` to its end and returns the number of batches seen, expecting
// the batch sequence numbers to be strictly increasing along the way.
int CountRecords(TransactionLogIterator* iter) {
  int num_batches = 0;
  SequenceNumber previous_sequence = 0;
  BatchResult current;
  for (; iter->Valid(); iter->Next()) {
    current = iter->GetBatch();
    EXPECT_TRUE(current.sequence > previous_sequence);
    ++num_batches;
    previous_sequence = current.sequence;
    EXPECT_OK(iter->status());
  }
  return num_batches;
}
222
}  // namespace
223
224
1
// Checks that PurgeObsoleteWALFiles() honors WAL_size_limit_MB and, once a TTL
// is set and has elapsed, removes every archived log.
TEST_F(WalManagerTest, WALArchivalSizeLimit) {
  db_options_.WAL_ttl_seconds = 0;
  db_options_.WAL_size_limit_MB = 1000;
  Init();

  // TEST : Create WalManager with huge size limit and no ttl.
  // Create some archived files and assert that all of them are present.
  // Change size limit. Re-open WalManager.
  // Assert that archive is not greater than WAL_size_limit_MB after
  // PurgeObsoleteWALFiles()
  // Set ttl to a small value and advance the fake clock. Re-open WalManager.
  // Assert that there are no archived logs left.

  std::string archive_dir = ArchivalDirectory(dbname_);
  CreateArchiveLogs(20, 5000);

  std::vector<std::uint64_t> log_files =
      ListSpecificFiles(env_.get(), archive_dir, kLogFile);
  ASSERT_EQ(log_files.size(), 20U);

  db_options_.WAL_size_limit_MB = 8;
  Reopen();
  wal_manager_->PurgeObsoleteWALFiles();

  const uint64_t archive_size = GetLogDirSize(archive_dir, env_.get());
  const uint64_t size_limit_bytes = db_options_.WAL_size_limit_MB * 1024 * 1024;
  // ASSERT_LE prints both values on failure, unlike ASSERT_TRUE(a <= b).
  ASSERT_LE(archive_size, size_limit_bytes);

  db_options_.WAL_ttl_seconds = 1;
  // Advance the mock clock past the 1 second TTL.
  env_->FakeSleepForMicroseconds(2 * 1000 * 1000);
  Reopen();
  wal_manager_->PurgeObsoleteWALFiles();

  log_files = ListSpecificFiles(env_.get(), archive_dir, kLogFile);
  ASSERT_TRUE(log_files.empty());
}
261
262
1
// Checks that archived logs are removed by PurgeObsoleteWALFiles() once the
// configured TTL has elapsed.
TEST_F(WalManagerTest, WALArchivalTtl) {
  db_options_.WAL_ttl_seconds = 1000;
  Init();

  // TEST : Create WalManager with a ttl and no size limit.
  // Create some archived log files and assert that they are present.
  // Re-open WalManager with a small ttl after advancing the fake clock.
  // Assert that all archived logs were removed.

  const std::string archive_dir = ArchivalDirectory(dbname_);
  CreateArchiveLogs(20, 5000);

  std::vector<uint64_t> archived_numbers =
      ListSpecificFiles(env_.get(), archive_dir, kLogFile);
  ASSERT_GT(archived_numbers.size(), 0U);

  db_options_.WAL_ttl_seconds = 1;
  // Advance the mock clock by three seconds, well past the new TTL.
  env_->FakeSleepForMicroseconds(3 * 1000 * 1000);
  Reopen();
  wal_manager_->PurgeObsoleteWALFiles();

  archived_numbers = ListSpecificFiles(env_.get(), archive_dir, kLogFile);
  ASSERT_TRUE(archived_numbers.empty());
}
287
288
1
// Checks that the transaction log iterator steps over a log file that holds
// no records and still returns the records from the files around it.
TEST_F(WalManagerTest, TransactionLogIteratorMoveOverZeroFiles) {
  Init();
  RollTheLog(false);
  Put("key1", std::string(1024, 'a'));
  // Create a zero record WAL file.
  RollTheLog(false);
  RollTheLog(false);

  Put("key2", std::string(1024, 'a'));

  auto iter = OpenTransactionLogIter(0);
  // OpenTransactionLogIter only EXPECTs success, so guard against a null
  // iterator before CountRecords dereferences it.
  ASSERT_TRUE(iter != nullptr);
  ASSERT_EQ(2, CountRecords(iter.get()));
}
301
302
1
// Checks that opening an iterator over a single log file with no records
// yields an iterator that is not valid.
TEST_F(WalManagerTest, TransactionLogIteratorJustEmptyFile) {
  Init();
  RollTheLog(false);
  auto iter = OpenTransactionLogIter(0);
  // OpenTransactionLogIter only EXPECTs success, so guard against a null
  // iterator before dereferencing it.
  ASSERT_TRUE(iter != nullptr);
  // Check that an empty iterator is returned
  ASSERT_FALSE(iter->Valid());
}
309
310
}  // namespace rocksdb
311
312
13.2k
int main(int argc, char** argv) {
313
13.2k
  ::testing::InitGoogleTest(&argc, argv);
314
13.2k
  return RUN_ALL_TESTS();
315
13.2k
}
316
317
#else
318
#include <stdio.h>
319
320
// Entry point for ROCKSDB_LITE builds: prints a notice that the test is
// skipped and exits successfully.
int main(int argc, char** argv) {
  fputs("SKIPPED as WalManager is not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
324
325
#endif  // !ROCKSDB_LITE