YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/rpc-test-base.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
#ifndef YB_RPC_RPC_TEST_BASE_H
33
#define YB_RPC_RPC_TEST_BASE_H
34
35
#include <algorithm>
36
#include <list>
37
#include <memory>
38
#include <random>
39
#include <string>
40
41
#include "yb/rpc/acceptor.h"
42
#include "yb/rpc/messenger.h"
43
#include "yb/rpc/reactor.h"
44
#include "yb/rpc/remote_method.h"
45
#include "yb/rpc/rpc_context.h"
46
#include "yb/rpc/rpc_test_util.h"
47
#include "yb/rpc/rtest.pb.h"
48
#include "yb/rpc/rtest.proxy.h"
49
#include "yb/rpc/rtest.service.h"
50
#include "yb/rpc/service_if.h"
51
#include "yb/rpc/service_pool.h"
52
#include "yb/util/faststring.h"
53
#include "yb/util/net/sockaddr.h"
54
#include "yb/util/metrics.h"
55
#include "yb/util/random.h"
56
#include "yb/util/random_util.h"
57
#include "yb/util/stopwatch.h"
58
#include "yb/util/test_util.h"
59
#include "yb/util/trace.h"
60
61
namespace yb { namespace rpc {
62
63
class CalculatorServiceMethods {
64
 public:
65
  static const constexpr auto kAddMethodName = "Add";
66
  static const constexpr auto kDisconnectMethodName = "Disconnect";
67
  static const constexpr auto kEchoMethodName = "Echo";
68
  static const constexpr auto kSendStringsMethodName = "SendStrings";
69
  static const constexpr auto kSleepMethodName = "Sleep";
70
71
41
  static RemoteMethod* AddMethod() {
72
41
    static RemoteMethod method(
73
41
        rpc_test::CalculatorServiceIf::static_service_name(), kAddMethodName);
74
41
    return &method;
75
41
  }
76
77
10.0k
  static RemoteMethod* DisconnectMethod() {
78
10.0k
    static RemoteMethod method(
79
10.0k
        rpc_test::CalculatorServiceIf::static_service_name(), kDisconnectMethodName);
80
10.0k
    return &method;
81
10.0k
  }
82
83
225k
  static RemoteMethod* EchoMethod() {
84
225k
    static RemoteMethod method(
85
225k
        rpc_test::CalculatorServiceIf::static_service_name(), kEchoMethodName);
86
225k
    return &method;
87
225k
  }
88
89
3
  static RemoteMethod* SendStringsMethod() {
90
3
    static RemoteMethod method(
91
3
        rpc_test::CalculatorServiceIf::static_service_name(), kSendStringsMethodName);
92
3
    return &method;
93
3
  }
94
95
32
  static RemoteMethod* SleepMethod() {
96
32
    static RemoteMethod method(
97
32
        rpc_test::CalculatorServiceIf::static_service_name(), kSleepMethodName);
98
32
    return &method;
99
32
  }
100
};
101
102
// Implementation of CalculatorService which just implements the generic
103
// RPC handler (no generated code).
104
class GenericCalculatorService : public ServiceIf {
105
 public:
106
1
  GenericCalculatorService() {
107
1
  }
108
109
  // To match the argument list of the generated CalculatorService.
110
13
  explicit GenericCalculatorService(const scoped_refptr<MetricEntity>& entity) {
111
    // this test doesn't generate metrics, so we ignore the argument.
112
13
  }
113
114
  void FillEndpoints(const RpcServicePtr& service, RpcEndpointMap* map) override;
115
  void Handle(InboundCallPtr incoming) override;
116
117
45
  std::string service_name() const override {
118
45
    return rpc_test::CalculatorServiceIf::static_service_name();
119
45
  }
120
121
 private:
122
  typedef void (GenericCalculatorService::*Method)(InboundCall*);
123
124
  void DoAdd(InboundCall *incoming);
125
  void DoSendStrings(InboundCall* incoming);
126
  void DoSleep(InboundCall *incoming);
127
  void DoEcho(InboundCall *incoming);
128
  void AddMethodToMap(
129
      const RpcServicePtr& service, RpcEndpointMap* map, const char* method_name, Method method);
130
131
  std::deque<std::pair<RemoteMethod, Method>> methods_;
132
};
133
134
struct MessengerOptions {
135
  MessengerOptions() = delete;
136
  int n_reactors;
137
  std::chrono::milliseconds keep_alive_timeout;
138
  int num_connections_to_server = -1;
139
};
140
141
extern const MessengerOptions kDefaultClientMessengerOptions;
142
extern const MessengerOptions kDefaultServerMessengerOptions;
143
144
struct TestServerOptions {
145
  MessengerOptions messenger_options = kDefaultServerMessengerOptions;
146
  size_t n_worker_threads = 3;
147
  Endpoint endpoint;
148
};
149
150
class TestServer {
151
 public:
152
  TestServer(std::unique_ptr<Messenger>&& messenger,
153
             const TestServerOptions& options = TestServerOptions());
154
155
66
  TestServer(TestServer&& rhs) = default;
156
157
  ~TestServer();
158
159
  void Shutdown();
160
161
91
  const Endpoint& bound_endpoint() const { return bound_endpoint_; }
162
77
  Messenger* messenger() const { return messenger_.get(); }
163
1
  ServicePool& service_pool() const { return *service_pool_; }
164
165
  CHECKED_STATUS Start();
166
167
  CHECKED_STATUS RegisterService(std::unique_ptr<ServiceIf> service);
168
169
 private:
170
  std::unique_ptr<Messenger> messenger_;
171
  std::unique_ptr<ThreadPool> thread_pool_;
172
  scoped_refptr<ServicePool> service_pool_;
173
  Endpoint bound_endpoint_;
174
};
175
176
class RpcTestBase : public YBTest {
177
 public:
178
  RpcTestBase();
179
180
  void TearDown() override;
181
182
  std::unique_ptr<Messenger> CreateMessenger(
183
      const string &name,
184
      const MessengerOptions& options = kDefaultClientMessengerOptions);
185
186
  AutoShutdownMessengerHolder CreateAutoShutdownMessengerHolder(
187
      const string &name,
188
      const MessengerOptions& options = kDefaultClientMessengerOptions);
189
190
  MessengerBuilder CreateMessengerBuilder(
191
      const string &name,
192
      const MessengerOptions& options = kDefaultClientMessengerOptions);
193
194
  CHECKED_STATUS DoTestSyncCall(Proxy* proxy, const RemoteMethod *method);
195
196
  void DoTestSidecar(Proxy* proxy,
197
                     std::vector<size_t> sizes,
198
                     Status::Code expected_code = Status::Code::kOk);
199
200
  void DoTestExpectTimeout(Proxy* proxy, const MonoDelta &timeout);
201
202
  // Starts test server.
203
  void StartTestServer(HostPort* server_hostport,
204
                       const TestServerOptions& options = TestServerOptions());
205
  void StartTestServer(Endpoint* server_endpoint,
206
                       const TestServerOptions& options = TestServerOptions());
207
  TestServer StartTestServer(
208
      const TestServerOptions& options, const std::string& name = std::string(),
209
      std::unique_ptr<Messenger> messenger = nullptr);
210
  void StartTestServerWithGeneratedCode(HostPort* server_hostport,
211
                                        const TestServerOptions& options = TestServerOptions());
212
  void StartTestServerWithGeneratedCode(std::unique_ptr<Messenger>&& messenger,
213
                                        HostPort* server_hostport,
214
                                        const TestServerOptions& options = TestServerOptions());
215
216
  // Start a simple socket listening on a local port, returning the address.
217
  // This isn't an RPC server -- just a plain socket which can be helpful for testing.
218
  CHECKED_STATUS StartFakeServer(Socket *listen_sock, HostPort* listen_hostport);
219
220
6
  Messenger* server_messenger() const { return server_->messenger(); }
221
3
  TestServer& server() const { return *server_; }
222
149
  const scoped_refptr<MetricEntity>& metric_entity() const { return metric_entity_; }
223
224
 private:
225
  MetricRegistry metric_registry_;
226
  scoped_refptr<MetricEntity> metric_entity_;
227
  std::unique_ptr<TestServer> server_;
228
};
229
230
} // namespace rpc
231
} // namespace yb
232
233
#endif  // YB_RPC_RPC_TEST_BASE_H