/Users/deen/code/yugabyte-db/src/yb/tserver/ts_tablet_manager-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <memory> |
34 | | #include <set> |
35 | | #include <string> |
36 | | #include <vector> |
37 | | |
38 | | #include <gtest/gtest.h> |
39 | | |
40 | | #include "yb/common/common.pb.h" |
41 | | #include "yb/common/index.h" |
42 | | #include "yb/common/partition.h" |
43 | | #include "yb/common/schema.h" |
44 | | |
45 | | #include "yb/consensus/consensus.pb.h" |
46 | | #include "yb/consensus/consensus_round.h" |
47 | | #include "yb/consensus/metadata.pb.h" |
48 | | #include "yb/consensus/raft_consensus.h" |
49 | | |
50 | | #include "yb/docdb/docdb_rocksdb_util.h" |
51 | | |
52 | | #include "yb/fs/fs_manager.h" |
53 | | |
54 | | #include "yb/master/master_heartbeat.pb.h" |
55 | | |
56 | | #include "yb/rocksdb/db.h" |
57 | | #include "yb/rocksdb/rate_limiter.h" |
58 | | |
59 | | #include "yb/tablet/tablet-harness.h" |
60 | | #include "yb/tablet/tablet.h" |
61 | | #include "yb/tablet/tablet_metadata.h" |
62 | | #include "yb/tablet/tablet_peer.h" |
63 | | |
64 | | #include "yb/tserver/mini_tablet_server.h" |
65 | | #include "yb/tserver/tablet_memory_manager.h" |
66 | | #include "yb/tserver/tablet_server.h" |
67 | | #include "yb/tserver/ts_tablet_manager.h" |
68 | | |
69 | | #include "yb/util/format.h" |
70 | | #include "yb/util/test_util.h" |
71 | | |
| | // Runs AssertReportHasUpdatedTablet() under ASSERT_NO_FATALS, so a fatal failure recorded inside |
| | // the helper is also checked at the call site (presumably ending the calling test). |
72 | | #define ASSERT_REPORT_HAS_UPDATED_TABLET(report, tablet_id) \ |
73 | 4 | ASSERT_NO_FATALS(AssertReportHasUpdatedTablet(report, tablet_id)) |
74 | | |
| | // Same wrapper for AssertMonotonicReportSeqno(); report_seqno is forwarded as given (a pointer). |
75 | | #define ASSERT_MONOTONIC_REPORT_SEQNO(report_seqno, tablet_report) \ |
76 | 8 | ASSERT_NO_FATALS(AssertMonotonicReportSeqno(report_seqno, tablet_report)) |
77 | | |
| | // Flags declared here (defined in other translation units) that the tests below override. |
78 | | DECLARE_bool(TEST_pretend_memory_exceeded_enforce_flush); |
79 | | DECLARE_bool(TEST_tserver_disable_heartbeat); |
80 | | DECLARE_int64(rocksdb_compact_flush_rate_limit_bytes_per_sec); |
81 | | DECLARE_string(rocksdb_compact_flush_rate_limit_sharing_mode); |
82 | | |
83 | | namespace yb { |
84 | | namespace tserver { |
85 | | |
86 | | using consensus::kInvalidOpIdIndex; |
87 | | using consensus::RaftConfigPB; |
88 | | using consensus::ConsensusRound; |
89 | | using consensus::ConsensusRoundPtr; |
90 | | using consensus::ReplicateMsg; |
91 | | using docdb::RateLimiterSharingMode; |
92 | | using master::ReportedTabletPB; |
93 | | using master::TabletReportPB; |
94 | | using master::TabletReportUpdatesPB; |
95 | | using strings::Substitute; |
96 | | using tablet::TabletPeer; |
97 | | using gflags::FlagSaver; |
98 | | |
| | // Table id used for every tablet created by these tests. |
99 | | static const char* const kTableId = "my-table-id"; |
| | // Tablet id used by TestCreateTablet. |
100 | | static const char* const kTabletId = "my-tablet-id"; |
| | // How long CreateNewTablet() waits for consensus to be running, in milliseconds. |
101 | | static const int kConsensusRunningWaitMs = 10000; |
| | // Number of drive directories created for the mini tablet server. |
102 | | static const int kDrivesNum = 4; |
103 | | |
| | // Fixture that starts a MiniTabletServer backed by kDrivesNum drive directories and exposes its |
| | // TSTabletManager and FsManager to the tests. Heartbeats are disabled in SetUp(). |
104 | | class TsTabletManagerTest : public YBTest { |
105 | | public: |
106 | | TsTabletManagerTest() |
107 | 7 | : schema_({ ColumnSchema("key", UINT32) }, 1) { |
108 | 7 | } |
109 | | |
| | // Returns the directory of the drive with the given zero-based index, i.e. |
| | // "drive-<index + 1>" under test_data_root_. |
110 | 73 | string GetDrivePath(int index) { |
111 | 73 | return JoinPathSegments(test_data_root_, Substitute("drive-$0", index + 1)); |
112 | 73 | } |
113 | | |
| | // Creates the drive directories and a fresh, not yet started MiniTabletServer. The same |
| | // directory list is passed for both path arguments (presumably WAL and data dirs). |
| | // Uses ASSERT_*, so callers must wrap the call in ASSERT_NO_FATAL_FAILURE to notice a failure. |
114 | 18 | void CreateMiniTabletServer() { |
115 | 18 | auto options_result = TabletServerOptions::CreateTabletServerOptions(); |
116 | 18 | ASSERT_OK(options_result); |
117 | 18 | std::vector<std::string> paths; |
118 | 90 | for (int i = 0; i < kDrivesNum; ++i) { |
119 | 72 | auto s = GetDrivePath(i); |
120 | 72 | ASSERT_OK(env_->CreateDirs(s)); |
121 | 72 | paths.push_back(s); |
122 | 72 | } |
123 | 18 | mini_server_ = std::make_unique<MiniTabletServer>(paths, paths, 0, *options_result, 0); |
124 | 18 | } |
125 | | |
| | // Starts the mini tablet server with heartbeats disabled and caches its managers. |
126 | 7 | void SetUp() override { |
127 | 7 | YBTest::SetUp(); |
128 | | |
129 | | // Required before tserver creation: calling `mini_server_->FailHeartbeats()` alone does not |
130 | | // guarantee that heartbeats stop immediately, so a couple of heartbeat events may still |
131 | | // happen until the heartbeat thread sees the effect of `mini_server_->FailHeartbeats()`. |
132 | 7 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_tserver_disable_heartbeat) = true; |
133 | | |
134 | 7 | test_data_root_ = GetTestPath("TsTabletManagerTest-fsroot"); |
| | // Stop here if server creation failed; otherwise mini_server_ may be null below. |
135 | 7 | ASSERT_NO_FATAL_FAILURE(CreateMiniTabletServer()); |
136 | 7 | ASSERT_OK(mini_server_->Start()); |
137 | 7 | mini_server_->FailHeartbeats(); |
138 | | |
139 | 7 | config_ = mini_server_->CreateLocalConfig(); |
140 | | |
141 | 7 | tablet_manager_ = mini_server_->server()->tablet_manager(); |
142 | 7 | fs_manager_ = mini_server_->server()->fs_manager(); |
143 | 7 | } |
144 | | |
| | // Shuts the mini tablet server down if one was created. |
145 | 6 | void TearDown() override { |
146 | 6 | if (mini_server_) { |
147 | 6 | mini_server_->Shutdown(); |
148 | 6 | } |
149 | 6 | } |
150 | | |
| | // Creates a tablet with a default partition of `schema` through the tablet manager, waits up |
| | // to kConsensusRunningWaitMs for its consensus to be running and then emulates an election. |
| | // If out_tablet_peer is non-null it receives the new peer, even when a later step fails. |
| | // Returns the first non-OK status encountered, otherwise the result of EmulateElection(). |
151 | | Status CreateNewTablet(const std::string& table_id, |
152 | | const std::string& tablet_id, |
153 | | const Schema& schema, |
154 | 34 | std::shared_ptr<tablet::TabletPeer>* out_tablet_peer) { |
155 | 34 | Schema full_schema = SchemaBuilder(schema).Build(); |
156 | 34 | std::pair<PartitionSchema, Partition> partition = tablet::CreateDefaultPartition(full_schema); |
157 | | |
158 | 34 | auto table_info = std::make_shared<tablet::TableInfo>( |
159 | 34 | table_id, tablet_id, tablet_id, TableType::DEFAULT_TABLE_TYPE, full_schema, IndexMap(), |
160 | 34 | boost::none /* index_info */, 0 /* schema_version */, partition.first); |
161 | 34 | auto tablet_peer = VERIFY_RESULT(tablet_manager_->CreateNewTablet( |
162 | 34 | table_info, tablet_id, partition.second, config_)); |
163 | 34 | if (out_tablet_peer) { |
164 | 15 | (*out_tablet_peer) = tablet_peer; |
165 | 15 | } |
166 | | |
167 | 34 | RETURN_NOT_OK(tablet_peer->WaitUntilConsensusRunning( |
168 | 34 | MonoDelta::FromMilliseconds(kConsensusRunningWaitMs))); |
169 | | |
170 | 33 | return tablet_peer->consensus()->EmulateElection(); |
171 | 34 | } |
172 | | |
| | // Shuts the server down, replaces it with a new one over the same drive directories, waits |
| | // until it has started and refreshes the cached manager pointers. |
| | // Uses ASSERT_*, so callers should wrap the call in ASSERT_NO_FATAL_FAILURE. |
173 | 4 | void Reload() { |
174 | 4 | LOG(INFO) << "Shutting down tablet manager"; |
175 | 4 | mini_server_->Shutdown(); |
176 | 4 | LOG(INFO) << "Restarting tablet manager"; |
177 | 4 | ASSERT_NO_FATAL_FAILURE(CreateMiniTabletServer()); |
178 | 4 | ASSERT_OK(mini_server_->Start()); |
179 | 3 | ASSERT_OK(mini_server_->WaitStarted()); |
180 | 3 | tablet_manager_ = mini_server_->server()->tablet_manager(); |
| | // The previous server object was replaced above, so the old FsManager pointer is stale. |
| | fs_manager_ = mini_server_->server()->fs_manager(); |
181 | 3 | } |
182 | | |
| | // Creates `num` tablets named "tablet-<index>" in table kTableId. Indexes continue from |
| | // peers->size() when `peers` is given (each new peer is appended to it); without `peers` they |
| | // continue from the number of peers currently known to the tablet manager. |
183 | 5 | void AddTablets(size_t num, TSTabletManager::TabletPeers* peers = nullptr) { |
184 | | // Add series of tablets |
185 | 5 | ASSERT_NE(num, 0); |
| | // `peers` is optional, so it must not be dereferenced unconditionally. |
| | const size_t first_index = |
| | peers ? peers->size() : tablet_manager_->GetTabletPeers(nullptr).size(); |
186 | 15 | for (size_t i = 0; i < num; ++i) { |
187 | 10 | std::shared_ptr<TabletPeer> peer; |
188 | 10 | const auto tid = Format("tablet-$0", first_index + i); |
189 | 10 | ASSERT_OK(CreateNewTablet(kTableId, tid, schema_, &peer)); |
190 | 10 | ASSERT_EQ(tid, peer->tablet()->tablet_id()); |
191 | 10 | if (peers) { |
192 | 10 | peers->push_back(peer); |
193 | 10 | } |
194 | 10 | } |
195 | 5 | } |
196 | | |
| | // Returns the peers reported by the tablet manager. When expected_count is set and differs |
| | // from the number of peers, the SCHECK_EQ below reports IllegalState instead. |
197 | | Result<TSTabletManager::TabletPeers> GetPeers( |
198 | 3 | boost::optional<size_t> expected_count = boost::none) { |
199 | 3 | auto peers = tablet_manager_->GetTabletPeers(nullptr); |
200 | 3 | if (expected_count.has_value()) { |
201 | 3 | SCHECK_EQ(*expected_count, peers.size(), IllegalState, "Unexpected number of peers"); |
202 | 3 | } |
203 | 3 | return std::move(peers); |
204 | 3 | } |
205 | | |
206 | | protected: |
207 | | std::unique_ptr<MiniTabletServer> mini_server_; |
| | // Non-owning pointers into mini_server_; refreshed whenever the server is recreated. |
208 | | FsManager* fs_manager_ = nullptr; |
209 | | TSTabletManager* tablet_manager_ = nullptr; |
210 | | |
211 | | Schema schema_; |
212 | | RaftConfigPB config_; |
213 | | |
214 | | string test_data_root_; |
215 | | }; |
216 | | |
| | // Creates a tablet, restarts the tablet server and checks that the tablet is loaded again. |
217 | 1 | TEST_F(TsTabletManagerTest, TestCreateTablet) { |
218 | | // Create a new tablet. |
219 | 1 | std::shared_ptr<TabletPeer> peer; |
220 | 1 | ASSERT_OK(CreateNewTablet(kTableId, kTabletId, schema_, &peer)); |
221 | 1 | ASSERT_EQ(kTabletId, peer->tablet()->tablet_id()); |
222 | 1 | peer.reset(); |
223 | | |
224 | | // Re-load the tablet manager from the filesystem. |
| | // Reload() performs the shutdown/recreate/start sequence; the wrapper stops the test if any |
| | // of its assertions failed instead of continuing with a broken server. |
| | ASSERT_NO_FATAL_FAILURE(Reload()); |
232 | | |
233 | | // Ensure that the tablet got re-loaded and re-opened off disk. |
234 | 1 | ASSERT_TRUE(tablet_manager_->LookupTablet(kTabletId, &peer)); |
235 | 1 | ASSERT_EQ(kTabletId, peer->tablet()->tablet_id()); |
236 | 1 | } |
237 | | |
| | // Checks how often each of two tablets appears in the tablet manager's data and WAL directory |
| | // assignment maps: after creation, after tombstoning and after deleting the first tablet, |
| | // each time both before and after a server restart. |
238 | 1 | TEST_F(TsTabletManagerTest, TestTombstonedTabletsAreUnregistered) { |
239 | 1 | const std::string kTableId = "my-table-id"; |
240 | 1 | const std::string kTabletId1 = "my-tablet-id-1"; |
241 | 1 | const std::string kTabletId2 = "my-tablet-id-2"; |
242 | | |
| | // Returns how many times tablet_id occurs across all directories assigned to kTableId. |
| | // Records a non-fatal failure and returns 0 when the table has no entry in the map. |
254 | 1 | auto count_tablet_in_assignment_map = |
255 | 1 | [&kTableId](const TSTabletManager::TableDiskAssignmentMap* table_assignment_map, |
256 | 24 | const std::string& tablet_id) { |
257 | 24 | auto table_assignment_iter = table_assignment_map->find(kTableId); |
258 | 24 | EXPECT_NE(table_assignment_iter, table_assignment_map->end()); |
| | if (table_assignment_iter == table_assignment_map->end()) { |
| | // The failure is already recorded above; end() must not be dereferenced. |
| | return 0; |
| | } |
259 | | // the number of data directories for this table should be non-empty. |
260 | 24 | EXPECT_GT(table_assignment_iter->second.size(), 0); |
261 | 24 | int tablet_count = 0; |
262 | 96 | for (const auto& tablet_assignment_iter : table_assignment_iter->second) { |
263 | | // directory_map maps a directory name to a set of tablet ids. |
264 | 32 | for (const TabletId& tablet : tablet_assignment_iter.second) { |
265 | 32 | if (tablet_id == tablet) { |
266 | 16 | tablet_count++; |
267 | 16 | } |
268 | 32 | } |
269 | 96 | } |
270 | 24 | return tablet_count; |
271 | 24 | }; |
272 | | |
| | // Asserts that tablet_id occurs exactly `count` times in both the data and the WAL map. |
273 | 1 | auto assert_tablet_assignment_count = |
274 | 12 | [this, &count_tablet_in_assignment_map](const std::string& tablet_id, int count) { |
275 | 12 | ASSERT_EQ( |
276 | 12 | count_tablet_in_assignment_map(&tablet_manager_->table_data_assignment_map_, tablet_id), |
277 | 12 | count); |
278 | 12 | ASSERT_EQ( |
279 | 12 | count_tablet_in_assignment_map(&tablet_manager_->table_wal_assignment_map_, tablet_id), |
280 | 12 | count); |
281 | 12 | }; |
282 | | |
283 | | // Create a new tablet. |
284 | 1 | std::shared_ptr<TabletPeer> peer; |
285 | 1 | ASSERT_OK(CreateNewTablet(kTableId, kTabletId1, schema_, &peer)); |
286 | 1 | ASSERT_EQ(kTabletId1, peer->tablet()->tablet_id()); |
287 | 1 | peer.reset(); |
288 | 1 | ASSERT_OK(CreateNewTablet(kTableId, kTabletId2, schema_, &peer)); |
289 | 1 | ASSERT_EQ(kTabletId2, peer->tablet()->tablet_id()); |
290 | | |
291 | 1 | assert_tablet_assignment_count(kTabletId1, 1); |
292 | 1 | assert_tablet_assignment_count(kTabletId2, 1); |
293 | | |
| | // Re-load the tablet manager from the filesystem; stop the test if the restart fails. |
294 | 1 | ASSERT_NO_FATAL_FAILURE(Reload()); |
295 | | |
296 | 1 | assert_tablet_assignment_count(kTabletId1, 1); |
297 | 1 | assert_tablet_assignment_count(kTabletId2, 1); |
298 | | |
299 | 1 | boost::optional<int64_t> cas_config_opid_index_less_or_equal; |
300 | 1 | boost::optional<TabletServerErrorPB::Code> error_code; |
301 | 1 | ASSERT_OK(tablet_manager_->DeleteTablet(kTabletId1, |
302 | 1 | tablet::TABLET_DATA_TOMBSTONED, |
303 | 1 | cas_config_opid_index_less_or_equal, |
304 | 1 | false, |
305 | 1 | &error_code)); |
306 | | |
307 | 1 | assert_tablet_assignment_count(kTabletId1, 0); |
308 | 1 | assert_tablet_assignment_count(kTabletId2, 1); |
309 | | |
310 | 1 | ASSERT_NO_FATAL_FAILURE(Reload()); |
311 | | |
312 | 1 | assert_tablet_assignment_count(kTabletId1, 0); |
313 | 1 | assert_tablet_assignment_count(kTabletId2, 1); |
314 | | |
315 | 1 | ASSERT_OK(tablet_manager_->DeleteTablet(kTabletId1, |
316 | 1 | tablet::TABLET_DATA_DELETED, |
317 | 1 | cas_config_opid_index_less_or_equal, |
318 | 1 | false, |
319 | 1 | &error_code)); |
320 | | |
321 | 1 | assert_tablet_assignment_count(kTabletId1, 0); |
322 | 1 | assert_tablet_assignment_count(kTabletId2, 1); |
323 | | |
324 | 1 | ASSERT_NO_FATAL_FAILURE(Reload()); |
325 | | |
326 | 1 | assert_tablet_assignment_count(kTabletId1, 0); |
327 | 1 | assert_tablet_assignment_count(kTabletId2, 1); |
328 | 1 | } |
329 | | |
| | // Replicates one NO_OP per tablet, then restarts the server several times while forcing |
| | // FlushTabletIfLimitExceeded() during startup, and checks that every tablet is found afterwards. |
330 | 1 | TEST_F(TsTabletManagerTest, TestProperBackgroundFlushOnStartup) { |
331 | 1 | FlagSaver flag_saver; |
332 | 1 | FLAGS_TEST_pretend_memory_exceeded_enforce_flush = true; |
333 | | |
334 | 1 | const int kNumTablets = 2; |
335 | 1 | const int kNumRestarts = 3; |
336 | | |
337 | 1 | std::vector<TabletId> tablet_ids; |
338 | 1 | std::vector<ConsensusRoundPtr> consensus_rounds; |
339 | | |
340 | 3 | for (int i = 0; i < kNumTablets; ++i) { |
341 | 2 | std::shared_ptr<TabletPeer> peer; |
342 | 2 | const auto tablet_id = Format("my-tablet-$0", i + 1); |
343 | 2 | tablet_ids.emplace_back(tablet_id); |
344 | 2 | ASSERT_OK(CreateNewTablet(kTableId, tablet_id, schema_, &peer)); |
345 | 2 | ASSERT_EQ(tablet_id, peer->tablet()->tablet_id()); |
346 | | |
347 | 2 | auto replicate_ptr = std::make_shared<ReplicateMsg>(); |
348 | 2 | replicate_ptr->set_op_type(consensus::NO_OP); |
349 | 2 | replicate_ptr->set_hybrid_time(peer->clock().Now().ToUint64()); |
350 | 2 | ConsensusRoundPtr round(new ConsensusRound(peer->consensus(), std::move(replicate_ptr))); |
351 | 2 | consensus_rounds.emplace_back(round); |
352 | 2 | round->BindToTerm(peer->raft_consensus()->TEST_LeaderTerm()); |
353 | 2 | round->SetCallback(consensus::MakeNonTrackedRoundCallback(round.get(), [](const Status&){})); |
354 | 2 | ASSERT_OK(peer->consensus()->TEST_Replicate(round)); |
355 | 2 | } |
356 | | |
357 | 4 | for (int i = 0; i < kNumRestarts; ++i) { |
| | // Reload() is not used here: the flush is triggered between Start() and WaitStarted(). |
358 | 3 | LOG(INFO) << "Shutting down tablet manager"; |
359 | 3 | mini_server_->Shutdown(); |
360 | 3 | LOG(INFO) << "Restarting tablet manager"; |
| | // Stop the test if the server could not be recreated instead of using a stale server. |
361 | 3 | ASSERT_NO_FATAL_FAILURE(CreateMiniTabletServer()); |
362 | 3 | ASSERT_OK(mini_server_->Start()); |
363 | 3 | auto* tablet_manager = mini_server_->server()->tablet_manager(); |
364 | 3 | ASSERT_NE(nullptr, tablet_manager); |
365 | 3 | tablet_manager->tablet_memory_manager()->FlushTabletIfLimitExceeded(); |
366 | 3 | ASSERT_OK(mini_server_->WaitStarted()); |
367 | 6 | for (const auto& tablet_id : tablet_ids) { |
368 | 6 | std::shared_ptr<TabletPeer> peer; |
369 | 6 | ASSERT_TRUE(tablet_manager->LookupTablet(tablet_id, &peer)); |
370 | 6 | ASSERT_EQ(tablet_id, peer->tablet()->tablet_id()); |
371 | 6 | } |
372 | 3 | } |
373 | 1 | } |
374 | | |
| | // Asserts that the report's sequence number is strictly greater than *report_seqno and then |
| | // stores the report's sequence number into *report_seqno for the next check. |
375 | | static void AssertMonotonicReportSeqno(int32_t* report_seqno, |
376 | 9 | const TabletReportPB& report) { |
377 | 9 | ASSERT_LT(*report_seqno, report.sequence_number()); |
378 | 9 | *report_seqno = report.sequence_number(); |
379 | 9 | } |
380 | | |
| | // Asserts that `report` lists `tablet_id` among its updated tablets and that the entry carries |
| | // a committed consensus state with a current term, a leader uuid and a single-peer config whose |
| | // opid_index is kInvalidOpIdIndex and whose only peer is the leader. |
381 | | static void AssertReportHasUpdatedTablet(const TabletReportPB& report, |
382 | 4 | const string& tablet_id) { |
| | // The size is never negative, so only a strict comparison can actually fail here. |
383 | 4 | ASSERT_GT(report.updated_tablets_size(), 0) << report.ShortDebugString(); |
384 | 4 | bool found_tablet = false; |
| | // Iterate by const reference to avoid copying each protobuf entry. |
385 | 6 | for (const ReportedTabletPB& reported_tablet : report.updated_tablets()) { |
386 | 6 | if (reported_tablet.tablet_id() == tablet_id) { |
387 | 4 | found_tablet = true; |
388 | 4 | ASSERT_TRUE(reported_tablet.has_committed_consensus_state()); |
389 | 8 | ASSERT_TRUE(reported_tablet.committed_consensus_state().has_current_term()) |
390 | 8 | << reported_tablet.ShortDebugString(); |
391 | 8 | ASSERT_TRUE(reported_tablet.committed_consensus_state().has_leader_uuid()) |
392 | 8 | << reported_tablet.ShortDebugString(); |
393 | 4 | ASSERT_TRUE(reported_tablet.committed_consensus_state().has_config()); |
394 | 4 | const RaftConfigPB& committed_config = reported_tablet.committed_consensus_state().config(); |
395 | 4 | ASSERT_EQ(kInvalidOpIdIndex, committed_config.opid_index()); |
396 | 4 | ASSERT_EQ(1, committed_config.peers_size()); |
397 | 8 | ASSERT_TRUE(committed_config.peers(0).has_permanent_uuid()) |
398 | 8 | << reported_tablet.ShortDebugString(); |
399 | 8 | ASSERT_EQ(committed_config.peers(0).permanent_uuid(), |
400 | 8 | reported_tablet.committed_consensus_state().leader_uuid()) |
401 | 8 | << reported_tablet.ShortDebugString(); |
402 | 4 | } |
403 | 6 | } |
404 | 4 | ASSERT_TRUE(found_tablet) << "Tablet " << tablet_id << " not found in report: " |
| | << report.ShortDebugString(); |
405 | 4 | } |
406 | | |
| | // Clears `resp` and then adds one entry per tablet in `req.updated_tablets()`, in the same |
| | // order; only the tablet id of each reported tablet is copied. |
407 | 7 | static void CopyReportToUpdates(const TabletReportPB& req, TabletReportUpdatesPB* resp) { |
408 | 7 | resp->Clear(); |
| | const int reported_count = req.updated_tablets_size(); |
| | for (int idx = 0; idx < reported_count; ++idx) { |
| | auto* ack = resp->add_tablets(); |
| | ack->set_tablet_id(req.updated_tablets(idx).tablet_id()); |
| | } |
413 | 7 | } |
414 | | |
| | // Walks through full and incremental tablet reports: empty reports before any tablet exists, |
| | // re-reporting of unacknowledged tablets, empty incrementals after acknowledgement, and a full |
| | // report that lists both created tablets. Sequence numbers must grow with every report. |
415 | 1 | TEST_F(TsTabletManagerTest, TestTabletReports) { |
416 | 1 | TabletReportPB report; |
417 | 1 | TabletReportUpdatesPB updates; |
418 | 1 | int32_t seqno = -1; |
419 | | |
420 | | // Generate a tablet report before any tablets are loaded. Should be empty. |
421 | 1 | tablet_manager_->StartFullTabletReport(&report); |
422 | 1 | ASSERT_EQ(0, report.updated_tablets().size()); |
423 | 1 | ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
424 | 1 | CopyReportToUpdates(report, &updates); |
425 | 1 | tablet_manager_->MarkTabletReportAcknowledged(seqno, updates); |
426 | | |
427 | | // Another report should now be incremental, but with no changes. |
428 | 1 | tablet_manager_->GenerateTabletReport(&report); |
429 | 1 | ASSERT_EQ(0, report.updated_tablets().size()); |
430 | 1 | ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
431 | 1 | CopyReportToUpdates(report, &updates); |
432 | 1 | tablet_manager_->MarkTabletReportAcknowledged(seqno, updates); |
433 | | |
434 | | // Create a tablet and do another incremental report - should include the tablet. |
435 | 1 | ASSERT_OK(CreateNewTablet(kTableId, "tablet-1", schema_, nullptr)); |
436 | 1 | int updated_tablets = 0; |
| | // NOTE(review): this retry loop has no timeout or sleep; it only ends once a report lists |
| | // exactly one updated tablet (or an assertion inside it fails). |
437 | 2 | while (updated_tablets != 1) { |
438 | 1 | tablet_manager_->GenerateTabletReport(&report); |
439 | 1 | updated_tablets = report.updated_tablets().size(); |
440 | 1 | ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
441 | 1 | } |
442 | | |
443 | 1 | ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-1"); |
444 | | |
445 | | // If we don't acknowledge the report, and ask for another incremental report, |
446 | | // it should include the tablet again. |
447 | 1 | tablet_manager_->GenerateTabletReport(&report); |
448 | 1 | ASSERT_EQ(1, report.updated_tablets().size()); |
449 | 1 | ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-1"); |
450 | 1 | ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
451 | | |
452 | | // Now acknowledge the last report, and further incrementals should be empty. |
453 | 1 | CopyReportToUpdates(report, &updates); |
454 | 1 | tablet_manager_->MarkTabletReportAcknowledged(seqno, updates); |
455 | 1 | tablet_manager_->GenerateTabletReport(&report); |
456 | 1 | ASSERT_EQ(0, report.updated_tablets().size()); |
457 | 1 | ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
458 | 1 | CopyReportToUpdates(report, &updates); |
459 | 1 | tablet_manager_->MarkTabletReportAcknowledged(seqno, updates); |
460 | | |
461 | | // Create a second tablet, and ensure the incremental report shows it. |
462 | 1 | ASSERT_OK(CreateNewTablet(kTableId, "tablet-2", schema_, nullptr)); |
463 | | |
464 | | // Wait up to 10 seconds to get a tablet report from tablet-2. |
465 | | // TabletPeer does not mark tablets dirty until after it commits the |
466 | | // initial configuration change, so there is also a window for tablet-1 to |
467 | | // have been marked dirty since the last report. |
468 | 1 | MonoDelta timeout(MonoDelta::FromSeconds(10)); |
469 | 1 | MonoTime start(MonoTime::Now()); |
470 | 1 | report.Clear(); |
471 | 1 | while (true) { |
472 | 1 | bool found_tablet_2 = false; |
473 | 1 | tablet_manager_->GenerateTabletReport(&report); |
474 | 2 | ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report) << report.ShortDebugString(); |
475 | 1 | for (const ReportedTabletPB& reported_tablet : report.updated_tablets()) { |
476 | 1 | if (reported_tablet.tablet_id() == "tablet-2") { |
477 | 1 | found_tablet_2 = true; |
478 | 1 | break; |
479 | 1 | } |
480 | 1 | } |
481 | 1 | if (found_tablet_2) break; |
482 | 0 | MonoDelta elapsed(MonoTime::Now().GetDeltaSince(start)); |
483 | 0 | ASSERT_TRUE(elapsed.LessThan(timeout)) << "Waited too long for tablet-2 to be marked dirty: " |
484 | 0 | << elapsed.ToString() << ". " |
485 | 0 | << "Latest report: " << report.ShortDebugString(); |
486 | 0 | SleepFor(MonoDelta::FromMilliseconds(10)); |
487 | 0 | } |
488 | | |
489 | 1 | CopyReportToUpdates(report, &updates); |
490 | 1 | tablet_manager_->MarkTabletReportAcknowledged(seqno, updates); |
491 | | |
492 | | // Asking for a full tablet report should re-report both tablets |
493 | 1 | tablet_manager_->StartFullTabletReport(&report); |
494 | 1 | ASSERT_EQ(2, report.updated_tablets().size()); |
495 | 1 | ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-1"); |
496 | 1 | ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-2"); |
497 | 1 | ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
498 | 1 | } |
499 | | |
| | // Sets a per-report limit of 10 tablets, creates 25 tablets and checks that both incremental |
| | // reports and a full report deliver them in batches of 10, 10 and 5, each tablet exactly once. |
500 | 1 | TEST_F(TsTabletManagerTest, TestTabletReportLimit) { |
501 | 1 | TabletReportPB report; |
502 | 1 | TabletReportUpdatesPB updates; |
503 | 1 | int32_t seqno = -1; |
504 | | |
505 | | // Generate a tablet report before any tablets are loaded. Should be empty. |
506 | 1 | tablet_manager_->StartFullTabletReport(&report); |
507 | 1 | ASSERT_EQ(0, report.updated_tablets().size()); |
508 | 1 | ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
509 | 1 | CopyReportToUpdates(report, &updates); |
510 | 1 | tablet_manager_->MarkTabletReportAcknowledged(seqno, updates); |
511 | | |
512 | | // Another report should now be incremental, but with no changes. |
513 | 1 | tablet_manager_->GenerateTabletReport(&report); |
514 | 1 | ASSERT_EQ(0, report.updated_tablets().size()); |
515 | 1 | ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
516 | 1 | CopyReportToUpdates(report, &updates); |
517 | 1 | tablet_manager_->MarkTabletReportAcknowledged(seqno, updates); |
518 | | |
519 | | // Set a report limit and create a set of tablets clearly over that limit. |
520 | 1 | const int32_t limit = 10, total_tablets = 25; |
521 | 1 | tablet_manager_->SetReportLimit(limit); |
| | // tablet_ids tracks what the incremental reports still owe; tablet_ids_full does the same |
| | // for the full report further below. |
522 | 1 | std::set<std::string> tablet_ids, tablet_ids_full; |
523 | 17 | for (int i = 0; i < total_tablets; ++i) { |
524 | 17 | auto id = "tablet-" + std::to_string(i); |
525 | 17 | ASSERT_OK(CreateNewTablet(kTableId, id, schema_, nullptr)); |
526 | 16 | tablet_ids.insert(id); |
527 | 16 | tablet_ids_full.insert(id); |
528 | 16 | LOG(INFO) << "Adding " << id; |
529 | 16 | } |
530 | | |
531 | | // Ensure that incremental report requests returns all in batches. |
| | // `n` is the expected batch size (at most `limit`), `left` the number of tablets still owed. |
532 | 0 | for (int n = limit, left = total_tablets; left > 0; left -= n, n = std::min(limit, left)) { |
533 | 0 | tablet_manager_->GenerateTabletReport(&report); |
534 | 0 | ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
535 | 0 | ASSERT_EQ(n, report.updated_tablets().size()); |
536 | 0 | for (auto& t : report.updated_tablets()) { |
537 | 0 | LOG(INFO) << "Erasing " << t.tablet_id(); |
538 | 0 | ASSERT_EQ(1, tablet_ids.erase(t.tablet_id())); |
539 | 0 | } |
540 | 0 | CopyReportToUpdates(report, &updates); |
541 | 0 | tablet_manager_->MarkTabletReportAcknowledged(seqno, updates); |
542 | 0 | } |
543 | | |
544 | | // Generate a Full Report and ensure that the same batching occurs. |
545 | 0 | tablet_manager_->StartFullTabletReport(&report); |
546 | 0 | for (int n = limit, left = total_tablets; left > 0; left -= n, n = std::min(limit, left)) { |
547 | 0 | ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report); |
548 | 0 | ASSERT_EQ(n, report.updated_tablets().size()); |
549 | 0 | for (auto& t : report.updated_tablets()) { |
550 | 0 | ASSERT_EQ(1, tablet_ids_full.erase(t.tablet_id())); |
551 | 0 | } |
552 | 0 | CopyReportToUpdates(report, &updates); |
553 | 0 | tablet_manager_->MarkTabletReportAcknowledged(seqno, updates); |
| | // Fetch the next batch; after the final batch this is the report checked below. |
554 | 0 | tablet_manager_->GenerateTabletReport(&report); |
555 | 0 | } |
556 | 0 | ASSERT_EQ(0, report.updated_tablets().size()); // Last incremental report is empty. |
557 | 0 | } |
558 | | |
559 | | namespace { |
560 | | |
| | // Overrides FLAGS_rocksdb_compact_flush_rate_limit_sharing_mode with the string form of `mode`. |
561 | 2 | void SetRateLimiterSharingMode(RateLimiterSharingMode mode) { |
| | const auto mode_name = ToString(mode); |
| | ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_compact_flush_rate_limit_sharing_mode) = mode_name; |
563 | 2 | } |
564 | | |
| | // Counts the distinct non-null rocksdb::RateLimiter instances used by the RocksDB instances of |
| | // peers[start_idx .. peers.size()). Peers without a rate limiter are not counted. |
| | // start_idx must be below peers.size(), so an empty `peers` always fails the SCHECK_LT below |
| | // (IllegalState); a peer whose TEST_db() is null fails SCHECK_NOTNULL. |
| | // NOTE(review): <unordered_set> is not among the includes visible at the top of this file; |
| | // presumably it is pulled in transitively. |
565 | | Result<size_t> CountUniqueLimiters(const TSTabletManager::TabletPeers& peers, |
566 | 7 | const size_t start_idx = 0) { |
567 | 7 | SCHECK_LT(start_idx, peers.size(), IllegalState, |
568 | 7 | "Start index must be less than number of peers"); |
569 | 7 | std::unordered_set<rocksdb::RateLimiter*> unique; |
570 | 47 | for (size_t i = start_idx; i < peers.size(); ++i) { |
571 | 40 | auto db = peers[i]->tablet()->TEST_db(); |
572 | 40 | SCHECK_NOTNULL(db); |
573 | 40 | auto rl = db->GetDBOptions().rate_limiter.get(); |
574 | 40 | if (rl) { |
575 | 24 | unique.insert(rl); |
576 | 24 | } |
577 | 40 | } |
578 | 7 | return unique.size(); |
579 | 7 | } |
580 | | |
581 | | } // namespace |
582 | | |
583 | 1 | TEST_F(TsTabletManagerTest, RateLimiterSharing) { |
584 | | // The test checks rocksdb::RateLimiter is correctly shared between RocksDB instances |
585 | | // depending on the flags `FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec` and |
586 | | // `FLAGS_rocksdb_compact_flush_rate_limit_sharing_mode`, including the possible effect |
587 | | // of changing flags on-the-fly (emulating a forced change) |
588 | | |
589 | | // No tablets exist, reset flags and reload |
590 | 1 | size_t peers_num = 0; |
591 | 1 | constexpr auto kBPS = 128_MB; |
592 | 1 | SetRateLimiterSharingMode(RateLimiterSharingMode::NONE); |
593 | 1 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec) = kBPS; |
594 | 1 | ASSERT_NO_FATAL_FAILURE(Reload()); |
595 | 1 | TSTabletManager::TabletPeers peers = ASSERT_RESULT(GetPeers(peers_num)); |
596 | | |
597 | | // `NONE`: add tablets and make sure they have unique limiters |
598 | 1 | ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers)); |
599 | 1 | ASSERT_EQ(peers_num + 2, ASSERT_RESULT(CountUniqueLimiters(peers))); |
600 | 1 | peers_num = peers.size(); |
601 | | |
602 | | // `NONE`: emulating forced change for bps flag: make sure new unique limiters are created |
603 | 1 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec) = kBPS / 2; |
604 | 1 | ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers)); |
605 | 1 | ASSERT_EQ(peers_num + 2, ASSERT_RESULT(CountUniqueLimiters(peers))); |
606 | 1 | peers_num = peers.size(); |
607 | | |
608 | | // `NONE`: emulating forced reset for bps flag: make sure new tablets are added with no limiters |
609 | 1 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec) = 0; |
610 | 1 | ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers)); |
611 | 1 | ASSERT_EQ(0, ASSERT_RESULT(CountUniqueLimiters(peers, peers_num))); |
612 | 1 | peers_num = peers.size(); |
613 | | |
614 | | // `NONE` + no bps: reload the cluster with bps flag unset to make sure no limiters are created |
615 | 1 | ASSERT_NO_FATAL_FAILURE(Reload()); |
616 | 1 | peers = ASSERT_RESULT(GetPeers(peers_num)); |
617 | 1 | ASSERT_EQ(0, ASSERT_RESULT(CountUniqueLimiters(peers))); |
618 | | |
619 | | // `NONE` + no bps: add tablets and make sure limiters are not created |
620 | 1 | ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers)); |
621 | 1 | ASSERT_EQ(0, ASSERT_RESULT(CountUniqueLimiters(peers))); |
622 | 1 | peers_num = peers.size(); |
623 | | |
624 | | // `NONE`: reload the cluster with bps flag set and make sure all limiters are unique |
625 | 1 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec) = kBPS; |
626 | 1 | ASSERT_NO_FATAL_FAILURE(Reload()); |
627 | 1 | peers = ASSERT_RESULT(GetPeers(peers_num)); |
628 | 1 | ASSERT_EQ(peers_num, ASSERT_RESULT(CountUniqueLimiters(peers))); |
629 | | |
630 | | // `NONE`: emulating forced change for mode flag: should act as if `NONE` is still set |
631 | 1 | SetRateLimiterSharingMode(RateLimiterSharingMode::TSERVER); |
632 | 1 | ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers)); |
633 | 1 | ASSERT_EQ(peers_num + 2, ASSERT_RESULT(CountUniqueLimiters(peers))); |
634 | 1 | peers_num = peers.size(); |
635 | | |
636 | | // `TSERVER`: reload the cluster to apply `TSERVER` sharing mode |
637 | | // and make sure all tablets share the same rate limiter |
638 | 1 | ASSERT_NO_FATAL_FAILURE(Reload()); |
639 | 0 | peers = ASSERT_RESULT(GetPeers(peers_num)); |
640 | 0 | ASSERT_EQ(1, ASSERT_RESULT(CountUniqueLimiters(peers))); |
641 | | |
642 | | // `TSERVER`: emulating forced change for bps flag: make sure this has no effect on sharing |
643 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec) = kBPS / 2; |
644 | 0 | ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers)); |
645 | 0 | ASSERT_EQ(1, ASSERT_RESULT(CountUniqueLimiters(peers))); |
646 | 0 | peers_num = peers.size(); |
647 | | |
648 | | // `TSERVER`: emulating forced reset for bps flag: make sure this has no effect on sharing |
649 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec) = 0; |
650 | 0 | ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers)); |
651 | 0 | ASSERT_EQ(1, ASSERT_RESULT(CountUniqueLimiters(peers))); |
652 | 0 | peers_num = peers.size(); |
653 | | |
654 | | // `TSERVER`: emulating forced change for mode flag: |
655 | | // should act as if `TSERVER` is still set |
656 | 0 | SetRateLimiterSharingMode(RateLimiterSharingMode::NONE); |
657 | 0 | ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers)); |
658 | 0 | ASSERT_EQ(1, ASSERT_RESULT(CountUniqueLimiters(peers))); |
659 | 0 | peers_num = peers.size(); |
660 | | |
661 | | // `TSERVER` + no bps: reload the cluster |
662 | | // with bps flag unset to make sure no limiters are created |
663 | 0 | SetRateLimiterSharingMode(RateLimiterSharingMode::TSERVER); |
664 | 0 | ASSERT_NO_FATAL_FAILURE(Reload()); |
665 | 0 | peers = ASSERT_RESULT(GetPeers(peers_num)); |
666 | 0 | ASSERT_EQ(0, ASSERT_RESULT(CountUniqueLimiters(peers))); |
667 | | |
668 | | // `TSERVER` + no bps: add tablets and make sure no limiters are created |
669 | 0 | ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers)); |
670 | 0 | ASSERT_EQ(0, ASSERT_RESULT(CountUniqueLimiters(peers))); |
671 | 0 | peers_num = peers.size(); |
672 | | |
673 | | // `TSERVER` + no bps: emulating forced change for both flags: |
674 | | // should act as if `NONE` is applied with some bps set |
675 | 0 | SetRateLimiterSharingMode(RateLimiterSharingMode::NONE); |
676 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec) = kBPS; |
677 | 0 | ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers)); |
678 | 0 | ASSERT_EQ(2, ASSERT_RESULT(CountUniqueLimiters(peers, peers_num))); |
| | // NOTE(review): peers_num is not read again after this assignment. |
679 | 0 | peers_num = peers.size(); |
680 | 0 | } |
681 | | |
| | // Registers data and WAL directories for kDrivesNum tablets and checks, for each tablet, that |
| | // both paths start with the same drive-path-sized prefix. |
682 | 1 | TEST_F(TsTabletManagerTest, DataAndWalFilesLocations) { |
683 | 1 | std::string wal; |
684 | 1 | std::string data; |
| | // Every drive path has the same length ("drive-1" .. "drive-4"), so the length of the first |
| | // one can be used as the prefix length for all of them. |
685 | 1 | auto drive_path_len = GetDrivePath(0).size(); |
686 | 5 | for (int i = 0; i < kDrivesNum; ++i) { |
687 | 4 | tablet_manager_->GetAndRegisterDataAndWalDir(fs_manager_, |
688 | 4 | kTableId, |
689 | 4 | Substitute("tablet-$0", i + 1), |
690 | 4 | &data, |
691 | 4 | &wal); |
692 | 4 | ASSERT_EQ(data.substr(0, drive_path_len), wal.substr(0, drive_path_len)); |
693 | 4 | } |
694 | 1 | } |
695 | | |
696 | | } // namespace tserver |
697 | | } // namespace yb |