/Users/deen/code/yugabyte-db/src/yb/rpc/rpc-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/rpc/rpc-test-base.h" |
34 | | |
35 | | #include <condition_variable> |
36 | | #include <functional> |
37 | | #include <memory> |
38 | | #include <string> |
39 | | #include <thread> |
40 | | #include <unordered_map> |
41 | | |
42 | | #include <boost/ptr_container/ptr_vector.hpp> |
43 | | |
44 | | #include <gtest/gtest.h> |
45 | | |
46 | | #if defined(TCMALLOC_ENABLED) |
47 | | #include <gperftools/heap-profiler.h> |
48 | | #endif |
49 | | |
50 | | |
51 | | #include "yb/gutil/map-util.h" |
52 | | #include "yb/gutil/strings/human_readable.h" |
53 | | |
54 | | #include "yb/rpc/compressed_stream.h" |
55 | | #include "yb/rpc/proxy.h" |
56 | | #include "yb/rpc/rpc_controller.h" |
57 | | #include "yb/rpc/secure_stream.h" |
58 | | #include "yb/rpc/serialization.h" |
59 | | #include "yb/rpc/tcp_stream.h" |
60 | | #include "yb/rpc/yb_rpc.h" |
61 | | |
62 | | #include "yb/util/countdown_latch.h" |
63 | | #include "yb/util/env.h" |
64 | | #include "yb/util/format.h" |
65 | | #include "yb/util/logging_test_util.h" |
66 | | #include "yb/util/net/net_util.h" |
67 | | #include "yb/util/result.h" |
68 | | #include "yb/util/status_format.h" |
69 | | #include "yb/util/status_log.h" |
70 | | #include "yb/util/test_macros.h" |
71 | | #include "yb/util/test_util.h" |
72 | | #include "yb/util/tsan_util.h" |
73 | | #include "yb/util/thread.h" |
74 | | |
75 | | #include "yb/util/memory/memory_usage_test_util.h" |
76 | | |
77 | | METRIC_DECLARE_histogram(handler_latency_yb_rpc_test_CalculatorService_Sleep); |
78 | | METRIC_DECLARE_histogram(rpc_incoming_queue_time); |
79 | | METRIC_DECLARE_counter(tcp_bytes_sent); |
80 | | METRIC_DECLARE_counter(tcp_bytes_received); |
81 | | METRIC_DECLARE_counter(rpcs_timed_out_early_in_queue); |
82 | | |
83 | | DEFINE_int32(rpc_test_connection_keepalive_num_iterations, 1, |
84 | | "Number of iterations in TestRpc.TestConnectionKeepalive"); |
85 | | |
86 | | DECLARE_bool(TEST_pause_calculator_echo_request); |
87 | | DECLARE_bool(binary_call_parser_reject_on_mem_tracker_hard_limit); |
88 | | DECLARE_bool(enable_rpc_keepalive); |
89 | | DECLARE_int32(num_connections_to_server); |
90 | | DECLARE_int64(rpc_throttle_threshold_bytes); |
91 | | DECLARE_int32(stream_compression_algo); |
92 | | DECLARE_int64(memory_limit_hard_bytes); |
93 | | DECLARE_string(vmodule); |
94 | | DECLARE_uint64(rpc_connection_timeout_ms); |
95 | | DECLARE_uint64(rpc_read_buffer_size); |
96 | | |
97 | | using namespace std::chrono_literals; |
98 | | using std::string; |
99 | | using std::shared_ptr; |
100 | | using std::unordered_map; |
101 | | |
102 | | namespace yb { |
103 | | |
104 | | using rpc_test::CalculatorServiceProxy; |
105 | | |
106 | | namespace rpc { |
107 | | |
108 | | namespace { |
109 | | |
110 | | template <class MessengerFactory, class F> |
111 | | void RunTest(RpcTestBase* test, const TestServerOptions& options, |
112 | 35 | const MessengerFactory& messenger_factory, const F& f) { |
113 | 35 | auto client_messenger = rpc::CreateAutoShutdownMessengerHolder( |
114 | 35 | messenger_factory("Client", kDefaultClientMessengerOptions)); |
115 | 35 | auto proxy_cache = std::make_unique<ProxyCache>(client_messenger.get()); |
116 | | |
117 | 35 | HostPort server_hostport; |
118 | 35 | test->StartTestServerWithGeneratedCode( |
119 | 35 | messenger_factory("TestServer", options.messenger_options), &server_hostport, |
120 | 35 | options); |
121 | | |
122 | 35 | CalculatorServiceProxy p(proxy_cache.get(), server_hostport, client_messenger->DefaultProtocol()); |
123 | 35 | f(&p); |
124 | 35 | } rpc-test.cc:_ZN2yb3rpc12_GLOBAL__N_17RunTestIZNS0_7TestRpc12RunPlainTestIPFvPNS_8rpc_test22CalculatorServiceProxyEEEEvRKT_RKNS0_17TestServerOptionsEEUlRKNSt3__112basic_stringIcNSG_11char_traitsIcEENSG_9allocatorIcEEEERKNS0_16MessengerOptionsEE_S9_EEvPNS0_11RpcTestBaseESF_SC_RKT0_ Line | Count | Source | 112 | 1 | const MessengerFactory& messenger_factory, const F& f) { | 113 | 1 | auto client_messenger = rpc::CreateAutoShutdownMessengerHolder( | 114 | 1 | messenger_factory("Client", kDefaultClientMessengerOptions)); | 115 | 1 | auto proxy_cache = std::make_unique<ProxyCache>(client_messenger.get()); | 116 | | | 117 | 1 | HostPort server_hostport; | 118 | 1 | test->StartTestServerWithGeneratedCode( | 119 | 1 | messenger_factory("TestServer", options.messenger_options), &server_hostport, | 120 | 1 | options); | 121 | | | 122 | 1 | CalculatorServiceProxy p(proxy_cache.get(), server_hostport, client_messenger->DefaultProtocol()); | 123 | 1 | f(&p); | 124 | 1 | } |
rpc-test.cc:_ZN2yb3rpc12_GLOBAL__N_17RunTestIZNS0_13TestRpcSecure13RunSecureTestIPFvPNS_8rpc_test22CalculatorServiceProxyEEEEvRKT_RKNS0_17TestServerOptionsEEUlRKNSt3__112basic_stringIcNSG_11char_traitsIcEENSG_9allocatorIcEEEERKNS0_16MessengerOptionsEE_S9_EEvPNS0_11RpcTestBaseESF_SC_RKT0_ Line | Count | Source | 112 | 6 | const MessengerFactory& messenger_factory, const F& f) { | 113 | 6 | auto client_messenger = rpc::CreateAutoShutdownMessengerHolder( | 114 | 6 | messenger_factory("Client", kDefaultClientMessengerOptions)); | 115 | 6 | auto proxy_cache = std::make_unique<ProxyCache>(client_messenger.get()); | 116 | | | 117 | 6 | HostPort server_hostport; | 118 | 6 | test->StartTestServerWithGeneratedCode( | 119 | 6 | messenger_factory("TestServer", options.messenger_options), &server_hostport, | 120 | 6 | options); | 121 | | | 122 | 6 | CalculatorServiceProxy p(proxy_cache.get(), server_hostport, client_messenger->DefaultProtocol()); | 123 | 6 | f(&p); | 124 | 6 | } |
rpc-test.cc:_ZN2yb3rpc12_GLOBAL__N_17RunTestIZNS0_18TestRpcCompression18RunCompressionTestIPFvPNS_8rpc_test22CalculatorServiceProxyEEEEvRKT_RKNS0_17TestServerOptionsEEUlRKNSt3__112basic_stringIcNSG_11char_traitsIcEENSG_9allocatorIcEEEERKNS0_16MessengerOptionsEE_S9_EEvPNS0_11RpcTestBaseESF_SC_RKT0_ Line | Count | Source | 112 | 18 | const MessengerFactory& messenger_factory, const F& f) { | 113 | 18 | auto client_messenger = rpc::CreateAutoShutdownMessengerHolder( | 114 | 18 | messenger_factory("Client", kDefaultClientMessengerOptions)); | 115 | 18 | auto proxy_cache = std::make_unique<ProxyCache>(client_messenger.get()); | 116 | | | 117 | 18 | HostPort server_hostport; | 118 | 18 | test->StartTestServerWithGeneratedCode( | 119 | 18 | messenger_factory("TestServer", options.messenger_options), &server_hostport, | 120 | 18 | options); | 121 | | | 122 | 18 | CalculatorServiceProxy p(proxy_cache.get(), server_hostport, client_messenger->DefaultProtocol()); | 123 | 18 | f(&p); | 124 | 18 | } |
rpc-test.cc:_ZN2yb3rpc12_GLOBAL__N_17RunTestIZNS0_18TestRpcCompression18RunCompressionTestIZNS0_35TestRpcCompression_Compression_Test8TestBodyEvE3$_4EEvRKT_RKNS0_17TestServerOptionsEEUlRKNSt3__112basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEERKNS0_16MessengerOptionsEE_S6_EEvPNS0_11RpcTestBaseESC_S9_RKT0_ Line | Count | Source | 112 | 3 | const MessengerFactory& messenger_factory, const F& f) { | 113 | 3 | auto client_messenger = rpc::CreateAutoShutdownMessengerHolder( | 114 | 3 | messenger_factory("Client", kDefaultClientMessengerOptions)); | 115 | 3 | auto proxy_cache = std::make_unique<ProxyCache>(client_messenger.get()); | 116 | | | 117 | 3 | HostPort server_hostport; | 118 | 3 | test->StartTestServerWithGeneratedCode( | 119 | 3 | messenger_factory("TestServer", options.messenger_options), &server_hostport, | 120 | 3 | options); | 121 | | | 122 | 3 | CalculatorServiceProxy p(proxy_cache.get(), server_hostport, client_messenger->DefaultProtocol()); | 123 | 3 | f(&p); | 124 | 3 | } |
rpc-test.cc:_ZN2yb3rpc12_GLOBAL__N_17RunTestIZNS0_24TestRpcSecureCompression24RunSecureCompressionTestIPFvPNS_8rpc_test22CalculatorServiceProxyEEEEvRKT_RKNS0_17TestServerOptionsEEUlRKNSt3__112basic_stringIcNSG_11char_traitsIcEENSG_9allocatorIcEEEERKNS0_16MessengerOptionsEE_S9_EEvPNS0_11RpcTestBaseESF_SC_RKT0_ Line | Count | Source | 112 | 6 | const MessengerFactory& messenger_factory, const F& f) { | 113 | 6 | auto client_messenger = rpc::CreateAutoShutdownMessengerHolder( | 114 | 6 | messenger_factory("Client", kDefaultClientMessengerOptions)); | 115 | 6 | auto proxy_cache = std::make_unique<ProxyCache>(client_messenger.get()); | 116 | | | 117 | 6 | HostPort server_hostport; | 118 | 6 | test->StartTestServerWithGeneratedCode( | 119 | 6 | messenger_factory("TestServer", options.messenger_options), &server_hostport, | 120 | 6 | options); | 121 | | | 122 | 6 | CalculatorServiceProxy p(proxy_cache.get(), server_hostport, client_messenger->DefaultProtocol()); | 123 | 6 | f(&p); | 124 | 6 | } |
rpc-test.cc:_ZN2yb3rpc12_GLOBAL__N_17RunTestIZNS0_24TestRpcSecureCompression24RunSecureCompressionTestIZNS0_41TestRpcSecureCompression_Compression_Test8TestBodyEvE3$_5EEvRKT_RKNS0_17TestServerOptionsEEUlRKNSt3__112basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEERKNS0_16MessengerOptionsEE_S6_EEvPNS0_11RpcTestBaseESC_S9_RKT0_ Line | Count | Source | 112 | 1 | const MessengerFactory& messenger_factory, const F& f) { | 113 | 1 | auto client_messenger = rpc::CreateAutoShutdownMessengerHolder( | 114 | 1 | messenger_factory("Client", kDefaultClientMessengerOptions)); | 115 | 1 | auto proxy_cache = std::make_unique<ProxyCache>(client_messenger.get()); | 116 | | | 117 | 1 | HostPort server_hostport; | 118 | 1 | test->StartTestServerWithGeneratedCode( | 119 | 1 | messenger_factory("TestServer", options.messenger_options), &server_hostport, | 120 | 1 | options); | 121 | | | 122 | 1 | CalculatorServiceProxy p(proxy_cache.get(), server_hostport, client_messenger->DefaultProtocol()); | 123 | 1 | f(&p); | 124 | 1 | } |
|
125 | | |
126 | | } // namespace |
127 | | |
128 | | class TestRpc : public RpcTestBase { |
129 | | public: |
130 | 4 | void CheckServerMessengerConnections(size_t num_connections) { |
131 | 4 | ReactorMetrics metrics; |
132 | 4 | ASSERT_OK(server_messenger()->TEST_GetReactorMetrics(0, &metrics)); |
133 | 8 | ASSERT_EQ(metrics.num_server_connections, num_connections) |
134 | 8 | << "Server should have " << num_connections << " server connection(s)"; |
135 | 8 | ASSERT_EQ(metrics.num_client_connections, 0) << "Server should have 0 client connections"; |
136 | 4 | } |
137 | | |
138 | 4 | void CheckClientMessengerConnections(Messenger* messenger, size_t num_connections) { |
139 | 4 | ReactorMetrics metrics; |
140 | 4 | ASSERT_OK(messenger->TEST_GetReactorMetrics(0, &metrics)); |
141 | 8 | ASSERT_EQ(metrics.num_server_connections, 0) << "Client should have 0 server connections"; |
142 | 8 | ASSERT_EQ(metrics.num_client_connections, num_connections) |
143 | 8 | << "Client should have " << num_connections << " client connection(s)"; |
144 | 4 | } |
145 | | |
146 | | template <class F> |
147 | 1 | void RunPlainTest(const F& f, const TestServerOptions& server_options = TestServerOptions()) { |
148 | 2 | RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) { |
149 | 2 | return CreateMessenger(name, options); |
150 | 2 | }, f); |
151 | 1 | } |
152 | | }; |
153 | | |
154 | | namespace { |
155 | | |
156 | | // Used only to test parsing. |
157 | | const uint16_t kDefaultPort = 80; |
158 | | |
159 | 8 | void CheckParseEndpoint(const std::string& input, std::string expected = std::string()) { |
160 | 8 | if (expected.empty()) { |
161 | 2 | expected = input; |
162 | 2 | } |
163 | 8 | auto endpoint = ParseEndpoint(input, kDefaultPort); |
164 | 16 | ASSERT_TRUE(endpoint.ok()) << "input: " << input << ", status: " << endpoint.status().ToString(); |
165 | 8 | ASSERT_EQ(expected, AsString(*endpoint)); |
166 | 8 | } |
167 | | |
168 | | } // namespace |
169 | | |
170 | 1 | TEST_F(TestRpc, Endpoint) { |
171 | 1 | Endpoint addr1, addr2; |
172 | 1 | addr1.port(1000); |
173 | 1 | addr2.port(2000); |
174 | 1 | ASSERT_TRUE(addr1 < addr2); |
175 | 1 | ASSERT_FALSE(addr2 < addr1); |
176 | 1 | ASSERT_EQ(1000, addr1.port()); |
177 | 1 | ASSERT_EQ(2000, addr2.port()); |
178 | 1 | ASSERT_EQ(string("0.0.0.0:1000"), AsString(addr1)); |
179 | 1 | ASSERT_EQ(string("0.0.0.0:2000"), AsString(addr2)); |
180 | 1 | Endpoint addr3(addr1); |
181 | 1 | ASSERT_EQ(string("0.0.0.0:1000"), AsString(addr3)); |
182 | | |
183 | 1 | CheckParseEndpoint("127.0.0.1", "127.0.0.1:80"); |
184 | 1 | CheckParseEndpoint("192.168.0.1:123"); |
185 | 1 | CheckParseEndpoint("[10.8.0.137]", "10.8.0.137:80"); |
186 | 1 | CheckParseEndpoint("[10.8.0.137]:123", "10.8.0.137:123"); |
187 | | |
188 | 1 | CheckParseEndpoint("fe80::1", "[fe80::1]:80"); |
189 | 1 | CheckParseEndpoint("[fe80::1]", "[fe80::1]:80"); |
190 | 1 | CheckParseEndpoint("fe80::1:123", "[fe80::1:123]:80"); |
191 | 1 | CheckParseEndpoint("[fe80::1]:123"); |
192 | | |
193 | 1 | ASSERT_NOK(ParseEndpoint("[127.0.0.1]:", kDefaultPort)); |
194 | 1 | ASSERT_NOK(ParseEndpoint("[127.0.0.1:123", kDefaultPort)); |
195 | 1 | ASSERT_NOK(ParseEndpoint("fe80::1:12345", kDefaultPort)); |
196 | 1 | } |
197 | | |
198 | 1 | TEST_F(TestRpc, TestMessengerCreateDestroy) { |
199 | 1 | std::unique_ptr<Messenger> messenger = CreateMessenger("TestCreateDestroy"); |
200 | 1 | LOG(INFO) << "started messenger " << messenger->name(); |
201 | 1 | messenger->Shutdown(); |
202 | 1 | } |
203 | | |
204 | | // Test starting and stopping a messenger. This is a regression |
205 | | // test for a segfault seen in early versions of the RPC code, |
206 | | // in which shutting down the acceptor would trigger an assert, |
207 | | // making our tests flaky. |
208 | 1 | TEST_F(TestRpc, TestAcceptorPoolStartStop) { |
209 | 1 | int n_iters = AllowSlowTests() ? 100 : 5; |
210 | 6 | for (int i = 0; i < n_iters; i++) { |
211 | 5 | std::unique_ptr<Messenger> messenger = CreateMessenger("TestAcceptorPoolStartStop"); |
212 | 5 | Endpoint bound_endpoint; |
213 | 5 | ASSERT_OK(messenger->ListenAddress( |
214 | 5 | CreateConnectionContextFactory<YBInboundConnectionContext>(), |
215 | 5 | Endpoint(), &bound_endpoint)); |
216 | 5 | ASSERT_OK(messenger->StartAcceptor()); |
217 | 5 | ASSERT_NE(0, bound_endpoint.port()); |
218 | 5 | messenger->Shutdown(); |
219 | 5 | } |
220 | 1 | } |
221 | | |
222 | | // Test making successful RPC calls. |
223 | 1 | TEST_F(TestRpc, TestCall) { |
224 | | // Set up server. |
225 | 1 | HostPort server_addr; |
226 | 1 | StartTestServer(&server_addr); |
227 | | |
228 | | // Set up client. |
229 | 1 | LOG(INFO) << "Connecting to " << server_addr; |
230 | 1 | auto client_messenger = CreateAutoShutdownMessengerHolder("Client"); |
231 | 1 | Proxy p(client_messenger.get(), server_addr); |
232 | | |
233 | 11 | for (int i = 0; i < 10; i++) { |
234 | 10 | ASSERT_OK(DoTestSyncCall(&p, CalculatorServiceMethods::AddMethod())); |
235 | 10 | } |
236 | 1 | } |
237 | | |
238 | 1 | TEST_F(TestRpc, BigTimeout) { |
239 | | // Set up server. |
240 | 1 | TestServerOptions options; |
241 | 1 | options.messenger_options.keep_alive_timeout = 60s; |
242 | 1 | HostPort server_addr; |
243 | 1 | StartTestServer(&server_addr, options); |
244 | | |
245 | | // Set up client. |
246 | 1 | LOG(INFO) << "Connecting to " << server_addr; |
247 | 1 | auto client_messenger = CreateAutoShutdownMessengerHolder("Client"); |
248 | 1 | Proxy p(client_messenger.get(), server_addr); |
249 | | |
250 | 11 | for (int i = 0; i < 10; i++) { |
251 | 10 | ASSERT_OK(DoTestSyncCall(&p, CalculatorServiceMethods::AddMethod())); |
252 | 10 | } |
253 | | |
254 | 1 | LOG(INFO) << "Calls OK"; |
255 | | |
256 | 1 | auto call_consumption = MemTracker::GetRootTracker()->FindChild("Call")->consumption(); |
257 | 1 | ASSERT_EQ(call_consumption, 0); |
258 | 1 | } |
259 | | |
260 | | // Test that connecting to an invalid server properly throws an error. |
261 | 1 | TEST_F(TestRpc, TestCallToBadServer) { |
262 | 1 | auto client_messenger = CreateAutoShutdownMessengerHolder("Client"); |
263 | 1 | HostPort addr; |
264 | 1 | Proxy p(client_messenger.get(), addr); |
265 | | |
266 | | // Loop a few calls to make sure that we properly set up and tear down |
267 | | // the connections. |
268 | 6 | for (int i = 0; i < 5; i++) { |
269 | 5 | Status s = DoTestSyncCall(&p, CalculatorServiceMethods::AddMethod()); |
270 | 5 | LOG(INFO) << "Status: " << s.ToString(); |
271 | 10 | ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString(); |
272 | 5 | } |
273 | 1 | } |
274 | | |
275 | | // Test that RPC calls can be failed with an error status on the server. |
276 | 1 | TEST_F(TestRpc, TestInvalidMethodCall) { |
277 | | // Set up server. |
278 | 1 | HostPort server_addr; |
279 | 1 | StartTestServer(&server_addr); |
280 | | |
281 | | // Set up client. |
282 | 1 | LOG(INFO) << "Connecting to " << server_addr; |
283 | 1 | auto client_messenger = CreateAutoShutdownMessengerHolder("Client"); |
284 | 1 | Proxy p(client_messenger.get(), server_addr); |
285 | | |
286 | | // Call the method which fails. |
287 | 1 | static RemoteMethod method( |
288 | 1 | rpc_test::CalculatorServiceIf::static_service_name(), "ThisMethodDoesNotExist"); |
289 | 1 | Status s = DoTestSyncCall(&p, &method); |
290 | 2 | ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString(); |
291 | 1 | ASSERT_STR_CONTAINS(s.ToString(), "invalid method name"); |
292 | 1 | } |
293 | | |
294 | | // Test that the error message returned when connecting to the wrong service |
295 | | // is reasonable. |
296 | 1 | TEST_F(TestRpc, TestWrongService) { |
297 | | // Set up server. |
298 | 1 | HostPort server_addr; |
299 | 1 | StartTestServer(&server_addr); |
300 | | |
301 | | // Set up client with the wrong service name. |
302 | 1 | auto client_messenger = CreateAutoShutdownMessengerHolder("Client"); |
303 | 1 | Proxy p(client_messenger.get(), server_addr); |
304 | | |
305 | | // Call the method which fails. |
306 | 1 | static RemoteMethod method("WrongServiceName", "ThisMethodDoesNotExist"); |
307 | 1 | Status s = DoTestSyncCall(&p, &method); |
308 | 1 | auto message = s.ToString(); |
309 | 2 | ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << message; |
310 | | // Remote errors always contain file name and line number. |
311 | 1 | ASSERT_STR_CONTAINS(message, "Remote error ("); |
312 | 1 | ASSERT_STR_CONTAINS(message, "): Service unavailable ("); |
313 | 1 | ASSERT_STR_CONTAINS(message, "): Service WrongServiceName not registered on TestServer"); |
314 | 1 | } |
315 | | |
316 | | namespace { |
317 | | |
318 | 1 | uint64_t GetOpenFileLimit() { |
319 | 1 | struct rlimit limit; |
320 | 1 | PCHECK(getrlimit(RLIMIT_NOFILE, &limit) == 0); |
321 | 1 | return limit.rlim_cur; |
322 | 1 | } |
323 | | |
324 | | } // anonymous namespace |
325 | | |
326 | | // Test that we can still make RPC connections even if many fds are in use. |
327 | | // This is a regression test for KUDU-650. |
328 | 1 | TEST_F(TestRpc, TestHighFDs) { |
329 | | // This test can only run if ulimit is set high. |
330 | 1 | const uint64_t kNumFakeFiles = 3500; |
331 | 1 | const uint64_t kMinUlimit = kNumFakeFiles + 100; |
332 | 1 | if (GetOpenFileLimit() < kMinUlimit) { |
333 | 1 | LOG(INFO) << "Test skipped: must increase ulimit -n to at least " << kMinUlimit; |
334 | 1 | return; |
335 | 1 | } |
336 | | |
337 | | // Open a bunch of fds just to increase our fd count. |
338 | 0 | std::vector<std::unique_ptr<RandomAccessFile>> fake_files; |
339 | 0 | for (uint64_t i = 0; i < kNumFakeFiles; i++) { |
340 | 0 | std::unique_ptr<RandomAccessFile> f; |
341 | 0 | CHECK_OK(Env::Default()->NewRandomAccessFile("/dev/zero", &f)); |
342 | 0 | fake_files.emplace_back(f.release()); |
343 | 0 | } |
344 | | |
345 | | // Set up server and client, and verify we can make a successful call. |
346 | 0 | HostPort server_addr; |
347 | 0 | StartTestServer(&server_addr); |
348 | 0 | auto client_messenger = CreateAutoShutdownMessengerHolder("Client"); |
349 | 0 | Proxy p(client_messenger.get(), server_addr); |
350 | 0 | ASSERT_OK(DoTestSyncCall(&p, CalculatorServiceMethods::AddMethod())); |
351 | 0 | } |
352 | | |
353 | | // Test that connections are kept alive by ScanIdleConnections between calls. |
354 | 1 | TEST_F(TestRpc, TestConnectionKeepalive) { |
355 | 1 | google::FlagSaver saver; |
356 | | |
357 | | // Only run one reactor per messenger, so we can grab the metrics from that |
358 | | // one without having to check all. |
359 | 1 | const auto kGcTimeout = 300ms; |
360 | 1 | MessengerOptions messenger_options = { 1, kGcTimeout }; |
361 | 1 | TestServerOptions options; |
362 | 1 | options.messenger_options = messenger_options; |
363 | | // RPC heartbeats shouldn't prevent idle connections from being GCed. To test that we set |
364 | | // rpc_connection_timeout less than kGcTimeout. |
365 | 1 | FLAGS_rpc_connection_timeout_ms = MonoDelta(kGcTimeout).ToMilliseconds() / 2; |
366 | 1 | FLAGS_enable_rpc_keepalive = true; |
367 | 1 | if (!FLAGS_vmodule.empty()) { |
368 | 0 | FLAGS_vmodule = FLAGS_vmodule + ",yb_rpc=5"; |
369 | 1 | } else { |
370 | 1 | FLAGS_vmodule = "yb_rpc=5"; |
371 | 1 | } |
372 | | // Set up server. |
373 | 1 | HostPort server_addr; |
374 | 1 | StartTestServer(&server_addr, options); |
375 | 2 | for (int i = 0; i < FLAGS_rpc_test_connection_keepalive_num_iterations; ++i) { |
376 | | // Set up client. |
377 | 1 | LOG(INFO) << "Connecting to " << server_addr; |
378 | 1 | auto client_messenger = CreateAutoShutdownMessengerHolder("Client", messenger_options); |
379 | 1 | Proxy p(client_messenger.get(), server_addr); |
380 | | |
381 | 1 | ASSERT_OK(DoTestSyncCall(&p, CalculatorServiceMethods::AddMethod())); |
382 | 1 | ASSERT_NO_FATALS(CheckServerMessengerConnections(1)); |
383 | 1 | ASSERT_NO_FATALS(CheckClientMessengerConnections(client_messenger.get(), 1)); |
384 | 1 | LOG(INFO) << "Connections are up"; |
385 | | |
386 | 1 | SleepFor(kGcTimeout / 2); |
387 | | |
388 | 1 | LOG(INFO) << "Checking connections"; |
389 | 1 | ASSERT_NO_FATALS(CheckServerMessengerConnections(1)); |
390 | 1 | ASSERT_NO_FATALS(CheckClientMessengerConnections(client_messenger.get(), 1)); |
391 | | |
392 | 1 | SleepFor(kGcTimeout * 2); |
393 | | |
394 | | // After sleeping, the keepalive timer should have closed both sides of the connection. |
395 | 1 | ASSERT_NO_FATALS(CheckServerMessengerConnections(0)); |
396 | 1 | ASSERT_NO_FATALS(CheckClientMessengerConnections(client_messenger.get(), 0)); |
397 | 1 | } |
398 | 1 | } |
399 | | |
400 | | // Test that a call which takes longer than the keepalive time |
401 | | // succeeds -- i.e that we don't consider a connection to be "idle" on the |
402 | | // server if there is a call outstanding on it. |
403 | 1 | TEST_F(TestRpc, TestCallLongerThanKeepalive) { |
404 | 1 | TestServerOptions options; |
405 | | // set very short keepalive |
406 | 1 | options.messenger_options.keep_alive_timeout = 100ms; |
407 | | |
408 | | // Set up server. |
409 | 1 | HostPort server_addr; |
410 | 1 | StartTestServer(&server_addr, options); |
411 | | |
412 | | // Set up client. |
413 | 1 | auto client_options = kDefaultClientMessengerOptions; |
414 | 1 | client_options.keep_alive_timeout = 100ms; |
415 | 1 | auto client_messenger = CreateAutoShutdownMessengerHolder("Client", client_options); |
416 | 1 | Proxy p(client_messenger.get(), server_addr); |
417 | | |
418 | | // Make a call which sleeps longer than the keepalive. |
419 | 1 | RpcController controller; |
420 | 1 | rpc_test::SleepRequestPB req; |
421 | 1 | req.set_sleep_micros(200 * 1000); |
422 | 1 | req.set_deferred(true); |
423 | 1 | rpc_test::SleepResponsePB resp; |
424 | 1 | ASSERT_OK(p.SyncRequest( |
425 | 1 | CalculatorServiceMethods::SleepMethod(), nullptr, req, &resp, &controller)); |
426 | 1 | } |
427 | | |
428 | | // Test that connections are kept alive by heartbeats between calls. |
429 | 1 | TEST_F(TestRpc, TestConnectionHeartbeating) { |
430 | 1 | google::FlagSaver saver; |
431 | | |
432 | 1 | const auto kTestTimeout = 300ms; |
433 | | |
434 | | // Only run one reactor per messenger, so we can grab the metrics from that |
435 | | // one without having to check all. Set ScanIdleConnections keep alive to huge value in order |
436 | | // to not affect heartbeats testing. |
437 | 1 | MessengerOptions messenger_options = { 1, kTestTimeout * 100 }; |
438 | 1 | TestServerOptions options; |
439 | 1 | options.messenger_options = messenger_options; |
440 | 1 | FLAGS_num_connections_to_server = 1; |
441 | 1 | FLAGS_rpc_connection_timeout_ms = MonoDelta(kTestTimeout).ToMilliseconds(); |
442 | | |
443 | | // Set up server. |
444 | 1 | HostPort server_addr; |
445 | 1 | StartTestServer(&server_addr, options); |
446 | | |
447 | 2 | for (int i = 0; i < FLAGS_rpc_test_connection_keepalive_num_iterations; ++i) { |
448 | | // Set up client. |
449 | 1 | LOG(INFO) << "Connecting to " << server_addr; |
450 | 1 | auto client_messenger = CreateAutoShutdownMessengerHolder("Client", messenger_options); |
451 | 1 | Proxy p(client_messenger.get(), server_addr); |
452 | | |
453 | 1 | ASSERT_OK(DoTestSyncCall(&p, CalculatorServiceMethods::AddMethod())); |
454 | | |
455 | 1 | SleepFor(kTestTimeout * 3); |
456 | | // Both client and server connections should survive when there is no application traffic. |
457 | 1 | ASSERT_NO_FATALS(CheckServerMessengerConnections(1)); |
458 | 1 | ASSERT_NO_FATALS(CheckClientMessengerConnections(client_messenger.get(), 1)); |
459 | 1 | } |
460 | 1 | } |
461 | | |
462 | | // Test that the RpcSidecar transfers the expected messages. |
463 | 1 | TEST_F(TestRpc, TestRpcSidecar) { |
464 | | // Set up server. |
465 | 1 | HostPort server_addr; |
466 | 1 | StartTestServer(&server_addr); |
467 | | |
468 | | // Set up client. |
469 | 1 | auto client_messenger = CreateAutoShutdownMessengerHolder("Client"); |
470 | 1 | Proxy p(client_messenger.get(), server_addr); |
471 | | |
472 | | // Test some small sidecars |
473 | 1 | DoTestSidecar(&p, {123, 456}); |
474 | | |
475 | | // Test some larger sidecars to verify that we properly handle the case where |
476 | | // we can't write the whole response to the socket in a single call. |
477 | 1 | DoTestSidecar(&p, {3_MB, 2_MB, 240_MB}); |
478 | | |
479 | 1 | std::vector<size_t> sizes(20); |
480 | 1 | std::fill(sizes.begin(), sizes.end(), 123); |
481 | 1 | DoTestSidecar(&p, sizes); |
482 | 1 | } |
483 | | |
484 | | // Test that timeouts are properly handled. |
485 | 1 | TEST_F(TestRpc, TestCallTimeout) { |
486 | 1 | HostPort server_addr; |
487 | 1 | StartTestServer(&server_addr); |
488 | 1 | auto client_messenger = CreateAutoShutdownMessengerHolder("Client"); |
489 | 1 | Proxy p(client_messenger.get(), server_addr); |
490 | | |
491 | 1 | uint64_t delay_ns = 1; |
492 | | |
493 | | // Test a very short timeout - we expect this will time out while the |
494 | | // call is still trying to connect, or in the send queue. This was triggering ASAN failures |
495 | | // before. |
496 | | |
497 | 28 | while (delay_ns < 100ul * 1000 * 1000) { |
498 | 27 | ASSERT_NO_FATALS(DoTestExpectTimeout(&p, MonoDelta::FromNanoseconds(delay_ns))); |
499 | 27 | delay_ns *= 2; |
500 | 27 | } |
501 | 1 | } |
502 | | |
503 | 1 | static void AcceptAndReadForever(Socket* listen_sock) { |
504 | | // Accept the TCP connection. |
505 | 1 | Socket server_sock; |
506 | 1 | Endpoint remote; |
507 | 1 | CHECK_OK(listen_sock->Accept(&server_sock, &remote, 0)); |
508 | | |
509 | 1 | MonoTime deadline = MonoTime::Now(); |
510 | 1 | deadline.AddDelta(MonoDelta::FromSeconds(10)); |
511 | | |
512 | 1 | uint8_t buf[1024]; |
513 | 1 | while (server_sock.BlockingRecv(buf, sizeof(buf), deadline).ok()) { |
514 | 0 | } |
515 | 1 | } |
516 | | |
517 | | // Starts a fake listening socket which never actually negotiates. |
518 | | // Ensures that the client gets a reasonable status code in this case. |
519 | 1 | TEST_F(TestRpc, TestNegotiationTimeout) { |
520 | | // Set up a simple socket server which accepts a connection. |
521 | 1 | HostPort server_addr; |
522 | 1 | Socket listen_sock; |
523 | 1 | ASSERT_OK(StartFakeServer(&listen_sock, &server_addr)); |
524 | | |
525 | | // Create another thread to accept the connection on the fake server. |
526 | 1 | scoped_refptr<Thread> acceptor_thread; |
527 | 1 | ASSERT_OK(Thread::Create("test", "acceptor", |
528 | 1 | AcceptAndReadForever, &listen_sock, |
529 | 1 | &acceptor_thread)); |
530 | | |
531 | | // Set up client. |
532 | 1 | auto client_messenger = CreateAutoShutdownMessengerHolder("Client"); |
533 | 1 | Proxy p(client_messenger.get(), server_addr); |
534 | | |
535 | 1 | ASSERT_NO_FATALS(DoTestExpectTimeout(&p, MonoDelta::FromMilliseconds(100))); |
536 | | |
537 | 1 | acceptor_thread->Join(); |
538 | 1 | } |
539 | | |
540 | | // Test that client calls get failed properly when the server they're connected to |
541 | | // shuts down. |
542 | 1 | TEST_F(TestRpc, TestServerShutsDown) { |
543 | | // Set up a simple socket server which accepts a connection. |
544 | 1 | HostPort server_addr; |
545 | 1 | Socket listen_sock; |
546 | 1 | ASSERT_OK(StartFakeServer(&listen_sock, &server_addr)); |
547 | | |
548 | | // Set up client. |
549 | 1 | LOG(INFO) << "Connecting to " << server_addr; |
550 | 1 | auto client_messenger = CreateAutoShutdownMessengerHolder("Client"); |
551 | 1 | Proxy p(client_messenger.get(), server_addr); |
552 | | |
553 | | // Send a call. |
554 | 1 | rpc_test::AddRequestPB req; |
555 | 1 | unsigned int seed = SeedRandom(); |
556 | 1 | req.set_x(rand_r(&seed)); |
557 | 1 | req.set_y(rand_r(&seed)); |
558 | 1 | rpc_test::AddResponsePB resp; |
559 | | |
560 | 1 | boost::ptr_vector<RpcController> controllers; |
561 | | |
562 | | // We'll send several calls async, and ensure that they all |
563 | | // get the error status when the connection drops. |
564 | 1 | int n_calls = 5; |
565 | | |
566 | 1 | CountDownLatch latch(n_calls); |
567 | 6 | for (int i = 0; i < n_calls; i++) { |
568 | 5 | auto controller = new RpcController(); |
569 | 5 | controllers.push_back(controller); |
570 | 5 | p.AsyncRequest( |
571 | 5 | CalculatorServiceMethods::AddMethod(), /* method_metrics= */ nullptr, req, &resp, |
572 | 5 | controller, latch.CountDownCallback()); |
573 | 5 | } |
574 | | |
575 | | // Accept the TCP connection. |
576 | 1 | Socket server_sock; |
577 | 1 | Endpoint remote; |
578 | 1 | ASSERT_OK(listen_sock.Accept(&server_sock, &remote, 0)); |
579 | | |
580 | | // The call is still in progress at this point. |
581 | 1 | for (const RpcController &controller : controllers) { |
582 | 1 | ASSERT_FALSE(controller.finished()); |
583 | 1 | } |
584 | | |
585 | | // Shut down the socket. |
586 | 0 | ASSERT_OK(listen_sock.Close()); |
587 | 0 | ASSERT_OK(server_sock.Close()); |
588 | | |
589 | | // Wait for the call to be marked finished. |
590 | 0 | latch.Wait(); |
591 | | |
592 | | // Should get the appropriate error on the client for all calls; |
593 | 0 | for (const RpcController &controller : controllers) { |
594 | 0 | ASSERT_TRUE(controller.finished()); |
595 | 0 | Status s = controller.status(); |
596 | 0 | ASSERT_TRUE(s.IsNetworkError()) << |
597 | 0 | "Unexpected status: " << s.ToString(); |
598 | | |
599 | | // Any of these errors could happen, depending on whether we were |
600 | | // in the middle of sending a call while the connection died, or |
601 | | // if we were already waiting for responses. |
602 | | // |
603 | | // ECONNREFUSED is possible because the sending of the calls is async. |
604 | | // For example, the following interleaving: |
605 | | // - Enqueue 3 calls |
606 | | // - Reactor wakes up, creates connection, starts writing calls |
607 | | // - Enqueue 2 more calls |
608 | | // - Shut down socket |
609 | | // - Reactor wakes up, tries to write more of the first 3 calls, gets error |
610 | | // - Reactor shuts down connection |
611 | | // - Reactor sees the 2 remaining calls, makes a new connection |
612 | | // - Because the socket is shut down, gets ECONNREFUSED. |
613 | | // |
614 | | // EINVAL is possible if the controller socket had already disconnected by |
615 | | // the time it tries to set the SO_SNDTIMEO socket option as part of the |
616 | | // normal blocking SASL handshake. |
617 | | // |
618 | | // EPROTOTYPE sometimes happens on Mac OS X. |
619 | | // TODO: figure out why. |
620 | 0 | Errno err(s); |
621 | 0 | ASSERT_TRUE(err == EPIPE || |
622 | 0 | err == ECONNRESET || |
623 | 0 | err == ESHUTDOWN || |
624 | 0 | err == ECONNREFUSED || |
625 | 0 | err == EINVAL |
626 | 0 | #if defined(__APPLE__) |
627 | 0 | || err == EPROTOTYPE |
628 | 0 | #endif |
629 | 0 | ) |
630 | 0 | << "Unexpected status: " << s.ToString(); |
631 | 0 | } |
632 | 0 | } |
633 | | |
634 | | Result<MetricPtr> GetMetric( |
635 | 75 | const MetricEntityPtr& metric_entity, const MetricPrototype& prototype) { |
636 | 75 | const auto& metric_map = metric_entity->UnsafeMetricsMapForTests(); |
637 | | |
638 | 75 | auto it = metric_map.find(&prototype); |
639 | 75 | if (it == metric_map.end()) { |
640 | 0 | return STATUS_FORMAT(NotFound, "Metric $0 not found", prototype.name()); |
641 | 0 | } |
642 | | |
643 | 75 | return it->second; |
644 | 75 | } |
645 | | |
646 | | Result<HistogramPtr> GetHistogram( |
647 | 2 | const MetricEntityPtr& metric_entity, const HistogramPrototype& prototype) { |
648 | 2 | return down_cast<Histogram*>(VERIFY_RESULT(GetMetric(metric_entity, prototype)).get()); |
649 | 2 | } |
650 | | |
651 | | Result<CounterPtr> GetCounter( |
652 | 73 | const MetricEntityPtr& metric_entity, const CounterPrototype& prototype) { |
653 | 73 | return down_cast<Counter*>(VERIFY_RESULT(GetMetric(metric_entity, prototype)).get()); |
654 | 73 | } |
655 | | |
656 | | // Test handler latency metric. |
657 | 1 | TEST_F(TestRpc, TestRpcHandlerLatencyMetric) { |
658 | | |
659 | 1 | const uint64_t sleep_micros = 20 * 1000; |
660 | | |
661 | | // Set up server. |
662 | 1 | HostPort server_addr; |
663 | 1 | StartTestServerWithGeneratedCode(&server_addr); |
664 | | |
665 | | // Set up client. |
666 | | |
667 | 1 | auto client_messenger = CreateAutoShutdownMessengerHolder("Client"); |
668 | 1 | Proxy p(client_messenger.get(), server_addr); |
669 | | |
670 | 1 | RpcController controller; |
671 | 1 | rpc_test::SleepRequestPB req; |
672 | 1 | req.set_sleep_micros(sleep_micros); |
673 | 1 | req.set_deferred(true); |
674 | 1 | rpc_test::SleepResponsePB resp; |
675 | 1 | ASSERT_OK(p.SyncRequest( |
676 | 1 | CalculatorServiceMethods::SleepMethod(), /* method_metrics= */ nullptr, req, &resp, |
677 | 1 | &controller)); |
678 | | |
679 | 1 | auto latency_histogram = ASSERT_RESULT(GetHistogram( |
680 | 1 | metric_entity(), METRIC_handler_latency_yb_rpc_test_CalculatorService_Sleep)); |
681 | | |
682 | 1 | LOG(INFO) << "Sleep() min lat: " << latency_histogram->MinValueForTests(); |
683 | 1 | LOG(INFO) << "Sleep() mean lat: " << latency_histogram->MeanValueForTests(); |
684 | 1 | LOG(INFO) << "Sleep() max lat: " << latency_histogram->MaxValueForTests(); |
685 | 1 | LOG(INFO) << "Sleep() #calls: " << latency_histogram->TotalCount(); |
686 | | |
687 | 1 | ASSERT_EQ(1, latency_histogram->TotalCount()); |
688 | 1 | ASSERT_GE(latency_histogram->MaxValueForTests(), sleep_micros); |
689 | 1 | ASSERT_TRUE(latency_histogram->MinValueForTests() == latency_histogram->MaxValueForTests()); |
690 | | |
691 | | // TODO: Implement an incoming queue latency test. |
692 | | // For now we just assert that the metric exists. |
693 | 1 | ASSERT_OK(GetHistogram(metric_entity(), METRIC_rpc_incoming_queue_time)); |
694 | 1 | } |
695 | | |
// Regression test: an async call to an unresolvable/bad address, whose Proxy is
// destroyed while the call is still outstanding, must still invoke the callback
// so the latch is released; the messenger is then shut down by the holder at
// test exit. NOTE(review): the test name suggests the messenger destruction is
// the interesting part — confirm the intended failure mode against history.
TEST_F(TestRpc, TestRpcCallbackDestroysMessenger) {
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");
  HostPort bad_addr;  // default-constructed, intentionally not a reachable endpoint
  CountDownLatch latch(1);

  rpc_test::AddRequestPB req;
  unsigned int seed = SeedRandom();
  req.set_x(rand_r(&seed));
  req.set_y(rand_r(&seed));
  rpc_test::AddResponsePB resp;
  RpcController controller;
  // Tiny timeout so the doomed call fails quickly.
  controller.set_timeout(MonoDelta::FromMilliseconds(1));
  {
    // Proxy lives only in this scope; the call it started outlives it.
    Proxy p(client_messenger.get(), bad_addr);
    // static: the RemoteMethod must stay alive for the duration of the async
    // call, which continues after this scope ends.
    static RemoteMethod method(
        rpc_test::CalculatorServiceIf::static_service_name(), "my-fake-method");
    p.AsyncRequest(&method, /* method_metrics= */ nullptr, req, &resp, &controller,
                   latch.CountDownCallback());
  }
  // Callback must fire even though the Proxy is gone.
  latch.Wait();
}
717 | | |
718 | | // Test that setting the client timeout / deadline gets propagated to RPC |
719 | | // services. |
720 | 1 | TEST_F(TestRpc, TestRpcContextClientDeadline) { |
721 | 1 | const uint64_t sleep_micros = 20 * 1000; |
722 | | |
723 | | // Set up server. |
724 | 1 | HostPort server_addr; |
725 | 1 | StartTestServerWithGeneratedCode(&server_addr); |
726 | | |
727 | | // Set up client. |
728 | 1 | auto client_messenger = CreateAutoShutdownMessengerHolder("Client"); |
729 | 1 | Proxy p(client_messenger.get(), server_addr); |
730 | | |
731 | 1 | rpc_test::SleepRequestPB req; |
732 | 1 | req.set_sleep_micros(sleep_micros); |
733 | 1 | req.set_client_timeout_defined(true); |
734 | 1 | rpc_test::SleepResponsePB resp; |
735 | 1 | RpcController controller; |
736 | 1 | const auto* method = CalculatorServiceMethods::SleepMethod(); |
737 | 1 | Status s = p.SyncRequest(method, /* method_metrics= */ nullptr, req, &resp, &controller); |
738 | 1 | ASSERT_TRUE(s.IsRemoteError()); |
739 | 1 | ASSERT_STR_CONTAINS(s.ToString(), "Missing required timeout"); |
740 | | |
741 | 1 | controller.Reset(); |
742 | 1 | controller.set_timeout(MonoDelta::FromMilliseconds(1000)); |
743 | 1 | ASSERT_OK(p.SyncRequest(method, /* method_metrics= */ nullptr, req, &resp, &controller)); |
744 | 1 | } |
745 | | |
746 | | // Send multiple long running calls to a single worker thread. All of them except the first one, |
747 | | // should time out early w/o starting processing them. |
// Send multiple long running calls to a single worker thread. All of them except the first one,
// should time out early w/o starting processing them.
TEST_F(TestRpc, QueueTimeout) {
  const MonoDelta kSleep = 1s;
  constexpr auto kCalls = 10;

  // Set up server with a single worker thread, so calls queue behind the first.
  TestServerOptions options;
  options.n_worker_threads = 1;
  HostPort server_addr;
  StartTestServerWithGeneratedCode(&server_addr, options);

  // Set up client.
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");
  Proxy p(client_messenger.get(), server_addr);

  const auto* method = CalculatorServiceMethods::SleepMethod();

  CountDownLatch latch(kCalls);

  // Request/response/controller for one async call; kept together so each
  // callback can reach its own controller by reference.
  struct Call {
    rpc_test::SleepRequestPB req;
    rpc_test::SleepResponsePB resp;
    RpcController controller;
  };
  // Sized up-front and never resized: callbacks below capture references into
  // this vector, so its elements must not move.
  std::vector<Call> calls(kCalls);

  for (int i = 0; i != kCalls; ++i) {
    auto& call = calls[i];
    auto& req = call.req;
    // Each call sleeps longer (1s) than its own client timeout (0.5s), so
    // every call times out on the client side.
    req.set_sleep_micros(narrow_cast<uint32_t>(kSleep.ToMicroseconds()));
    req.set_client_timeout_defined(true);
    call.controller.set_timeout(kSleep / 2);
    p.AsyncRequest(method, /* method_metrics= */ nullptr, req, &call.resp, &call.controller,
        [&latch, &call] {
      latch.CountDown();
      ASSERT_TRUE(call.controller.status().IsTimedOut()) << call.controller.status();
    });
  }

  latch.Wait();

  // Give some time for algorithm to work.
  std::this_thread::sleep_for((kSleep / 2).ToSteadyDuration());

  auto counter = ASSERT_RESULT(GetCounter(metric_entity(), METRIC_rpcs_timed_out_early_in_queue));

  // First call should succeed, other should timeout.
  // (Only the first call reaches the worker thread; the other kCalls - 1 expire
  // while still queued and are counted by this metric.)
  ASSERT_EQ(counter->value(), kCalls - 1);
}
796 | | |
// State shared by all DisconnectTask instances in TestDisconnect.
struct DisconnectShare {
  Proxy proxy;                  // proxy every task sends its Disconnect call through
  size_t left;                  // tasks that have not completed yet; guarded by `mutex`
  std::mutex mutex;             // protects `left` and `counts`
  std::condition_variable cond; // signaled once when `left` reaches zero
  std::unordered_map<std::string, size_t> counts;  // status string -> number of calls that ended with it
};
804 | | |
// One asynchronous Disconnect call. Records its final status into the shared
// counts map and wakes the test thread when the last task finishes.
class DisconnectTask {
 public:
  explicit DisconnectTask(DisconnectShare* share) : share_(share) {
  }

  // Starts the async call; Done() runs on completion. The task object must not
  // move after Launch(), since the callback captures `this`.
  void Launch() {
    controller_.set_timeout(MonoDelta::FromSeconds(1));
    share_->proxy.AsyncRequest(CalculatorServiceMethods::DisconnectMethod(),
                               /* method_metrics= */ nullptr,
                               rpc_test::DisconnectRequestPB(),
                               &response_,
                               &controller_,
                               [this]() { this->Done(); });
  }
 private:
  // Completion callback: tally this call's status and, if it was the last one,
  // notify the waiter. notify_one() is deliberately called outside the lock.
  void Done() {
    bool notify;
    {
      std::lock_guard<std::mutex> lock(share_->mutex);
      ++share_->counts[controller_.status().ToString()];
      notify = 0 == --share_->left;
    }
    if (notify)
      share_->cond.notify_one();
  }

  DisconnectShare* share_;  // not owned; outlives all tasks
  rpc_test::DisconnectResponsePB response_;
  RpcController controller_;
};
835 | | |
// Fires a large burst of Disconnect calls (which make the server drop the
// connection) and verifies that every single call completes with a non-OK
// status.
TEST_F(TestRpc, TestDisconnect) {
  // Set up server.
  HostPort server_addr;
  StartTestServerWithGeneratedCode(&server_addr);

  // Set up client.
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");

  constexpr size_t kRequests = 10000;
  DisconnectShare share = { { client_messenger.get(), server_addr }, kRequests };

  std::vector<DisconnectTask> tasks;
  // Populate the vector completely BEFORE launching anything: Launch() hands
  // `this` to the completion callback, so a reallocation after Launch() would
  // leave callbacks pointing at moved-from tasks.
  for (size_t i = 0; i != kRequests; ++i) {
    tasks.emplace_back(&share);
  }
  for (size_t i = 0; i != kRequests; ++i) {
    tasks[i].Launch();
  }
  // Wait until the last task's Done() drops share.left to zero.
  {
    std::unique_lock<std::mutex> lock(share.mutex);
    share.cond.wait(lock, [&share]() { return !share.left; });
  }

  // Every call must have failed (the server disconnects), and the per-status
  // tallies must account for all requests.
  size_t total = 0;
  for (const auto& pair : share.counts) {
    ASSERT_NE(pair.first, "OK");
    total += pair.second;
    LOG(INFO) << pair.first << ": " << pair.second;
  }
  ASSERT_EQ(kRequests, total);
}
867 | | |
868 | | // Check that we could perform DumpRunningRpcs while timed out calls are in queue. |
869 | | // |
870 | | // Start listenting socket, that will accept one connection and does not read it. |
871 | | // Send big RPC request, that does not fit into socket buffer, so it will be sending forever. |
872 | | // Wait until this call is timed out. |
873 | | // Check that we could invoke DumpRunningRpcs after it. |
TEST_F(TestRpc, DumpTimedOutCall) {
  // Set up a simple socket server which accepts a connection.
  HostPort server_addr;
  Socket listen_sock;
  ASSERT_OK(StartFakeServer(&listen_sock, &server_addr));

  std::atomic<bool> stop(false);

  // Server side: accept one connection and never read from it, so the client's
  // large request can never be fully sent.
  std::thread thread([&listen_sock, &stop] {
    Socket socket;
    Endpoint remote;
    ASSERT_OK(listen_sock.Accept(&socket, &remote, 0));
    while (!stop.load(std::memory_order_acquire)) {
      std::this_thread::sleep_for(100ms);
    }
  });

  auto messenger = CreateAutoShutdownMessengerHolder("Client");
  Proxy p(messenger.get(), server_addr);

  {
    rpc_test::EchoRequestPB req;
    // 1MB payload: larger than the socket buffer, so sending stalls forever.
    req.set_data(std::string(1_MB, 'X'));
    rpc_test::EchoResponsePB resp;
    // The controller is constructed via placement new in raw storage so the
    // test can explicitly destroy it and then poison its memory below.
    std::aligned_storage<sizeof(RpcController), alignof(RpcController)>::type storage;
    auto controller = new (&storage) RpcController;
    controller->set_timeout(100ms);
    auto status = p.SyncRequest(
        CalculatorServiceMethods::EchoMethod(), /* method_metrics= */ nullptr, req, &resp,
        controller);
    ASSERT_TRUE(status.IsTimedOut()) << status;
    controller->~RpcController();
    // Poison the dead controller's bytes: if DumpRunningRpcs below still
    // touched the timed-out call's controller, sanitizers/crashes would
    // surface it.
    memset(&storage, 0xff, sizeof(storage));
  }

  // Dumping running RPCs must be safe even with the timed-out call in queue.
  DumpRunningRpcsRequestPB dump_req;
  DumpRunningRpcsResponsePB dump_resp;
  ASSERT_OK(messenger->DumpRunningRpcs(dump_req, &dump_resp));

  stop.store(true, std::memory_order_release);
  thread.join();
}
916 | | |
917 | | #if defined(TCMALLOC_ENABLED) |
918 | | |
namespace {

// A zeroed length-prefix: the smallest well-formed wire payload, used to build
// minimal outbound messages in SendingQueueMemoryUsage.
const char kEmptyMsgLengthPrefix[kMsgLengthPrefixLength] = {0};

}
924 | | |
925 | | // Test that even with small packets we track memory usage in sending queue with acceptable |
926 | | // accuracy. |
// Test that even with small packets we track memory usage in sending queue with acceptable
// accuracy.
TEST_F(TestRpc, SendingQueueMemoryUsage) {
  std::deque<TcpStreamSendingData> sending;

  auto tracker = MemTracker::CreateTracker("t");

  MemoryUsage current, latest_before_realloc;

  StartAllocationsTracking();
  const auto heap_allocated_bytes_initial = MemTracker::GetTCMallocCurrentAllocatedBytes();
  // Keep appending tiny messages until ~1MB has been allocated, snapshotting
  // the usage figures just before each point where TCMalloc's allocated-bytes
  // figure jumps (i.e. before a reallocation skews the comparison).
  while (current.heap_allocated_bytes < 1_MB) {
    auto data_ptr = std::make_shared<StringOutboundData>(
        kEmptyMsgLengthPrefix, kMsgLengthPrefixLength, "Empty message");
    sending.emplace_back(data_ptr, tracker);

    const size_t heap_allocated_bytes =
        MemTracker::GetTCMallocCurrentAllocatedBytes() - heap_allocated_bytes_initial;
    if (heap_allocated_bytes != current.heap_allocated_bytes) {
      latest_before_realloc = current;
    }
    current.heap_allocated_bytes = heap_allocated_bytes;
    current.heap_requested_bytes = GetHeapRequestedBytes();
    current.tracked_consumption += sending.back().consumption.consumption();
    // Account data_ptr as well.
    current.tracked_consumption += sizeof(data_ptr);
    current.entities_count = sending.size();
  }
  StopAllocationsTracking();

  LOG(INFO) << DumpMemoryUsage(latest_before_realloc);

  LOG(INFO) << "Tracked consumption: " << latest_before_realloc.tracked_consumption;
  LOG(INFO) << "Requested bytes: " << latest_before_realloc.heap_requested_bytes;
  LOG(INFO) << "Allocated bytes: " << latest_before_realloc.heap_allocated_bytes;

  // Sanity ordering: tracked <= requested <= allocated.
  ASSERT_LE(latest_before_realloc.tracked_consumption, latest_before_realloc.heap_requested_bytes);
  // We should track at least kDynamicMemoryUsageAccuracyLowLimit memory requested from heap.
  ASSERT_GT(
      latest_before_realloc.tracked_consumption,
      size_t(latest_before_realloc.heap_requested_bytes * kDynamicMemoryUsageAccuracyLowLimit));

  ASSERT_LE(latest_before_realloc.heap_requested_bytes, latest_before_realloc.heap_allocated_bytes);
  // Expect TCMalloc to allocate more memory than requested due to roundup, but limited by
  // kMemoryAllocationAccuracyHighLimit.
  ASSERT_LE(
      latest_before_realloc.heap_allocated_bytes,
      latest_before_realloc.heap_requested_bytes * kMemoryAllocationAccuracyHighLimit);
}
974 | | |
975 | | #endif |
976 | | |
977 | | namespace { |
978 | | |
979 | | constexpr auto kMemoryLimitHardBytes = 100_MB; |
980 | | |
// Configures global flags and server options for the CantAllocateReadBuffer
// tests: a hard memory limit plus read-buffer rejection so oversized inbound
// calls hit "Unable to allocate read buffer". NOTE: mutates process-wide
// gflags as a side effect.
TestServerOptions SetupServerForTestCantAllocateReadBuffer() {
  FLAGS_binary_call_parser_reject_on_mem_tracker_hard_limit = true;
  FLAGS_memory_limit_hard_bytes = kMemoryLimitHardBytes;
  // Disable throttling so requests actually reach the memory limit.
  FLAGS_rpc_throttle_threshold_bytes = -1;
  TestServerOptions options;
  // Single reactor and connection make memory pressure deterministic.
  options.messenger_options.n_reactors = 1;
  options.messenger_options.num_connections_to_server = 1;
  options.messenger_options.keep_alive_timeout = 60s;
  return options;
}
991 | | |
// Drives a server (configured by SetupServerForTestCantAllocateReadBuffer) to
// its hard memory limit with huge paused Echo calls, verifies read-buffer
// allocation fails gracefully (calls finish OK or TimedOut, never crash), then
// checks memory is released and subsequent small calls all succeed.
void TestCantAllocateReadBuffer(CalculatorServiceProxy* proxy) {
  const MonoDelta kTimeToWaitForOom = 20s;
  // Reactor threads are blocked by pauses injected into calls processing by the test and also we
  // can have other random slow downs in this tests due to large requests processing in reactor
  // thread, so we turn off application level RPC keepalive mechanism to prevent connections from
  // being closed.
  FLAGS_enable_rpc_keepalive = false;

  rpc_test::EchoRequestPB req;
  rpc_test::EchoResponsePB resp;

  std::vector<std::unique_ptr<RpcController>> controllers;

  auto n_calls = 50;

  // Pause server-side Echo processing so the big requests pile up in memory.
  SetAtomicFlag(true, &FLAGS_TEST_pause_calculator_echo_request);
  StringWaiterLogSink log_waiter("Unable to allocate read buffer because of limit");

  LOG(INFO) << "Start sending calls...";
  CountDownLatch latch(n_calls);
  for (int i = 0; i < n_calls; i++) {
    // ~10MB payloads (slightly different sizes) to exhaust the 100MB limit.
    req.set_data(std::string(10_MB + i, 'X'));
    auto controller = std::make_unique<RpcController>();
    // No need to wait more than kTimeToWaitForOom + some delay, because we only need these
    // calls to cause hitting hard memory limit.
    controller->set_timeout(kTimeToWaitForOom + 5s);
    proxy->EchoAsync(req, &resp, controller.get(), latch.CountDownCallback());
    if ((i + 1) % 10 == 0) {
      LOG(INFO) << "Sent " << i + 1 << " calls.";
      LOG(INFO) << DumpMemoryUsage();
    }
    controllers.push_back(std::move(controller));
  }
  LOG(INFO) << n_calls << " calls sent.";

  // Wait for the server to log that it rejected a read-buffer allocation.
  auto wait_status = log_waiter.WaitFor(kTimeToWaitForOom);

  SetAtomicFlag(false, &FLAGS_TEST_pause_calculator_echo_request);
  LOG(INFO) << "Resumed call function.";

  LOG(INFO) << "Waiting for the calls to be marked finished...";
  latch.Wait();

  LOG(INFO) << n_calls << " calls marked as finished.";

  // Under memory pressure each call may have succeeded or timed out, but no
  // other failure mode is acceptable.
  for (size_t i = 0; i < controllers.size(); ++i) {
    auto& controller = controllers[i];
    ASSERT_TRUE(controller->finished());
    auto s = controller->status();
    ASSERT_TRUE(s.ok() || s.IsTimedOut())
        << "Unexpected error for call #" << i + 1 << ": " << s;
  }
  controllers.clear();
  req.clear_data();

  ASSERT_OK(wait_status);

  LOG(INFO) << DumpMemoryUsage();
  // Wait until consumption drops well below the hard limit before retrying.
  {
    constexpr auto target_memory_consumption = kMemoryLimitHardBytes * 0.6;
    wait_status = LoggedWaitFor(
        [] {
#if defined(TCMALLOC_ENABLED)
          // Don't rely on root mem tracker consumption, since it includes memory released by
          // the application, but not yet released by TCMalloc.
          const auto consumption = MemTracker::GetTCMallocCurrentAllocatedBytes();
#else
          // For TSAN/ASAN we don't have TCMalloc and rely on root mem tracker consumption.
          const auto consumption = MemTracker::GetRootTracker()->consumption();
#endif
          LOG(INFO) << "Memory consumption: " << HumanReadableNumBytes::ToString(consumption);
          return consumption < target_memory_consumption;
        }, 10s * kTimeMultiplier,
        Format("Waiting until memory consumption is less than $0 ...",
               HumanReadableNumBytes::ToString(target_memory_consumption)));
    LOG(INFO) << DumpMemoryUsage();
    ASSERT_OK(wait_status);
  }

  // Further calls should be processed successfully since memory consumption is now under limit.
  n_calls = 20;
  const MonoDelta kCallsTimeout = 60s * kTimeMultiplier;
  LOG(INFO) << "Start sending more calls...";
  latch.Reset(n_calls);
  for (int i = 0; i < n_calls; i++) {
    req.set_data(std::string(i + 1, 'Y'));
    auto controller = std::make_unique<RpcController>();
    controller->set_timeout(kCallsTimeout);
    proxy->EchoAsync(req, &resp, controller.get(), latch.CountDownCallback());
    controllers.push_back(std::move(controller));
  }
  LOG(INFO) << n_calls << " calls sent.";
  latch.Wait();
  LOG(INFO) << n_calls << " calls marked as finished.";

  for (size_t i = 0; i < controllers.size(); ++i) {
    auto& controller = controllers[i];
    ASSERT_TRUE(controller->finished());
    auto s = controller->status();
    ASSERT_TRUE(s.ok()) << "Unexpected error for call #" << i + 1 << ": " << AsString(s);
  }
}
1094 | | |
1095 | | } // namespace |
1096 | | |
1097 | 1 | TEST_F(TestRpc, CantAllocateReadBuffer) { |
1098 | 1 | RunPlainTest(&TestCantAllocateReadBuffer, SetupServerForTestCantAllocateReadBuffer()); |
1099 | 1 | } |
1100 | | |
// Fixture for RPC tests running over TLS: generates a self-signed key pair for
// 127.0.0.1 and builds messengers whose streams are wrapped by the secure
// stream factory.
class TestRpcSecure : public RpcTestBase {
 public:
  void SetUp() override {
    RpcTestBase::SetUp();
    secure_context_ = std::make_unique<SecureContext>();
    // 1024-bit test-only keys; small on purpose to keep setup fast.
    EXPECT_OK(secure_context_->TEST_GenerateKeys(1024, "127.0.0.1"));
  }

 protected:
  // Wraps plain TCP streams with TLS using this fixture's secure context.
  auto CreateSecureStreamFactory() {
    return SecureStreamFactory(
        TcpStream::Factory(), MemTracker::GetRootTracker(), secure_context_.get());
  }

  // Builds a messenger that both listens and connects via the secure protocol.
  std::unique_ptr<Messenger> CreateSecureMessenger(
      const std::string& name, const MessengerOptions& options = kDefaultClientMessengerOptions) {
    auto builder = CreateMessengerBuilder(name, options);
    builder.SetListenProtocol(SecureStreamProtocol());
    builder.AddStreamFactory(SecureStreamProtocol(), CreateSecureStreamFactory());
    return EXPECT_RESULT(builder.Build());
  }

  std::unique_ptr<SecureContext> secure_context_;

  // Runs test body `f` with client and server messengers created by
  // CreateSecureMessenger.
  template <class F>
  void RunSecureTest(const F& f, const TestServerOptions& server_options = TestServerOptions()) {
    RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) {
      return CreateSecureMessenger(name, options);
    }, f);
  }
};
1132 | | |
1133 | 5 | void TestSimple(CalculatorServiceProxy* proxy) { |
1134 | 5 | RpcController controller; |
1135 | 5 | controller.set_timeout(5s * kTimeMultiplier); |
1136 | 5 | rpc_test::AddRequestPB req; |
1137 | 5 | req.set_x(10); |
1138 | 5 | req.set_y(20); |
1139 | 5 | rpc_test::AddResponsePB resp; |
1140 | 5 | ASSERT_OK(proxy->Add(req, &resp, &controller)); |
1141 | 5 | ASSERT_EQ(30, resp.result()); |
1142 | 5 | } |
1143 | | |
// Basic request/response round-trip over a TLS stream.
TEST_F(TestRpcSecure, TLS) {
  RunSecureTest(&TestSimple);
}
1147 | | |
1148 | 10 | void TestBigOp(CalculatorServiceProxy* proxy) { |
1149 | 10 | RpcController controller; |
1150 | 10 | controller.set_timeout(5s * kTimeMultiplier); |
1151 | 10 | rpc_test::EchoRequestPB req; |
1152 | 10 | req.set_data(RandomHumanReadableString(4_MB)); |
1153 | 10 | rpc_test::EchoResponsePB resp; |
1154 | 10 | ASSERT_OK(proxy->Echo(req, &resp, &controller)); |
1155 | 10 | ASSERT_EQ(req.data(), resp.data()); |
1156 | 10 | } |
1157 | | |
// Large (4MB) payload round-trip over TLS.
TEST_F(TestRpcSecure, BigOp) {
  RunSecureTest(&TestBigOp);
}
1161 | | |
// Same 4MB round-trip over TLS, but with a tiny (128-byte) RPC read buffer to
// force many partial reads.
TEST_F(TestRpcSecure, BigOpWithSmallBuffer) {
  FLAGS_rpc_read_buffer_size = 128;
  RunSecureTest(&TestBigOp);
}
1166 | | |
1167 | 5 | void TestManyOps(CalculatorServiceProxy* proxy) { |
1168 | 5.00k | for (int i = 0; i != RegularBuildVsSanitizers(1000, 100); ++i) { |
1169 | 5.00k | RpcController controller; |
1170 | 5.00k | controller.set_timeout(5s * kTimeMultiplier); |
1171 | 5.00k | rpc_test::EchoRequestPB req; |
1172 | 5.00k | req.set_data(RandomHumanReadableString(4_KB)); |
1173 | 5.00k | rpc_test::EchoResponsePB resp; |
1174 | 5.00k | ASSERT_OK(proxy->Echo(req, &resp, &controller)); |
1175 | 5.00k | ASSERT_EQ(req.data(), resp.data()); |
1176 | 5.00k | } |
1177 | 5 | } |
1178 | | |
// Many sequential round-trips over TLS.
TEST_F(TestRpcSecure, ManyOps) {
  RunSecureTest(&TestManyOps);
}
1182 | | |
// Launches many Echo calls concurrently and verifies each completes OK with a
// matching payload.
void TestConcurrentOps(CalculatorServiceProxy* proxy) {
  struct Op {
    RpcController controller;
    rpc_test::EchoRequestPB req;
    rpc_test::EchoResponsePB resp;
  };
  // Sized once and never resized: the async calls hold pointers into `ops`.
  std::vector<Op> ops(RegularBuildVsSanitizers(1000, 100));
  CountDownLatch latch(ops.size());
  for (auto& op : ops) {
    op.controller.set_timeout(5s * kTimeMultiplier);
    op.req.set_data(RandomHumanReadableString(4_KB));
    proxy->EchoAsync(op.req, &op.resp, &op.controller, [&latch]() {
      latch.CountDown();
    });
  }
  // All callbacks have fired once the latch releases, so reading ops is safe.
  latch.Wait();
  for (const auto& op : ops) {
    ASSERT_OK(op.controller.status());
    ASSERT_EQ(op.req.data(), op.resp.data());
  }
}
1204 | | |
// Concurrent round-trips over TLS.
TEST_F(TestRpcSecure, ConcurrentOps) {
  RunSecureTest(&TestConcurrentOps);
}
1208 | | |
// Read-buffer-allocation-failure scenario over TLS.
TEST_F(TestRpcSecure, CantAllocateReadBuffer) {
  RunSecureTest(&TestCantAllocateReadBuffer, SetupServerForTestCantAllocateReadBuffer());
}
1212 | | |
1213 | | class TestRpcCompression : public RpcTestBase, public testing::WithParamInterface<int> { |
1214 | | public: |
1215 | 21 | void SetUp() override { |
1216 | 21 | FLAGS_stream_compression_algo = GetParam(); |
1217 | 21 | RpcTestBase::SetUp(); |
1218 | 21 | } |
1219 | | |
1220 | | protected: |
1221 | | std::unique_ptr<Messenger> CreateCompressedMessenger( |
1222 | 42 | const std::string& name, const MessengerOptions& options = kDefaultClientMessengerOptions) { |
1223 | 42 | auto builder = CreateMessengerBuilder(name, options); |
1224 | 42 | builder.SetListenProtocol(CompressedStreamProtocol()); |
1225 | 42 | builder.AddStreamFactory( |
1226 | 42 | CompressedStreamProtocol(), |
1227 | 42 | CompressedStreamFactory(TcpStream::Factory(), MemTracker::GetRootTracker())); |
1228 | 42 | return EXPECT_RESULT(builder.Build()); |
1229 | 42 | } |
1230 | | |
1231 | | template <class F> |
1232 | | void RunCompressionTest( |
1233 | 21 | const F& f, const TestServerOptions& server_options = TestServerOptions()) { |
1234 | 42 | RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) { |
1235 | 42 | return CreateCompressedMessenger(name, options); |
1236 | 42 | }, f); _ZZN2yb3rpc18TestRpcCompression18RunCompressionTestIPFvPNS_8rpc_test22CalculatorServiceProxyEEEEvRKT_RKNS0_17TestServerOptionsEENKUlRKNSt3__112basic_stringIcNSE_11char_traitsIcEENSE_9allocatorIcEEEERKNS0_16MessengerOptionsEE_clESM_SP_ Line | Count | Source | 1234 | 36 | RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) { | 1235 | 36 | return CreateCompressedMessenger(name, options); | 1236 | 36 | }, f); |
rpc-test.cc:_ZZN2yb3rpc18TestRpcCompression18RunCompressionTestIZNS0_35TestRpcCompression_Compression_Test8TestBodyEvE3$_4EEvRKT_RKNS0_17TestServerOptionsEENKUlRKNSt3__112basic_stringIcNSB_11char_traitsIcEENSB_9allocatorIcEEEERKNS0_16MessengerOptionsEE_clESJ_SM_ Line | Count | Source | 1234 | 6 | RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) { | 1235 | 6 | return CreateCompressedMessenger(name, options); | 1236 | 6 | }, f); |
|
1237 | 21 | } _ZN2yb3rpc18TestRpcCompression18RunCompressionTestIPFvPNS_8rpc_test22CalculatorServiceProxyEEEEvRKT_RKNS0_17TestServerOptionsE Line | Count | Source | 1233 | 18 | const F& f, const TestServerOptions& server_options = TestServerOptions()) { | 1234 | 18 | RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) { | 1235 | 18 | return CreateCompressedMessenger(name, options); | 1236 | 18 | }, f); | 1237 | 18 | } |
rpc-test.cc:_ZN2yb3rpc18TestRpcCompression18RunCompressionTestIZNS0_35TestRpcCompression_Compression_Test8TestBodyEvE3$_4EEvRKT_RKNS0_17TestServerOptionsE Line | Count | Source | 1233 | 3 | const F& f, const TestServerOptions& server_options = TestServerOptions()) { | 1234 | 3 | RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) { | 1235 | 3 | return CreateCompressedMessenger(name, options); | 1236 | 3 | }, f); | 1237 | 3 | } |
|
1238 | | }; |
1239 | | |
// Basic request/response round-trip over a compressed stream.
TEST_P(TestRpcCompression, Simple) {
  RunCompressionTest(&TestSimple);
}
1243 | | |
// Large (4MB) payload round-trip over a compressed stream.
TEST_P(TestRpcCompression, BigOp) {
  RunCompressionTest(&TestBigOp);
}
1247 | | |
// Large payload over a compressed stream with a tiny (128-byte) read buffer to
// force many partial reads.
TEST_P(TestRpcCompression, BigOpWithSmallBuffer) {
  FLAGS_rpc_read_buffer_size = 128;
  RunCompressionTest(&TestBigOp);
}
1252 | | |
// Many sequential round-trips over a compressed stream.
TEST_P(TestRpcCompression, ManyOps) {
  RunCompressionTest(&TestManyOps);
}
1256 | | |
// Concurrent round-trips over a compressed stream.
TEST_P(TestRpcCompression, ConcurrentOps) {
  RunCompressionTest(&TestConcurrentOps);
}
1260 | | |
// Read-buffer-allocation-failure scenario over a compressed stream.
TEST_P(TestRpcCompression, CantAllocateReadBuffer) {
  RunCompressionTest(&TestCantAllocateReadBuffer, SetupServerForTestCantAllocateReadBuffer());
}
1264 | | |
// Verifies compression actually shrinks traffic: after warmup round-trips, one
// 4KB highly-compressible Echo must move fewer than kStringLen/5 bytes in each
// direction according to the tcp_bytes_sent/received counters.
void TestCompression(CalculatorServiceProxy* proxy, const MetricEntityPtr& metric_entity) {
  constexpr size_t kStringLen = 4_KB;

  size_t prev_sent = 0;
  size_t prev_received = 0;
  for (int i = 0;; ++i) {
    RpcController controller;
    controller.set_timeout(5s * kTimeMultiplier);
    rpc_test::EchoRequestPB req;
    // All-'Y' payload: maximally compressible, so the byte-count bound is safe.
    req.set_data(std::string(kStringLen, 'Y'));
    rpc_test::EchoResponsePB resp;
    ASSERT_OK(proxy->Echo(req, &resp, &controller));
    ASSERT_EQ(req.data(), resp.data());

    auto sent_counter = ASSERT_RESULT(GetCounter(metric_entity, METRIC_tcp_bytes_sent));
    auto received_counter = ASSERT_RESULT(GetCounter(metric_entity, METRIC_tcp_bytes_received));

    // First FLAGS_num_connections_to_server runs were warmup.
    // To avoid counting handshake bytes.
    if (i == FLAGS_num_connections_to_server) {
      // Delta over the last (post-warmup) round-trip only.
      auto sent = sent_counter->value() - prev_sent;
      auto received = received_counter->value() - prev_received;
      LOG(INFO) << "Sent: " << sent << ", received: " << received;

      ASSERT_GT(sent, 10); // Check that metric even work.
      ASSERT_LE(sent, kStringLen / 5); // Check that compression work.
      ASSERT_GT(received, 10); // Check that metric even work.
      ASSERT_LE(received, kStringLen / 5); // Check that compression work.
      break;
    }

    prev_sent = sent_counter->value();
    prev_received = received_counter->value();
  }
}
1300 | | |
// End-to-end check that compression reduces bytes on the wire (see
// TestCompression above for the actual assertions).
TEST_P(TestRpcCompression, Compression) {
  RunCompressionTest([this](CalculatorServiceProxy* proxy) {
    TestCompression(proxy, metric_entity());
  });
}
1306 | | |
1307 | 1.19k | std::string CompressionName(const testing::TestParamInfo<int>& info) { |
1308 | 1.19k | switch (info.param) { |
1309 | 399 | case 1: return "Zlib"; |
1310 | 399 | case 2: return "Snappy"; |
1311 | 399 | case 3: return "LZ4"; |
1312 | 0 | } |
1313 | 0 | return Format("Unknown compression $0", info.param); |
1314 | 0 | } |
1315 | | |
1316 | | INSTANTIATE_TEST_CASE_P(, TestRpcCompression, testing::Range(1, 4), CompressionName); |
1317 | | |
1318 | | class TestRpcSecureCompression : public TestRpcSecure { |
1319 | | public: |
1320 | 7 | void SetUp() override { |
1321 | 7 | FLAGS_stream_compression_algo = 1; |
1322 | 7 | TestRpcSecure::SetUp(); |
1323 | 7 | } |
1324 | | |
1325 | | protected: |
1326 | | std::unique_ptr<Messenger> CreateSecureCompressedMessenger( |
1327 | 14 | const std::string& name, const MessengerOptions& options = kDefaultClientMessengerOptions) { |
1328 | 14 | auto builder = CreateMessengerBuilder(name, options); |
1329 | 14 | builder.SetListenProtocol(CompressedStreamProtocol()); |
1330 | 14 | builder.AddStreamFactory( |
1331 | 14 | CompressedStreamProtocol(), |
1332 | 14 | CompressedStreamFactory(CreateSecureStreamFactory(), MemTracker::GetRootTracker())); |
1333 | 14 | return EXPECT_RESULT(builder.Build()); |
1334 | 14 | } |
1335 | | |
1336 | | template <class F> |
1337 | | void RunSecureCompressionTest( |
1338 | 7 | const F& f, const TestServerOptions& server_options = TestServerOptions()) { |
1339 | 14 | RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) { |
1340 | 14 | return CreateSecureCompressedMessenger(name, options); |
1341 | 14 | }, f); _ZZN2yb3rpc24TestRpcSecureCompression24RunSecureCompressionTestIPFvPNS_8rpc_test22CalculatorServiceProxyEEEEvRKT_RKNS0_17TestServerOptionsEENKUlRKNSt3__112basic_stringIcNSE_11char_traitsIcEENSE_9allocatorIcEEEERKNS0_16MessengerOptionsEE_clESM_SP_ Line | Count | Source | 1339 | 12 | RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) { | 1340 | 12 | return CreateSecureCompressedMessenger(name, options); | 1341 | 12 | }, f); |
rpc-test.cc:_ZZN2yb3rpc24TestRpcSecureCompression24RunSecureCompressionTestIZNS0_41TestRpcSecureCompression_Compression_Test8TestBodyEvE3$_5EEvRKT_RKNS0_17TestServerOptionsEENKUlRKNSt3__112basic_stringIcNSB_11char_traitsIcEENSB_9allocatorIcEEEERKNS0_16MessengerOptionsEE_clESJ_SM_ Line | Count | Source | 1339 | 2 | RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) { | 1340 | 2 | return CreateSecureCompressedMessenger(name, options); | 1341 | 2 | }, f); |
|
1342 | 7 | } _ZN2yb3rpc24TestRpcSecureCompression24RunSecureCompressionTestIPFvPNS_8rpc_test22CalculatorServiceProxyEEEEvRKT_RKNS0_17TestServerOptionsE Line | Count | Source | 1338 | 6 | const F& f, const TestServerOptions& server_options = TestServerOptions()) { | 1339 | 6 | RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) { | 1340 | 6 | return CreateSecureCompressedMessenger(name, options); | 1341 | 6 | }, f); | 1342 | 6 | } |
rpc-test.cc:_ZN2yb3rpc24TestRpcSecureCompression24RunSecureCompressionTestIZNS0_41TestRpcSecureCompression_Compression_Test8TestBodyEvE3$_5EEvRKT_RKNS0_17TestServerOptionsE Line | Count | Source | 1338 | 1 | const F& f, const TestServerOptions& server_options = TestServerOptions()) { | 1339 | 1 | RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) { | 1340 | 1 | return CreateSecureCompressedMessenger(name, options); | 1341 | 1 | }, f); | 1342 | 1 | } |
|
1343 | | }; |
1344 | | |
// Smoke test: one small RPC over a TLS + compression stream.
TEST_F(TestRpcSecureCompression, Simple) {
  RunSecureCompressionTest(&TestSimple);
}
1348 | | |
// Large payload over a TLS + compression stream.
TEST_F(TestRpcSecureCompression, BigOp) {
  RunSecureCompressionTest(&TestBigOp);
}
1352 | | |
// Large payload with a tiny (128-byte) RPC read buffer over TLS + compression;
// presumably forces the payload across many partial reads — TODO confirm
// against rpc_read_buffer_size semantics.
TEST_F(TestRpcSecureCompression, BigOpWithSmallBuffer) {
  FLAGS_rpc_read_buffer_size = 128;
  RunSecureCompressionTest(&TestBigOp);
}
1357 | | |
// Many sequential RPCs over a TLS + compression stream.
TEST_F(TestRpcSecureCompression, ManyOps) {
  RunSecureCompressionTest(&TestManyOps);
}
1361 | | |
// Concurrent RPCs over a TLS + compression stream.
TEST_F(TestRpcSecureCompression, ConcurrentOps) {
  RunSecureCompressionTest(&TestConcurrentOps);
}
1365 | | |
// Read-buffer allocation failure handling over TLS + compression; the server
// is configured via SetupServerForTestCantAllocateReadBuffer().
TEST_F(TestRpcSecureCompression, CantAllocateReadBuffer) {
  RunSecureCompressionTest(&TestCantAllocateReadBuffer, SetupServerForTestCantAllocateReadBuffer());
}
1369 | | |
// Verifies compression is effective even when layered under TLS (see
// TestCompression above for the wire-byte assertions).
TEST_F(TestRpcSecureCompression, Compression) {
  RunSecureCompressionTest([this](CalculatorServiceProxy* proxy) {
    TestCompression(proxy, metric_entity());
  });
}
1375 | | |
1376 | | } // namespace rpc |
1377 | | } // namespace yb |