/Users/deen/code/yugabyte-db/src/yb/integration-tests/tablet-split-itest-base.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/integration-tests/tablet-split-itest-base.h" |
15 | | |
16 | | #include <signal.h> |
17 | | |
18 | | #include <boost/range/adaptor/transformed.hpp> |
19 | | |
20 | | #include "yb/client/client-test-util.h" |
21 | | #include "yb/client/session.h" |
22 | | #include "yb/client/snapshot_test_util.h" |
23 | | #include "yb/client/table_info.h" |
24 | | #include "yb/client/transaction.h" |
25 | | #include "yb/client/yb_op.h" |
26 | | |
27 | | #include "yb/common/schema.h" |
28 | | #include "yb/common/ql_expr.h" |
29 | | #include "yb/common/ql_value.h" |
30 | | #include "yb/common/wire_protocol.h" |
31 | | |
32 | | #include "yb/consensus/consensus.h" |
33 | | #include "yb/consensus/consensus_util.h" |
34 | | |
35 | | #include "yb/docdb/doc_key.h" |
36 | | #include "yb/docdb/ql_rowwise_iterator_interface.h" |
37 | | |
38 | | #include "yb/integration-tests/mini_cluster.h" |
39 | | #include "yb/integration-tests/test_workload.h" |
40 | | |
41 | | #include "yb/master/catalog_entity_info.h" |
42 | | #include "yb/master/master_admin.proxy.h" |
43 | | #include "yb/master/master_client.pb.h" |
44 | | |
45 | | #include "yb/rocksdb/db.h" |
46 | | |
47 | | #include "yb/rpc/messenger.h" |
48 | | |
49 | | #include "yb/tablet/tablet.h" |
50 | | #include "yb/tablet/tablet_metadata.h" |
51 | | #include "yb/tablet/tablet_peer.h" |
52 | | |
53 | | #include "yb/tserver/mini_tablet_server.h" |
54 | | #include "yb/tserver/tserver_service.pb.h" |
55 | | #include "yb/tserver/tserver_service.proxy.h" |
56 | | |
57 | | #include "yb/yql/cql/ql/util/statement_result.h" |
58 | | |
59 | | DECLARE_int32(cleanup_split_tablets_interval_sec); |
60 | | DECLARE_int64(db_block_size_bytes); |
61 | | DECLARE_int64(db_filter_block_size_bytes); |
62 | | DECLARE_int64(db_index_block_size_bytes); |
63 | | DECLARE_int64(db_write_buffer_size); |
64 | | DECLARE_bool(enable_automatic_tablet_splitting); |
65 | | DECLARE_int32(raft_heartbeat_interval_ms); |
66 | | DECLARE_int32(replication_factor); |
67 | | DECLARE_int32(tserver_heartbeat_metrics_interval_ms); |
68 | | DECLARE_bool(TEST_do_not_start_election_test_only); |
69 | | DECLARE_bool(TEST_skip_deleting_split_tablets); |
70 | | DECLARE_bool(TEST_validate_all_tablet_candidates); |
71 | | |
72 | | namespace yb { |
73 | | |
74 | | Result<size_t> SelectRowsCount( |
75 | 0 | const client::YBSessionPtr& session, const client::TableHandle& table) { |
76 | 0 | LOG(INFO) << "Running full scan on test table..."; |
77 | 0 | session->SetTimeout(5s * kTimeMultiplier); |
78 | 0 | QLPagingStatePB paging_state; |
79 | 0 | size_t row_count = 0; |
80 | 0 | for (;;) { |
81 | 0 | const auto op = table.NewReadOp(); |
82 | 0 | auto* const req = op->mutable_request(); |
83 | 0 | req->set_return_paging_state(true); |
84 | 0 | if (paging_state.has_table_id()) { |
85 | 0 | if (paging_state.has_read_time()) { |
86 | 0 | ReadHybridTime read_time = ReadHybridTime::FromPB(paging_state.read_time()); |
87 | 0 | if (read_time) { |
88 | 0 | session->SetReadPoint(read_time); |
89 | 0 | } |
90 | 0 | } |
91 | 0 | session->SetForceConsistentRead(client::ForceConsistentRead::kTrue); |
92 | 0 | *req->mutable_paging_state() = std::move(paging_state); |
93 | 0 | } |
94 | 0 | RETURN_NOT_OK(session->ApplyAndFlush(op)); |
95 | 0 | auto rowblock = ql::RowsResult(op.get()).GetRowBlock(); |
96 | 0 | row_count += rowblock->row_count(); |
97 | 0 | if (!op->response().has_paging_state()) { |
98 | 0 | break; |
99 | 0 | } |
100 | 0 | paging_state = op->response().paging_state(); |
101 | 0 | } |
102 | 0 | return row_count; |
103 | 0 | } |
104 | | |
105 | | void DumpTableLocations( |
106 | 0 | master::CatalogManagerIf* catalog_mgr, const client::YBTableName& table_name) { |
107 | 0 | master::GetTableLocationsResponsePB resp; |
108 | 0 | master::GetTableLocationsRequestPB req; |
109 | 0 | table_name.SetIntoTableIdentifierPB(req.mutable_table()); |
110 | 0 | req.set_max_returned_locations(std::numeric_limits<int32_t>::max()); |
111 | 0 | ASSERT_OK(catalog_mgr->GetTableLocations(&req, &resp)); |
112 | 0 | LOG(INFO) << "Table locations:"; |
113 | 0 | for (auto& tablet : resp.tablet_locations()) { |
114 | 0 | LOG(INFO) << "Tablet: " << tablet.tablet_id() |
115 | 0 | << " partition: " << tablet.partition().ShortDebugString(); |
116 | 0 | } |
117 | 0 | } |
118 | | |
119 | 0 | void DumpWorkloadStats(const TestWorkload& workload) { |
120 | 0 | LOG(INFO) << "Rows inserted: " << workload.rows_inserted(); |
121 | 0 | LOG(INFO) << "Rows insert failed: " << workload.rows_insert_failed(); |
122 | 0 | LOG(INFO) << "Rows read ok: " << workload.rows_read_ok(); |
123 | 0 | LOG(INFO) << "Rows read empty: " << workload.rows_read_empty(); |
124 | 0 | LOG(INFO) << "Rows read error: " << workload.rows_read_error(); |
125 | 0 | LOG(INFO) << "Rows read try again: " << workload.rows_read_try_again(); |
126 | 0 | } |
127 | | |
128 | 0 | Status SplitTablet(master::CatalogManagerIf* catalog_mgr, const tablet::Tablet& tablet) { |
129 | 0 | const auto& tablet_id = tablet.tablet_id(); |
130 | 0 | LOG(INFO) << "Tablet: " << tablet_id; |
131 | 0 | LOG(INFO) << "Number of SST files: " << tablet.TEST_db()->GetCurrentVersionNumSSTFiles(); |
132 | 0 | std::string properties; |
133 | 0 | tablet.TEST_db()->GetProperty(rocksdb::DB::Properties::kAggregatedTableProperties, &properties); |
134 | 0 | LOG(INFO) << "DB properties: " << properties; |
135 | |
|
136 | 0 | return catalog_mgr->SplitTablet(tablet_id, true /* select_all_tablets_for_split */); |
137 | 0 | } |
138 | | |
139 | 0 | Status DoSplitTablet(master::CatalogManagerIf* catalog_mgr, const tablet::Tablet& tablet) { |
140 | 0 | const auto& tablet_id = tablet.tablet_id(); |
141 | 0 | LOG(INFO) << "Tablet: " << tablet_id; |
142 | 0 | LOG(INFO) << "Number of SST files: " << tablet.TEST_db()->GetCurrentVersionNumSSTFiles(); |
143 | 0 | std::string properties; |
144 | 0 | tablet.TEST_db()->GetProperty(rocksdb::DB::Properties::kAggregatedTableProperties, &properties); |
145 | 0 | LOG(INFO) << "DB properties: " << properties; |
146 | |
|
147 | 0 | const auto encoded_split_key = VERIFY_RESULT(tablet.GetEncodedMiddleSplitKey()); |
148 | 0 | std::string partition_split_key = encoded_split_key; |
149 | 0 | if (tablet.metadata()->partition_schema()->IsHashPartitioning()) { |
150 | 0 | const auto doc_key_hash = VERIFY_RESULT(docdb::DecodeDocKeyHash(encoded_split_key)).value(); |
151 | 0 | LOG(INFO) << "Middle hash key: " << doc_key_hash; |
152 | 0 | partition_split_key = PartitionSchema::EncodeMultiColumnHashValue(doc_key_hash); |
153 | 0 | } |
154 | 0 | LOG(INFO) << "Partition split key: " << Slice(partition_split_key).ToDebugHexString(); |
155 | |
|
156 | 0 | return catalog_mgr->TEST_SplitTablet(tablet_id, encoded_split_key, partition_split_key); |
157 | 0 | } |
158 | | |
159 | | // |
160 | | // TabletSplitITestBase |
161 | | // |
162 | | |
163 | | // Need to define the static constexpr members as well. |
164 | | template<class MiniClusterType> |
165 | | constexpr std::chrono::duration<int64> TabletSplitITestBase<MiniClusterType>::kRpcTimeout; |
166 | | |
167 | | template<class MiniClusterType> |
168 | | constexpr int TabletSplitITestBase<MiniClusterType>::kDefaultNumRows; |
169 | | |
170 | | template<class MiniClusterType> |
171 | | constexpr size_t TabletSplitITestBase<MiniClusterType>::kDbBlockSizeBytes; |
172 | | |
173 | | template <class MiniClusterType> |
174 | 57 | void TabletSplitITestBase<MiniClusterType>::SetUp() { |
175 | 57 | this->SetNumTablets(3); |
176 | 57 | this->create_table_ = false; |
177 | 57 | this->mini_cluster_opt_.num_tablet_servers = GetRF(); |
178 | 57 | client::TransactionTestBase<MiniClusterType>::SetUp(); |
179 | 57 | proxy_cache_ = std::make_unique<rpc::ProxyCache>(this->client_->messenger()); |
180 | 57 | } yb::TabletSplitITestBase<yb::MiniCluster>::SetUp() Line | Count | Source | 174 | 49 | void TabletSplitITestBase<MiniClusterType>::SetUp() { | 175 | 49 | this->SetNumTablets(3); | 176 | 49 | this->create_table_ = false; | 177 | 49 | this->mini_cluster_opt_.num_tablet_servers = GetRF(); | 178 | 49 | client::TransactionTestBase<MiniClusterType>::SetUp(); | 179 | 49 | proxy_cache_ = std::make_unique<rpc::ProxyCache>(this->client_->messenger()); | 180 | 49 | } |
yb::TabletSplitITestBase<yb::ExternalMiniCluster>::SetUp() Line | Count | Source | 174 | 8 | void TabletSplitITestBase<MiniClusterType>::SetUp() { | 175 | 8 | this->SetNumTablets(3); | 176 | 8 | this->create_table_ = false; | 177 | 8 | this->mini_cluster_opt_.num_tablet_servers = GetRF(); | 178 | 8 | client::TransactionTestBase<MiniClusterType>::SetUp(); | 179 | 8 | proxy_cache_ = std::make_unique<rpc::ProxyCache>(this->client_->messenger()); | 180 | 8 | } |
|
181 | | |
182 | | template <class MiniClusterType> |
183 | | Result<tserver::ReadRequestPB> TabletSplitITestBase<MiniClusterType>::CreateReadRequest( |
184 | 0 | const TabletId& tablet_id, int32_t key) { |
185 | 0 | tserver::ReadRequestPB req; |
186 | 0 | auto op = client::CreateReadOp(key, this->table_, this->kValueColumn); |
187 | 0 | auto* ql_batch = req.add_ql_batch(); |
188 | 0 | *ql_batch = op->request(); |
189 | |
|
190 | 0 | std::string partition_key; |
191 | 0 | RETURN_NOT_OK(op->GetPartitionKey(&partition_key)); |
192 | 0 | const auto& hash_code = PartitionSchema::DecodeMultiColumnHashValue(partition_key); |
193 | 0 | ql_batch->set_hash_code(hash_code); |
194 | 0 | ql_batch->set_max_hash_code(hash_code); |
195 | 0 | req.set_tablet_id(tablet_id); |
196 | 0 | req.set_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX); |
197 | 0 | return req; |
198 | 0 | } Unexecuted instantiation: yb::TabletSplitITestBase<yb::MiniCluster>::CreateReadRequest(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int) Unexecuted instantiation: yb::TabletSplitITestBase<yb::ExternalMiniCluster>::CreateReadRequest(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int) |
199 | | |
200 | | template <class MiniClusterType> |
201 | | tserver::WriteRequestPB TabletSplitITestBase<MiniClusterType>::CreateInsertRequest( |
202 | 0 | const TabletId& tablet_id, int32_t key, int32_t value) { |
203 | 0 | tserver::WriteRequestPB req; |
204 | 0 | auto op = this->table_.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
205 | |
|
206 | 0 | { |
207 | 0 | auto op_req = op->mutable_request(); |
208 | 0 | QLAddInt32HashValue(op_req, key); |
209 | 0 | this->table_.AddInt32ColumnValue(op_req, this->kValueColumn, value); |
210 | 0 | } |
211 | |
|
212 | 0 | auto* ql_batch = req.add_ql_write_batch(); |
213 | 0 | *ql_batch = op->request(); |
214 | |
|
215 | 0 | std::string partition_key; |
216 | 0 | EXPECT_OK(op->GetPartitionKey(&partition_key)); |
217 | 0 | const auto& hash_code = PartitionSchema::DecodeMultiColumnHashValue(partition_key); |
218 | 0 | ql_batch->set_hash_code(hash_code); |
219 | 0 | req.set_tablet_id(tablet_id); |
220 | 0 | return req; |
221 | 0 | } Unexecuted instantiation: yb::TabletSplitITestBase<yb::MiniCluster>::CreateInsertRequest(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, int) Unexecuted instantiation: yb::TabletSplitITestBase<yb::ExternalMiniCluster>::CreateInsertRequest(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, int) |
222 | | |
223 | | template <class MiniClusterType> |
224 | | Result<std::pair<docdb::DocKeyHash, docdb::DocKeyHash>> |
225 | | TabletSplitITestBase<MiniClusterType>::WriteRows( |
226 | | client::TableHandle* table, const uint32_t num_rows, |
227 | 13 | const int32_t start_key, const int32_t start_value) { |
228 | 13 | auto min_hash_code = std::numeric_limits<docdb::DocKeyHash>::max(); |
229 | 13 | auto max_hash_code = std::numeric_limits<docdb::DocKeyHash>::min(); |
230 | | |
231 | 13 | LOG(INFO) << "Writing " << num_rows << " rows..."; |
232 | | |
233 | 13 | auto txn = this->CreateTransaction(); |
234 | 13 | auto session = this->CreateSession(txn); |
235 | 13 | for (int32_t i = start_key, v = start_value; |
236 | 18.5k | i < start_key + static_cast<int32_t>(num_rows); |
237 | 18.5k | ++i, ++v) { |
238 | 18.5k | client::YBqlWriteOpPtr op = VERIFY_RESULT( |
239 | 18.5k | client::kv_table_test::WriteRow(table, |
240 | 18.5k | session, |
241 | 18.5k | i /* key */, |
242 | 18.5k | v /* value */, |
243 | 18.5k | client::WriteOpType::INSERT, |
244 | 18.5k | client::Flush::kFalse)); |
245 | 0 | const auto hash_code = op->GetHashCode(); |
246 | 18.5k | min_hash_code = std::min(min_hash_code, hash_code); |
247 | 18.5k | max_hash_code = std::max(max_hash_code, hash_code); |
248 | 18.5k | YB_LOG_EVERY_N_SECS(INFO, 10) << "Rows written: " << start_key << "..." << i11 ; |
249 | 18.5k | } |
250 | 13 | RETURN_NOT_OK(session->Flush()); |
251 | 13 | if (txn) { |
252 | 13 | RETURN_NOT_OK(txn->CommitFuture().get()); |
253 | 13 | LOG(INFO) << "Committed: " << txn->id(); |
254 | 13 | } |
255 | | |
256 | 13 | LOG(INFO) << num_rows << " rows has been written"; |
257 | 13 | LOG(INFO) << "min_hash_code = " << min_hash_code; |
258 | 13 | LOG(INFO) << "max_hash_code = " << max_hash_code; |
259 | 13 | return std::make_pair(min_hash_code, max_hash_code); |
260 | 13 | } Unexecuted instantiation: yb::TabletSplitITestBase<yb::MiniCluster>::WriteRows(yb::client::TableHandle*, unsigned int, int, int) yb::TabletSplitITestBase<yb::ExternalMiniCluster>::WriteRows(yb::client::TableHandle*, unsigned int, int, int) Line | Count | Source | 227 | 13 | const int32_t start_key, const int32_t start_value) { | 228 | 13 | auto min_hash_code = std::numeric_limits<docdb::DocKeyHash>::max(); | 229 | 13 | auto max_hash_code = std::numeric_limits<docdb::DocKeyHash>::min(); | 230 | | | 231 | 13 | LOG(INFO) << "Writing " << num_rows << " rows..."; | 232 | | | 233 | 13 | auto txn = this->CreateTransaction(); | 234 | 13 | auto session = this->CreateSession(txn); | 235 | 13 | for (int32_t i = start_key, v = start_value; | 236 | 18.5k | i < start_key + static_cast<int32_t>(num_rows); | 237 | 18.5k | ++i, ++v) { | 238 | 18.5k | client::YBqlWriteOpPtr op = VERIFY_RESULT( | 239 | 18.5k | client::kv_table_test::WriteRow(table, | 240 | 18.5k | session, | 241 | 18.5k | i /* key */, | 242 | 18.5k | v /* value */, | 243 | 18.5k | client::WriteOpType::INSERT, | 244 | 18.5k | client::Flush::kFalse)); | 245 | 0 | const auto hash_code = op->GetHashCode(); | 246 | 18.5k | min_hash_code = std::min(min_hash_code, hash_code); | 247 | 18.5k | max_hash_code = std::max(max_hash_code, hash_code); | 248 | 18.5k | YB_LOG_EVERY_N_SECS(INFO, 10) << "Rows written: " << start_key << "..." << i11 ; | 249 | 18.5k | } | 250 | 13 | RETURN_NOT_OK(session->Flush()); | 251 | 13 | if (txn) { | 252 | 13 | RETURN_NOT_OK(txn->CommitFuture().get()); | 253 | 13 | LOG(INFO) << "Committed: " << txn->id(); | 254 | 13 | } | 255 | | | 256 | 13 | LOG(INFO) << num_rows << " rows has been written"; | 257 | 13 | LOG(INFO) << "min_hash_code = " << min_hash_code; | 258 | 13 | LOG(INFO) << "max_hash_code = " << max_hash_code; | 259 | 13 | return std::make_pair(min_hash_code, max_hash_code); | 260 | 13 | } |
|
261 | | |
262 | | template <class MiniClusterType> |
263 | 7 | Status TabletSplitITestBase<MiniClusterType>::FlushTestTable() { |
264 | 7 | return this->client_->FlushTables( |
265 | 7 | {this->table_->id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, |
266 | 7 | /* is_compaction = */ false); |
267 | 7 | } Unexecuted instantiation: yb::TabletSplitITestBase<yb::MiniCluster>::FlushTestTable() yb::TabletSplitITestBase<yb::ExternalMiniCluster>::FlushTestTable() Line | Count | Source | 263 | 7 | Status TabletSplitITestBase<MiniClusterType>::FlushTestTable() { | 264 | 7 | return this->client_->FlushTables( | 265 | 7 | {this->table_->id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, | 266 | 7 | /* is_compaction = */ false); | 267 | 7 | } |
|
268 | | |
269 | | template <class MiniClusterType> |
270 | | Result<std::pair<docdb::DocKeyHash, docdb::DocKeyHash>> |
271 | | TabletSplitITestBase<MiniClusterType>::WriteRowsAndFlush( |
272 | 7 | const uint32_t num_rows, const int32_t start_key) { |
273 | 7 | auto result = VERIFY_RESULT(WriteRows(num_rows, start_key)); |
274 | 7 | RETURN_NOT_OK(FlushTestTable()); |
275 | 7 | return result; |
276 | 7 | } Unexecuted instantiation: yb::TabletSplitITestBase<yb::MiniCluster>::WriteRowsAndFlush(unsigned int, int) yb::TabletSplitITestBase<yb::ExternalMiniCluster>::WriteRowsAndFlush(unsigned int, int) Line | Count | Source | 272 | 7 | const uint32_t num_rows, const int32_t start_key) { | 273 | 7 | auto result = VERIFY_RESULT(WriteRows(num_rows, start_key)); | 274 | 7 | RETURN_NOT_OK(FlushTestTable()); | 275 | 7 | return result; | 276 | 7 | } |
|
277 | | |
278 | | template <class MiniClusterType> |
279 | | Result<docdb::DocKeyHash> TabletSplitITestBase<MiniClusterType>::WriteRowsAndGetMiddleHashCode( |
280 | 0 | uint32_t num_rows) { |
281 | 0 | auto min_max_hash_code = VERIFY_RESULT(WriteRowsAndFlush(num_rows, 1)); |
282 | 0 | const auto split_hash_code = (min_max_hash_code.first + min_max_hash_code.second) / 2; |
283 | 0 | LOG(INFO) << "Split hash code: " << split_hash_code; |
284 | |
|
285 | 0 | RETURN_NOT_OK(CheckRowsCount(num_rows)); |
286 | | |
287 | 0 | return split_hash_code; |
288 | 0 | } Unexecuted instantiation: yb::TabletSplitITestBase<yb::MiniCluster>::WriteRowsAndGetMiddleHashCode(unsigned int) Unexecuted instantiation: yb::TabletSplitITestBase<yb::ExternalMiniCluster>::WriteRowsAndGetMiddleHashCode(unsigned int) |
289 | | |
290 | | template <class MiniClusterType> |
291 | | Result<scoped_refptr<master::TabletInfo>> |
292 | | TabletSplitITestBase<MiniClusterType>::GetSingleTestTabletInfo( |
293 | 0 | master::CatalogManagerIf* catalog_mgr) { |
294 | 0 | auto tablet_infos = catalog_mgr->GetTableInfo(this->table_->id())->GetTablets(); |
295 | |
|
296 | 0 | SCHECK_EQ(tablet_infos.size(), 1U, IllegalState, "Expect test table to have only 1 tablet"); |
297 | 0 | return tablet_infos.front(); |
298 | 0 | } Unexecuted instantiation: yb::TabletSplitITestBase<yb::MiniCluster>::GetSingleTestTabletInfo(yb::master::CatalogManagerIf*) Unexecuted instantiation: yb::TabletSplitITestBase<yb::ExternalMiniCluster>::GetSingleTestTabletInfo(yb::master::CatalogManagerIf*) |
299 | | |
300 | | template <class MiniClusterType> |
301 | 0 | void TabletSplitITestBase<MiniClusterType>::CheckTableKeysInRange(const size_t num_keys) { |
302 | 0 | client::TableHandle table; |
303 | 0 | ASSERT_OK(table.Open(client::kTableName, this->client_.get())); |
304 | | |
305 | 0 | std::vector<int32> keys; |
306 | 0 | for (const auto& row : client::TableRange(table)) { |
307 | 0 | keys.push_back(row.column(0).int32_value()); |
308 | 0 | } |
309 | |
|
310 | 0 | LOG(INFO) << "Total rows read: " << keys.size(); |
311 | |
|
312 | 0 | std::sort(keys.begin(), keys.end()); |
313 | 0 | int32 prev_key = 0; |
314 | 0 | for (const auto& key : keys) { |
315 | 0 | if (key != prev_key + 1) { |
316 | 0 | LOG(ERROR) << "Keys missed: " << prev_key + 1 << "..." << key - 1; |
317 | 0 | } |
318 | 0 | prev_key = key; |
319 | 0 | } |
320 | 0 | LOG(INFO) << "Last key: " << prev_key; |
321 | |
|
322 | 0 | ASSERT_EQ(prev_key, num_keys); |
323 | 0 | ASSERT_EQ(keys.size(), num_keys); |
324 | 0 | } Unexecuted instantiation: yb::TabletSplitITestBase<yb::MiniCluster>::CheckTableKeysInRange(unsigned long) Unexecuted instantiation: yb::TabletSplitITestBase<yb::ExternalMiniCluster>::CheckTableKeysInRange(unsigned long) |
325 | | |
326 | | template class TabletSplitITestBase<MiniCluster>; |
327 | | template class TabletSplitITestBase<ExternalMiniCluster>; |
328 | | |
329 | | // |
330 | | // TabletSplitITest |
331 | | // |
332 | | |
333 | 49 | TabletSplitITest::TabletSplitITest() = default; |
334 | 4 | TabletSplitITest::~TabletSplitITest() = default; |
335 | | |
336 | 49 | void TabletSplitITest::SetUp() { |
337 | 49 | FLAGS_cleanup_split_tablets_interval_sec = 1; |
338 | 49 | FLAGS_enable_automatic_tablet_splitting = false; |
339 | 49 | FLAGS_TEST_validate_all_tablet_candidates = true; |
340 | 49 | FLAGS_db_block_size_bytes = kDbBlockSizeBytes; |
341 | | // We set other block sizes to be small for the following test reasons: |
342 | | // 1) To have a more granular change of SST file size depending on the number of rows written. |
343 | | // This helps to do splits earlier and have faster tests. |
344 | | // 2) To avoid long flushes when simulating slow compaction/flush. This way we can |
345 | | // test compaction abort faster. |
346 | 49 | FLAGS_db_filter_block_size_bytes = 2_KB; |
347 | 49 | FLAGS_db_index_block_size_bytes = 2_KB; |
348 | | // A split size threshold smaller than the memstore size is not effective, because splits are |
349 | | // triggered based on the size of the flushed SST files. |
350 | 49 | FLAGS_db_write_buffer_size = 100_KB; |
351 | 49 | TabletSplitITestBase<MiniCluster>::SetUp(); |
352 | 49 | snapshot_util_ = std::make_unique<client::SnapshotTestUtil>(); |
353 | 49 | snapshot_util_->SetProxy(&client_->proxy_cache()); |
354 | 49 | snapshot_util_->SetCluster(cluster_.get()); |
355 | 49 | } |
356 | | |
357 | 0 | Result<master::TabletInfos> TabletSplitITest::GetTabletInfosForTable(const TableId& table_id) { |
358 | 0 | return VERIFY_RESULT(catalog_manager())->GetTableInfo(table_id)->GetTablets(); |
359 | 0 | } |
360 | | |
361 | 0 | Result<TabletId> TabletSplitITest::CreateSingleTabletAndSplit(uint32_t num_rows) { |
362 | 0 | CreateSingleTablet(); |
363 | 0 | const auto split_hash_code = VERIFY_RESULT(WriteRowsAndGetMiddleHashCode(num_rows)); |
364 | 0 | return SplitTabletAndValidate(split_hash_code, num_rows); |
365 | 0 | } |
366 | | |
367 | 0 | Result<tserver::GetSplitKeyResponsePB> TabletSplitITest::GetSplitKey(const std::string& tablet_id) { |
368 | 0 | auto tserver = cluster_->mini_tablet_server(0); |
369 | 0 | auto ts_service_proxy = std::make_unique<tserver::TabletServerServiceProxy>( |
370 | 0 | proxy_cache_.get(), HostPort::FromBoundEndpoint(tserver->bound_rpc_addr())); |
371 | 0 | tserver::GetSplitKeyRequestPB req; |
372 | 0 | req.set_tablet_id(tablet_id); |
373 | 0 | rpc::RpcController controller; |
374 | 0 | controller.set_timeout(kRpcTimeout); |
375 | 0 | tserver::GetSplitKeyResponsePB resp; |
376 | 0 | RETURN_NOT_OK(ts_service_proxy->GetSplitKey(req, &resp, &controller)); |
377 | 0 | return resp; |
378 | 0 | } |
379 | | |
380 | | Status TabletSplitITest::WaitForTabletSplitCompletion( |
381 | | const size_t expected_non_split_tablets, |
382 | | const size_t expected_split_tablets, |
383 | | size_t num_replicas_online, |
384 | | const client::YBTableName& table, |
385 | 0 | bool core_dump_on_failure) { |
386 | 0 | if (num_replicas_online == 0) { |
387 | 0 | num_replicas_online = FLAGS_replication_factor; |
388 | 0 | } |
389 | |
|
390 | 0 | LOG(INFO) << "Waiting for tablet split to be completed... "; |
391 | 0 | LOG(INFO) << "expected_non_split_tablets: " << expected_non_split_tablets; |
392 | 0 | LOG(INFO) << "expected_split_tablets: " << expected_split_tablets; |
393 | |
|
394 | 0 | const auto expected_total_tablets = expected_non_split_tablets + expected_split_tablets; |
395 | 0 | LOG(INFO) << "expected_total_tablets: " << expected_total_tablets; |
396 | |
|
397 | 0 | std::vector<tablet::TabletPeerPtr> peers; |
398 | 0 | auto s = WaitFor([&] { |
399 | 0 | peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll); |
400 | 0 | size_t num_peers_running = 0; |
401 | 0 | size_t num_peers_split = 0; |
402 | 0 | size_t num_peers_leader_ready = 0; |
403 | 0 | for (const auto& peer : peers) { |
404 | 0 | const auto tablet = peer->shared_tablet(); |
405 | 0 | const auto consensus = peer->shared_consensus(); |
406 | 0 | if (!tablet || !consensus) { |
407 | 0 | break; |
408 | 0 | } |
409 | 0 | if (tablet->metadata()->table_name() != table.table_name() || |
410 | 0 | tablet->table_type() == TRANSACTION_STATUS_TABLE_TYPE) { |
411 | 0 | continue; |
412 | 0 | } |
413 | 0 | const auto raft_group_state = peer->state(); |
414 | 0 | const auto tablet_data_state = tablet->metadata()->tablet_data_state(); |
415 | 0 | const auto leader_status = consensus->GetLeaderStatus(/* allow_stale =*/true); |
416 | 0 | if (raft_group_state == tablet::RaftGroupStatePB::RUNNING) { |
417 | 0 | ++num_peers_running; |
418 | 0 | } else { |
419 | 0 | return false; |
420 | 0 | } |
421 | 0 | num_peers_leader_ready += leader_status == consensus::LeaderStatus::LEADER_AND_READY; |
422 | 0 | num_peers_split += |
423 | 0 | tablet_data_state == tablet::TabletDataState::TABLET_DATA_SPLIT_COMPLETED; |
424 | 0 | } |
425 | 0 | VLOG(1) << "num_peers_running: " << num_peers_running; |
426 | 0 | VLOG(1) << "num_peers_split: " << num_peers_split; |
427 | 0 | VLOG(1) << "num_peers_leader_ready: " << num_peers_leader_ready; |
428 | |
|
429 | 0 | return num_peers_running == num_replicas_online * expected_total_tablets && |
430 | 0 | num_peers_split == num_replicas_online * expected_split_tablets && |
431 | 0 | num_peers_leader_ready == expected_total_tablets; |
432 | 0 | }, split_completion_timeout_, "Wait for tablet split to be completed"); |
433 | 0 | if (!s.ok()) { |
434 | 0 | for (const auto& peer : peers) { |
435 | 0 | const auto tablet = peer->shared_tablet(); |
436 | 0 | const auto consensus = peer->shared_consensus(); |
437 | 0 | if (!tablet || !consensus) { |
438 | 0 | LOG(INFO) << consensus::MakeTabletLogPrefix(peer->tablet_id(), peer->permanent_uuid()) |
439 | 0 | << "no tablet"; |
440 | 0 | continue; |
441 | 0 | } |
442 | 0 | if (tablet->table_type() == TRANSACTION_STATUS_TABLE_TYPE) { |
443 | 0 | continue; |
444 | 0 | } |
445 | 0 | LOG(INFO) << consensus::MakeTabletLogPrefix(peer->tablet_id(), peer->permanent_uuid()) |
446 | 0 | << "raft_group_state: " << AsString(peer->state()) |
447 | 0 | << " tablet_data_state: " |
448 | 0 | << TabletDataState_Name(tablet->metadata()->tablet_data_state()) |
449 | 0 | << " leader status: " |
450 | 0 | << AsString(consensus->GetLeaderStatus(/* allow_stale =*/true)); |
451 | 0 | } |
452 | 0 | if (core_dump_on_failure) { |
453 | 0 | LOG(INFO) << "Tablet splitting did not complete. Crashing test with core dump. " |
454 | 0 | << "Received error: " << s.ToString(); |
455 | 0 | raise(SIGSEGV); |
456 | 0 | } else { |
457 | 0 | LOG(INFO) << "Tablet splitting did not complete. Received error: " << s.ToString(); |
458 | 0 | return s; |
459 | 0 | } |
460 | 0 | } |
461 | 0 | LOG(INFO) << "Waiting for tablet split to be completed - DONE"; |
462 | |
|
463 | 0 | DumpTableLocations(VERIFY_RESULT(catalog_manager()), table); |
464 | 0 | return Status::OK(); |
465 | 0 | } |
466 | | |
467 | 0 | Result<TabletId> TabletSplitITest::SplitSingleTablet(docdb::DocKeyHash split_hash_code) { |
468 | 0 | auto* catalog_mgr = VERIFY_RESULT(catalog_manager()); |
469 | | |
470 | 0 | auto source_tablet_info = VERIFY_RESULT(GetSingleTestTabletInfo(catalog_mgr)); |
471 | 0 | const auto source_tablet_id = source_tablet_info->id(); |
472 | |
|
473 | 0 | RETURN_NOT_OK(catalog_mgr->TEST_SplitTablet(source_tablet_info, split_hash_code)); |
474 | 0 | return source_tablet_id; |
475 | 0 | } |
476 | | |
477 | | Result<TabletId> TabletSplitITest::SplitTabletAndValidate( |
478 | | docdb::DocKeyHash split_hash_code, |
479 | | size_t num_rows, |
480 | 0 | bool parent_tablet_protected_from_deletion) { |
481 | 0 | auto source_tablet_id = VERIFY_RESULT(SplitSingleTablet(split_hash_code)); |
482 | | |
483 | | // If the parent tablet will not be deleted, then we will expect another tablet at the end. |
484 | 0 | const auto expected_split_tablets = |
485 | 0 | (FLAGS_TEST_skip_deleting_split_tablets || parent_tablet_protected_from_deletion) ? 1 : 0; |
486 | |
|
487 | 0 | RETURN_NOT_OK( |
488 | 0 | WaitForTabletSplitCompletion(/* expected_non_split_tablets =*/2, expected_split_tablets)); |
489 | | |
490 | 0 | RETURN_NOT_OK(CheckPostSplitTabletReplicasData(num_rows)); |
491 | | |
492 | 0 | if (expected_split_tablets > 0) { |
493 | 0 | RETURN_NOT_OK(CheckSourceTabletAfterSplit(source_tablet_id)); |
494 | 0 | } |
495 | | |
496 | 0 | return source_tablet_id; |
497 | 0 | } |
498 | | |
499 | 0 | Status TabletSplitITest::CheckSourceTabletAfterSplit(const TabletId& source_tablet_id) { |
500 | 0 | LOG(INFO) << "Checking source tablet behavior after split..."; |
501 | 0 | google::FlagSaver saver; |
502 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_do_not_start_election_test_only) = true; |
503 | |
|
504 | 0 | size_t tablet_split_insert_error_count = 0; |
505 | 0 | size_t not_the_leader_insert_error_count = 0; |
506 | 0 | size_t ts_online_count = 0; |
507 | 0 | for (auto mini_ts : this->cluster_->mini_tablet_servers()) { |
508 | 0 | if (!mini_ts->is_started()) { |
509 | 0 | continue; |
510 | 0 | } |
511 | 0 | ++ts_online_count; |
512 | 0 | auto ts_service_proxy = std::make_unique<tserver::TabletServerServiceProxy>( |
513 | 0 | proxy_cache_.get(), HostPort::FromBoundEndpoint(mini_ts->bound_rpc_addr())); |
514 | |
|
515 | 0 | { |
516 | 0 | tserver::ReadRequestPB req = VERIFY_RESULT(CreateReadRequest(source_tablet_id, 1 /* key */)); |
517 | | |
518 | 0 | rpc::RpcController controller; |
519 | 0 | controller.set_timeout(kRpcTimeout); |
520 | 0 | tserver::ReadResponsePB resp; |
521 | 0 | RETURN_NOT_OK(ts_service_proxy->Read(req, &resp, &controller)); |
522 | | |
523 | 0 | SCHECK(resp.has_error(), InternalError, "Expected error on read from split tablet"); |
524 | 0 | SCHECK_EQ( |
525 | 0 | resp.error().code(), |
526 | 0 | tserver::TabletServerErrorPB::TABLET_SPLIT, |
527 | 0 | InternalError, |
528 | 0 | "Expected error on read from split tablet to be " |
529 | 0 | "tserver::TabletServerErrorPB::TABLET_SPLIT"); |
530 | 0 | } |
531 | | |
532 | 0 | { |
533 | 0 | tserver::WriteRequestPB req = |
534 | 0 | CreateInsertRequest(source_tablet_id, 0 /* key */, 0 /* value */); |
535 | |
|
536 | 0 | rpc::RpcController controller; |
537 | 0 | controller.set_timeout(kRpcTimeout); |
538 | 0 | tserver::WriteResponsePB resp; |
539 | 0 | RETURN_NOT_OK(ts_service_proxy->Write(req, &resp, &controller)); |
540 | | |
541 | 0 | SCHECK(resp.has_error(), InternalError, "Expected error on write to split tablet"); |
542 | 0 | LOG(INFO) << "Error: " << AsString(resp.error()); |
543 | 0 | switch (resp.error().code()) { |
544 | 0 | case tserver::TabletServerErrorPB::TABLET_SPLIT: |
545 | 0 | SCHECK_EQ( |
546 | 0 | resp.error().status().code(), |
547 | 0 | AppStatusPB::ILLEGAL_STATE, |
548 | 0 | InternalError, |
549 | 0 | "tserver::TabletServerErrorPB::TABLET_SPLIT error should have " |
550 | 0 | "AppStatusPB::ILLEGAL_STATE on write to split tablet"); |
551 | 0 | tablet_split_insert_error_count++; |
552 | 0 | break; |
553 | 0 | case tserver::TabletServerErrorPB::NOT_THE_LEADER: |
554 | 0 | not_the_leader_insert_error_count++; |
555 | 0 | break; |
556 | 0 | case tserver::TabletServerErrorPB::TABLET_NOT_FOUND: |
557 | | // In the case that the source tablet was just hidden instead of deleted. |
558 | 0 | tablet_split_insert_error_count++; |
559 | 0 | break; |
560 | 0 | default: |
561 | 0 | return STATUS_FORMAT(InternalError, "Unexpected error: $0", resp.error()); |
562 | 0 | } |
563 | 0 | } |
564 | 0 | } |
565 | 0 | SCHECK_EQ( |
566 | 0 | tablet_split_insert_error_count, 1U, InternalError, |
567 | 0 | "Leader should return \"try again\" error on insert."); |
568 | 0 | SCHECK_EQ( |
569 | 0 | not_the_leader_insert_error_count, ts_online_count - 1, InternalError, |
570 | 0 | "Followers should return \"not the leader\" error."); |
571 | 0 | return Status::OK(); |
572 | 0 | } |
573 | | |
574 | 0 | Result<std::vector<tablet::TabletPeerPtr>> TabletSplitITest::ListSplitCompleteTabletPeers() { |
575 | 0 | return ListTableInactiveSplitTabletPeers(this->cluster_.get(), VERIFY_RESULT(GetTestTableId())); |
576 | 0 | } |
577 | | |
578 | 0 | Result<std::vector<tablet::TabletPeerPtr>> TabletSplitITest::ListPostSplitChildrenTabletPeers() { |
579 | 0 | return ListTableActiveTabletPeers(this->cluster_.get(), VERIFY_RESULT(GetTestTableId())); |
580 | 0 | } |
581 | | |
582 | 0 | Status TabletSplitITest::WaitForTestTablePostSplitTabletsFullyCompacted(MonoDelta timeout) { |
583 | 0 | auto peer_to_str = [](const tablet::TabletPeerPtr& peer) { |
584 | 0 | return peer->LogPrefix() + |
585 | 0 | (peer->tablet_metadata()->has_been_fully_compacted() ? "Compacted" : "NotCompacted"); |
586 | 0 | }; |
587 | 0 | std::vector<std::string> not_compacted_peers; |
588 | 0 | auto s = LoggedWaitFor( |
589 | 0 | [this, ¬_compacted_peers, &peer_to_str]() -> Result<bool> { |
590 | 0 | auto peers = ListPostSplitChildrenTabletPeers(); |
591 | 0 | if (!peers.ok()) { |
592 | 0 | return false; |
593 | 0 | } |
594 | 0 | LOG(INFO) << "Verifying post-split tablet peers:\n" |
595 | 0 | << JoinStrings(*peers | boost::adaptors::transformed(peer_to_str), "\n"); |
596 | 0 | not_compacted_peers.clear(); |
597 | 0 | for (auto peer : *peers) { |
598 | 0 | if (!peer->tablet_metadata()->has_been_fully_compacted()) { |
599 | 0 | not_compacted_peers.push_back(peer_to_str(peer)); |
600 | 0 | } |
601 | 0 | } |
602 | 0 | return not_compacted_peers.empty(); |
603 | 0 | }, |
604 | 0 | timeout, "Wait for post tablet split compaction to be completed"); |
605 | 0 | if (!s.ok()) { |
606 | 0 | LOG(ERROR) << "Following post-split tablet peers have not been fully compacted:\n" |
607 | 0 | << JoinStrings(not_compacted_peers, "\n"); |
608 | 0 | } |
609 | 0 | return s; |
610 | 0 | } |
611 | | |
612 | 0 | Result<int> TabletSplitITest::NumPostSplitTabletPeersFullyCompacted() { |
613 | 0 | int count = 0; |
614 | 0 | for (auto peer : VERIFY_RESULT(ListPostSplitChildrenTabletPeers())) { |
615 | 0 | const auto* tablet = peer->tablet(); |
616 | 0 | if (tablet->metadata()->has_been_fully_compacted()) { |
617 | 0 | ++count; |
618 | 0 | } |
619 | 0 | } |
620 | 0 | return count; |
621 | 0 | } |
622 | | |
623 | 0 | Result<uint64_t> TabletSplitITest::GetMinSstFileSizeAmongAllReplicas(const std::string& tablet_id) { |
624 | 0 | const auto test_table_id = VERIFY_RESULT(GetTestTableId()); |
625 | 0 | auto peers = ListTabletPeers(this->cluster_.get(), [&tablet_id](auto peer) { |
626 | 0 | return peer->tablet_id() == tablet_id; |
627 | 0 | }); |
628 | 0 | if (peers.size() == 0) { |
629 | 0 | return STATUS(IllegalState, "Table has no active peer tablets"); |
630 | 0 | } |
631 | 0 | uint64_t min_file_size = std::numeric_limits<uint64_t>::max(); |
632 | 0 | for (const auto& peer : peers) { |
633 | 0 | min_file_size = std::min( |
634 | 0 | min_file_size, |
635 | 0 | peer->shared_tablet()->GetCurrentVersionSstFilesSize()); |
636 | 0 | } |
637 | 0 | return min_file_size; |
638 | 0 | } |
639 | | |
640 | | Status TabletSplitITest::CheckPostSplitTabletReplicasData( |
641 | 0 | size_t num_rows, size_t num_replicas_online, size_t num_active_tablets) { |
642 | 0 | LOG(INFO) << "Checking post-split tablet replicas data..."; |
643 | |
|
644 | 0 | if (num_replicas_online == 0) { |
645 | 0 | num_replicas_online = FLAGS_replication_factor; |
646 | 0 | } |
647 | |
|
648 | 0 | const auto test_table_id = VERIFY_RESULT(GetTestTableId()); |
649 | 0 | auto active_leader_peers = VERIFY_RESULT(WaitForTableActiveTabletLeadersPeers( |
650 | 0 | this->cluster_.get(), test_table_id, num_active_tablets)); |
651 | | |
652 | 0 | std::unordered_map<TabletId, OpId> last_on_leader; |
653 | 0 | for (auto peer : active_leader_peers) { |
654 | 0 | last_on_leader[peer->tablet_id()] = peer->shared_consensus()->GetLastReceivedOpId(); |
655 | 0 | } |
656 | |
|
657 | 0 | const auto active_peers = ListTableActiveTabletPeers(this->cluster_.get(), test_table_id); |
658 | |
|
659 | 0 | std::vector<size_t> keys(num_rows, num_replicas_online); |
660 | 0 | std::unordered_map<size_t, std::vector<std::string>> key_replicas; |
661 | 0 | const auto key_column_id = this->table_.ColumnId(this->kKeyColumn); |
662 | 0 | const auto value_column_id = this->table_.ColumnId(this->kValueColumn); |
663 | 0 | for (auto peer : active_peers) { |
664 | 0 | RETURN_NOT_OK(LoggedWaitFor( |
665 | 0 | [&] { |
666 | 0 | return peer->shared_consensus()->GetLastAppliedOpId() >= |
667 | 0 | last_on_leader[peer->tablet_id()]; |
668 | 0 | }, |
669 | 0 | 15s * kTimeMultiplier, |
670 | 0 | Format( |
671 | 0 | "Waiting for tablet replica $0 to apply all ops from leader ...", peer->LogPrefix()))); |
672 | 0 | LOG(INFO) << "Last applied op id for " << peer->LogPrefix() << ": " |
673 | 0 | << AsString(peer->shared_consensus()->GetLastAppliedOpId()); |
674 | |
|
675 | 0 | const auto shared_tablet = peer->shared_tablet(); |
676 | 0 | const SchemaPtr schema = shared_tablet->metadata()->schema(); |
677 | 0 | auto client_schema = schema->CopyWithoutColumnIds(); |
678 | 0 | auto iter = VERIFY_RESULT(shared_tablet->NewRowIterator(client_schema)); |
679 | 0 | QLTableRow row; |
680 | 0 | std::unordered_set<size_t> tablet_keys; |
681 | 0 | while (VERIFY_RESULT(iter->HasNext())) { |
682 | 0 | RETURN_NOT_OK(iter->NextRow(&row)); |
683 | 0 | auto key_opt = row.GetValue(key_column_id); |
684 | 0 | SCHECK(key_opt.is_initialized(), InternalError, "Key is not initialized"); |
685 | 0 | SCHECK_EQ(key_opt, row.GetValue(value_column_id), InternalError, "Wrong value for key"); |
686 | 0 | auto key = key_opt->int32_value(); |
687 | 0 | SCHECK( |
688 | 0 | tablet_keys.insert(key).second, |
689 | 0 | InternalError, |
690 | 0 | Format("Duplicate key $0 in tablet $1", key, shared_tablet->tablet_id())); |
691 | 0 | SCHECK_GT( |
692 | 0 | keys[key - 1]--, |
693 | 0 | 0U, |
694 | 0 | InternalError, |
695 | 0 | Format("Extra key $0 in tablet $1", key, shared_tablet->tablet_id())); |
696 | 0 | key_replicas[key - 1].push_back(peer->LogPrefix()); |
697 | 0 | } |
698 | 0 | } |
699 | 0 | for (size_t key = 1; key <= num_rows; ++key) { |
700 | 0 | const auto key_missing_in_replicas = keys[key - 1]; |
701 | 0 | if (key_missing_in_replicas > 0) { |
702 | 0 | LOG(INFO) << Format("Key $0 replicas: $1", key, key_replicas[key - 1]); |
703 | 0 | return STATUS_FORMAT( |
704 | 0 | InternalError, "Missing key: $0 in $1 replicas", key, key_missing_in_replicas); |
705 | 0 | } |
706 | 0 | } |
707 | 0 | return Status::OK(); |
708 | 0 | } |
709 | | |
710 | | // |
711 | | // TabletSplitExternalMiniClusterITest |
712 | | // |
713 | | |
714 | 8 | void TabletSplitExternalMiniClusterITest::SetFlags() { |
715 | 8 | TabletSplitITestBase<ExternalMiniCluster>::SetFlags(); |
716 | 8 | for (const auto& master_flag : { |
717 | 8 | "--enable_automatic_tablet_splitting=false", |
718 | 8 | "--tablet_split_low_phase_shard_count_per_node=-1", |
719 | 8 | "--tablet_split_high_phase_shard_count_per_node=-1", |
720 | 8 | "--tablet_split_low_phase_size_threshold_bytes=-1", |
721 | 8 | "--tablet_split_high_phase_size_threshold_bytes=-1", |
722 | 8 | "--tablet_force_split_threshold_bytes=-1", |
723 | 48 | }) { |
724 | 48 | mini_cluster_opt_.extra_master_flags.push_back(master_flag); |
725 | 48 | } |
726 | | |
727 | 8 | for (const auto& tserver_flag : std::initializer_list<std::string>{ |
728 | 8 | Format("--db_block_size_bytes=$0", kDbBlockSizeBytes), |
729 | 8 | "--cleanup_split_tablets_interval_sec=1", |
730 | 8 | "--tserver_heartbeat_metrics_interval_ms=100", |
731 | 24 | }) { |
732 | 24 | mini_cluster_opt_.extra_tserver_flags.push_back(tserver_flag); |
733 | 24 | } |
734 | 8 | } |
735 | | |
736 | 66 | Status TabletSplitExternalMiniClusterITest::SplitTablet(const std::string& tablet_id) { |
737 | 66 | master::SplitTabletRequestPB req; |
738 | 66 | req.set_tablet_id(tablet_id); |
739 | 66 | master::SplitTabletResponsePB resp; |
740 | 66 | rpc::RpcController rpc; |
741 | 66 | rpc.set_timeout(30s * kTimeMultiplier); |
742 | | |
743 | 66 | RETURN_NOT_OK(cluster_->GetMasterProxy<master::MasterAdminProxy>().SplitTablet(req, &resp, &rpc)); |
744 | 66 | if (resp.has_error()) { |
745 | 56 | RETURN_NOT_OK(StatusFromPB(resp.error().status())); |
746 | 56 | } |
747 | 10 | return Status::OK(); |
748 | 66 | } |
749 | | |
750 | | Status TabletSplitExternalMiniClusterITest::FlushTabletsOnSingleTServer( |
751 | 5 | size_t tserver_idx, const std::vector<yb::TabletId> tablet_ids, bool is_compaction) { |
752 | 5 | auto tserver = cluster_->tablet_server(tserver_idx); |
753 | 5 | RETURN_NOT_OK(cluster_->FlushTabletsOnSingleTServer(tserver, tablet_ids, is_compaction)); |
754 | 5 | return Status::OK(); |
755 | 5 | } |
756 | | |
757 | | Result<std::set<TabletId>> TabletSplitExternalMiniClusterITest::GetTestTableTabletIds( |
758 | 825 | size_t tserver_idx) { |
759 | 825 | std::set<TabletId> tablet_ids; |
760 | 825 | auto res = VERIFY_RESULT(cluster_->GetTablets(cluster_->tablet_server(tserver_idx))); |
761 | 3.59k | for (const auto& tablet : res) { |
762 | 3.59k | if (tablet.table_name() == table_->name().table_name()) { |
763 | 1.12k | tablet_ids.insert(tablet.tablet_id()); |
764 | 1.12k | } |
765 | 3.59k | } |
766 | 825 | return tablet_ids; |
767 | 825 | } |
768 | | |
769 | 274 | Result<std::set<TabletId>> TabletSplitExternalMiniClusterITest::GetTestTableTabletIds() { |
770 | 274 | std::set<TabletId> tablet_ids; |
771 | 1.09k | for (size_t i = 0; i < cluster_->num_tablet_servers(); ++i822 ) { |
772 | 822 | if (cluster_->tablet_server(i)->IsShutdown()) { |
773 | 3 | continue; |
774 | 3 | } |
775 | 819 | auto res = VERIFY_RESULT(GetTestTableTabletIds(i)); |
776 | 1.10k | for (const auto& id : res) { |
777 | 1.10k | tablet_ids.insert(id); |
778 | 1.10k | } |
779 | 819 | } |
780 | 274 | return tablet_ids; |
781 | 274 | } |
782 | | |
783 | | Result<vector<tserver::ListTabletsResponsePB_StatusAndSchemaPB>> |
784 | 3 | TabletSplitExternalMiniClusterITest::ListTablets(size_t tserver_idx) { |
785 | 3 | vector<tserver::ListTabletsResponsePB_StatusAndSchemaPB> tablets; |
786 | 3 | std::set<TabletId> tablet_ids; |
787 | 3 | auto res = VERIFY_RESULT(cluster_->ListTablets(cluster_->tablet_server(tserver_idx))); |
788 | 15 | for (const auto& tablet : res.status_and_schema()) { |
789 | 15 | auto tablet_id = tablet.tablet_status().tablet_id(); |
790 | 15 | if (tablet.tablet_status().table_name() == table_->name().table_name() && |
791 | 15 | tablet_ids.find(tablet_id) == tablet_ids.end()6 ) { |
792 | 6 | tablets.push_back(tablet); |
793 | 6 | tablet_ids.insert(tablet_id); |
794 | 6 | } |
795 | 15 | } |
796 | 3 | return tablets; |
797 | 3 | } |
798 | | |
799 | | Result<vector<tserver::ListTabletsResponsePB_StatusAndSchemaPB>> |
800 | 1 | TabletSplitExternalMiniClusterITest::ListTablets() { |
801 | 1 | vector<tserver::ListTabletsResponsePB_StatusAndSchemaPB> tablets; |
802 | 1 | std::set<TabletId> tablet_ids; |
803 | 4 | for (size_t i = 0; i < cluster_->num_tablet_servers(); ++i3 ) { |
804 | 3 | auto res = VERIFY_RESULT(ListTablets(i)); |
805 | 6 | for (const auto& tablet : res) { |
806 | 6 | auto tablet_id = tablet.tablet_status().tablet_id(); |
807 | 6 | if (tablet_ids.find(tablet_id) == tablet_ids.end()) { |
808 | 2 | tablets.push_back(tablet); |
809 | 2 | tablet_ids.insert(tablet_id); |
810 | 2 | } |
811 | 6 | } |
812 | 3 | } |
813 | 1 | return tablets; |
814 | 1 | } |
815 | | |
816 | | Status TabletSplitExternalMiniClusterITest::WaitForTabletsExcept( |
817 | 5 | size_t num_tablets, size_t tserver_idx, const TabletId& exclude_tablet) { |
818 | 5 | std::set<TabletId> tablets; |
819 | 5 | auto status = WaitFor( |
820 | 5 | [&]() -> Result<bool> { |
821 | 5 | tablets = VERIFY_RESULT(GetTestTableTabletIds(tserver_idx)); |
822 | 0 | size_t count = 0; |
823 | 14 | for (auto& tablet_id : tablets) { |
824 | 14 | if (tablet_id != exclude_tablet) { |
825 | 14 | count++; |
826 | 14 | } |
827 | 14 | } |
828 | 5 | return count == num_tablets; |
829 | 5 | }, |
830 | 5 | 20s * kTimeMultiplier, |
831 | 5 | Format( |
832 | 5 | "Waiting for tablet count: $0 at tserver: $1", |
833 | 5 | num_tablets, |
834 | 5 | cluster_->tablet_server(tserver_idx)->uuid())); |
835 | 5 | if (!status.ok()) { |
836 | 0 | status = status.CloneAndAppend(Format("Got tablets: $0", tablets)); |
837 | 0 | } |
838 | 5 | return status; |
839 | 5 | } |
840 | | |
841 | 5 | Status TabletSplitExternalMiniClusterITest::WaitForTablets(size_t num_tablets, size_t tserver_idx) { |
842 | 5 | return WaitForTabletsExcept(num_tablets, tserver_idx, ""); |
843 | 5 | } |
844 | | |
845 | 10 | Status TabletSplitExternalMiniClusterITest::WaitForTablets(size_t num_tablets) { |
846 | 10 | std::set<TabletId> tablets; |
847 | 267 | auto status = WaitFor([&]() -> Result<bool> { |
848 | 267 | tablets = VERIFY_RESULT(GetTestTableTabletIds()); |
849 | 0 | return tablets.size() == num_tablets; |
850 | 267 | }, 20s * kTimeMultiplier, Format("Waiting for tablet count: $0", num_tablets)); |
851 | 10 | if (!status.ok()) { |
852 | 2 | status = status.CloneAndAppend(Format("Got tablets: $0", tablets)); |
853 | 2 | } |
854 | 10 | return status; |
855 | 10 | } |
856 | | |
857 | 1 | Result<TabletId> TabletSplitExternalMiniClusterITest::GetOnlyTestTabletId(size_t tserver_idx) { |
858 | 1 | auto tablet_ids = VERIFY_RESULT(GetTestTableTabletIds(tserver_idx)); |
859 | 1 | if (tablet_ids.size() != 1) { |
860 | 0 | return STATUS(InternalError, "Expected one tablet"); |
861 | 0 | } |
862 | 1 | return *tablet_ids.begin(); |
863 | 1 | } |
864 | | |
865 | 7 | Result<TabletId> TabletSplitExternalMiniClusterITest::GetOnlyTestTabletId() { |
866 | 7 | auto tablet_ids = VERIFY_RESULT(GetTestTableTabletIds()); |
867 | 7 | if (tablet_ids.size() != 1) { |
868 | 0 | return STATUS(InternalError, Format("Expected one tablet, got $0", tablet_ids.size())); |
869 | 0 | } |
870 | 7 | return *tablet_ids.begin(); |
871 | 7 | } |
872 | | |
873 | | Status TabletSplitExternalMiniClusterITest::SplitTabletCrashMaster( |
874 | 2 | bool change_split_boundary, string* split_partition_key) { |
875 | 2 | CreateSingleTablet(); |
876 | 2 | int key = 1, num_rows = 2000; |
877 | 2 | RETURN_NOT_OK(WriteRowsAndFlush(num_rows, key)); |
878 | 2 | key += num_rows; |
879 | 2 | auto tablet_id = CHECK_RESULT(GetOnlyTestTabletId()); |
880 | | |
881 | 2 | RETURN_NOT_OK(cluster_->SetFlagOnMasters("TEST_crash_after_creating_single_split_tablet", "1.0")); |
882 | | // Split tablet should crash before creating either tablet |
883 | 2 | if (split_partition_key) { |
884 | 1 | auto res = VERIFY_RESULT(cluster_->GetSplitKey(tablet_id)); |
885 | 0 | *split_partition_key = res.split_partition_key(); |
886 | 1 | } |
887 | 2 | RETURN_NOT_OK(SplitTablet(tablet_id)); |
888 | 2 | auto status = WaitForTablets(3); |
889 | 2 | if (status.ok()) { |
890 | 0 | return STATUS(IllegalState, "Tablet should not have split"); |
891 | 0 | } |
892 | | |
893 | 2 | RETURN_NOT_OK(RestartAllMasters(cluster_.get())); |
894 | 2 | RETURN_NOT_OK(cluster_->SetFlagOnMasters("TEST_crash_after_creating_single_split_tablet", "0.0")); |
895 | | |
896 | 2 | if (change_split_boundary) { |
897 | 1 | RETURN_NOT_OK(WriteRows(num_rows * 2, key)); |
898 | 4 | for (size_t i = 0; 1 i < cluster_->num_tablet_servers(); i++3 ) { |
899 | 3 | RETURN_NOT_OK(FlushTabletsOnSingleTServer(i, {tablet_id}, false)); |
900 | 3 | } |
901 | 1 | } |
902 | | |
903 | | // Wait for tablet split to complete |
904 | 2 | auto raft_heartbeat_roundtrip_time = FLAGS_raft_heartbeat_interval_ms * 2ms; |
905 | 2 | RETURN_NOT_OK(LoggedWaitFor( |
906 | 2 | [this, tablet_id]() -> Result<bool> { |
907 | 2 | auto status = SplitTablet(tablet_id); |
908 | 2 | if (!status.ok()) { |
909 | 2 | return false; |
910 | 2 | } |
911 | 2 | return WaitForTablets(3).ok(); |
912 | 2 | }, |
913 | 2 | 5 * raft_heartbeat_roundtrip_time * kTimeMultiplier |
914 | 2 | + 2ms * FLAGS_tserver_heartbeat_metrics_interval_ms, |
915 | 2 | Format("Wait for tablet to be split: $0", tablet_id))); |
916 | | |
917 | | // Wait for parent tablet clean up |
918 | 2 | std::this_thread::sleep_for(5 * raft_heartbeat_roundtrip_time * kTimeMultiplier); |
919 | 2 | RETURN_NOT_OK(WaitForTablets(2)); |
920 | | |
921 | 2 | return Status::OK(); |
922 | 2 | } |
923 | | |
924 | | } // namespace yb |