/Users/deen/code/yugabyte-db/src/yb/rpc/rpc_stub-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <condition_variable> |
34 | | #include <functional> |
35 | | #include <thread> |
36 | | #include <vector> |
37 | | |
38 | | #include <gtest/gtest.h> |
39 | | |
40 | | #include "yb/gutil/stl_util.h" |
41 | | |
42 | | #include "yb/rpc/proxy.h" |
43 | | #include "yb/rpc/rpc-test-base.h" |
44 | | #include "yb/rpc/rpc_controller.h" |
45 | | #include "yb/rpc/rpc_introspection.pb.h" |
46 | | #include "yb/rpc/rtest.proxy.h" |
47 | | #include "yb/rpc/rtest.service.h" |
48 | | #include "yb/rpc/yb_rpc.h" |
49 | | |
50 | | #include "yb/util/countdown_latch.h" |
51 | | #include "yb/util/metrics.h" |
52 | | #include "yb/util/result.h" |
53 | | #include "yb/util/size_literals.h" |
54 | | #include "yb/util/status_log.h" |
55 | | #include "yb/util/subprocess.h" |
56 | | #include "yb/util/test_macros.h" |
57 | | #include "yb/util/test_util.h" |
58 | | #include "yb/util/tsan_util.h" |
59 | | #include "yb/util/tostring.h" |
60 | | #include "yb/util/user.h" |
61 | | |
62 | | DEFINE_bool(is_panic_test_child, false, "Used by TestRpcPanic"); |
63 | | DECLARE_bool(socket_inject_short_recvs); |
64 | | DECLARE_int32(rpc_slow_query_threshold_ms); |
65 | | DECLARE_int32(TEST_delay_connect_ms); |
66 | | |
67 | | METRIC_DECLARE_counter(service_request_bytes_yb_rpc_test_CalculatorService_Echo); |
68 | | METRIC_DECLARE_counter(service_response_bytes_yb_rpc_test_CalculatorService_Echo); |
69 | | METRIC_DECLARE_counter(proxy_request_bytes_yb_rpc_test_CalculatorService_Echo); |
70 | | METRIC_DECLARE_counter(proxy_response_bytes_yb_rpc_test_CalculatorService_Echo); |
71 | | |
72 | | using namespace std::chrono_literals; |
73 | | |
74 | | namespace yb { |
75 | | namespace rpc { |
76 | | |
77 | | using yb::rpc_test::AddRequestPB; |
78 | | using yb::rpc_test::AddResponsePB; |
79 | | using yb::rpc_test::EchoRequestPB; |
80 | | using yb::rpc_test::EchoResponsePB; |
81 | | using yb::rpc_test::ForwardRequestPB; |
82 | | using yb::rpc_test::ForwardResponsePB; |
83 | | using yb::rpc_test::PanicRequestPB; |
84 | | using yb::rpc_test::PanicResponsePB; |
85 | | using yb::rpc_test::SendStringsRequestPB; |
86 | | using yb::rpc_test::SendStringsResponsePB; |
87 | | using yb::rpc_test::SleepRequestPB; |
88 | | using yb::rpc_test::SleepResponsePB; |
89 | | using yb::rpc_test::WhoAmIRequestPB; |
90 | | using yb::rpc_test::WhoAmIResponsePB; |
91 | | using yb::rpc_test::PingRequestPB; |
92 | | using yb::rpc_test::PingResponsePB; |
93 | | using yb::rpc_test::DisconnectRequestPB; |
94 | | using yb::rpc_test::DisconnectResponsePB; |
95 | | using yb::rpc_test_diff_package::ReqDiffPackagePB; |
96 | | using yb::rpc_test_diff_package::RespDiffPackagePB; |
97 | | |
98 | | using std::shared_ptr; |
99 | | using std::vector; |
100 | | |
101 | | using rpc_test::AddRequestPartialPB; |
102 | | using rpc_test::CalculatorServiceProxy; |
103 | | |
104 | | class RpcStubTest : public RpcTestBase { |
105 | | public: |
106 | 26 | void SetUp() override { |
107 | 26 | RpcTestBase::SetUp(); |
108 | 26 | StartTestServerWithGeneratedCode(&server_hostport_); |
109 | 26 | client_messenger_ = CreateAutoShutdownMessengerHolder("Client"); |
110 | 26 | proxy_cache_ = std::make_unique<ProxyCache>(client_messenger_.get()); |
111 | 26 | } |
112 | | |
113 | | protected: |
114 | 103 | void SendSimpleCall() { |
115 | 103 | CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_); |
116 | | |
117 | 103 | RpcController controller; |
118 | 103 | AddRequestPB req; |
119 | 103 | req.set_x(10); |
120 | 103 | req.set_y(20); |
121 | 103 | AddResponsePB resp; |
122 | 103 | ASSERT_OK(p.Add(req, &resp, &controller)); |
123 | 103 | ASSERT_EQ(30, resp.result()); |
124 | 103 | } |
125 | | |
126 | | template <class T> |
127 | | struct ProxyWithMessenger { |
128 | | AutoShutdownMessengerHolder messenger; |
129 | | std::unique_ptr<T> proxy; |
130 | | }; |
131 | | |
132 | 3 | ProxyWithMessenger<CalculatorServiceProxy> CreateCalculatorProxyHolder(const Endpoint& remote) { |
133 | 3 | auto messenger = CreateAutoShutdownMessengerHolder("Client"); |
134 | 3 | IpAddress local_address = remote.address().is_v6() |
135 | 1 | ? IpAddress(boost::asio::ip::address_v6::loopback()) |
136 | 2 | : IpAddress(boost::asio::ip::address_v4::loopback()); |
137 | | // To have outbound calls with appropriate address |
138 | 3 | EXPECT_OK(messenger->ListenAddress( |
139 | 3 | CreateConnectionContextFactory<YBInboundConnectionContext>(), |
140 | 3 | Endpoint(local_address, 0))); |
141 | 3 | EXPECT_OK(messenger->StartAcceptor()); |
142 | 3 | EXPECT_FALSE(messenger->io_service().stopped()); |
143 | 3 | ProxyCache proxy_cache(messenger.get()); |
144 | 3 | return { move(messenger), |
145 | 3 | std::make_unique<CalculatorServiceProxy>(&proxy_cache, HostPort(remote)) }; |
146 | 3 | } |
147 | | |
148 | | HostPort server_hostport_; |
149 | | AutoShutdownMessengerHolder client_messenger_; |
150 | | std::unique_ptr<ProxyCache> proxy_cache_; |
151 | | }; |
152 | | |
153 | 1 | TEST_F(RpcStubTest, TestSimpleCall) { |
154 | 1 | SendSimpleCall(); |
155 | 1 | } |
156 | | |
157 | 1 | TEST_F(RpcStubTest, ConnectTimeout) { |
158 | 1 | FLAGS_TEST_delay_connect_ms = 5000; |
159 | 1 | CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_); |
160 | 1 | const MonoDelta kWaitTime = 1s; |
161 | 1 | const MonoDelta kAllowedError = 100ms; |
162 | | |
163 | 1 | RpcController controller; |
164 | 1 | controller.set_timeout(kWaitTime); |
165 | 1 | AddRequestPB req; |
166 | 1 | req.set_x(10); |
167 | 1 | req.set_y(20); |
168 | 1 | AddResponsePB resp; |
169 | 1 | auto start = MonoTime::Now(); |
170 | 1 | auto status = p.Add(req, &resp, &controller); |
171 | 1 | auto passed = MonoTime::Now() - start; |
172 | 2 | ASSERT_TRUE(status.IsTimedOut()) << "Status: " << status; |
173 | 1 | ASSERT_GE(passed, kWaitTime - kAllowedError); |
174 | 1 | ASSERT_LE(passed, kWaitTime + kAllowedError); |
175 | | |
176 | 1 | SendSimpleCall(); |
177 | 1 | } |
178 | | |
179 | 1 | TEST_F(RpcStubTest, RandomTimeout) { |
180 | 1 | const size_t kTotalCalls = 1000; |
181 | 1 | const MonoDelta kMaxTimeout = 2s; |
182 | | |
183 | 1 | FLAGS_TEST_delay_connect_ms = narrow_cast<int>(kMaxTimeout.ToMilliseconds() / 2); |
184 | 1 | CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_); |
185 | | |
186 | 1 | struct CallData { |
187 | 1 | RpcController controller; |
188 | 1 | AddRequestPB req; |
189 | 1 | AddResponsePB resp; |
190 | 1 | }; |
191 | 1 | std::vector<CallData> calls(kTotalCalls); |
192 | 1 | CountDownLatch latch(kTotalCalls); |
193 | | |
194 | 1.00k | for (auto& call : calls) { |
195 | 1.00k | auto timeout = MonoDelta::FromMilliseconds( |
196 | 1.00k | RandomUniformInt<int64_t>(0, kMaxTimeout.ToMilliseconds())); |
197 | 1.00k | call.controller.set_timeout(timeout); |
198 | 1.00k | call.req.set_x(RandomUniformInt(-1000, 1000)); |
199 | 1.00k | call.req.set_y(RandomUniformInt(-1000, 1000)); |
200 | 1.00k | p.AddAsync(call.req, &call.resp, &call.controller, [&latch] { |
201 | 1.00k | latch.CountDown(); |
202 | 1.00k | }); |
203 | 1.00k | } |
204 | | |
205 | 1 | ASSERT_TRUE(latch.WaitFor(kMaxTimeout)); |
206 | | |
207 | 1 | size_t timed_out = 0; |
208 | 1.00k | for (auto& call : calls) { |
209 | 1.00k | if (call.controller.status().IsTimedOut()) { |
210 | 556 | ++timed_out; |
211 | 444 | } else { |
212 | 444 | ASSERT_OK(call.controller.status()); |
213 | 444 | ASSERT_EQ(call.req.x() + call.req.y(), call.resp.result()); |
214 | 444 | } |
215 | 1.00k | } |
216 | | |
217 | 1 | LOG(INFO) << "Timed out calls: " << timed_out; |
218 | | |
219 | | // About half of calls should expire, so we do a bit more relaxed checks. |
220 | 1 | ASSERT_GT(timed_out, kTotalCalls / 4); |
221 | 1 | ASSERT_LT(timed_out, kTotalCalls * 3 / 4); |
222 | 1 | } |
223 | | |
224 | | // Regression test for a bug in which we would not properly parse a call |
225 | | // response when recv() returned a 'short read'. This injects such short |
226 | | // reads and then makes a number of calls. |
227 | 1 | TEST_F(RpcStubTest, TestShortRecvs) { |
228 | 1 | google::FlagSaver saver; |
229 | 1 | FLAGS_socket_inject_short_recvs = true; |
230 | | |
231 | 1 | CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_); |
232 | | |
233 | 101 | for (int i = 0; i < 100; i++) { |
234 | 100 | ASSERT_NO_FATALS(SendSimpleCall()); |
235 | 100 | } |
236 | 1 | } |
237 | | |
238 | | void CheckForward(CalculatorServiceProxy* proxy, |
239 | | const Endpoint& endpoint, |
240 | 8 | const std::string& expected) { |
241 | 8 | ForwardRequestPB req; |
242 | 8 | if (!endpoint.address().is_unspecified()) { |
243 | 6 | req.set_host(endpoint.address().to_string()); |
244 | 6 | req.set_port(endpoint.port()); |
245 | 6 | } |
246 | 8 | ForwardResponsePB resp; |
247 | | |
248 | 8 | RpcController controller; |
249 | 8 | controller.set_timeout(1s); |
250 | 8 | auto status = proxy->Forward(req, &resp, &controller); |
251 | 8 | if (expected.empty()) { |
252 | 2 | LOG(INFO) << "Call status: " << status; |
253 | 4 | ASSERT_NOK(status) << "Name: " << resp.name(); |
254 | 6 | } else { |
255 | 6 | ASSERT_OK(status); |
256 | 6 | ASSERT_EQ(expected, resp.name()); |
257 | 6 | } |
258 | 8 | } |
259 | | |
260 | | // Test making successful RPC calls. |
261 | 1 | TEST_F(RpcStubTest, TestIncoherence) { |
262 | 1 | static const std::string kServer1Name = "Server1"; |
263 | 1 | TestServerOptions server1options; |
264 | 1 | server1options.endpoint = Endpoint(IpAddress::from_string("127.0.0.11"), 0); |
265 | 1 | static const std::string kServer2Name = "Server2"; |
266 | 1 | TestServerOptions server2options; |
267 | 1 | server2options.endpoint = Endpoint(IpAddress::from_string("127.0.0.12"), 0); |
268 | | |
269 | 1 | auto server1 = StartTestServer(server1options, kServer1Name); |
270 | 1 | auto proxy1holder = CreateCalculatorProxyHolder(server1.bound_endpoint()); |
271 | 1 | auto& proxy1 = *proxy1holder.proxy; |
272 | 1 | auto server2 = StartTestServer(server2options, kServer2Name); |
273 | 1 | auto proxy2holder = CreateCalculatorProxyHolder(server2.bound_endpoint()); |
274 | 1 | auto& proxy2 = *proxy2holder.proxy; |
275 | | |
276 | 1 | ASSERT_NO_FATALS(CheckForward(&proxy1, server2.bound_endpoint(), kServer2Name)); |
277 | 1 | ASSERT_NO_FATALS(CheckForward(&proxy2, server1.bound_endpoint(), kServer1Name)); |
278 | | |
279 | 1 | server2.messenger()->BreakConnectivityWith(server1.bound_endpoint().address()); |
280 | | |
281 | 1 | LOG(INFO) << "Checking connectivity"; |
282 | | // No connection between servers. |
283 | 1 | ASSERT_NO_FATALS(CheckForward(&proxy1, server2.bound_endpoint(), std::string())); |
284 | | // We could connect to server1. |
285 | 1 | ASSERT_NO_FATALS(CheckForward(&proxy1, Endpoint(), kServer1Name)); |
286 | | // No connection between servers. |
287 | 1 | ASSERT_NO_FATALS(CheckForward(&proxy2, server1.bound_endpoint(), std::string())); |
288 | | // We could connect to server2. |
289 | 1 | ASSERT_NO_FATALS(CheckForward(&proxy2, Endpoint(), kServer2Name)); |
290 | | |
291 | 1 | server2.messenger()->RestoreConnectivityWith(server1.bound_endpoint().address()); |
292 | 1 | ASSERT_NO_FATALS(CheckForward(&proxy1, server2.bound_endpoint(), kServer2Name)); |
293 | 1 | ASSERT_NO_FATALS(CheckForward(&proxy2, server1.bound_endpoint(), kServer1Name)); |
294 | 1 | } |
295 | | |
296 | | // Test calls which are rather large. |
297 | | // This test sends many of them at once using the async API and then |
298 | | // waits for them all to return. This is meant to ensure that the |
299 | | // IO threads can deal with read/write calls that don't succeed |
300 | | // in sending the entire data in one go. |
301 | 1 | TEST_F(RpcStubTest, TestBigCallData) { |
302 | 1 | constexpr int kNumSentAtOnce = 1; |
303 | 1 | constexpr size_t kMessageSize = NonTsanVsTsan(32_MB, 4_MB); |
304 | | |
305 | 1 | CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_); |
306 | | |
307 | 1 | EchoRequestPB req; |
308 | 1 | req.set_data(RandomHumanReadableString(kMessageSize)); |
309 | | |
310 | 1 | std::vector<EchoResponsePB> resps(kNumSentAtOnce); |
311 | 1 | std::vector<RpcController> controllers(kNumSentAtOnce); |
312 | | |
313 | 1 | CountDownLatch latch(kNumSentAtOnce); |
314 | 2 | for (int i = 0; i < kNumSentAtOnce; i++) { |
315 | 1 | auto resp = &resps[i]; |
316 | 1 | auto controller = &controllers[i]; |
317 | 1 | controller->set_timeout(60s); |
318 | | |
319 | 1 | p.EchoAsync(req, resp, controller, [&latch]() { latch.CountDown(); }); |
320 | 1 | } |
321 | | |
322 | 1 | latch.Wait(); |
323 | | |
324 | 1 | for (RpcController &c : controllers) { |
325 | 1 | EXPECT_OK(c.status()); |
326 | 1 | } |
327 | | |
328 | 1 | for (auto& resp : resps) { |
329 | 1 | ASSERT_EQ(resp.data(), req.data()); |
330 | 1 | } |
331 | 1 | } |
332 | | |
333 | 1 | TEST_F(RpcStubTest, TestRespondDeferred) { |
334 | 1 | CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_); |
335 | | |
336 | 1 | RpcController controller; |
337 | 1 | SleepRequestPB req; |
338 | 1 | req.set_sleep_micros(1000); |
339 | 1 | req.set_deferred(true); |
340 | 1 | SleepResponsePB resp; |
341 | 1 | ASSERT_OK(p.Sleep(req, &resp, &controller)); |
342 | 1 | } |
343 | | |
344 | | // Test that the default user credentials are propagated to the server. |
345 | 1 | TEST_F(RpcStubTest, TestDefaultCredentialsPropagated) { |
346 | 1 | CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_); |
347 | | |
348 | 1 | string expected = ASSERT_RESULT(GetLoggedInUser()); |
349 | | |
350 | 1 | RpcController controller; |
351 | 1 | WhoAmIRequestPB req; |
352 | 1 | WhoAmIResponsePB resp; |
353 | 1 | ASSERT_OK(p.WhoAmI(req, &resp, &controller)); |
354 | 1 | } |
355 | | |
356 | | // Test that the user can specify other credentials. |
357 | 1 | TEST_F(RpcStubTest, TestCustomCredentialsPropagated) { |
358 | 1 | CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_); |
359 | | |
360 | 1 | RpcController controller; |
361 | 1 | WhoAmIRequestPB req; |
362 | 1 | WhoAmIResponsePB resp; |
363 | 1 | ASSERT_OK(p.WhoAmI(req, &resp, &controller)); |
364 | 1 | } |
365 | | |
366 | | // Test that the user's remote address is accessible to the server. |
367 | 1 | TEST_F(RpcStubTest, TestRemoteAddress) { |
368 | 1 | CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_); |
369 | | |
370 | 1 | RpcController controller; |
371 | 1 | WhoAmIRequestPB req; |
372 | 1 | WhoAmIResponsePB resp; |
373 | 1 | ASSERT_OK(p.WhoAmI(req, &resp, &controller)); |
374 | 1 | ASSERT_STR_CONTAINS(resp.address(), "127.0.0.1:"); |
375 | 1 | } |
376 | | |
377 | | //////////////////////////////////////////////////////////// |
378 | | // Tests for error cases |
379 | | //////////////////////////////////////////////////////////// |
380 | | |
381 | | // Test sending a PB parameter with a missing field, where the client |
382 | | // thinks it has sent a full PB. (eg due to version mismatch) |
383 | 1 | TEST_F(RpcStubTest, TestCallWithInvalidParam) { |
384 | 1 | Proxy p(client_messenger_.get(), server_hostport_); |
385 | | |
386 | 1 | AddRequestPartialPB req; |
387 | 1 | req.set_x(RandomUniformInt<uint32_t>()); |
388 | | // AddRequestPartialPB is missing the 'y' field. |
389 | 1 | AddResponsePB resp; |
390 | 1 | RpcController controller; |
391 | 1 | Status s = p.SyncRequest( |
392 | 1 | CalculatorServiceMethods::AddMethod(), /* method_metrics= */ nullptr, req, &resp, |
393 | 1 | &controller); |
394 | 2 | ASSERT_TRUE(s.IsRemoteError()) << "Bad status: " << s; |
395 | | // Remote error messages always contain file name and line number. |
396 | 1 | ASSERT_STR_CONTAINS(s.ToString(), "Invalid argument ("); |
397 | 1 | ASSERT_STR_CONTAINS(s.ToString(), |
398 | 1 | "Invalid parameter for call yb.rpc_test.CalculatorService.Add: y"); |
399 | 1 | } |
400 | | |
401 | | // Wrapper around AtomicIncrement, since AtomicIncrement returns the 'old' |
402 | | // value, and our callback needs to be a void function. |
403 | 1 | static void DoIncrement(Atomic32* count) { |
404 | 1 | base::subtle::Barrier_AtomicIncrement(count, 1); |
405 | 1 | } |
406 | | |
407 | | // Test sending a PB parameter with a missing field on the client side. |
408 | | // This also ensures that the async callback is only called once |
409 | | // (regression test for a previously-encountered bug). |
410 | 1 | TEST_F(RpcStubTest, TestCallWithMissingPBFieldClientSide) { |
411 | 1 | CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_); |
412 | | |
413 | 1 | RpcController controller; |
414 | 1 | AddRequestPB req; |
415 | 1 | req.set_x(10); |
416 | | // Request is missing the 'y' field. |
417 | 1 | AddResponsePB resp; |
418 | 1 | Atomic32 callback_count = 0; |
419 | 1 | p.AddAsync(req, &resp, &controller, std::bind(&DoIncrement, &callback_count)); |
420 | 1 | while (NoBarrier_Load(&callback_count) == 0) { |
421 | 0 | SleepFor(MonoDelta::FromMicroseconds(10)); |
422 | 0 | } |
423 | 1 | SleepFor(MonoDelta::FromMicroseconds(100)); |
424 | 1 | ASSERT_EQ(1, NoBarrier_Load(&callback_count)); |
425 | 1 | ASSERT_STR_CONTAINS(controller.status().ToString(false), |
426 | 1 | "Invalid argument: RPC argument missing required fields: y"); |
427 | 1 | } |
428 | | |
429 | | // Test sending a call which isn't implemented by the server. |
430 | 1 | TEST_F(RpcStubTest, TestCallMissingMethod) { |
431 | 1 | Proxy proxy(client_messenger_.get(), server_hostport_); |
432 | | |
433 | 1 | RemoteMethod method(yb::rpc_test::CalculatorServiceIf::static_service_name(), "DoesNotExist"); |
434 | 1 | Status s = DoTestSyncCall(&proxy, &method); |
435 | 2 | ASSERT_TRUE(s.IsRemoteError()) << "Bad status: " << s.ToString(); |
436 | 1 | ASSERT_STR_CONTAINS(s.ToString(), "with an invalid method name: DoesNotExist"); |
437 | 1 | } |
438 | | |
439 | 1 | TEST_F(RpcStubTest, TestApplicationError) { |
440 | 1 | CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_); |
441 | | |
442 | 1 | RpcController controller; |
443 | 1 | SleepRequestPB req; |
444 | 1 | SleepResponsePB resp; |
445 | 1 | req.set_sleep_micros(1); |
446 | 1 | req.set_return_app_error(true); |
447 | 1 | Status s = p.Sleep(req, &resp, &controller); |
448 | 1 | ASSERT_TRUE(s.IsRemoteError()); |
449 | 1 | EXPECT_EQ("Remote error: Got some error", s.ToString(false)); |
450 | 1 | EXPECT_EQ("message: \"Got some error\"\n" |
451 | 1 | "[yb.rpc_test.CalculatorError.app_error_ext] {\n" |
452 | 1 | " extra_error_data: \"some application-specific error data\"\n" |
453 | 1 | "}\n", controller.error_response()->DebugString()); |
454 | 1 | } |
455 | | |
456 | 2 | TEST_F(RpcStubTest, TestRpcPanic) { |
457 | 2 | if (!FLAGS_is_panic_test_child) { |
458 | | // This is a poor man's death test. We call this same |
459 | | // test case, but set the above flag, and verify that |
460 | | // it aborted. gtest death tests don't work here because |
461 | | // there are already threads started up. |
462 | 1 | vector<string> argv; |
463 | 1 | string executable_path; |
464 | 1 | CHECK_OK(env_->GetExecutablePath(&executable_path)); |
465 | 1 | argv.push_back(executable_path); |
466 | 1 | argv.push_back("--is_panic_test_child"); |
467 | 1 | argv.push_back("--gtest_filter=RpcStubTest.TestRpcPanic"); |
468 | | |
469 | 1 | Subprocess subp(argv[0], argv); |
470 | 1 | subp.PipeParentStderr(); |
471 | 1 | CHECK_OK(subp.Start()); |
472 | 1 | FILE* in = fdopen(subp.from_child_stderr_fd(), "r"); |
473 | 1 | PCHECK(in); |
474 | | |
475 | | // Search for string "Test method panicking!" somewhere in stderr |
476 | 1 | std::string error_message; |
477 | 1 | char buf[1024]; |
478 | 91 | while (fgets(buf, sizeof(buf), in)) { |
479 | 90 | error_message += buf; |
480 | 90 | } |
481 | 1 | ASSERT_STR_CONTAINS(error_message, "Test method panicking!"); |
482 | | |
483 | | // Check return status |
484 | 1 | int wait_status = 0; |
485 | 1 | CHECK_OK(subp.Wait(&wait_status)); |
486 | 1 | CHECK(!WIFEXITED(wait_status)); // should not have been successful |
487 | 1 | if (WIFSIGNALED(wait_status)) { |
488 | 1 | CHECK_EQ(WTERMSIG(wait_status), SIGABRT); |
489 | 0 | } else { |
490 | | // On some systems, we get exit status 134 from SIGABRT rather than |
491 | | // WIFSIGNALED getting flagged. |
492 | 0 | CHECK_EQ(WEXITSTATUS(wait_status), 134); |
493 | 0 | } |
494 | 1 | return; |
495 | 1 | } else { |
496 | | // Before forcing the panic, explicitly remove the test directory. This |
497 | | // should be safe; this test doesn't generate any data. |
498 | 1 | CHECK_OK(env_->DeleteRecursively(GetTestDataDirectory())); |
499 | | |
500 | | // Make an RPC which causes the server to abort. |
501 | 1 | CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_); |
502 | 1 | RpcController controller; |
503 | 1 | PanicRequestPB req; |
504 | 1 | PanicResponsePB resp; |
505 | 1 | ASSERT_OK(p.Panic(req, &resp, &controller)); |
506 | 1 | } |
507 | 2 | } |
508 | | |
509 | | struct AsyncSleep { |
510 | | RpcController rpc; |
511 | | SleepRequestPB req; |
512 | | SleepResponsePB resp; |
513 | | }; |
514 | | |
515 | 1 | TEST_F(RpcStubTest, TestDontHandleTimedOutCalls) { |
516 | 1 | CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_); |
517 | 1 | vector<AsyncSleep*> sleeps; |
518 | 1 | ElementDeleter d(&sleeps); |
519 | | |
520 | | // Send enough sleep calls to occupy the worker threads. |
521 | 1 | auto count = client_messenger_->max_concurrent_requests() * 4; |
522 | 1 | CountDownLatch latch(count); |
523 | 33 | for (size_t i = 0; i < count; i++) { |
524 | 32 | auto sleep = std::make_unique<AsyncSleep>(); |
525 | 32 | sleep->rpc.set_timeout(10s); |
526 | 32 | sleep->req.set_sleep_micros(500 * 1000); // 100ms |
527 | 32 | p.SleepAsync(sleep->req, &sleep->resp, &sleep->rpc, [&latch]() { latch.CountDown(); }); |
528 | 32 | sleeps.push_back(sleep.release()); |
529 | 32 | } |
530 | | |
531 | | // Send another call with a short timeout. This shouldn't get processed, because |
532 | | // it'll get stuck in the queue for longer than its timeout. |
533 | 1 | RpcController rpc; |
534 | 1 | SleepRequestPB req; |
535 | 1 | SleepResponsePB resp; |
536 | 1 | req.set_sleep_micros(1000); |
537 | 1 | rpc.set_timeout(250ms); |
538 | 1 | Status s = p.Sleep(req, &resp, &rpc); |
539 | 2 | ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); |
540 | | |
541 | 1 | latch.Wait(); |
542 | | |
543 | | // Verify that the timedout call got short circuited before being processed. |
544 | 1 | const Counter* timed_out_in_queue = server().service_pool().RpcsTimedOutInQueueMetricForTests(); |
545 | 1 | ASSERT_EQ(1, timed_out_in_queue->value()); |
546 | 1 | } |
547 | | |
548 | 1 | TEST_F(RpcStubTest, TestDumpCallsInFlight) { |
549 | 1 | CountDownLatch latch(1); |
550 | 1 | CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_); |
551 | 1 | AsyncSleep sleep; |
552 | 1 | sleep.req.set_sleep_micros(1000 * 1000); // 100ms |
553 | 1 | p.SleepAsync(sleep.req, &sleep.resp, &sleep.rpc, [&latch]() { latch.CountDown(); }); |
554 | | |
555 | | // Check the running RPC status on the client messenger. |
556 | 1 | DumpRunningRpcsRequestPB dump_req; |
557 | 1 | DumpRunningRpcsResponsePB dump_resp; |
558 | 1 | dump_req.set_include_traces(true); |
559 | | |
560 | 1 | std::this_thread::sleep_for(10ms); |
561 | 1 | ASSERT_OK(client_messenger_->DumpRunningRpcs(dump_req, &dump_resp)); |
562 | 1 | LOG(INFO) << "client messenger: " << dump_resp.DebugString(); |
563 | 1 | ASSERT_EQ(1, dump_resp.outbound_connections_size()); |
564 | 1 | ASSERT_EQ(1, dump_resp.outbound_connections(0).calls_in_flight_size()); |
565 | 1 | ASSERT_EQ("Sleep", dump_resp.outbound_connections(0).calls_in_flight(0). |
566 | 1 | header().remote_method().method_name()); |
567 | 1 | ASSERT_GT(dump_resp.outbound_connections(0).calls_in_flight(0).elapsed_millis(), 0); |
568 | | |
569 | | // And the server messenger. |
570 | | // We have to loop this until we find a result since the actual call is sent |
571 | | // asynchronously off of the main thread (ie the server may not be handling it yet) |
572 | 1 | for (int i = 0; i < 100; i++) { |
573 | 1 | dump_resp.Clear(); |
574 | 1 | ASSERT_OK(server_messenger()->DumpRunningRpcs(dump_req, &dump_resp)); |
575 | 1 | if (dump_resp.inbound_connections_size() > 0 && |
576 | 1 | dump_resp.inbound_connections(0).calls_in_flight_size() > 0) { |
577 | 1 | break; |
578 | 1 | } |
579 | 0 | SleepFor(MonoDelta::FromMilliseconds(1)); |
580 | 0 | } |
581 | | |
582 | 1 | LOG(INFO) << "server messenger: " << dump_resp.DebugString(); |
583 | 1 | ASSERT_EQ(1, dump_resp.inbound_connections_size()); |
584 | 1 | ASSERT_EQ(1, dump_resp.inbound_connections(0).calls_in_flight_size()); |
585 | 1 | ASSERT_EQ("Sleep", dump_resp.inbound_connections(0).calls_in_flight(0). |
586 | 1 | header().remote_method().method_name()); |
587 | 1 | ASSERT_GT(dump_resp.inbound_connections(0).calls_in_flight(0).elapsed_millis(), 0); |
588 | 1 | ASSERT_STR_CONTAINS(dump_resp.inbound_connections(0).calls_in_flight(0).trace_buffer(), |
589 | 1 | "Inserting onto call queue"); |
590 | 1 | latch.Wait(); |
591 | 1 | } |
592 | | |
593 | | namespace { |
594 | | struct RefCountedTest : public RefCountedThreadSafe<RefCountedTest> { |
595 | | }; |
596 | | |
597 | | // Test callback which takes a refcounted pointer. |
598 | | // We don't use this parameter, but it's used to validate that the bound callback |
599 | | // is cleared in TestCallbackClearedAfterRunning. |
600 | 1 | void MyTestCallback(CountDownLatch* latch, scoped_refptr<RefCountedTest> my_refptr) { |
601 | 1 | latch->CountDown(); |
602 | 1 | } |
603 | | } // anonymous namespace |
604 | | |
605 | | // Verify that, after a call has returned, no copy of the call's callback |
606 | | // is held. This is important when the callback holds a refcounted ptr, |
607 | | // since we expect to be able to release that pointer when the call is done. |
608 | 1 | TEST_F(RpcStubTest, TestCallbackClearedAfterRunning) { |
609 | 1 | CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_); |
610 | | |
611 | 1 | CountDownLatch latch(1); |
612 | 1 | scoped_refptr<RefCountedTest> my_refptr(new RefCountedTest); |
613 | 1 | RpcController controller; |
614 | 1 | AddRequestPB req; |
615 | 1 | req.set_x(10); |
616 | 1 | req.set_y(20); |
617 | 1 | AddResponsePB resp; |
618 | 1 | p.AddAsync(req, &resp, &controller, std::bind(MyTestCallback, &latch, my_refptr)); |
619 | 1 | latch.Wait(); |
620 | | |
621 | | // The ref count should go back down to 1. However, we need to loop a little |
622 | | // bit, since the deref is happening on another thread. If the other thread gets |
623 | | // descheduled directly after calling our callback, we'd fail without these sleeps. |
624 | 1 | for (int i = 0; i < 100 && !my_refptr->HasOneRef(); i++) { |
625 | 0 | SleepFor(MonoDelta::FromMilliseconds(1)); |
626 | 0 | } |
627 | 1 | ASSERT_TRUE(my_refptr->HasOneRef()); |
628 | 1 | } |
629 | | |
630 | | struct PingCall { |
631 | | PingResponsePB response; |
632 | | RpcController controller; |
633 | | MonoTime handle_time; |
634 | | MonoTime reply_time; |
635 | | MonoTime start_time; |
636 | | }; |
637 | | |
638 | | class PingTestHelper { |
639 | | public: |
640 | | PingTestHelper(CalculatorServiceProxy* proxy, size_t calls_count) |
641 | 1 | : proxy_(proxy), calls_(calls_count) { |
642 | 50.0k | for (auto& call : calls_) { |
643 | 50.0k | call.controller.set_timeout(MonoDelta::FromSeconds(1)); |
644 | 50.0k | } |
645 | 1 | } |
646 | | |
647 | 49.9k | void Launch(size_t id) { |
648 | 49.9k | PingRequestPB req; |
649 | 49.9k | auto& call = calls_[id]; |
650 | 49.9k | call.start_time = MonoTime::Now(); |
651 | 49.9k | req.set_id(id); |
652 | 49.9k | call.handle_time = MonoTime::Max(); |
653 | 49.9k | call.reply_time = MonoTime::Max(); |
654 | 49.9k | proxy_->PingAsync(req, |
655 | 49.9k | &call.response, |
656 | 49.9k | &call.controller, |
657 | 49.9k | std::bind(&PingTestHelper::Done, this, id)); |
658 | 49.9k | } |
659 | | |
660 | 50.0k | void LaunchNext() { |
661 | 50.0k | auto id = call_idx_++; |
662 | 50.0k | if (id < calls_.size()) { |
663 | 50.0k | Launch(id); |
664 | 50.0k | } |
665 | 50.0k | } |
666 | | |
667 | 50.0k | void Done(size_t idx) { |
668 | 50.0k | auto& call = calls_[idx]; |
669 | 50.0k | call.handle_time = MonoTime::FromUint64(call.response.time()); |
670 | 50.0k | call.reply_time = MonoTime::Now(); |
671 | 50.0k | call.controller.Reset(); |
672 | 50.0k | LaunchNext(); |
673 | 50.0k | auto calls_size = calls_.size(); |
674 | 50.0k | if (++done_calls_ == calls_size) { |
675 | 1 | LOG(INFO) << "Calls done"; |
676 | 1 | std::unique_lock<std::mutex> lock(mutex_); |
677 | 1 | finished_ = true; |
678 | 1 | cond_.notify_one(); |
679 | 1 | } |
680 | 50.0k | } |
681 | | |
682 | 1 | void Wait() { |
683 | 1 | std::unique_lock<std::mutex> lock(mutex_); |
684 | 2 | cond_.wait(lock, [this] { return finished_; }); |
685 | 1 | } |
686 | | |
687 | 1 | const std::vector<PingCall>& calls() const { |
688 | 1 | return calls_; |
689 | 1 | } |
690 | | |
691 | | private: |
692 | | CalculatorServiceProxy* proxy_; |
693 | | std::atomic<size_t> done_calls_ = {0}; |
694 | | std::atomic<size_t> call_idx_ = {0}; |
695 | | std::vector<PingCall> calls_; |
696 | | std::mutex mutex_; |
697 | | std::condition_variable cond_; |
698 | | bool finished_ = false; |
699 | | }; |
700 | | |
701 | | DEFINE_uint64(test_rpc_concurrency, 20, "Number of concurrent RPC requests"); |
702 | | DEFINE_int32(test_rpc_count, 50000, "Total number of RPC requests"); |
703 | | |
// Stress test: issues FLAGS_test_rpc_count Ping calls while keeping
// FLAGS_test_rpc_concurrency calls in flight, then checks per-call latency
// statistics (excluding warmup calls) against multiplier-adjusted limits.
TEST_F(RpcStubTest, TestRpcPerformance) {
  // Disable slow-query logging so it does not distort the measurement.
  FLAGS_rpc_slow_query_threshold_ms = std::numeric_limits<int32_t>::max();

  // Rebuild the client messenger with 4 reactors. proxy_cache_ is reset first
  // because it was constructed over the old messenger (see re-creation below).
  MessengerOptions messenger_options = kDefaultClientMessengerOptions;
  messenger_options.n_reactors = 4;
  proxy_cache_.reset();
  client_messenger_ = CreateAutoShutdownMessengerHolder("Client", messenger_options);
  proxy_cache_ = std::make_unique<ProxyCache>(client_messenger_.get());
  CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_);

  // The first kWarmupCalls results are excluded from the statistics below.
  const size_t kWarmupCalls = 50;
  const size_t concurrent = FLAGS_test_rpc_concurrency;
  const size_t total_calls = kWarmupCalls + FLAGS_test_rpc_count;

  auto start = MonoTime::Now();
  PingTestHelper helper(&p, total_calls);
  {
    // Prime the pipeline with `concurrent` calls; each completion launches the
    // next call itself, so the in-flight count stays constant.
    for (uint64_t id = 0; id != concurrent; ++id) {
      helper.LaunchNext();
    }
    LOG(INFO) << "Warmup done, Calls left: " << total_calls - kWarmupCalls;
    helper.Wait();
  }
  auto finish = MonoTime::Now();

// Debug builds are slower, so relax the limits there.
#ifndef NDEBUG
  const int kTimeMultiplier = 5;
#else
  const int kTimeMultiplier = 1;
#endif
  const MonoDelta kMaxLimit = MonoDelta::FromMilliseconds(50 * kTimeMultiplier);
  const MonoDelta kReplyAverageLimit = MonoDelta::FromMilliseconds(10 * kTimeMultiplier);
  const MonoDelta kHandleAverageLimit = MonoDelta::FromMilliseconds(5 * kTimeMultiplier);

  // Aggregate per-call timings over the non-warmup calls.
  MonoDelta min_processing = MonoDelta::kMax;
  MonoDelta max_processing = MonoDelta::kMin;
  MonoDelta reply_sum = MonoDelta::kZero;
  MonoDelta handle_sum = MonoDelta::kZero;
  size_t measured_calls = 0;
  size_t slow_calls = 0;
  auto& calls = helper.calls();
  for (size_t i = kWarmupCalls; i != total_calls; ++i) {
    const auto& call = calls[i];
    auto call_processing_delta = call.reply_time.GetDeltaSince(call.start_time);
    min_processing = std::min(min_processing, call_processing_delta);
    max_processing = std::max(max_processing, call_processing_delta);
    if (call_processing_delta > kReplyAverageLimit) {
      ++slow_calls;
    }
    reply_sum += call_processing_delta;
    handle_sum += call.handle_time.GetDeltaSince(call.start_time);
    ++measured_calls;
  }

  // Guard against division by zero in the averages below.
  ASSERT_NE(measured_calls, 0);
  auto reply_average = MonoDelta::FromNanoseconds(reply_sum.ToNanoseconds() / measured_calls);
  auto handle_average = MonoDelta::FromNanoseconds(handle_sum.ToNanoseconds() / measured_calls);
  auto passed_us = finish.GetDeltaSince(start).ToMicroseconds();
  auto us_per_call = passed_us * 1.0 / measured_calls;
  LOG(INFO) << "Min: " << min_processing.ToMicroseconds() << "us, "
            << "max: " << max_processing.ToMicroseconds() << "us, "
            << "reply avg: " << reply_average.ToMicroseconds() << "us, "
            << "handle avg: " << handle_average.ToMicroseconds() << "us";
  LOG(INFO) << "Total: " << passed_us << "us, "
            << "calls per second: " << measured_calls * 1000000 / passed_us
            << " (" << us_per_call << "us per call, NOT latency), "
            << " slow calls: " << slow_calls * 100.0 / measured_calls << "%";
  // At most 1 in 200 calls (0.5%) may exceed the reply-average limit.
  EXPECT_PERF_LE(slow_calls * 200, measured_calls);
  EXPECT_PERF_LE(max_processing, kMaxLimit);
  EXPECT_PERF_LE(reply_average, kReplyAverageLimit);
  EXPECT_PERF_LE(handle_average, kHandleAverageLimit);
}
776 | | |
777 | 1 | TEST_F(RpcStubTest, IPv6) { |
778 | 1 | google::FlagSaver saver; |
779 | 1 | FLAGS_net_address_filter = "all"; |
780 | 1 | std::vector<IpAddress> addresses; |
781 | 1 | ASSERT_OK(GetLocalAddresses(&addresses, AddressFilter::ANY)); |
782 | | |
783 | 1 | IpAddress server_address; |
784 | 2 | for (const auto& address : addresses) { |
785 | 2 | if (address.is_v6()) { |
786 | 1 | LOG(INFO) << "Found IPv6 address: " << address; |
787 | 1 | server_address = address; |
788 | 1 | break; |
789 | 1 | } |
790 | 2 | } |
791 | | |
792 | 1 | ASSERT_FALSE(server_address.is_unspecified()); |
793 | 1 | TestServerOptions options; |
794 | 1 | options.endpoint = Endpoint(server_address, 0); |
795 | 1 | auto server = StartTestServer(options, "Server"); |
796 | 1 | ASSERT_TRUE(server.bound_endpoint().address().is_v6()); |
797 | 1 | auto proxy_holder = CreateCalculatorProxyHolder(server.bound_endpoint()); |
798 | 1 | auto& proxy = *proxy_holder.proxy; |
799 | | |
800 | 1 | WhoAmIRequestPB req; |
801 | 1 | WhoAmIResponsePB resp; |
802 | 1 | RpcController controller; |
803 | 1 | ASSERT_OK(proxy.WhoAmI(req, &resp, &controller)); |
804 | 1 | ASSERT_OK(controller.status()); |
805 | 1 | LOG(INFO) << "I'm " << resp.address(); |
806 | 1 | auto parsed = ParseEndpoint(resp.address(), 0); |
807 | 1 | ASSERT_TRUE(parsed.ok()); |
808 | 1 | ASSERT_TRUE(parsed->address().is_v6()); |
809 | 1 | } |
810 | | |
// Floods the server with 10000 large Echo calls carrying a 1ms timeout so that
// calls can expire while still waiting in the service queue, and verifies every
// callback still fires exactly once.
TEST_F(RpcStubTest, ExpireInQueue) {
  CalculatorServiceProxy proxy(proxy_cache_.get(), server_hostport_);

  // Per-call state; resp is optional so it can be destroyed inside the callback.
  struct Entry {
    EchoRequestPB req;
    boost::optional<EchoResponsePB> resp;
    RpcController controller;
  };

  std::vector<Entry> entries(10000);

  CountDownLatch latch(entries.size());

  for (size_t i = 0; i != entries.size(); ++i) {
    auto& entry = entries[i];
    entry.req.set_data(std::string(100_KB, 'X'));
    entry.resp.emplace();
    // Tiny timeout: with this many queued 100KB payloads, most calls will
    // time out before being handled.
    entry.controller.set_timeout(1ms);
    proxy.EchoAsync(entry.req, entry.resp.get_ptr(), &entry.controller, [&entry, &latch] {
      // Destroy the response, then scribble over its former storage.
      // NOTE(review): this presumably exists so any late write to the response
      // by the RPC layer after the callback would be detectable — confirm intent.
      auto ptr = entry.resp.get_ptr();
      entry.resp.reset();
      memset(static_cast<void*>(ptr), 'X', sizeof(*ptr));
      latch.CountDown();
    });
  }

  // Wait for every callback (success or timeout) to have run.
  latch.Wait();
}
839 | | |
840 | 1 | TEST_F(RpcStubTest, TrafficMetrics) { |
841 | 1 | constexpr size_t kStringLen = 1_KB; |
842 | 1 | constexpr size_t kUpperBytesLimit = kStringLen + 64; |
843 | | |
844 | 1 | CalculatorServiceProxy proxy(proxy_cache_.get(), server_hostport_); |
845 | | |
846 | 1 | RpcController controller; |
847 | 1 | rpc_test::EchoRequestPB req; |
848 | 1 | req.set_data(RandomHumanReadableString(kStringLen)); |
849 | 1 | rpc_test::EchoResponsePB resp; |
850 | 1 | ASSERT_OK(proxy.Echo(req, &resp, &controller)); |
851 | | |
852 | 1 | auto server_metrics = server_messenger()->metric_entity()->UnsafeMetricsMapForTests(); |
853 | | |
854 | 1 | auto* service_request_bytes = down_cast<Counter*>(FindOrDie( |
855 | 1 | server_metrics, &METRIC_service_request_bytes_yb_rpc_test_CalculatorService_Echo).get()); |
856 | 1 | auto* service_response_bytes = down_cast<Counter*>(FindOrDie( |
857 | 1 | server_metrics, &METRIC_service_response_bytes_yb_rpc_test_CalculatorService_Echo).get()); |
858 | | |
859 | 1 | auto client_metrics = client_messenger_->metric_entity()->UnsafeMetricsMapForTests(); |
860 | | |
861 | 1 | auto* proxy_request_bytes = down_cast<Counter*>(FindOrDie( |
862 | 1 | client_metrics, &METRIC_proxy_request_bytes_yb_rpc_test_CalculatorService_Echo).get()); |
863 | 1 | auto* proxy_response_bytes = down_cast<Counter*>(FindOrDie( |
864 | 1 | client_metrics, &METRIC_proxy_response_bytes_yb_rpc_test_CalculatorService_Echo).get()); |
865 | | |
866 | 1 | LOG(INFO) << "Inbound request bytes: " << service_request_bytes->value() |
867 | 1 | << ", response bytes: " << service_response_bytes->value(); |
868 | 1 | LOG(INFO) << "Outbound request bytes: " << proxy_request_bytes->value() |
869 | 1 | << ", response bytes: " << proxy_response_bytes->value(); |
870 | | |
871 | | // We don't expect that sent and received bytes on client and server matches, because some |
872 | | // auxilary fields are not calculated. |
873 | | // For instance request size is taken into account on client, but not server. |
874 | 1 | ASSERT_GE(service_request_bytes->value(), kStringLen); |
875 | 1 | ASSERT_LT(service_request_bytes->value(), kUpperBytesLimit); |
876 | 1 | ASSERT_GE(service_response_bytes->value(), kStringLen); |
877 | 1 | ASSERT_LT(service_response_bytes->value(), kUpperBytesLimit); |
878 | 1 | ASSERT_GE(proxy_request_bytes->value(), kStringLen); |
879 | 1 | ASSERT_LT(proxy_request_bytes->value(), kUpperBytesLimit); |
880 | 1 | ASSERT_GE(proxy_request_bytes->value(), kStringLen); |
881 | 1 | ASSERT_LT(proxy_request_bytes->value(), kUpperBytesLimit); |
882 | 1 | } |
883 | | |
// Renders a container (repeated protobuf field, etc.) as a string with its
// element order reversed. The argument is taken by value on purpose, so the
// reversal never mutates the caller's data.
template <class T>
std::string ReversedAsString(T container) {
  std::reverse(container.begin(), container.end());
  return AsString(container);
}
889 | | |
890 | 14 | void Generate(rpc_test::LightweightSubMessagePB* sub_message) { |
891 | 14 | auto& msg = *sub_message; |
892 | 196 | for (int i = 0; i != 13; ++i) { |
893 | 182 | msg.mutable_rsi32()->Add(RandomUniformInt<int32_t>()); |
894 | 182 | } |
895 | 14 | msg.set_sf32(RandomUniformInt<int32_t>()); |
896 | 14 | msg.set_str(RandomHumanReadableString(32)); |
897 | 168 | for (int i = 0; i != 11; ++i) { |
898 | 154 | msg.mutable_rbytes()->Add(RandomHumanReadableString(32)); |
899 | 154 | } |
900 | 14 | if (RandomUniformBool()) { |
901 | 7 | Generate(msg.mutable_cycle()); |
902 | 7 | } |
903 | 14 | } |
904 | | |
// Round-trips a request exercising every lightweight-protobuf field kind
// (scalars of each wire type, strings/bytes, enums, repeated and packed fields,
// nested/recursive messages, pairs, a map, and a pointer-held message), then
// checks the server's documented per-field transformations on the response.
TEST_F(RpcStubTest, Lightweight) {
  CalculatorServiceProxy proxy(proxy_cache_.get(), server_hostport_);

  RpcController controller;
  rpc_test::LightweightRequestPB req;
  // Scalar fields: varint, fixed, unsigned, and floating point.
  req.set_i32(RandomUniformInt<int32_t>());
  req.set_i64(RandomUniformInt<int64_t>());
  req.set_f32(RandomUniformInt<uint32_t>());
  req.set_f64(RandomUniformInt<uint64_t>());
  req.set_u32(RandomUniformInt<uint32_t>());
  req.set_u64(RandomUniformInt<uint64_t>());
  req.set_r32(RandomUniformReal<float>());
  req.set_r64(RandomUniformReal<double>());

  // String-like fields and enum.
  req.set_str(RandomHumanReadableString(32));
  req.set_bytes(RandomHumanReadableString(32));
  req.set_en(rpc_test::LightweightEnum::TWO);

  // Signed fixed and zigzag-encoded fields.
  req.set_sf32(RandomUniformInt<int32_t>());
  req.set_sf64(RandomUniformInt<int64_t>());
  req.set_si32(RandomUniformInt<int32_t>());
  req.set_si64(RandomUniformInt<int64_t>());

  // Repeated scalar fields of different lengths.
  for (int i = 0; i != 10; ++i) {
    req.mutable_ru32()->Add(RandomUniformInt<uint32_t>());
  }

  for (int i = 0; i != 20; ++i) {
    req.mutable_rf32()->Add(RandomUniformInt<uint32_t>());
  }

  for (int i = 0; i != 7; ++i) {
    req.mutable_rstr()->Add(RandomHumanReadableString(32));
  }

  // Nested message plus a repeated-message field.
  Generate(req.mutable_message());
  for (int i = 0; i != 5; ++i) {
    Generate(req.mutable_repeated_messages()->Add());
  }

  // Packed repeated fields.
  for (int i = 0; i != 127; ++i) {
    req.mutable_packed_u64()->Add(RandomUniformInt<uint64_t>());
  }

  for (int i = 0; i != 37; ++i) {
    req.mutable_packed_f32()->Add(RandomUniformInt<uint32_t>());
  }

  // Repeated message with two string members (the server swaps s1/s2 below).
  for (int i = 0; i != 13; ++i) {
    auto& pair = *req.mutable_pairs()->Add();
    pair.set_s1(RandomHumanReadableString(16));
    pair.set_s2(RandomHumanReadableString(48));
  }

  // Map field with random keys.
  for (int i = 0; i != 11; ++i) {
    (*req.mutable_map())[RandomHumanReadableString(8)] = RandomUniformInt<int64_t>();
  }

  // Message stored via pointer in the lightweight representation.
  Generate(req.mutable_ptr_message());

  rpc_test::LightweightResponsePB resp;
  ASSERT_OK(proxy.Lightweight(req, &resp, &controller));

  // The server echoes each field back transformed: integers negated, ...
  ASSERT_EQ(resp.i32(), -req.i32());
  ASSERT_EQ(resp.i64(), -req.i64());

  // ... fixed/unsigned pairs swapped, ...
  ASSERT_EQ(resp.f32(), req.u32());
  ASSERT_EQ(resp.u32(), req.f32());
  ASSERT_EQ(resp.f64(), req.u64());
  ASSERT_EQ(resp.u64(), req.f64());

  ASSERT_EQ(resp.r32(), -req.r32());
  ASSERT_EQ(resp.r64(), -req.r64());

  // ... str/bytes swapped, enum incremented, signed fixed/zigzag pairs swapped.
  ASSERT_EQ(resp.bytes(), req.str());
  ASSERT_EQ(resp.str(), req.bytes());

  ASSERT_EQ(resp.en(), (req.en() + 1));

  ASSERT_EQ(resp.sf32(), req.si32());
  ASSERT_EQ(resp.si32(), req.sf32());
  ASSERT_EQ(resp.sf64(), req.si64());
  ASSERT_EQ(resp.si64(), req.sf64());

  // Repeated fields: swapped and/or reversed.
  ASSERT_EQ(AsString(resp.ru32()), AsString(req.rf32()));
  ASSERT_EQ(AsString(resp.rf32()), AsString(req.ru32()));
  ASSERT_EQ(AsString(resp.rstr()), ReversedAsString(req.rstr()));

  // Nested message: negated/reversed/decorated per field.
  ASSERT_EQ(resp.message().sf32(), -req.message().sf32());
  ASSERT_EQ(AsString(resp.message().rsi32()), ReversedAsString(req.message().rsi32()));
  ASSERT_EQ(resp.message().str(), ">" + req.message().str() + "<");
  ASSERT_STR_EQ(AsString(resp.message().rbytes()), ReversedAsString(req.message().rbytes()));
  ASSERT_STR_EQ(AsString(resp.repeated_messages()), ReversedAsString(req.repeated_messages()));
  ASSERT_STR_EQ(AsString(resp.repeated_messages_copy()), AsString(req.repeated_messages()));

  ASSERT_STR_EQ(AsString(resp.packed_u64()), ReversedAsString(req.packed_u64()));
  ASSERT_STR_EQ(AsString(resp.packed_f32()), ReversedAsString(req.packed_f32()));

  // Pairs come back with s1/s2 swapped.
  ASSERT_EQ(resp.pairs().size(), req.pairs().size());
  for (int i = 0; i != req.pairs().size(); ++i) {
    ASSERT_EQ(resp.pairs()[i].s1(), req.pairs()[i].s2());
    ASSERT_EQ(resp.pairs()[i].s2(), req.pairs()[i].s1());
  }

  ASSERT_STR_EQ(AsString(resp.ptr_message()), AsString(req.ptr_message()));

  // Map entries are echoed unchanged.
  for (const auto& entry : resp.map()) {
    ASSERT_EQ(entry.second, req.map().at(entry.first));
  }

  // Compare debug-string renderings. NOTE(review): the map is presumably
  // cleared first because its iteration order is not deterministic — confirm.
  req.mutable_map()->clear();
  std::string req_str = req.ShortDebugString();

  auto lw_req = CopySharedMessage<rpc_test::LWLightweightRequestPB>(req);
  req.Clear();
  ASSERT_STR_EQ(AsString(*lw_req), req_str);
  ASSERT_STR_EQ(AsString(resp.short_debug_string()), req_str);
}
1023 | | |
1024 | 1 | TEST_F(RpcStubTest, CustomServiceName) { |
1025 | 1 | SendSimpleCall(); |
1026 | | |
1027 | 1 | rpc_test::ConcatRequestPB req; |
1028 | 1 | req.set_lhs("yuga"); |
1029 | 1 | req.set_rhs("byte"); |
1030 | 1 | rpc_test::ConcatResponsePB resp; |
1031 | | |
1032 | 1 | RpcController controller; |
1033 | 1 | controller.set_timeout(30s); |
1034 | | |
1035 | 1 | rpc_test::AbacusServiceProxy proxy(proxy_cache_.get(), server_hostport_); |
1036 | 1 | ASSERT_OK(proxy.Concat(req, &resp, &controller)); |
1037 | 1 | ASSERT_EQ(resp.result(), "yugabyte"); |
1038 | 1 | } |
1039 | | |
1040 | 1 | TEST_F(RpcStubTest, Trivial) { |
1041 | 1 | CalculatorServiceProxy proxy(proxy_cache_.get(), server_hostport_); |
1042 | | |
1043 | 1 | RpcController controller; |
1044 | 1 | controller.set_timeout(30s); |
1045 | | |
1046 | 1 | rpc_test::TrivialRequestPB req; |
1047 | 1 | req.set_value(42); |
1048 | 1 | rpc_test::TrivialResponsePB resp; |
1049 | 1 | ASSERT_OK(proxy.Trivial(req, &resp, &controller)); |
1050 | 1 | ASSERT_EQ(resp.value(), req.value()); |
1051 | | |
1052 | 1 | req.set_value(-1); |
1053 | 1 | controller.Reset(); |
1054 | 1 | controller.set_timeout(30s); |
1055 | 1 | ASSERT_OK(proxy.Trivial(req, &resp, &controller)); |
1056 | 1 | ASSERT_TRUE(resp.has_error()); |
1057 | 1 | ASSERT_EQ(resp.error().code(), Status::Code::kInvalidArgument); |
1058 | 1 | } |
1059 | | |
1060 | | } // namespace rpc |
1061 | | } // namespace yb |