YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/client-stress-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <memory>
34
#include <regex>
35
#include <vector>
36
37
#include "yb/client/client.h"
38
#include "yb/client/error.h"
39
#include "yb/client/schema.h"
40
#include "yb/client/session.h"
41
#include "yb/client/table_handle.h"
42
#include "yb/client/yb_op.h"
43
44
#include "yb/common/schema.h"
45
46
#include "yb/gutil/bind.h"
47
#include "yb/gutil/mathlimits.h"
48
#include "yb/gutil/strings/human_readable.h"
49
#include "yb/gutil/strings/substitute.h"
50
51
#include "yb/integration-tests/external_mini_cluster.h"
52
#include "yb/integration-tests/test_workload.h"
53
#include "yb/integration-tests/yb_mini_cluster_test_base.h"
54
55
#include "yb/master/catalog_entity_info.pb.h"
56
#include "yb/master/master_rpc.h"
57
58
#include "yb/rpc/rpc.h"
59
60
#include "yb/util/curl_util.h"
61
#include "yb/util/format.h"
62
#include "yb/util/logging.h"
63
#include "yb/util/metrics.h"
64
#include "yb/util/pstack_watcher.h"
65
#include "yb/util/result.h"
66
#include "yb/util/size_literals.h"
67
#include "yb/util/status_format.h"
68
#include "yb/util/test_util.h"
69
#include "yb/util/tsan_util.h"
70
71
DECLARE_int32(memory_limit_soft_percentage);
72
73
METRIC_DECLARE_entity(tablet);
74
METRIC_DECLARE_counter(leader_memory_pressure_rejections);
75
METRIC_DECLARE_counter(follower_memory_pressure_rejections);
76
77
using strings::Substitute;
78
using std::vector;
79
using namespace std::literals; // NOLINT
80
using namespace std::placeholders;
81
82
namespace yb {
83
84
using client::YBClient;
85
using client::YBClientBuilder;
86
using client::YBTable;
87
using client::YBTableName;
88
89
// Base fixture for the client stress tests in this file. SetUp() starts an external mini cluster
// configured by default_opts(); derived fixtures override default_opts() to change the cluster.
class ClientStressTest : public YBMiniClusterTestBase<ExternalMiniCluster> {
 public:
  // Runs the base set-up, then creates and starts the cluster described by default_opts().
  void SetUp() override {
    YBMiniClusterTestBase::SetUp();

    ExternalMiniClusterOptions cluster_options = default_opts();
    cluster_.reset(new ExternalMiniCluster(cluster_options));
    ASSERT_OK(cluster_->Start());
  }

  // Cancels any alarm() a test left armed before running the base tear-down.
  void DoTearDown() override {
    alarm(0);
    YBMiniClusterTestBase::DoTearDown();
  }

 protected:
  // Cluster options used by SetUp(): three tablet servers, everything else left at its default.
  virtual ExternalMiniClusterOptions default_opts() {
    ExternalMiniClusterOptions options;
    options.num_tablet_servers = 3;
    return options;
  }
};
111
112
// Stress test a case where most of the operations are expected to time out.
113
// This is a regression test for various bugs we've seen in timeout handling,
114
// especially with concurrent requests.
115
1
TEST_F(ClientStressTest, TestLookupTimeouts) {
116
1
  const int kSleepMillis = AllowSlowTests() ? 5000 : 100;
117
118
1
  TestWorkload work(cluster_.get());
119
1
  work.set_num_write_threads(64);
120
1
  work.set_write_timeout_millis(10);
121
1
  work.set_timeout_allowed(true);
122
1
  work.Setup();
123
1
  work.Start();
124
1
  SleepFor(MonoDelta::FromMilliseconds(kSleepMillis));
125
1
}
126
127
// Override the base test to run in multi-master mode.
128
class ClientStressTest_MultiMaster : public ClientStressTest {
129
 protected:
130
1
  ExternalMiniClusterOptions default_opts() override {
131
1
    ExternalMiniClusterOptions result;
132
1
    result.num_masters = 3;
133
1
    result.master_rpc_ports = {0, 0, 0};
134
1
    result.num_tablet_servers = 3;
135
1
    return result;
136
1
  }
137
};
138
139
// Stress test a case where most of the operations are expected to time out.
140
// This is a regression test for KUDU-614 - it would cause a deadlock prior
141
// to fixing that bug.
142
1
TEST_F(ClientStressTest_MultiMaster, TestLeaderResolutionTimeout) {
143
1
  TestWorkload work(cluster_.get());
144
1
  work.set_num_write_threads(64);
145
146
  // This timeout gets applied to the master requests. It's lower than the
147
  // amount of time that we sleep the masters, to ensure they timeout.
148
1
  work.set_client_default_rpc_timeout_millis(250);
149
  // This is the time budget for the whole request. It has to be longer than
150
  // the above timeout so that the client actually attempts to resolve
151
  // the leader.
152
1
  work.set_write_timeout_millis(280);
153
1
  work.set_timeout_allowed(true);
154
1
  work.Setup();
155
156
1
  work.Start();
157
158
1
  ASSERT_OK(cluster_->tablet_server(0)->Pause());
159
1
  ASSERT_OK(cluster_->tablet_server(1)->Pause());
160
1
  ASSERT_OK(cluster_->tablet_server(2)->Pause());
161
1
  ASSERT_OK(cluster_->master(0)->Pause());
162
1
  ASSERT_OK(cluster_->master(1)->Pause());
163
1
  ASSERT_OK(cluster_->master(2)->Pause());
164
1
  SleepFor(MonoDelta::FromMilliseconds(300));
165
1
  ASSERT_OK(cluster_->tablet_server(0)->Resume());
166
1
  ASSERT_OK(cluster_->tablet_server(1)->Resume());
167
1
  ASSERT_OK(cluster_->tablet_server(2)->Resume());
168
1
  ASSERT_OK(cluster_->master(0)->Resume());
169
1
  ASSERT_OK(cluster_->master(1)->Resume());
170
1
  ASSERT_OK(cluster_->master(2)->Resume());
171
1
  SleepFor(MonoDelta::FromMilliseconds(100));
172
173
  // Set an explicit timeout. This test has caused deadlocks in the past.
174
  // Also make sure to dump stacks before the alarm goes off.
175
1
  PstackWatcher watcher(MonoDelta::FromSeconds(30));
176
1
  alarm(60);
177
1
}
178
179
namespace {
180
181
class ClientStressTestSlowMultiMaster : public ClientStressTest {
182
 protected:
183
1
  ExternalMiniClusterOptions default_opts() override {
184
1
    ExternalMiniClusterOptions result;
185
1
    result.num_masters = 3;
186
1
    result.master_rpc_ports = { 0, 0, 0 };
187
1
    result.extra_master_flags = { "--master_slow_get_registration_probability=0.2"s };
188
1
    result.num_tablet_servers = 0;
189
1
    return result;
190
1
  }
191
};
192
193
void LeaderMasterCallback(Synchronizer* sync,
194
                          const Status& status,
195
395k
                          const HostPort& result) {
196
395k
  LOG_IF(INFO, status.ok()) << "Leader master host port: " << result.ToString();
197
395k
  sync->StatusCB(status);
198
395k
}
199
200
2
// Hammers the masters of `cluster` with leader lookups: ten threads each issue
// GetLeaderMasterRpc requests back to back for 60 seconds. Failed lookups are logged, not
// treated as errors. Returns once all threads have finished.
void RepeatGetLeaderMaster(ExternalMiniCluster* cluster) {
  constexpr int kNumThreads = 10;

  // Collect the bound RPC address of every master.
  server::MasterAddresses master_addrs;
  for (size_t master_idx = 0; master_idx != cluster->num_masters(); ++master_idx) {
    master_addrs.push_back({cluster->master(master_idx)->bound_rpc_addr()});
  }

  auto stop_time = std::chrono::steady_clock::now() + 60s;

  // Body of each worker thread: one synchronous lookup per iteration until stop_time.
  auto resolve_until_stopped = [cluster, stop_time, master_addrs]() {
    while (std::chrono::steady_clock::now() < stop_time) {
      rpc::Rpcs rpcs;
      Synchronizer sync;
      // Each individual lookup gets a 20 second budget.
      auto deadline = CoarseMonoClock::Now() + 20s;
      auto rpc = std::make_shared<master::GetLeaderMasterRpc>(
          Bind(&LeaderMasterCallback, &sync),
          master_addrs,
          deadline,
          cluster->messenger(),
          &cluster->proxy_cache(),
          &rpcs);
      rpc->SendRpc();
      auto status = sync.Wait();
      if (!status.ok()) {
        LOG(INFO) << "Get leader master failed: " << status;
      }
    }
  };

  std::vector<std::thread> threads;
  for (int thread_idx = 0; thread_idx != kNumThreads; ++thread_idx) {
    threads.emplace_back(resolve_until_stopped);
  }
  for (auto& worker : threads) {
    worker.join();
  }
}
230
231
} // namespace
232
233
1
// Repeatedly resolves the leader master against the ClientStressTestSlowMultiMaster cluster
// (masters started with --master_slow_get_registration_probability=0.2).
TEST_F_EX(ClientStressTest, SlowLeaderResolution, ClientStressTestSlowMultiMaster) {
  // NOTE(review): going by its name, this opts out of the cluster verification normally done
  // at tear-down; this fixture starts no tablet servers.
  DontVerifyClusterBeforeNextTearDown();

  RepeatGetLeaderMaster(cluster_.get());
}
238
239
namespace {
240
241
class ClientStressTestSmallQueueMultiMaster : public ClientStressTest {
242
 protected:
243
1
  ExternalMiniClusterOptions default_opts() override {
244
1
    ExternalMiniClusterOptions result;
245
1
    result.num_masters = 3;
246
1
    result.master_rpc_ports = { 0, 0, 0 };
247
1
    result.extra_master_flags = { "--master_svc_queue_length=5"s };
248
1
    result.num_tablet_servers = 0;
249
1
    return result;
250
1
  }
251
};
252
253
} // namespace
254
255
1
// Repeatedly resolves the leader master against the ClientStressTestSmallQueueMultiMaster
// cluster (masters started with --master_svc_queue_length=5).
TEST_F_EX(ClientStressTest, RetryLeaderResolution, ClientStressTestSmallQueueMultiMaster) {
  // NOTE(review): going by its name, this opts out of the cluster verification normally done
  // at tear-down; this fixture starts no tablet servers.
  DontVerifyClusterBeforeNextTearDown();

  RepeatGetLeaderMaster(cluster_.get());
}
260
261
// Override the base test to start a cluster with a low memory limit.
262
class ClientStressTest_LowMemory : public ClientStressTest {
263
 protected:
264
1
  ExternalMiniClusterOptions default_opts() override {
265
    // There's nothing scientific about this number; it must be low enough to
266
    // trigger memory pressure request rejection yet high enough for the
267
    // servers to make forward progress.
268
    //
269
    // Note that if this number is set too low, the test will fail in a CHECK in TestWorkload
270
    // after retries are exhausted when writing an entry. This happened e.g. when a substantial
271
    // upfront memory overhead was introduced by adding a large lock-free queue in Preparer.
272
1
    const int kMemLimitBytes = RegularBuildVsSanitizers(64_MB, 2_MB);
273
1
    ExternalMiniClusterOptions opts;
274
275
1
    opts.extra_tserver_flags = { Substitute("--memory_limit_hard_bytes=$0", kMemLimitBytes),
276
1
                                 "--memory_limit_soft_percentage=0"s };
277
278
1
    opts.num_tablet_servers = 3;
279
1
    return opts;
280
1
  }
281
};
282
283
// Stress test where, due to absurdly low memory limits, many client requests
284
// are rejected, forcing the client to retry repeatedly.
285
1
TEST_F(ClientStressTest_LowMemory, TestMemoryThrottling) {
286
  // Sanitized tests run much slower, so we don't want to wait for as many
287
  // rejections before declaring the test to be passed.
288
1
  const int64_t kMinRejections = 15;
289
290
1
  const MonoDelta kMaxWaitTime = MonoDelta::FromSeconds(60);
291
292
1
  TestWorkload work(cluster_.get());
293
1
  work.set_write_batch_size(RegularBuildVsSanitizers(25, 5));
294
295
1
  work.Setup();
296
1
  work.Start();
297
298
  // Wait until we've rejected some number of requests.
299
1
  MonoTime deadline = MonoTime::Now() + kMaxWaitTime;
300
8
  while (true) {
301
8
    int64_t total_num_rejections = 0;
302
303
    // It can take some time for the tablets (and their metric entities) to
304
    // appear on every server. Rather than explicitly wait for that above,
305
    // we'll just treat the lack of a metric as non-fatal. If the entity
306
    // or metric is truly missing, we'll eventually timeout and fail.
307
32
    for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) {
308
24
      for (const auto* metric : { &METRIC_leader_memory_pressure_rejections,
309
48
                                  &METRIC_follower_memory_pressure_rejections }) {
310
48
        auto result = cluster_->tablet_server(i)->GetInt64Metric(
311
48
            &METRIC_ENTITY_tablet, nullptr, metric, "value");
312
48
        if (result.ok()) {
313
48
          total_num_rejections += *result;
314
0
        } else {
315
0
          ASSERT_TRUE(result.status().IsNotFound()) << result.status();
316
0
        }
317
48
      }
318
24
    }
319
8
    if (total_num_rejections >= kMinRejections) {
320
1
      break;
321
7
    } else if (deadline.ComesBefore(MonoTime::Now())) {
322
0
      FAIL() << "Ran for " << kMaxWaitTime.ToString() << ", deadline expired and only saw "
323
0
             << total_num_rejections << " memory rejections";
324
0
    }
325
7
    SleepFor(MonoDelta::FromMilliseconds(200));
326
7
  }
327
1
}
328
329
namespace {
330
331
class ClientStressTestSmallQueueMultiMasterWithTServers : public ClientStressTest {
332
 protected:
333
1
  ExternalMiniClusterOptions default_opts() override {
334
1
    ExternalMiniClusterOptions result;
335
1
    result.num_masters = 3;
336
1
    result.master_rpc_ports = { 0, 0, 0 };
337
1
    result.extra_master_flags = {
338
1
        "--master_svc_queue_length=5"s, "--master_inject_latency_on_tablet_lookups_ms=50"s };
339
1
    result.num_tablet_servers = 3;
340
1
    return result;
341
1
  }
342
};
343
344
} // namespace
345
346
// Check behaviour of meta cache in case of server queue is full.
347
1
TEST_F_EX(ClientStressTest, MasterQueueFull, ClientStressTestSmallQueueMultiMasterWithTServers) {
348
1
  TestWorkload workload(cluster_.get());
349
1
  workload.Setup();
350
351
1
  struct Item {
352
1
    std::unique_ptr<client::YBClient> client;
353
1
    std::unique_ptr<client::TableHandle> table;
354
1
    client::YBSessionPtr session;
355
1
    std::future<client::FlushStatus> future;
356
1
  };
357
358
1
  std::vector<Item> items;
359
1
  constexpr size_t kNumRequests = 40;
360
1
  while (items.size() != kNumRequests) {
361
1
    Item item;
362
0
    item.client = ASSERT_RESULT(cluster_->CreateClient());
363
0
    item.table = std::make_unique<client::TableHandle>();
364
0
    ASSERT_OK(item.table->Open(TestWorkloadOptions::kDefaultTableName, item.client.get()));
365
0
    item.session = std::make_shared<client::YBSession>(item.client.get());
366
0
    items.push_back(std::move(item));
367
0
  }
368
369
0
  int32_t key = 0;
370
0
  const std::string kStringValue("string value");
371
0
  for (auto& item : items) {
372
0
    auto op = item.table->NewInsertOp();
373
0
    auto req = op->mutable_request();
374
0
    QLAddInt32HashValue(req, ++key);
375
0
    item.table->AddInt32ColumnValue(req, item.table->schema().columns()[1].name(), -key);
376
0
    item.table->AddStringColumnValue(req, item.table->schema().columns()[2].name(), kStringValue);
377
0
    item.session->Apply(op);
378
0
    item.future = item.session->FlushFuture();
379
0
  }
380
381
0
  for (auto& item : items) {
382
0
    ASSERT_OK(item.future.get().status);
383
0
  }
384
0
}
385
386
namespace {
387
388
// TODO: Add peak root mem tracker metric after https://github.com/yugabyte/yugabyte-db/issues/3442
389
// is implemented. Retrieve metric value using RPC instead of parsing HTML report.
390
2
Result<size_t> GetPeakRootConsumption(const ExternalTabletServer& ts) {
391
2
  EasyCurl c;
392
2
  faststring buf;
393
2
  EXPECT_OK(c.FetchURL(Format("http://$0/mem-trackers?raw=1", ts.bound_http_hostport().ToString()),
394
2
                       &buf));
395
2
  static const std::regex re(
396
2
      R"#(\s*<td>root</td><td>([0-9.]+\w)(\s+\([0-9.]+\w\))?</td>)#"
397
2
      R"#(<td>([0-9.]+\w)</td><td>([0-9.]+\w)</td>\s*)#");
398
2
  const auto str = buf.ToString();
399
2
  std::smatch match;
400
2
  if (std::regex_search(str, match, re)) {
401
2
    const auto consumption_str = match.str(3);
402
2
    int64_t consumption;
403
2
    if (HumanReadableNumBytes::ToInt64(consumption_str, &consumption)) {
404
2
      return consumption;
405
0
    } else {
406
0
      return STATUS_FORMAT(
407
0
          InvalidArgument,
408
0
          "Failed to parse memory consumption: $0", consumption_str);
409
0
    }
410
0
  } else {
411
0
    return STATUS_FORMAT(
412
0
        InvalidArgument,
413
0
        "Failed to parse root mem tracker consumption from: $0", str);
414
0
  }
415
2
}
416
417
class ThrottleLogCounter : public ExternalDaemon::StringListener {
418
 public:
419
1
  explicit ThrottleLogCounter(ExternalDaemon* daemon) : daemon_(daemon) {
420
1
    daemon_->SetLogListener(this);
421
1
  }
422
423
1
  ~ThrottleLogCounter() {
424
1
    daemon_->RemoveLogListener(this);
425
1
  }
426
427
39
  size_t rejected_call_messages() { return rejected_call_messages_; }
428
429
39
  size_t ignored_call_messages() { return ignored_call_messages_; }
430
431
 private:
432
834
  void Handle(const GStringPiece& s) override {
433
834
    if (s.contains("Rejecting RPC call")) {
434
1
      rejected_call_messages_.fetch_add(1);
435
833
    } else if (s.contains("Ignoring RPC call")) {
436
5
      ignored_call_messages_.fetch_add(1);
437
5
    }
438
834
  }
439
440
  ExternalDaemon* daemon_;
441
  std::atomic<size_t> rejected_call_messages_{0};
442
  std::atomic<size_t> ignored_call_messages_{0};
443
};
444
445
class ClientStressTest_FollowerOom : public ClientStressTest {
446
 protected:
447
1
  ExternalMiniClusterOptions default_opts() override {
448
1
    ExternalMiniClusterOptions opts;
449
450
1
    opts.extra_tserver_flags = {
451
1
        Format("--memory_limit_hard_bytes=$0", kHardLimitBytes),
452
1
        Format("--consensus_max_batch_size_bytes=$0", kConsensusMaxBatchSizeBytes),
453
        // Turn off exponential backoff and lagging follower threshold in order to hit soft memory
454
        // limit and check throttling.
455
1
        "--enable_consensus_exponential_backoff=false",
456
        // The global log cache limit should only have to be set on the restarting tserver for
457
        // the PauseFollower test, but it does not seem to pass without the limit set on all
458
        // tservers. See GitHub issue #10689.
459
1
        "--global_log_cache_size_limit_percentage=100",
460
1
        "--consensus_lagging_follower_threshold=-1"
461
1
    };
462
463
1
    opts.num_tablet_servers = 3;
464
1
    return opts;
465
1
  }
466
467
  const size_t kHardLimitBytes = 500_MB;
468
  const size_t kConsensusMaxBatchSizeBytes = 32_MB;
469
};
470
471
} // namespace
472
473
// Original scenario for reproducing the issue is the following:
474
// 1. Kill follower, wait some time, then restart.
475
// 2. That should lead to follower trying to catch up with leader, but due to big UpdateConsensus
476
// RPC requests and slow parsing, requests will be consuming more and more memory
477
// (https://github.com/yugabyte/yugabyte-db/issues/2563,
478
// https://github.com/yugabyte/yugabyte-db/issues/2564)
479
// 3. We expect in this scenario follower to hit soft memory limit and fix for #2563 should
480
// start throttling inbound RPCs.
481
//
482
// In this test we simulate slow inbound RPC requests parsing using
483
// TEST_yb_inbound_big_calls_parse_delay_ms flag.
484
1
TEST_F_EX(ClientStressTest, PauseFollower, ClientStressTest_FollowerOom) {
485
1
  TestWorkload workload(cluster_.get());
486
1
  workload.set_write_timeout_millis(30000);
487
1
  workload.set_num_tablets(1);
488
1
  workload.set_num_write_threads(4);
489
1
  workload.set_write_batch_size(500);
490
1
  workload.set_payload_bytes(100);
491
1
  workload.Setup();
492
1
  workload.Start();
493
494
5
  while (workload.rows_inserted() < 500) {
495
4
    std::this_thread::sleep_for(10ms);
496
4
  }
497
498
1
  auto ts = cluster_->tablet_server(0);
499
500
1
  LOG(INFO) << "Peak root mem tracker consumption: "
501
1
            << HumanReadableNumBytes::ToString(ASSERT_RESULT(GetPeakRootConsumption(*ts)));
502
503
1
  LOG(INFO) << "Killing ts-1";
504
1
  ts->Shutdown();
505
1
  std::this_thread::sleep_for(30s);
506
1
  LOG(INFO) << "Restarting ts-1";
507
1
  ts->mutable_flags()->push_back("--TEST_yb_inbound_big_calls_parse_delay_ms=30000");
508
1
  ts->mutable_flags()->push_back("--binary_call_parser_reject_on_mem_tracker_hard_limit=true");
509
1
  ts->mutable_flags()->push_back(Format("--rpc_throttle_threshold_bytes=$0", 1_MB));
510
  // Read buffer should be large enough to accept the large RPCs.
511
1
  ts->mutable_flags()->push_back("--read_buffer_memory_limit=-10");
512
1
  ASSERT_OK(ts->Restart());
513
514
1
  ThrottleLogCounter log_counter(ts);
515
39
  for (;;) {
516
39
    const auto ignored_rejected_call_messages =
517
39
        log_counter.ignored_call_messages() + log_counter.rejected_call_messages();
518
39
    YB_LOG_EVERY_N_SECS(INFO, 5) << "Ignored/rejected call messages: "
519
8
                                 << ignored_rejected_call_messages;
520
39
    if (ignored_rejected_call_messages > 5) {
521
1
      break;
522
1
    }
523
38
    std::this_thread::sleep_for(1s);
524
38
  }
525
526
1
  const auto peak_consumption = ASSERT_RESULT(GetPeakRootConsumption(*ts));
527
1
  LOG(INFO) << "Peak root mem tracker consumption: "
528
1
            << HumanReadableNumBytes::ToString(peak_consumption);
529
530
1
  ASSERT_GT(peak_consumption, kHardLimitBytes * FLAGS_memory_limit_soft_percentage / 100);
531
532
1
  LOG(INFO) << "Stopping cluster";
533
534
1
  workload.StopAndJoin();
535
536
1
  cluster_->Shutdown();
537
538
1
  LOG(INFO) << "Done";
539
1
}
540
541
542
class RF1ClientStressTest : public ClientStressTest {
543
 public:
544
1
  ExternalMiniClusterOptions default_opts() override {
545
1
    ExternalMiniClusterOptions result;
546
1
    result.num_tablet_servers = 1;
547
1
    result.extra_master_flags = { "--replication_factor=1" };
548
1
    return result;
549
1
  }
550
};
551
552
// Test that config change works while running a workload.
553
1
TEST_F_EX(ClientStressTest, IncreaseReplicationFactorUnderLoad, RF1ClientStressTest) {
554
1
  TestWorkload work(cluster_.get());
555
1
  work.set_num_write_threads(1);
556
1
  work.set_num_tablets(6);
557
1
  work.Setup();
558
1
  work.Start();
559
560
  // Fill table with some records.
561
1
  std::this_thread::sleep_for(1s);
562
563
1
  ASSERT_OK(cluster_->AddTabletServer(/* start_cql_proxy= */ false, {"--time_source=skewed,-500"}));
564
565
1
  master::ReplicationInfoPB replication_info;
566
1
  replication_info.mutable_live_replicas()->set_num_replicas(2);
567
1
  ASSERT_OK(work.client().SetReplicationInfo(replication_info));
568
569
1
  LOG(INFO) << "Replication factor changed";
570
571
1
  auto deadline = CoarseMonoClock::now() + 3s;
572
31
  while (CoarseMonoClock::now() < deadline) {
573
30
    ASSERT_NO_FATALS(cluster_->AssertNoCrashes());
574
30
    std::this_thread::sleep_for(100ms);
575
30
  }
576
577
1
  work.StopAndJoin();
578
579
1
  LOG(INFO) << "Written rows: " << work.rows_inserted();
580
1
}
581
582
}  // namespace yb