/Users/deen/code/yugabyte-db/src/yb/rpc/mt-rpc-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <string> |
34 | | |
35 | | #include <gtest/gtest.h> |
36 | | |
37 | | #include "yb/gutil/stl_util.h" |
38 | | #include "yb/gutil/strings/substitute.h" |
39 | | |
40 | | #include "yb/rpc/proxy.h" |
41 | | #include "yb/rpc/rpc-test-base.h" |
42 | | #include "yb/rpc/rpc_controller.h" |
43 | | #include "yb/rpc/yb_rpc.h" |
44 | | |
45 | | #include "yb/util/countdown_latch.h" |
46 | | #include "yb/util/metrics.h" |
47 | | #include "yb/util/net/net_util.h" |
48 | | #include "yb/util/status_log.h" |
49 | | #include "yb/util/test_macros.h" |
50 | | #include "yb/util/test_util.h" |
51 | | #include "yb/util/thread.h" |
52 | | |
53 | | METRIC_DECLARE_counter(rpc_connections_accepted); |
54 | | METRIC_DECLARE_counter(rpcs_queue_overflow); |
55 | | |
56 | | using std::string; |
57 | | using std::shared_ptr; |
58 | | using strings::Substitute; |
59 | | using namespace std::literals; |
60 | | |
61 | | namespace yb { |
62 | | namespace rpc { |
63 | | |
// Test fixture providing helpers that issue RPCs from multiple client threads.
class MultiThreadedRpcTest : public RpcTestBase {
 public:
  // Make a single RPC call to 'server_addr' using a dedicated auto-shutdown
  // messenger. The call's status is stored in *result and 'latch' is counted
  // down on completion so callers can wait for the first finisher.
  void SingleCall(const HostPort& server_addr, const RemoteMethod* method,
                  Status* result, CountDownLatch* latch) {
    LOG(INFO) << "Connecting to " << server_addr;
    auto client_messenger = CreateAutoShutdownMessengerHolder("ClientSC");
    Proxy p(client_messenger.get(), server_addr);
    *result = DoTestSyncCall(&p, method);
    latch->CountDown();
  }

  // Make RPC calls until we see a failure. The first failing status is stored
  // in *last_result. Owns its messenger (auto-shutdown on return).
  void HammerServer(const HostPort& server_addr, const RemoteMethod* method, Status* last_result) {
    auto client_messenger = CreateAutoShutdownMessengerHolder("ClientHS");
    HammerServerWithMessenger(server_addr, method, last_result, client_messenger.get());
  }

  // Same as HammerServer(), but issues calls through a caller-supplied
  // messenger so tests can shut the messenger down while calls are pending.
  void HammerServerWithMessenger(
      const HostPort& server_addr, const RemoteMethod* method, Status* last_result,
      Messenger* messenger) {
    LOG(INFO) << "Connecting to " << server_addr;
    Proxy p(messenger, server_addr);

    int i = 0;
    while (true) {
      i++;
      Status s = DoTestSyncCall(&p, method);
      if (!s.ok()) {
        // Return on first failure.
        LOG(INFO) << "Call failed. Shutting down client thread. Ran " << i << " calls: " << s;
        *last_result = s;
        return;
      }
    }
  }
};
101 | | |
102 | 4 | static void AssertShutdown(yb::Thread* thread, const Status* status) { |
103 | 4 | ASSERT_OK(ThreadJoiner(thread).warn_every(500ms).Join()); |
104 | 4 | string msg = status->ToString(); |
105 | 8 | ASSERT_TRUE(msg.find("Service unavailable") != string::npos || |
106 | 8 | msg.find("Network error") != string::npos || |
107 | 8 | msg.find("Resource unavailable") != string::npos) |
108 | 8 | << "Status is actually: " << msg; |
109 | 4 | } |
110 | | |
// Test making several concurrent RPC calls while shutting down.
// Simply verify that we don't hit any CHECK errors.
TEST_F(MultiThreadedRpcTest, TestShutdownDuringService) {
  // Set up server.
  HostPort server_addr;
  StartTestServer(&server_addr);

  // Start several client threads that hammer the server with RPCs until
  // their first failure.
  const int kNumThreads = 4;
  scoped_refptr<yb::Thread> threads[kNumThreads];
  Status statuses[kNumThreads];
  for (int i = 0; i < kNumThreads; i++) {
    ASSERT_OK(yb::Thread::Create("test", strings::Substitute("t$0", i),
      &MultiThreadedRpcTest::HammerServer, this, server_addr,
      CalculatorServiceMethods::AddMethod(), &statuses[i], &threads[i]));
  }

  // Give the client threads time to get calls in flight.
  SleepFor(MonoDelta::FromMilliseconds(50));

  // Shut down server.
  server().Shutdown();

  // Each hammer thread must exit cleanly with a shutdown-related error.
  for (int i = 0; i < kNumThreads; i++) {
    AssertShutdown(threads[i].get(), &statuses[i]);
  }
}
136 | | |
// Test shutting down the client messenger exactly as a thread is about to start
// a new connection. This is a regression test for KUDU-104.
TEST_F(MultiThreadedRpcTest, TestShutdownClientWhileCallsPending) {
  // Set up server.
  HostPort server_addr;
  StartTestServer(&server_addr);

  std::unique_ptr<Messenger> client_messenger(CreateMessenger("Client"));

  // One client thread hammers the server through the messenger we are about
  // to shut down.
  scoped_refptr<yb::Thread> thread;
  Status status;
  ASSERT_OK(yb::Thread::Create("test", "test",
      &MultiThreadedRpcTest::HammerServerWithMessenger, this, server_addr,
      CalculatorServiceMethods::AddMethod(), &status, client_messenger.get(), &thread));

  // Shut down the messenger after a very brief sleep. This often will race so that the
  // call gets submitted to the messenger before shutdown, but the negotiation won't have
  // started yet. In a debug build this fails about half the time without the bug fix.
  // See KUDU-104.
  SleepFor(MonoDelta::FromMicroseconds(10));
  client_messenger->Shutdown();

  // The client thread must terminate with a client-side shutdown error rather
  // than crashing.
  ASSERT_OK(ThreadJoiner(thread.get()).warn_every(500ms).Join());
  ASSERT_TRUE(status.IsAborted() ||
              status.IsServiceUnavailable());
  string msg = status.ToString();
  SCOPED_TRACE(msg);
  ASSERT_TRUE(msg.find("Client RPC Messenger shutting down") != string::npos ||
              msg.find("Shutdown connection") != string::npos ||
              msg.find("Unable to start connection negotiation thread") != string::npos ||
              msg.find("Messenger already stopped") != string::npos)
              << "Status is actually: " << msg;
}
170 | | |
171 | 3 | void IncrementBackpressureOrShutdown(const Status* status, int* backpressure, int* shutdown) { |
172 | 3 | string msg = status->ToString(); |
173 | 3 | if (msg.find("queue is full") != string::npos) { |
174 | 1 | ++(*backpressure); |
175 | 2 | } else if (msg.find("shutting down") != string::npos) { |
176 | 2 | ++(*shutdown); |
177 | 0 | } else if (msg.find("got EOF from remote") != string::npos) { |
178 | 0 | ++(*shutdown); |
179 | 0 | } else { |
180 | 0 | FAIL() << "Unexpected status message: " << msg; |
181 | 0 | } |
182 | 3 | } |
183 | | |
// Test that we get a Service Unavailable error when we max out the incoming RPC service queue.
TEST_F(MultiThreadedRpcTest, TestBlowOutServiceQueue) {
  const size_t kMaxConcurrency = 2;

  // Build a server messenger by hand so we can control reactor count and
  // service-pool sizing precisely.
  MessengerBuilder bld("messenger1");
  bld.set_num_reactors(kMaxConcurrency);
  bld.set_metric_entity(metric_entity());
  std::unique_ptr<Messenger> server_messenger = ASSERT_RESULT(bld.Build());

  Endpoint server_addr;
  ASSERT_OK(server_messenger->ListenAddress(
      CreateConnectionContextFactory<YBInboundConnectionContext>(),
      Endpoint(), &server_addr));

  // Service pool sized to kMaxConcurrency with a zero-length queue, so a
  // third simultaneous call must overflow.
  std::unique_ptr<ServiceIf> service(new GenericCalculatorService());
  auto service_name = service->service_name();
  ThreadPool thread_pool("bogus_pool", kMaxConcurrency, 0UL);
  scoped_refptr<ServicePool> service_pool(new ServicePool(kMaxConcurrency,
                                                          &thread_pool,
                                                          &server_messenger->scheduler(),
                                                          std::move(service),
                                                          metric_entity()));
  ASSERT_OK(server_messenger->RegisterService(service_name, service_pool));
  ASSERT_OK(server_messenger->StartAcceptor());

  // Launch three concurrent single-call client threads; capacity is two.
  scoped_refptr<yb::Thread> threads[3];
  Status status[3];
  CountDownLatch latch(1);
  for (int i = 0; i < 3; i++) {
    ASSERT_OK(yb::Thread::Create("test", strings::Substitute("t$0", i),
      &MultiThreadedRpcTest::SingleCall, this, HostPort::FromBoundEndpoint(server_addr),
      CalculatorServiceMethods::AddMethod(), &status[i], &latch, &threads[i]));
  }

  // One should immediately fail due to backpressure. The latch is only initialized
  // to wait for the first of three threads to finish.
  latch.Wait();

  // The rest would time out after 10 sec, but we help them along.
  server_messenger->UnregisterAllServices();
  service_pool->Shutdown();
  thread_pool.Shutdown();
  server_messenger->Shutdown();

  for (const auto& thread : threads) {
    ASSERT_OK(ThreadJoiner(thread.get()).warn_every(500ms).Join());
  }

  // Verify that one error was due to backpressure.
  int errors_backpressure = 0;
  int errors_shutdown = 0;

  for (const auto& s : status) {
    IncrementBackpressureOrShutdown(&s, &errors_backpressure, &errors_shutdown);
  }

  ASSERT_EQ(1, errors_backpressure);
  ASSERT_EQ(2, errors_shutdown);

  // Check that RPC queue overflow metric is 1
  Counter *rpcs_queue_overflow =
    METRIC_rpcs_queue_overflow.Instantiate(metric_entity()).get();
  ASSERT_EQ(1, rpcs_queue_overflow->value());
}
248 | | |
249 | 8 | static void HammerServerWithTCPConns(const Endpoint& addr) { |
250 | 14 | while (true) { |
251 | 13 | Socket socket; |
252 | 13 | CHECK_OK(socket.Init(0)); |
253 | 13 | Status s; |
254 | 13 | LOG_SLOW_EXECUTION(INFO, 100, "Connect took long") { |
255 | 13 | s = socket.Connect(addr); |
256 | 13 | } |
257 | 13 | if (!s.ok()) { |
258 | 0 | CHECK(s.IsNetworkError()) << "Unexpected error: " << s.ToString(); |
259 | 7 | return; |
260 | 7 | } |
261 | 6 | CHECK_OK(socket.Close()); |
262 | 6 | } |
263 | 8 | } |
264 | | |
// Regression test for KUDU-128.
// Test that shuts down the server while new TCP connections are incoming.
TEST_F(MultiThreadedRpcTest, TestShutdownWithIncomingConnections) {
  // Set up server.
  Endpoint server_addr;
  StartTestServer(&server_addr);

  // Start a number of threads which just hammer the server with TCP connections.
  std::vector<scoped_refptr<yb::Thread>> threads;
  for (int i = 0; i < 8; i++) {
    scoped_refptr<yb::Thread> new_thread;
    CHECK_OK(yb::Thread::Create("test", strings::Substitute("t$0", i),
        &HammerServerWithTCPConns, server_addr, &new_thread));
    threads.push_back(new_thread);
  }

  // Sleep until the server has started to actually accept some connections from the
  // test threads.
  scoped_refptr<Counter> conns_accepted =
    METRIC_rpc_connections_accepted.Instantiate(metric_entity());
  while (conns_accepted->value() == 0) {
    SleepFor(MonoDelta::FromMicroseconds(100));
  }

  // Shutdown while there are still new connections appearing.
  server().Shutdown();

  // All hammer threads should observe a connect failure and exit cleanly.
  for (scoped_refptr<yb::Thread>& t : threads) {
    ASSERT_OK(ThreadJoiner(t.get()).warn_every(500ms).Join());
  }
}
296 | | |
// Verify that a 1-byte read-buffer memory limit rejects large inbound calls
// while small calls on a separate connection keep succeeding.
TEST_F(MultiThreadedRpcTest, MemoryLimit) {
  constexpr size_t kMemoryLimit = 1;
  auto read_buffer_tracker = MemTracker::FindOrCreateTracker(kMemoryLimit, "Read Buffer");

  // Set up server.
  HostPort server_addr;
  StartTestServer(&server_addr);

  LOG(INFO) << "Server " << server_addr;

  std::atomic<bool> stop(false);
  // Use one reactor / one connection per messenger so big and small calls
  // travel over distinct, isolated connections.
  MessengerOptions options = kDefaultClientMessengerOptions;
  options.n_reactors = 1;
  options.num_connections_to_server = 1;
  auto messenger_for_big = CreateAutoShutdownMessengerHolder("Client for big", options);
  auto messenger_for_small = CreateAutoShutdownMessengerHolder("Client for small", options);
  Proxy proxy_for_big(messenger_for_big.get(), server_addr);
  Proxy proxy_for_small(messenger_for_small.get(), server_addr);

  // Thread 0 sends 5 MB echoes (expected to fail under the memory limit);
  // the other nine send 5 KB echoes (expected to succeed).
  std::vector<std::thread> threads;
  while (threads.size() != 10) {
    bool big_call = threads.size() == 0;
    auto proxy = big_call ? &proxy_for_big : &proxy_for_small;
    // NOTE(review): 'server_addr' appears unused inside the lambda — confirm
    // whether the capture is intentional.
    threads.emplace_back([proxy, server_addr, &stop, big_call] {
      rpc_test::EchoRequestPB req;
      req.set_data(std::string(big_call ? 5_MB : 5_KB, 'X'));
      while (!stop.load(std::memory_order_acquire)) {
        rpc_test::EchoResponsePB resp;
        RpcController controller;
        controller.set_timeout(500ms);
        auto status = proxy->SyncRequest(
            CalculatorServiceMethods::EchoMethod(), /* method_metrics= */ nullptr, req, &resp,
            &controller);
        if (big_call) {
          ASSERT_NOK(status);
        } else {
          ASSERT_OK(status);
        }
      }
    });
  }

  // Let the workers run for a while, then signal them to stop and join.
  std::this_thread::sleep_for(10s);

  stop.store(true, std::memory_order_release);

  for (auto& thread : threads) {
    thread.join();
  }
}
347 | | |
348 | | } // namespace rpc |
349 | | } // namespace yb |
350 | | |