YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/rpc_stub-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <condition_variable>
34
#include <functional>
35
#include <thread>
36
#include <vector>
37
38
#include <gtest/gtest.h>
39
40
#include "yb/gutil/stl_util.h"
41
42
#include "yb/rpc/proxy.h"
43
#include "yb/rpc/rpc-test-base.h"
44
#include "yb/rpc/rpc_controller.h"
45
#include "yb/rpc/rpc_introspection.pb.h"
46
#include "yb/rpc/rtest.proxy.h"
47
#include "yb/rpc/rtest.service.h"
48
#include "yb/rpc/yb_rpc.h"
49
50
#include "yb/util/countdown_latch.h"
51
#include "yb/util/metrics.h"
52
#include "yb/util/result.h"
53
#include "yb/util/size_literals.h"
54
#include "yb/util/status_log.h"
55
#include "yb/util/subprocess.h"
56
#include "yb/util/test_macros.h"
57
#include "yb/util/test_util.h"
58
#include "yb/util/tsan_util.h"
59
#include "yb/util/tostring.h"
60
#include "yb/util/user.h"
61
62
// Set on the child process that TestRpcPanic re-executes; it selects the branch that panics.
DEFINE_bool(is_panic_test_child, false, "Used by TestRpcPanic");
63
// Flags defined elsewhere that individual tests in this file override:
// TestShortRecvs, TestRpcPerformance and ConnectTimeout/RandomTimeout respectively.
DECLARE_bool(socket_inject_short_recvs);
64
DECLARE_int32(rpc_slow_query_threshold_ms);
65
DECLARE_int32(TEST_delay_connect_ms);
66
67
// Byte counters for CalculatorService.Echo.
// NOTE(review): presumably consumed by tests outside the visible part of this file - confirm.
METRIC_DECLARE_counter(service_request_bytes_yb_rpc_test_CalculatorService_Echo);
68
METRIC_DECLARE_counter(service_response_bytes_yb_rpc_test_CalculatorService_Echo);
69
METRIC_DECLARE_counter(proxy_request_bytes_yb_rpc_test_CalculatorService_Echo);
70
METRIC_DECLARE_counter(proxy_response_bytes_yb_rpc_test_CalculatorService_Echo);
71
72
using namespace std::chrono_literals;
73
74
namespace yb {
75
namespace rpc {
76
77
using yb::rpc_test::AddRequestPB;
78
using yb::rpc_test::AddResponsePB;
79
using yb::rpc_test::EchoRequestPB;
80
using yb::rpc_test::EchoResponsePB;
81
using yb::rpc_test::ForwardRequestPB;
82
using yb::rpc_test::ForwardResponsePB;
83
using yb::rpc_test::PanicRequestPB;
84
using yb::rpc_test::PanicResponsePB;
85
using yb::rpc_test::SendStringsRequestPB;
86
using yb::rpc_test::SendStringsResponsePB;
87
using yb::rpc_test::SleepRequestPB;
88
using yb::rpc_test::SleepResponsePB;
89
using yb::rpc_test::WhoAmIRequestPB;
90
using yb::rpc_test::WhoAmIResponsePB;
91
using yb::rpc_test::PingRequestPB;
92
using yb::rpc_test::PingResponsePB;
93
using yb::rpc_test::DisconnectRequestPB;
94
using yb::rpc_test::DisconnectResponsePB;
95
using yb::rpc_test_diff_package::ReqDiffPackagePB;
96
using yb::rpc_test_diff_package::RespDiffPackagePB;
97
98
using std::shared_ptr;
99
using std::vector;
100
101
using rpc_test::AddRequestPartialPB;
102
using rpc_test::CalculatorServiceProxy;
103
104
// Test fixture: starts a test server with the generated service code and creates a client
// messenger plus a ProxyCache bound to it, which the tests use to build proxies.
class RpcStubTest : public RpcTestBase {
105
 public:
106
26
  // Starts the server (its address ends up in server_hostport_) and sets up the client side.
  void SetUp() override {
107
26
    RpcTestBase::SetUp();
108
26
    StartTestServerWithGeneratedCode(&server_hostport_);
109
26
    client_messenger_ = CreateAutoShutdownMessengerHolder("Client");
110
26
    proxy_cache_ = std::make_unique<ProxyCache>(client_messenger_.get());
111
26
  }
112
113
 protected:
114
103
  // Issues a synchronous Add(10, 20) through a fresh proxy and asserts that the result is 30.
  // Uses ASSERT_* internally, so callers that need to stop on failure wrap it in ASSERT_NO_FATALS.
  void SendSimpleCall() {
115
103
    CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_);
116
117
103
    RpcController controller;
118
103
    AddRequestPB req;
119
103
    req.set_x(10);
120
103
    req.set_y(20);
121
103
    AddResponsePB resp;
122
103
    ASSERT_OK(p.Add(req, &resp, &controller));
123
103
    ASSERT_EQ(30, resp.result());
124
103
  }
125
126
  // Bundles a proxy with the messenger it was created on, so that both can be returned together.
  template <class T>
127
  struct ProxyWithMessenger {
128
    AutoShutdownMessengerHolder messenger;
129
    std::unique_ptr<T> proxy;
130
  };
131
132
3
  // Creates a dedicated client messenger that listens on the loopback address of the same IP
  // family as `remote` (port 0), and a CalculatorServiceProxy pointing at `remote` on top of it.
  // Setup failures are reported through EXPECT_*, so the holder is returned in any case.
  ProxyWithMessenger<CalculatorServiceProxy> CreateCalculatorProxyHolder(const Endpoint& remote) {
133
3
    auto messenger = CreateAutoShutdownMessengerHolder("Client");
134
3
    IpAddress local_address = remote.address().is_v6()
135
1
        ? IpAddress(boost::asio::ip::address_v6::loopback())
136
2
        : IpAddress(boost::asio::ip::address_v4::loopback());
137
    // To have outbound calls with appropriate address
138
3
    EXPECT_OK(messenger->ListenAddress(
139
3
        CreateConnectionContextFactory<YBInboundConnectionContext>(),
140
3
        Endpoint(local_address, 0)));
141
3
    EXPECT_OK(messenger->StartAcceptor());
142
3
    EXPECT_FALSE(messenger->io_service().stopped());
143
3
    // NOTE(review): proxy_cache is a local that is destroyed on return while the proxy built from
    // it is handed to the caller; presumably the proxy does not retain the pointer - confirm.
    ProxyCache proxy_cache(messenger.get());
144
3
    return { move(messenger),
145
3
        std::make_unique<CalculatorServiceProxy>(&proxy_cache, HostPort(remote)) };
146
3
  }
147
148
  // Address of the server started in SetUp().
  HostPort server_hostport_;
149
  // Client messenger and the proxy cache built on it; TestRpcPerformance replaces both.
  AutoShutdownMessengerHolder client_messenger_;
150
  std::unique_ptr<ProxyCache> proxy_cache_;
151
};
152
153
1
// Sanity check: a single synchronous Add call against the test server succeeds.
TEST_F(RpcStubTest, TestSimpleCall) {
154
1
  SendSimpleCall();
155
1
}
156
157
1
// With TEST_delay_connect_ms set to 5000, a call with a 1s timeout must fail with TimedOut
// after roughly 1s (+/- 100ms). A later call without an explicit timeout must still succeed.
TEST_F(RpcStubTest, ConnectTimeout) {
158
1
  // NOTE(review): no FlagSaver here, unlike TestShortRecvs; presumably the test base restores
  // flags between tests - confirm.
  FLAGS_TEST_delay_connect_ms = 5000;
159
1
  CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_);
160
1
  const MonoDelta kWaitTime = 1s;
161
1
  const MonoDelta kAllowedError = 100ms;
162
163
1
  RpcController controller;
164
1
  controller.set_timeout(kWaitTime);
165
1
  AddRequestPB req;
166
1
  req.set_x(10);
167
1
  req.set_y(20);
168
1
  AddResponsePB resp;
169
1
  // Measure how long the synchronous call blocks before reporting the timeout.
  auto start = MonoTime::Now();
170
1
  auto status = p.Add(req, &resp, &controller);
171
1
  auto passed = MonoTime::Now() - start;
172
2
  ASSERT_TRUE(status.IsTimedOut()) << "Status: " << status;
173
1
  ASSERT_GE(passed, kWaitTime - kAllowedError);
174
1
  ASSERT_LE(passed, kWaitTime + kAllowedError);
175
176
1
  // The timed out call must not prevent a subsequent call from completing.
  SendSimpleCall();
177
1
}
178
179
1
// Launches kTotalCalls asynchronous Add calls, each with a random timeout in [0, kMaxTimeout],
// while connecting is delayed by kMaxTimeout / 2. Every call must either time out or return the
// correct sum, and the number of timed out calls must lie between 1/4 and 3/4 of all calls.
TEST_F(RpcStubTest, RandomTimeout) {
180
1
  const size_t kTotalCalls = 1000;
181
1
  const MonoDelta kMaxTimeout = 2s;
182
183
1
  FLAGS_TEST_delay_connect_ms = narrow_cast<int>(kMaxTimeout.ToMilliseconds() / 2);
184
1
  CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_);
185
186
1
  // Per-call state; it has to stay alive until the call's callback has run.
  struct CallData {
187
1
    RpcController controller;
188
1
    AddRequestPB req;
189
1
    AddResponsePB resp;
190
1
  };
191
1
  std::vector<CallData> calls(kTotalCalls);
192
1
  CountDownLatch latch(kTotalCalls);
193
194
1.00k
  for (auto& call : calls) {
195
1.00k
    auto timeout = MonoDelta::FromMilliseconds(
196
1.00k
        RandomUniformInt<int64_t>(0, kMaxTimeout.ToMilliseconds()));
197
1.00k
    call.controller.set_timeout(timeout);
198
1.00k
    call.req.set_x(RandomUniformInt(-1000, 1000));
199
1.00k
    call.req.set_y(RandomUniformInt(-1000, 1000));
200
1.00k
    p.AddAsync(call.req, &call.resp, &call.controller, [&latch] {
201
1.00k
      latch.CountDown();
202
1.00k
    });
203
1.00k
  }
204
205
1
  // Every callback, for successful and timed out calls alike, must fire within kMaxTimeout.
  ASSERT_TRUE(latch.WaitFor(kMaxTimeout));
206
207
1
  size_t timed_out = 0;
208
1.00k
  for (auto& call : calls) {
209
1.00k
    if (call.controller.status().IsTimedOut()) {
210
556
      ++timed_out;
211
444
    } else {
212
444
      ASSERT_OK(call.controller.status());
213
444
      ASSERT_EQ(call.req.x() + call.req.y(), call.resp.result());
214
444
    }
215
1.00k
  }
216
217
1
  LOG(INFO) << "Timed out calls: " << timed_out;
218
219
  // About half of calls should expire, so we do a bit more relaxed checks.
220
1
  ASSERT_GT(timed_out, kTotalCalls / 4);
221
1
  ASSERT_LT(timed_out, kTotalCalls * 3 / 4);
222
1
}
223
224
// Regression test for a bug in which we would not properly parse a call
225
// response when recv() returned a 'short read'. This injects such short
226
// reads and then makes a number of calls.
227
1
TEST_F(RpcStubTest, TestShortRecvs) {
228
1
  // The gflags FlagSaver restores all flag values when it goes out of scope.
  google::FlagSaver saver;
229
1
  FLAGS_socket_inject_short_recvs = true;
230
231
1
  // NOTE(review): `p` is not used below; SendSimpleCall() creates its own proxy.
  CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_);
232
233
101
  for (int i = 0; i < 100; i++) {
234
100
    ASSERT_NO_FATALS(SendSimpleCall());
235
100
  }
236
1
}
237
238
// Sends a Forward request through `proxy` with a 1s timeout and checks the outcome.
//
// proxy    - proxy of the server that receives the Forward call.
// endpoint - target to put into the request; host and port are left unset when the address is
//            unspecified (a default constructed Endpoint is used this way by the caller).
// expected - expected value of the `name` field in the response. An empty string means that
//            the call is expected to fail.
void CheckForward(CalculatorServiceProxy* proxy,
239
                  const Endpoint& endpoint,
240
8
                  const std::string& expected) {
241
8
  ForwardRequestPB req;
242
8
  if (!endpoint.address().is_unspecified()) {
243
6
    req.set_host(endpoint.address().to_string());
244
6
    req.set_port(endpoint.port());
245
6
  }
246
8
  ForwardResponsePB resp;
247
248
8
  RpcController controller;
249
8
  controller.set_timeout(1s);
250
8
  auto status = proxy->Forward(req, &resp, &controller);
251
8
  if (expected.empty()) {
252
2
    // Failure is the expected outcome here; log the status to ease debugging.
    LOG(INFO) << "Call status: " << status;
253
4
    ASSERT_NOK(status) << "Name: " << resp.name();
254
6
  } else {
255
6
    ASSERT_OK(status);
256
6
    ASSERT_EQ(expected, resp.name());
257
6
  }
258
8
}
259
260
// Test forwarding calls between two servers while the connectivity between them is broken
// and after it has been restored.
261
1
TEST_F(RpcStubTest, TestIncoherence) {
262
1
  static const std::string kServer1Name = "Server1";
263
1
  TestServerOptions server1options;
264
1
  server1options.endpoint = Endpoint(IpAddress::from_string("127.0.0.11"), 0);
265
1
  static const std::string kServer2Name = "Server2";
266
1
  TestServerOptions server2options;
267
1
  server2options.endpoint = Endpoint(IpAddress::from_string("127.0.0.12"), 0);
268
269
1
  auto server1 = StartTestServer(server1options, kServer1Name);
270
1
  auto proxy1holder = CreateCalculatorProxyHolder(server1.bound_endpoint());
271
1
  auto& proxy1 = *proxy1holder.proxy;
272
1
  auto server2 = StartTestServer(server2options, kServer2Name);
273
1
  auto proxy2holder = CreateCalculatorProxyHolder(server2.bound_endpoint());
274
1
  auto& proxy2 = *proxy2holder.proxy;
275
276
1
  // Initially both servers are able to forward calls to each other.
  ASSERT_NO_FATALS(CheckForward(&proxy1, server2.bound_endpoint(), kServer2Name));
277
1
  ASSERT_NO_FATALS(CheckForward(&proxy2, server1.bound_endpoint(), kServer1Name));
278
279
1
  server2.messenger()->BreakConnectivityWith(server1.bound_endpoint().address());
280
281
1
  LOG(INFO) << "Checking connectivity";
282
  // No connection between servers.
283
1
  ASSERT_NO_FATALS(CheckForward(&proxy1, server2.bound_endpoint(), std::string()));
284
  // We could connect to server1.
285
1
  ASSERT_NO_FATALS(CheckForward(&proxy1, Endpoint(), kServer1Name));
286
  // No connection between servers.
287
1
  ASSERT_NO_FATALS(CheckForward(&proxy2, server1.bound_endpoint(), std::string()));
288
  // We could connect to server2.
289
1
  ASSERT_NO_FATALS(CheckForward(&proxy2, Endpoint(), kServer2Name));
290
291
1
  // After restoring connectivity, forwarding works in both directions again.
  server2.messenger()->RestoreConnectivityWith(server1.bound_endpoint().address());
292
1
  ASSERT_NO_FATALS(CheckForward(&proxy1, server2.bound_endpoint(), kServer2Name));
293
1
  ASSERT_NO_FATALS(CheckForward(&proxy2, server1.bound_endpoint(), kServer1Name));
294
1
}
295
296
// Test calls which are rather large.
297
// This test sends kNumSentAtOnce of them at once using the async API and then
298
// waits for them all to return. This is meant to ensure that the
299
// IO threads can deal with read/write calls that don't succeed
300
// in sending the entire data in one go.
301
1
TEST_F(RpcStubTest, TestBigCallData) {
302
1
  // NOTE: with kNumSentAtOnce == 1 only a single call is in flight at a time.
  constexpr int kNumSentAtOnce = 1;
303
1
  // NOTE(review): presumably 32 MB in regular builds and 4 MB under TSAN - confirm.
  constexpr size_t kMessageSize = NonTsanVsTsan(32_MB, 4_MB);
304
305
1
  CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_);
306
307
1
  // The same request object is shared by all calls.
  EchoRequestPB req;
308
1
  req.set_data(RandomHumanReadableString(kMessageSize));
309
310
1
  std::vector<EchoResponsePB> resps(kNumSentAtOnce);
311
1
  std::vector<RpcController> controllers(kNumSentAtOnce);
312
313
1
  CountDownLatch latch(kNumSentAtOnce);
314
2
  for (int i = 0; i < kNumSentAtOnce; i++) {
315
1
    auto resp = &resps[i];
316
1
    auto controller = &controllers[i];
317
1
    controller->set_timeout(60s);
318
319
1
    p.EchoAsync(req, resp, controller, [&latch]() { latch.CountDown(); });
320
1
  }
321
322
1
  latch.Wait();
323
324
1
  for (RpcController &c : controllers) {
325
1
    EXPECT_OK(c.status());
326
1
  }
327
328
1
  // Each response must echo back the full payload.
  for (auto& resp : resps) {
329
1
    ASSERT_EQ(resp.data(), req.data());
330
1
  }
331
1
}
332
333
1
// A Sleep call with the `deferred` request field set must complete successfully.
// NOTE(review): presumably `deferred` makes the service respond from outside the handler
// invocation - confirm against the service implementation.
TEST_F(RpcStubTest, TestRespondDeferred) {
334
1
  CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_);
335
336
1
  RpcController controller;
337
1
  SleepRequestPB req;
338
1
  req.set_sleep_micros(1000);
339
1
  req.set_deferred(true);
340
1
  SleepResponsePB resp;
341
1
  ASSERT_OK(p.Sleep(req, &resp, &controller));
342
1
}
343
344
// Test that the default user credentials are propagated to the server.
// NOTE(review): as written, the test only checks that WhoAmI succeeds; the logged in user is
// fetched into `expected` but never compared with anything in the response.
345
1
TEST_F(RpcStubTest, TestDefaultCredentialsPropagated) {
346
1
  CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_);
347
348
1
  // Not used after this line (see the note above).
  string expected = ASSERT_RESULT(GetLoggedInUser());
349
350
1
  RpcController controller;
351
1
  WhoAmIRequestPB req;
352
1
  WhoAmIResponsePB resp;
353
1
  ASSERT_OK(p.WhoAmI(req, &resp, &controller));
354
1
}
355
356
// Test that the user can specify other credentials.
// NOTE(review): as written, no custom credentials are set anywhere in this test and nothing
// in the response is inspected; it only checks that WhoAmI succeeds.
357
1
TEST_F(RpcStubTest, TestCustomCredentialsPropagated) {
358
1
  CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_);
359
360
1
  RpcController controller;
361
1
  WhoAmIRequestPB req;
362
1
  WhoAmIResponsePB resp;
363
1
  ASSERT_OK(p.WhoAmI(req, &resp, &controller));
364
1
}
365
366
// Test that the user's remote address is accessible to the server.
367
1
TEST_F(RpcStubTest, TestRemoteAddress) {
368
1
  CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_);
369
370
1
  RpcController controller;
371
1
  WhoAmIRequestPB req;
372
1
  WhoAmIResponsePB resp;
373
1
  ASSERT_OK(p.WhoAmI(req, &resp, &controller));
374
1
  // Only the host part is checked; the client port is not asserted.
  ASSERT_STR_CONTAINS(resp.address(), "127.0.0.1:");
375
1
}
376
377
////////////////////////////////////////////////////////////
378
// Tests for error cases
379
////////////////////////////////////////////////////////////
380
381
// Test sending a PB parameter with a missing field, where the client
382
// thinks it has sent a full PB (e.g. due to a version mismatch).
383
1
TEST_F(RpcStubTest, TestCallWithInvalidParam) {
384
1
  // Use the generic Proxy so that a request of a different type can be sent to the Add method.
  Proxy p(client_messenger_.get(), server_hostport_);
385
386
1
  AddRequestPartialPB req;
387
1
  req.set_x(RandomUniformInt<uint32_t>());
388
  // AddRequestPartialPB is missing the 'y' field.
389
1
  AddResponsePB resp;
390
1
  RpcController controller;
391
1
  Status s = p.SyncRequest(
392
1
      CalculatorServiceMethods::AddMethod(), /* method_metrics= */ nullptr, req, &resp,
393
1
      &controller);
394
2
  ASSERT_TRUE(s.IsRemoteError()) << "Bad status: " << s;
395
  // Remote error messages always contain file name and line number.
396
1
  ASSERT_STR_CONTAINS(s.ToString(), "Invalid argument (");
397
1
  ASSERT_STR_CONTAINS(s.ToString(),
398
1
                      "Invalid parameter for call yb.rpc_test.CalculatorService.Add: y");
399
1
}
400
401
// Wrapper around Barrier_AtomicIncrement: the increment returns a value,
402
// but our callback needs to be a void function.
403
1
static void DoIncrement(Atomic32* count) {
404
1
  base::subtle::Barrier_AtomicIncrement(count, 1);
405
1
}
406
407
// Test sending a PB parameter with a missing field on the client side.
408
// This also ensures that the async callback is only called once
409
// (regression test for a previously-encountered bug).
410
1
TEST_F(RpcStubTest, TestCallWithMissingPBFieldClientSide) {
411
1
  CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_);
412
413
1
  RpcController controller;
414
1
  AddRequestPB req;
415
1
  req.set_x(10);
416
  // Request is missing the 'y' field.
417
1
  AddResponsePB resp;
418
1
  Atomic32 callback_count = 0;
419
1
  p.AddAsync(req, &resp, &controller, std::bind(&DoIncrement, &callback_count));
420
1
  // Poll until the callback has been invoked at least once.
  while (NoBarrier_Load(&callback_count) == 0) {
421
0
    SleepFor(MonoDelta::FromMicroseconds(10));
422
0
  }
423
1
  // Presumably gives a spurious second invocation a chance to happen before the count check.
  SleepFor(MonoDelta::FromMicroseconds(100));
424
1
  ASSERT_EQ(1, NoBarrier_Load(&callback_count));
425
1
  ASSERT_STR_CONTAINS(controller.status().ToString(false),
426
1
                      "Invalid argument: RPC argument missing required fields: y");
427
1
}
428
429
// Test sending a call which isn't implemented by the server.
430
1
TEST_F(RpcStubTest, TestCallMissingMethod) {
431
1
  // The generic Proxy allows calling an arbitrary method name of the service.
  Proxy proxy(client_messenger_.get(), server_hostport_);
432
433
1
  RemoteMethod method(yb::rpc_test::CalculatorServiceIf::static_service_name(), "DoesNotExist");
434
1
  Status s = DoTestSyncCall(&proxy, &method);
435
2
  ASSERT_TRUE(s.IsRemoteError()) << "Bad status: " << s.ToString();
436
1
  ASSERT_STR_CONTAINS(s.ToString(), "with an invalid method name: DoesNotExist");
437
1
}
438
439
1
// A Sleep call with `return_app_error` set must fail with a remote error, and the
// controller's error response must carry the application specific extension data.
TEST_F(RpcStubTest, TestApplicationError) {
440
1
  CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_);
441
442
1
  RpcController controller;
443
1
  SleepRequestPB req;
444
1
  SleepResponsePB resp;
445
1
  req.set_sleep_micros(1);
446
1
  req.set_return_app_error(true);
447
1
  Status s = p.Sleep(req, &resp, &controller);
448
1
  ASSERT_TRUE(s.IsRemoteError());
449
1
  EXPECT_EQ("Remote error: Got some error", s.ToString(false));
450
1
  // The full error message, including the extension, is compared in protobuf text format.
  EXPECT_EQ("message: \"Got some error\"\n"
451
1
            "[yb.rpc_test.CalculatorError.app_error_ext] {\n"
452
1
            "  extra_error_data: \"some application-specific error data\"\n"
453
1
            "}\n", controller.error_response()->DebugString());
454
1
}
455
456
2
// Verifies that the Panic RPC aborts the process. The parent branch re-executes this test
// binary with --is_panic_test_child and inspects the child's stderr and wait status; the
// child branch performs the Panic call.
TEST_F(RpcStubTest, TestRpcPanic) {
457
2
  if (!FLAGS_is_panic_test_child) {
458
    // This is a poor man's death test. We call this same
459
    // test case, but set the above flag, and verify that
460
    // it aborted. gtest death tests don't work here because
461
    // there are already threads started up.
462
1
    vector<string> argv;
463
1
    string executable_path;
464
1
    CHECK_OK(env_->GetExecutablePath(&executable_path));
465
1
    argv.push_back(executable_path);
466
1
    argv.push_back("--is_panic_test_child");
467
1
    argv.push_back("--gtest_filter=RpcStubTest.TestRpcPanic");
468
469
1
    Subprocess subp(argv[0], argv);
470
1
    subp.PipeParentStderr();
471
1
    CHECK_OK(subp.Start());
472
1
    // NOTE(review): `in` is never fclose()d; presumably acceptable because the descriptor
    // stays owned by `subp` - confirm.
    FILE* in = fdopen(subp.from_child_stderr_fd(), "r");
473
1
    PCHECK(in);
474
475
    // Search for string "Test method panicking!" somewhere in stderr
476
1
    std::string error_message;
477
1
    char buf[1024];
478
91
    // Reads until EOF, i.e. until the child's stderr has been closed.
    while (fgets(buf, sizeof(buf), in)) {
479
90
      error_message += buf;
480
90
    }
481
1
    ASSERT_STR_CONTAINS(error_message, "Test method panicking!");
482
483
    // Check return status
484
1
    int wait_status = 0;
485
1
    CHECK_OK(subp.Wait(&wait_status));
486
1
    CHECK(!WIFEXITED(wait_status)); // should not have been successful
487
1
    if (WIFSIGNALED(wait_status)) {
488
1
      CHECK_EQ(WTERMSIG(wait_status), SIGABRT);
489
0
    } else {
490
      // On some systems, we get exit status 134 from SIGABRT rather than
491
      // WIFSIGNALED getting flagged.
      // NOTE(review): a child that exits normally with status 134 would already have failed
      // the CHECK(!WIFEXITED(...)) above, so this branch looks unreachable for that case -
      // confirm the intended check.
492
0
      CHECK_EQ(WEXITSTATUS(wait_status), 134);
493
0
    }
494
1
    return;
495
1
  } else {
496
    // Before forcing the panic, explicitly remove the test directory. This
497
    // should be safe; this test doesn't generate any data.
498
1
    CHECK_OK(env_->DeleteRecursively(GetTestDataDirectory()));
499
500
    // Make an RPC which causes the server to abort.
501
1
    CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_);
502
1
    RpcController controller;
503
1
    PanicRequestPB req;
504
1
    PanicResponsePB resp;
505
1
    ASSERT_OK(p.Panic(req, &resp, &controller));
506
1
  }
507
2
}
508
509
// Controller, request and response of one asynchronous Sleep call; the object has to stay
// alive until the call has completed.
struct AsyncSleep {
510
  RpcController rpc;
511
  SleepRequestPB req;
512
  SleepResponsePB resp;
513
};
514
515
1
// Keeps the server busy with long Sleep calls, then issues a call with a short timeout and
// verifies that it times out and is counted as timed out in the queue on the server.
TEST_F(RpcStubTest, TestDontHandleTimedOutCalls) {
516
1
  CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_);
517
1
  vector<AsyncSleep*> sleeps;
518
1
  // Deletes the raw pointers collected in `sleeps` when the test scope is left.
  ElementDeleter d(&sleeps);
519
520
  // Send enough sleep calls to occupy the worker threads.
521
1
  auto count = client_messenger_->max_concurrent_requests() * 4;
522
1
  CountDownLatch latch(count);
523
33
  for (size_t i = 0; i < count; i++) {
524
32
    auto sleep = std::make_unique<AsyncSleep>();
525
32
    sleep->rpc.set_timeout(10s);
526
32
    sleep->req.set_sleep_micros(500 * 1000); // 500ms
527
32
    p.SleepAsync(sleep->req, &sleep->resp, &sleep->rpc, [&latch]() { latch.CountDown(); });
528
32
    sleeps.push_back(sleep.release());
529
32
  }
530
531
  // Send another call with a short timeout. This shouldn't get processed, because
532
  // it'll get stuck in the queue for longer than its timeout.
533
1
  RpcController rpc;
534
1
  SleepRequestPB req;
535
1
  SleepResponsePB resp;
536
1
  req.set_sleep_micros(1000);
537
1
  rpc.set_timeout(250ms);
538
1
  Status s = p.Sleep(req, &resp, &rpc);
539
2
  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
540
541
1
  // Wait for all the long sleep calls to finish before inspecting the metric.
  latch.Wait();
542
543
  // Verify that the timedout call got short circuited before being processed.
544
1
  const Counter* timed_out_in_queue = server().service_pool().RpcsTimedOutInQueueMetricForTests();
545
1
  ASSERT_EQ(1, timed_out_in_queue->value());
546
1
}
547
548
1
// While a long Sleep call is in flight, DumpRunningRpcs on both the client and the server
// messenger must report exactly one connection with exactly one "Sleep" call.
TEST_F(RpcStubTest, TestDumpCallsInFlight) {
549
1
  CountDownLatch latch(1);
550
1
  CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_);
551
1
  AsyncSleep sleep;
552
1
  sleep.req.set_sleep_micros(1000 * 1000); // 1s
553
1
  p.SleepAsync(sleep.req, &sleep.resp, &sleep.rpc, [&latch]() { latch.CountDown(); });
554
555
  // Check the running RPC status on the client messenger.
556
1
  DumpRunningRpcsRequestPB dump_req;
557
1
  DumpRunningRpcsResponsePB dump_resp;
558
1
  dump_req.set_include_traces(true);
559
560
1
  // Presumably lets the call get registered and makes sure elapsed_millis is non-zero below.
  std::this_thread::sleep_for(10ms);
561
1
  ASSERT_OK(client_messenger_->DumpRunningRpcs(dump_req, &dump_resp));
562
1
  LOG(INFO) << "client messenger: " << dump_resp.DebugString();
563
1
  ASSERT_EQ(1, dump_resp.outbound_connections_size());
564
1
  ASSERT_EQ(1, dump_resp.outbound_connections(0).calls_in_flight_size());
565
1
  ASSERT_EQ("Sleep", dump_resp.outbound_connections(0).calls_in_flight(0).
566
1
                        header().remote_method().method_name());
567
1
  ASSERT_GT(dump_resp.outbound_connections(0).calls_in_flight(0).elapsed_millis(), 0);
568
569
  // And the server messenger.
570
  // We have to loop this until we find a result since the actual call is sent
571
  // asynchronously off of the main thread (i.e. the server may not be handling it yet).
572
1
  for (int i = 0; i < 100; i++) {
573
1
    dump_resp.Clear();
574
1
    ASSERT_OK(server_messenger()->DumpRunningRpcs(dump_req, &dump_resp));
575
1
    if (dump_resp.inbound_connections_size() > 0 &&
576
1
        dump_resp.inbound_connections(0).calls_in_flight_size() > 0) {
577
1
      break;
578
1
    }
579
0
    SleepFor(MonoDelta::FromMilliseconds(1));
580
0
  }
581
582
1
  LOG(INFO) << "server messenger: " << dump_resp.DebugString();
583
1
  ASSERT_EQ(1, dump_resp.inbound_connections_size());
584
1
  ASSERT_EQ(1, dump_resp.inbound_connections(0).calls_in_flight_size());
585
1
  ASSERT_EQ("Sleep", dump_resp.inbound_connections(0).calls_in_flight(0).
586
1
                        header().remote_method().method_name());
587
1
  ASSERT_GT(dump_resp.inbound_connections(0).calls_in_flight(0).elapsed_millis(), 0);
588
1
  ASSERT_STR_CONTAINS(dump_resp.inbound_connections(0).calls_in_flight(0).trace_buffer(),
589
1
                      "Inserting onto call queue");
590
1
  // Do not leave the scope while the call still references `sleep` and `latch`.
  latch.Wait();
591
1
}
592
593
namespace {
594
// Empty ref-counted type; TestCallbackClearedAfterRunning only observes its reference count.
struct RefCountedTest : public RefCountedThreadSafe<RefCountedTest> {
595
};
596
597
// Test callback which takes a refcounted pointer.
598
// We don't use this parameter, but it's used to validate that the bound callback
599
// is cleared in TestCallbackClearedAfterRunning.
600
1
void MyTestCallback(CountDownLatch* latch, scoped_refptr<RefCountedTest> my_refptr) {
601
1
  latch->CountDown();
602
1
}
603
} // anonymous namespace
604
605
// Verify that, after a call has returned, no copy of the call's callback
606
// is held. This is important when the callback holds a refcounted ptr,
607
// since we expect to be able to release that pointer when the call is done.
608
1
TEST_F(RpcStubTest, TestCallbackClearedAfterRunning) {
609
1
  CalculatorServiceProxy p(proxy_cache_.get(), server_hostport_);
610
611
1
  CountDownLatch latch(1);
612
1
  scoped_refptr<RefCountedTest> my_refptr(new RefCountedTest);
613
1
  RpcController controller;
614
1
  AddRequestPB req;
615
1
  req.set_x(10);
616
1
  req.set_y(20);
617
1
  AddResponsePB resp;
618
1
  // std::bind stores a copy of my_refptr inside the callback, taking an extra reference.
  p.AddAsync(req, &resp, &controller, std::bind(MyTestCallback, &latch, my_refptr));
619
1
  latch.Wait();
620
621
  // The ref count should go back down to 1. However, we need to loop a little
622
  // bit, since the deref is happening on another thread. If the other thread gets
623
  // descheduled directly after calling our callback, we'd fail without these sleeps.
624
1
  for (int i = 0; i < 100 && !my_refptr->HasOneRef(); i++) {
625
0
    SleepFor(MonoDelta::FromMilliseconds(1));
626
0
  }
627
1
  ASSERT_TRUE(my_refptr->HasOneRef());
628
1
}
629
630
// State and timestamps of a single Ping call issued by PingTestHelper.
struct PingCall {
631
  PingResponsePB response;
632
  RpcController controller;
633
  // Taken from the `time` field of the response when the call completes.
  MonoTime handle_time;
634
  // Client time at which the completion callback ran.
  MonoTime reply_time;
635
  // Client time at which the call was launched.
  MonoTime start_time;
636
};
637
638
// Drives a fixed number of asynchronous Ping calls through a proxy and records per-call
// timestamps. Every completed call launches the next pending one, so the number of calls in
// flight stays equal to the number of initial LaunchNext() invocations until calls run out.
class PingTestHelper {
639
 public:
640
  // Prepares `calls_count` call slots, each with a timeout of 1 second. `proxy` is not owned.
  PingTestHelper(CalculatorServiceProxy* proxy, size_t calls_count)
641
1
      : proxy_(proxy), calls_(calls_count) {
642
50.0k
    for (auto& call : calls_) {
643
50.0k
      call.controller.set_timeout(MonoDelta::FromSeconds(1));
644
50.0k
    }
645
1
  }
646
647
49.9k
  // Starts the call stored in slot `id`; Done(id) is invoked on completion.
  void Launch(size_t id) {
648
49.9k
    PingRequestPB req;
649
49.9k
    auto& call = calls_[id];
650
49.9k
    call.start_time = MonoTime::Now();
651
49.9k
    req.set_id(id);
652
49.9k
    // Placeholders until Done() stores the real values.
    call.handle_time = MonoTime::Max();
653
49.9k
    call.reply_time = MonoTime::Max();
654
49.9k
    proxy_->PingAsync(req,
655
49.9k
                      &call.response,
656
49.9k
                      &call.controller,
657
49.9k
                      std::bind(&PingTestHelper::Done, this, id));
658
49.9k
  }
659
660
50.0k
  // Claims the next unused slot, if any is left, and launches its call.
  void LaunchNext() {
661
50.0k
    auto id = call_idx_++;
662
50.0k
    if (id < calls_.size()) {
663
50.0k
      Launch(id);
664
50.0k
    }
665
50.0k
  }
666
667
50.0k
  // Completion callback: records the timestamps of call `idx`, launches the next call and
  // wakes up Wait() once all calls have completed.
  void Done(size_t idx) {
668
50.0k
    auto& call = calls_[idx];
669
50.0k
    call.handle_time = MonoTime::FromUint64(call.response.time());
670
50.0k
    call.reply_time = MonoTime::Now();
671
50.0k
    call.controller.Reset();
672
50.0k
    LaunchNext();
673
50.0k
    auto calls_size = calls_.size();
674
50.0k
    if (++done_calls_ == calls_size) {
675
1
      LOG(INFO) << "Calls done";
676
1
      std::unique_lock<std::mutex> lock(mutex_);
677
1
      finished_ = true;
678
1
      cond_.notify_one();
679
1
    }
680
50.0k
  }
681
682
1
  // Blocks until every call has completed.
  void Wait() {
683
1
    std::unique_lock<std::mutex> lock(mutex_);
684
2
    cond_.wait(lock, [this] { return finished_; });
685
1
  }
686
687
1
  // Per-call records, indexed by call id.
  const std::vector<PingCall>& calls() const {
688
1
    return calls_;
689
1
  }
690
691
 private:
692
  CalculatorServiceProxy* proxy_;
693
  // Number of calls whose Done() has run.
  std::atomic<size_t> done_calls_ = {0};
694
  // Index of the next slot to launch; may grow past calls_.size().
  std::atomic<size_t> call_idx_ = {0};
695
  std::vector<PingCall> calls_;
696
  // mutex_ and cond_ guard finished_, which is set once the last call has completed.
  std::mutex mutex_;
697
  std::condition_variable cond_;
698
  bool finished_ = false;
699
};
700
701
// Knobs of TestRpcPerformance: number of calls kept in flight, and number of calls issued
// in addition to the warmup calls.
DEFINE_uint64(test_rpc_concurrency, 20, "Number of concurrent RPC requests");
702
DEFINE_int32(test_rpc_count, 50000, "Total number of RPC requests");
703
704
1
// Issues kWarmupCalls + FLAGS_test_rpc_count calls through PingTestHelper on a freshly created
// 4-reactor client messenger, then checks per-call timing figures against limits that are
// relaxed 5x in debug builds. Calls with index below kWarmupCalls are left out of the statistics.
TEST_F(RpcStubTest, TestRpcPerformance) {
  FLAGS_rpc_slow_query_threshold_ms = std::numeric_limits<int32_t>::max();

  // Replace the client messenger and proxy cache members with ones backed by 4 reactors.
  MessengerOptions client_options = kDefaultClientMessengerOptions;
  client_options.n_reactors = 4;
  proxy_cache_.reset();
  client_messenger_ = CreateAutoShutdownMessengerHolder("Client", client_options);
  proxy_cache_ = std::make_unique<ProxyCache>(client_messenger_.get());
  CalculatorServiceProxy calculator(proxy_cache_.get(), server_hostport_);

  const size_t kWarmupCalls = 50;
  const size_t initial_launches = FLAGS_test_rpc_concurrency;
  const size_t total_calls = kWarmupCalls + FLAGS_test_rpc_count;

  // The measured wall-clock interval covers helper construction, the initial launches and the
  // wait for the helper to report completion.
  auto start = MonoTime::Now();
  PingTestHelper helper(&calculator, total_calls);
  for (uint64_t launched = 0; launched != initial_launches; ++launched) {
    helper.LaunchNext();
  }
  LOG(INFO) << "Warmup done, Calls left: " << total_calls - kWarmupCalls;
  helper.Wait();
  auto finish = MonoTime::Now();

#ifndef NDEBUG
  const int kTimeMultiplier = 5;
#else
  const int kTimeMultiplier = 1;
#endif
  const auto kMaxLimit = MonoDelta::FromMilliseconds(50 * kTimeMultiplier);
  const auto kReplyAverageLimit = MonoDelta::FromMilliseconds(10 * kTimeMultiplier);
  const auto kHandleAverageLimit = MonoDelta::FromMilliseconds(5 * kTimeMultiplier);

  // Aggregate start->reply and start->handle deltas over every non-warmup call.
  MonoDelta min_processing = MonoDelta::kMax;
  MonoDelta max_processing = MonoDelta::kMin;
  MonoDelta reply_sum = MonoDelta::kZero;
  MonoDelta handle_sum = MonoDelta::kZero;
  size_t measured_calls = 0;
  size_t slow_calls = 0;
  auto& recorded = helper.calls();
  for (size_t idx = kWarmupCalls; idx != total_calls; ++idx, ++measured_calls) {
    const auto& ping = recorded[idx];
    auto reply_delta = ping.reply_time.GetDeltaSince(ping.start_time);
    min_processing = std::min(min_processing, reply_delta);
    max_processing = std::max(max_processing, reply_delta);
    // A call counts as slow when its reply delta exceeds the limit set for the average.
    slow_calls += (reply_delta > kReplyAverageLimit) ? 1 : 0;
    reply_sum += reply_delta;
    handle_sum += ping.handle_time.GetDeltaSince(ping.start_time);
  }

  ASSERT_NE(measured_calls, 0);
  auto reply_average = MonoDelta::FromNanoseconds(reply_sum.ToNanoseconds() / measured_calls);
  auto handle_average = MonoDelta::FromNanoseconds(handle_sum.ToNanoseconds() / measured_calls);
  auto passed_us = finish.GetDeltaSince(start).ToMicroseconds();
  auto us_per_call = passed_us * 1.0 / measured_calls;
  LOG(INFO) << "Min: " << min_processing.ToMicroseconds() << "us, "
            << "max: " << max_processing.ToMicroseconds() << "us, "
            << "reply avg: " << reply_average.ToMicroseconds() << "us, "
            << "handle avg: " << handle_average.ToMicroseconds() << "us";
  LOG(INFO) << "Total: " << passed_us << "us, "
            << "calls per second: " << measured_calls * 1000000 / passed_us
            << " (" << us_per_call << "us per call, NOT latency), "
            << " slow calls: " << slow_calls * 100.0 / measured_calls << "%";

  // slow_calls * 200 <= measured_calls means at most 0.5% of the measured calls may be slow.
  EXPECT_PERF_LE(slow_calls * 200, measured_calls);
  EXPECT_PERF_LE(max_processing, kMaxLimit);
  EXPECT_PERF_LE(reply_average, kReplyAverageLimit);
  EXPECT_PERF_LE(handle_average, kHandleAverageLimit);
}
776
777
1
// Starts a test server on the first local IPv6 address, issues a WhoAmI call against it and
// checks that both the bound server endpoint and the address returned by WhoAmI are IPv6.
TEST_F(RpcStubTest, IPv6) {
  google::FlagSaver saver;
  FLAGS_net_address_filter = "all";
  std::vector<IpAddress> addresses;
  ASSERT_OK(GetLocalAddresses(&addresses, AddressFilter::ANY));

  // Take the first IPv6 entry; server_address stays default-constructed when none is found,
  // which the assertion below rejects.
  IpAddress server_address;
  const auto first_v6 = std::find_if(
      addresses.begin(), addresses.end(),
      [](const auto& candidate) { return candidate.is_v6(); });
  if (first_v6 != addresses.end()) {
    LOG(INFO) << "Found IPv6 address: " << *first_v6;
    server_address = *first_v6;
  }
  ASSERT_FALSE(server_address.is_unspecified());

  // Port 0 is passed in; the endpoint actually bound is read back from the server afterwards.
  TestServerOptions options;
  options.endpoint = Endpoint(server_address, 0);
  auto server = StartTestServer(options, "Server");
  ASSERT_TRUE(server.bound_endpoint().address().is_v6());
  auto proxy_holder = CreateCalculatorProxyHolder(server.bound_endpoint());
  auto& proxy = *proxy_holder.proxy;

  WhoAmIRequestPB req;
  WhoAmIResponsePB resp;
  RpcController controller;
  ASSERT_OK(proxy.WhoAmI(req, &resp, &controller));
  ASSERT_OK(controller.status());
  LOG(INFO) << "I'm " << resp.address();

  auto parsed = ParseEndpoint(resp.address(), 0);
  ASSERT_TRUE(parsed.ok());
  ASSERT_TRUE(parsed->address().is_v6());
}
810
811
1
// Fires 10000 asynchronous Echo calls, each carrying a 100_KB payload and a 1ms timeout, and
// waits until every completion callback has run.
TEST_F(RpcStubTest, ExpireInQueue) {
  CalculatorServiceProxy proxy(proxy_cache_.get(), server_hostport_);

  // Per-call state; it lives in `pending` below, which outlives all callbacks because the test
  // blocks on the latch before returning.
  struct PendingEcho {
    EchoRequestPB req;
    boost::optional<EchoResponsePB> resp;
    RpcController controller;
  };

  std::vector<PendingEcho> pending(10000);

  CountDownLatch latch(pending.size());

  for (auto& call : pending) {
    call.req.set_data(std::string(100_KB, 'X'));
    call.resp.emplace();
    call.controller.set_timeout(1ms);
    proxy.EchoAsync(call.req, call.resp.get_ptr(), &call.controller, [&call, &latch] {
      // Destroy the response object and then overwrite the storage it occupied.
      // NOTE(review): presumably this is meant to expose any access to the response after the
      // callback has been invoked - confirm.
      auto* stale_resp = call.resp.get_ptr();
      call.resp.reset();
      memset(static_cast<void*>(stale_resp), 'X', sizeof(*stale_resp));
      latch.CountDown();
    });
  }

  latch.Wait();
}
839
840
1
// Sends a single Echo call carrying a kStringLen-byte payload and checks that the four traffic
// counters for CalculatorService.Echo (request/response bytes, on both the server and the
// client side) each fall within [kStringLen, kStringLen + 64).
TEST_F(RpcStubTest, TrafficMetrics) {
  constexpr size_t kStringLen = 1_KB;
  constexpr size_t kUpperBytesLimit = kStringLen + 64;

  CalculatorServiceProxy proxy(proxy_cache_.get(), server_hostport_);

  RpcController controller;
  rpc_test::EchoRequestPB req;
  req.set_data(RandomHumanReadableString(kStringLen));
  rpc_test::EchoResponsePB resp;
  ASSERT_OK(proxy.Echo(req, &resp, &controller));

  auto server_metrics = server_messenger()->metric_entity()->UnsafeMetricsMapForTests();

  auto* service_request_bytes = down_cast<Counter*>(FindOrDie(
      server_metrics, &METRIC_service_request_bytes_yb_rpc_test_CalculatorService_Echo).get());
  auto* service_response_bytes = down_cast<Counter*>(FindOrDie(
      server_metrics, &METRIC_service_response_bytes_yb_rpc_test_CalculatorService_Echo).get());

  auto client_metrics = client_messenger_->metric_entity()->UnsafeMetricsMapForTests();

  auto* proxy_request_bytes = down_cast<Counter*>(FindOrDie(
      client_metrics, &METRIC_proxy_request_bytes_yb_rpc_test_CalculatorService_Echo).get());
  auto* proxy_response_bytes = down_cast<Counter*>(FindOrDie(
      client_metrics, &METRIC_proxy_response_bytes_yb_rpc_test_CalculatorService_Echo).get());

  LOG(INFO) << "Inbound request bytes: " << service_request_bytes->value()
            << ", response bytes: " << service_response_bytes->value();
  LOG(INFO) << "Outbound request bytes: " << proxy_request_bytes->value()
            << ", response bytes: " << proxy_response_bytes->value();

  // We don't expect the sent and received byte counts on the client and the server to match,
  // because some auxiliary fields are not counted.
  // For instance, the request size is taken into account on the client, but not on the server.
  ASSERT_GE(service_request_bytes->value(), kStringLen);
  ASSERT_LT(service_request_bytes->value(), kUpperBytesLimit);
  ASSERT_GE(service_response_bytes->value(), kStringLen);
  ASSERT_LT(service_response_bytes->value(), kUpperBytesLimit);
  ASSERT_GE(proxy_request_bytes->value(), kStringLen);
  ASSERT_LT(proxy_request_bytes->value(), kUpperBytesLimit);
  // The client-side response counter gets the same bounds check as the other three; previously
  // the request counter was checked twice and this one was never verified.
  ASSERT_GE(proxy_response_bytes->value(), kStringLen);
  ASSERT_LT(proxy_response_bytes->value(), kUpperBytesLimit);
}
883
884
// Returns AsString() applied to a reversed copy of `t`. The argument is taken by value, so the
// caller's container is never modified.
template <class T>
std::string ReversedAsString(T t) {
  using std::begin;
  using std::end;
  std::reverse(begin(t), end(t));
  return AsString(t);
}
_ZN2yb3rpc16ReversedAsStringIN6google8protobuf16RepeatedPtrFieldINSt3__112basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEEEEEESB_T_
Line
Count
Source
885
3
std::string ReversedAsString(T t) {
886
3
  std::reverse(t.begin(), t.end());
887
3
  return AsString(t);
888
3
}
_ZN2yb3rpc16ReversedAsStringIN6google8protobuf13RepeatedFieldIiEEEENSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEET_
Line
Count
Source
885
1
std::string ReversedAsString(T t) {
886
1
  std::reverse(t.begin(), t.end());
887
1
  return AsString(t);
888
1
}
_ZN2yb3rpc16ReversedAsStringIN6google8protobuf16RepeatedPtrFieldINS_8rpc_test23LightweightSubMessagePBEEEEENSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEET_
Line
Count
Source
885
2
std::string ReversedAsString(T t) {
886
2
  std::reverse(t.begin(), t.end());
887
2
  return AsString(t);
888
2
}
_ZN2yb3rpc16ReversedAsStringIN6google8protobuf13RepeatedFieldIyEEEENSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEET_
Line
Count
Source
885
2
std::string ReversedAsString(T t) {
886
2
  std::reverse(t.begin(), t.end());
887
2
  return AsString(t);
888
2
}
_ZN2yb3rpc16ReversedAsStringIN6google8protobuf13RepeatedFieldIjEEEENSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEET_
Line
Count
Source
885
2
std::string ReversedAsString(T t) {
886
2
  std::reverse(t.begin(), t.end());
887
2
  return AsString(t);
888
2
}
889
890
14
// Fills `sub_message` with random content: 13 rsi32 values, sf32, a 32-character str and 11
// rbytes entries of 32 characters each. Depending on a random boolean, the nested `cycle`
// message is populated recursively in the same way.
void Generate(rpc_test::LightweightSubMessagePB* sub_message) {
  constexpr int kRsi32Count = 13;
  constexpr int kRbytesCount = 11;
  constexpr int kStringLen = 32;

  auto* rsi32 = sub_message->mutable_rsi32();
  for (int n = 0; n < kRsi32Count; ++n) {
    rsi32->Add(RandomUniformInt<int32_t>());
  }

  sub_message->set_sf32(RandomUniformInt<int32_t>());
  sub_message->set_str(RandomHumanReadableString(kStringLen));

  auto* rbytes = sub_message->mutable_rbytes();
  for (int n = 0; n < kRbytesCount; ++n) {
    rbytes->Add(RandomHumanReadableString(kStringLen));
  }

  // Recurse only on a random "true", so the nesting depth varies between invocations.
  if (!RandomUniformBool()) {
    return;
  }
  Generate(sub_message->mutable_cycle());
}
904
905
1
// Builds a LightweightRequestPB with random scalar, repeated, packed, nested, pair and map
// fields, sends it through CalculatorService.Lightweight and checks the response against the
// request: negated values, swapped field pairs, reversed repeated fields and verbatim copies,
// as spelled out by the assertions below. Finally checks that the request copied into an
// LWLightweightRequestPB prints the same text as the original's ShortDebugString().
TEST_F(RpcStubTest, Lightweight) {
  CalculatorServiceProxy proxy(proxy_cache_.get(), server_hostport_);

  RpcController controller;
  rpc_test::LightweightRequestPB req;

  // Scalars.
  req.set_i32(RandomUniformInt<int32_t>());
  req.set_i64(RandomUniformInt<int64_t>());
  req.set_f32(RandomUniformInt<uint32_t>());
  req.set_f64(RandomUniformInt<uint64_t>());
  req.set_u32(RandomUniformInt<uint32_t>());
  req.set_u64(RandomUniformInt<uint64_t>());
  req.set_r32(RandomUniformReal<float>());
  req.set_r64(RandomUniformReal<double>());

  req.set_str(RandomHumanReadableString(32));
  req.set_bytes(RandomHumanReadableString(32));
  req.set_en(rpc_test::LightweightEnum::TWO);

  req.set_sf32(RandomUniformInt<int32_t>());
  req.set_sf64(RandomUniformInt<int64_t>());
  req.set_si32(RandomUniformInt<int32_t>());
  req.set_si64(RandomUniformInt<int64_t>());

  // Local helpers that append `count` random uint32 / uint64 values to a repeated field.
  const auto add_random_u32 = [](auto* field, int count) {
    for (int n = 0; n < count; ++n) {
      field->Add(RandomUniformInt<uint32_t>());
    }
  };
  const auto add_random_u64 = [](auto* field, int count) {
    for (int n = 0; n < count; ++n) {
      field->Add(RandomUniformInt<uint64_t>());
    }
  };

  // Repeated scalars and strings.
  add_random_u32(req.mutable_ru32(), 10);
  add_random_u32(req.mutable_rf32(), 20);

  auto* rstr = req.mutable_rstr();
  for (int n = 0; n < 7; ++n) {
    rstr->Add(RandomHumanReadableString(32));
  }

  // Nested and repeated sub-messages.
  Generate(req.mutable_message());
  auto* repeated_messages = req.mutable_repeated_messages();
  for (int n = 0; n < 5; ++n) {
    Generate(repeated_messages->Add());
  }

  // Packed fields.
  add_random_u64(req.mutable_packed_u64(), 127);
  add_random_u32(req.mutable_packed_f32(), 37);

  // String pairs with different lengths for s1 and s2.
  auto* pairs = req.mutable_pairs();
  for (int n = 0; n < 13; ++n) {
    auto* pair = pairs->Add();
    pair->set_s1(RandomHumanReadableString(16));
    pair->set_s2(RandomHumanReadableString(48));
  }

  auto& request_map = *req.mutable_map();
  for (int n = 0; n < 11; ++n) {
    request_map[RandomHumanReadableString(8)] = RandomUniformInt<int64_t>();
  }

  Generate(req.mutable_ptr_message());

  rpc_test::LightweightResponsePB resp;
  ASSERT_OK(proxy.Lightweight(req, &resp, &controller));

  // Signed integers come back negated.
  ASSERT_EQ(resp.i32(), -req.i32());
  ASSERT_EQ(resp.i64(), -req.i64());

  // Fixed and unsigned fields come back swapped with each other.
  ASSERT_EQ(resp.f32(), req.u32());
  ASSERT_EQ(resp.u32(), req.f32());
  ASSERT_EQ(resp.f64(), req.u64());
  ASSERT_EQ(resp.u64(), req.f64());

  ASSERT_EQ(resp.r32(), -req.r32());
  ASSERT_EQ(resp.r64(), -req.r64());

  ASSERT_EQ(resp.bytes(), req.str());
  ASSERT_EQ(resp.str(), req.bytes());

  ASSERT_EQ(resp.en(), (req.en() + 1));

  ASSERT_EQ(resp.sf32(), req.si32());
  ASSERT_EQ(resp.si32(), req.sf32());
  ASSERT_EQ(resp.sf64(), req.si64());
  ASSERT_EQ(resp.si64(), req.sf64());

  ASSERT_EQ(AsString(resp.ru32()), AsString(req.rf32()));
  ASSERT_EQ(AsString(resp.rf32()), AsString(req.ru32()));
  ASSERT_EQ(AsString(resp.rstr()), ReversedAsString(req.rstr()));

  ASSERT_EQ(resp.message().sf32(), -req.message().sf32());
  ASSERT_EQ(AsString(resp.message().rsi32()), ReversedAsString(req.message().rsi32()));
  ASSERT_EQ(resp.message().str(), ">" + req.message().str() + "<");
  ASSERT_STR_EQ(AsString(resp.message().rbytes()), ReversedAsString(req.message().rbytes()));
  ASSERT_STR_EQ(AsString(resp.repeated_messages()), ReversedAsString(req.repeated_messages()));
  ASSERT_STR_EQ(AsString(resp.repeated_messages_copy()), AsString(req.repeated_messages()));

  ASSERT_STR_EQ(AsString(resp.packed_u64()), ReversedAsString(req.packed_u64()));
  ASSERT_STR_EQ(AsString(resp.packed_f32()), ReversedAsString(req.packed_f32()));

  // Each pair comes back with s1 and s2 exchanged.
  ASSERT_EQ(resp.pairs().size(), req.pairs().size());
  for (int i = 0; i < req.pairs().size(); ++i) {
    ASSERT_EQ(resp.pairs()[i].s1(), req.pairs()[i].s2());
    ASSERT_EQ(resp.pairs()[i].s2(), req.pairs()[i].s1());
  }

  ASSERT_STR_EQ(AsString(resp.ptr_message()), AsString(req.ptr_message()));

  // Every map entry present in the response must carry the value stored in the request.
  for (const auto& entry : resp.map()) {
    ASSERT_EQ(entry.second, req.map().at(entry.first));
  }

  // The map is cleared before the textual forms are compared.
  // NOTE(review): presumably because map entries have no fixed order in the text - confirm.
  req.mutable_map()->clear();
  std::string req_str = req.ShortDebugString();

  auto lw_req = CopySharedMessage<rpc_test::LWLightweightRequestPB>(req);
  // The copy is compared only after the source request has been cleared.
  req.Clear();
  ASSERT_STR_EQ(AsString(*lw_req), req_str);
  ASSERT_STR_EQ(AsString(resp.short_debug_string()), req_str);
}
1023
1024
1
// After a SendSimpleCall(), invokes Concat through rpc_test::AbacusServiceProxy and checks that
// "yuga" and "byte" are joined into "yugabyte".
TEST_F(RpcStubTest, CustomServiceName) {
  SendSimpleCall();

  rpc_test::AbacusServiceProxy proxy(proxy_cache_.get(), server_hostport_);

  RpcController controller;
  controller.set_timeout(30s);

  rpc_test::ConcatRequestPB req;
  rpc_test::ConcatResponsePB resp;
  req.set_lhs("yuga");
  req.set_rhs("byte");

  ASSERT_OK(proxy.Concat(req, &resp, &controller));
  ASSERT_EQ(resp.result(), "yugabyte");
}
1039
1040
1
// Calls Trivial twice on the same proxy and controller: with 42 the response must carry the same
// value back; with -1 the call itself still succeeds, but the response must carry an error
// whose code is kInvalidArgument.
TEST_F(RpcStubTest, Trivial) {
  CalculatorServiceProxy proxy(proxy_cache_.get(), server_hostport_);

  rpc_test::TrivialRequestPB req;
  rpc_test::TrivialResponsePB resp;
  RpcController controller;

  // First call: the value is returned unchanged.
  controller.set_timeout(30s);
  req.set_value(42);
  ASSERT_OK(proxy.Trivial(req, &resp, &controller));
  ASSERT_EQ(resp.value(), req.value());

  // Second call: the controller is reset and the timeout set again before it is reused.
  req.set_value(-1);
  controller.Reset();
  controller.set_timeout(30s);
  ASSERT_OK(proxy.Trivial(req, &resp, &controller));
  ASSERT_TRUE(resp.has_error());
  ASSERT_EQ(resp.error().code(), Status::Code::kInvalidArgument);
}
1059
1060
} // namespace rpc
1061
} // namespace yb