YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/db_tailing_iter_test.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
// Introduction of SyncPoint effectively disabled building and running this test
25
// in Release build.
26
// which is a pity, it is a good test
27
#if !defined(ROCKSDB_LITE)
28
29
#include "yb/rocksdb/db/db_test_util.h"
30
#include "yb/rocksdb/db/forward_iterator.h"
31
#include "yb/rocksdb/port/stack_trace.h"
32
33
#include "yb/util/test_macros.h"
34
35
namespace rocksdb {
36
37
class DBTestTailingIterator : public DBTestBase {
38
 public:
39
17
  DBTestTailingIterator() : DBTestBase("/db_tailing_iterator_test") {}
40
};
41
42
1
// A tailing iterator that initially sees nothing must observe a record that
// is written after the iterator was created.
TEST_F(DBTestTailingIterator, TailingIteratorSingle) {
  ReadOptions tailing_opts;
  tailing_opts.tailing = true;
  std::unique_ptr<Iterator> it(db_->NewIterator(tailing_opts));

  // Nothing has been written yet, so there is nothing to position on.
  it->SeekToFirst();
  ASSERT_FALSE(it->Valid());

  // Write one record; re-seeking the same iterator must now find it.
  ASSERT_OK(db_->Put(WriteOptions(), "mirko", "fodor"));
  it->SeekToFirst();
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ(it->key().ToString(), "mirko");

  // It is the only record, so stepping forward runs off the end.
  it->Next();
  ASSERT_FALSE(it->Valid());
}
59
60
1
// Interleaves writes and seeks: after every Put into column family 1 the
// long-lived tailing iterator must be able to seek straight to the new key.
TEST_F(DBTestTailingIterator, TailingIteratorKeepAdding) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

  ReadOptions tailing_opts;
  tailing_opts.tailing = true;
  std::unique_ptr<Iterator> it(db_->NewIterator(tailing_opts, handles_[1]));

  const std::string payload(1024, 'a');
  const int kNumRecords = 10000;
  const size_t kKeyLen = 16;  // "%016d" yields exactly 16 characters here

  for (int n = 0; n < kNumRecords; ++n) {
    char key_buf[32];
    snprintf(key_buf, sizeof(key_buf), "%016d", n);
    const Slice key(key_buf, kKeyLen);

    ASSERT_OK(Put(1, key, payload));

    // The key just written must be visible to the existing iterator.
    it->Seek(key);
    ASSERT_TRUE(it->Valid());
    ASSERT_EQ(0, it->key().compare(key));
  }
}
81
82
1
// Seeks to a target slightly below each freshly written key and expects the
// tailing iterator to land on that key. A second iterator follows the same
// keys using only SeekToFirst()/Next(). Keys are first written in ascending
// order and afterwards in descending order, with periodic flushes.
TEST_F(DBTestTailingIterator, TailingIteratorSeekToNext) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

  ReadOptions tailing_opts;
  tailing_opts.tailing = true;
  std::unique_ptr<Iterator> seeker(db_->NewIterator(tailing_opts, handles_[1]));
  std::unique_ptr<Iterator> stepper(db_->NewIterator(tailing_opts, handles_[1]));

  const std::string payload(1024, 'a');
  const size_t kKeyLen = 20;  // "00a0" + 16 digits

  // Renders "00a0" followed by the zero-padded 16-digit number into `out`.
  const auto format_key = [](char (&out)[32], int number) {
    snprintf(out, sizeof(out), "00a0%016d", number);
  };

  const int num_records = 1000;

  // Phase 1: ascending keys 5, 10, 15, ...
  for (int i = 1; i < num_records; ++i) {
    char key_buf[32];
    format_key(key_buf, i * 5);
    const Slice key(key_buf, kKeyLen);
    ASSERT_OK(Put(1, key, payload));

    if (i % 100 == 99) {
      ASSERT_OK(Flush(1));
    }

    // Target is two below the new key, so the seek must land on the key.
    char target_buf[32];
    format_key(target_buf, i * 5 - 2);
    const Slice target(target_buf, kKeyLen);
    seeker->Seek(target);
    ASSERT_TRUE(seeker->Valid());
    ASSERT_EQ(0, seeker->key().compare(key));

    // The second iterator only ever walks forward one step at a time.
    if (i != 1) {
      stepper->Next();
    } else {
      stepper->SeekToFirst();
    }
    ASSERT_TRUE(stepper->Valid());
    ASSERT_EQ(0, stepper->key().compare(key));
  }

  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();

  // Phase 2: descending keys, starting above everything written so far.
  for (int i = 2 * num_records; i > 0; --i) {
    char key_buf[32];
    format_key(key_buf, i * 5);
    const Slice key(key_buf, kKeyLen);
    ASSERT_OK(Put(1, key, payload));

    if (i % 100 == 99) {
      ASSERT_OK(Flush(1));
    }

    char target_buf[32];
    format_key(target_buf, i * 5 - 2);
    const Slice target(target_buf, kKeyLen);
    seeker->Seek(target);
    ASSERT_TRUE(seeker->Valid());
    ASSERT_EQ(0, seeker->key().compare(key));
  }
}
138
139
1
// Exercises tailing iterators that carry an iterate_upper_bound while flushes
// and compactions keep changing the set of files underneath them.
//
// SyncPoint callbacks hook ForwardIterator::SeekInternal / Next and, while
// `file_iters_deleted` is set, assert TEST_CheckDeletedIters() on the iterator
// that fired the sync point. Two further callbacks record that both
// RenewIterators sync points ("Null" and "Copy") were reached.
//
// Afterwards the DB is reopened without a block cache to check that a
// kBlockCacheTier seek reports Incomplete, and reopened once more to run a
// descending write/seek loop with a regular read tier.
TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) {
  const uint64_t k150KB = 150 * 1024;
  Options options;
  options.write_buffer_size = k150KB;
  options.max_write_buffer_number = 3;
  options.min_write_buffer_number_to_merge = 2;
  CreateAndReopenWithCF({"pikachu"}, options);
  ReadOptions read_options;
  read_options.tailing = true;
  // Written by TEST_CheckDeletedIters() from the callbacks below. They are
  // zero-initialized so the ASSERT_LE checks never read an indeterminate
  // value should a callback not have fired yet.
  int num_iters = 0;
  int deleted_iters = 0;

  // Upper bound "00b0" + 16 zeros: all "00a0..." keys sort below it, all
  // "00b0..." keys written in the loop (i >= 1) sort above it.
  char bufe[32];
  snprintf(bufe, sizeof(bufe), "00b0%016d", 0);
  Slice keyu(bufe, 20);
  read_options.iterate_upper_bound = &keyu;
  std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
  std::unique_ptr<Iterator> itern(db_->NewIterator(read_options, handles_[1]));
  std::unique_ptr<Iterator> iterh(db_->NewIterator(read_options, handles_[1]));
  std::string value(1024, 'a');
  bool file_iters_deleted = false;
  bool file_iters_renewed_null = false;
  bool file_iters_renewed_copy = false;

  // The callbacks registered below capture the locals above by reference.
  // Unregister them on every exit path (including an early return from a
  // failed ASSERT_*) so that they can never run against a dead stack frame.
  struct SyncPointCleanup {
    ~SyncPointCleanup() {
      rocksdb::SyncPoint::GetInstance()->DisableProcessing();
      rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
    }
  } sync_point_cleanup;

  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "ForwardIterator::SeekInternal:Return", [&](void* arg) {
        ForwardIterator* fiter = static_cast<ForwardIterator*>(arg);
        ASSERT_TRUE(!file_iters_deleted ||
                    fiter->TEST_CheckDeletedIters(&deleted_iters, &num_iters));
      });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "ForwardIterator::Next:Return", [&](void* arg) {
        ForwardIterator* fiter = static_cast<ForwardIterator*>(arg);
        ASSERT_TRUE(!file_iters_deleted ||
                    fiter->TEST_CheckDeletedIters(&deleted_iters, &num_iters));
      });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "ForwardIterator::RenewIterators:Null",
      [&](void* /*arg*/) { file_iters_renewed_null = true; });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "ForwardIterator::RenewIterators:Copy",
      [&](void* /*arg*/) { file_iters_renewed_copy = true; });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  const int num_records = 1000;
  for (int i = 1; i < num_records; ++i) {
    char buf1[32];
    char buf2[32];
    char buf3[32];
    char buf4[32];
    snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5);
    snprintf(buf3, sizeof(buf3), "00b0%016d", i * 5);

    // One key below the upper bound and one above it per iteration.
    Slice key(buf1, 20);
    ASSERT_OK(Put(1, key, value));
    Slice keyn(buf3, 20);
    ASSERT_OK(Put(1, keyn, value));

    if (i % 100 == 99) {
      ASSERT_OK(Flush(1));
      ASSERT_OK(dbfull()->TEST_WaitForCompact());
      // Only at i == 299 are the deleted-iterator checks active while
      // `iterh` scans.
      if (i == 299) {
        file_iters_deleted = true;
      }
      // Start `iterh` roughly in the middle of the keys written so far and
      // walk it forward.
      snprintf(buf4, sizeof(buf4), "00a0%016d", i * 5 / 2);
      Slice target(buf4, 20);
      iterh->Seek(target);
      // Validate the iterator that was just positioned (the original code
      // checked `iter` here, which left the result of this Seek unverified).
      ASSERT_TRUE(iterh->Valid());
      for (int j = (i + 1) * 5 / 2; j < i * 5; j += 5) {
        iterh->Next();
        ASSERT_TRUE(iterh->Valid());
      }
      if (i == 299) {
        file_iters_deleted = false;
      }
    }

    // Enable the deleted-iterator checks for the Seek/Next calls below.
    file_iters_deleted = true;
    snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2);
    Slice target(buf2, 20);
    iter->Seek(target);
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(key), 0);
    ASSERT_LE(num_iters, 1);
    if (i == 1) {
      itern->SeekToFirst();
    } else {
      itern->Next();
    }
    ASSERT_TRUE(itern->Valid());
    ASSERT_EQ(itern->key().compare(key), 0);
    ASSERT_LE(num_iters, 1);
    file_iters_deleted = false;
  }
  ASSERT_TRUE(file_iters_renewed_null);
  ASSERT_TRUE(file_iters_renewed_copy);

  // Release all iterators before the DB is reopened.
  iter.reset();
  itern.reset();
  iterh.reset();

  // Reopen without any block cache: a cache-only read must be Incomplete.
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.block_cache_compressed = nullptr;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  read_options.read_tier = kBlockCacheTier;
  std::unique_ptr<Iterator> iteri(db_->NewIterator(read_options, handles_[1]));
  char buf5[32];
  snprintf(buf5, sizeof(buf5), "00a0%016d", (num_records / 2) * 5 - 2);
  Slice target1(buf5, 20);
  iteri->Seek(target1);
  ASSERT_TRUE(iteri->status().IsIncomplete());
  iteri.reset();

  // Back to a default table factory and a regular read tier for the
  // descending write/seek loop.
  read_options.read_tier = kReadAllTier;
  options.table_factory.reset(NewBlockBasedTableFactory());
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  iter.reset(db_->NewIterator(read_options, handles_[1]));
  for (int i = 2 * num_records; i > 0; --i) {
    char buf1[32];
    char buf2[32];
    snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5);

    Slice key(buf1, 20);
    ASSERT_OK(Put(1, key, value));

    if (i % 100 == 99) {
      ASSERT_OK(Flush(1));
    }

    snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2);
    Slice target(buf2, 20);
    iter->Seek(target);
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(key), 0);
  }
}
272
273
1
// Positions a tailing iterator on a record, deletes that record, writes and
// flushes many new records, and then checks that the same iterator can walk
// all of the new records.
TEST_F(DBTestTailingIterator, TailingIteratorDeletes) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

  ReadOptions tailing_opts;
  tailing_opts.tailing = true;
  std::unique_ptr<Iterator> it(db_->NewIterator(tailing_opts, handles_[1]));

  // Write one record, read it through the iterator, then delete it.
  ASSERT_OK(Put(1, "0test", "test"));
  it->SeekToFirst();
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ(it->key().ToString(), "0test");
  ASSERT_OK(Delete(1, "0test"));

  // Write many more records, all sorting after "0test".
  const int num_records = 10000;
  const std::string payload(1024, 'A');
  for (int n = 0; n < num_records; ++n) {
    char key_buf[32];
    snprintf(key_buf, sizeof(key_buf), "1%015d", n);
    const Slice key(key_buf, 16);
    ASSERT_OK(Put(1, key, payload));
  }

  // Force a flush to make sure that no records are read from the memtable.
  ASSERT_OK(Flush(1));

  // Step past "0test".
  it->Next();

  // The existing iterator must be able to read every new record.
  int seen = 0;
  while (it->Valid()) {
    it->Next();
    ++seen;
  }
  ASSERT_EQ(seen, num_records);
}
311
312
1
// Tailing iterator combined with a 2-byte fixed prefix extractor and a hash
// skip-list memtable: a seek only finds keys that share the target's prefix.
TEST_F(DBTestTailingIterator, TailingIteratorPrefixSeek) {
  XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip,
             kSkipNoPrefix);

  Options prefix_opts = CurrentOptions();
  prefix_opts.env = env_;
  prefix_opts.create_if_missing = true;
  prefix_opts.disable_auto_compactions = true;
  prefix_opts.prefix_extractor.reset(NewFixedPrefixTransform(2));
  prefix_opts.memtable_factory.reset(NewHashSkipListRepFactory(16));
  DestroyAndReopen(prefix_opts);
  CreateAndReopenWithCF({"pikachu"}, prefix_opts);

  ReadOptions tailing_opts;
  tailing_opts.tailing = true;
  std::unique_ptr<Iterator> it(db_->NewIterator(tailing_opts, handles_[1]));

  // "0101" is flushed; "0202" is written after the flush.
  ASSERT_OK(Put(1, "0101", "test"));
  ASSERT_OK(Flush(1));
  ASSERT_OK(Put(1, "0202", "test"));

  // Seek(0102) shouldn't find any records since 0202 has a different prefix.
  it->Seek("0102");
  ASSERT_FALSE(it->Valid());

  it->Seek("0202");
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ(it->key().ToString(), "0202");

  // No further key is visible after "0202".
  it->Next();
  ASSERT_FALSE(it->Valid());

  XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, 0);
}
346
347
1
// With read_tier = kBlockCacheTier a tailing iterator must either be
// positioned on the entry or report an Incomplete status, both before and
// after a full compaction.
TEST_F(DBTestTailingIterator, TailingIteratorIncomplete) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

  ReadOptions cache_only_opts;
  cache_only_opts.tailing = true;
  cache_only_opts.read_tier = kBlockCacheTier;

  const std::string key("key");
  const std::string value("value");
  ASSERT_OK(db_->Put(WriteOptions(), key, value));

  std::unique_ptr<Iterator> it(db_->NewIterator(cache_only_opts));

  // True when the iterator found the entry or signalled a cache miss.
  const auto valid_or_incomplete = [&it]() {
    return it->Valid() || it->status().IsIncomplete();
  };

  // We either see the entry or it's not in cache.
  it->SeekToFirst();
  ASSERT_TRUE(valid_or_incomplete());

  // The same must still hold after compaction.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  it->SeekToFirst();
  ASSERT_TRUE(valid_or_incomplete());
}
368
369
1
// Seeking a tailing iterator to the key it is already positioned on must
// leave it on that key. Uses universal compaction with a tiny write buffer.
TEST_F(DBTestTailingIterator, TailingIteratorSeekToSame) {
  Options universal_opts = CurrentOptions();
  universal_opts.compaction_style = kCompactionStyleUniversal;
  universal_opts.write_buffer_size = 1000;
  CreateAndReopenWithCF({"pikachu"}, universal_opts);

  ReadOptions tailing_opts;
  tailing_opts.tailing = true;

  // Write rows with keys 00000, 00002, 00004 etc.
  const int kNumRows = 10000;
  const std::string row_value("value");
  for (int row = 0; row < kNumRows; ++row) {
    char digits[100];
    snprintf(digits, sizeof(digits), "%05d", 2 * row);
    const std::string row_key(digits);
    ASSERT_OK(db_->Put(WriteOptions(), row_key, row_value));
  }

  std::unique_ptr<Iterator> it(db_->NewIterator(tailing_opts));

  // Seek to 00001. We expect to find 00002.
  const std::string start_key = "00001";
  it->Seek(start_key);
  ASSERT_TRUE(it->Valid());

  const std::string landed_on = it->key().ToString();
  ASSERT_EQ("00002", landed_on);

  // Now seek to the same key. The iterator should remain in the same
  // position.
  it->Seek(landed_on);
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ(landed_on, it->key().ToString());
}
403
404
// Sets iterate_upper_bound and verifies that ForwardIterator doesn't call
405
// Seek() on immutable iterators when target key is >= prev_key and all
406
// iterators, including the memtable iterator, are over the upper bound.
407
1
TEST_F(DBTestTailingIterator, TailingIteratorUpperBound) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

  // NOTE(review): the length 3 makes the bound include the literal's
  // terminating NUL ("20\0"); kept as in the original, confirm it is intended.
  const Slice upper_bound("20", 3);
  ReadOptions read_options;
  read_options.tailing = true;
  read_options.iterate_upper_bound = &upper_bound;

  ASSERT_OK(Put(1, "11", "11"));
  ASSERT_OK(Put(1, "12", "12"));
  ASSERT_OK(Put(1, "22", "22"));
  ASSERT_OK(Flush(1));  // flush all those keys to an immutable SST file

  // Add another key to the memtable.
  ASSERT_OK(Put(1, "21", "21"));

  std::unique_ptr<Iterator> it(db_->NewIterator(read_options, handles_[1]));
  it->Seek("12");
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ("12", it->key().ToString());

  it->Next();
  // Not valid since "21" is over the upper bound.
  ASSERT_FALSE(it->Valid());

  // This keeps track of the number of times NeedToSeekImmutable() was true.
  int immutable_seeks = 0;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "ForwardIterator::SeekInternal:Immutable",
      [&](void* /*arg*/) { ++immutable_seeks; });

  // Seek to 13. This should not require any immutable seeks.
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  it->Seek("13");
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  // The callback captures `immutable_seeks` by reference; unregister it so it
  // cannot be invoked once this stack frame is gone.
  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();

  ASSERT_FALSE(it->Valid());
  ASSERT_EQ(0, immutable_seeks);
}
446
447
1
// Same scenario as the non-managed single-record test, but with
// ReadOptions::managed enabled: a record written after the iterator was
// created must become visible to it.
TEST_F(DBTestTailingIterator, ManagedTailingIteratorSingle) {
  ReadOptions managed_opts;
  managed_opts.tailing = true;
  managed_opts.managed = true;
  std::unique_ptr<Iterator> it(db_->NewIterator(managed_opts));

  // Nothing has been written yet.
  it->SeekToFirst();
  ASSERT_FALSE(it->Valid());

  // Add a record and check that the iterator can see it.
  ASSERT_OK(db_->Put(WriteOptions(), "mirko", "fodor"));
  it->SeekToFirst();
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ(it->key().ToString(), "mirko");

  // It is the only record, so stepping forward runs off the end.
  it->Next();
  ASSERT_FALSE(it->Valid());
}
465
466
1
// Managed + tailing iterator: after every Put into column family 1 the
// long-lived iterator must be able to seek straight to the new key.
TEST_F(DBTestTailingIterator, ManagedTailingIteratorKeepAdding) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

  ReadOptions managed_opts;
  managed_opts.tailing = true;
  managed_opts.managed = true;
  std::unique_ptr<Iterator> it(db_->NewIterator(managed_opts, handles_[1]));

  const std::string payload(1024, 'a');
  const int kNumRecords = 10000;
  const size_t kKeyLen = 16;  // "%016d" yields exactly 16 characters here

  for (int n = 0; n < kNumRecords; ++n) {
    char key_buf[32];
    snprintf(key_buf, sizeof(key_buf), "%016d", n);
    const Slice key(key_buf, kKeyLen);

    ASSERT_OK(Put(1, key, payload));

    // The key just written must be visible to the existing iterator.
    it->Seek(key);
    ASSERT_TRUE(it->Valid());
    ASSERT_EQ(0, it->key().compare(key));
  }
}
488
489
1
// Managed + tailing iterator: seeks to a target slightly below each freshly
// written key and expects to land on that key. Keys are written first in
// ascending and then in descending order, with periodic flushes.
TEST_F(DBTestTailingIterator, ManagedTailingIteratorSeekToNext) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

  ReadOptions managed_opts;
  managed_opts.tailing = true;
  managed_opts.managed = true;
  std::unique_ptr<Iterator> it(db_->NewIterator(managed_opts, handles_[1]));

  const std::string payload(1024, 'a');
  const size_t kKeyLen = 20;  // "00a0" + 16 digits

  // Renders "00a0" followed by the zero-padded 16-digit number into `out`.
  const auto format_key = [](char (&out)[32], int number) {
    snprintf(out, sizeof(out), "00a0%016d", number);
  };

  const int num_records = 1000;

  // Phase 1: ascending keys.
  for (int i = 1; i < num_records; ++i) {
    char key_buf[32];
    format_key(key_buf, i * 5);
    const Slice key(key_buf, kKeyLen);
    ASSERT_OK(Put(1, key, payload));

    if (i % 100 == 99) {
      ASSERT_OK(Flush(1));
    }

    // Target is two below the new key, so the seek must land on the key.
    char target_buf[32];
    format_key(target_buf, i * 5 - 2);
    const Slice target(target_buf, kKeyLen);
    it->Seek(target);
    ASSERT_TRUE(it->Valid());
    ASSERT_EQ(0, it->key().compare(key));
  }

  // Phase 2: descending keys, starting above everything written so far.
  for (int i = 2 * num_records; i > 0; --i) {
    char key_buf[32];
    format_key(key_buf, i * 5);
    const Slice key(key_buf, kKeyLen);
    ASSERT_OK(Put(1, key, payload));

    if (i % 100 == 99) {
      ASSERT_OK(Flush(1));
    }

    char target_buf[32];
    format_key(target_buf, i * 5 - 2);
    const Slice target(target_buf, kKeyLen);
    it->Seek(target);
    ASSERT_TRUE(it->Valid());
    ASSERT_EQ(0, it->key().compare(key));
  }
}
536
537
1
// Managed + tailing iterator: positions on a record, deletes it, writes and
// flushes many new records, and then checks that the same iterator can walk
// all of the new records.
TEST_F(DBTestTailingIterator, ManagedTailingIteratorDeletes) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

  ReadOptions managed_opts;
  managed_opts.tailing = true;
  managed_opts.managed = true;
  std::unique_ptr<Iterator> it(db_->NewIterator(managed_opts, handles_[1]));

  // Write one record, read it through the iterator, then delete it.
  ASSERT_OK(Put(1, "0test", "test"));
  it->SeekToFirst();
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ(it->key().ToString(), "0test");
  ASSERT_OK(Delete(1, "0test"));

  // Write many more records, all sorting after "0test".
  const int num_records = 10000;
  const std::string payload(1024, 'A');
  for (int n = 0; n < num_records; ++n) {
    char key_buf[32];
    snprintf(key_buf, sizeof(key_buf), "1%015d", n);
    const Slice key(key_buf, 16);
    ASSERT_OK(Put(1, key, payload));
  }

  // Force a flush to make sure that no records are read from the memtable.
  ASSERT_OK(Flush(1));

  // Step past "0test".
  it->Next();

  // The existing iterator must be able to read every new record.
  int seen = 0;
  while (it->Valid()) {
    it->Next();
    ++seen;
  }
  ASSERT_EQ(seen, num_records);
}
577
578
1
// Managed + tailing iterator combined with a 2-byte fixed prefix extractor
// and a hash skip-list memtable: a seek only finds keys that share the
// target's prefix.
TEST_F(DBTestTailingIterator, ManagedTailingIteratorPrefixSeek) {
  XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip,
             kSkipNoPrefix);

  Options prefix_opts = CurrentOptions();
  prefix_opts.env = env_;
  prefix_opts.create_if_missing = true;
  prefix_opts.disable_auto_compactions = true;
  prefix_opts.prefix_extractor.reset(NewFixedPrefixTransform(2));
  prefix_opts.memtable_factory.reset(NewHashSkipListRepFactory(16));
  DestroyAndReopen(prefix_opts);
  CreateAndReopenWithCF({"pikachu"}, prefix_opts);

  ReadOptions managed_opts;
  managed_opts.tailing = true;
  managed_opts.managed = true;
  std::unique_ptr<Iterator> it(db_->NewIterator(managed_opts, handles_[1]));

  // "0101" is flushed; "0202" is written after the flush.
  ASSERT_OK(Put(1, "0101", "test"));
  ASSERT_OK(Flush(1));
  ASSERT_OK(Put(1, "0202", "test"));

  // Seek(0102) shouldn't find any records since 0202 has a different prefix.
  it->Seek("0102");
  ASSERT_FALSE(it->Valid());

  it->Seek("0202");
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ(it->key().ToString(), "0202");

  // No further key is visible after "0202".
  it->Next();
  ASSERT_FALSE(it->Valid());

  XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, 0);
}
613
614
1
// Managed + tailing iterator with read_tier = kBlockCacheTier: it must either
// be positioned on the entry or report an Incomplete status, both before and
// after a full compaction.
TEST_F(DBTestTailingIterator, ManagedTailingIteratorIncomplete) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

  ReadOptions cache_only_opts;
  cache_only_opts.tailing = true;
  cache_only_opts.managed = true;
  cache_only_opts.read_tier = kBlockCacheTier;

  const std::string key = "key";
  const std::string value = "value";
  ASSERT_OK(db_->Put(WriteOptions(), key, value));

  std::unique_ptr<Iterator> it(db_->NewIterator(cache_only_opts));

  // True when the iterator found the entry or signalled a cache miss.
  const auto valid_or_incomplete = [&it]() {
    return it->Valid() || it->status().IsIncomplete();
  };

  // We either see the entry or it's not in cache.
  it->SeekToFirst();
  ASSERT_TRUE(valid_or_incomplete());

  // The same must still hold after compaction.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  it->SeekToFirst();
  ASSERT_TRUE(valid_or_incomplete());
}
636
637
1
// Managed + tailing iterator: seeking to the key the iterator is already
// positioned on must leave it on that key. Uses universal compaction with a
// tiny write buffer.
TEST_F(DBTestTailingIterator, ManagedTailingIteratorSeekToSame) {
  Options universal_opts = CurrentOptions();
  universal_opts.compaction_style = kCompactionStyleUniversal;
  universal_opts.write_buffer_size = 1000;
  CreateAndReopenWithCF({"pikachu"}, universal_opts);

  ReadOptions managed_opts;
  managed_opts.tailing = true;
  managed_opts.managed = true;

  // Write rows with keys 00000, 00002, 00004 etc.
  const int kNumRows = 10000;
  const std::string row_value("value");
  for (int row = 0; row < kNumRows; ++row) {
    char digits[100];
    snprintf(digits, sizeof(digits), "%05d", 2 * row);
    const std::string row_key(digits);
    ASSERT_OK(db_->Put(WriteOptions(), row_key, row_value));
  }

  std::unique_ptr<Iterator> it(db_->NewIterator(managed_opts));

  // Seek to 00001. We expect to find 00002.
  const std::string start_key = "00001";
  it->Seek(start_key);
  ASSERT_TRUE(it->Valid());

  const std::string landed_on = it->key().ToString();
  ASSERT_EQ("00002", landed_on);

  // Now seek to the same key. The iterator should remain in the same
  // position.
  it->Seek(landed_on);
  ASSERT_TRUE(it->Valid());
  ASSERT_EQ(landed_on, it->key().ToString());
}
672
673
1
// Checks the "rocksdb.iterator.super-version-number" property of a tailing
// iterator: the value is unchanged by a Put + Flush alone (v1 == v2), grows
// once the iterator seeks again afterwards (v3 > v2), and a freshly created
// iterator reports the same value as the re-seeked one (v3 == v4).
TEST_F(DBTestTailingIterator, ForwardIteratorVersionProperty) {
  // NOTE(review): `options` is configured but never passed to the DB in this
  // test; kept as in the original, confirm whether a reopen was intended.
  Options options = CurrentOptions();
  options.write_buffer_size = 1000;

  ReadOptions read_options;
  read_options.tailing = true;

  ASSERT_OK(Put("foo", "bar"));

  uint64_t v1 = 0;
  uint64_t v2 = 0;
  uint64_t v3 = 0;
  uint64_t v4 = 0;
  {
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
    iter->Seek("foo");
    std::string prop_value;
    ASSERT_OK(iter->GetProperty("rocksdb.iterator.super-version-number",
                                &prop_value));
    // Parse as an unsigned 64-bit value; std::atoi would go through `int`
    // and cannot represent the full uint64_t range.
    v1 = std::strtoull(prop_value.c_str(), nullptr, 10);

    ASSERT_OK(Put("foo1", "bar1"));
    ASSERT_OK(Flush());

    // No seek has happened since the flush.
    ASSERT_OK(iter->GetProperty("rocksdb.iterator.super-version-number",
                                &prop_value));
    v2 = std::strtoull(prop_value.c_str(), nullptr, 10);

    iter->Seek("f");

    ASSERT_OK(iter->GetProperty("rocksdb.iterator.super-version-number",
                                &prop_value));
    v3 = std::strtoull(prop_value.c_str(), nullptr, 10);

    ASSERT_EQ(v1, v2);
    ASSERT_GT(v3, v2);
  }

  {
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
    iter->Seek("foo");
    std::string prop_value;
    ASSERT_OK(iter->GetProperty("rocksdb.iterator.super-version-number",
                                &prop_value));
    v4 = std::strtoull(prop_value.c_str(), nullptr, 10);
  }
  ASSERT_EQ(v3, v4);
}
718
}  // namespace rocksdb
719
720
#endif  // !defined(ROCKSDB_LITE)
721
722
13.2k
int main(int argc, char** argv) {
723
13.2k
#if !defined(ROCKSDB_LITE)
724
13.2k
  rocksdb::port::InstallStackTraceHandler();
725
13.2k
  ::testing::InitGoogleTest(&argc, argv);
726
13.2k
  return RUN_ALL_TESTS();
727
#else
728
  return 0;
729
#endif
730
13.2k
}