YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/db_log_iter_test.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
// Introduction of SyncPoint effectively disabled building and running this test
25
// in Release build.
26
// That is a pity, because it is a good test.
27
#if !defined(ROCKSDB_LITE)
28
29
#include "yb/rocksdb/db/db_test_util.h"
30
#include "yb/rocksdb/port/stack_trace.h"
31
32
#include "yb/util/test_macros.h"
33
34
namespace rocksdb {
35
36
class DBTestXactLogIterator : public DBTestBase {
37
 public:
38
7
  DBTestXactLogIterator() : DBTestBase("/db_log_iter_test") {}
39
40
  std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
41
48
      const SequenceNumber seq) {
42
48
    unique_ptr<TransactionLogIterator> iter;
43
48
    Status status = dbfull()->GetUpdatesSince(seq, &iter);
44
48
    EXPECT_OK(status);
45
48
    EXPECT_TRUE(iter->Valid());
46
48
    return iter;
47
48
  }
48
};
49
50
namespace {
51
// Reads batches from `iter` until it is no longer valid.
//
// For each batch, expects (non-fatally) that its sequence number is strictly
// greater than the previous one and that the iterator status is OK.
// Stores the number of batches read in *count and returns the sequence number
// of the last batch fetched (that of a default-constructed BatchResult when
// the iterator was not valid to begin with).
SequenceNumber ReadRecords(
    const std::unique_ptr<TransactionLogIterator>& iter,
    int* const count) {
  int batches_seen = 0;
  SequenceNumber previous_sequence = 0;
  BatchResult batch;
  for (; iter->Valid(); iter->Next()) {
    batch = iter->GetBatch();
    EXPECT_TRUE(batch.sequence > previous_sequence);
    ++batches_seen;
    previous_sequence = batch.sequence;
    EXPECT_OK(iter->status());
  }
  *count = batches_seen;
  return batch.sequence;
}
67
68
void ExpectRecords(
69
    const int expected_no_records,
70
37
    const std::unique_ptr<TransactionLogIterator>& iter) {
71
37
  int num_records;
72
37
  ReadRecords(iter, &num_records);
73
37
  ASSERT_EQ(num_records, expected_no_records);
74
37
}
75
}  // namespace
76
77
1
// Checks that an iterator opened at sequence 0 returns every write made to
// both column families, before and after the column families are reopened.
// Repeats for each configuration produced by ChangeCompactOptions().
TEST_F(DBTestXactLogIterator, TransactionLogIterator) {
  do {
    Options opts = OptionsForLogIterTest();
    DestroyAndReopen(opts);
    CreateAndReopenWithCF({"pikachu"}, opts);

    // Three single-key writes spread over the two column families.
    ASSERT_OK(Put(0, "key1", DummyString(1024)));
    ASSERT_OK(Put(1, "key2", DummyString(1024)));
    ASSERT_OK(Put(1, "key2", DummyString(1024)));
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U);
    // The temporary iterator is destroyed at the end of this statement,
    // i.e. before the reopen below.
    ExpectRecords(3, OpenTransactionLogIter(0));

    ReopenWithColumnFamilies({"default", "pikachu"}, opts);
    env_->SleepForMicroseconds(2 * 1000 * 1000);

    // Three more writes after the reopen.
    ASSERT_OK(Put(0, "key4", DummyString(1024)));
    ASSERT_OK(Put(1, "key5", DummyString(1024)));
    ASSERT_OK(Put(0, "key6", DummyString(1024)));

    // All six writes must be visible from the beginning of the log.
    ExpectRecords(6, OpenTransactionLogIter(0));
  } while (ChangeCompactOptions());
}
103
104
#ifndef NDEBUG  // sync point is not included with DNDEBUG build
105
1
// Uses sync point dependencies to reproduce a log file being moved to the
// archive directory while the sorted WAL file list is being built, and checks
// that an iterator opened during that window still returns all five writes.
TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) {
  static const int LOG_ITERATOR_RACE_TEST_COUNT = 2;
  // Each row is one scenario: {predecessor, successor, predecessor, successor}.
  static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = {
      {"WalManager::GetSortedWalFiles:1",  "WalManager::PurgeObsoleteFiles:1",
       "WalManager::PurgeObsoleteFiles:2", "WalManager::GetSortedWalFiles:2"},
      {"WalManager::GetSortedWalsOfType:1",
       "WalManager::PurgeObsoleteFiles:1",
       "WalManager::PurgeObsoleteFiles:2",
       "WalManager::GetSortedWalsOfType:2"}};

  auto* const sync_point = rocksdb::SyncPoint::GetInstance();
  for (const auto& points : sync_points) {
    // Order the purge between the two halves of the WAL listing for this
    // scenario.
    sync_point->LoadDependency(
      { { points[0], points[1] },
        { points[2], points[3] },
      });

    do {
      // Start every configuration with a clean trace and with sync point
      // processing off while the initial data is written.
      sync_point->ClearTrace();
      sync_point->DisableProcessing();
      Options opts = OptionsForLogIterTest();
      DestroyAndReopen(opts);
      ASSERT_OK(Put("key1", DummyString(1024)));
      ASSERT_OK(dbfull()->Flush(FlushOptions()));
      ASSERT_OK(Put("key2", DummyString(1024)));
      ASSERT_OK(dbfull()->Flush(FlushOptions()));
      ASSERT_OK(Put("key3", DummyString(1024)));
      ASSERT_OK(dbfull()->Flush(FlushOptions()));
      ASSERT_OK(Put("key4", DummyString(1024)));
      ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U);

      // Baseline: all four writes are readable before the race is armed.
      ExpectRecords(4, OpenTransactionLogIter(0));

      sync_point->EnableProcessing();
      // Kick off a flush without waiting for it, which also triggers the log
      // move. The log move is held back by the dependency loaded above so
      // that it lands in the middle of the WAL listing.
      FlushOptions no_wait_flush;
      no_wait_flush.wait = false;
      ASSERT_OK(dbfull()->Flush(no_wait_flush));

      // "key5" would be written in a new memtable and log
      ASSERT_OK(Put("key5", DummyString(1024)));

      // Without the fix under test this iterator would miss "key4".
      ExpectRecords(5, OpenTransactionLogIter(0));
    } while (ChangeCompactOptions());
  }
}
159
#endif
160
161
1
// Checks that an iterator which has run past the last record becomes valid
// again, with an OK status, once another record is written and Next() is
// called.
TEST_F(DBTestXactLogIterator, TransactionLogIteratorStallAtLastRecord) {
  do {
    Options opts = OptionsForLogIterTest();
    DestroyAndReopen(opts);
    ASSERT_OK(Put("key1", DummyString(1024)));

    // Positioned on the single record written so far.
    auto log_iter = OpenTransactionLogIter(0);
    ASSERT_OK(log_iter->status());
    ASSERT_TRUE(log_iter->Valid());

    // Stepping past the only record exhausts the iterator without an error.
    log_iter->Next();
    ASSERT_FALSE(log_iter->Valid());
    ASSERT_OK(log_iter->status());

    // A new write followed by Next() revives the same iterator.
    ASSERT_OK(Put("key2", DummyString(1024)));
    log_iter->Next();
    ASSERT_OK(log_iter->status());
    ASSERT_TRUE(log_iter->Valid());
  } while (ChangeCompactOptions());
}
178
179
1
// Checks that both records written before a flush and a reopen are still
// returned by an iterator opened at sequence 0 afterwards.
TEST_F(DBTestXactLogIterator, TransactionLogIteratorCheckAfterRestart) {
  do {
    Options opts = OptionsForLogIterTest();
    DestroyAndReopen(opts);

    ASSERT_OK(Put("key1", DummyString(1024)));
    ASSERT_OK(Put("key2", DummyString(1023)));
    ASSERT_OK(dbfull()->Flush(FlushOptions()));

    Reopen(opts);
    ExpectRecords(2, OpenTransactionLogIter(0));
  } while (ChangeCompactOptions());
}
191
192
1
// Truncates the first WAL file to half its size to create a gap in the log,
// then checks that reading from the start stops before the gap and that a
// second iterator opened just past the last sequence read can still reach the
// record written after the corruption.
TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) {
  do {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    for (int i = 0; i < 1024; i++) {
      ASSERT_OK(Put("key"+ToString(i), DummyString(10)));
    }
    ASSERT_OK(dbfull()->Flush(FlushOptions()));

    // Corrupt this log to create a gap
    rocksdb::VectorLogPtr wal_files;
    ASSERT_OK(dbfull()->GetSortedWalFiles(&wal_files));
    // front() on an empty vector is undefined behaviour, so fail cleanly if
    // no WAL file was reported.
    ASSERT_FALSE(wal_files.empty());
    const auto& first_wal = wal_files.front();
    const auto logfile_path = dbname_ + "/" + first_wal->PathName();
    const auto truncated_size = first_wal->SizeFileBytes() / 2;
    if (mem_env_) {
      ASSERT_OK(mem_env_->Truncate(logfile_path, truncated_size));
    } else {
      ASSERT_EQ(0, truncate(logfile_path.c_str(), truncated_size));
    }

    // Insert a new entry to a new log file
    ASSERT_OK(Put("key1025", DummyString(10)));

    // Try to read from the beginning. Should stop before the gap and read less
    // than 1025 entries
    auto iter = OpenTransactionLogIter(0);
    int count = 0;
    SequenceNumber last_sequence_read = ReadRecords(iter, &count);
    ASSERT_LT(last_sequence_read, 1025U);

    // Try to read past the gap, should be able to seek to key1025
    auto iter2 = OpenTransactionLogIter(last_sequence_read + 1);
    ExpectRecords(1, iter2);
  } while (ChangeCompactOptions());
}
224
225
1
// Writes one multi-operation batch across two column families, flushes and
// reopens, writes one more key, and expects an iterator opened at sequence 3
// to return two records.
TEST_F(DBTestXactLogIterator, TransactionLogIteratorBatchOperations) {
  do {
    Options opts = OptionsForLogIterTest();
    DestroyAndReopen(opts);
    CreateAndReopenWithCF({"pikachu"}, opts);

    // One batch carrying three puts and a delete.
    WriteBatch mixed_batch;
    mixed_batch.Put(handles_[1], "key1", DummyString(1024));
    mixed_batch.Put(handles_[0], "key2", DummyString(1024));
    mixed_batch.Put(handles_[1], "key3", DummyString(1024));
    mixed_batch.Delete(handles_[0], "key2");
    ASSERT_OK(dbfull()->Write(WriteOptions(), &mixed_batch));

    ASSERT_OK(Flush(1));
    ASSERT_OK(Flush(0));
    ReopenWithColumnFamilies({"default", "pikachu"}, opts);

    // A single write after the reopen.
    ASSERT_OK(Put(1, "key4", DummyString(1024)));

    ExpectRecords(2, OpenTransactionLogIter(3));
  } while (ChangeCompactOptions());
}
244
245
1
// Writes a batch that interleaves puts, a delete and two log-data blobs, then
// replays the first batch returned by the log iterator through a recording
// handler and checks that every operation, blobs included, comes back in the
// original order.
TEST_F(DBTestXactLogIterator, TransactionLogIteratorBlobs) {
  Options opts = OptionsForLogIterTest();
  DestroyAndReopen(opts);
  CreateAndReopenWithCF({"pikachu"}, opts);
  {
    WriteBatch blob_batch;
    blob_batch.Put(handles_[1], "key1", DummyString(1024));
    blob_batch.Put(handles_[0], "key2", DummyString(1024));
    blob_batch.PutLogData(Slice("blob1"));
    blob_batch.Put(handles_[1], "key3", DummyString(1024));
    blob_batch.PutLogData(Slice("blob2"));
    blob_batch.Delete(handles_[0], "key2");
    ASSERT_OK(dbfull()->Write(WriteOptions(), &blob_batch));
    ReopenWithColumnFamilies({"default", "pikachu"}, opts);
  }

  // The iterator itself is a temporary; only the fetched batch is kept.
  auto first_batch = OpenTransactionLogIter(0)->GetBatch();

  // Appends a textual description of every callback it receives to `trace`.
  struct RecordingHandler : public WriteBatch::Handler {
    std::string trace;

    Status PutCF(uint32_t cf, const SliceParts& key,
                 const SliceParts& value) override {
      trace += "Put(" + ToString(cf) + ", " + key.TheOnlyPart().ToString() + ", " +
               ToString(value.TheOnlyPart().size()) + ")";
      return Status::OK();
    }

    Status MergeCF(uint32_t cf, const Slice& key,
                   const Slice& value) override {
      trace += "Merge(" + ToString(cf) + ", " + key.ToString() + ", " +
               ToString(value.size()) + ")";
      return Status::OK();
    }

    void LogData(const Slice& blob) override {
      trace += "LogData(" + blob.ToString() + ")";
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      trace += "Delete(" + ToString(cf) + ", " + key.ToString() + ")";
      return Status::OK();
    }
  } recorder;

  ASSERT_OK(first_batch.writeBatchPtr->Iterate(&recorder));
  ASSERT_EQ(
      "Put(1, key1, 1024)"
      "Put(0, key2, 1024)"
      "LogData(blob1)"
      "Put(1, key3, 1024)"
      "LogData(blob2)"
      "Delete(0, key2)",
      recorder.trace);
}
294
}  // namespace rocksdb
295
296
#endif  // !defined(ROCKSDB_LITE)
297
298
13.2k
int main(int argc, char** argv) {
299
13.2k
#if !defined(ROCKSDB_LITE)
300
13.2k
  rocksdb::port::InstallStackTraceHandler();
301
13.2k
  ::testing::InitGoogleTest(&argc, argv);
302
13.2k
  return RUN_ALL_TESTS();
303
#else
304
  return 0;
305
#endif
306
13.2k
}