YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/create-table-stress-test.cc
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include <algorithm>
#include <atomic>
#include <iterator>
#include <memory>
#include <sstream>
#include <thread>

#include <glog/logging.h>
#include <glog/stl_logging.h>
#include <gtest/gtest.h>

#include "yb/client/client.h"
#include "yb/client/schema.h"
#include "yb/client/table_creator.h"

#include "yb/common/partition.h"
#include "yb/common/wire_protocol.h"

#include "yb/consensus/consensus.proxy.h"

#include "yb/fs/fs_manager.h"

#include "yb/integration-tests/cluster_itest_util.h"
#include "yb/integration-tests/mini_cluster.h"
#include "yb/integration-tests/yb_mini_cluster_test_base.h"

#include "yb/master/catalog_entity_info.h"
#include "yb/master/catalog_manager_if.h"
#include "yb/master/master-test-util.h"
#include "yb/master/master.h"
#include "yb/master/master_client.proxy.h"
#include "yb/master/master_cluster.proxy.h"
#include "yb/master/master_heartbeat.proxy.h"
#include "yb/master/mini_master.h"

#include "yb/rpc/messenger.h"
#include "yb/rpc/proxy.h"
#include "yb/rpc/rpc_controller.h"
#include "yb/rpc/rpc_test_util.h"

#include "yb/tserver/mini_tablet_server.h"
#include "yb/tserver/tablet_server.h"
#include "yb/tserver/ts_tablet_manager.h"
#include "yb/tserver/tserver_service.proxy.h"

#include "yb/util/hdr_histogram.h"
#include "yb/util/metrics.h"
#include "yb/util/scope_exit.h"
#include "yb/util/spinlock_profiling.h"
#include "yb/util/status_log.h"
#include "yb/util/stopwatch.h"
#include "yb/util/test_util.h"
#include "yb/util/tsan_util.h"

using yb::client::YBClient;
using yb::client::YBClientBuilder;
using yb::client::YBSchema;
using yb::client::YBSchemaBuilder;
using yb::client::YBTableCreator;
using yb::client::YBTableName;
using yb::itest::CreateTabletServerMap;
using yb::itest::TabletServerMap;
using yb::rpc::Messenger;
using yb::rpc::MessengerBuilder;
using yb::rpc::RpcController;

DECLARE_int32(heartbeat_interval_ms);
DECLARE_bool(log_preallocate_segments);
DECLARE_bool(TEST_enable_remote_bootstrap);
DECLARE_int32(tserver_unresponsive_timeout_ms);
DECLARE_int32(max_create_tablets_per_ts);
DECLARE_int32(tablet_report_limit);
DECLARE_uint64(TEST_inject_latency_during_tablet_report_ms);
DECLARE_int32(heartbeat_rpc_timeout_ms);
DECLARE_int32(catalog_manager_report_batch_size);

DEFINE_int32(num_test_tablets, 60, "Number of tablets for stress test");
DEFINE_int32(benchmark_runtime_secs, 5, "Number of seconds to run the benchmark");
DEFINE_int32(benchmark_num_threads, 16, "Number of threads to run the benchmark");
// Increase this for actually using this as a benchmark test.
DEFINE_int32(benchmark_num_tablets, 8, "Number of tablets to create");

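// A hypothetical invocation for using this as a real benchmark (binary path and
// gtest filter are illustrative, not taken from the build):
//
//   ./create-table-stress-test --gtest_filter='*GetTableLocationsBenchmark*' \
//       --benchmark_num_tablets=600 --benchmark_num_threads=32 --benchmark_runtime_secs=60
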
// Pulls in the master's GetTableLocations handler-latency histogram (defined in
// the master code) so the benchmark below can instantiate and dump it.
METRIC_DECLARE_histogram(handler_latency_yb_master_MasterClient_GetTableLocations);

using std::string;
using std::vector;
using std::thread;
using std::unique_ptr;
using strings::Substitute;

namespace yb {

class CreateTableStressTest : public YBMiniClusterTestBase<MiniCluster> {
 public:
  CreateTableStressTest() {
    // All tests share a simple three-column schema: a hash key plus two values.
    YBSchemaBuilder b;
    b.AddColumn("key")->Type(INT32)->NotNull()->HashPrimaryKey();
    b.AddColumn("v1")->Type(INT64)->NotNull();
    b.AddColumn("v2")->Type(STRING)->NotNull();
    CHECK_OK(b.Build(&schema_));
  }

  void SetUp() override {
    // Make heartbeats faster to speed test runtime.
    FLAGS_heartbeat_interval_ms = 10;

    // Don't preallocate log segments, since we're creating thousands
    // of tablets here. If each preallocates 64M or so, we use
    // a ton of disk space in this test, and it fails on normal
    // sized /tmp dirs.
    // TODO: once we collapse multiple tablets into shared WAL files,
    // this won't be necessary.
    FLAGS_log_preallocate_segments = false;

    // Workaround KUDU-941: without this, it's likely that while shutting
    // down tablets, they'll get resuscitated by their existing leaders.
    FLAGS_TEST_enable_remote_bootstrap = false;

    YBMiniClusterTestBase::SetUp();
    MiniClusterOptions opts;
    opts.num_tablet_servers = 3;
    cluster_.reset(new MiniCluster(opts));
    ASSERT_OK(cluster_->Start());

    client_ = ASSERT_RESULT(YBClientBuilder()
        .add_master_server_addr(cluster_->mini_master()->bound_rpc_addr_str())
        .Build());

    messenger_ = ASSERT_RESULT(
        MessengerBuilder("stress-test-msgr").set_num_reactors(1).Build());
    rpc::ProxyCache proxy_cache(messenger_.get());
    master::MasterClusterProxy proxy(&proxy_cache, cluster_->mini_master()->bound_rpc_addr());
    ts_map_ = ASSERT_RESULT(CreateTabletServerMap(proxy, &proxy_cache));
  }

  void DoTearDown() override {
    messenger_->Shutdown();
    client_.reset();
    cluster_->Shutdown();
    ts_map_.clear();
  }

  void CreateBigTable(const YBTableName& table_name, int num_tablets);

 protected:
  std::unique_ptr<YBClient> client_;
  YBSchema schema_;
  std::unique_ptr<Messenger> messenger_;
  std::unique_ptr<master::MasterClusterProxy> master_proxy_;
  TabletServerMap ts_map_;
};

void CreateTableStressTest::CreateBigTable(const YBTableName& table_name, int num_tablets) {
  ASSERT_OK(client_->CreateNamespaceIfNotExists(table_name.namespace_name(),
                                                table_name.namespace_type()));
  std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator());
  ASSERT_OK(table_creator->table_name(table_name)
            .schema(&schema_)
            .num_tablets(num_tablets)
            .wait(false)
            .Create());
}

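// Sketch of the synchronous alternative to CreateBigTable: dropping wait(false)
// makes Create() block until the table is fully created (assuming the creator's
// wait flag defaults to true), which would hide the in-progress master state
// these tests poke at:
//
//   std::unique_ptr<YBTableCreator> creator(client_->NewTableCreator());
//   ASSERT_OK(creator->table_name(table_name).schema(&schema_)
//                    .num_tablets(num_tablets).Create());
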
TEST_F(CreateTableStressTest, GetTableLocationsBenchmark) {
  FLAGS_max_create_tablets_per_ts = FLAGS_benchmark_num_tablets;
  DontVerifyClusterBeforeNextTearDown();
  YBTableName table_name(YQL_DATABASE_CQL, "my_keyspace", "test_table");
  LOG(INFO) << CURRENT_TEST_NAME() << ": Step 1. Creating big table "
            << table_name.ToString() << " ...";
  LOG_TIMING(INFO, "creating big table") {
    ASSERT_NO_FATALS(CreateBigTable(table_name, FLAGS_benchmark_num_tablets));
  }

  // Make sure the table is completely created before we start poking.
  LOG(INFO) << CURRENT_TEST_NAME() << ": Step 2. Waiting for creation of big table "
            << table_name.ToString() << " to complete...";
  master::GetTableLocationsResponsePB create_resp;
  LOG_TIMING(INFO, "waiting for creation of big table") {
    ASSERT_OK(WaitForRunningTabletCount(cluster_->mini_master(), table_name,
                                        FLAGS_benchmark_num_tablets, &create_resp));
  }
  // Sleep for a while to let all TS heartbeat to master.
  SleepFor(MonoDelta::FromSeconds(10));
  const int kNumThreads = FLAGS_benchmark_num_threads;
  const auto kRuntime = MonoDelta::FromSeconds(FLAGS_benchmark_runtime_secs);

  // Make one proxy per thread, so each thread gets its own messenger and
  // reactor. If there were only one messenger, then only one reactor thread
  // would be used for the connection to the master, so this benchmark would
  // probably be testing the serialization and network code rather than the
  // master GTL code.
  vector<rpc::AutoShutdownMessengerHolder> messengers;
  vector<master::MasterClientProxy> proxies;
  vector<unique_ptr<rpc::ProxyCache>> caches;
  messengers.reserve(kNumThreads);
  proxies.reserve(kNumThreads);
  caches.reserve(kNumThreads);
  for (int i = 0; i < kNumThreads; i++) {
    messengers.emplace_back(
        ASSERT_RESULT(MessengerBuilder("Client").set_num_reactors(1).Build()).release());
    caches.emplace_back(new rpc::ProxyCache(messengers.back().get()));
    proxies.emplace_back(caches.back().get(), cluster_->mini_master()->bound_rpc_addr());
  }

  std::atomic<bool> stop { false };
  vector<std::thread> threads;
  threads.reserve(kNumThreads);
  for (int i = 0; i < kNumThreads; i++) {
    threads.emplace_back([&, i]() {
        while (!stop) {
          master::GetTableLocationsRequestPB req;
          master::GetTableLocationsResponsePB resp;
          RpcController controller;
          // Silence errors.
          controller.set_timeout(MonoDelta::FromSeconds(10));
          table_name.SetIntoTableIdentifierPB(req.mutable_table());
          req.set_max_returned_locations(1000);
          CHECK_OK(proxies[i].GetTableLocations(req, &resp, &controller));
          CHECK_EQ(resp.tablet_locations_size(), FLAGS_benchmark_num_tablets);
        }
      });
  }

  std::stringstream profile;
  StartSynchronizationProfiling();
  SleepFor(kRuntime);
  stop = true;
  for (auto& t : threads) {
    t.join();
  }
  StopSynchronizationProfiling();
  int64_t discarded_samples = 0;
  FlushSynchronizationProfile(&profile, &discarded_samples);

  const auto& ent = cluster_->mini_master()->master()->metric_entity();
  auto hist = METRIC_handler_latency_yb_master_MasterClient_GetTableLocations
      .Instantiate(ent);

  cluster_->Shutdown();

  LOG(INFO) << "LOCK PROFILE\n" << profile.str();
  LOG(INFO) << "BENCHMARK HISTOGRAM:";
  hist->histogram()->DumpHumanReadable(&LOG(INFO));
}

class CreateMultiHBTableStressTest : public CreateTableStressTest,
                                     public testing::WithParamInterface<bool /* is_multiHb */> {
  void SetUp() override {
    // "MultiHB" tables are too large to be reported in a single heartbeat from a TS.
    // Set up so all 3 TS will have to break tablet report updates into multiple chunks.
    bool is_multiHb = GetParam();
    if (is_multiHb) {
      // At 90 tablets per report and 3 TS, one report round covers at most 270 of
      // the 300 tablets, so every TS needs multiple heartbeats.
      FLAGS_tablet_report_limit = 90;
      FLAGS_num_test_tablets = 300;
      FLAGS_max_create_tablets_per_ts = FLAGS_num_test_tablets;
      // With a 1000 ms deadline and 20 ms of injected wait per batch, the Master
      // processes roughly 40 tablets before hitting its deadline.
      FLAGS_TEST_inject_latency_during_tablet_report_ms = 20;
      FLAGS_heartbeat_rpc_timeout_ms = 1000;
      FLAGS_catalog_manager_report_batch_size = 1;
    }
    CreateTableStressTest::SetUp();
  }
};
INSTANTIATE_TEST_CASE_P(MultiHeartbeat, CreateMultiHBTableStressTest, ::testing::Bool());

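// ::testing::Bool() instantiates each TEST_P below twice: once with
// is_multiHb = false (tablet reports fit in a single heartbeat) and once with
// is_multiHb = true (reports are chunked across multiple heartbeats).
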
// Replaces itest version, which requires an External Mini Cluster.
Status ListRunningTabletIds(std::shared_ptr<tserver::TabletServerServiceProxy> ts_proxy,
                            const MonoDelta& timeout,
                            std::vector<string>* tablet_ids) {
  tserver::ListTabletsRequestPB req;
  tserver::ListTabletsResponsePB resp;
  RpcController rpc;
  rpc.set_timeout(timeout);

  RETURN_NOT_OK(ts_proxy->ListTablets(req, &resp, &rpc));
  tablet_ids->clear();
  for (const auto& t : resp.status_and_schema()) {
    if (t.tablet_status().state() == tablet::RUNNING) {
      tablet_ids->push_back(t.tablet_status().tablet_id());
    }
  }
  return Status::OK();
}

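// Typical use, mirroring CreateAndDeleteBigTable below: fetch the RUNNING
// tablet ids from one TS, then compare against an expected set.
//
//   std::vector<std::string> ids;
//   ASSERT_OK(ListRunningTabletIds(
//       cluster_->mini_tablet_server(0)->server()->proxy(), 10s, &ids));
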
TEST_P(CreateMultiHBTableStressTest, CreateAndDeleteBigTable) {
  DontVerifyClusterBeforeNextTearDown();
  if (IsSanitizer()) {
    LOG(INFO) << "Skipping slow test";
    return;
  }
  YBTableName table_name(YQL_DATABASE_CQL, "my_keyspace", "test_table");
  ASSERT_NO_FATALS(CreateBigTable(table_name, FLAGS_num_test_tablets));
  master::GetTableLocationsResponsePB resp;
  ASSERT_OK(WaitForRunningTabletCount(cluster_->mini_master(), table_name,
                                      FLAGS_num_test_tablets, &resp));
  LOG(INFO) << "Created table successfully!";
  // Use std::cout instead of log, since these responses are large and log
  // messages have a max size.
  std::cout << "Response:\n" << resp.DebugString();
  std::cout << "CatalogManager state:\n";
  cluster_->mini_master()->catalog_manager().DumpState(&std::cerr);

  // Store all relevant tablets for this big table we've created.
  std::vector<string> big_table_tablets;
  for (const auto& loc : resp.tablet_locations()) {
    big_table_tablets.push_back(loc.tablet_id());
  }
  // std::set_intersection below requires both input ranges to be sorted.
  std::sort(big_table_tablets.begin(), big_table_tablets.end());

  LOG(INFO) << "Deleting table...";
  ASSERT_OK(client_->DeleteTable(table_name));

  // The actual removal of the tablets is asynchronous, so we loop for a bit
  // waiting for them to get removed.
  LOG(INFO) << "Waiting for tablets to be removed on TS#1";
  std::vector<string> big_tablet_left, tablet_ids;
  auto ts_proxy = cluster_->mini_tablet_server(0)->server()->proxy();
  for (int i = 0; i < 1000; i++) {
    ASSERT_OK(ListRunningTabletIds(ts_proxy, 10s, &tablet_ids));
    std::sort(tablet_ids.begin(), tablet_ids.end());
    big_tablet_left.clear();
    // Write through a back_inserter: set_intersection needs a valid output
    // range, and big_tablet_left was just cleared.
    std::set_intersection(big_table_tablets.begin(), big_table_tablets.end(),
                          tablet_ids.begin(), tablet_ids.end(),
                          std::back_inserter(big_tablet_left));
    if (big_tablet_left.empty()) return;
    SleepFor(MonoDelta::FromMilliseconds(100));
  }
  ASSERT_TRUE(big_tablet_left.empty()) << "Tablets remaining: " << big_tablet_left.size()
                                       << " : " << big_tablet_left;
}

TEST_P(CreateMultiHBTableStressTest, RestartServersAfterCreation) {
  DontVerifyClusterBeforeNextTearDown();
  if (IsSanitizer()) {
    LOG(INFO) << "Skipping slow test";
    return;
  }
  YBTableName table_name(YQL_DATABASE_CQL, "my_keyspace", "test_table");
  ASSERT_NO_FATALS(CreateBigTable(table_name, FLAGS_num_test_tablets));

  for (int i = 0; i < 3; i++) {
    SleepFor(MonoDelta::FromMicroseconds(500));
    LOG(INFO) << "Restarting master...";
    ASSERT_OK(cluster_->mini_master()->Restart());
    ASSERT_OK(cluster_->mini_master()->master()->
        WaitUntilCatalogManagerIsLeaderAndReadyForTests());
    LOG(INFO) << "Master restarted.";
  }

  // Restart TS#2, which forces a full tablet report on TS#2 and incremental
  // updates on the others.
  ASSERT_OK(cluster_->mini_tablet_server(1)->Restart());

  master::GetTableLocationsResponsePB resp;
  Status s = WaitForRunningTabletCount(cluster_->mini_master(), table_name,
                                       FLAGS_num_test_tablets, &resp);
  if (!s.ok()) {
    cluster_->mini_master()->catalog_manager().DumpState(&std::cerr);
    CHECK_OK(s);
  }
}

class CreateSmallHBTableStressTest : public CreateTableStressTest {
  void SetUp() override {
    // 40 tablets / 3 TS ~= 13 tablets per server; at 2 tablets per report, each
    // TS needs at least 7 reports to finish a full heartbeat.
    FLAGS_tablet_report_limit = 2;
    FLAGS_num_test_tablets = 40;
    FLAGS_max_create_tablets_per_ts = FLAGS_num_test_tablets;

    CreateTableStressTest::SetUp();
  }
};
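
// Worked through: ceil(13 / 2) = 7 heartbeats per TS for one full report, and
// with the 100 ms per-tablet latency injected below, a full report takes
// ~13 * 100 ms ~= 1.3 s, leaving a wide window to restart the master mid-report.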
TEST_F(CreateSmallHBTableStressTest, TestRestartMasterDuringFullHeartbeat) {
  DontVerifyClusterBeforeNextTearDown();
  if (IsSanitizer()) {
    LOG(INFO) << "Skipping slow test";
    return;
  }
  YBTableName table_name(YQL_DATABASE_CQL, "my_keyspace", "test_table");
  ASSERT_NO_FATALS(CreateBigTable(table_name, FLAGS_num_test_tablets));

  // With 100 ms of injected wait per tablet, a full report takes >= 1.3 s to arrive.
  FLAGS_TEST_inject_latency_during_tablet_report_ms = 100;
  FLAGS_catalog_manager_report_batch_size = 1;

  // Restart Master #1. Triggers a full report from all TServers.
  ASSERT_OK(cluster_->mini_master()->Restart());
  ASSERT_OK(cluster_->mini_master()->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests());

  // Wait until the Master is ~25% complete with getting the heartbeats from the TS.
  master::GetTableLocationsResponsePB resp;
  ASSERT_OK(WaitForRunningTabletCount(cluster_->mini_master(), table_name,
                                      FLAGS_num_test_tablets / 4, &resp));
  ASSERT_LT(resp.tablet_locations_size(), FLAGS_num_test_tablets / 2);
  LOG(INFO) << "Resetting Master after seeing table count: " << resp.tablet_locations_size();

  // Restart Master #2. Re-triggers a full report from all TServers, even though
  // they were in the middle of sending a full report to the old master.
  ASSERT_OK(cluster_->mini_master()->Restart());
  ASSERT_OK(cluster_->mini_master()->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests());

  // Speed up the test now...
  FLAGS_TEST_inject_latency_during_tablet_report_ms = 0;

  // The TS should send a full report. If they just sent the remainder from their
  // original full report, this test will fail.
  Status s = WaitForRunningTabletCount(cluster_->mini_master(), table_name,
                                       FLAGS_num_test_tablets, &resp);
  if (!s.ok()) {
    cluster_->mini_master()->catalog_manager().DumpState(&std::cerr);
    CHECK_OK(s);
  }
}

TEST_F(CreateTableStressTest, TestHeartbeatDeadline) {
  DontVerifyClusterBeforeNextTearDown();

  // 500 ms deadline / 50 ms wait per tablet ~= 10 tablets processed before the
  // Master hits its deadline.
  FLAGS_catalog_manager_report_batch_size = 1;
  FLAGS_TEST_inject_latency_during_tablet_report_ms = 50;
  FLAGS_heartbeat_rpc_timeout_ms = 500;
  FLAGS_num_test_tablets = 60;

  // Create a table with 60 tablets, so ~20 per TS.
  YBTableName table_name(YQL_DATABASE_CQL, "my_keyspace", "test_table");
  ASSERT_NO_FATALS(CreateBigTable(table_name, FLAGS_num_test_tablets));
  master::GetTableLocationsResponsePB resp;
  ASSERT_OK(WaitForRunningTabletCount(cluster_->mini_master(), table_name,
                                      FLAGS_num_test_tablets, &resp));

  // Grab TS#1 and generate a full report for it.
  auto ts_server = cluster_->mini_tablet_server(0)->server();
  master::TSHeartbeatRequestPB hb_req;
  hb_req.mutable_common()->mutable_ts_instance()->CopyFrom(ts_server->instance_pb());
  ts_server->tablet_manager()->StartFullTabletReport(hb_req.mutable_tablet_report());
  // More tablets in the report than the Master can process within its deadline
  // (500 / 50 = 10).
  ASSERT_GT(hb_req.tablet_report().updated_tablets_size(),
            FLAGS_heartbeat_rpc_timeout_ms / FLAGS_TEST_inject_latency_during_tablet_report_ms);
  ASSERT_EQ(ts_server->tablet_manager()->GetReportLimit(), FLAGS_tablet_report_limit);
  ASSERT_LE(hb_req.tablet_report().updated_tablets_size(), FLAGS_tablet_report_limit);

  rpc::ProxyCache proxy_cache(messenger_.get());
  master::MasterHeartbeatProxy proxy(&proxy_cache, cluster_->mini_master()->bound_rpc_addr());

  // Grab the Master and process this tablet report.
  // This should go over the deadline and get truncated.
  master::TSHeartbeatResponsePB hb_resp;
  hb_req.mutable_tablet_report()->set_is_incremental(true);
  hb_req.mutable_tablet_report()->set_sequence_number(1);
  Status heartbeat_status;
  // Regression testbed often has stalls at this timing granularity. Allow a couple of hiccups.
  for (int tries = 0; tries < 3; ++tries) {
    RpcController rpc;
    rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms));
    heartbeat_status = proxy.TSHeartbeat(hb_req, &hb_resp, &rpc);
    if (heartbeat_status.ok()) break;
    ASSERT_TRUE(heartbeat_status.IsTimedOut());
  }
  ASSERT_OK(heartbeat_status);
  ASSERT_TRUE(hb_resp.tablet_report().processing_truncated());
  ASSERT_LE(hb_resp.tablet_report().tablets_size(),
            FLAGS_heartbeat_rpc_timeout_ms / FLAGS_TEST_inject_latency_during_tablet_report_ms);
}

TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
  DontVerifyClusterBeforeNextTearDown();
  if (!AllowSlowTests()) {
    LOG(INFO) << "Skipping slow test";
    return;
  }

  YBTableName table_name(YQL_DATABASE_CQL, "my_keyspace", "test_table");
  LOG(INFO) << CURRENT_TEST_NAME() << ": Step 1. Creating big table "
            << table_name.ToString() << " ...";
  LOG_TIMING(INFO, "creating big table") {
    ASSERT_NO_FATALS(CreateBigTable(table_name, FLAGS_num_test_tablets));
  }

  master::GetTableLocationsRequestPB req;
  master::GetTableLocationsResponsePB resp;

  // Make sure the table is completely created before we start poking.
  LOG(INFO) << CURRENT_TEST_NAME() << ": Step 2. Waiting for creation of big table "
            << table_name.ToString() << " to complete...";
  LOG_TIMING(INFO, "waiting for creation of big table") {
    ASSERT_OK(WaitForRunningTabletCount(cluster_->mini_master(), table_name,
                                        FLAGS_num_test_tablets, &resp));
  }

  // Asking for 0 tablets should fail.
  LOG(INFO) << CURRENT_TEST_NAME() << ": Step 3. Asking for zero tablets...";
  LOG_TIMING(INFO, "asking for zero tablets") {
    req.Clear();
    resp.Clear();
    table_name.SetIntoTableIdentifierPB(req.mutable_table());
    req.set_max_returned_locations(0);
    Status s = cluster_->mini_master()->catalog_manager().GetTableLocations(&req, &resp);
    ASSERT_STR_CONTAINS(s.ToString(), "must be greater than 0");
  }

  // Ask for one, get one, verify.
  LOG(INFO) << CURRENT_TEST_NAME() << ": Step 4. Asking for one tablet...";
  LOG_TIMING(INFO, "asking for one tablet") {
    req.Clear();
    resp.Clear();
    table_name.SetIntoTableIdentifierPB(req.mutable_table());
    req.set_max_returned_locations(1);
    ASSERT_OK(cluster_->mini_master()->catalog_manager().GetTableLocations(&req, &resp));
    ASSERT_EQ(resp.tablet_locations_size(), 1);
    // The start key is empty since it's the first tablet.
    ASSERT_EQ(resp.tablet_locations(0).partition().partition_key_start(), "");
    ASSERT_EQ(resp.tablet_locations(0).partition().partition_key_end(), string("\x80\0\0\1", 4));
  }

  int half_tablets = FLAGS_num_test_tablets / 2;
  // Ask for half of them, get that number back.
  LOG(INFO) << CURRENT_TEST_NAME() << ": Step 5. Asking for half the tablets...";
  LOG_TIMING(INFO, "asking for half the tablets") {
    req.Clear();
    resp.Clear();
    table_name.SetIntoTableIdentifierPB(req.mutable_table());
    req.set_max_returned_locations(half_tablets);
    ASSERT_OK(cluster_->mini_master()->catalog_manager().GetTableLocations(&req, &resp));
    ASSERT_EQ(half_tablets, resp.tablet_locations_size());
  }

  // Ask for all of them, get that number back.
  LOG(INFO) << CURRENT_TEST_NAME() << ": Step 6. Asking for all the tablets...";
  LOG_TIMING(INFO, "asking for all the tablets") {
    req.Clear();
    resp.Clear();
    table_name.SetIntoTableIdentifierPB(req.mutable_table());
    req.set_max_returned_locations(FLAGS_num_test_tablets);
    ASSERT_OK(cluster_->mini_master()->catalog_manager().GetTableLocations(&req, &resp));
    ASSERT_EQ(FLAGS_num_test_tablets, resp.tablet_locations_size());
  }

  LOG(INFO) << "========================================================";
  LOG(INFO) << "Tables and tablets:";
  LOG(INFO) << "========================================================";
  auto tables = cluster_->mini_master()->catalog_manager().GetTables(
      master::GetTablesMode::kAll);
  for (const scoped_refptr<master::TableInfo>& table_info : tables) {
    LOG(INFO) << "Table: " << table_info->ToString();
    auto tablets = table_info->GetTablets();
    for (const scoped_refptr<master::TabletInfo>& tablet_info : tablets) {
      auto l_tablet = tablet_info->LockForRead();
      const master::SysTabletsEntryPB& metadata = l_tablet->pb;
      LOG(INFO) << "  Tablet: " << tablet_info->ToString()
                << " { start_key: "
                << ((metadata.partition().has_partition_key_start())
                    ? metadata.partition().partition_key_start() : "<< none >>")
                << ", end_key: "
                << ((metadata.partition().has_partition_key_end())
                    ? metadata.partition().partition_key_end() : "<< none >>")
                << ", running = " << tablet_info->metadata().state().is_running() << " }";
    }
    ASSERT_EQ(FLAGS_num_test_tablets, tablets.size());
  }
  LOG(INFO) << "========================================================";

  // Get a single tablet in the middle, make sure we get that one back.
  std::unique_ptr<YBPartialRow> row(schema_.NewRow());
  ASSERT_OK(row->SetInt32(0, half_tablets - 1));
  string start_key_middle;
  ASSERT_OK(row->EncodeRowKey(&start_key_middle));

  LOG(INFO) << "Start key middle: " << start_key_middle;
  LOG(INFO) << CURRENT_TEST_NAME() << ": Step 7. Asking for single middle tablet...";
  LOG_TIMING(INFO, "asking for single middle tablet") {
    req.Clear();
    resp.Clear();
    table_name.SetIntoTableIdentifierPB(req.mutable_table());
    req.set_max_returned_locations(1);
    req.set_partition_key_start(start_key_middle);
    ASSERT_OK(cluster_->mini_master()->catalog_manager().GetTableLocations(&req, &resp));
    ASSERT_EQ(1, resp.tablet_locations_size()) << "Response: [" << resp.DebugString() << "]";
    ASSERT_EQ(start_key_middle, resp.tablet_locations(0).partition().partition_key_start());
  }
}

// Creates tables and reloads on-disk metadata concurrently to test for races
// between the two operations.
TEST_F(CreateTableStressTest, TestConcurrentCreateTableAndReloadMetadata) {
  AtomicBool stop(false);

  // This test constantly invokes VisitSysCatalog(), the function that runs after
  // a new leader gets elected. While it runs, the leader rejects tablet server
  // heartbeats (because it holds the leader_lock_), which can lead the master to
  // mistakenly think the tablet servers are dead. To avoid this, increase the TS
  // unresponsive timeout so that the leader correctly considers them alive.
  SetAtomicFlag(5 * 60 * 1000, &FLAGS_tserver_unresponsive_timeout_ms);

  thread reload_metadata_thread([&]() {
    while (!stop.Load()) {
      CHECK_OK(cluster_->mini_master()->catalog_manager().VisitSysCatalog(0));
      // Give table creation a chance to run.
      SleepFor(MonoDelta::FromMilliseconds(10 * kTimeMultiplier));
    }
  });

  auto se = ScopeExit([&stop, &reload_metadata_thread] {
    stop.Store(true);
    reload_metadata_thread.join();
  });

  for (int num_tables_created = 0; num_tables_created < 20;) {
    YBTableName table_name(
        YQL_DATABASE_CQL, "my_keyspace", Substitute("test-$0", num_tables_created));
    LOG(INFO) << "Creating table " << table_name.ToString();
    Status s = client_->CreateNamespaceIfNotExists(table_name.namespace_name(),
                                                   table_name.namespace_type());
    if (s.ok()) {
      unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator());
      s = table_creator->table_name(table_name)
          .schema(&schema_)
          .hash_schema(YBHashSchema::kMultiColumnHash)
          .set_range_partition_columns({ "key" })
          .num_tablets(1)
          .wait(false)
          .Create();
    }
    if (s.IsServiceUnavailable()) {
      // The master was busy reloading its metadata. Try again.
      //
      // This is a purely synthetic case. In real life, it only manifests at
      // startup (single master) or during leader failover (multiple masters).
      // In the latter case, the client will transparently retry to another
      // master. That won't happen here as we've only got one master, so we
      // must handle retrying ourselves.
      continue;
    }
    ASSERT_OK(s);
    num_tables_created++;
    LOG(INFO) << "Total created: " << num_tables_created;
  }
}

}  // namespace yb