/Users/deen/code/yugabyte-db/src/yb/integration-tests/client-stress-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <memory> |
34 | | #include <regex> |
35 | | #include <vector> |
36 | | |
37 | | #include "yb/client/client.h" |
38 | | #include "yb/client/error.h" |
39 | | #include "yb/client/schema.h" |
40 | | #include "yb/client/session.h" |
41 | | #include "yb/client/table_handle.h" |
42 | | #include "yb/client/yb_op.h" |
43 | | |
44 | | #include "yb/common/schema.h" |
45 | | |
46 | | #include "yb/gutil/bind.h" |
47 | | #include "yb/gutil/mathlimits.h" |
48 | | #include "yb/gutil/strings/human_readable.h" |
49 | | #include "yb/gutil/strings/substitute.h" |
50 | | |
51 | | #include "yb/integration-tests/external_mini_cluster.h" |
52 | | #include "yb/integration-tests/test_workload.h" |
53 | | #include "yb/integration-tests/yb_mini_cluster_test_base.h" |
54 | | |
55 | | #include "yb/master/catalog_entity_info.pb.h" |
56 | | #include "yb/master/master_rpc.h" |
57 | | |
58 | | #include "yb/rpc/rpc.h" |
59 | | |
60 | | #include "yb/util/curl_util.h" |
61 | | #include "yb/util/format.h" |
62 | | #include "yb/util/logging.h" |
63 | | #include "yb/util/metrics.h" |
64 | | #include "yb/util/pstack_watcher.h" |
65 | | #include "yb/util/result.h" |
66 | | #include "yb/util/size_literals.h" |
67 | | #include "yb/util/status_format.h" |
68 | | #include "yb/util/test_util.h" |
69 | | #include "yb/util/tsan_util.h" |
70 | | |
71 | | DECLARE_int32(memory_limit_soft_percentage); |
72 | | |
73 | | METRIC_DECLARE_entity(tablet); |
74 | | METRIC_DECLARE_counter(leader_memory_pressure_rejections); |
75 | | METRIC_DECLARE_counter(follower_memory_pressure_rejections); |
76 | | |
77 | | using strings::Substitute; |
78 | | using std::vector; |
79 | | using namespace std::literals; // NOLINT |
80 | | using namespace std::placeholders; |
81 | | |
82 | | namespace yb { |
83 | | |
84 | | using client::YBClient; |
85 | | using client::YBClientBuilder; |
86 | | using client::YBTable; |
87 | | using client::YBTableName; |
88 | | |
89 | | class ClientStressTest : public YBMiniClusterTestBase<ExternalMiniCluster> { |
90 | | public: |
91 | 8 | void SetUp() override { |
92 | 8 | YBMiniClusterTestBase::SetUp(); |
93 | | |
94 | 8 | ExternalMiniClusterOptions opts = default_opts(); |
95 | 8 | cluster_.reset(new ExternalMiniCluster(opts)); |
96 | 8 | ASSERT_OK(cluster_->Start()); |
97 | 8 | } |
98 | | |
99 | 7 | void DoTearDown() override { |
100 | 7 | alarm(0); |
101 | 7 | YBMiniClusterTestBase::DoTearDown(); |
102 | 7 | } |
103 | | |
104 | | protected: |
105 | 1 | virtual ExternalMiniClusterOptions default_opts() { |
106 | 1 | ExternalMiniClusterOptions result; |
107 | 1 | result.num_tablet_servers = 3; |
108 | 1 | return result; |
109 | 1 | } |
110 | | }; |
111 | | |
112 | | // Stress test a case where most of the operations are expected to time out. |
113 | | // This is a regression test for various bugs we've seen in timeout handling, |
114 | | // especially with concurrent requests. |
115 | 1 | TEST_F(ClientStressTest, TestLookupTimeouts) { |
116 | 1 | const int kSleepMillis = AllowSlowTests() ? 5000 : 100; |
117 | | |
118 | 1 | TestWorkload work(cluster_.get()); |
119 | 1 | work.set_num_write_threads(64); |
120 | 1 | work.set_write_timeout_millis(10); |
121 | 1 | work.set_timeout_allowed(true); |
122 | 1 | work.Setup(); |
123 | 1 | work.Start(); |
124 | 1 | SleepFor(MonoDelta::FromMilliseconds(kSleepMillis)); |
125 | 1 | } |
126 | | |
127 | | // Override the base test to run in multi-master mode. |
128 | | class ClientStressTest_MultiMaster : public ClientStressTest { |
129 | | protected: |
130 | 1 | ExternalMiniClusterOptions default_opts() override { |
131 | 1 | ExternalMiniClusterOptions result; |
132 | 1 | result.num_masters = 3; |
133 | 1 | result.master_rpc_ports = {0, 0, 0}; |
134 | 1 | result.num_tablet_servers = 3; |
135 | 1 | return result; |
136 | 1 | } |
137 | | }; |
138 | | |
139 | | // Stress test a case where most of the operations are expected to time out. |
140 | | // This is a regression test for KUDU-614 - it would cause a deadlock prior |
141 | | // to fixing that bug. |
142 | 1 | TEST_F(ClientStressTest_MultiMaster, TestLeaderResolutionTimeout) { |
143 | 1 | TestWorkload work(cluster_.get()); |
144 | 1 | work.set_num_write_threads(64); |
145 | | |
146 | | // This timeout gets applied to the master requests. It's lower than the |
147 | | // amount of time that we sleep the masters, to ensure they timeout. |
148 | 1 | work.set_client_default_rpc_timeout_millis(250); |
149 | | // This is the time budget for the whole request. It has to be longer than |
150 | | // the above timeout so that the client actually attempts to resolve |
151 | | // the leader. |
152 | 1 | work.set_write_timeout_millis(280); |
153 | 1 | work.set_timeout_allowed(true); |
154 | 1 | work.Setup(); |
155 | | |
156 | 1 | work.Start(); |
157 | | |
158 | 1 | ASSERT_OK(cluster_->tablet_server(0)->Pause()); |
159 | 1 | ASSERT_OK(cluster_->tablet_server(1)->Pause()); |
160 | 1 | ASSERT_OK(cluster_->tablet_server(2)->Pause()); |
161 | 1 | ASSERT_OK(cluster_->master(0)->Pause()); |
162 | 1 | ASSERT_OK(cluster_->master(1)->Pause()); |
163 | 1 | ASSERT_OK(cluster_->master(2)->Pause()); |
164 | 1 | SleepFor(MonoDelta::FromMilliseconds(300)); |
165 | 1 | ASSERT_OK(cluster_->tablet_server(0)->Resume()); |
166 | 1 | ASSERT_OK(cluster_->tablet_server(1)->Resume()); |
167 | 1 | ASSERT_OK(cluster_->tablet_server(2)->Resume()); |
168 | 1 | ASSERT_OK(cluster_->master(0)->Resume()); |
169 | 1 | ASSERT_OK(cluster_->master(1)->Resume()); |
170 | 1 | ASSERT_OK(cluster_->master(2)->Resume()); |
171 | 1 | SleepFor(MonoDelta::FromMilliseconds(100)); |
172 | | |
173 | | // Set an explicit timeout. This test has caused deadlocks in the past. |
174 | | // Also make sure to dump stacks before the alarm goes off. |
175 | 1 | PstackWatcher watcher(MonoDelta::FromSeconds(30)); |
176 | 1 | alarm(60); |
177 | 1 | } |
178 | | |
179 | | namespace { |
180 | | |
181 | | class ClientStressTestSlowMultiMaster : public ClientStressTest { |
182 | | protected: |
183 | 1 | ExternalMiniClusterOptions default_opts() override { |
184 | 1 | ExternalMiniClusterOptions result; |
185 | 1 | result.num_masters = 3; |
186 | 1 | result.master_rpc_ports = { 0, 0, 0 }; |
187 | 1 | result.extra_master_flags = { "--master_slow_get_registration_probability=0.2"s }; |
188 | 1 | result.num_tablet_servers = 0; |
189 | 1 | return result; |
190 | 1 | } |
191 | | }; |
192 | | |
193 | | void LeaderMasterCallback(Synchronizer* sync, |
194 | | const Status& status, |
195 | 395k | const HostPort& result) { |
196 | 395k | LOG_IF(INFO, status.ok()) << "Leader master host port: " << result.ToString(); |
197 | 395k | sync->StatusCB(status); |
198 | 395k | } |
199 | | |
200 | 2 | void RepeatGetLeaderMaster(ExternalMiniCluster* cluster) { |
201 | 2 | server::MasterAddresses master_addrs; |
202 | 8 | for (size_t i = 0; i != cluster->num_masters(); ++i) { |
203 | 6 | master_addrs.push_back({cluster->master(i)->bound_rpc_addr()}); |
204 | 6 | } |
205 | 2 | auto stop_time = std::chrono::steady_clock::now() + 60s; |
206 | 2 | std::vector<std::thread> threads; |
207 | 22 | for (auto i = 0; i != 10; ++i) { |
208 | 20 | threads.emplace_back([cluster, stop_time, master_addrs]() { |
209 | 395k | while (std::chrono::steady_clock::now() < stop_time) { |
210 | 395k | rpc::Rpcs rpcs; |
211 | 395k | Synchronizer sync; |
212 | 395k | auto deadline = CoarseMonoClock::Now() + 20s; |
213 | 395k | auto rpc = std::make_shared<master::GetLeaderMasterRpc>( |
214 | 395k | Bind(&LeaderMasterCallback, &sync), |
215 | 395k | master_addrs, |
216 | 395k | deadline, |
217 | 395k | cluster->messenger(), |
218 | 395k | &cluster->proxy_cache(), |
219 | 395k | &rpcs); |
220 | 395k | rpc->SendRpc(); |
221 | 395k | auto status = sync.Wait(); |
222 | 18.4E | LOG_IF(INFO, !status.ok()) << "Get leader master failed: " << status; |
223 | 395k | } |
224 | 20 | }); |
225 | 20 | } |
226 | 20 | for (auto& thread : threads) { |
227 | 20 | thread.join(); |
228 | 20 | } |
229 | 2 | } |
230 | | |
231 | | } // namespace |
232 | | |
233 | 1 | TEST_F_EX(ClientStressTest, SlowLeaderResolution, ClientStressTestSlowMultiMaster) { |
234 | 1 | DontVerifyClusterBeforeNextTearDown(); |
235 | | |
236 | 1 | RepeatGetLeaderMaster(cluster_.get()); |
237 | 1 | } |
238 | | |
239 | | namespace { |
240 | | |
241 | | class ClientStressTestSmallQueueMultiMaster : public ClientStressTest { |
242 | | protected: |
243 | 1 | ExternalMiniClusterOptions default_opts() override { |
244 | 1 | ExternalMiniClusterOptions result; |
245 | 1 | result.num_masters = 3; |
246 | 1 | result.master_rpc_ports = { 0, 0, 0 }; |
247 | 1 | result.extra_master_flags = { "--master_svc_queue_length=5"s }; |
248 | 1 | result.num_tablet_servers = 0; |
249 | 1 | return result; |
250 | 1 | } |
251 | | }; |
252 | | |
253 | | } // namespace |
254 | | |
255 | 1 | TEST_F_EX(ClientStressTest, RetryLeaderResolution, ClientStressTestSmallQueueMultiMaster) { |
256 | 1 | DontVerifyClusterBeforeNextTearDown(); |
257 | | |
258 | 1 | RepeatGetLeaderMaster(cluster_.get()); |
259 | 1 | } |
260 | | |
261 | | // Override the base test to start a cluster with a low memory limit. |
262 | | class ClientStressTest_LowMemory : public ClientStressTest { |
263 | | protected: |
264 | 1 | ExternalMiniClusterOptions default_opts() override { |
265 | | // There's nothing scientific about this number; it must be low enough to |
266 | | // trigger memory pressure request rejection yet high enough for the |
267 | | // servers to make forward progress. |
268 | | // |
269 | | // Note that if this number is set too low, the test will fail in a CHECK in TestWorkload |
270 | | // after retries are exhausted when writing an entry. This happened e.g. when a substantial |
271 | | // upfront memory overhead was introduced by adding a large lock-free queue in Preparer. |
272 | 1 | const int kMemLimitBytes = RegularBuildVsSanitizers(64_MB, 2_MB); |
273 | 1 | ExternalMiniClusterOptions opts; |
274 | | |
275 | 1 | opts.extra_tserver_flags = { Substitute("--memory_limit_hard_bytes=$0", kMemLimitBytes), |
276 | 1 | "--memory_limit_soft_percentage=0"s }; |
277 | | |
278 | 1 | opts.num_tablet_servers = 3; |
279 | 1 | return opts; |
280 | 1 | } |
281 | | }; |
282 | | |
283 | | // Stress test where, due to absurdly low memory limits, many client requests |
284 | | // are rejected, forcing the client to retry repeatedly. |
285 | 1 | TEST_F(ClientStressTest_LowMemory, TestMemoryThrottling) { |
286 | | // Sanitized tests run much slower, so we don't want to wait for as many |
287 | | // rejections before declaring the test to be passed. |
288 | 1 | const int64_t kMinRejections = 15; |
289 | | |
290 | 1 | const MonoDelta kMaxWaitTime = MonoDelta::FromSeconds(60); |
291 | | |
292 | 1 | TestWorkload work(cluster_.get()); |
293 | 1 | work.set_write_batch_size(RegularBuildVsSanitizers(25, 5)); |
294 | | |
295 | 1 | work.Setup(); |
296 | 1 | work.Start(); |
297 | | |
298 | | // Wait until we've rejected some number of requests. |
299 | 1 | MonoTime deadline = MonoTime::Now() + kMaxWaitTime; |
300 | 8 | while (true) { |
301 | 8 | int64_t total_num_rejections = 0; |
302 | | |
303 | | // It can take some time for the tablets (and their metric entities) to |
304 | | // appear on every server. Rather than explicitly wait for that above, |
305 | | // we'll just treat the lack of a metric as non-fatal. If the entity |
306 | | // or metric is truly missing, we'll eventually timeout and fail. |
307 | 32 | for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) { |
308 | 24 | for (const auto* metric : { &METRIC_leader_memory_pressure_rejections, |
309 | 48 | &METRIC_follower_memory_pressure_rejections }) { |
310 | 48 | auto result = cluster_->tablet_server(i)->GetInt64Metric( |
311 | 48 | &METRIC_ENTITY_tablet, nullptr, metric, "value"); |
312 | 48 | if (result.ok()) { |
313 | 48 | total_num_rejections += *result; |
314 | 0 | } else { |
315 | 0 | ASSERT_TRUE(result.status().IsNotFound()) << result.status(); |
316 | 0 | } |
317 | 48 | } |
318 | 24 | } |
319 | 8 | if (total_num_rejections >= kMinRejections) { |
320 | 1 | break; |
321 | 7 | } else if (deadline.ComesBefore(MonoTime::Now())) { |
322 | 0 | FAIL() << "Ran for " << kMaxWaitTime.ToString() << ", deadline expired and only saw " |
323 | 0 | << total_num_rejections << " memory rejections"; |
324 | 0 | } |
325 | 7 | SleepFor(MonoDelta::FromMilliseconds(200)); |
326 | 7 | } |
327 | 1 | } |
328 | | |
329 | | namespace { |
330 | | |
331 | | class ClientStressTestSmallQueueMultiMasterWithTServers : public ClientStressTest { |
332 | | protected: |
333 | 1 | ExternalMiniClusterOptions default_opts() override { |
334 | 1 | ExternalMiniClusterOptions result; |
335 | 1 | result.num_masters = 3; |
336 | 1 | result.master_rpc_ports = { 0, 0, 0 }; |
337 | 1 | result.extra_master_flags = { |
338 | 1 | "--master_svc_queue_length=5"s, "--master_inject_latency_on_tablet_lookups_ms=50"s }; |
339 | 1 | result.num_tablet_servers = 3; |
340 | 1 | return result; |
341 | 1 | } |
342 | | }; |
343 | | |
344 | | } // namespace |
345 | | |
346 | | // Check behaviour of meta cache in case of server queue is full. |
347 | 1 | TEST_F_EX(ClientStressTest, MasterQueueFull, ClientStressTestSmallQueueMultiMasterWithTServers) { |
348 | 1 | TestWorkload workload(cluster_.get()); |
349 | 1 | workload.Setup(); |
350 | | |
351 | 1 | struct Item { |
352 | 1 | std::unique_ptr<client::YBClient> client; |
353 | 1 | std::unique_ptr<client::TableHandle> table; |
354 | 1 | client::YBSessionPtr session; |
355 | 1 | std::future<client::FlushStatus> future; |
356 | 1 | }; |
357 | | |
358 | 1 | std::vector<Item> items; |
359 | 1 | constexpr size_t kNumRequests = 40; |
360 | 1 | while (items.size() != kNumRequests) { |
361 | 1 | Item item; |
362 | 0 | item.client = ASSERT_RESULT(cluster_->CreateClient()); |
363 | 0 | item.table = std::make_unique<client::TableHandle>(); |
364 | 0 | ASSERT_OK(item.table->Open(TestWorkloadOptions::kDefaultTableName, item.client.get())); |
365 | 0 | item.session = std::make_shared<client::YBSession>(item.client.get()); |
366 | 0 | items.push_back(std::move(item)); |
367 | 0 | } |
368 | | |
369 | 0 | int32_t key = 0; |
370 | 0 | const std::string kStringValue("string value"); |
371 | 0 | for (auto& item : items) { |
372 | 0 | auto op = item.table->NewInsertOp(); |
373 | 0 | auto req = op->mutable_request(); |
374 | 0 | QLAddInt32HashValue(req, ++key); |
375 | 0 | item.table->AddInt32ColumnValue(req, item.table->schema().columns()[1].name(), -key); |
376 | 0 | item.table->AddStringColumnValue(req, item.table->schema().columns()[2].name(), kStringValue); |
377 | 0 | item.session->Apply(op); |
378 | 0 | item.future = item.session->FlushFuture(); |
379 | 0 | } |
380 | | |
381 | 0 | for (auto& item : items) { |
382 | 0 | ASSERT_OK(item.future.get().status); |
383 | 0 | } |
384 | 0 | } |
385 | | |
386 | | namespace { |
387 | | |
388 | | // TODO: Add peak root mem tracker metric after https://github.com/yugabyte/yugabyte-db/issues/3442 |
389 | | // is implemented. Retrieve metric value using RPC instead of parsing HTML report. |
390 | 2 | Result<size_t> GetPeakRootConsumption(const ExternalTabletServer& ts) { |
391 | 2 | EasyCurl c; |
392 | 2 | faststring buf; |
393 | 2 | EXPECT_OK(c.FetchURL(Format("http://$0/mem-trackers?raw=1", ts.bound_http_hostport().ToString()), |
394 | 2 | &buf)); |
395 | 2 | static const std::regex re( |
396 | 2 | R"#(\s*<td>root</td><td>([0-9.]+\w)(\s+\([0-9.]+\w\))?</td>)#" |
397 | 2 | R"#(<td>([0-9.]+\w)</td><td>([0-9.]+\w)</td>\s*)#"); |
398 | 2 | const auto str = buf.ToString(); |
399 | 2 | std::smatch match; |
400 | 2 | if (std::regex_search(str, match, re)) { |
401 | 2 | const auto consumption_str = match.str(3); |
402 | 2 | int64_t consumption; |
403 | 2 | if (HumanReadableNumBytes::ToInt64(consumption_str, &consumption)) { |
404 | 2 | return consumption; |
405 | 0 | } else { |
406 | 0 | return STATUS_FORMAT( |
407 | 0 | InvalidArgument, |
408 | 0 | "Failed to parse memory consumption: $0", consumption_str); |
409 | 0 | } |
410 | 0 | } else { |
411 | 0 | return STATUS_FORMAT( |
412 | 0 | InvalidArgument, |
413 | 0 | "Failed to parse root mem tracker consumption from: $0", str); |
414 | 0 | } |
415 | 2 | } |
416 | | |
417 | | class ThrottleLogCounter : public ExternalDaemon::StringListener { |
418 | | public: |
419 | 1 | explicit ThrottleLogCounter(ExternalDaemon* daemon) : daemon_(daemon) { |
420 | 1 | daemon_->SetLogListener(this); |
421 | 1 | } |
422 | | |
423 | 1 | ~ThrottleLogCounter() { |
424 | 1 | daemon_->RemoveLogListener(this); |
425 | 1 | } |
426 | | |
427 | 39 | size_t rejected_call_messages() { return rejected_call_messages_; } |
428 | | |
429 | 39 | size_t ignored_call_messages() { return ignored_call_messages_; } |
430 | | |
431 | | private: |
432 | 834 | void Handle(const GStringPiece& s) override { |
433 | 834 | if (s.contains("Rejecting RPC call")) { |
434 | 1 | rejected_call_messages_.fetch_add(1); |
435 | 833 | } else if (s.contains("Ignoring RPC call")) { |
436 | 5 | ignored_call_messages_.fetch_add(1); |
437 | 5 | } |
438 | 834 | } |
439 | | |
440 | | ExternalDaemon* daemon_; |
441 | | std::atomic<size_t> rejected_call_messages_{0}; |
442 | | std::atomic<size_t> ignored_call_messages_{0}; |
443 | | }; |
444 | | |
445 | | class ClientStressTest_FollowerOom : public ClientStressTest { |
446 | | protected: |
447 | 1 | ExternalMiniClusterOptions default_opts() override { |
448 | 1 | ExternalMiniClusterOptions opts; |
449 | | |
450 | 1 | opts.extra_tserver_flags = { |
451 | 1 | Format("--memory_limit_hard_bytes=$0", kHardLimitBytes), |
452 | 1 | Format("--consensus_max_batch_size_bytes=$0", kConsensusMaxBatchSizeBytes), |
453 | | // Turn off exponential backoff and lagging follower threshold in order to hit soft memory |
454 | | // limit and check throttling. |
455 | 1 | "--enable_consensus_exponential_backoff=false", |
456 | | // The global log cache limit should only have to be set on the restarting tserver for |
457 | | // the PauseFollower test, but it does not seem to pass without the limit set on all |
458 | | // tservers. See GitHub issue #10689. |
459 | 1 | "--global_log_cache_size_limit_percentage=100", |
460 | 1 | "--consensus_lagging_follower_threshold=-1" |
461 | 1 | }; |
462 | | |
463 | 1 | opts.num_tablet_servers = 3; |
464 | 1 | return opts; |
465 | 1 | } |
466 | | |
467 | | const size_t kHardLimitBytes = 500_MB; |
468 | | const size_t kConsensusMaxBatchSizeBytes = 32_MB; |
469 | | }; |
470 | | |
471 | | } // namespace |
472 | | |
473 | | // Original scenario for reproducing the issue is the following: |
474 | | // 1. Kill follower, wait some time, then restart. |
475 | | // 2. That should lead to follower trying to catch up with leader, but due to big UpdateConsensus |
476 | | // RPC requests and slow parsing, requests will be consuming more and more memory |
477 | | // (https://github.com/yugabyte/yugabyte-db/issues/2563, |
478 | | // https://github.com/yugabyte/yugabyte-db/issues/2564) |
479 | | // 3. We expect in this scenario follower to hit soft memory limit and fix for #2563 should |
480 | | // start throttling inbound RPCs. |
481 | | // |
482 | | // In this test we simulate slow inbound RPC requests parsing using |
483 | | // TEST_yb_inbound_big_calls_parse_delay_ms flag. |
484 | 1 | TEST_F_EX(ClientStressTest, PauseFollower, ClientStressTest_FollowerOom) { |
485 | 1 | TestWorkload workload(cluster_.get()); |
486 | 1 | workload.set_write_timeout_millis(30000); |
487 | 1 | workload.set_num_tablets(1); |
488 | 1 | workload.set_num_write_threads(4); |
489 | 1 | workload.set_write_batch_size(500); |
490 | 1 | workload.set_payload_bytes(100); |
491 | 1 | workload.Setup(); |
492 | 1 | workload.Start(); |
493 | | |
494 | 5 | while (workload.rows_inserted() < 500) { |
495 | 4 | std::this_thread::sleep_for(10ms); |
496 | 4 | } |
497 | | |
498 | 1 | auto ts = cluster_->tablet_server(0); |
499 | | |
500 | 1 | LOG(INFO) << "Peak root mem tracker consumption: " |
501 | 1 | << HumanReadableNumBytes::ToString(ASSERT_RESULT(GetPeakRootConsumption(*ts))); |
502 | | |
503 | 1 | LOG(INFO) << "Killing ts-1"; |
504 | 1 | ts->Shutdown(); |
505 | 1 | std::this_thread::sleep_for(30s); |
506 | 1 | LOG(INFO) << "Restarting ts-1"; |
507 | 1 | ts->mutable_flags()->push_back("--TEST_yb_inbound_big_calls_parse_delay_ms=30000"); |
508 | 1 | ts->mutable_flags()->push_back("--binary_call_parser_reject_on_mem_tracker_hard_limit=true"); |
509 | 1 | ts->mutable_flags()->push_back(Format("--rpc_throttle_threshold_bytes=$0", 1_MB)); |
510 | | // Read buffer should be large enough to accept the large RPCs. |
511 | 1 | ts->mutable_flags()->push_back("--read_buffer_memory_limit=-10"); |
512 | 1 | ASSERT_OK(ts->Restart()); |
513 | | |
514 | 1 | ThrottleLogCounter log_counter(ts); |
515 | 39 | for (;;) { |
516 | 39 | const auto ignored_rejected_call_messages = |
517 | 39 | log_counter.ignored_call_messages() + log_counter.rejected_call_messages(); |
518 | 39 | YB_LOG_EVERY_N_SECS(INFO, 5) << "Ignored/rejected call messages: " |
519 | 8 | << ignored_rejected_call_messages; |
520 | 39 | if (ignored_rejected_call_messages > 5) { |
521 | 1 | break; |
522 | 1 | } |
523 | 38 | std::this_thread::sleep_for(1s); |
524 | 38 | } |
525 | | |
526 | 1 | const auto peak_consumption = ASSERT_RESULT(GetPeakRootConsumption(*ts)); |
527 | 1 | LOG(INFO) << "Peak root mem tracker consumption: " |
528 | 1 | << HumanReadableNumBytes::ToString(peak_consumption); |
529 | | |
530 | 1 | ASSERT_GT(peak_consumption, kHardLimitBytes * FLAGS_memory_limit_soft_percentage / 100); |
531 | | |
532 | 1 | LOG(INFO) << "Stopping cluster"; |
533 | | |
534 | 1 | workload.StopAndJoin(); |
535 | | |
536 | 1 | cluster_->Shutdown(); |
537 | | |
538 | 1 | LOG(INFO) << "Done"; |
539 | 1 | } |
540 | | |
541 | | |
542 | | class RF1ClientStressTest : public ClientStressTest { |
543 | | public: |
544 | 1 | ExternalMiniClusterOptions default_opts() override { |
545 | 1 | ExternalMiniClusterOptions result; |
546 | 1 | result.num_tablet_servers = 1; |
547 | 1 | result.extra_master_flags = { "--replication_factor=1" }; |
548 | 1 | return result; |
549 | 1 | } |
550 | | }; |
551 | | |
552 | | // Test that config change works while running a workload. |
553 | 1 | TEST_F_EX(ClientStressTest, IncreaseReplicationFactorUnderLoad, RF1ClientStressTest) { |
554 | 1 | TestWorkload work(cluster_.get()); |
555 | 1 | work.set_num_write_threads(1); |
556 | 1 | work.set_num_tablets(6); |
557 | 1 | work.Setup(); |
558 | 1 | work.Start(); |
559 | | |
560 | | // Fill table with some records. |
561 | 1 | std::this_thread::sleep_for(1s); |
562 | | |
563 | 1 | ASSERT_OK(cluster_->AddTabletServer(/* start_cql_proxy= */ false, {"--time_source=skewed,-500"})); |
564 | | |
565 | 1 | master::ReplicationInfoPB replication_info; |
566 | 1 | replication_info.mutable_live_replicas()->set_num_replicas(2); |
567 | 1 | ASSERT_OK(work.client().SetReplicationInfo(replication_info)); |
568 | | |
569 | 1 | LOG(INFO) << "Replication factor changed"; |
570 | | |
571 | 1 | auto deadline = CoarseMonoClock::now() + 3s; |
572 | 31 | while (CoarseMonoClock::now() < deadline) { |
573 | 30 | ASSERT_NO_FATALS(cluster_->AssertNoCrashes()); |
574 | 30 | std::this_thread::sleep_for(100ms); |
575 | 30 | } |
576 | | |
577 | 1 | work.StopAndJoin(); |
578 | | |
579 | 1 | LOG(INFO) << "Written rows: " << work.rows_inserted(); |
580 | 1 | } |
581 | | |
582 | | } // namespace yb |