YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/rpc-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/rpc/rpc-test-base.h"
34
35
#include <condition_variable>
36
#include <functional>
37
#include <memory>
38
#include <string>
39
#include <thread>
40
#include <unordered_map>
41
42
#include <boost/ptr_container/ptr_vector.hpp>
43
44
#include <gtest/gtest.h>
45
46
#if defined(TCMALLOC_ENABLED)
47
#include <gperftools/heap-profiler.h>
48
#endif
49
50
51
#include "yb/gutil/map-util.h"
52
#include "yb/gutil/strings/human_readable.h"
53
54
#include "yb/rpc/compressed_stream.h"
55
#include "yb/rpc/proxy.h"
56
#include "yb/rpc/rpc_controller.h"
57
#include "yb/rpc/secure_stream.h"
58
#include "yb/rpc/serialization.h"
59
#include "yb/rpc/tcp_stream.h"
60
#include "yb/rpc/yb_rpc.h"
61
62
#include "yb/util/countdown_latch.h"
63
#include "yb/util/env.h"
64
#include "yb/util/format.h"
65
#include "yb/util/logging_test_util.h"
66
#include "yb/util/net/net_util.h"
67
#include "yb/util/result.h"
68
#include "yb/util/status_format.h"
69
#include "yb/util/status_log.h"
70
#include "yb/util/test_macros.h"
71
#include "yb/util/test_util.h"
72
#include "yb/util/tsan_util.h"
73
#include "yb/util/thread.h"
74
75
#include "yb/util/memory/memory_usage_test_util.h"
76
77
METRIC_DECLARE_histogram(handler_latency_yb_rpc_test_CalculatorService_Sleep);
78
METRIC_DECLARE_histogram(rpc_incoming_queue_time);
79
METRIC_DECLARE_counter(tcp_bytes_sent);
80
METRIC_DECLARE_counter(tcp_bytes_received);
81
METRIC_DECLARE_counter(rpcs_timed_out_early_in_queue);
82
83
DEFINE_int32(rpc_test_connection_keepalive_num_iterations, 1,
84
  "Number of iterations in TestRpc.TestConnectionKeepalive");
85
86
DECLARE_bool(TEST_pause_calculator_echo_request);
87
DECLARE_bool(binary_call_parser_reject_on_mem_tracker_hard_limit);
88
DECLARE_bool(enable_rpc_keepalive);
89
DECLARE_int32(num_connections_to_server);
90
DECLARE_int64(rpc_throttle_threshold_bytes);
91
DECLARE_int32(stream_compression_algo);
92
DECLARE_int64(memory_limit_hard_bytes);
93
DECLARE_string(vmodule);
94
DECLARE_uint64(rpc_connection_timeout_ms);
95
DECLARE_uint64(rpc_read_buffer_size);
96
97
using namespace std::chrono_literals;
98
using std::string;
99
using std::shared_ptr;
100
using std::unordered_map;
101
102
namespace yb {
103
104
using rpc_test::CalculatorServiceProxy;
105
106
namespace rpc {
107
108
namespace {
109
110
// Starts a test server and invokes `f` with a CalculatorServiceProxy pointing at it.
//
// `messenger_factory(name, options)` is called twice: with "Client" and
// kDefaultClientMessengerOptions for the client messenger, and with "TestServer" and
// options.messenger_options for the messenger passed to test->StartTestServerWithGeneratedCode().
// `f` is called exactly once with a pointer to a proxy that is local to this function, so the
// pointer must not be retained after `f` returns.
template <class MessengerFactory, class F>
111
void RunTest(RpcTestBase* test, const TestServerOptions& options,
112
35
             const MessengerFactory& messenger_factory, const F& f) {
113
35
  auto client_messenger = rpc::CreateAutoShutdownMessengerHolder(
114
35
      messenger_factory("Client", kDefaultClientMessengerOptions));
115
35
  auto proxy_cache = std::make_unique<ProxyCache>(client_messenger.get());
116
117
35
  // Filled in by StartTestServerWithGeneratedCode() through the out-pointer below.
  HostPort server_hostport;
118
35
  test->StartTestServerWithGeneratedCode(
119
35
      messenger_factory("TestServer", options.messenger_options), &server_hostport,
120
35
      options);
121
122
35
  CalculatorServiceProxy p(proxy_cache.get(), server_hostport, client_messenger->DefaultProtocol());
123
35
  f(&p);
124
35
}
rpc-test.cc:_ZN2yb3rpc12_GLOBAL__N_17RunTestIZNS0_7TestRpc12RunPlainTestIPFvPNS_8rpc_test22CalculatorServiceProxyEEEEvRKT_RKNS0_17TestServerOptionsEEUlRKNSt3__112basic_stringIcNSG_11char_traitsIcEENSG_9allocatorIcEEEERKNS0_16MessengerOptionsEE_S9_EEvPNS0_11RpcTestBaseESF_SC_RKT0_
Line
Count
Source
112
1
             const MessengerFactory& messenger_factory, const F& f) {
113
1
  auto client_messenger = rpc::CreateAutoShutdownMessengerHolder(
114
1
      messenger_factory("Client", kDefaultClientMessengerOptions));
115
1
  auto proxy_cache = std::make_unique<ProxyCache>(client_messenger.get());
116
117
1
  HostPort server_hostport;
118
1
  test->StartTestServerWithGeneratedCode(
119
1
      messenger_factory("TestServer", options.messenger_options), &server_hostport,
120
1
      options);
121
122
1
  CalculatorServiceProxy p(proxy_cache.get(), server_hostport, client_messenger->DefaultProtocol());
123
1
  f(&p);
124
1
}
rpc-test.cc:_ZN2yb3rpc12_GLOBAL__N_17RunTestIZNS0_13TestRpcSecure13RunSecureTestIPFvPNS_8rpc_test22CalculatorServiceProxyEEEEvRKT_RKNS0_17TestServerOptionsEEUlRKNSt3__112basic_stringIcNSG_11char_traitsIcEENSG_9allocatorIcEEEERKNS0_16MessengerOptionsEE_S9_EEvPNS0_11RpcTestBaseESF_SC_RKT0_
Line
Count
Source
112
6
             const MessengerFactory& messenger_factory, const F& f) {
113
6
  auto client_messenger = rpc::CreateAutoShutdownMessengerHolder(
114
6
      messenger_factory("Client", kDefaultClientMessengerOptions));
115
6
  auto proxy_cache = std::make_unique<ProxyCache>(client_messenger.get());
116
117
6
  HostPort server_hostport;
118
6
  test->StartTestServerWithGeneratedCode(
119
6
      messenger_factory("TestServer", options.messenger_options), &server_hostport,
120
6
      options);
121
122
6
  CalculatorServiceProxy p(proxy_cache.get(), server_hostport, client_messenger->DefaultProtocol());
123
6
  f(&p);
124
6
}
rpc-test.cc:_ZN2yb3rpc12_GLOBAL__N_17RunTestIZNS0_18TestRpcCompression18RunCompressionTestIPFvPNS_8rpc_test22CalculatorServiceProxyEEEEvRKT_RKNS0_17TestServerOptionsEEUlRKNSt3__112basic_stringIcNSG_11char_traitsIcEENSG_9allocatorIcEEEERKNS0_16MessengerOptionsEE_S9_EEvPNS0_11RpcTestBaseESF_SC_RKT0_
Line
Count
Source
112
18
             const MessengerFactory& messenger_factory, const F& f) {
113
18
  auto client_messenger = rpc::CreateAutoShutdownMessengerHolder(
114
18
      messenger_factory("Client", kDefaultClientMessengerOptions));
115
18
  auto proxy_cache = std::make_unique<ProxyCache>(client_messenger.get());
116
117
18
  HostPort server_hostport;
118
18
  test->StartTestServerWithGeneratedCode(
119
18
      messenger_factory("TestServer", options.messenger_options), &server_hostport,
120
18
      options);
121
122
18
  CalculatorServiceProxy p(proxy_cache.get(), server_hostport, client_messenger->DefaultProtocol());
123
18
  f(&p);
124
18
}
rpc-test.cc:_ZN2yb3rpc12_GLOBAL__N_17RunTestIZNS0_18TestRpcCompression18RunCompressionTestIZNS0_35TestRpcCompression_Compression_Test8TestBodyEvE3$_4EEvRKT_RKNS0_17TestServerOptionsEEUlRKNSt3__112basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEERKNS0_16MessengerOptionsEE_S6_EEvPNS0_11RpcTestBaseESC_S9_RKT0_
Line
Count
Source
112
3
             const MessengerFactory& messenger_factory, const F& f) {
113
3
  auto client_messenger = rpc::CreateAutoShutdownMessengerHolder(
114
3
      messenger_factory("Client", kDefaultClientMessengerOptions));
115
3
  auto proxy_cache = std::make_unique<ProxyCache>(client_messenger.get());
116
117
3
  HostPort server_hostport;
118
3
  test->StartTestServerWithGeneratedCode(
119
3
      messenger_factory("TestServer", options.messenger_options), &server_hostport,
120
3
      options);
121
122
3
  CalculatorServiceProxy p(proxy_cache.get(), server_hostport, client_messenger->DefaultProtocol());
123
3
  f(&p);
124
3
}
rpc-test.cc:_ZN2yb3rpc12_GLOBAL__N_17RunTestIZNS0_24TestRpcSecureCompression24RunSecureCompressionTestIPFvPNS_8rpc_test22CalculatorServiceProxyEEEEvRKT_RKNS0_17TestServerOptionsEEUlRKNSt3__112basic_stringIcNSG_11char_traitsIcEENSG_9allocatorIcEEEERKNS0_16MessengerOptionsEE_S9_EEvPNS0_11RpcTestBaseESF_SC_RKT0_
Line
Count
Source
112
6
             const MessengerFactory& messenger_factory, const F& f) {
113
6
  auto client_messenger = rpc::CreateAutoShutdownMessengerHolder(
114
6
      messenger_factory("Client", kDefaultClientMessengerOptions));
115
6
  auto proxy_cache = std::make_unique<ProxyCache>(client_messenger.get());
116
117
6
  HostPort server_hostport;
118
6
  test->StartTestServerWithGeneratedCode(
119
6
      messenger_factory("TestServer", options.messenger_options), &server_hostport,
120
6
      options);
121
122
6
  CalculatorServiceProxy p(proxy_cache.get(), server_hostport, client_messenger->DefaultProtocol());
123
6
  f(&p);
124
6
}
rpc-test.cc:_ZN2yb3rpc12_GLOBAL__N_17RunTestIZNS0_24TestRpcSecureCompression24RunSecureCompressionTestIZNS0_41TestRpcSecureCompression_Compression_Test8TestBodyEvE3$_5EEvRKT_RKNS0_17TestServerOptionsEEUlRKNSt3__112basic_stringIcNSD_11char_traitsIcEENSD_9allocatorIcEEEERKNS0_16MessengerOptionsEE_S6_EEvPNS0_11RpcTestBaseESC_S9_RKT0_
Line
Count
Source
112
1
             const MessengerFactory& messenger_factory, const F& f) {
113
1
  auto client_messenger = rpc::CreateAutoShutdownMessengerHolder(
114
1
      messenger_factory("Client", kDefaultClientMessengerOptions));
115
1
  auto proxy_cache = std::make_unique<ProxyCache>(client_messenger.get());
116
117
1
  HostPort server_hostport;
118
1
  test->StartTestServerWithGeneratedCode(
119
1
      messenger_factory("TestServer", options.messenger_options), &server_hostport,
120
1
      options);
121
122
1
  CalculatorServiceProxy p(proxy_cache.get(), server_hostport, client_messenger->DefaultProtocol());
123
1
  f(&p);
124
1
}
125
126
} // namespace
127
128
// Test fixture for the RPC tests in this file. Adds helpers on top of RpcTestBase for checking
// reactor connection counts and for running a callback against a plain (CreateMessenger-based)
// client/server pair.
class TestRpc : public RpcTestBase {
129
 public:
130
4
  // Asserts that reactor 0 of the server messenger reports exactly `num_connections` server
  // connections and zero client connections.
  void CheckServerMessengerConnections(size_t num_connections) {
131
4
    ReactorMetrics metrics;
132
4
    ASSERT_OK(server_messenger()->TEST_GetReactorMetrics(0, &metrics));
133
8
    ASSERT_EQ(metrics.num_server_connections, num_connections)
134
8
        << "Server should have " << num_connections << " server connection(s)";
135
8
    ASSERT_EQ(metrics.num_client_connections, 0) << "Server should have 0 client connections";
136
4
  }
137
138
4
  // Asserts that reactor 0 of `messenger` reports exactly `num_connections` client connections
  // and zero server connections.
  void CheckClientMessengerConnections(Messenger* messenger, size_t num_connections) {
139
4
    ReactorMetrics metrics;
140
4
    ASSERT_OK(messenger->TEST_GetReactorMetrics(0, &metrics));
141
8
    ASSERT_EQ(metrics.num_server_connections, 0) << "Client should have 0 server connections";
142
8
    ASSERT_EQ(metrics.num_client_connections, num_connections)
143
8
        << "Client should have " << num_connections << " client connection(s)";
144
4
  }
145
146
  // Runs `f` through RunTest(), building both messengers with this fixture's CreateMessenger().
  // `server_options` defaults to a default-constructed TestServerOptions.
  template <class F>
147
1
  void RunPlainTest(const F& f, const TestServerOptions& server_options = TestServerOptions()) {
148
2
    RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) {
149
2
      return CreateMessenger(name, options);
150
2
    }, f);
151
1
  }
152
};
153
154
namespace {
155
156
// Used only to test parsing: the default port handed to ParseEndpoint() by the checks in this
// file.
157
const uint16_t kDefaultPort = 80;
158
159
8
// Parses `input` with ParseEndpoint(input, kDefaultPort) and asserts that parsing succeeds and
// that AsString() of the result equals `expected`. When `expected` is empty (the default), the
// parsed endpoint is expected to print back exactly as `input`.
void CheckParseEndpoint(const std::string& input, std::string expected = std::string()) {
160
8
  if (expected.empty()) {
161
2
    expected = input;
162
2
  }
163
8
  auto endpoint = ParseEndpoint(input, kDefaultPort);
164
16
  ASSERT_TRUE(endpoint.ok()) << "input: " << input << ", status: " << endpoint.status().ToString();
165
8
  ASSERT_EQ(expected, AsString(*endpoint));
166
8
}
167
168
} // namespace
169
170
1
// Checks Endpoint ordering, port accessors, copy construction and AsString() formatting, then
// checks ParseEndpoint() on IPv4 and IPv6 inputs with and without brackets or an explicit port.
TEST_F(TestRpc, Endpoint) {
171
1
  // Default-constructed endpoints print as 0.0.0.0:<port> (asserted below).
  Endpoint addr1, addr2;
172
1
  addr1.port(1000);
173
1
  addr2.port(2000);
174
1
  ASSERT_TRUE(addr1 < addr2);
175
1
  ASSERT_FALSE(addr2 < addr1);
176
1
  ASSERT_EQ(1000, addr1.port());
177
1
  ASSERT_EQ(2000, addr2.port());
178
1
  ASSERT_EQ(string("0.0.0.0:1000"), AsString(addr1));
179
1
  ASSERT_EQ(string("0.0.0.0:2000"), AsString(addr2));
180
1
  Endpoint addr3(addr1);
181
1
  ASSERT_EQ(string("0.0.0.0:1000"), AsString(addr3));
182
183
1
  // IPv4: a missing port falls back to kDefaultPort (80); brackets are accepted and dropped.
  CheckParseEndpoint("127.0.0.1", "127.0.0.1:80");
184
1
  CheckParseEndpoint("192.168.0.1:123");
185
1
  CheckParseEndpoint("[10.8.0.137]", "10.8.0.137:80");
186
1
  CheckParseEndpoint("[10.8.0.137]:123", "10.8.0.137:123");
187
188
1
  // IPv6: output is always bracketed. An unbracketed trailing ":123" is expected to be treated as
  // part of the address, not as a port.
  CheckParseEndpoint("fe80::1", "[fe80::1]:80");
189
1
  CheckParseEndpoint("[fe80::1]", "[fe80::1]:80");
190
1
  CheckParseEndpoint("fe80::1:123", "[fe80::1:123]:80");
191
1
  CheckParseEndpoint("[fe80::1]:123");
192
193
1
  // Inputs that must be rejected: empty port after ':', unterminated bracket, and an unbracketed
  // IPv6-like string ending in ":12345".
  ASSERT_NOK(ParseEndpoint("[127.0.0.1]:", kDefaultPort));
194
1
  ASSERT_NOK(ParseEndpoint("[127.0.0.1:123", kDefaultPort));
195
1
  ASSERT_NOK(ParseEndpoint("fe80::1:12345", kDefaultPort));
196
1
}
197
198
1
// Smoke test: creates a messenger, logs its name and shuts it down. There are no assertions; the
// test passes if this sequence completes.
TEST_F(TestRpc, TestMessengerCreateDestroy) {
199
1
  std::unique_ptr<Messenger> messenger = CreateMessenger("TestCreateDestroy");
200
1
  LOG(INFO) << "started messenger " << messenger->name();
201
1
  messenger->Shutdown();
202
1
}
203
204
// Test starting and stopping a messenger. This is a regression
205
// test for a segfault seen in early versions of the RPC code,
206
// in which shutting down the acceptor would trigger an assert,
207
// making our tests flaky.
208
1
// Repeatedly creates a messenger, starts listening and accepting, then shuts it down.
TEST_F(TestRpc, TestAcceptorPoolStartStop) {
209
1
  // 100 iterations when slow tests are allowed, 5 otherwise.
  int n_iters = AllowSlowTests() ? 100 : 5;
210
6
  for (int i = 0; i < n_iters; i++) {
211
5
    std::unique_ptr<Messenger> messenger = CreateMessenger("TestAcceptorPoolStartStop");
212
5
    Endpoint bound_endpoint;
213
5
    // Listen on a default-constructed Endpoint; the address actually bound is written to
    // bound_endpoint and must have a non-zero port (checked below).
    ASSERT_OK(messenger->ListenAddress(
214
5
        CreateConnectionContextFactory<YBInboundConnectionContext>(),
215
5
        Endpoint(), &bound_endpoint));
216
5
    ASSERT_OK(messenger->StartAcceptor());
217
5
    ASSERT_NE(0, bound_endpoint.port());
218
5
    messenger->Shutdown();
219
5
  }
220
1
}
221
222
// Test making successful RPC calls.
223
1
// Starts a test server with default options and requires 10 consecutive synchronous Add calls
// through a single Proxy to succeed.
TEST_F(TestRpc, TestCall) {
224
  // Set up server.
225
1
  HostPort server_addr;
226
1
  StartTestServer(&server_addr);
227
228
  // Set up client.
229
1
  LOG(INFO) << "Connecting to " << server_addr;
230
1
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");
231
1
  Proxy p(client_messenger.get(), server_addr);
232
233
11
  for (int i = 0; i < 10; i++) {
234
10
    ASSERT_OK(DoTestSyncCall(&p, CalculatorServiceMethods::AddMethod()));
235
10
  }
236
1
}
237
238
1
// Same call sequence as TestCall, but with the server's keep_alive_timeout set to 60s. After the
// calls it also asserts that the "Call" child of the root MemTracker reports zero consumption.
TEST_F(TestRpc, BigTimeout) {
239
  // Set up server.
240
1
  TestServerOptions options;
241
1
  options.messenger_options.keep_alive_timeout = 60s;
242
1
  HostPort server_addr;
243
1
  StartTestServer(&server_addr, options);
244
245
  // Set up client.
246
1
  LOG(INFO) << "Connecting to " << server_addr;
247
1
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");
248
1
  Proxy p(client_messenger.get(), server_addr);
249
250
11
  for (int i = 0; i < 10; i++) {
251
10
    ASSERT_OK(DoTestSyncCall(&p, CalculatorServiceMethods::AddMethod()));
252
10
  }
253
254
1
  LOG(INFO) << "Calls OK";
255
256
1
  // NOTE(review): the result of FindChild("Call") is dereferenced without a null check; this
  // assumes the "Call" tracker exists once calls have been made -- confirm.
  auto call_consumption = MemTracker::GetRootTracker()->FindChild("Call")->consumption();
257
1
  ASSERT_EQ(call_consumption, 0);
258
1
}
259
260
// Test that calling an invalid server address properly fails with an error status.
261
1
// Points a Proxy at a default-constructed HostPort (no server is started) and requires each of
// five synchronous Add calls to return a status for which IsRemoteError() is true.
TEST_F(TestRpc, TestCallToBadServer) {
262
1
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");
263
1
  HostPort addr;
264
1
  Proxy p(client_messenger.get(), addr);
265
266
  // Loop a few calls to make sure that we properly set up and tear down
267
  // the connections.
268
6
  for (int i = 0; i < 5; i++) {
269
5
    Status s = DoTestSyncCall(&p, CalculatorServiceMethods::AddMethod());
270
5
    LOG(INFO) << "Status: " << s.ToString();
271
10
    ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString();
272
5
  }
273
1
}
274
275
// Test that RPC calls can be failed with an error status on the server.
276
1
// Calls a non-existent method on the real calculator service name and requires a remote error
// whose text contains "invalid method name".
TEST_F(TestRpc, TestInvalidMethodCall) {
277
  // Set up server.
278
1
  HostPort server_addr;
279
1
  StartTestServer(&server_addr);
280
281
  // Set up client.
282
1
  LOG(INFO) << "Connecting to " << server_addr;
283
1
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");
284
1
  Proxy p(client_messenger.get(), server_addr);
285
286
  // Call the method which fails.
287
1
  // The service name is valid; only the method name is bogus.
  static RemoteMethod method(
288
1
      rpc_test::CalculatorServiceIf::static_service_name(), "ThisMethodDoesNotExist");
289
1
  Status s = DoTestSyncCall(&p, &method);
290
2
  ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString();
291
1
  ASSERT_STR_CONTAINS(s.ToString(), "invalid method name");
292
1
}
293
294
// Test that the error message returned when connecting to the wrong service
295
// is reasonable.
296
1
// Calls a method on a service name that the test server does not register and checks the shape
// of the resulting error message.
TEST_F(TestRpc, TestWrongService) {
297
  // Set up server.
298
1
  HostPort server_addr;
299
1
  StartTestServer(&server_addr);
300
301
  // Set up client with the wrong service name.
302
1
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");
303
1
  Proxy p(client_messenger.get(), server_addr);
304
305
  // Call the method which fails.
306
1
  static RemoteMethod method("WrongServiceName", "ThisMethodDoesNotExist");
307
1
  Status s = DoTestSyncCall(&p, &method);
308
1
  auto message = s.ToString();
309
2
  ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << message;
310
  // Remote errors always contain file name and line number.
  // The three fragments below therefore match only the text around those "(...)" locations.
311
1
  ASSERT_STR_CONTAINS(message, "Remote error (");
312
1
  ASSERT_STR_CONTAINS(message, "): Service unavailable (");
313
1
  ASSERT_STR_CONTAINS(message, "): Service WrongServiceName not registered on TestServer");
314
1
}
315
316
namespace {
317
318
1
// Returns the current (rlim_cur) RLIMIT_NOFILE limit of this process as reported by getrlimit().
// The PCHECK requires the getrlimit() call to succeed.
uint64_t GetOpenFileLimit() {
319
1
  struct rlimit limit;
320
1
  PCHECK(getrlimit(RLIMIT_NOFILE, &limit) == 0);
321
1
  return limit.rlim_cur;
322
1
}
323
324
} // anonymous namespace
325
326
// Test that we can still make RPC connections even if many fds are in use.
327
// This is a regression test for KUDU-650.
328
1
// Opens 3500 handles to /dev/zero and then requires a synchronous Add call against a fresh test
// server to succeed. Returns early, after logging, when the open-file limit is below 3600.
TEST_F(TestRpc, TestHighFDs) {
329
  // This test can only run if ulimit is set high.
330
1
  const uint64_t kNumFakeFiles = 3500;
331
1
  // Headroom of 100 descriptors on top of the fake files.
  const uint64_t kMinUlimit = kNumFakeFiles + 100;
332
1
  if (GetOpenFileLimit() < kMinUlimit) {
333
1
    LOG(INFO) << "Test skipped: must increase ulimit -n to at least " << kMinUlimit;
334
1
    return;
335
1
  }
336
337
  // Open a bunch of fds just to increase our fd count.
338
0
  // The vector keeps the files alive until the end of the test.
  std::vector<std::unique_ptr<RandomAccessFile>> fake_files;
339
0
  for (uint64_t i = 0; i < kNumFakeFiles; i++) {
340
0
    std::unique_ptr<RandomAccessFile> f;
341
0
    CHECK_OK(Env::Default()->NewRandomAccessFile("/dev/zero", &f));
342
0
    fake_files.emplace_back(f.release());
343
0
  }
344
345
  // Set up server and client, and verify we can make a successful call.
346
0
  HostPort server_addr;
347
0
  StartTestServer(&server_addr);
348
0
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");
349
0
  Proxy p(client_messenger.get(), server_addr);
350
0
  ASSERT_OK(DoTestSyncCall(&p, CalculatorServiceMethods::AddMethod()));
351
0
}
352
353
// Test that connections are kept alive by ScanIdleConnections between calls.
354
1
// With a 300ms keep-alive timeout on both messengers, checks that a connection is still present
// on both sides 150ms after a call, and is gone on both sides after a further 600ms of idleness.
// The body runs FLAGS_rpc_test_connection_keepalive_num_iterations times against one server.
TEST_F(TestRpc, TestConnectionKeepalive) {
355
1
  // Presumably restores the FLAGS_* values modified below when it goes out of scope (gflags
  // FlagSaver) -- confirm.
  google::FlagSaver saver;
356
357
  // Only run one reactor per messenger, so we can grab the metrics from that
358
  // one without having to check all.
359
1
  const auto kGcTimeout = 300ms;
360
1
  MessengerOptions messenger_options = { 1, kGcTimeout };
361
1
  TestServerOptions options;
362
1
  options.messenger_options = messenger_options;
363
  // RPC heartbeats shouldn't prevent idle connections from being GCed. To test that we set
364
  // rpc_connection_timeout less than kGcTimeout.
365
1
  FLAGS_rpc_connection_timeout_ms = MonoDelta(kGcTimeout).ToMilliseconds() / 2;
366
1
  FLAGS_enable_rpc_keepalive = true;
367
1
  // Append "yb_rpc=5" to any existing vmodule setting instead of overwriting it.
  if (!FLAGS_vmodule.empty()) {
368
0
    FLAGS_vmodule = FLAGS_vmodule + ",yb_rpc=5";
369
1
  } else {
370
1
    FLAGS_vmodule = "yb_rpc=5";
371
1
  }
372
  // Set up server.
373
1
  HostPort server_addr;
374
1
  StartTestServer(&server_addr, options);
375
2
  for (int i = 0; i < FLAGS_rpc_test_connection_keepalive_num_iterations; ++i) {
376
    // Set up client.
377
1
    LOG(INFO) << "Connecting to " << server_addr;
378
1
    auto client_messenger = CreateAutoShutdownMessengerHolder("Client", messenger_options);
379
1
    Proxy p(client_messenger.get(), server_addr);
380
381
1
    ASSERT_OK(DoTestSyncCall(&p, CalculatorServiceMethods::AddMethod()));
382
1
    ASSERT_NO_FATALS(CheckServerMessengerConnections(1));
383
1
    ASSERT_NO_FATALS(CheckClientMessengerConnections(client_messenger.get(), 1));
384
1
    LOG(INFO) << "Connections are up";
385
386
1
    // Half the keep-alive timeout: the connection must still be present on both sides.
    SleepFor(kGcTimeout / 2);
387
388
1
    LOG(INFO) << "Checking connections";
389
1
    ASSERT_NO_FATALS(CheckServerMessengerConnections(1));
390
1
    ASSERT_NO_FATALS(CheckClientMessengerConnections(client_messenger.get(), 1));
391
392
1
    // Twice the keep-alive timeout with no calls in between.
    SleepFor(kGcTimeout * 2);
393
394
    // After sleeping, the keepalive timer should have closed both sides of the connection.
395
1
    ASSERT_NO_FATALS(CheckServerMessengerConnections(0));
396
1
    ASSERT_NO_FATALS(CheckClientMessengerConnections(client_messenger.get(), 0));
397
1
  }
398
1
}
399
400
// Test that a call which takes longer than the keepalive time
401
// succeeds -- i.e that we don't consider a connection to be "idle" on the
402
// server if there is a call outstanding on it.
403
1
// Sets a 100ms keep_alive_timeout on both server and client, then requires a deferred Sleep call
// of 200ms (200 * 1000 microseconds) to complete successfully.
TEST_F(TestRpc, TestCallLongerThanKeepalive) {
404
1
  TestServerOptions options;
405
  // set very short keepalive
406
1
  options.messenger_options.keep_alive_timeout = 100ms;
407
408
  // Set up server.
409
1
  HostPort server_addr;
410
1
  StartTestServer(&server_addr, options);
411
412
  // Set up client.
413
1
  // Start from the default client options and shorten only the keep-alive timeout.
  auto client_options = kDefaultClientMessengerOptions;
414
1
  client_options.keep_alive_timeout = 100ms;
415
1
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client", client_options);
416
1
  Proxy p(client_messenger.get(), server_addr);
417
418
  // Make a call which sleeps longer than the keepalive.
419
1
  RpcController controller;
420
1
  rpc_test::SleepRequestPB req;
421
1
  req.set_sleep_micros(200 * 1000);
422
1
  req.set_deferred(true);
423
1
  rpc_test::SleepResponsePB resp;
424
1
  ASSERT_OK(p.SyncRequest(
425
1
      CalculatorServiceMethods::SleepMethod(), nullptr, req, &resp, &controller));
426
1
}
427
428
// Test that connections are kept alive by heartbeats between calls.
429
1
// With FLAGS_rpc_connection_timeout_ms set to 300ms and the messenger keep-alive set to 100x
// that, makes one call, stays idle for 900ms and requires exactly one connection to remain on
// both the server and the client. The body runs
// FLAGS_rpc_test_connection_keepalive_num_iterations times against one server.
TEST_F(TestRpc, TestConnectionHeartbeating) {
430
1
  // Presumably restores the FLAGS_* values modified below when it goes out of scope (gflags
  // FlagSaver) -- confirm.
  google::FlagSaver saver;
431
432
1
  const auto kTestTimeout = 300ms;
433
434
  // Only run one reactor per messenger, so we can grab the metrics from that
435
  // one without having to check all. Set ScanIdleConnections keep alive to huge value in order
436
  // to not affect heartbeats testing.
437
1
  MessengerOptions messenger_options = { 1, kTestTimeout * 100 };
438
1
  TestServerOptions options;
439
1
  options.messenger_options = messenger_options;
440
1
  FLAGS_num_connections_to_server = 1;
441
1
  FLAGS_rpc_connection_timeout_ms = MonoDelta(kTestTimeout).ToMilliseconds();
442
443
  // Set up server.
444
1
  HostPort server_addr;
445
1
  StartTestServer(&server_addr, options);
446
447
2
  for (int i = 0; i < FLAGS_rpc_test_connection_keepalive_num_iterations; ++i) {
448
    // Set up client.
449
1
    LOG(INFO) << "Connecting to " << server_addr;
450
1
    auto client_messenger = CreateAutoShutdownMessengerHolder("Client", messenger_options);
451
1
    Proxy p(client_messenger.get(), server_addr);
452
453
1
    ASSERT_OK(DoTestSyncCall(&p, CalculatorServiceMethods::AddMethod()));
454
455
1
    // Idle for three times the RPC connection timeout configured above.
    SleepFor(kTestTimeout * 3);
456
    // Both client and server connections should survive when there is no application traffic.
457
1
    ASSERT_NO_FATALS(CheckServerMessengerConnections(1));
458
1
    ASSERT_NO_FATALS(CheckClientMessengerConnections(client_messenger.get(), 1));
459
1
  }
460
1
}
461
462
// Test that the RpcSidecar transfers the expected messages.
463
1
// Runs DoTestSidecar() against a default test server with three size lists: two small sidecars,
// three multi-megabyte sidecars, and twenty sidecars of size 123.
TEST_F(TestRpc, TestRpcSidecar) {
464
  // Set up server.
465
1
  HostPort server_addr;
466
1
  StartTestServer(&server_addr);
467
468
  // Set up client.
469
1
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");
470
1
  Proxy p(client_messenger.get(), server_addr);
471
472
  // Test some small sidecars
473
1
  DoTestSidecar(&p, {123, 456});
474
475
  // Test some larger sidecars to verify that we properly handle the case where
476
  // we can't write the whole response to the socket in a single call.
477
1
  DoTestSidecar(&p, {3_MB, 2_MB, 240_MB});
478
479
1
  // Many sidecars in one call: 20 entries, each of size 123.
  std::vector<size_t> sizes(20);
480
1
  std::fill(sizes.begin(), sizes.end(), 123);
481
1
  DoTestSidecar(&p, sizes);
482
1
}
483
484
// Test that timeouts are properly handled.
485
1
// Calls DoTestExpectTimeout() with timeouts that double from 1ns while they stay below
// 100,000,000ns (100ms), i.e. 27 timeouts from 1ns up to 2^26 ns.
TEST_F(TestRpc, TestCallTimeout) {
486
1
  HostPort server_addr;
487
1
  StartTestServer(&server_addr);
488
1
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");
489
1
  Proxy p(client_messenger.get(), server_addr);
490
491
1
  uint64_t delay_ns = 1;
492
493
  // Test a very short timeout - we expect this will time out while the
494
  // call is still trying to connect, or in the send queue. This was triggering ASAN failures
495
  // before.
496
497
28
  while (delay_ns < 100ul * 1000 * 1000) {
498
27
    ASSERT_NO_FATALS(DoTestExpectTimeout(&p, MonoDelta::FromNanoseconds(delay_ns)));
499
27
    delay_ns *= 2;
500
27
  }
501
1
}
502
503
1
// Accepts a single connection on `listen_sock` and then keeps reading from it, discarding the
// data, until BlockingRecv() returns a non-OK status. Despite the name, every read uses the same
// deadline fixed at 10 seconds after the accept.
static void AcceptAndReadForever(Socket* listen_sock) {
504
  // Accept the TCP connection.
505
1
  Socket server_sock;
506
1
  Endpoint remote;
507
1
  CHECK_OK(listen_sock->Accept(&server_sock, &remote, 0));
508
509
1
  MonoTime deadline = MonoTime::Now();
510
1
  deadline.AddDelta(MonoDelta::FromSeconds(10));
511
512
1
  uint8_t buf[1024];
513
1
  // Empty body on purpose: received bytes are ignored.
  while (server_sock.BlockingRecv(buf, sizeof(buf), deadline).ok()) {
514
0
  }
515
1
}
516
517
// Starts a fake listening socket which never actually negotiates.
518
// Ensures that the client gets a reasonable status code in this case.
519
1
// Connects a Proxy to a fake server whose acceptor thread only accepts and reads, and requires a
// call with a 100ms timeout to time out (via DoTestExpectTimeout).
TEST_F(TestRpc, TestNegotiationTimeout) {
520
  // Set up a simple socket server which accepts a connection.
521
1
  HostPort server_addr;
522
1
  Socket listen_sock;
523
1
  ASSERT_OK(StartFakeServer(&listen_sock, &server_addr));
524
525
  // Create another thread to accept the connection on the fake server.
526
1
  scoped_refptr<Thread> acceptor_thread;
527
1
  ASSERT_OK(Thread::Create("test", "acceptor",
528
1
                                  AcceptAndReadForever, &listen_sock,
529
1
                                  &acceptor_thread));
530
531
  // Set up client.
532
1
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");
533
1
  Proxy p(client_messenger.get(), server_addr);
534
535
1
  ASSERT_NO_FATALS(DoTestExpectTimeout(&p, MonoDelta::FromMilliseconds(100)));
536
537
1
  // Wait for AcceptAndReadForever() to return before listen_sock goes out of scope.
  acceptor_thread->Join();
538
1
}
539
540
// Test that client calls get failed properly when the server they're connected to
541
// shuts down.
542
1
// Sends five asynchronous Add calls to a fake server, accepts the connection, closes both the
// listening and the accepted socket, and then requires every call to finish with a network error
// whose errno is one of the values listed at the end of the test.
TEST_F(TestRpc, TestServerShutsDown) {
543
  // Set up a simple socket server which accepts a connection.
544
1
  HostPort server_addr;
545
1
  Socket listen_sock;
546
1
  ASSERT_OK(StartFakeServer(&listen_sock, &server_addr));
547
548
  // Set up client.
549
1
  LOG(INFO) << "Connecting to " << server_addr;
550
1
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");
551
1
  Proxy p(client_messenger.get(), server_addr);
552
553
  // Send a call.
554
1
  rpc_test::AddRequestPB req;
555
1
  unsigned int seed = SeedRandom();
556
1
  req.set_x(rand_r(&seed));
557
1
  req.set_y(rand_r(&seed));
558
1
  // Note: this single response object is shared by all five calls below.
  rpc_test::AddResponsePB resp;
559
560
1
  // Owns the heap-allocated controllers pushed into it in the loop below (Boost ptr_vector).
  boost::ptr_vector<RpcController> controllers;
561
562
  // We'll send several calls async, and ensure that they all
563
  // get the error status when the connection drops.
564
1
  int n_calls = 5;
565
566
1
  // Counted down once per completed call through latch.CountDownCallback().
  CountDownLatch latch(n_calls);
567
6
  for (int i = 0; i < n_calls; i++) {
568
5
    auto controller = new RpcController();
569
5
    controllers.push_back(controller);
570
5
    p.AsyncRequest(
571
5
        CalculatorServiceMethods::AddMethod(), /* method_metrics= */ nullptr, req, &resp,
572
5
        controller, latch.CountDownCallback());
573
5
  }
574
575
  // Accept the TCP connection.
576
1
  Socket server_sock;
577
1
  Endpoint remote;
578
1
  ASSERT_OK(listen_sock.Accept(&server_sock, &remote, 0));
579
580
  // The call is still in progress at this point.
  // NOTE(review): in the coverage run recorded in this listing, every statement after this loop
  // has a hit count of 0 while the loop header and the assertion ran once, which suggests the
  // ASSERT_FALSE returned early. The "still in progress" assumption may be racy -- confirm.
581
1
  for (const RpcController &controller : controllers) {
582
1
    ASSERT_FALSE(controller.finished());
583
1
  }
584
585
  // Shut down the socket.
586
0
  ASSERT_OK(listen_sock.Close());
587
0
  ASSERT_OK(server_sock.Close());
588
589
  // Wait for the call to be marked finished.
590
0
  latch.Wait();
591
592
  // Should get the appropriate error on the client for all calls;
593
0
  for (const RpcController &controller : controllers) {
594
0
    ASSERT_TRUE(controller.finished());
595
0
    Status s = controller.status();
596
0
    ASSERT_TRUE(s.IsNetworkError()) <<
597
0
      "Unexpected status: " << s.ToString();
598
599
    // Any of these errors could happen, depending on whether we were
600
    // in the middle of sending a call while the connection died, or
601
    // if we were already waiting for responses.
602
    //
603
    // ECONNREFUSED is possible because the sending of the calls is async.
604
    // For example, the following interleaving:
605
    // - Enqueue 3 calls
606
    // - Reactor wakes up, creates connection, starts writing calls
607
    // - Enqueue 2 more calls
608
    // - Shut down socket
609
    // - Reactor wakes up, tries to write more of the first 3 calls, gets error
610
    // - Reactor shuts down connection
611
    // - Reactor sees the 2 remaining calls, makes a new connection
612
    // - Because the socket is shut down, gets ECONNREFUSED.
613
    //
614
    // EINVAL is possible if the controller socket had already disconnected by
615
    // the time it tries to set the SO_SNDTIMEO socket option as part of the
616
    // normal blocking SASL handshake.
617
    //
618
    // EPROTOTYPE sometimes happens on Mac OS X.
619
    // TODO: figure out why.
620
0
    Errno err(s);
621
0
    ASSERT_TRUE(err == EPIPE ||
622
0
                err == ECONNRESET ||
623
0
                err == ESHUTDOWN ||
624
0
                err == ECONNREFUSED ||
625
0
                err == EINVAL
626
0
#if defined(__APPLE__)
627
0
                || err == EPROTOTYPE
628
0
#endif
629
0
               )
630
0
      << "Unexpected status: " << s.ToString();
631
0
  }
632
0
}
633
634
// Looks up the metric registered for `prototype` in `metric_entity`'s metrics map (keyed by the
// prototype's address). Returns the metric, or a NotFound status naming the prototype when the
// map has no entry for it.
Result<MetricPtr> GetMetric(
635
75
    const MetricEntityPtr& metric_entity, const MetricPrototype& prototype) {
636
75
  const auto& metric_map = metric_entity->UnsafeMetricsMapForTests();
637
638
75
  auto it = metric_map.find(&prototype);
639
75
  if (it == metric_map.end()) {
640
0
    return STATUS_FORMAT(NotFound, "Metric $0 not found", prototype.name());
641
0
  }
642
643
75
  return it->second;
644
75
}
645
646
// Returns the metric found by GetMetric() for `prototype`, down-cast to Histogram. A failed
// lookup is propagated to the caller by VERIFY_RESULT.
Result<HistogramPtr> GetHistogram(
647
2
    const MetricEntityPtr& metric_entity, const HistogramPrototype& prototype) {
648
2
  return down_cast<Histogram*>(VERIFY_RESULT(GetMetric(metric_entity, prototype)).get());
649
2
}
650
651
// Returns the metric found by GetMetric() for `prototype`, down-cast to Counter. A failed lookup
// is propagated to the caller by VERIFY_RESULT.
Result<CounterPtr> GetCounter(
652
73
    const MetricEntityPtr& metric_entity, const CounterPrototype& prototype) {
653
73
  return down_cast<Counter*>(VERIFY_RESULT(GetMetric(metric_entity, prototype)).get());
654
73
}
655
656
// Test handler latency metric.
657
1
TEST_F(TestRpc, TestRpcHandlerLatencyMetric) {
658
659
1
  const uint64_t sleep_micros = 20 * 1000;
660
661
  // Set up server.
662
1
  HostPort server_addr;
663
1
  StartTestServerWithGeneratedCode(&server_addr);
664
665
  // Set up client.
666
667
1
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");
668
1
  Proxy p(client_messenger.get(), server_addr);
669
670
1
  RpcController controller;
671
1
  rpc_test::SleepRequestPB req;
672
1
  req.set_sleep_micros(sleep_micros);
673
1
  req.set_deferred(true);
674
1
  rpc_test::SleepResponsePB resp;
675
1
  ASSERT_OK(p.SyncRequest(
676
1
      CalculatorServiceMethods::SleepMethod(), /* method_metrics= */ nullptr, req, &resp,
677
1
      &controller));
678
679
1
  auto latency_histogram = ASSERT_RESULT(GetHistogram(
680
1
      metric_entity(), METRIC_handler_latency_yb_rpc_test_CalculatorService_Sleep));
681
682
1
  LOG(INFO) << "Sleep() min lat: " << latency_histogram->MinValueForTests();
683
1
  LOG(INFO) << "Sleep() mean lat: " << latency_histogram->MeanValueForTests();
684
1
  LOG(INFO) << "Sleep() max lat: " << latency_histogram->MaxValueForTests();
685
1
  LOG(INFO) << "Sleep() #calls: " << latency_histogram->TotalCount();
686
687
1
  ASSERT_EQ(1, latency_histogram->TotalCount());
688
1
  ASSERT_GE(latency_histogram->MaxValueForTests(), sleep_micros);
689
1
  ASSERT_TRUE(latency_histogram->MinValueForTests() == latency_histogram->MaxValueForTests());
690
691
  // TODO: Implement an incoming queue latency test.
692
  // For now we just assert that the metric exists.
693
1
  ASSERT_OK(GetHistogram(metric_entity(), METRIC_rpc_incoming_queue_time));
694
1
}
695
696
1
// Issues an async call with a 1ms timeout to a default-constructed address, lets the proxy go out
// of scope right after the call is issued, and waits for the completion callback to fire.
TEST_F(TestRpc, TestRpcCallbackDestroysMessenger) {
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");
  HostPort bad_addr;
  CountDownLatch latch(1);

  unsigned int seed = SeedRandom();
  rpc_test::AddRequestPB req;
  req.set_x(rand_r(&seed));
  req.set_y(rand_r(&seed));
  rpc_test::AddResponsePB resp;

  RpcController controller;
  controller.set_timeout(MonoDelta::FromMilliseconds(1));
  {
    // The proxy only lives for the duration of this scope.
    Proxy p(client_messenger.get(), bad_addr);
    static RemoteMethod method(
        rpc_test::CalculatorServiceIf::static_service_name(), "my-fake-method");
    p.AsyncRequest(
        &method, /* method_metrics= */ nullptr, req, &resp, &controller,
        latch.CountDownCallback());
  }
  latch.Wait();
}
717
718
// Test that setting the client timeout / deadline gets propagated to RPC
719
// services.
720
1
TEST_F(TestRpc, TestRpcContextClientDeadline) {
721
1
  const uint64_t sleep_micros = 20 * 1000;
722
723
  // Set up server.
724
1
  HostPort server_addr;
725
1
  StartTestServerWithGeneratedCode(&server_addr);
726
727
  // Set up client.
728
1
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");
729
1
  Proxy p(client_messenger.get(), server_addr);
730
731
1
  rpc_test::SleepRequestPB req;
732
1
  req.set_sleep_micros(sleep_micros);
733
1
  req.set_client_timeout_defined(true);
734
1
  rpc_test::SleepResponsePB resp;
735
1
  RpcController controller;
736
1
  const auto* method = CalculatorServiceMethods::SleepMethod();
737
1
  Status s = p.SyncRequest(method, /* method_metrics= */ nullptr, req, &resp, &controller);
738
1
  ASSERT_TRUE(s.IsRemoteError());
739
1
  ASSERT_STR_CONTAINS(s.ToString(), "Missing required timeout");
740
741
1
  controller.Reset();
742
1
  controller.set_timeout(MonoDelta::FromMilliseconds(1000));
743
1
  ASSERT_OK(p.SyncRequest(method, /* method_metrics= */ nullptr, req, &resp, &controller));
744
1
}
745
746
// Send multiple long running calls to a single worker thread. All of them except the first one,
747
// should time out early w/o starting processing them.
748
1
TEST_F(TestRpc, QueueTimeout) {
749
1
  const MonoDelta kSleep = 1s;
750
1
  constexpr auto kCalls = 10;
751
752
  // Set up server.
753
1
  TestServerOptions options;
754
1
  options.n_worker_threads = 1;
755
1
  HostPort server_addr;
756
1
  StartTestServerWithGeneratedCode(&server_addr, options);
757
758
  // Set up client.
759
1
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");
760
1
  Proxy p(client_messenger.get(), server_addr);
761
762
1
  const auto* method = CalculatorServiceMethods::SleepMethod();
763
764
1
  CountDownLatch latch(kCalls);
765
766
1
  struct Call {
767
1
    rpc_test::SleepRequestPB req;
768
1
    rpc_test::SleepResponsePB resp;
769
1
    RpcController controller;
770
1
  };
771
1
  std::vector<Call> calls(kCalls);
772
773
11
  for (int i = 0; i != kCalls; ++i) {
774
10
    auto& call = calls[i];
775
10
    auto& req = call.req;
776
10
    req.set_sleep_micros(narrow_cast<uint32_t>(kSleep.ToMicroseconds()));
777
10
    req.set_client_timeout_defined(true);
778
10
    call.controller.set_timeout(kSleep / 2);
779
10
    p.AsyncRequest(method, /* method_metrics= */ nullptr, req, &call.resp, &call.controller,
780
10
        [&latch, &call] {
781
10
      latch.CountDown();
782
20
      ASSERT_TRUE(call.controller.status().IsTimedOut()) << call.controller.status();
783
10
    });
784
10
  }
785
786
1
  latch.Wait();
787
788
  // Give some time for algorithm to work.
789
1
  std::this_thread::sleep_for((kSleep / 2).ToSteadyDuration());
790
791
1
  auto counter = ASSERT_RESULT(GetCounter(metric_entity(), METRIC_rpcs_timed_out_early_in_queue));
792
793
  // First call should succeed, other should timeout.
794
1
  ASSERT_EQ(counter->value(), kCalls - 1);
795
1
}
796
797
struct DisconnectShare {
798
  Proxy proxy;
799
  size_t left;
800
  std::mutex mutex;
801
  std::condition_variable cond;
802
  std::unordered_map<std::string, size_t> counts;
803
};
804
805
class DisconnectTask {
806
 public:
807
10.0k
  explicit DisconnectTask(DisconnectShare* share) : share_(share) {
808
10.0k
  }
809
810
10.0k
  void Launch() {
811
10.0k
    controller_.set_timeout(MonoDelta::FromSeconds(1));
812
10.0k
    share_->proxy.AsyncRequest(CalculatorServiceMethods::DisconnectMethod(),
813
10.0k
                               /* method_metrics= */ nullptr,
814
10.0k
                               rpc_test::DisconnectRequestPB(),
815
10.0k
                               &response_,
816
10.0k
                               &controller_,
817
9.99k
                               [this]() { this->Done(); });
818
10.0k
  }
819
 private:
820
9.99k
  void Done() {
821
9.99k
    bool notify;
822
9.99k
    {
823
9.99k
      std::lock_guard<std::mutex> lock(share_->mutex);
824
9.99k
      ++share_->counts[controller_.status().ToString()];
825
9.99k
      notify = 0 == --share_->left;
826
9.99k
    }
827
9.99k
    if (notify)
828
1
      share_->cond.notify_one();
829
9.99k
  }
830
831
  DisconnectShare* share_;
832
  rpc_test::DisconnectResponsePB response_;
833
  RpcController controller_;
834
};
835
836
1
// Launches kRequests Disconnect calls at once, waits for all completion callbacks and checks
// that none of the calls finished with an "OK" status.
TEST_F(TestRpc, TestDisconnect) {
  // Server.
  HostPort server_addr;
  StartTestServerWithGeneratedCode(&server_addr);

  // Client.
  auto client_messenger = CreateAutoShutdownMessengerHolder("Client");

  constexpr size_t kRequests = 10000;
  DisconnectShare share = { { client_messenger.get(), server_addr }, kRequests };

  // Build every task before launching any of them, so the vector no longer reallocates
  // while calls are in flight.
  std::vector<DisconnectTask> tasks;
  for (size_t created = 0; created != kRequests; ++created) {
    tasks.emplace_back(&share);
  }
  for (auto& task : tasks) {
    task.Launch();
  }

  {
    std::unique_lock<std::mutex> lock(share.mutex);
    share.cond.wait(lock, [&share]() { return !share.left; });
  }

  size_t total = 0;
  for (const auto& status_and_count : share.counts) {
    ASSERT_NE(status_and_count.first, "OK");
    total += status_and_count.second;
    LOG(INFO) << status_and_count.first << ": " << status_and_count.second;
  }
  ASSERT_EQ(kRequests, total);
}
867
868
// Check that we could perform DumpRunningRpcs while timed out calls are in queue.
869
//
870
// Start listenting socket, that will accept one connection and does not read it.
871
// Send big RPC request, that does not fit into socket buffer, so it will be sending forever.
872
// Wait until this call is timed out.
873
// Check that we could invoke DumpRunningRpcs after it.
874
1
TEST_F(TestRpc, DumpTimedOutCall) {
875
  // Set up a simple socket server which accepts a connection.
876
1
  HostPort server_addr;
877
1
  Socket listen_sock;
878
1
  ASSERT_OK(StartFakeServer(&listen_sock, &server_addr));
879
880
1
  std::atomic<bool> stop(false);
881
882
1
  std::thread thread([&listen_sock, &stop] {
883
1
    Socket socket;
884
1
    Endpoint remote;
885
1
    ASSERT_OK(listen_sock.Accept(&socket, &remote, 0));
886
2
    while (!stop.load(std::memory_order_acquire)) {
887
1
      std::this_thread::sleep_for(100ms);
888
1
    }
889
1
  });
890
891
1
  auto messenger = CreateAutoShutdownMessengerHolder("Client");
892
1
  Proxy p(messenger.get(), server_addr);
893
894
1
  {
895
1
    rpc_test::EchoRequestPB req;
896
1
    req.set_data(std::string(1_MB, 'X'));
897
1
    rpc_test::EchoResponsePB resp;
898
1
    std::aligned_storage<sizeof(RpcController), alignof(RpcController)>::type storage;
899
1
    auto controller = new (&storage) RpcController;
900
1
    controller->set_timeout(100ms);
901
1
    auto status = p.SyncRequest(
902
1
        CalculatorServiceMethods::EchoMethod(), /* method_metrics= */ nullptr, req, &resp,
903
1
        controller);
904
2
    ASSERT_TRUE(status.IsTimedOut()) << status;
905
1
    controller->~RpcController();
906
1
    memset(&storage, 0xff, sizeof(storage));
907
1
  }
908
909
1
  DumpRunningRpcsRequestPB dump_req;
910
1
  DumpRunningRpcsResponsePB dump_resp;
911
1
  ASSERT_OK(messenger->DumpRunningRpcs(dump_req, &dump_resp));
912
913
1
  stop.store(true, std::memory_order_release);
914
1
  thread.join();
915
1
}
916
917
#if defined(TCMALLOC_ENABLED)
918
919
namespace {
920
921
const char kEmptyMsgLengthPrefix[kMsgLengthPrefixLength] = {0};
922
923
}
924
925
// Test that even with small packets we track memory usage in sending queue with acceptable
926
// accuracy.
927
TEST_F(TestRpc, SendingQueueMemoryUsage) {
928
  std::deque<TcpStreamSendingData> sending;
929
930
  auto tracker = MemTracker::CreateTracker("t");
931
932
  MemoryUsage current, latest_before_realloc;
933
934
  StartAllocationsTracking();
935
  const auto heap_allocated_bytes_initial = MemTracker::GetTCMallocCurrentAllocatedBytes();
936
  while (current.heap_allocated_bytes < 1_MB) {
937
    auto data_ptr = std::make_shared<StringOutboundData>(
938
        kEmptyMsgLengthPrefix, kMsgLengthPrefixLength, "Empty message");
939
    sending.emplace_back(data_ptr, tracker);
940
941
    const size_t heap_allocated_bytes =
942
        MemTracker::GetTCMallocCurrentAllocatedBytes() - heap_allocated_bytes_initial;
943
    if (heap_allocated_bytes != current.heap_allocated_bytes) {
944
      latest_before_realloc = current;
945
    }
946
    current.heap_allocated_bytes = heap_allocated_bytes;
947
    current.heap_requested_bytes = GetHeapRequestedBytes();
948
    current.tracked_consumption += sending.back().consumption.consumption();
949
    // Account data_ptr as well.
950
    current.tracked_consumption += sizeof(data_ptr);
951
    current.entities_count = sending.size();
952
  }
953
  StopAllocationsTracking();
954
955
  LOG(INFO) << DumpMemoryUsage(latest_before_realloc);
956
957
  LOG(INFO) << "Tracked consumption: " << latest_before_realloc.tracked_consumption;
958
  LOG(INFO) << "Requested bytes: " << latest_before_realloc.heap_requested_bytes;
959
  LOG(INFO) << "Allocated bytes: " << latest_before_realloc.heap_allocated_bytes;
960
961
  ASSERT_LE(latest_before_realloc.tracked_consumption, latest_before_realloc.heap_requested_bytes);
962
  // We should track at least kDynamicMemoryUsageAccuracyLowLimit memory requested from heap.
963
  ASSERT_GT(
964
      latest_before_realloc.tracked_consumption,
965
      size_t(latest_before_realloc.heap_requested_bytes * kDynamicMemoryUsageAccuracyLowLimit));
966
967
  ASSERT_LE(latest_before_realloc.heap_requested_bytes, latest_before_realloc.heap_allocated_bytes);
968
  // Expect TCMalloc to allocate more memory than requested due to roundup, but limited by
969
  // kMemoryAllocationAccuracyHighLimit.
970
  ASSERT_LE(
971
      latest_before_realloc.heap_allocated_bytes,
972
      latest_before_realloc.heap_requested_bytes * kMemoryAllocationAccuracyHighLimit);
973
}
974
975
#endif
976
977
namespace {
978
979
// Value assigned to FLAGS_memory_limit_hard_bytes by the CantAllocateReadBuffer tests; the
// post-OOM memory consumption target is derived from it as well.
constexpr auto kMemoryLimitHardBytes = 100_MB;
980
981
6
// Sets the memory-limit related flags used by the CantAllocateReadBuffer tests and returns
// server options with one reactor, one connection per server and a 60s keep-alive timeout.
TestServerOptions SetupServerForTestCantAllocateReadBuffer() {
  FLAGS_binary_call_parser_reject_on_mem_tracker_hard_limit = true;
  FLAGS_memory_limit_hard_bytes = kMemoryLimitHardBytes;
  FLAGS_rpc_throttle_threshold_bytes = -1;

  TestServerOptions options;
  auto& messenger_options = options.messenger_options;
  messenger_options.n_reactors = 1;
  messenger_options.num_connections_to_server = 1;
  messenger_options.keep_alive_timeout = 60s;
  return options;
}
991
992
6
// Sends a batch of ~10MB Echo calls while the echo handler is paused, waits for the
// "Unable to allocate read buffer because of limit" log message, then resumes the handler,
// waits for memory consumption to drop, and checks that a batch of small calls all succeed.
void TestCantAllocateReadBuffer(CalculatorServiceProxy* proxy) {
  const MonoDelta kTimeToWaitForOom = 20s;
  // Reactor threads are blocked by pauses injected into calls processing by the test and also we
  // can have other random slow downs in this tests due to large requests processing in reactor
  // thread, so we turn off application level RPC keepalive mechanism to prevent connections from
  // being closed.
  FLAGS_enable_rpc_keepalive = false;

  rpc_test::EchoRequestPB req;
  rpc_test::EchoResponsePB resp;

  std::vector<std::unique_ptr<RpcController>> controllers;

  auto n_calls = 50;

  SetAtomicFlag(true, &FLAGS_TEST_pause_calculator_echo_request);
  StringWaiterLogSink log_waiter("Unable to allocate read buffer because of limit");

  // Phase 1: large calls that are expected to push the server over the hard memory limit.
  LOG(INFO) << "Start sending calls...";
  CountDownLatch latch(n_calls);
  for (int sent = 1; sent <= n_calls; ++sent) {
    req.set_data(std::string(10_MB + (sent - 1), 'X'));
    auto controller = std::make_unique<RpcController>();
    // No need to wait more than kTimeToWaitForOom + some delay, because we only need these
    // calls to cause hitting hard memory limit.
    controller->set_timeout(kTimeToWaitForOom + 5s);
    proxy->EchoAsync(req, &resp, controller.get(), latch.CountDownCallback());
    if (sent % 10 == 0) {
      LOG(INFO) << "Sent " << sent << " calls.";
      LOG(INFO) << DumpMemoryUsage();
    }
    controllers.push_back(std::move(controller));
  }
  LOG(INFO) << n_calls << " calls sent.";

  auto wait_status = log_waiter.WaitFor(kTimeToWaitForOom);

  SetAtomicFlag(false, &FLAGS_TEST_pause_calculator_echo_request);
  LOG(INFO) << "Resumed call function.";

  LOG(INFO) << "Waiting for the calls to be marked finished...";
  latch.Wait();

  LOG(INFO) << n_calls << " calls marked as finished.";

  {
    size_t call_number = 0;
    for (const auto& controller : controllers) {
      ++call_number;
      ASSERT_TRUE(controller->finished());
      auto s = controller->status();
      ASSERT_TRUE(s.ok() || s.IsTimedOut())
          << "Unexpected error for call #" << call_number << ": " << s;
    }
  }
  controllers.clear();
  req.clear_data();

  // The wait result is only checked after all calls have finished and been validated.
  ASSERT_OK(wait_status);

  // Phase 2: wait for memory consumption to fall below 60% of the hard limit.
  LOG(INFO) << DumpMemoryUsage();
  {
    constexpr auto target_memory_consumption = kMemoryLimitHardBytes * 0.6;
    wait_status = LoggedWaitFor(
        [] {
#if defined(TCMALLOC_ENABLED)
          // Don't rely on root mem tracker consumption, since it includes memory released by
          // the application, but not yet released by TCMalloc.
          const auto consumption = MemTracker::GetTCMallocCurrentAllocatedBytes();
#else
          // For TSAN/ASAN we don't have TCMalloc and rely on root mem tracker consumption.
          const auto consumption = MemTracker::GetRootTracker()->consumption();
#endif
          LOG(INFO) << "Memory consumption: " << HumanReadableNumBytes::ToString(consumption);
          return consumption < target_memory_consumption;
        }, 10s * kTimeMultiplier,
        Format("Waiting until memory consumption is less than $0 ...",
               HumanReadableNumBytes::ToString(target_memory_consumption)));
    LOG(INFO) << DumpMemoryUsage();
    ASSERT_OK(wait_status);
  }

  // Phase 3: further calls should be processed successfully since memory consumption is now
  // under limit.
  n_calls = 20;
  const MonoDelta kCallsTimeout = 60s * kTimeMultiplier;
  LOG(INFO) << "Start sending more calls...";
  latch.Reset(n_calls);
  for (int sent = 1; sent <= n_calls; ++sent) {
    req.set_data(std::string(sent, 'Y'));
    auto controller = std::make_unique<RpcController>();
    controller->set_timeout(kCallsTimeout);
    proxy->EchoAsync(req, &resp, controller.get(), latch.CountDownCallback());
    controllers.push_back(std::move(controller));
  }
  LOG(INFO) << n_calls << " calls sent.";
  latch.Wait();
  LOG(INFO) << n_calls << " calls marked as finished.";

  {
    size_t call_number = 0;
    for (const auto& controller : controllers) {
      ++call_number;
      ASSERT_TRUE(controller->finished());
      auto s = controller->status();
      ASSERT_TRUE(s.ok()) << "Unexpected error for call #" << call_number << ": " << AsString(s);
    }
  }
}
1094
1095
}  // namespace
1096
1097
1
// Runs the read-buffer allocation failure scenario through RunPlainTest.
TEST_F(TestRpc, CantAllocateReadBuffer) {
  const auto server_options = SetupServerForTestCantAllocateReadBuffer();
  RunPlainTest(&TestCantAllocateReadBuffer, server_options);
}
1100
1101
class TestRpcSecure : public RpcTestBase {
1102
 public:
1103
13
  void SetUp() override {
1104
13
    RpcTestBase::SetUp();
1105
13
    secure_context_ = std::make_unique<SecureContext>();
1106
13
    EXPECT_OK(secure_context_->TEST_GenerateKeys(1024, "127.0.0.1"));
1107
13
  }
1108
1109
 protected:
1110
26
  auto CreateSecureStreamFactory() {
1111
26
    return SecureStreamFactory(
1112
26
        TcpStream::Factory(), MemTracker::GetRootTracker(), secure_context_.get());
1113
26
  }
1114
1115
  std::unique_ptr<Messenger> CreateSecureMessenger(
1116
12
      const std::string& name, const MessengerOptions& options = kDefaultClientMessengerOptions) {
1117
12
    auto builder = CreateMessengerBuilder(name, options);
1118
12
    builder.SetListenProtocol(SecureStreamProtocol());
1119
12
    builder.AddStreamFactory(SecureStreamProtocol(), CreateSecureStreamFactory());
1120
12
    return EXPECT_RESULT(builder.Build());
1121
12
  }
1122
1123
  std::unique_ptr<SecureContext> secure_context_;
1124
1125
  template <class F>
1126
6
  void RunSecureTest(const F& f, const TestServerOptions& server_options = TestServerOptions()) {
1127
12
    RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) {
1128
12
      return CreateSecureMessenger(name, options);
1129
12
    }, f);
1130
6
  }
1131
};
1132
1133
5
// Performs a single Add(10, 20) call through `proxy` and expects the result 30.
void TestSimple(CalculatorServiceProxy* proxy) {
  rpc_test::AddRequestPB req;
  req.set_x(10);
  req.set_y(20);
  rpc_test::AddResponsePB resp;

  RpcController controller;
  controller.set_timeout(5s * kTimeMultiplier);

  ASSERT_OK(proxy->Add(req, &resp, &controller));
  ASSERT_EQ(30, resp.result());
}
1143
1144
1
// Single Add call over the secure stream.
TEST_F(TestRpcSecure, TLS) {
  RunSecureTest(&TestSimple);
}
1147
1148
10
// Echoes a random 4MB payload through `proxy` and checks that it comes back unchanged.
void TestBigOp(CalculatorServiceProxy* proxy) {
  rpc_test::EchoRequestPB req;
  req.set_data(RandomHumanReadableString(4_MB));
  rpc_test::EchoResponsePB resp;

  RpcController controller;
  controller.set_timeout(5s * kTimeMultiplier);

  ASSERT_OK(proxy->Echo(req, &resp, &controller));
  ASSERT_EQ(req.data(), resp.data());
}
1157
1158
1
// Large echo over the secure stream.
TEST_F(TestRpcSecure, BigOp) {
  RunSecureTest(&TestBigOp);
}
1161
1162
1
// Large echo over the secure stream with the RPC read buffer size flag lowered to 128.
TEST_F(TestRpcSecure, BigOpWithSmallBuffer) {
  FLAGS_rpc_read_buffer_size = 128;
  RunSecureTest(&TestBigOp);
}
1166
1167
5
// Sequentially echoes random 4KB payloads through `proxy`, one synchronous call at a time.
// The number of calls is RegularBuildVsSanitizers(1000, 100).
void TestManyOps(CalculatorServiceProxy* proxy) {
  const auto num_ops = RegularBuildVsSanitizers(1000, 100);
  for (int op_index = 0; op_index != num_ops; ++op_index) {
    rpc_test::EchoRequestPB req;
    req.set_data(RandomHumanReadableString(4_KB));
    rpc_test::EchoResponsePB resp;

    RpcController controller;
    controller.set_timeout(5s * kTimeMultiplier);

    ASSERT_OK(proxy->Echo(req, &resp, &controller));
    ASSERT_EQ(req.data(), resp.data());
  }
}
1178
1179
1
// Many sequential small echoes over the secure stream.
TEST_F(TestRpcSecure, ManyOps) {
  RunSecureTest(&TestManyOps);
}
1182
1183
5
// Issues RegularBuildVsSanitizers(1000, 100) asynchronous 4KB echoes at once, waits for all of
// them to complete, then checks every status and echoed payload.
void TestConcurrentOps(CalculatorServiceProxy* proxy) {
  // Per-call state; the vector is sized up front and never resized, so the addresses handed
  // to EchoAsync stay valid until the calls complete.
  struct Op {
    RpcController controller;
    rpc_test::EchoRequestPB req;
    rpc_test::EchoResponsePB resp;
  };
  std::vector<Op> ops(RegularBuildVsSanitizers(1000, 100));

  CountDownLatch pending(ops.size());
  for (auto& op : ops) {
    op.controller.set_timeout(5s * kTimeMultiplier);
    op.req.set_data(RandomHumanReadableString(4_KB));
    proxy->EchoAsync(op.req, &op.resp, &op.controller, [&pending]() {
      pending.CountDown();
    });
  }
  pending.Wait();

  for (const auto& op : ops) {
    ASSERT_OK(op.controller.status());
    ASSERT_EQ(op.req.data(), op.resp.data());
  }
}
1204
1205
1
// Many concurrent small echoes over the secure stream.
TEST_F(TestRpcSecure, ConcurrentOps) {
  RunSecureTest(&TestConcurrentOps);
}
1208
1209
1
// Runs the read-buffer allocation failure scenario over the secure stream.
TEST_F(TestRpcSecure, CantAllocateReadBuffer) {
  const auto server_options = SetupServerForTestCantAllocateReadBuffer();
  RunSecureTest(&TestCantAllocateReadBuffer, server_options);
}
1212
1213
// Fixture for RPC tests that run over the compressed stream protocol. The int test parameter
// is written to FLAGS_stream_compression_algo before the base fixture is set up.
class TestRpcCompression : public RpcTestBase, public testing::WithParamInterface<int> {
 public:
  void SetUp() override {
    // The flag must be in place before RpcTestBase::SetUp() runs.
    FLAGS_stream_compression_algo = GetParam();
    RpcTestBase::SetUp();
  }

 protected:
  // Builds a messenger that listens on, and can connect via, the compressed stream protocol
  // layered on top of plain TCP.
  std::unique_ptr<Messenger> CreateCompressedMessenger(
      const std::string& name, const MessengerOptions& options = kDefaultClientMessengerOptions) {
    auto builder = CreateMessengerBuilder(name, options);
    builder.SetListenProtocol(CompressedStreamProtocol());
    builder.AddStreamFactory(
        CompressedStreamProtocol(),
        CompressedStreamFactory(TcpStream::Factory(), MemTracker::GetRootTracker()));
    return EXPECT_RESULT(builder.Build());
  }

  // Runs `f` through RunTest with messengers produced by CreateCompressedMessenger().
  template <class F>
  void RunCompressionTest(
      const F& f, const TestServerOptions& server_options = TestServerOptions()) {
    RunTest(
        this, server_options,
        [this](const std::string& messenger_name, const MessengerOptions& messenger_options) {
          return CreateCompressedMessenger(messenger_name, messenger_options);
        },
        f);
  }
};
1239
1240
3
// Single Add call over the compressed stream.
TEST_P(TestRpcCompression, Simple) {
  RunCompressionTest(&TestSimple);
}
1243
1244
3
// Large echo over the compressed stream.
TEST_P(TestRpcCompression, BigOp) {
  RunCompressionTest(&TestBigOp);
}
1247
1248
3
// Large echo over the compressed stream with the RPC read buffer size flag lowered to 128.
TEST_P(TestRpcCompression, BigOpWithSmallBuffer) {
  FLAGS_rpc_read_buffer_size = 128;
  RunCompressionTest(&TestBigOp);
}
1252
1253
3
// Many sequential small echoes over the compressed stream.
TEST_P(TestRpcCompression, ManyOps) {
  RunCompressionTest(&TestManyOps);
}
1256
1257
3
// Many concurrent small echoes over the compressed stream.
TEST_P(TestRpcCompression, ConcurrentOps) {
  RunCompressionTest(&TestConcurrentOps);
}
1260
1261
3
// Runs the read-buffer allocation failure scenario over the compressed stream.
TEST_P(TestRpcCompression, CantAllocateReadBuffer) {
  const auto server_options = SetupServerForTestCantAllocateReadBuffer();
  RunCompressionTest(&TestCantAllocateReadBuffer, server_options);
}
1264
1265
4
// Echoes a highly compressible 4KB payload repeatedly and, once the warmup calls are done,
// checks via the tcp_bytes_sent / tcp_bytes_received counters of `metric_entity` that a single
// call moved more than 10 bytes but at most a fifth of the payload size in each direction.
void TestCompression(CalculatorServiceProxy* proxy, const MetricEntityPtr& metric_entity) {
  constexpr size_t kStringLen = 4_KB;

  // Counter values observed after the previous call.
  size_t prev_sent = 0;
  size_t prev_received = 0;
  for (int i = 0;; ++i) {
    RpcController controller;
    controller.set_timeout(5s * kTimeMultiplier);
    rpc_test::EchoRequestPB req;
    req.set_data(std::string(kStringLen, 'Y'));
    rpc_test::EchoResponsePB resp;
    ASSERT_OK(proxy->Echo(req, &resp, &controller));
    ASSERT_EQ(req.data(), resp.data());

    auto sent_counter = ASSERT_RESULT(GetCounter(metric_entity, METRIC_tcp_bytes_sent));
    auto received_counter = ASSERT_RESULT(GetCounter(metric_entity, METRIC_tcp_bytes_received));

    // First FLAGS_num_connections_to_server runs were warmup.
    // To avoid counting handshake bytes.
    if (i != FLAGS_num_connections_to_server) {
      prev_sent = sent_counter->value();
      prev_received = received_counter->value();
      continue;
    }

    // Bytes moved by this call alone.
    auto sent = sent_counter->value() - prev_sent;
    auto received = received_counter->value() - prev_received;
    LOG(INFO) << "Sent: " << sent << ", received: " << received;

    ASSERT_GT(sent, 10);  // Check that the metric works at all.
    ASSERT_LE(sent, kStringLen / 5);  // Check that compression works.
    ASSERT_GT(received, 10);  // Check that the metric works at all.
    ASSERT_LE(received, kStringLen / 5);  // Check that compression works.
    return;
  }
}
1300
1301
3
// Checks the on-wire byte counters for a compressible payload, using this fixture's
// metric entity.
TEST_P(TestRpcCompression, Compression) {
  RunCompressionTest([this](CalculatorServiceProxy* calculator_proxy) {
    TestCompression(calculator_proxy, metric_entity());
  });
}
1306
1307
1.19k
// Maps the integer compression test parameter to a readable test-name suffix:
// 1 -> "Zlib", 2 -> "Snappy", 3 -> "LZ4"; any other value yields "Unknown compression <n>".
std::string CompressionName(const testing::TestParamInfo<int>& info) {
  static const char* const kAlgorithmNames[] = {"Zlib", "Snappy", "LZ4"};
  const int algorithm = info.param;
  if (algorithm >= 1 && algorithm <= 3) {
    return kAlgorithmNames[algorithm - 1];
  }
  return Format("Unknown compression $0", info.param);
}
1315
1316
// Instantiates every TestRpcCompression test for the parameter values produced by
// testing::Range(1, 4); instance names come from CompressionName.
INSTANTIATE_TEST_CASE_P(, TestRpcCompression, testing::Range(1, 4), CompressionName);
1317
1318
// Fixture layered on TestRpcSecure whose messengers listen on the compressed stream protocol,
// with the compressed stream factory wrapping the secure stream factory.
class TestRpcSecureCompression : public TestRpcSecure {
1319
 public:
1320
7
  // Sets FLAGS_stream_compression_algo before delegating to the base fixture's SetUp().
  // NOTE(review): 1 presumably selects zlib (CompressionName labels parameter 1 "Zlib") —
  // confirm against the flag definition.
  void SetUp() override {
1321
7
    FLAGS_stream_compression_algo = 1;
1322
7
    TestRpcSecure::SetUp();
1323
7
  }
1324
1325
 protected:
1326
  // Builds a messenger from CreateMessengerBuilder(name, options) that listens on
  // CompressedStreamProtocol() and registers, for that protocol, a CompressedStreamFactory
  // wrapping CreateSecureStreamFactory() and using the root MemTracker.
  // Returns the result of builder.Build() unwrapped via EXPECT_RESULT.
  std::unique_ptr<Messenger> CreateSecureCompressedMessenger(
1327
14
      const std::string& name, const MessengerOptions& options = kDefaultClientMessengerOptions) {
1328
14
    auto builder = CreateMessengerBuilder(name, options);
1329
14
    builder.SetListenProtocol(CompressedStreamProtocol());
1330
14
    builder.AddStreamFactory(
1331
14
        CompressedStreamProtocol(),
1332
14
        CompressedStreamFactory(CreateSecureStreamFactory(), MemTracker::GetRootTracker()));
1333
14
    return EXPECT_RESULT(builder.Build());
1334
14
  }
1335
1336
  // Calls RunTest with this fixture, the given server options, a messenger factory that
  // forwards to CreateSecureCompressedMessenger, and the test body `f`.
  // `server_options` defaults to a default-constructed TestServerOptions.
  template <class F>
1337
  void RunSecureCompressionTest(
1338
7
      const F& f, const TestServerOptions& server_options = TestServerOptions()) {
1339
14
    RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) {
1340
14
      return CreateSecureCompressedMessenger(name, options);
1341
14
    }, f);
_ZZN2yb3rpc24TestRpcSecureCompression24RunSecureCompressionTestIPFvPNS_8rpc_test22CalculatorServiceProxyEEEEvRKT_RKNS0_17TestServerOptionsEENKUlRKNSt3__112basic_stringIcNSE_11char_traitsIcEENSE_9allocatorIcEEEERKNS0_16MessengerOptionsEE_clESM_SP_
Line
Count
Source
1339
12
    RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) {
1340
12
      return CreateSecureCompressedMessenger(name, options);
1341
12
    }, f);
rpc-test.cc:_ZZN2yb3rpc24TestRpcSecureCompression24RunSecureCompressionTestIZNS0_41TestRpcSecureCompression_Compression_Test8TestBodyEvE3$_5EEvRKT_RKNS0_17TestServerOptionsEENKUlRKNSt3__112basic_stringIcNSB_11char_traitsIcEENSB_9allocatorIcEEEERKNS0_16MessengerOptionsEE_clESJ_SM_
Line
Count
Source
1339
2
    RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) {
1340
2
      return CreateSecureCompressedMessenger(name, options);
1341
2
    }, f);
1342
7
  }
_ZN2yb3rpc24TestRpcSecureCompression24RunSecureCompressionTestIPFvPNS_8rpc_test22CalculatorServiceProxyEEEEvRKT_RKNS0_17TestServerOptionsE
Line
Count
Source
1338
6
      const F& f, const TestServerOptions& server_options = TestServerOptions()) {
1339
6
    RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) {
1340
6
      return CreateSecureCompressedMessenger(name, options);
1341
6
    }, f);
1342
6
  }
rpc-test.cc:_ZN2yb3rpc24TestRpcSecureCompression24RunSecureCompressionTestIZNS0_41TestRpcSecureCompression_Compression_Test8TestBodyEvE3$_5EEvRKT_RKNS0_17TestServerOptionsE
Line
Count
Source
1338
1
      const F& f, const TestServerOptions& server_options = TestServerOptions()) {
1339
1
    RunTest(this, server_options, [this](const std::string& name, const MessengerOptions& options) {
1340
1
      return CreateSecureCompressedMessenger(name, options);
1341
1
    }, f);
1342
1
  }
1343
};
1344
1345
1
// Runs TestSimple (defined outside this section) through RunSecureCompressionTest.
TEST_F(TestRpcSecureCompression, Simple) {
1346
1
  RunSecureCompressionTest(&TestSimple);
1347
1
}
1348
1349
1
// Runs TestBigOp (defined outside this section) through RunSecureCompressionTest.
TEST_F(TestRpcSecureCompression, BigOp) {
1350
1
  RunSecureCompressionTest(&TestBigOp);
1351
1
}
1352
1353
1
// Runs TestBigOp after setting FLAGS_rpc_read_buffer_size to 128.
// NOTE(review): the value is presumably in bytes, chosen so that a big operation does not fit in
// one read buffer — confirm against the flag definition.
TEST_F(TestRpcSecureCompression, BigOpWithSmallBuffer) {
1354
1
  FLAGS_rpc_read_buffer_size = 128;
1355
1
  RunSecureCompressionTest(&TestBigOp);
1356
1
}
1357
1358
1
// Runs TestManyOps (defined outside this section) through RunSecureCompressionTest.
TEST_F(TestRpcSecureCompression, ManyOps) {
1359
1
  RunSecureCompressionTest(&TestManyOps);
1360
1
}
1361
1362
1
// Runs TestConcurrentOps (defined outside this section) through RunSecureCompressionTest.
TEST_F(TestRpcSecureCompression, ConcurrentOps) {
1363
1
  RunSecureCompressionTest(&TestConcurrentOps);
1364
1
}
1365
1366
1
// Runs TestCantAllocateReadBuffer through RunSecureCompressionTest, using the server options
// returned by SetupServerForTestCantAllocateReadBuffer() instead of the defaults.
TEST_F(TestRpcSecureCompression, CantAllocateReadBuffer) {
1367
1
  RunSecureCompressionTest(&TestCantAllocateReadBuffer, SetupServerForTestCantAllocateReadBuffer());
1368
1
}
1369
1370
1
// Hands RunSecureCompressionTest a callback that invokes TestCompression with the supplied
// proxy and this fixture's metric_entity().
TEST_F(TestRpcSecureCompression, Compression) {
1371
1
  RunSecureCompressionTest([this](CalculatorServiceProxy* proxy) {
1372
1
    TestCompression(proxy, metric_entity());
1373
1
  });
1374
1
}
1375
1376
} // namespace rpc
1377
} // namespace yb