YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/mt-rpc-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <string>
34
35
#include <gtest/gtest.h>
36
37
#include "yb/gutil/stl_util.h"
38
#include "yb/gutil/strings/substitute.h"
39
40
#include "yb/rpc/proxy.h"
41
#include "yb/rpc/rpc-test-base.h"
42
#include "yb/rpc/rpc_controller.h"
43
#include "yb/rpc/yb_rpc.h"
44
45
#include "yb/util/countdown_latch.h"
46
#include "yb/util/metrics.h"
47
#include "yb/util/net/net_util.h"
48
#include "yb/util/status_log.h"
49
#include "yb/util/test_macros.h"
50
#include "yb/util/test_util.h"
51
#include "yb/util/thread.h"
52
53
METRIC_DECLARE_counter(rpc_connections_accepted);
54
METRIC_DECLARE_counter(rpcs_queue_overflow);
55
56
using std::string;
57
using std::shared_ptr;
58
using strings::Substitute;
59
using namespace std::literals;
60
61
namespace yb {
62
namespace rpc {
63
64
class MultiThreadedRpcTest : public RpcTestBase {
65
 public:
66
  // Make a single RPC call.
67
  void SingleCall(const HostPort& server_addr, const RemoteMethod* method,
68
3
                  Status* result, CountDownLatch* latch) {
69
3
    LOG(INFO) << "Connecting to " << server_addr;
70
3
    auto client_messenger = CreateAutoShutdownMessengerHolder("ClientSC");
71
3
    Proxy p(client_messenger.get(), server_addr);
72
3
    *result = DoTestSyncCall(&p, method);
73
3
    latch->CountDown();
74
3
  }
75
76
  // Make RPC calls until we see a failure.
77
4
  void HammerServer(const HostPort& server_addr, const RemoteMethod* method, Status* last_result) {
78
4
    auto client_messenger = CreateAutoShutdownMessengerHolder("ClientHS");
79
4
    HammerServerWithMessenger(server_addr, method, last_result, client_messenger.get());
80
4
  }
81
82
  void HammerServerWithMessenger(
83
      const HostPort& server_addr, const RemoteMethod* method, Status* last_result,
84
5
      Messenger* messenger) {
85
5
    LOG(INFO) << "Connecting to " << server_addr;
86
5
    Proxy p(messenger, server_addr);
87
88
5
    int i = 0;
89
739
    while (true) {
90
739
      i++;
91
739
      Status s = DoTestSyncCall(&p, method);
92
739
      if (!s.ok()) {
93
        // Return on first failure.
94
5
        LOG(INFO) << "Call failed. Shutting down client thread. Ran " << i << " calls: " << s;
95
5
        *last_result = s;
96
5
        return;
97
5
      }
98
739
    }
99
5
  }
100
};
101
102
4
static void AssertShutdown(yb::Thread* thread, const Status* status) {
103
4
  ASSERT_OK(ThreadJoiner(thread).warn_every(500ms).Join());
104
4
  string msg = status->ToString();
105
8
  ASSERT_TRUE(msg.find("Service unavailable") != string::npos ||
106
8
              msg.find("Network error") != string::npos ||
107
8
              msg.find("Resource unavailable") != string::npos)
108
8
              << "Status is actually: " << msg;
109
4
}
110
111
// Test making several concurrent RPC calls while shutting down.
112
// Simply verify that we don't hit any CHECK errors.
113
1
TEST_F(MultiThreadedRpcTest, TestShutdownDuringService) {
114
  // Set up server.
115
1
  HostPort server_addr;
116
1
  StartTestServer(&server_addr);
117
118
1
  const int kNumThreads = 4;
119
1
  scoped_refptr<yb::Thread> threads[kNumThreads];
120
1
  Status statuses[kNumThreads];
121
5
  for (int i = 0; i < kNumThreads; i++) {
122
4
    ASSERT_OK(yb::Thread::Create("test", strings::Substitute("t$0", i),
123
4
      &MultiThreadedRpcTest::HammerServer, this, server_addr,
124
4
      CalculatorServiceMethods::AddMethod(), &statuses[i], &threads[i]));
125
4
  }
126
127
1
  SleepFor(MonoDelta::FromMilliseconds(50));
128
129
  // Shut down server.
130
1
  server().Shutdown();
131
132
5
  for (int i = 0; i < kNumThreads; i++) {
133
4
    AssertShutdown(threads[i].get(), &statuses[i]);
134
4
  }
135
1
}
136
137
// Test shutting down the client messenger exactly as a thread is about to start
138
// a new connection. This is a regression test for KUDU-104.
139
1
TEST_F(MultiThreadedRpcTest, TestShutdownClientWhileCallsPending) {
140
  // Set up server.
141
1
  HostPort server_addr;
142
1
  StartTestServer(&server_addr);
143
144
1
  std::unique_ptr<Messenger> client_messenger(CreateMessenger("Client"));
145
146
1
  scoped_refptr<yb::Thread> thread;
147
1
  Status status;
148
1
  ASSERT_OK(yb::Thread::Create("test", "test",
149
1
      &MultiThreadedRpcTest::HammerServerWithMessenger, this, server_addr,
150
1
      CalculatorServiceMethods::AddMethod(), &status, client_messenger.get(), &thread));
151
152
  // Shut down the messenger after a very brief sleep. This often will race so that the
153
  // call gets submitted to the messenger before shutdown, but the negotiation won't have
154
  // started yet. In a debug build this fails about half the time without the bug fix.
155
  // See KUDU-104.
156
1
  SleepFor(MonoDelta::FromMicroseconds(10));
157
1
  client_messenger->Shutdown();
158
159
1
  ASSERT_OK(ThreadJoiner(thread.get()).warn_every(500ms).Join());
160
1
  ASSERT_TRUE(status.IsAborted() ||
161
1
              status.IsServiceUnavailable());
162
1
  string msg = status.ToString();
163
1
  SCOPED_TRACE(msg);
164
2
  ASSERT_TRUE(msg.find("Client RPC Messenger shutting down") != string::npos ||
165
2
              msg.find("Shutdown connection") != string::npos ||
166
2
              msg.find("Unable to start connection negotiation thread") != string::npos ||
167
2
              msg.find("Messenger already stopped") != string::npos)
168
2
              << "Status is actually: " << msg;
169
1
}
170
171
3
void IncrementBackpressureOrShutdown(const Status* status, int* backpressure, int* shutdown) {
172
3
  string msg = status->ToString();
173
3
  if (msg.find("queue is full") != string::npos) {
174
1
    ++(*backpressure);
175
2
  } else if (msg.find("shutting down") != string::npos) {
176
2
    ++(*shutdown);
177
0
  } else if (msg.find("got EOF from remote") != string::npos) {
178
0
    ++(*shutdown);
179
0
  } else {
180
0
    FAIL() << "Unexpected status message: " << msg;
181
0
  }
182
3
}
183
184
// Test that we get a Service Unavailable error when we max out the incoming RPC service queue.
185
1
TEST_F(MultiThreadedRpcTest, TestBlowOutServiceQueue) {
186
1
  const size_t kMaxConcurrency = 2;
187
188
1
  MessengerBuilder bld("messenger1");
189
1
  bld.set_num_reactors(kMaxConcurrency);
190
1
  bld.set_metric_entity(metric_entity());
191
1
  std::unique_ptr<Messenger> server_messenger = ASSERT_RESULT(bld.Build());
192
193
1
  Endpoint server_addr;
194
1
  ASSERT_OK(server_messenger->ListenAddress(
195
1
      CreateConnectionContextFactory<YBInboundConnectionContext>(),
196
1
      Endpoint(), &server_addr));
197
198
1
  std::unique_ptr<ServiceIf> service(new GenericCalculatorService());
199
1
  auto service_name = service->service_name();
200
1
  ThreadPool thread_pool("bogus_pool", kMaxConcurrency, 0UL);
201
1
  scoped_refptr<ServicePool> service_pool(new ServicePool(kMaxConcurrency,
202
1
                                                          &thread_pool,
203
1
                                                          &server_messenger->scheduler(),
204
1
                                                          std::move(service),
205
1
                                                          metric_entity()));
206
1
  ASSERT_OK(server_messenger->RegisterService(service_name, service_pool));
207
1
  ASSERT_OK(server_messenger->StartAcceptor());
208
209
1
  scoped_refptr<yb::Thread> threads[3];
210
1
  Status status[3];
211
1
  CountDownLatch latch(1);
212
4
  for (int i = 0; i < 3; i++) {
213
3
    ASSERT_OK(yb::Thread::Create("test", strings::Substitute("t$0", i),
214
3
      &MultiThreadedRpcTest::SingleCall, this, HostPort::FromBoundEndpoint(server_addr),
215
3
      CalculatorServiceMethods::AddMethod(), &status[i], &latch, &threads[i]));
216
3
  }
217
218
  // One should immediately fail due to backpressure. The latch is only initialized
219
  // to wait for the first of three threads to finish.
220
1
  latch.Wait();
221
222
  // The rest would time out after 10 sec, but we help them along.
223
1
  server_messenger->UnregisterAllServices();
224
1
  service_pool->Shutdown();
225
1
  thread_pool.Shutdown();
226
1
  server_messenger->Shutdown();
227
228
3
  for (const auto& thread : threads) {
229
3
    ASSERT_OK(ThreadJoiner(thread.get()).warn_every(500ms).Join());
230
3
  }
231
232
  // Verify that one error was due to backpressure.
233
1
  int errors_backpressure = 0;
234
1
  int errors_shutdown = 0;
235
236
3
  for (const auto& s : status) {
237
3
    IncrementBackpressureOrShutdown(&s, &errors_backpressure, &errors_shutdown);
238
3
  }
239
240
1
  ASSERT_EQ(1, errors_backpressure);
241
1
  ASSERT_EQ(2, errors_shutdown);
242
243
  // Check that RPC queue overflow metric is 1
244
1
  Counter *rpcs_queue_overflow =
245
1
    METRIC_rpcs_queue_overflow.Instantiate(metric_entity()).get();
246
1
  ASSERT_EQ(1, rpcs_queue_overflow->value());
247
1
}
248
249
8
static void HammerServerWithTCPConns(const Endpoint& addr) {
250
14
  while (true) {
251
13
    Socket socket;
252
13
    CHECK_OK(socket.Init(0));
253
13
    Status s;
254
13
    LOG_SLOW_EXECUTION(INFO, 100, "Connect took long") {
255
13
      s = socket.Connect(addr);
256
13
    }
257
13
    if (!s.ok()) {
258
0
      CHECK(s.IsNetworkError()) << "Unexpected error: " << s.ToString();
259
7
      return;
260
7
    }
261
6
    CHECK_OK(socket.Close());
262
6
  }
263
8
}
264
265
// Regression test for KUDU-128.
266
// Test that shuts down the server while new TCP connections are incoming.
267
1
TEST_F(MultiThreadedRpcTest, TestShutdownWithIncomingConnections) {
268
  // Set up server.
269
1
  Endpoint server_addr;
270
1
  StartTestServer(&server_addr);
271
272
  // Start a number of threads which just hammer the server with TCP connections.
273
1
  std::vector<scoped_refptr<yb::Thread>> threads;
274
9
  for (int i = 0; i < 8; i++) {
275
8
    scoped_refptr<yb::Thread> new_thread;
276
8
    CHECK_OK(yb::Thread::Create("test", strings::Substitute("t$0", i),
277
8
        &HammerServerWithTCPConns, server_addr, &new_thread));
278
8
    threads.push_back(new_thread);
279
8
  }
280
281
  // Sleep until the server has started to actually accept some connections from the
282
  // test threads.
283
1
  scoped_refptr<Counter> conns_accepted =
284
1
    METRIC_rpc_connections_accepted.Instantiate(metric_entity());
285
1
  while (conns_accepted->value() == 0) {
286
0
    SleepFor(MonoDelta::FromMicroseconds(100));
287
0
  }
288
289
  // Shutdown while there are still new connections appearing.
290
1
  server().Shutdown();
291
292
8
  for (scoped_refptr<yb::Thread>& t : threads) {
293
8
    ASSERT_OK(ThreadJoiner(t.get()).warn_every(500ms).Join());
294
8
  }
295
1
}
296
297
1
TEST_F(MultiThreadedRpcTest, MemoryLimit) {
298
1
  constexpr size_t kMemoryLimit = 1;
299
1
  auto read_buffer_tracker = MemTracker::FindOrCreateTracker(kMemoryLimit, "Read Buffer");
300
301
  // Set up server.
302
1
  HostPort server_addr;
303
1
  StartTestServer(&server_addr);
304
305
1
  LOG(INFO) << "Server " << server_addr;
306
307
1
  std::atomic<bool> stop(false);
308
1
  MessengerOptions options = kDefaultClientMessengerOptions;
309
1
  options.n_reactors = 1;
310
1
  options.num_connections_to_server = 1;
311
1
  auto messenger_for_big = CreateAutoShutdownMessengerHolder("Client for big", options);
312
1
  auto messenger_for_small = CreateAutoShutdownMessengerHolder("Client for small", options);
313
1
  Proxy proxy_for_big(messenger_for_big.get(), server_addr);
314
1
  Proxy proxy_for_small(messenger_for_small.get(), server_addr);
315
316
1
  std::vector<std::thread> threads;
317
11
  while (threads.size() != 10) {
318
10
    bool big_call = threads.size() == 0;
319
9
    auto proxy = big_call ? &proxy_for_big : &proxy_for_small;
320
10
    threads.emplace_back([proxy, server_addr, &stop, big_call] {
321
10
      rpc_test::EchoRequestPB req;
322
9
      req.set_data(std::string(big_call ? 5_MB : 5_KB, 'X'));
323
224k
      while (!stop.load(std::memory_order_acquire)) {
324
224k
        rpc_test::EchoResponsePB resp;
325
224k
        RpcController controller;
326
224k
        controller.set_timeout(500ms);
327
224k
        auto status = proxy->SyncRequest(
328
224k
            CalculatorServiceMethods::EchoMethod(), /* method_metrics= */ nullptr, req, &resp,
329
224k
            &controller);
330
224k
        if (big_call) {
331
20
          ASSERT_NOK(status);
332
224k
        } else {
333
224k
          ASSERT_OK(status);
334
224k
        }
335
224k
      }
336
10
    });
337
10
  }
338
339
1
  std::this_thread::sleep_for(10s);
340
341
1
  stop.store(true, std::memory_order_release);
342
343
10
  for (auto& thread : threads) {
344
10
    thread.join();
345
10
  }
346
1
}
347
348
} // namespace rpc
349
} // namespace yb
350