YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/compaction-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include <boost/function.hpp>
15
16
#include "yb/client/client.h"
17
#include "yb/client/table_alterer.h"
18
#include "yb/client/transaction_manager.h"
19
#include "yb/client/transaction_pool.h"
20
21
#include "yb/common/common_fwd.h"
22
#include "yb/common/read_hybrid_time.h"
23
#include "yb/common/schema.h"
24
#include "yb/common/snapshot.h"
25
26
#include "yb/consensus/consensus.h"
27
#include "yb/consensus/consensus.pb.h"
28
29
#include "yb/docdb/consensus_frontier.h"
30
#include "yb/docdb/doc_ttl_util.h"
31
32
#include "yb/gutil/integral_types.h"
33
#include "yb/gutil/ref_counted.h"
34
35
#include "yb/integration-tests/mini_cluster.h"
36
#include "yb/integration-tests/test_workload.h"
37
38
#include "yb/master/catalog_entity_info.h"
39
40
#include "yb/rocksdb/db.h"
41
#include "yb/rocksdb/options.h"
42
#include "yb/rocksdb/statistics.h"
43
#include "yb/rocksdb/types.h"
44
#undef TEST_SYNC_POINT
45
#include "yb/rocksdb/util/sync_point.h"
46
47
#include "yb/server/hybrid_clock.h"
48
49
#include "yb/tablet/tablet_fwd.h"
50
#include "yb/tablet/tablet_options.h"
51
#include "yb/tablet/tablet_peer.h"
52
53
#include "yb/tserver/ts_tablet_manager.h"
54
55
#include "yb/util/compare_util.h"
56
#include "yb/util/enums.h"
57
#include "yb/util/monotime.h"
58
#include "yb/util/net/net_fwd.h"
59
#include "yb/util/operation_counter.h"
60
#include "yb/util/result.h"
61
#include "yb/util/size_literals.h"
62
#include "yb/util/strongly_typed_bool.h"
63
#include "yb/util/test_util.h"
64
#include "yb/util/threadpool.h"
65
#include "yb/util/tsan_util.h"
66
67
using namespace std::literals; // NOLINT
68
69
DECLARE_int64(db_write_buffer_size);
70
DECLARE_int32(rocksdb_level0_file_num_compaction_trigger);
71
DECLARE_int32(timestamp_history_retention_interval_sec);
72
DECLARE_bool(tablet_enable_ttl_file_filter);
73
DECLARE_int32(rocksdb_base_background_compactions);
74
DECLARE_int32(rocksdb_max_background_compactions);
75
DECLARE_uint64(rocksdb_max_file_size_for_compaction);
76
DECLARE_bool(file_expiration_ignore_value_ttl);
77
DECLARE_bool(file_expiration_value_ttl_overrides_table_ttl);
78
DECLARE_bool(TEST_disable_adding_user_frontier_to_sst);
79
DECLARE_bool(TEST_disable_getting_user_frontier_from_mem_table);
80
81
namespace yb {
82
83
namespace tserver {
84
85
namespace {
86
87
constexpr auto kWaitDelay = 10ms;
88
constexpr auto kPayloadBytes = 8_KB;
89
constexpr auto kMemStoreSize = 100_KB;
90
constexpr auto kNumTablets = 3;
91
92
93
94
class RocksDbListener : public rocksdb::EventListener {
95
 public:
96
0
  void OnCompactionCompleted(rocksdb::DB* db, const rocksdb::CompactionJobInfo&) override {
97
0
    std::lock_guard<std::mutex> lock(mutex_);
98
0
    ++num_compactions_completed_[db];
99
0
  }
100
101
0
  size_t GetNumCompactionsCompleted(rocksdb::DB* db) {
102
0
    std::lock_guard<std::mutex> lock(mutex_);
103
0
    return num_compactions_completed_[db];
104
0
  }
105
106
0
  void OnFlushCompleted(rocksdb::DB* db, const rocksdb::FlushJobInfo&) override {
107
0
    std::lock_guard<std::mutex> lock(mutex_);
108
0
    ++num_flushes_completed_[db];
109
0
  }
110
111
0
  size_t GetNumFlushesCompleted(rocksdb::DB* db) {
112
0
    std::lock_guard<std::mutex> lock(mutex_);
113
0
    return num_flushes_completed_[db];
114
0
  }
115
116
0
  void Reset() {
117
0
    std::lock_guard<std::mutex> lock(mutex_);
118
0
    num_compactions_completed_.clear();
119
0
    num_flushes_completed_.clear();
120
0
  }
121
122
 private:
123
  typedef std::unordered_map<const rocksdb::DB*, size_t> CountByDbMap;
124
125
  std::mutex mutex_;
126
  CountByDbMap num_compactions_completed_ GUARDED_BY(mutex_);
127
  CountByDbMap num_flushes_completed_ GUARDED_BY(mutex_);
128
};
129
130
} // namespace
131
132
class CompactionTest : public YBTest {
133
 public:
134
24
  CompactionTest() {}
135
136
24
  void SetUp() override {
137
24
    YBTest::SetUp();
138
139
24
    ASSERT_OK(clock_->Init());
140
24
    rocksdb_listener_ = std::make_shared<RocksDbListener>();
141
142
    // Start cluster.
143
24
    MiniClusterOptions opts;
144
24
    opts.num_tablet_servers = NumTabletServers();
145
24
    cluster_.reset(new MiniCluster(opts));
146
24
    ASSERT_OK(cluster_->Start());
147
    // These flags should be set after minicluster start, so it wouldn't override them.
148
22
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_db_write_buffer_size) = kMemStoreSize;
149
22
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = 3;
150
    // Patch tablet options inside tablet manager, will be applied to newly created tablets.
151
44
    for (int i = 0 ; i < NumTabletServers(); i++) {
152
22
      cluster_->GetTabletManager(i)->TEST_tablet_options()->listeners.push_back(rocksdb_listener_);
153
22
    }
154
155
0
    client_ = ASSERT_RESULT(cluster_->CreateClient());
156
0
    transaction_manager_ = std::make_unique<client::TransactionManager>(
157
0
        client_.get(), clock_, client::LocalTabletFilter());
158
0
    transaction_pool_ = std::make_unique<client::TransactionPool>(
159
0
        transaction_manager_.get(), nullptr /* metric_entity */);
160
0
  }
161
162
13
  void TearDown() override {
163
13
    workload_->StopAndJoin();
164
    // Shutdown client before destroying transaction manager, so we don't have transaction RPCs
165
    // in progress after transaction manager is destroyed.
166
13
    client_->Shutdown();
167
13
    cluster_->Shutdown();
168
13
    YBTest::TearDown();
169
13
  }
170
171
0
  void SetupWorkload(IsolationLevel isolation_level) {
172
0
    workload_.reset(new TestWorkload(cluster_.get()));
173
0
    workload_->set_timeout_allowed(true);
174
0
    workload_->set_payload_bytes(kPayloadBytes);
175
0
    workload_->set_write_batch_size(1);
176
0
    workload_->set_num_write_threads(4);
177
0
    workload_->set_num_tablets(kNumTablets);
178
0
    workload_->set_transactional(isolation_level, transaction_pool_.get());
179
0
    workload_->set_ttl(ttl_to_use());
180
0
    workload_->set_table_ttl(table_ttl_to_use());
181
0
    workload_->Setup();
182
0
  }
183
184
 protected:
185
186
  // -1 implies no ttl.
187
0
  virtual int ttl_to_use() {
188
0
    return -1;
189
0
  }
190
191
  // -1 implies no table ttl.
192
0
  virtual int table_ttl_to_use() {
193
0
    return -1;
194
0
  }
195
196
66
  virtual int NumTabletServers() {
197
66
    return 1;
198
66
  }
199
200
0
  size_t BytesWritten() {
201
0
    return workload_->rows_inserted() * kPayloadBytes;
202
0
  }
203
204
0
  CHECKED_STATUS WriteAtLeast(size_t size_bytes) {
205
0
    workload_->Start();
206
0
    RETURN_NOT_OK(LoggedWaitFor(
207
0
        [this, size_bytes] { return BytesWritten() >= size_bytes; }, 60s,
208
0
        Format("Waiting until we've written at least $0 bytes ...", size_bytes), kWaitDelay));
209
0
    workload_->StopAndJoin();
210
0
    LOG(INFO) << "Wrote " << BytesWritten() << " bytes.";
211
0
    return Status::OK();
212
0
  }
213
214
0
  CHECKED_STATUS WriteAtLeastFilesPerDb(size_t num_files) {
215
0
    auto dbs = GetAllRocksDbs(cluster_.get());
216
0
    workload_->Start();
217
0
    RETURN_NOT_OK(LoggedWaitFor(
218
0
        [this, &dbs, num_files] {
219
0
            for (auto* db : dbs) {
220
0
              if (rocksdb_listener_->GetNumFlushesCompleted(db) < num_files) {
221
0
                return false;
222
0
              }
223
0
            }
224
0
            return true;
225
0
          }, 60s,
226
0
        Format("Waiting until we've written at least $0 files per rocksdb ...", num_files),
227
0
        kWaitDelay * kTimeMultiplier));
228
0
    workload_->StopAndJoin();
229
0
    LOG(INFO) << "Wrote " << BytesWritten() << " bytes.";
230
0
    return Status::OK();
231
0
  }
232
233
0
  CHECKED_STATUS WaitForNumCompactionsPerDb(size_t num_compactions) {
234
0
    auto dbs = GetAllRocksDbs(cluster_.get());
235
0
    RETURN_NOT_OK(LoggedWaitFor(
236
0
        [this, &dbs, num_compactions] {
237
0
            for (auto* db : dbs) {
238
0
              if (rocksdb_listener_->GetNumCompactionsCompleted(db) < num_compactions) {
239
0
                return false;
240
0
              }
241
0
            }
242
0
            return true;
243
0
          }, 60s,
244
0
        Format("Waiting until at least $0 compactions per rocksdb finished...", num_compactions),
245
0
        kWaitDelay * kTimeMultiplier));
246
0
    return Status::OK();
247
0
  }
248
249
0
  CHECKED_STATUS ChangeTableTTL(const client::YBTableName& table_name, int ttl_sec) {
250
0
    RETURN_NOT_OK(client_->TableExists(table_name));
251
0
    auto alterer = client_->NewTableAlterer(table_name);
252
0
    TableProperties table_properties;
253
0
    table_properties.SetDefaultTimeToLive(ttl_sec * MonoTime::kMillisecondsPerSecond);
254
0
    alterer->SetTableProperties(table_properties);
255
0
    return alterer->Alter();
256
0
  }
257
258
0
  CHECKED_STATUS ExecuteManualCompaction() {
259
0
    constexpr int kCompactionTimeoutSec = 60;
260
0
    const auto table_info = VERIFY_RESULT(FindTable(cluster_.get(), workload_->table_name()));
261
0
    return workload_->client().FlushTables(
262
0
      {table_info->id()}, false, kCompactionTimeoutSec, /* compaction */ true);
263
0
  }
264
265
  void TestCompactionAfterTruncate();
266
  void TestCompactionWithoutFrontiers(
267
      const size_t num_without_frontiers,
268
      const size_t num_with_frontiers,
269
      const bool trigger_manual_compaction);
270
271
  std::unique_ptr<MiniCluster> cluster_;
272
  std::unique_ptr<client::YBClient> client_;
273
  server::ClockPtr clock_{new server::HybridClock()};
274
  std::unique_ptr<client::TransactionManager> transaction_manager_;
275
  std::unique_ptr<client::TransactionPool> transaction_pool_;
276
  std::unique_ptr<TestWorkload> workload_;
277
  std::shared_ptr<RocksDbListener> rocksdb_listener_;
278
};
279
280
0
void CompactionTest::TestCompactionAfterTruncate() {
281
  // Write some data before truncate to make sure truncate wouldn't be noop.
282
0
  ASSERT_OK(WriteAtLeast(kMemStoreSize * kNumTablets * 1.2));
283
284
0
  const auto table_info = ASSERT_RESULT(FindTable(cluster_.get(), workload_->table_name()));
285
0
  ASSERT_OK(workload_->client().TruncateTable(table_info->id(), true /* wait */));
286
287
0
  rocksdb_listener_->Reset();
288
  // Write enough to trigger compactions.
289
0
  ASSERT_OK(WriteAtLeastFilesPerDb(FLAGS_rocksdb_level0_file_num_compaction_trigger + 1));
290
291
0
  auto dbs = GetAllRocksDbs(cluster_.get());
292
0
  ASSERT_OK(LoggedWaitFor(
293
0
      [&dbs] {
294
0
        for (auto* db : dbs) {
295
0
          if (db->GetLiveFilesMetaData().size() >
296
0
              implicit_cast<size_t>(FLAGS_rocksdb_level0_file_num_compaction_trigger)) {
297
0
            return false;
298
0
          }
299
0
        }
300
0
        return true;
301
0
      },
302
0
      60s, "Waiting until we have number of SST files not higher than threshold ...", kWaitDelay));
303
0
}
304
305
void CompactionTest::TestCompactionWithoutFrontiers(
306
    const size_t num_without_frontiers,
307
    const size_t num_with_frontiers,
308
0
    const bool trigger_manual_compaction) {
309
  // Write a number of files without frontiers
310
0
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_disable_adding_user_frontier_to_sst) = true;
311
0
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_disable_getting_user_frontier_from_mem_table) = true;
312
0
  SetupWorkload(IsolationLevel::SNAPSHOT_ISOLATION);
313
0
  ASSERT_OK(WriteAtLeastFilesPerDb(num_without_frontiers));
314
  // If requested, write a number of files with frontiers second.
315
0
  if (num_with_frontiers > 0) {
316
0
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_disable_adding_user_frontier_to_sst) = false;
317
0
    rocksdb_listener_->Reset();
318
0
    ASSERT_OK(WriteAtLeastFilesPerDb(num_with_frontiers));
319
0
  }
320
321
  // Trigger manual compaction if requested.
322
0
  if (trigger_manual_compaction) {
323
0
    constexpr int kCompactionTimeoutSec = 60;
324
0
    const auto table_info = ASSERT_RESULT(FindTable(cluster_.get(), workload_->table_name()));
325
0
    ASSERT_OK(workload_->client().FlushTables(
326
0
      {table_info->id()}, false, kCompactionTimeoutSec, /* compaction */ true));
327
0
  }
328
  // Wait for the compaction.
329
0
  auto dbs = GetAllRocksDbs(cluster_.get());
330
0
  ASSERT_OK(LoggedWaitFor(
331
0
      [&dbs, num_without_frontiers, num_with_frontiers] {
332
0
        for (auto* db : dbs) {
333
0
          if (db->GetLiveFilesMetaData().size() >= num_without_frontiers + num_with_frontiers) {
334
0
            return false;
335
0
          }
336
0
        }
337
0
        return true;
338
0
      },
339
0
      60s, "Waiting until we see fewer SST files than were written initially ...", kWaitDelay));
340
0
}
341
342
0
TEST_F(CompactionTest, CompactionAfterTruncate) {
  // Runs the truncate-then-compact scenario against a non-transactional workload.
  constexpr auto kIsolation = IsolationLevel::NON_TRANSACTIONAL;
  SetupWorkload(kIsolation);
  TestCompactionAfterTruncate();
}
346
347
0
TEST_F(CompactionTest, CompactionAfterTruncateTransactional) {
  // Runs the truncate-then-compact scenario with snapshot-isolation transactions.
  constexpr auto kIsolation = IsolationLevel::SNAPSHOT_ISOLATION;
  SetupWorkload(kIsolation);
  TestCompactionAfterTruncate();
}
351
352
0
TEST_F(CompactionTest, AutomaticCompactionWithoutAnyUserFrontiers) {
  constexpr int kFilesWithoutFrontiers = 5;
  // Set the level-0 compaction trigger to the number of files written below.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) =
      kFilesWithoutFrontiers;
  // Create all SST files without user frontiers; rely on automatic compaction only.
  constexpr int kFilesWithFrontiers = 0;
  constexpr bool kTriggerManualCompaction = false;
  TestCompactionWithoutFrontiers(
      kFilesWithoutFrontiers, kFilesWithFrontiers, kTriggerManualCompaction);
}
359
360
0
TEST_F(CompactionTest, AutomaticCompactionWithSomeUserFrontiers) {
  constexpr int kFilesWithoutFrontiers = 1;
  constexpr int kFilesWithFrontiers = 4;
  constexpr int kTotalFiles = kFilesWithoutFrontiers + kFilesWithFrontiers;
  // Set the level-0 compaction trigger to the total number of files written below.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = kTotalFiles;
  // Create only one SST file without user frontiers; rely on automatic compaction only.
  TestCompactionWithoutFrontiers(
      kFilesWithoutFrontiers, kFilesWithFrontiers, /* trigger_manual_compaction = */ false);
}
368
369
0
TEST_F(CompactionTest, ManualCompactionWithoutAnyUserFrontiers) {
  // A trigger of -1 disables automatic compactions; only the manual compaction runs.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = -1;
  constexpr size_t kFilesWithoutFrontiers = 5;
  constexpr size_t kFilesWithFrontiers = 0;
  // Create all SST files without user frontiers.
  TestCompactionWithoutFrontiers(
      kFilesWithoutFrontiers, kFilesWithFrontiers, /* trigger_manual_compaction = */ true);
}
374
375
0
TEST_F(CompactionTest, ManualCompactionWithSomeUserFrontiers) {
  // A trigger of -1 disables automatic compactions; only the manual compaction runs.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = -1;
  constexpr size_t kFilesWithoutFrontiers = 1;
  constexpr size_t kFilesWithFrontiers = 5;
  // Create only one SST file without user frontiers.
  TestCompactionWithoutFrontiers(
      kFilesWithoutFrontiers, kFilesWithFrontiers, /* trigger_manual_compaction = */ true);
}
380
381
0
TEST_F(CompactionTest, ManualCompactionProducesOneFilePerDb) {
  // Disable automatic compactions so only the manual compaction below merges files.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = -1;
  SetupWorkload(IsolationLevel::NON_TRANSACTIONAL);

  constexpr size_t kFilesPerDb = 10;
  ASSERT_OK(WriteAtLeastFilesPerDb(kFilesPerDb));
  ASSERT_OK(ExecuteManualCompaction());

  // Every RocksDB instance must be left with exactly one SST file.
  for (auto* rocks : GetAllRocksDbs(cluster_.get(), false)) {
    ASSERT_EQ(1, rocks->GetCurrentVersionNumSSTFiles());
  }
}
393
394
0
TEST_F(CompactionTest, FilesOverMaxSizeWithTableTTLDoNotGetAutoCompacted) {
#ifndef NDEBUG
  // Order the TEST_SYNC_POINT later in this test after the universal compaction picker's
  // "SkippingCompaction" sync point. This assumes LoadDependency pairs are
  // {predecessor, successor} -- TODO confirm against rocksdb/util/sync_point.h.
  rocksdb::SyncPoint::GetInstance()->LoadDependency({
      {"UniversalCompactionPicker::PickCompaction:SkippingCompaction",
          "CompactionTest::FilesOverMaxSizeDoNotGetAutoCompacted:WaitNoCompaction"}}
  );
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  // Undo the sync point setup on every exit path. A failing ASSERT_* returns early from the
  // test body, so cleanup placed only at the end of the test would be skipped. Processing
  // would then stay enabled for later tests in the same process.
  struct SyncPointCleanup {
    ~SyncPointCleanup() {
      rocksdb::SyncPoint::GetInstance()->DisableProcessing();
      rocksdb::SyncPoint::GetInstance()->ClearTrace();
    }
  } sync_point_cleanup;
#endif // NDEBUG

  const int kNumFilesToWrite = 10;
  // Auto compaction will be triggered once 10 files are written.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = kNumFilesToWrite;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_max_file_size_for_compaction) = 10_KB;

  SetupWorkload(IsolationLevel::NON_TRANSACTIONAL);
  // Change the table to have a default time to live.
  ASSERT_OK(ChangeTableTTL(workload_->table_name(), 1000));
  ASSERT_OK(WriteAtLeastFilesPerDb(kNumFilesToWrite));

  auto dbs = GetAllRocksDbs(cluster_.get(), false);
  TEST_SYNC_POINT("CompactionTest::FilesOverMaxSizeDoNotGetAutoCompacted:WaitNoCompaction");

  for (auto* db : dbs) {
    ASSERT_GE(db->GetCurrentVersionNumSSTFiles(), kNumFilesToWrite);
  }
}
425
426
0
TEST_F(CompactionTest, FilesOverMaxSizeWithTableTTLStillGetManualCompacted) {
  // No automatic compactions. The 10KB compaction size limit must not stop a manual compaction.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = -1;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_max_file_size_for_compaction) = 10_KB;

  SetupWorkload(IsolationLevel::NON_TRANSACTIONAL);
  // Give the table a default time to live of 1000 seconds.
  constexpr int kTableTtlSec = 1000;
  ASSERT_OK(ChangeTableTTL(workload_->table_name(), kTableTtlSec));
  ASSERT_OK(WriteAtLeastFilesPerDb(10));

  ASSERT_OK(ExecuteManualCompaction());
  ASSERT_OK(WaitForNumCompactionsPerDb(1));

  // Manual compaction should have merged everything into a single file per DB.
  for (auto* rocks : GetAllRocksDbs(cluster_.get(), false)) {
    ASSERT_EQ(rocks->GetCurrentVersionNumSSTFiles(), 1);
  }
}
443
444
0
TEST_F(CompactionTest, MaxFileSizeIgnoredIfNoTableTTL) {
  constexpr int kFilesPerTrigger = 10;
  // Automatic compaction fires every kFilesPerTrigger level-0 files. The table has no TTL, so
  // the 10KB max-size limit below is expected to have no effect.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = kFilesPerTrigger;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_max_file_size_for_compaction) = 10_KB;

  SetupWorkload(IsolationLevel::NON_TRANSACTIONAL);
  ASSERT_OK(WriteAtLeastFilesPerDb(kFilesPerTrigger));
  ASSERT_OK(WaitForNumCompactionsPerDb(1));

  for (auto* rocks : GetAllRocksDbs(cluster_.get(), false)) {
    ASSERT_LT(rocks->GetCurrentVersionNumSSTFiles(), kFilesPerTrigger);
  }
}
459
460
class CompactionTestWithTTL : public CompactionTest {
461
 protected:
462
0
  int ttl_to_use() override {
463
0
    return kTTLSec;
464
0
  }
465
  const int kTTLSec = 1;
466
};
467
468
0
// Verifies that TTL-expired data shrinks the DBs on automatic compaction, and that a later
// manual compaction removes everything without the TTL file filter dropping any files.
TEST_F(CompactionTestWithTTL, CompactionAfterExpiry) {
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_timestamp_history_retention_interval_sec) = 0;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = 10;
  // Testing compaction without compaction file filtering for TTL expiration.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_tablet_enable_ttl_file_filter) = false;
  SetupWorkload(IsolationLevel::NON_TRANSACTIONAL);

  auto dbs = GetAllRocksDbs(cluster_.get(), false);

  // Write enough to be short of triggering compactions.
  ASSERT_OK(WriteAtLeastFilesPerDb(0.8 * FLAGS_rocksdb_level0_file_num_compaction_trigger));
  size_t size_before_compaction = 0;
  for (auto* db : dbs) {
    size_before_compaction += db->GetCurrentVersionSstFilesUncompressedSize();
  }
  LOG(INFO) << "size_before_compaction is " << size_before_compaction;

  LOG(INFO) << "Sleeping";
  SleepFor(MonoDelta::FromSeconds(2 * kTTLSec));

  // Write enough to trigger compactions.
  ASSERT_OK(WriteAtLeastFilesPerDb(FLAGS_rocksdb_level0_file_num_compaction_trigger));

  ASSERT_OK(LoggedWaitFor(
      [&dbs] {
        for (auto* db : dbs) {
          if (db->GetLiveFilesMetaData().size() >
              implicit_cast<size_t>(FLAGS_rocksdb_level0_file_num_compaction_trigger)) {
            return false;
          }
        }
        return true;
      },
      60s, "Waiting until we have number of SST files not higher than threshold ...", kWaitDelay));

  // Assert that the data size is smaller now.
  size_t size_after_compaction = 0;
  for (auto* db : dbs) {
    size_after_compaction += db->GetCurrentVersionSstFilesUncompressedSize();
  }
  LOG(INFO) << "size_after_compaction is " << size_after_compaction;
  EXPECT_LT(size_after_compaction, size_before_compaction);

  SleepFor(MonoDelta::FromSeconds(2 * kTTLSec));

  // Use the shared fixture helper for the manual compaction instead of repeating the
  // FindTable + FlushTables(compaction = true) sequence inline.
  ASSERT_OK(ExecuteManualCompaction());
  // Assert that the data size is all wiped up now.
  size_t size_after_manual_compaction = 0;
  uint64_t num_sst_files_filtered = 0;
  for (auto* db : dbs) {
    size_after_manual_compaction += db->GetCurrentVersionSstFilesUncompressedSize();
    auto stats = db->GetOptions().statistics;
    num_sst_files_filtered
        += stats->getTickerCount(rocksdb::COMPACTION_FILES_FILTERED);
  }
  LOG(INFO) << "size_after_manual_compaction is " << size_after_manual_compaction;
  EXPECT_EQ(size_after_manual_compaction, 0);
  EXPECT_EQ(num_sst_files_filtered, 0);
}
530
531
class CompactionTestWithFileExpiration : public CompactionTest {
532
 public:
533
13
  void SetUp() override {
534
13
    CompactionTest::SetUp();
535
13
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_tablet_enable_ttl_file_filter) = true;
536
13
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_timestamp_history_retention_interval_sec) = 0;
537
13
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_file_expiration_ignore_value_ttl) = false;
538
13
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_file_expiration_value_ttl_overrides_table_ttl) = false;
539
    // Disable automatic compactions, but continue to allow manual compactions.
540
13
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_base_background_compactions) = 0;
541
13
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_max_background_compactions) = 0;
542
13
  }
543
 protected:
544
  size_t GetTotalSizeOfDbs();
545
  uint64_t GetNumFilesInDbs();
546
  uint64_t CountFilteredSSTFiles();
547
  uint64_t CountUnfilteredSSTFiles();
548
  void LogSizeAndFilesInDbs(bool after_compaction);
549
  void WriteRecordsAllExpire();
550
  void AssertNoFilesExpired();
551
  void AssertAllFilesExpired();
552
  bool CheckEachDbHasExactlyNumFiles(size_t num_files);
553
  bool CheckEachDbHasAtLeastNumFiles(size_t num_files);
554
  bool CheckAtLeastFileExpirationsPerDb(size_t num_expirations);
555
0
  int table_ttl_to_use() override {
556
0
    return kTableTTLSec;
557
0
  }
558
  const int kTableTTLSec = 1;
559
};
560
561
0
size_t CompactionTestWithFileExpiration::GetTotalSizeOfDbs() {
562
0
  size_t total_size_dbs = 0;
563
0
  auto dbs = GetAllRocksDbs(cluster_.get(), false);
564
0
  for (auto* db : dbs) {
565
0
    total_size_dbs += db->GetCurrentVersionSstFilesUncompressedSize();
566
0
  }
567
0
  return total_size_dbs;
568
0
}
569
570
0
uint64_t CompactionTestWithFileExpiration::GetNumFilesInDbs() {
571
0
  uint64_t total_files_dbs = 0;
572
0
  auto dbs = GetAllRocksDbs(cluster_.get(), false);
573
0
  for (auto* db : dbs) {
574
0
    total_files_dbs += db->GetCurrentVersionNumSSTFiles();
575
0
  }
576
0
  return total_files_dbs;
577
0
}
578
579
0
uint64_t CompactionTestWithFileExpiration::CountFilteredSSTFiles() {
580
0
  auto dbs = GetAllRocksDbs(cluster_.get(), false);
581
0
  uint64_t num_sst_files_filtered = 0;
582
0
  for (auto* db : dbs) {
583
0
    auto stats = db->GetOptions().statistics;
584
0
    num_sst_files_filtered
585
0
        += stats->getTickerCount(rocksdb::COMPACTION_FILES_FILTERED);
586
0
  }
587
0
  LOG(INFO) << "Number of filtered SST files: " << num_sst_files_filtered;
588
0
  return num_sst_files_filtered;
589
0
}
590
591
0
uint64_t CompactionTestWithFileExpiration::CountUnfilteredSSTFiles() {
592
0
  auto dbs = GetAllRocksDbs(cluster_.get(), false);
593
0
  uint64_t num_sst_files_unfiltered = 0;
594
0
  for (auto* db : dbs) {
595
0
    auto stats = db->GetOptions().statistics;
596
0
    num_sst_files_unfiltered
597
0
        += stats->getTickerCount(rocksdb::COMPACTION_FILES_NOT_FILTERED);
598
0
  }
599
0
  LOG(INFO) << "Number of unfiltered SST files: " << num_sst_files_unfiltered;
600
0
  return num_sst_files_unfiltered;
601
0
}
602
603
0
void CompactionTestWithFileExpiration::LogSizeAndFilesInDbs(bool after_compaction = false) {
604
0
  auto size_before_compaction = GetTotalSizeOfDbs();
605
0
  auto files_before_compaction = GetNumFilesInDbs();
606
0
  auto descriptor = after_compaction ? "after compaction" : "before compaction";
607
0
  LOG(INFO) << "Total size " << descriptor << ": " << size_before_compaction <<
608
0
      ", num files: " << files_before_compaction;
609
0
}
610
611
void CompactionTestWithFileExpiration::AssertAllFilesExpired() {
612
  auto size_after_manual_compaction = GetTotalSizeOfDbs();
613
  auto files_after_compaction = GetNumFilesInDbs();
614
  LOG(INFO) << "Total size after compaction: " << size_after_manual_compaction <<
615
      ", num files: " << files_after_compaction;
616
  EXPECT_EQ(size_after_manual_compaction, 0);
617
  EXPECT_EQ(files_after_compaction, 0);
618
  ASSERT_GT(CountFilteredSSTFiles(), 0);
619
}
620
621
void CompactionTestWithFileExpiration::AssertNoFilesExpired() {
622
  auto size_after_manual_compaction = GetTotalSizeOfDbs();
623
  auto files_after_compaction = GetNumFilesInDbs();
624
  LOG(INFO) << "Total size after compaction: " << size_after_manual_compaction <<
625
      ", num files: " << files_after_compaction;
626
  EXPECT_GT(size_after_manual_compaction, 0);
627
  EXPECT_GT(files_after_compaction, 0);
628
  ASSERT_EQ(CountFilteredSSTFiles(), 0);
629
}
630
631
0
bool CompactionTestWithFileExpiration::CheckEachDbHasExactlyNumFiles(size_t num_files) {
632
0
  auto dbs = GetAllRocksDbs(cluster_.get(), false);
633
0
  for (auto* db : dbs) {
634
0
    if (db->GetCurrentVersionNumSSTFiles() != num_files) {
635
0
      return false;
636
0
    }
637
0
  }
638
0
  return true;
639
0
}
640
641
0
bool CompactionTestWithFileExpiration::CheckEachDbHasAtLeastNumFiles(size_t num_files) {
642
0
  auto dbs = GetAllRocksDbs(cluster_.get(), false);
643
0
  for (auto* db : dbs) {
644
0
    if (db->GetCurrentVersionNumSSTFiles() < num_files) {
645
0
      return false;
646
0
    }
647
0
  }
648
0
  return true;
649
0
}
650
651
0
bool CompactionTestWithFileExpiration::CheckAtLeastFileExpirationsPerDb(size_t num_expirations) {
652
0
  auto dbs = GetAllRocksDbs(cluster_.get(), false);
653
0
  for (auto db : dbs) {
654
0
    auto stats = db->GetOptions().statistics;
655
0
    if (stats->getTickerCount(rocksdb::COMPACTION_FILES_FILTERED) < num_expirations) {
656
0
      return false;
657
0
    }
658
0
  }
659
0
  return true;
660
0
}
661
662
0
void CompactionTestWithFileExpiration::WriteRecordsAllExpire() {
663
  // Disable auto compactions to prevent any files from accidentally expiring early.
664
0
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = -1;
665
0
  SetupWorkload(IsolationLevel::NON_TRANSACTIONAL);
666
667
0
  ASSERT_OK(WriteAtLeastFilesPerDb(10));
668
0
  auto size_before_compaction = GetTotalSizeOfDbs();
669
0
  auto files_before_compaction = GetNumFilesInDbs();
670
0
  LOG(INFO) << "Total size before compaction: " << size_before_compaction <<
671
0
      ", num files: " << files_before_compaction;
672
673
0
  LOG(INFO) << "Sleeping long enough to expire all data";
674
0
  SleepFor(MonoDelta::FromSeconds(2 * kTableTTLSec));
675
676
0
  ASSERT_OK(ExecuteManualCompaction());
677
  // Assert that the data size is all wiped up now.
678
0
  EXPECT_EQ(GetTotalSizeOfDbs(), 0);
679
0
  EXPECT_EQ(GetNumFilesInDbs(), 0);
680
0
}
681
682
0
// With the TTL file filter disabled, expired data must still disappear after
// compaction. However, no SST file may be counted as filtered; only unfiltered
// compactions are expected.
TEST_F(CompactionTestWithFileExpiration, CompactionNoFileExpiration) {
  // Turn off whole-file expiration for this test.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_tablet_enable_ttl_file_filter) = false;

  WriteRecordsAllExpire();

  ASSERT_GT(CountUnfilteredSSTFiles(), 0);  // files went through regular compaction...
  ASSERT_EQ(CountFilteredSSTFiles(), 0);    // ...and none were filtered as a whole
}
688
689
0
// With default settings, once every row has outlived the table TTL the manual
// compaction should drop at least one SST file through file expiration.
TEST_F(CompactionTestWithFileExpiration, FileExpirationAfterExpiry) {
  WriteRecordsAllExpire();

  const auto filtered_files = CountFilteredSSTFiles();
  ASSERT_GT(filtered_files, 0);
}
693
694
0
// A very large value-level TTL must keep the data alive even after the table
// TTL has elapsed, so the manual compaction may not expire any file.
TEST_F(CompactionTestWithFileExpiration, ValueTTLOverridesTableTTL) {
  constexpr auto kNonExpiringValueTTL = 10000000;
  SetupWorkload(IsolationLevel::NON_TRANSACTIONAL);
  // Set the value-level TTL too high for anything to expire during the test.
  workload_->set_ttl(kNonExpiringValueTTL);

  ASSERT_OK(WriteAtLeastFilesPerDb(10));
  LogSizeAndFilesInDbs();

  LOG(INFO) << "Sleeping long enough to expire all data if TTL were not increased";
  SleepFor(MonoDelta::FromSeconds(2 * kTableTTLSec));

  ASSERT_OK(ExecuteManualCompaction());
  // Data must still be present and no SST file may have been filtered.
  AssertNoFilesExpired();
}
709
710
0
// With FLAGS_file_expiration_ignore_value_ttl set, file expiration uses only the
// table-level TTL. Files therefore expire even though their values carry a very
// large TTL.
TEST_F(CompactionTestWithFileExpiration, ValueTTLWillNotOverrideTableTTLWhenTableOnlyFlagSet) {
  constexpr auto kNonExpiringValueTTL = 10000000;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_file_expiration_ignore_value_ttl) = true;
  SetupWorkload(IsolationLevel::NON_TRANSACTIONAL);
  // Value-level TTL that on its own would never expire during the test.
  workload_->set_ttl(kNonExpiringValueTTL);

  ASSERT_OK(WriteAtLeastFilesPerDb(10));
  LogSizeAndFilesInDbs();

  LOG(INFO) << "Sleeping long enough to expire all data (based on table-level TTL)";
  SleepFor(MonoDelta::FromSeconds(2 * kTableTTLSec));

  ASSERT_OK(ExecuteManualCompaction());
  // Everything must be gone: the value-level TTL was ignored.
  AssertAllFilesExpired();
}
726
727
0
// Short value-level TTLs under a long table TTL are exercised in two phases:
//   1. Flag off: compaction removes the expired rows, but no whole file may be
//      filtered, because the table TTL still protects the files.
//   2. Flag FLAGS_file_expiration_value_ttl_overrides_table_ttl on: the value TTL
//      decides, so the new files are expired as a whole.
TEST_F(CompactionTestWithFileExpiration, ValueTTLWillOverrideTableTTLWhenFlagSet) {
  SetupWorkload(IsolationLevel::NON_TRANSACTIONAL);
  // A table TTL large enough that it never expires during the test.
  ASSERT_OK(ChangeTableTTL(workload_->table_name(), 1000000));
  // A value-level TTL that does expire during the test.
  const auto kValueExpiryTimeSec = 1;
  workload_->set_ttl(kValueExpiryTimeSec);

  // Phase 1: flag off.
  ASSERT_OK(WriteAtLeastFilesPerDb(10));

  LOG(INFO) << "Sleeping long enough to expire all data (based on value-level TTL)";
  SleepFor(MonoDelta::FromSeconds(2 * kValueExpiryTimeSec));

  ASSERT_OK(ExecuteManualCompaction());
  // All data is deleted by the compaction, yet no file may be filtered
  // (the files are still protected by the table TTL).
  EXPECT_EQ(GetTotalSizeOfDbs(), 0);
  EXPECT_EQ(GetNumFilesInDbs(), 0);
  ASSERT_EQ(CountFilteredSSTFiles(), 0);

  // Phase 2: flag on; write a fresh set of files and compact again.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_file_expiration_value_ttl_overrides_table_ttl) = true;
  rocksdb_listener_->Reset();
  ASSERT_OK(WriteAtLeastFilesPerDb(10));
  LogSizeAndFilesInDbs();
  LOG(INFO) << "Sleeping long enough to expire all data (based on value-level TTL)";
  SleepFor(MonoDelta::FromSeconds(2 * kValueExpiryTimeSec));

  ASSERT_OK(ExecuteManualCompaction());
  // Everything must be gone: the table-level TTL was ignored.
  AssertAllFilesExpired();
}
760
761
0
// Expired files and a freshly written (not yet expired) file are compacted
// together. The DBs must shrink in both size and file count without being
// emptied, and at least one file must be filtered.
TEST_F(CompactionTestWithFileExpiration, MixedExpiringAndNonExpiring) {
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = -1;
  SetupWorkload(IsolationLevel::NON_TRANSACTIONAL);

  // First generation: files that will expire.
  ASSERT_OK(WriteAtLeastFilesPerDb(10));
  const auto expiring_bytes = GetTotalSizeOfDbs();
  const auto expiring_files = GetNumFilesInDbs();
  LOG(INFO) << "Total size of " << expiring_files <<
      " files that should expire: " << expiring_bytes;

  LOG(INFO) << "Sleeping long enough to expire all data";
  SleepFor(MonoDelta::FromSeconds(2 * kTableTTLSec));

  // Second generation: one more file, compacted before it can expire.
  rocksdb_listener_->Reset();
  ASSERT_OK(WriteAtLeastFilesPerDb(1));
  ASSERT_OK(ExecuteManualCompaction());

  size_t bytes_left = GetTotalSizeOfDbs();
  uint64_t files_left = GetNumFilesInDbs();
  LOG(INFO) << "Total size of " << files_left << " files after compaction: "
      << bytes_left;
  // Some, but not all, of the data is gone...
  EXPECT_GT(bytes_left, 0);
  EXPECT_LT(bytes_left, expiring_bytes);
  EXPECT_GT(files_left, 0);
  EXPECT_LT(files_left, expiring_files);
  // ...and file filtering contributed.
  ASSERT_GT(CountFilteredSSTFiles(), 0);
}
789
790
0
// A file written with docdb::kResetTTL (no expiry) must stop later files, which
// would otherwise expire, from being filtered as a whole.
TEST_F(CompactionTestWithFileExpiration, FileThatNeverExpires) {
  const int kNumFilesToWrite = 10;
  SetupWorkload(IsolationLevel::NON_TRANSACTIONAL);

  // Phase 1: write expiring files and let them expire.
  ASSERT_OK(WriteAtLeastFilesPerDb(kNumFilesToWrite));
  LogSizeAndFilesInDbs();
  LOG(INFO) << "Sleeping to expire files";
  SleepFor(MonoDelta::FromSeconds(2 * kTableTTLSec));

  // Phase 2: add a never-expiring file and compact; phase-1 files get filtered.
  workload_->set_ttl(docdb::kResetTTL);
  rocksdb_listener_->Reset();
  ASSERT_OK(WriteAtLeastFilesPerDb(1));
  ASSERT_OK(ExecuteManualCompaction());
  const auto filtered_after_first_compaction = CountFilteredSSTFiles();
  ASSERT_GT(filtered_after_first_compaction, 0);

  // Phase 3: files without a value-level TTL that would expire if the
  // never-expiring file were not present.
  rocksdb_listener_->Reset();
  workload_->set_ttl(-1);
  ASSERT_OK(WriteAtLeastFilesPerDb(kNumFilesToWrite));
  LOG(INFO) << "Sleeping to expire files";
  SleepFor(MonoDelta::FromSeconds(2 * kTableTTLSec));
  ASSERT_OK(ExecuteManualCompaction());

  // Data must remain and no additional file may have been filtered.
  const auto bytes_left = GetTotalSizeOfDbs();
  const auto files_left = GetNumFilesInDbs();
  LOG(INFO) << "Total size after compaction: " << bytes_left <<
      ", num files: " << files_left;
  EXPECT_GT(bytes_left, 0);
  EXPECT_GT(files_left, 0);
  ASSERT_EQ(filtered_after_first_compaction, CountFilteredSSTFiles());
}
827
828
0
// A very long history retention interval must prevent file expiration even
// though the rows are past their table TTL.
TEST_F(CompactionTestWithFileExpiration, ShouldNotExpireDueToHistoryRetention) {
  constexpr auto kLongHistoryRetentionSec = 1000000;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_timestamp_history_retention_interval_sec) =
      kLongHistoryRetentionSec;
  SetupWorkload(IsolationLevel::NON_TRANSACTIONAL);

  ASSERT_OK(WriteAtLeastFilesPerDb(10));
  LogSizeAndFilesInDbs();

  LOG(INFO) << "Sleeping to expire files according to TTL (history retention prevents deletion)";
  SleepFor(MonoDelta::FromSeconds(2 * kTableTTLSec));

  ASSERT_OK(ExecuteManualCompaction());
  // Data must survive and no SST file may have been filtered.
  AssertNoFilesExpired();
}
842
843
0
// Altering the table TTL changes file expiration. With a huge TTL nothing
// expires; after restoring kTableTTLSec, newly written files expire.
TEST_F(CompactionTestWithFileExpiration, TableTTLChangesWillChangeWhetherFilesExpire) {
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = -1;
  SetupWorkload(IsolationLevel::NON_TRANSACTIONAL);

  // Phase 1: a table TTL so large it never expires during the test.
  ASSERT_OK(ChangeTableTTL(workload_->table_name(), 1000000));
  ASSERT_OK(WriteAtLeastFilesPerDb(10));
  LogSizeAndFilesInDbs();

  LOG(INFO) << "Sleeping for the original table TTL seconds "
      << "(would expire if table TTL weren't changed)";
  SleepFor(MonoDelta::FromSeconds(2 * kTableTTLSec));
  ASSERT_OK(ExecuteManualCompaction());
  // No expirations expected yet.
  AssertNoFilesExpired();

  // Phase 2: restore the short table TTL and write a fresh set of files.
  ASSERT_OK(ChangeTableTTL(workload_->table_name(), kTableTTLSec));
  rocksdb_listener_->Reset();
  ASSERT_OK(WriteAtLeastFilesPerDb(10));

  LOG(INFO) << "Sleeping for the original table TTL seconds (will now expire rows)";
  SleepFor(MonoDelta::FromSeconds(2 * kTableTTLSec));
  ASSERT_OK(ExecuteManualCompaction());
  // Everything should now have expired.
  AssertAllFilesExpired();
}
874
875
0
// Files written before the auto-compaction trigger is reached must still be
// expired once a later automatic compaction is triggered.
TEST_F(CompactionTestWithFileExpiration, FewerFilesThanCompactionTriggerCanExpire) {
  // The trigger is set higher than the number of files written initially.
  const int kNumFilesTriggerCompaction = 10;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_max_file_size_for_compaction) = 1_KB;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) =
      kNumFilesTriggerCompaction;
  SetupWorkload(IsolationLevel::NON_TRANSACTIONAL);

  // Too few files for an auto compaction; these are the only ones that will be
  // old enough to expire.
  ASSERT_OK(WriteAtLeastFilesPerDb(1));
  LogSizeAndFilesInDbs();

  LOG(INFO) << "Sleeping for table TTL seconds";
  SleepFor(MonoDelta::FromSeconds(2 * kTableTTLSec));

  // Enough new files to trigger an automatic compaction.
  rocksdb_listener_->Reset();
  ASSERT_OK(WriteAtLeastFilesPerDb(kNumFilesTriggerCompaction));
  LogSizeAndFilesInDbs(true);

  // Every DB must report at least one expired (filtered) file.
  ASSERT_TRUE(CheckAtLeastFileExpirationsPerDb(1));
}
898
899
// In the past, we have observed behavior of one disporportionately large file
900
// being unable to be directly deleted after it expires (and preventing subsequent
901
// files from also being deleted). This test verifies that large files will not
902
// prevent expiration.
903
0
TEST_F(CompactionTestWithFileExpiration, LargeFileDoesNotPreventExpiration) {
904
0
  const int kNumFilesTriggerCompaction = 10;
905
0
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger)
906
0
      = kNumFilesTriggerCompaction;
907
0
  SetupWorkload(IsolationLevel::NON_TRANSACTIONAL);
908
  // Write a disporportionately large amount of data, then compact into one file.
909
0
  ASSERT_OK(WriteAtLeast(1000_KB));
910
0
  ASSERT_OK(ExecuteManualCompaction());
911
0
  LogSizeAndFilesInDbs();
912
0
  ASSERT_TRUE(CheckEachDbHasExactlyNumFiles(1));
913
0
  const auto files_compacted_without_expiration = CountUnfilteredSSTFiles();
914
915
  // Add a flag to limit file size for compaction, then write several more files.
916
  // At this point, there will be one large ~1000_KB file, followed by several files
917
  // ~1_KB large. None of these files will be included in normal compactions
918
  // (but all are eligible for deletion).
919
0
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_max_file_size_for_compaction) = 1_KB;
920
0
  rocksdb_listener_->Reset();
921
0
  ASSERT_OK(WriteAtLeastFilesPerDb(kNumFilesTriggerCompaction));
922
923
0
  LOG(INFO) << "Sleeping for table TTL seconds";
924
0
  SleepFor(2s * kTableTTLSec);
925
926
  // Write enough files to trigger an auto compaction, even though all are too large
927
  // to be considered for normal compaction.
928
0
  rocksdb_listener_->Reset();
929
0
  ASSERT_OK(WriteAtLeastFilesPerDb(kNumFilesTriggerCompaction));
930
931
0
  LogSizeAndFilesInDbs(true);
932
  // Check that 1 or more files have expired per database.
933
0
  ASSERT_TRUE(CheckAtLeastFileExpirationsPerDb(1));
934
  // Verify that no files have been compacted other than the manual compaction and deletions.
935
0
  ASSERT_EQ(CountUnfilteredSSTFiles(), files_compacted_without_expiration);
936
0
}
937
938
class FileExpirationWithRF3 : public CompactionTestWithFileExpiration {
939
 public:
940
2
  void SetUp() override {
941
2
    CompactionTestWithFileExpiration::SetUp();
942
2
  }
943
 protected:
944
  bool AllFilesHaveTTLMetadata();
945
  void WaitUntilAllCommittedOpsApplied(const MonoDelta timeout);
946
  void ExpirationWhenReplicated(bool withValueTTL);
947
2
  int NumTabletServers() override {
948
2
    return 3;
949
2
  }
950
0
  int ttl_to_use() override {
951
0
    return kTTLSec;
952
0
  }
953
  const int kTTLSec = 1;
954
};
955
956
0
bool FileExpirationWithRF3::AllFilesHaveTTLMetadata() {
957
0
  auto dbs = GetAllRocksDbs(cluster_.get(), false);
958
0
  for (auto* db : dbs) {
959
0
    auto metas = db->GetLiveFilesMetaData();
960
0
    for (auto file : metas) {
961
0
      const docdb::ConsensusFrontier largest =
962
0
          down_cast<docdb::ConsensusFrontier&>(*file.largest.user_frontier);
963
0
      auto max_ttl_expiry = largest.max_value_level_ttl_expiration_time();
964
      // If value is not valid, then it wasn't initialized.
965
      // If value is kInitial, then the table-level TTL will be used (no value metadata).
966
0
      if (!max_ttl_expiry.is_valid() || max_ttl_expiry == HybridTime::kInitial) {
967
0
        return false;
968
0
      }
969
0
    }
970
0
  }
971
0
  return true;
972
0
}
973
974
0
void FileExpirationWithRF3::WaitUntilAllCommittedOpsApplied(const MonoDelta timeout) {
975
0
  const auto completion_deadline = MonoTime::Now() + timeout;
976
0
  for (auto& peer : ListTabletPeers(cluster_.get(), ListPeersFilter::kAll)) {
977
0
    auto consensus = peer->shared_consensus();
978
0
    if (consensus) {
979
0
      ASSERT_OK(Wait([consensus]() -> Result<bool> {
980
0
        return consensus->GetLastAppliedOpId() >= consensus->GetLastCommittedOpId();
981
0
      }, completion_deadline, "Waiting for all committed ops to be applied"));
982
0
    }
983
0
  }
984
0
}
985
986
0
void FileExpirationWithRF3::ExpirationWhenReplicated(bool withValueTTL) {
987
0
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = -1;
988
0
  SetupWorkload(IsolationLevel::NON_TRANSACTIONAL);
989
0
  if (withValueTTL) {
990
    // Change the table TTL to a large value that won't expire.
991
0
    ASSERT_OK(ChangeTableTTL(workload_->table_name(), 1000000));
992
0
  } else {
993
    // Set workload to not have value TTL.
994
0
    workload_->set_ttl(-1);
995
0
  }
996
0
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_file_expiration_value_ttl_overrides_table_ttl) = withValueTTL;
997
998
0
  ASSERT_OK(WriteAtLeastFilesPerDb(5));
999
0
  WaitUntilAllCommittedOpsApplied(15s);
1000
0
  ASSERT_EQ(AllFilesHaveTTLMetadata(), withValueTTL);
1001
1002
0
  LOG(INFO) << "Sleeping to expire files according to value TTL";
1003
0
  auto timeToSleep = 2 * (withValueTTL ? kTTLSec : kTableTTLSec);
1004
0
  SleepFor(MonoDelta::FromSeconds(timeToSleep));
1005
1006
0
  ASSERT_OK(ExecuteManualCompaction());
1007
  // Assert that all data has been deleted, and that we're filtering SST files.
1008
0
  AssertAllFilesExpired();
1009
0
}
1010
1011
// Value-level TTL metadata must survive replication and drive whole-file expiration.
TEST_F_EX(CompactionTestWithFileExpiration, ReplicatedMetadataCanExpireFile,
          FileExpirationWithRF3) {
  constexpr bool kUseValueTTL = true;
  ExpirationWhenReplicated(kUseValueTTL);
}
1015
1016
// Without value-level TTLs, replicated files carry no TTL metadata and expire by table TTL.
TEST_F_EX(CompactionTestWithFileExpiration, ReplicatedNoMetadataUsesTableTTL,
          FileExpirationWithRF3) {
  constexpr bool kUseValueTTL = false;
  ExpirationWhenReplicated(kUseValueTTL);
}
1020
1021
} // namespace tserver
1022
} // namespace yb