/Users/deen/code/yugabyte-db/src/yb/integration-tests/flush-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include <chrono> |
15 | | #include <regex> |
16 | | |
17 | | #include "yb/client/table.h" |
18 | | |
19 | | #include "yb/common/common.pb.h" |
20 | | |
21 | | #include "yb/integration-tests/mini_cluster.h" |
22 | | #include "yb/integration-tests/test_workload.h" |
23 | | |
24 | | #include "yb/rocksdb/db/db_impl.h" |
25 | | #include "yb/rocksdb/memory_monitor.h" |
26 | | #include "yb/rocksdb/util/testutil.h" |
27 | | |
28 | | #include "yb/rpc/rpc_fwd.h" |
29 | | |
30 | | #include "yb/tablet/tablet.h" |
31 | | #include "yb/tablet/tablet_peer.h" |
32 | | |
33 | | #include "yb/tserver/mini_tablet_server.h" |
34 | | #include "yb/tserver/tablet_memory_manager.h" |
35 | | #include "yb/tserver/tablet_server.h" |
36 | | #include "yb/tserver/ts_tablet_manager.h" |
37 | | |
38 | | #include "yb/util/logging.h" |
39 | | #include "yb/util/status_format.h" |
40 | | #include "yb/util/status_log.h" |
41 | | #include "yb/util/test_util.h" |
42 | | |
43 | | using namespace std::literals; |
44 | | |
45 | | DECLARE_bool(flush_rocksdb_on_shutdown); |
46 | | DECLARE_int64(global_memstore_size_percentage); |
47 | | DECLARE_int64(global_memstore_size_mb_max); |
48 | | DECLARE_int32(memstore_size_mb); |
49 | | DECLARE_int32(rocksdb_level0_file_num_compaction_trigger); |
50 | | DECLARE_int32(rocksdb_max_background_flushes); |
51 | | |
52 | | namespace yb { |
53 | | namespace client { |
54 | | class YBTableName; |
55 | | } |
56 | | |
57 | | namespace tserver { |
58 | | |
59 | | namespace { |
60 | | |
61 | | constexpr auto kWaitDelay = 10ms; |
62 | | |
63 | | class RocksDbListener : public rocksdb::test::FlushedFileCollector { |
64 | | public: |
65 | 0 | void OnCompactionStarted() override { |
66 | 0 | ++num_compactions_started_; |
67 | 0 | } |
68 | | |
69 | 0 | int GetNumCompactionsStarted() { return num_compactions_started_; } |
70 | | |
71 | | private: |
72 | | std::atomic<int> num_compactions_started_; |
73 | | }; |
74 | | |
75 | 0 | Result<TabletId> GetTabletIdFromSstFilename(const std::string& filename) { |
76 | 0 | static std::regex re(R"#(.*/tablet-([^/]+)/\d+\.sst)#"); |
77 | 0 | std::smatch match; |
78 | 0 | if (std::regex_search(filename, match, re)) { |
79 | 0 | return match.str(1); |
80 | 0 | } else { |
81 | 0 | return STATUS_FORMAT(InvalidArgument, "Cannot parse tablet id from SST filename: $0", filename); |
82 | 0 | } |
83 | 0 | } |
84 | | |
85 | | class TabletManagerListener : public tserver::TabletMemoryManagerListenerIf { |
86 | | public: |
87 | 0 | void StartedFlush(const TabletId& tablet_id) override { |
88 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
89 | 0 | flushed_tablets_.push_back(tablet_id); |
90 | 0 | } |
91 | | |
92 | 0 | std::vector<TabletId> GetFlushedTablets() { |
93 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
94 | 0 | return flushed_tablets_; |
95 | 0 | } |
96 | | |
97 | | private: |
98 | | std::mutex mutex_; |
99 | | std::vector<TabletId> flushed_tablets_; |
100 | | }; |
101 | | |
102 | | } // namespace |
103 | | |
104 | | class FlushITest : public YBTest { |
105 | | public: |
106 | 3 | FlushITest() {} |
107 | | |
108 | 3 | void SetUp() override { |
109 | 3 | HybridTime::TEST_SetPrettyToString(true); |
110 | 3 | FLAGS_memstore_size_mb = kOverServerLimitMB; |
111 | | // Set the global memstore to kServerLimitMB. |
112 | 3 | FLAGS_global_memstore_size_percentage = 100; |
113 | 3 | FLAGS_global_memstore_size_mb_max = kServerLimitMB; |
114 | 3 | YBTest::SetUp(); |
115 | | |
116 | 3 | rocksdb_listener_ = std::make_shared<RocksDbListener>(); |
117 | 3 | tablet_manager_listener_ = std::make_shared<TabletManagerListener>(); |
118 | | |
119 | | // Start cluster |
120 | 3 | MiniClusterOptions opts; |
121 | 3 | opts.num_tablet_servers = 1; |
122 | 3 | cluster_.reset(new MiniCluster(opts)); |
123 | 3 | ASSERT_OK(cluster_->Start()); |
124 | | // Patch tablet options inside tablet manager, will be applied to new created tablets. |
125 | 3 | cluster_->GetTabletManager(0)->TEST_tablet_options()->listeners.push_back( |
126 | 3 | rocksdb_listener_); |
127 | | // Patch tablet options inside mini tablet server, will use for new tablet server after |
128 | | // minicluster restart. |
129 | 3 | cluster_->mini_tablet_server(0)->options()->listeners.push_back(rocksdb_listener_); |
130 | 3 | SetupCluster(); |
131 | 3 | SetupWorkload(); |
132 | 3 | } |
133 | | |
134 | 1 | void TearDown() override { |
135 | 1 | workload_->StopAndJoin(); |
136 | 1 | cluster_->Shutdown(); |
137 | 1 | YBTest::TearDown(); |
138 | 1 | } |
139 | | |
140 | 3 | void SetupCluster() { |
141 | 3 | cluster_->GetTabletManager(0)->tablet_memory_manager()->TEST_listeners.push_back( |
142 | 3 | tablet_manager_listener_); |
143 | 3 | } |
144 | | |
145 | | void SetupWorkload( |
146 | 3 | const client::YBTableName& table_name = TestWorkloadOptions::kDefaultTableName) { |
147 | 3 | workload_.reset(new TestWorkload(cluster_.get())); |
148 | 3 | workload_->set_table_name(table_name); |
149 | 3 | workload_->set_timeout_allowed(true); |
150 | 3 | workload_->set_payload_bytes(kPayloadBytes); |
151 | 3 | workload_->set_write_batch_size(1); |
152 | 3 | workload_->set_num_write_threads(4); |
153 | 3 | workload_->set_num_tablets(kNumTablets); |
154 | 3 | workload_->Setup(); |
155 | 3 | } |
156 | | |
157 | | protected: |
158 | 0 | size_t TotalBytesFlushed() { |
159 | 0 | size_t bytes = 0; |
160 | 0 | auto tablet_peers = cluster_->GetTabletPeers(0); |
161 | 0 | for (auto& peer : tablet_peers) { |
162 | 0 | bytes += peer->tablet()->regulardb_statistics()->getTickerCount(rocksdb::FLUSH_WRITE_BYTES); |
163 | 0 | } |
164 | 0 | return bytes; |
165 | 0 | } |
166 | | |
167 | 0 | size_t BytesWritten() { |
168 | 0 | return workload_->rows_inserted() * kPayloadBytes; |
169 | 0 | } |
170 | | |
171 | 0 | void WriteAtLeast(size_t size_bytes) { |
172 | 0 | workload_->Start(); |
173 | 0 | ASSERT_OK(LoggedWaitFor( |
174 | 0 | [this, size_bytes] { return BytesWritten() >= size_bytes; }, 60s, |
175 | 0 | Format("Waiting until we've written at least $0 bytes ...", size_bytes), kWaitDelay)); |
176 | 0 | workload_->StopAndJoin(); |
177 | 0 | LOG(INFO) << "Wrote " << BytesWritten() << " bytes."; |
178 | 0 | } |
179 | | |
180 | 0 | client::YBTableName GetTableName(int i) { |
181 | 0 | auto table_name = TestWorkloadOptions::kDefaultTableName; |
182 | 0 | table_name.set_table_name(Format("$0_$1", table_name.table_name(), i)); |
183 | 0 | return table_name; |
184 | 0 | } |
185 | | |
186 | 0 | int NumRunningFlushes() { |
187 | 0 | int compactions = 0; |
188 | 0 | for (auto& peer : cluster_->GetTabletPeers(0)) { |
189 | 0 | auto* db = pointer_cast<rocksdb::DBImpl*>(peer->tablet()->TEST_db()); |
190 | 0 | if (db) { |
191 | 0 | compactions += db->TEST_NumRunningFlushes(); |
192 | 0 | } |
193 | 0 | } |
194 | 0 | return compactions; |
195 | 0 | } |
196 | | |
197 | 0 | void WriteUntilCompaction() { |
198 | 0 | int num_compactions_started = rocksdb_listener_->GetNumCompactionsStarted(); |
199 | 0 | workload_->Start(); |
200 | 0 | ASSERT_OK(LoggedWaitFor( |
201 | 0 | [this, num_compactions_started] { |
202 | 0 | return rocksdb_listener_->GetNumCompactionsStarted() > num_compactions_started; |
203 | 0 | }, 60s, "Waiting until we've written to start compaction ...", kWaitDelay)); |
204 | 0 | workload_->StopAndJoin(); |
205 | 0 | LOG(INFO) << "Wrote " << BytesWritten() << " bytes."; |
206 | 0 | ASSERT_OK(LoggedWaitFor( |
207 | 0 | [this] { |
208 | 0 | return NumTotalRunningCompactions(cluster_.get()) == 0 && NumRunningFlushes() == 0; |
209 | 0 | }, 60s, "Waiting until compactions and flushes are done ...", kWaitDelay)); |
210 | 0 | } |
211 | | |
212 | 0 | void AddTabletsWithNonEmptyMemTable(std::unordered_map<TabletId, int>* tablets, int order) { |
213 | 0 | for (auto& peer : cluster_->GetTabletPeers(0)) { |
214 | 0 | const auto tablet_id = peer->tablet_id(); |
215 | 0 | if (tablets->count(tablet_id) == 0) { |
216 | 0 | auto* db = pointer_cast<rocksdb::DBImpl*>(peer->tablet()->TEST_db()); |
217 | 0 | if (db) { |
218 | 0 | auto* cf = pointer_cast<rocksdb::ColumnFamilyHandleImpl*>(db->DefaultColumnFamily()); |
219 | 0 | if (cf->cfd()->mem()->num_entries() > 0) { |
220 | 0 | tablets->insert(std::make_pair(tablet_id, order)); |
221 | 0 | } |
222 | 0 | } |
223 | 0 | } |
224 | 0 | } |
225 | 0 | } |
226 | | |
227 | 0 | MemoryMonitor* memory_monitor() { |
228 | 0 | return cluster_->GetTabletManager(0)->memory_monitor(); |
229 | 0 | } |
230 | | |
231 | 0 | size_t GetRocksDbMemoryUsage() { |
232 | 0 | return memory_monitor()->memory_usage(); |
233 | 0 | } |
234 | | |
235 | 0 | void DumpMemoryUsage() { |
236 | 0 | auto* server = cluster_->mini_tablet_server(0)->server(); |
237 | 0 | LOG(INFO) << server->mem_tracker()->FindChild("Tablets")->LogUsage("MEM "); |
238 | 0 | LOG(INFO) << "rocksdb memory usage: " << GetRocksDbMemoryUsage(); |
239 | 0 | } |
240 | | |
241 | | void TestFlushPicksOldestInactiveTabletAfterCompaction(bool with_restart); |
242 | | |
243 | | const int32_t kServerLimitMB = 2; |
244 | | // Used to set memstore limit to value higher than server limit, so flushes are only being |
245 | | // triggered by memory monitor which we want to test. |
246 | | const int32_t kOverServerLimitMB = kServerLimitMB * 10; |
247 | | const int kNumTablets = 3; |
248 | | const size_t kPayloadBytes = 8_KB; |
249 | | std::unique_ptr<MiniCluster> cluster_; |
250 | | std::unique_ptr<TestWorkload> workload_; |
251 | | std::shared_ptr<RocksDbListener> rocksdb_listener_; |
252 | | std::shared_ptr<TabletManagerListener> tablet_manager_listener_; |
253 | | }; |
254 | | |
255 | 0 | TEST_F(FlushITest, TestFlushHappens) { |
256 | 0 | const size_t flushed_before_writes = TotalBytesFlushed(); |
257 | 0 | WriteAtLeast((kServerLimitMB * 1_MB) + 1); |
258 | 0 | ASSERT_OK(WaitFor( |
259 | 0 | [this, flushed_before_writes] { |
260 | 0 | return TotalBytesFlushed() > flushed_before_writes; }, |
261 | 0 | 60s, "Flush", 10ms)); |
262 | 0 | const size_t flushed_after_writes = TotalBytesFlushed() - flushed_before_writes; |
263 | 0 | LOG(INFO) << "Flushed " << flushed_after_writes << " bytes."; |
264 | 0 | ASSERT_GT(flushed_after_writes, 0); |
265 | 0 | } |
266 | | |
267 | 0 | void FlushITest::TestFlushPicksOldestInactiveTabletAfterCompaction(bool with_restart) { |
268 | | // Trigger compaction early. |
269 | 0 | FLAGS_rocksdb_level0_file_num_compaction_trigger = 2; |
270 | | |
271 | | // Set memstore limit to 1/nth of server limit so we can cause memory usage with enough |
272 | | // granularity. |
273 | 0 | FLAGS_memstore_size_mb = kServerLimitMB / 5; |
274 | 0 | std::unordered_map<TabletId, int> inactive_tablets_to_flush; |
275 | | |
276 | | // Write to tables until compaction started and until we occupy 50% of kServerLimitMB by |
277 | | // memtables. |
278 | 0 | int tables = 1; // First empty table is created by test setup. |
279 | 0 | while (GetRocksDbMemoryUsage() < kServerLimitMB * 1_MB / 2) { |
280 | 0 | SetupWorkload(GetTableName(tables)); |
281 | 0 | WriteUntilCompaction(); |
282 | 0 | AddTabletsWithNonEmptyMemTable(&inactive_tablets_to_flush, tables); |
283 | 0 | tables++; |
284 | 0 | } |
285 | |
|
286 | 0 | LOG(INFO) << "Tablets to flush: " << inactive_tablets_to_flush.size(); |
287 | |
|
288 | 0 | std::unordered_set<TabletId> inactive_tablets; |
289 | 0 | for (auto& peer : cluster_->GetTabletPeers(0)) { |
290 | 0 | inactive_tablets.insert(peer->tablet_id()); |
291 | 0 | } |
292 | |
|
293 | 0 | DumpMemoryUsage(); |
294 | |
|
295 | 0 | if (with_restart) { |
296 | | // We want to restore memtables from log on restart to test how memory monitor based flush |
297 | | // works after restart. |
298 | 0 | FLAGS_flush_rocksdb_on_shutdown = false; |
299 | 0 | LOG(INFO) << "Restarting cluster ..."; |
300 | 0 | ASSERT_OK(cluster_->RestartSync()); |
301 | 0 | LOG(INFO) << "Cluster has been restarted"; |
302 | 0 | SetupCluster(); |
303 | 0 | DumpMemoryUsage(); |
304 | 0 | } |
305 | |
|
306 | 0 | rocksdb_listener_->Clear(); |
307 | |
|
308 | 0 | FLAGS_memstore_size_mb = kOverServerLimitMB; |
309 | 0 | SetupWorkload(GetTableName(tables)); |
310 | 0 | tables++; |
311 | 0 | WriteAtLeast((kServerLimitMB * 1_MB) + 1); |
312 | |
|
313 | 0 | DumpMemoryUsage(); |
314 | |
|
315 | 0 | ASSERT_OK(LoggedWaitFor( |
316 | 0 | [this] { return !memory_monitor()->Exceeded(); }, 30s, |
317 | 0 | "Waiting until memory is freed by flushes...", kWaitDelay)); |
318 | |
|
319 | 0 | LOG(INFO) << "Tables: " << tables; |
320 | |
|
321 | 0 | ERROR_NOT_OK( |
322 | 0 | LoggedWaitFor( |
323 | 0 | [this, &inactive_tablets_to_flush] { |
324 | 0 | const auto flushed_tablets = tablet_manager_listener_->GetFlushedTablets().size(); |
325 | 0 | YB_LOG_EVERY_N_SECS(INFO, 5) << "Flushed tablets: " << flushed_tablets; |
326 | 0 | return flushed_tablets >= inactive_tablets_to_flush.size(); |
327 | 0 | }, 30s, |
328 | 0 | "Waiting until flush started for all inactive tablets with non-empty memtable ...", |
329 | 0 | kWaitDelay), |
330 | 0 | "Flush wasn't started for some inactive tablets with non-empty memtable"); |
331 | |
|
332 | 0 | auto flushed_tablets = tablet_manager_listener_->GetFlushedTablets(); |
333 | 0 | LOG(INFO) << "Flushed tablets: " << flushed_tablets.size(); |
334 | |
|
335 | 0 | { |
336 | 0 | std::unordered_set<TabletId> flushed_tablets_not_completed( |
337 | 0 | flushed_tablets.begin(), flushed_tablets.end()); |
338 | 0 | ERROR_NOT_OK( |
339 | 0 | LoggedWaitFor( |
340 | 0 | [&] { |
341 | 0 | for (const auto& filename : rocksdb_listener_->GetAndClearFlushedFiles()) { |
342 | 0 | const auto tablet_id = CHECK_RESULT(GetTabletIdFromSstFilename(filename)); |
343 | 0 | flushed_tablets_not_completed.erase(tablet_id); |
344 | 0 | } |
345 | 0 | return flushed_tablets_not_completed.size() == 0; |
346 | 0 | }, 30s, "Waiting for flushes to complete ...", kWaitDelay), |
347 | 0 | "Some flushes weren't completed"); |
348 | 0 | ASSERT_EQ(flushed_tablets_not_completed.size(), 0) << "Flush wasn't completed for tablets: " |
349 | 0 | << yb::ToString(flushed_tablets_not_completed); |
350 | 0 | } |
351 | | |
352 | 0 | int current_order = 0; |
353 | 0 | for (const auto& tablet_id : flushed_tablets) { |
354 | 0 | auto iter = inactive_tablets_to_flush.find(tablet_id); |
355 | 0 | if (iter != inactive_tablets_to_flush.end()) { |
356 | 0 | const auto expected_flush_order = iter->second; |
357 | 0 | inactive_tablets_to_flush.erase(iter); |
358 | 0 | LOG(INFO) << "Checking tablet " << tablet_id << " expected order: " << expected_flush_order; |
359 | 0 | ASSERT_GE(expected_flush_order, current_order) << "Tablet was flushed not in order: " |
360 | 0 | << tablet_id; |
361 | 0 | current_order = std::max(current_order, expected_flush_order); |
362 | 0 | } else { |
363 | 0 | ASSERT_EQ(inactive_tablets.count(tablet_id), 0) |
364 | 0 | << "Flushed inactive tablet with empty memstore: " << tablet_id; |
365 | 0 | ASSERT_EQ(inactive_tablets_to_flush.size(), 0) |
366 | 0 | << "Tablet " << tablet_id << " from active table was flushed before inactive tablets " |
367 | 0 | "with non-empty memtable: " |
368 | 0 | << yb::ToString(inactive_tablets_to_flush); |
369 | 0 | } |
370 | 0 | } |
371 | 0 | ASSERT_EQ(inactive_tablets_to_flush.size(), 0) << "Some inactive tables with non-empty memtable " |
372 | 0 | "were not flushed: " |
373 | 0 | << yb::ToString(inactive_tablets_to_flush); |
374 | 0 | } |
375 | | |
376 | 0 | TEST_F(FlushITest, TestFlushPicksOldestInactiveTabletAfterCompaction) { |
377 | 0 | TestFlushPicksOldestInactiveTabletAfterCompaction(false /* with_restart */); |
378 | 0 | } |
379 | | |
380 | 0 | TEST_F(FlushITest, TestFlushPicksOldestInactiveTabletAfterCompactionWithRestart) { |
381 | 0 | TestFlushPicksOldestInactiveTabletAfterCompaction(true /* with_restart */); |
382 | 0 | } |
383 | | |
384 | | } // namespace tserver |
385 | | } // namespace yb |