YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/flush-test.cc
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include <atomic>
#include <chrono>
#include <mutex>
#include <regex>
#include <unordered_map>
#include <unordered_set>

#include "yb/client/table.h"

#include "yb/common/common.pb.h"

#include "yb/integration-tests/mini_cluster.h"
#include "yb/integration-tests/test_workload.h"

#include "yb/rocksdb/db/db_impl.h"
#include "yb/rocksdb/memory_monitor.h"
#include "yb/rocksdb/util/testutil.h"

#include "yb/rpc/rpc_fwd.h"

#include "yb/tablet/tablet.h"
#include "yb/tablet/tablet_peer.h"

#include "yb/tserver/mini_tablet_server.h"
#include "yb/tserver/tablet_memory_manager.h"
#include "yb/tserver/tablet_server.h"
#include "yb/tserver/ts_tablet_manager.h"

#include "yb/util/logging.h"
#include "yb/util/status_format.h"
#include "yb/util/status_log.h"
#include "yb/util/test_util.h"

using namespace std::literals;

DECLARE_bool(flush_rocksdb_on_shutdown);
DECLARE_int64(global_memstore_size_percentage);
DECLARE_int64(global_memstore_size_mb_max);
DECLARE_int32(memstore_size_mb);
DECLARE_int32(rocksdb_level0_file_num_compaction_trigger);
DECLARE_int32(rocksdb_max_background_flushes);

namespace yb {
namespace client {
class YBTableName;
}

namespace tserver {

namespace {

constexpr auto kWaitDelay = 10ms;

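// Test listener attached to each tablet's RocksDB instance. It extends the test-only
// FlushedFileCollector (which records the SST files produced by flushes) and additionally
// counts how many compactions have started.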
class RocksDbListener : public rocksdb::test::FlushedFileCollector {
 public:
  void OnCompactionStarted() override {
    ++num_compactions_started_;
  }

  int GetNumCompactionsStarted() { return num_compactions_started_; }

 private:
  std::atomic<int> num_compactions_started_{0};
};

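// Extracts the tablet id from an SST file path of the form ".../tablet-<id>/<number>.sst".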
Result<TabletId> GetTabletIdFromSstFilename(const std::string& filename) {
  static std::regex re(R"#(.*/tablet-([^/]+)/\d+\.sst)#");
  std::smatch match;
  if (std::regex_search(filename, match, re)) {
    return match.str(1);
  } else {
    return STATUS_FORMAT(InvalidArgument, "Cannot parse tablet id from SST filename: $0", filename);
  }
}

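// Listener attached to the tablet memory manager; records the ids of tablets for which a
// memory-triggered flush has been started, in the order their flushes were started.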
class TabletManagerListener : public tserver::TabletMemoryManagerListenerIf {
 public:
  void StartedFlush(const TabletId& tablet_id) override {
    std::lock_guard<std::mutex> lock(mutex_);
    flushed_tablets_.push_back(tablet_id);
  }

  std::vector<TabletId> GetFlushedTablets() {
    std::lock_guard<std::mutex> lock(mutex_);
    return flushed_tablets_;
  }

 private:
  std::mutex mutex_;
  std::vector<TabletId> flushed_tablets_;
};

} // namespace

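// Integration test fixture for memory-monitor-driven flushes. The per-tablet memstore limit is
// set above the global server limit, so flushes can only be triggered by the memory monitor.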
class FlushITest : public YBTest {
 public:
  FlushITest() {}

  void SetUp() override {
    HybridTime::TEST_SetPrettyToString(true);
    FLAGS_memstore_size_mb = kOverServerLimitMB;
    // Set the global memstore limit to kServerLimitMB.
    FLAGS_global_memstore_size_percentage = 100;
    FLAGS_global_memstore_size_mb_max = kServerLimitMB;
    YBTest::SetUp();

    rocksdb_listener_ = std::make_shared<RocksDbListener>();
    tablet_manager_listener_ = std::make_shared<TabletManagerListener>();

    // Start the cluster.
    MiniClusterOptions opts;
    opts.num_tablet_servers = 1;
    cluster_.reset(new MiniCluster(opts));
    ASSERT_OK(cluster_->Start());
    // Patch tablet options inside the tablet manager; they will be applied to newly created
    // tablets.
    cluster_->GetTabletManager(0)->TEST_tablet_options()->listeners.push_back(
        rocksdb_listener_);
    // Patch tablet options inside the mini tablet server; they will be used for the new tablet
    // server after a minicluster restart.
    cluster_->mini_tablet_server(0)->options()->listeners.push_back(rocksdb_listener_);
    SetupCluster();
    SetupWorkload();
  }

  void TearDown() override {
    workload_->StopAndJoin();
    cluster_->Shutdown();
    YBTest::TearDown();
  }

  void SetupCluster() {
    cluster_->GetTabletManager(0)->tablet_memory_manager()->TEST_listeners.push_back(
        tablet_manager_listener_);
  }

  void SetupWorkload(
      const client::YBTableName& table_name = TestWorkloadOptions::kDefaultTableName) {
    workload_.reset(new TestWorkload(cluster_.get()));
    workload_->set_table_name(table_name);
    workload_->set_timeout_allowed(true);
    workload_->set_payload_bytes(kPayloadBytes);
    workload_->set_write_batch_size(1);
    workload_->set_num_write_threads(4);
    workload_->set_num_tablets(kNumTablets);
    workload_->Setup();
  }

 protected:
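  // Sums FLUSH_WRITE_BYTES across the regular RocksDB instances of all tablet peers on the
  // first tablet server.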
  size_t TotalBytesFlushed() {
    size_t bytes = 0;
    auto tablet_peers = cluster_->GetTabletPeers(0);
    for (auto& peer : tablet_peers) {
      bytes += peer->tablet()->regulardb_statistics()->getTickerCount(rocksdb::FLUSH_WRITE_BYTES);
    }
    return bytes;
  }

  size_t BytesWritten() {
    return workload_->rows_inserted() * kPayloadBytes;
  }

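  // Runs the workload until at least size_bytes of payload has been written, then stops it.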
  void WriteAtLeast(size_t size_bytes) {
    workload_->Start();
    ASSERT_OK(LoggedWaitFor(
        [this, size_bytes] { return BytesWritten() >= size_bytes; }, 60s,
        Format("Waiting until we've written at least $0 bytes ...", size_bytes), kWaitDelay));
    workload_->StopAndJoin();
    LOG(INFO) << "Wrote " << BytesWritten() << " bytes.";
  }

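  // Returns the default test table name with an "_<i>" suffix, so each iteration can create a
  // fresh table.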
  client::YBTableName GetTableName(int i) {
    auto table_name = TestWorkloadOptions::kDefaultTableName;
    table_name.set_table_name(Format("$0_$1", table_name.table_name(), i));
    return table_name;
  }

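  // Returns the total number of RocksDB flushes currently running across all tablets on the
  // first tablet server.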
  int NumRunningFlushes() {
    int flushes = 0;
    for (auto& peer : cluster_->GetTabletPeers(0)) {
      auto* db = pointer_cast<rocksdb::DBImpl*>(peer->tablet()->TEST_db());
      if (db) {
        flushes += db->TEST_NumRunningFlushes();
      }
    }
    return flushes;
  }

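  // Runs the workload until at least one new compaction has started, then waits until all
  // compactions and flushes on the tablet server have finished.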
  void WriteUntilCompaction() {
    int num_compactions_started = rocksdb_listener_->GetNumCompactionsStarted();
    workload_->Start();
    ASSERT_OK(LoggedWaitFor(
        [this, num_compactions_started] {
          return rocksdb_listener_->GetNumCompactionsStarted() > num_compactions_started;
        }, 60s, "Waiting until we've written enough to start a compaction ...", kWaitDelay));
    workload_->StopAndJoin();
    LOG(INFO) << "Wrote " << BytesWritten() << " bytes.";
    ASSERT_OK(LoggedWaitFor(
        [this] {
          return NumTotalRunningCompactions(cluster_.get()) == 0 && NumRunningFlushes() == 0;
        }, 60s, "Waiting until compactions and flushes are done ...", kWaitDelay));
  }

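  // Adds every tablet that currently has a non-empty memtable and is not yet in *tablets,
  // mapping it to "order" (the index of the table that was being written).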
  void AddTabletsWithNonEmptyMemTable(std::unordered_map<TabletId, int>* tablets, int order) {
    for (auto& peer : cluster_->GetTabletPeers(0)) {
      const auto tablet_id = peer->tablet_id();
      if (tablets->count(tablet_id) == 0) {
        auto* db = pointer_cast<rocksdb::DBImpl*>(peer->tablet()->TEST_db());
        if (db) {
          auto* cf = pointer_cast<rocksdb::ColumnFamilyHandleImpl*>(db->DefaultColumnFamily());
          if (cf->cfd()->mem()->num_entries() > 0) {
            tablets->insert(std::make_pair(tablet_id, order));
          }
        }
      }
    }
  }

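  // Helpers around the first tablet server's memory monitor, which tracks RocksDB memory usage
  // and whose Exceeded() state the test uses to tell when flushes have freed enough memory.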
  MemoryMonitor* memory_monitor() {
    return cluster_->GetTabletManager(0)->memory_monitor();
  }

  size_t GetRocksDbMemoryUsage() {
    return memory_monitor()->memory_usage();
  }

  void DumpMemoryUsage() {
    auto* server = cluster_->mini_tablet_server(0)->server();
    LOG(INFO) << server->mem_tracker()->FindChild("Tablets")->LogUsage("MEM ");
    LOG(INFO) << "rocksdb memory usage: " << GetRocksDbMemoryUsage();
  }

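  // Shared body of the two TestFlushPicksOldestInactiveTabletAfterCompaction tests below:
  // verifies that the memory monitor flushes inactive tablets with non-empty memtables in the
  // order their tables were written, optionally after a cluster restart.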
  void TestFlushPicksOldestInactiveTabletAfterCompaction(bool with_restart);

  const int32_t kServerLimitMB = 2;
  // Used to set the per-tablet memstore limit to a value higher than the server limit, so that
  // flushes are only triggered by the memory monitor, which is what we want to test.
  const int32_t kOverServerLimitMB = kServerLimitMB * 10;
  const int kNumTablets = 3;
  const size_t kPayloadBytes = 8_KB;
  std::unique_ptr<MiniCluster> cluster_;
  std::unique_ptr<TestWorkload> workload_;
  std::shared_ptr<RocksDbListener> rocksdb_listener_;
  std::shared_ptr<TabletManagerListener> tablet_manager_listener_;
};

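// Verifies that writing slightly more than the global memstore limit causes the memory monitor
// to flush data to disk.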
TEST_F(FlushITest, TestFlushHappens) {
  const size_t flushed_before_writes = TotalBytesFlushed();
  WriteAtLeast((kServerLimitMB * 1_MB) + 1);
  ASSERT_OK(WaitFor(
      [this, flushed_before_writes] {
        return TotalBytesFlushed() > flushed_before_writes; },
      60s, "Flush", 10ms));
  const size_t flushed_after_writes = TotalBytesFlushed() - flushed_before_writes;
  LOG(INFO) << "Flushed " << flushed_after_writes << " bytes.";
  ASSERT_GT(flushed_after_writes, 0);
}

void FlushITest::TestFlushPicksOldestInactiveTabletAfterCompaction(bool with_restart) {
  // Trigger compaction early.
  FLAGS_rocksdb_level0_file_num_compaction_trigger = 2;

  // Set the memstore limit to a fraction of the server limit so we can grow memory usage with
  // enough granularity.
  FLAGS_memstore_size_mb = kServerLimitMB / 5;
  std::unordered_map<TabletId, int> inactive_tablets_to_flush;

  // Write to tables until a compaction has started and memtables occupy 50% of kServerLimitMB.
  int tables = 1; // The first, empty table is created by the test setup.
  while (GetRocksDbMemoryUsage() < kServerLimitMB * 1_MB / 2) {
    SetupWorkload(GetTableName(tables));
    WriteUntilCompaction();
    AddTabletsWithNonEmptyMemTable(&inactive_tablets_to_flush, tables);
    tables++;
  }

  LOG(INFO) << "Tablets to flush: " << inactive_tablets_to_flush.size();

  std::unordered_set<TabletId> inactive_tablets;
  for (auto& peer : cluster_->GetTabletPeers(0)) {
    inactive_tablets.insert(peer->tablet_id());
  }

  DumpMemoryUsage();

  if (with_restart) {
    // Restore memtables from the log on restart to test how memory-monitor-based flushing works
    // after a restart.
    FLAGS_flush_rocksdb_on_shutdown = false;
    LOG(INFO) << "Restarting cluster ...";
    ASSERT_OK(cluster_->RestartSync());
    LOG(INFO) << "Cluster has been restarted";
    SetupCluster();
    DumpMemoryUsage();
  }

  rocksdb_listener_->Clear();

  FLAGS_memstore_size_mb = kOverServerLimitMB;
  SetupWorkload(GetTableName(tables));
  tables++;
  WriteAtLeast((kServerLimitMB * 1_MB) + 1);

  DumpMemoryUsage();

  ASSERT_OK(LoggedWaitFor(
      [this] { return !memory_monitor()->Exceeded(); }, 30s,
      "Waiting until memory is freed by flushes...", kWaitDelay));

  LOG(INFO) << "Tables: " << tables;

  ERROR_NOT_OK(
      LoggedWaitFor(
          [this, &inactive_tablets_to_flush] {
            const auto flushed_tablets = tablet_manager_listener_->GetFlushedTablets().size();
            YB_LOG_EVERY_N_SECS(INFO, 5) << "Flushed tablets: " << flushed_tablets;
            return flushed_tablets >= inactive_tablets_to_flush.size();
          }, 30s,
          "Waiting until flush started for all inactive tablets with non-empty memtable ...",
          kWaitDelay),
      "Flush wasn't started for some inactive tablets with non-empty memtable");

  auto flushed_tablets = tablet_manager_listener_->GetFlushedTablets();
  LOG(INFO) << "Flushed tablets: " << flushed_tablets.size();

  {
    std::unordered_set<TabletId> flushed_tablets_not_completed(
        flushed_tablets.begin(), flushed_tablets.end());
    ERROR_NOT_OK(
        LoggedWaitFor(
            [&] {
              for (const auto& filename : rocksdb_listener_->GetAndClearFlushedFiles()) {
                const auto tablet_id = CHECK_RESULT(GetTabletIdFromSstFilename(filename));
                flushed_tablets_not_completed.erase(tablet_id);
              }
              return flushed_tablets_not_completed.size() == 0;
            }, 30s, "Waiting for flushes to complete ...", kWaitDelay),
        "Some flushes weren't completed");
    ASSERT_EQ(flushed_tablets_not_completed.size(), 0) << "Flush wasn't completed for tablets: "
        << yb::ToString(flushed_tablets_not_completed);
  }

  int current_order = 0;
  for (const auto& tablet_id : flushed_tablets) {
    auto iter = inactive_tablets_to_flush.find(tablet_id);
    if (iter != inactive_tablets_to_flush.end()) {
      const auto expected_flush_order = iter->second;
      inactive_tablets_to_flush.erase(iter);
      LOG(INFO) << "Checking tablet " << tablet_id << " expected order: " << expected_flush_order;
      ASSERT_GE(expected_flush_order, current_order) << "Tablet was not flushed in order: "
                                                     << tablet_id;
      current_order = std::max(current_order, expected_flush_order);
    } else {
      ASSERT_EQ(inactive_tablets.count(tablet_id), 0)
          << "Flushed inactive tablet with empty memstore: " << tablet_id;
      ASSERT_EQ(inactive_tablets_to_flush.size(), 0)
          << "Tablet " << tablet_id << " from active table was flushed before inactive tablets "
                                       "with non-empty memtable: "
          << yb::ToString(inactive_tablets_to_flush);
    }
  }
  ASSERT_EQ(inactive_tablets_to_flush.size(), 0)
      << "Some inactive tablets with non-empty memtable were not flushed: "
      << yb::ToString(inactive_tablets_to_flush);
}

TEST_F(FlushITest, TestFlushPicksOldestInactiveTabletAfterCompaction) {
  TestFlushPicksOldestInactiveTabletAfterCompaction(false /* with_restart */);
}

TEST_F(FlushITest, TestFlushPicksOldestInactiveTabletAfterCompactionWithRestart) {
  TestFlushPicksOldestInactiveTabletAfterCompaction(true /* with_restart */);
}

} // namespace tserver
} // namespace yb