/Users/deen/code/yugabyte-db/src/yb/integration-tests/create-table-stress-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
#include <algorithm>
#include <iterator>
#include <memory>
#include <thread>

#include <glog/logging.h>
#include <glog/stl_logging.h>
#include <gtest/gtest.h>

#include "yb/client/client.h"
#include "yb/client/schema.h"
#include "yb/client/table_creator.h"

#include "yb/common/partition.h"
#include "yb/common/wire_protocol.h"

#include "yb/consensus/consensus.proxy.h"

#include "yb/fs/fs_manager.h"

#include "yb/integration-tests/cluster_itest_util.h"
#include "yb/integration-tests/mini_cluster.h"
#include "yb/integration-tests/yb_mini_cluster_test_base.h"

#include "yb/master/catalog_entity_info.h"
#include "yb/master/catalog_manager_if.h"
#include "yb/master/master-test-util.h"
#include "yb/master/master.h"
#include "yb/master/master_client.proxy.h"
#include "yb/master/master_cluster.proxy.h"
#include "yb/master/master_heartbeat.proxy.h"
#include "yb/master/mini_master.h"

#include "yb/rpc/messenger.h"
#include "yb/rpc/proxy.h"
#include "yb/rpc/rpc_controller.h"
#include "yb/rpc/rpc_test_util.h"

#include "yb/tserver/mini_tablet_server.h"
#include "yb/tserver/tablet_server.h"
#include "yb/tserver/ts_tablet_manager.h"
#include "yb/tserver/tserver_service.proxy.h"

#include "yb/util/hdr_histogram.h"
#include "yb/util/metrics.h"
#include "yb/util/scope_exit.h"
#include "yb/util/spinlock_profiling.h"
#include "yb/util/status_log.h"
#include "yb/util/stopwatch.h"
#include "yb/util/test_util.h"
#include "yb/util/tsan_util.h"
82 | | |
83 | | using yb::client::YBClient; |
84 | | using yb::client::YBClientBuilder; |
85 | | using yb::client::YBSchema; |
86 | | using yb::client::YBSchemaBuilder; |
87 | | using yb::client::YBTableCreator; |
88 | | using yb::client::YBTableName; |
89 | | using yb::itest::CreateTabletServerMap; |
90 | | using yb::itest::TabletServerMap; |
91 | | using yb::rpc::Messenger; |
92 | | using yb::rpc::MessengerBuilder; |
93 | | using yb::rpc::RpcController; |
94 | | |
95 | | DECLARE_int32(heartbeat_interval_ms); |
96 | | DECLARE_bool(log_preallocate_segments); |
97 | | DECLARE_bool(TEST_enable_remote_bootstrap); |
98 | | DECLARE_int32(tserver_unresponsive_timeout_ms); |
99 | | DECLARE_int32(max_create_tablets_per_ts); |
100 | | DECLARE_int32(tablet_report_limit); |
101 | | DECLARE_uint64(TEST_inject_latency_during_tablet_report_ms); |
102 | | DECLARE_int32(heartbeat_rpc_timeout_ms); |
103 | | DECLARE_int32(catalog_manager_report_batch_size); |
104 | | DECLARE_int32(tablet_report_limit); |
105 | | |
106 | | DEFINE_int32(num_test_tablets, 60, "Number of tablets for stress test"); |
107 | | DEFINE_int32(benchmark_runtime_secs, 5, "Number of seconds to run the benchmark"); |
108 | | DEFINE_int32(benchmark_num_threads, 16, "Number of threads to run the benchmark"); |
109 | | // Increase this for actually using this as a benchmark test. |
110 | | DEFINE_int32(benchmark_num_tablets, 8, "Number of tablets to create"); |
111 | | |
112 | | METRIC_DECLARE_histogram(handler_latency_yb_master_MasterClient_GetTableLocations); |
113 | | |
114 | | using std::string; |
115 | | using std::vector; |
116 | | using std::thread; |
117 | | using std::unique_ptr; |
118 | | using strings::Substitute; |
119 | | |
120 | | namespace yb { |
121 | | |
122 | | class CreateTableStressTest : public YBMiniClusterTestBase<MiniCluster> { |
123 | | public: |
124 | 9 | CreateTableStressTest() { |
125 | 9 | YBSchemaBuilder b; |
126 | 9 | b.AddColumn("key")->Type(INT32)->NotNull()->HashPrimaryKey(); |
127 | 9 | b.AddColumn("v1")->Type(INT64)->NotNull(); |
128 | 9 | b.AddColumn("v2")->Type(STRING)->NotNull(); |
129 | 9 | CHECK_OK(b.Build(&schema_)); |
130 | 9 | } |
131 | | |
132 | 9 | void SetUp() override { |
133 | | // Make heartbeats faster to speed test runtime. |
134 | 9 | FLAGS_heartbeat_interval_ms = 10; |
135 | | |
136 | | // Don't preallocate log segments, since we're creating thousands |
137 | | // of tablets here. If each preallocates 64M or so, we use |
138 | | // a ton of disk space in this test, and it fails on normal |
139 | | // sized /tmp dirs. |
140 | | // TODO: once we collapse multiple tablets into shared WAL files, |
141 | | // this won't be necessary. |
142 | 9 | FLAGS_log_preallocate_segments = false; |
143 | | |
144 | | // Workaround KUDU-941: without this, it's likely that while shutting |
145 | | // down tablets, they'll get resuscitated by their existing leaders. |
146 | 9 | FLAGS_TEST_enable_remote_bootstrap = false; |
147 | | |
148 | 9 | YBMiniClusterTestBase::SetUp(); |
149 | 9 | MiniClusterOptions opts; |
150 | 9 | opts.num_tablet_servers = 3; |
151 | 9 | cluster_.reset(new MiniCluster(opts)); |
152 | 9 | ASSERT_OK(cluster_->Start()); |
153 | | |
154 | 0 | client_ = ASSERT_RESULT(YBClientBuilder() |
155 | 0 | .add_master_server_addr(cluster_->mini_master()->bound_rpc_addr_str()) |
156 | 0 | .Build()); |
157 | |
|
158 | 0 | messenger_ = ASSERT_RESULT( |
159 | 0 | MessengerBuilder("stress-test-msgr").set_num_reactors(1).Build()); |
160 | 0 | rpc::ProxyCache proxy_cache(messenger_.get()); |
161 | 0 | master::MasterClusterProxy proxy(&proxy_cache, cluster_->mini_master()->bound_rpc_addr()); |
162 | 0 | ts_map_ = ASSERT_RESULT(CreateTabletServerMap(proxy, &proxy_cache)); |
163 | 0 | } |
164 | | |
165 | 3 | void DoTearDown() override { |
166 | 3 | messenger_->Shutdown(); |
167 | 3 | client_.reset(); |
168 | 3 | cluster_->Shutdown(); |
169 | 3 | ts_map_.clear(); |
170 | 3 | } |
171 | | |
172 | | void CreateBigTable(const YBTableName& table_name, int num_tablets); |
173 | | |
174 | | protected: |
175 | | std::unique_ptr<YBClient> client_; |
176 | | YBSchema schema_; |
177 | | std::unique_ptr<Messenger> messenger_; |
178 | | std::unique_ptr<master::MasterClusterProxy> master_proxy_; |
179 | | TabletServerMap ts_map_; |
180 | | }; |
181 | | |
182 | 0 | void CreateTableStressTest::CreateBigTable(const YBTableName& table_name, int num_tablets) { |
183 | 0 | ASSERT_OK(client_->CreateNamespaceIfNotExists(table_name.namespace_name(), |
184 | 0 | table_name.namespace_type())); |
185 | 0 | std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator()); |
186 | 0 | ASSERT_OK(table_creator->table_name(table_name) |
187 | 0 | .schema(&schema_) |
188 | 0 | .num_tablets(num_tablets) |
189 | 0 | .wait(false) |
190 | 0 | .Create()); |
191 | 0 | } |
192 | | |
193 | 0 | TEST_F(CreateTableStressTest, GetTableLocationsBenchmark) { |
194 | 0 | FLAGS_max_create_tablets_per_ts = FLAGS_benchmark_num_tablets; |
195 | 0 | DontVerifyClusterBeforeNextTearDown(); |
196 | 0 | YBTableName table_name(YQL_DATABASE_CQL, "my_keyspace", "test_table"); |
197 | 0 | LOG(INFO) << CURRENT_TEST_NAME() << ": Step 1. Creating big table " |
198 | 0 | << table_name.ToString() << " ..."; |
199 | 0 | LOG_TIMING(INFO, "creating big table") { |
200 | 0 | ASSERT_NO_FATALS(CreateBigTable(table_name, FLAGS_benchmark_num_tablets)); |
201 | 0 | } |
202 | | |
203 | | // Make sure the table is completely created before we start poking. |
204 | 0 | LOG(INFO) << CURRENT_TEST_NAME() << ": Step 2. Waiting for creation of big table " |
205 | 0 | << table_name.ToString() << " to complete..."; |
206 | 0 | master::GetTableLocationsResponsePB create_resp; |
207 | 0 | LOG_TIMING(INFO, "waiting for creation of big table") { |
208 | 0 | ASSERT_OK(WaitForRunningTabletCount(cluster_->mini_master(), table_name, |
209 | 0 | FLAGS_benchmark_num_tablets, &create_resp)); |
210 | 0 | } |
211 | | // Sleep for a while to let all TS heartbeat to master. |
212 | 0 | SleepFor(MonoDelta::FromSeconds(10)); |
213 | 0 | const int kNumThreads = FLAGS_benchmark_num_threads; |
214 | 0 | const auto kRuntime = MonoDelta::FromSeconds(FLAGS_benchmark_runtime_secs); |
215 | | |
216 | | // Make one proxy per thread, so each thread gets its own messenger and |
217 | | // reactor. If there were only one messenger, then only one reactor thread |
218 | | // would be used for the connection to the master, so this benchmark would |
219 | | // probably be testing the serialization and network code rather than the |
220 | | // master GTL code. |
221 | 0 | vector<rpc::AutoShutdownMessengerHolder> messengers; |
222 | 0 | vector<master::MasterClientProxy> proxies; |
223 | 0 | vector<unique_ptr<rpc::ProxyCache>> caches; |
224 | 0 | messengers.reserve(kNumThreads); |
225 | 0 | proxies.reserve(kNumThreads); |
226 | 0 | caches.reserve(kNumThreads); |
227 | 0 | for (int i = 0; i < kNumThreads; i++) { |
228 | 0 | messengers.emplace_back( |
229 | 0 | ASSERT_RESULT(MessengerBuilder("Client").set_num_reactors(1).Build()).release()); |
230 | 0 | caches.emplace_back(new rpc::ProxyCache(messengers.back().get())); |
231 | 0 | proxies.emplace_back(caches.back().get(), cluster_->mini_master()->bound_rpc_addr()); |
232 | 0 | } |
233 | |
|
234 | 0 | std::atomic<bool> stop { false }; |
235 | 0 | vector<std::thread> threads; |
236 | 0 | threads.reserve(kNumThreads); |
237 | 0 | for (int i = 0; i < kNumThreads; i++) { |
238 | 0 | threads.emplace_back([&, i]() { |
239 | 0 | while (!stop) { |
240 | 0 | master::GetTableLocationsRequestPB req; |
241 | 0 | master::GetTableLocationsResponsePB resp; |
242 | 0 | RpcController controller; |
243 | | // Silence errors. |
244 | 0 | controller.set_timeout(MonoDelta::FromSeconds(10)); |
245 | 0 | table_name.SetIntoTableIdentifierPB(req.mutable_table()); |
246 | 0 | req.set_max_returned_locations(1000); |
247 | 0 | CHECK_OK(proxies[i].GetTableLocations(req, &resp, &controller)); |
248 | 0 | CHECK_EQ(resp.tablet_locations_size(), FLAGS_benchmark_num_tablets); |
249 | 0 | } |
250 | 0 | }); |
251 | 0 | } |
252 | |
|
253 | 0 | std::stringstream profile; |
254 | 0 | StartSynchronizationProfiling(); |
255 | 0 | SleepFor(kRuntime); |
256 | 0 | stop = true; |
257 | 0 | for (auto& t : threads) { |
258 | 0 | t.join(); |
259 | 0 | } |
260 | 0 | StopSynchronizationProfiling(); |
261 | 0 | int64_t discarded_samples = 0; |
262 | 0 | FlushSynchronizationProfile(&profile, &discarded_samples); |
263 | |
|
264 | 0 | const auto& ent = cluster_->mini_master()->master()->metric_entity(); |
265 | 0 | auto hist = METRIC_handler_latency_yb_master_MasterClient_GetTableLocations |
266 | 0 | .Instantiate(ent); |
267 | |
|
268 | 0 | cluster_->Shutdown(); |
269 | |
|
270 | 0 | LOG(INFO) << "LOCK PROFILE\n" << profile.str(); |
271 | 0 | LOG(INFO) << "BENCHMARK HISTOGRAM:"; |
272 | 0 | hist->histogram()->DumpHumanReadable(&LOG(INFO)); |
273 | 0 | } |
274 | | |
275 | | class CreateMultiHBTableStressTest : public CreateTableStressTest, |
276 | | public testing::WithParamInterface<bool /* is_multiHb */> { |
277 | 4 | void SetUp() override { |
278 | | // "MultiHB" Tables are too large to be reported in a single heartbeat from a TS. |
279 | | // Setup so all 3 TS will have to break tablet report updates into multiple chunks. |
280 | 4 | bool is_multiHb = GetParam(); |
281 | 4 | if (is_multiHb) { |
282 | | // 90 Tablets * 3 TS < 300 Tablets |
283 | 2 | FLAGS_tablet_report_limit = 90; |
284 | 2 | FLAGS_num_test_tablets = 300; |
285 | 2 | FLAGS_max_create_tablets_per_ts = FLAGS_num_test_tablets; |
286 | | // 1000 ms deadline / 20 ms wait/batch ~= 40 Tablets processed before Master hits deadline |
287 | 2 | FLAGS_TEST_inject_latency_during_tablet_report_ms = 20; |
288 | 2 | FLAGS_heartbeat_rpc_timeout_ms = 1000; |
289 | 2 | FLAGS_catalog_manager_report_batch_size = 1; |
290 | 2 | } |
291 | 4 | CreateTableStressTest::SetUp(); |
292 | 4 | } |
293 | | }; |
294 | | INSTANTIATE_TEST_CASE_P(MultiHeartbeat, CreateMultiHBTableStressTest, ::testing::Bool()); |
295 | | |
296 | | // Replaces itest version, which requires an External Mini Cluster. |
297 | | Status ListRunningTabletIds(std::shared_ptr<tserver::TabletServerServiceProxy> ts_proxy, |
298 | | const MonoDelta& timeout, |
299 | 0 | std::vector<string>* tablet_ids) { |
300 | 0 | tserver::ListTabletsRequestPB req; |
301 | 0 | tserver::ListTabletsResponsePB resp; |
302 | 0 | RpcController rpc; |
303 | 0 | rpc.set_timeout(timeout); |
304 | |
|
305 | 0 | RETURN_NOT_OK(ts_proxy->ListTablets(req, &resp, &rpc)); |
306 | 0 | tablet_ids->clear(); |
307 | 0 | for (const auto& t : resp.status_and_schema()) { |
308 | 0 | if (t.tablet_status().state() == tablet::RUNNING) { |
309 | 0 | tablet_ids->push_back(t.tablet_status().tablet_id()); |
310 | 0 | } |
311 | 0 | } |
312 | 0 | return Status::OK(); |
313 | 0 | } |
314 | | |
315 | 0 | TEST_P(CreateMultiHBTableStressTest, CreateAndDeleteBigTable) { |
316 | 0 | DontVerifyClusterBeforeNextTearDown(); |
317 | 0 | if (IsSanitizer()) { |
318 | 0 | LOG(INFO) << "Skipping slow test"; |
319 | 0 | return; |
320 | 0 | } |
321 | 0 | YBTableName table_name(YQL_DATABASE_CQL, "my_keyspace", "test_table"); |
322 | 0 | ASSERT_NO_FATALS(CreateBigTable(table_name, FLAGS_num_test_tablets)); |
323 | 0 | master::GetTableLocationsResponsePB resp; |
324 | 0 | ASSERT_OK(WaitForRunningTabletCount(cluster_->mini_master(), table_name, |
325 | 0 | FLAGS_num_test_tablets, &resp)); |
326 | 0 | LOG(INFO) << "Created table successfully!"; |
327 | | // Use std::cout instead of log, since these responses are large and log |
328 | | // messages have a max size. |
329 | 0 | std::cout << "Response:\n" << resp.DebugString(); |
330 | 0 | std::cout << "CatalogManager state:\n"; |
331 | 0 | cluster_->mini_master()->catalog_manager().DumpState(&std::cerr); |
332 | | |
333 | | // Store all relevant tablets for this big table we've created. |
334 | 0 | std::vector<string> big_table_tablets; |
335 | 0 | for (const auto & loc : resp.tablet_locations()) { |
336 | 0 | big_table_tablets.push_back(loc.tablet_id()); |
337 | 0 | } |
338 | 0 | std::sort(big_table_tablets.begin(), big_table_tablets.end()); |
339 | |
|
340 | 0 | LOG(INFO) << "Deleting table..."; |
341 | 0 | ASSERT_OK(client_->DeleteTable(table_name)); |
342 | | |
343 | | // The actual removal of the tablets is asynchronous, so we loop for a bit |
344 | | // waiting for them to get removed. |
345 | 0 | LOG(INFO) << "Waiting for tablets to be removed on TS#1"; |
346 | 0 | std::vector<string> big_tablet_left, tablet_ids; |
347 | 0 | auto ts_proxy = cluster_->mini_tablet_server(0)->server()->proxy(); |
348 | 0 | for (int i = 0; i < 1000; i++) { |
349 | 0 | ASSERT_OK(ListRunningTabletIds(ts_proxy, 10s, &tablet_ids)); |
350 | 0 | std::sort(tablet_ids.begin(), tablet_ids.end()); |
351 | 0 | big_tablet_left.clear(); |
352 | 0 | std::set_intersection(big_table_tablets.begin(), big_table_tablets.end(), |
353 | 0 | tablet_ids.begin(), tablet_ids.end(), |
354 | 0 | big_tablet_left.begin()); |
355 | 0 | if (big_tablet_left.empty()) return; |
356 | 0 | SleepFor(MonoDelta::FromMilliseconds(100)); |
357 | 0 | } |
358 | 0 | ASSERT_TRUE(big_tablet_left.empty()) << "Tablets remaining: " << big_tablet_left.size() |
359 | 0 | << " : " << big_tablet_left; |
360 | 0 | } |
361 | | |
362 | 0 | TEST_P(CreateMultiHBTableStressTest, RestartServersAfterCreation) { |
363 | 0 | DontVerifyClusterBeforeNextTearDown(); |
364 | 0 | if (IsSanitizer()) { |
365 | 0 | LOG(INFO) << "Skipping slow test"; |
366 | 0 | return; |
367 | 0 | } |
368 | 0 | YBTableName table_name(YQL_DATABASE_CQL, "my_keyspace", "test_table"); |
369 | 0 | ASSERT_NO_FATALS(CreateBigTable(table_name, FLAGS_num_test_tablets)); |
370 | |
|
371 | 0 | for (int i = 0; i < 3; i++) { |
372 | 0 | SleepFor(MonoDelta::FromMicroseconds(500)); |
373 | 0 | LOG(INFO) << "Restarting master..."; |
374 | 0 | ASSERT_OK(cluster_->mini_master()->Restart()); |
375 | 0 | ASSERT_OK(cluster_->mini_master()->master()-> |
376 | 0 | WaitUntilCatalogManagerIsLeaderAndReadyForTests()); |
377 | 0 | LOG(INFO) << "Master restarted."; |
378 | 0 | } |
379 | | |
380 | | // Restart TS#2, which forces a full tablet report on TS #2 and incremental updates on the others. |
381 | 0 | ASSERT_OK(cluster_->mini_tablet_server(1)->Restart()); |
382 | |
|
383 | 0 | master::GetTableLocationsResponsePB resp; |
384 | 0 | Status s = WaitForRunningTabletCount(cluster_->mini_master(), table_name, |
385 | 0 | FLAGS_num_test_tablets, &resp); |
386 | 0 | if (!s.ok()) { |
387 | 0 | cluster_->mini_master()->catalog_manager().DumpState(&std::cerr); |
388 | 0 | CHECK_OK(s); |
389 | 0 | } |
390 | 0 | } |
391 | | |
392 | | class CreateSmallHBTableStressTest : public CreateTableStressTest { |
393 | 1 | void SetUp() override { |
394 | | // 40 / 3 ~= 13 tablets / server. 2 / report >= 7 reports to finish a heartbeat |
395 | 1 | FLAGS_tablet_report_limit = 2; |
396 | 1 | FLAGS_num_test_tablets = 40; |
397 | 1 | FLAGS_max_create_tablets_per_ts = FLAGS_num_test_tablets; |
398 | | |
399 | 1 | CreateTableStressTest::SetUp(); |
400 | 1 | } |
401 | | }; |
402 | 0 | TEST_F(CreateSmallHBTableStressTest, TestRestartMasterDuringFullHeartbeat) { |
403 | 0 | DontVerifyClusterBeforeNextTearDown(); |
404 | 0 | if (IsSanitizer()) { |
405 | 0 | LOG(INFO) << "Skipping slow test"; |
406 | 0 | return; |
407 | 0 | } |
408 | 0 | YBTableName table_name(YQL_DATABASE_CQL, "my_keyspace", "test_table"); |
409 | 0 | ASSERT_NO_FATALS(CreateBigTable(table_name, FLAGS_num_test_tablets)); |
410 | | |
411 | | // 100 ms wait / tablet >= 1.3 sec to receive a full report |
412 | 0 | FLAGS_TEST_inject_latency_during_tablet_report_ms = 100; |
413 | 0 | FLAGS_catalog_manager_report_batch_size = 1; |
414 | | |
415 | | // Restart Master #1. Triggers Full Report from all TServers. |
416 | 0 | ASSERT_OK(cluster_->mini_master()->Restart()); |
417 | 0 | ASSERT_OK(cluster_->mini_master()->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests()); |
418 | | |
419 | | // Wait until the Master is ~25% complete with getting the heartbeats from the TS. |
420 | 0 | master::GetTableLocationsResponsePB resp; |
421 | 0 | ASSERT_OK(WaitForRunningTabletCount(cluster_->mini_master(), table_name, |
422 | 0 | FLAGS_num_test_tablets / 4, &resp)); |
423 | 0 | ASSERT_LT(resp.tablet_locations_size(), FLAGS_num_test_tablets / 2); |
424 | 0 | LOG(INFO) << "Resetting Master after seeing table count: " << resp.tablet_locations_size(); |
425 | | |
426 | | // Restart Master #2. Re-triggers a Full Report from all TServers, even though they were in the |
427 | | // middle of sending a full report to the old master. |
428 | 0 | ASSERT_OK(cluster_->mini_master()->Restart()); |
429 | 0 | ASSERT_OK(cluster_->mini_master()->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests()); |
430 | | |
431 | | // Speed up the test now... |
432 | 0 | FLAGS_TEST_inject_latency_during_tablet_report_ms = 0; |
433 | | |
434 | | // The TS should send a full report. If they just sent the remainder from their original |
435 | | // Full Report, this test will fail. |
436 | 0 | Status s = WaitForRunningTabletCount(cluster_->mini_master(), table_name, |
437 | 0 | FLAGS_num_test_tablets, &resp); |
438 | 0 | if (!s.ok()) { |
439 | 0 | cluster_->mini_master()->catalog_manager().DumpState(&std::cerr); |
440 | 0 | CHECK_OK(s); |
441 | 0 | } |
442 | 0 | } |
443 | | |
444 | 0 | TEST_F(CreateTableStressTest, TestHeartbeatDeadline) { |
445 | 0 | DontVerifyClusterBeforeNextTearDown(); |
446 | | |
447 | | // 500ms deadline / 50 ms wait ~= 10 Tablets processed before Master hits deadline |
448 | 0 | FLAGS_catalog_manager_report_batch_size = 1; |
449 | 0 | FLAGS_TEST_inject_latency_during_tablet_report_ms = 50; |
450 | 0 | FLAGS_heartbeat_rpc_timeout_ms = 500; |
451 | 0 | FLAGS_num_test_tablets = 60; |
452 | | |
453 | | // Create a Table with 60 tablets, so ~20 per TS. |
454 | 0 | YBTableName table_name(YQL_DATABASE_CQL, "my_keyspace", "test_table"); |
455 | 0 | ASSERT_NO_FATALS(CreateBigTable(table_name, FLAGS_num_test_tablets)); |
456 | 0 | master::GetTableLocationsResponsePB resp; |
457 | 0 | ASSERT_OK(WaitForRunningTabletCount(cluster_->mini_master(), table_name, |
458 | 0 | FLAGS_num_test_tablets, &resp)); |
459 | | |
460 | | // Grab TS#1 and Generate a Full Report for it. |
461 | 0 | auto ts_server = cluster_->mini_tablet_server(0)->server(); |
462 | 0 | master::TSHeartbeatRequestPB hb_req; |
463 | 0 | hb_req.mutable_common()->mutable_ts_instance()->CopyFrom(ts_server->instance_pb()); |
464 | 0 | ts_server->tablet_manager()->StartFullTabletReport(hb_req.mutable_tablet_report()); |
465 | 0 | ASSERT_GT(hb_req.tablet_report().updated_tablets_size(), |
466 | 0 | FLAGS_heartbeat_rpc_timeout_ms / FLAGS_TEST_inject_latency_during_tablet_report_ms); |
467 | 0 | ASSERT_EQ(ts_server->tablet_manager()->GetReportLimit(), FLAGS_tablet_report_limit); |
468 | 0 | ASSERT_LE(hb_req.tablet_report().updated_tablets_size(), FLAGS_tablet_report_limit); |
469 | |
|
470 | 0 | rpc::ProxyCache proxy_cache(messenger_.get()); |
471 | 0 | master::MasterHeartbeatProxy proxy(&proxy_cache, cluster_->mini_master()->bound_rpc_addr()); |
472 | | |
473 | | // Grab Master and Process this Tablet Report. |
474 | | // This should go over the deadline and get truncated. |
475 | 0 | master::TSHeartbeatResponsePB hb_resp; |
476 | 0 | hb_req.mutable_tablet_report()->set_is_incremental(true); |
477 | 0 | hb_req.mutable_tablet_report()->set_sequence_number(1); |
478 | 0 | Status heartbeat_status; |
479 | | // Regression testbed often has stalls at this timing granularity. Allow a couple hiccups. |
480 | 0 | for (int tries = 0; tries < 3; ++tries) { |
481 | 0 | RpcController rpc; |
482 | 0 | rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms)); |
483 | 0 | heartbeat_status = proxy.TSHeartbeat(hb_req, &hb_resp, &rpc); |
484 | 0 | if (heartbeat_status.ok()) break; |
485 | 0 | ASSERT_TRUE(heartbeat_status.IsTimedOut()); |
486 | 0 | } |
487 | 0 | ASSERT_OK(heartbeat_status); |
488 | 0 | ASSERT_TRUE(hb_resp.tablet_report().processing_truncated()); |
489 | 0 | ASSERT_LE(hb_resp.tablet_report().tablets_size(), |
490 | 0 | FLAGS_heartbeat_rpc_timeout_ms / FLAGS_TEST_inject_latency_during_tablet_report_ms); |
491 | 0 | } |
492 | | |
493 | | |
494 | 0 | TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) { |
495 | 0 | DontVerifyClusterBeforeNextTearDown(); |
496 | 0 | if (!AllowSlowTests()) { |
497 | 0 | LOG(INFO) << "Skipping slow test"; |
498 | 0 | return; |
499 | 0 | } |
500 | | |
501 | 0 | YBTableName table_name(YQL_DATABASE_CQL, "my_keyspace", "test_table"); |
502 | 0 | LOG(INFO) << CURRENT_TEST_NAME() << ": Step 1. Creating big table " |
503 | 0 | << table_name.ToString() << " ..."; |
504 | 0 | LOG_TIMING(INFO, "creating big table") { |
505 | 0 | ASSERT_NO_FATALS(CreateBigTable(table_name, FLAGS_num_test_tablets)); |
506 | 0 | } |
507 | |
|
508 | 0 | master::GetTableLocationsRequestPB req; |
509 | 0 | master::GetTableLocationsResponsePB resp; |
510 | | |
511 | | // Make sure the table is completely created before we start poking. |
512 | 0 | LOG(INFO) << CURRENT_TEST_NAME() << ": Step 2. Waiting for creation of big table " |
513 | 0 | << table_name.ToString() << " to complete..."; |
514 | 0 | LOG_TIMING(INFO, "waiting for creation of big table") { |
515 | 0 | ASSERT_OK(WaitForRunningTabletCount(cluster_->mini_master(), table_name, |
516 | 0 | FLAGS_num_test_tablets, &resp)); |
517 | 0 | } |
518 | | |
519 | | // Test asking for 0 tablets, should fail |
520 | 0 | LOG(INFO) << CURRENT_TEST_NAME() << ": Step 3. Asking for zero tablets..."; |
521 | 0 | LOG_TIMING(INFO, "asking for zero tablets") { |
522 | 0 | req.Clear(); |
523 | 0 | resp.Clear(); |
524 | 0 | table_name.SetIntoTableIdentifierPB(req.mutable_table()); |
525 | 0 | req.set_max_returned_locations(0); |
526 | 0 | Status s = cluster_->mini_master()->catalog_manager().GetTableLocations(&req, &resp); |
527 | 0 | ASSERT_STR_CONTAINS(s.ToString(), "must be greater than 0"); |
528 | 0 | } |
529 | | |
530 | | // Ask for one, get one, verify |
531 | 0 | LOG(INFO) << CURRENT_TEST_NAME() << ": Step 4. Asking for one tablet..."; |
532 | 0 | LOG_TIMING(INFO, "asking for one tablet") { |
533 | 0 | req.Clear(); |
534 | 0 | resp.Clear(); |
535 | 0 | table_name.SetIntoTableIdentifierPB(req.mutable_table()); |
536 | 0 | req.set_max_returned_locations(1); |
537 | 0 | ASSERT_OK(cluster_->mini_master()->catalog_manager().GetTableLocations(&req, &resp)); |
538 | 0 | ASSERT_EQ(resp.tablet_locations_size(), 1); |
539 | | // empty since it's the first |
540 | 0 | ASSERT_EQ(resp.tablet_locations(0).partition().partition_key_start(), ""); |
541 | 0 | ASSERT_EQ(resp.tablet_locations(0).partition().partition_key_end(), string("\x80\0\0\1", 4)); |
542 | 0 | } |
543 | |
|
544 | 0 | int half_tablets = FLAGS_num_test_tablets / 2; |
545 | | // Ask for half of them, get that number back |
546 | 0 | LOG(INFO) << CURRENT_TEST_NAME() << ": Step 5. Asking for half the tablets..."; |
547 | 0 | LOG_TIMING(INFO, "asking for half the tablets") { |
548 | 0 | req.Clear(); |
549 | 0 | resp.Clear(); |
550 | 0 | table_name.SetIntoTableIdentifierPB(req.mutable_table()); |
551 | 0 | req.set_max_returned_locations(half_tablets); |
552 | 0 | ASSERT_OK(cluster_->mini_master()->catalog_manager().GetTableLocations(&req, &resp)); |
553 | 0 | ASSERT_EQ(half_tablets, resp.tablet_locations_size()); |
554 | 0 | } |
555 | | |
556 | | // Ask for all of them, get that number back |
557 | 0 | LOG(INFO) << CURRENT_TEST_NAME() << ": Step 6. Asking for all the tablets..."; |
558 | 0 | LOG_TIMING(INFO, "asking for all the tablets") { |
559 | 0 | req.Clear(); |
560 | 0 | resp.Clear(); |
561 | 0 | table_name.SetIntoTableIdentifierPB(req.mutable_table()); |
562 | 0 | req.set_max_returned_locations(FLAGS_num_test_tablets); |
563 | 0 | ASSERT_OK(cluster_->mini_master()->catalog_manager().GetTableLocations(&req, &resp)); |
564 | 0 | ASSERT_EQ(FLAGS_num_test_tablets, resp.tablet_locations_size()); |
565 | 0 | } |
566 | |
|
567 | 0 | LOG(INFO) << "========================================================"; |
568 | 0 | LOG(INFO) << "Tables and tablets:"; |
569 | 0 | LOG(INFO) << "========================================================"; |
570 | 0 | auto tables = cluster_->mini_master()->catalog_manager().GetTables( |
571 | 0 | master::GetTablesMode::kAll); |
572 | 0 | for (const scoped_refptr<master::TableInfo>& table_info : tables) { |
573 | 0 | LOG(INFO) << "Table: " << table_info->ToString(); |
574 | 0 | auto tablets = table_info->GetTablets(); |
575 | 0 | for (const scoped_refptr<master::TabletInfo>& tablet_info : tablets) { |
576 | 0 | auto l_tablet = tablet_info->LockForRead(); |
577 | 0 | const master::SysTabletsEntryPB& metadata = l_tablet->pb; |
578 | 0 | LOG(INFO) << " Tablet: " << tablet_info->ToString() |
579 | 0 | << " { start_key: " |
580 | 0 | << ((metadata.partition().has_partition_key_start()) |
581 | 0 | ? metadata.partition().partition_key_start() : "<< none >>") |
582 | 0 | << ", end_key: " |
583 | 0 | << ((metadata.partition().has_partition_key_end()) |
584 | 0 | ? metadata.partition().partition_key_end() : "<< none >>") |
585 | 0 | << ", running = " << tablet_info->metadata().state().is_running() << " }"; |
586 | 0 | } |
587 | 0 | ASSERT_EQ(FLAGS_num_test_tablets, tablets.size()); |
588 | 0 | } |
589 | 0 | LOG(INFO) << "========================================================"; |
590 | | |
591 | | // Get a single tablet in the middle, make sure we get that one back |
592 | |
|
593 | 0 | std::unique_ptr<YBPartialRow> row(schema_.NewRow()); |
594 | 0 | ASSERT_OK(row->SetInt32(0, half_tablets - 1)); |
595 | 0 | string start_key_middle; |
596 | 0 | ASSERT_OK(row->EncodeRowKey(&start_key_middle)); |
597 | |
|
598 | 0 | LOG(INFO) << "Start key middle: " << start_key_middle; |
599 | 0 | LOG(INFO) << CURRENT_TEST_NAME() << ": Step 7. Asking for single middle tablet..."; |
600 | 0 | LOG_TIMING(INFO, "asking for single middle tablet") { |
601 | 0 | req.Clear(); |
602 | 0 | resp.Clear(); |
603 | 0 | table_name.SetIntoTableIdentifierPB(req.mutable_table()); |
604 | 0 | req.set_max_returned_locations(1); |
605 | 0 | req.set_partition_key_start(start_key_middle); |
606 | 0 | ASSERT_OK(cluster_->mini_master()->catalog_manager().GetTableLocations(&req, &resp)); |
607 | 0 | ASSERT_EQ(1, resp.tablet_locations_size()) << "Response: [" << resp.DebugString() << "]"; |
608 | 0 | ASSERT_EQ(start_key_middle, resp.tablet_locations(0).partition().partition_key_start()); |
609 | 0 | } |
610 | 0 | } |
611 | | |
612 | | // Creates tables and reloads on-disk metadata concurrently to test for races |
613 | | // between the two operations. |
614 | 0 | TEST_F(CreateTableStressTest, TestConcurrentCreateTableAndReloadMetadata) { |
615 | 0 | AtomicBool stop(false); |
616 | | |
617 | | // Since this test constantly invokes VisitSysCatalog() which is the function |
618 | | // that runs after a new leader gets elected, during that period the leader rejects |
619 | | // tablet server heart-beats (because it holds the leader_lock_), and this leads |
620 | | // the master to mistakenly think that the tablet servers are dead. To avoid this |
621 | | // increase the TS unresponsive timeout so that the leader correctly thinks that |
622 | | // they are alive. |
623 | 0 | SetAtomicFlag(5 * 60 * 1000, &FLAGS_tserver_unresponsive_timeout_ms); |
624 | |
|
625 | 0 | thread reload_metadata_thread([&]() { |
626 | 0 | while (!stop.Load()) { |
627 | 0 | CHECK_OK(cluster_->mini_master()->catalog_manager().VisitSysCatalog(0)); |
628 | | // Give table creation a chance to run. |
629 | 0 | SleepFor(MonoDelta::FromMilliseconds(10 * kTimeMultiplier)); |
630 | 0 | } |
631 | 0 | }); |
632 | |
|
633 | 0 | auto se = ScopeExit([&stop, &reload_metadata_thread] { |
634 | 0 | stop.Store(true); |
635 | 0 | reload_metadata_thread.join(); |
636 | 0 | }); |
637 | |
|
638 | 0 | for (int num_tables_created = 0; num_tables_created < 20;) { |
639 | 0 | YBTableName table_name( |
640 | 0 | YQL_DATABASE_CQL, "my_keyspace", Substitute("test-$0", num_tables_created)); |
641 | 0 | LOG(INFO) << "Creating table " << table_name.ToString(); |
642 | 0 | Status s = client_->CreateNamespaceIfNotExists(table_name.namespace_name(), |
643 | 0 | table_name.namespace_type()); |
644 | 0 | if (s.ok()) { |
645 | 0 | unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator()); |
646 | 0 | s = table_creator->table_name(table_name) |
647 | 0 | .schema(&schema_) |
648 | 0 | .hash_schema(YBHashSchema::kMultiColumnHash) |
649 | 0 | .set_range_partition_columns({ "key" }) |
650 | 0 | .num_tablets(1) |
651 | 0 | .wait(false) |
652 | 0 | .Create(); |
653 | 0 | } |
654 | 0 | if (s.IsServiceUnavailable()) { |
655 | | // The master was busy reloading its metadata. Try again. |
656 | | // |
657 | | // This is a purely synthetic case. In real life, it only manifests at |
658 | | // startup (single master) or during leader failover (multiple masters). |
659 | | // In the latter case, the client will transparently retry to another |
660 | | // master. That won't happen here as we've only got one master, so we |
661 | | // must handle retrying ourselves. |
662 | 0 | continue; |
663 | 0 | } |
664 | 0 | ASSERT_OK(s); |
665 | 0 | num_tables_created++; |
666 | 0 | LOG(INFO) << "Total created: " << num_tables_created; |
667 | 0 | } |
668 | 0 | } |
669 | | |
670 | | } // namespace yb |