/Users/deen/code/yugabyte-db/src/yb/rpc/rpc-test-base.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | #ifndef YB_RPC_RPC_TEST_BASE_H |
33 | | #define YB_RPC_RPC_TEST_BASE_H |
34 | | |
35 | | #include <algorithm> |
36 | | #include <list> |
37 | | #include <memory> |
38 | | #include <random> |
39 | | #include <string> |
40 | | |
41 | | #include "yb/rpc/acceptor.h" |
42 | | #include "yb/rpc/messenger.h" |
43 | | #include "yb/rpc/reactor.h" |
44 | | #include "yb/rpc/remote_method.h" |
45 | | #include "yb/rpc/rpc_context.h" |
46 | | #include "yb/rpc/rpc_test_util.h" |
47 | | #include "yb/rpc/rtest.pb.h" |
48 | | #include "yb/rpc/rtest.proxy.h" |
49 | | #include "yb/rpc/rtest.service.h" |
50 | | #include "yb/rpc/service_if.h" |
51 | | #include "yb/rpc/service_pool.h" |
52 | | #include "yb/util/faststring.h" |
53 | | #include "yb/util/net/sockaddr.h" |
54 | | #include "yb/util/metrics.h" |
55 | | #include "yb/util/random.h" |
56 | | #include "yb/util/random_util.h" |
57 | | #include "yb/util/stopwatch.h" |
58 | | #include "yb/util/test_util.h" |
59 | | #include "yb/util/trace.h" |
60 | | |
61 | | namespace yb { namespace rpc { |
62 | | |
63 | | class CalculatorServiceMethods { |
64 | | public: |
65 | | static const constexpr auto kAddMethodName = "Add"; |
66 | | static const constexpr auto kDisconnectMethodName = "Disconnect"; |
67 | | static const constexpr auto kEchoMethodName = "Echo"; |
68 | | static const constexpr auto kSendStringsMethodName = "SendStrings"; |
69 | | static const constexpr auto kSleepMethodName = "Sleep"; |
70 | | |
71 | 41 | static RemoteMethod* AddMethod() { |
72 | 41 | static RemoteMethod method( |
73 | 41 | rpc_test::CalculatorServiceIf::static_service_name(), kAddMethodName); |
74 | 41 | return &method; |
75 | 41 | } |
76 | | |
77 | 10.0k | static RemoteMethod* DisconnectMethod() { |
78 | 10.0k | static RemoteMethod method( |
79 | 10.0k | rpc_test::CalculatorServiceIf::static_service_name(), kDisconnectMethodName); |
80 | 10.0k | return &method; |
81 | 10.0k | } |
82 | | |
83 | 225k | static RemoteMethod* EchoMethod() { |
84 | 225k | static RemoteMethod method( |
85 | 225k | rpc_test::CalculatorServiceIf::static_service_name(), kEchoMethodName); |
86 | 225k | return &method; |
87 | 225k | } |
88 | | |
89 | 3 | static RemoteMethod* SendStringsMethod() { |
90 | 3 | static RemoteMethod method( |
91 | 3 | rpc_test::CalculatorServiceIf::static_service_name(), kSendStringsMethodName); |
92 | 3 | return &method; |
93 | 3 | } |
94 | | |
95 | 32 | static RemoteMethod* SleepMethod() { |
96 | 32 | static RemoteMethod method( |
97 | 32 | rpc_test::CalculatorServiceIf::static_service_name(), kSleepMethodName); |
98 | 32 | return &method; |
99 | 32 | } |
100 | | }; |
101 | | |
102 | | // Implementation of CalculatorService which just implements the generic |
103 | | // RPC handler (no generated code). |
104 | | class GenericCalculatorService : public ServiceIf { |
105 | | public: |
106 | 1 | GenericCalculatorService() { |
107 | 1 | } |
108 | | |
109 | | // To match the argument list of the generated CalculatorService. |
110 | 13 | explicit GenericCalculatorService(const scoped_refptr<MetricEntity>& entity) { |
111 | | // this test doesn't generate metrics, so we ignore the argument. |
112 | 13 | } |
113 | | |
114 | | void FillEndpoints(const RpcServicePtr& service, RpcEndpointMap* map) override; |
115 | | void Handle(InboundCallPtr incoming) override; |
116 | | |
117 | 45 | std::string service_name() const override { |
118 | 45 | return rpc_test::CalculatorServiceIf::static_service_name(); |
119 | 45 | } |
120 | | |
121 | | private: |
122 | | typedef void (GenericCalculatorService::*Method)(InboundCall*); |
123 | | |
124 | | void DoAdd(InboundCall *incoming); |
125 | | void DoSendStrings(InboundCall* incoming); |
126 | | void DoSleep(InboundCall *incoming); |
127 | | void DoEcho(InboundCall *incoming); |
128 | | void AddMethodToMap( |
129 | | const RpcServicePtr& service, RpcEndpointMap* map, const char* method_name, Method method); |
130 | | |
131 | | std::deque<std::pair<RemoteMethod, Method>> methods_; |
132 | | }; |
133 | | |
134 | | struct MessengerOptions { |
135 | | MessengerOptions() = delete; |
136 | | int n_reactors; |
137 | | std::chrono::milliseconds keep_alive_timeout; |
138 | | int num_connections_to_server = -1; |
139 | | }; |
140 | | |
141 | | extern const MessengerOptions kDefaultClientMessengerOptions; |
142 | | extern const MessengerOptions kDefaultServerMessengerOptions; |
143 | | |
144 | | struct TestServerOptions { |
145 | | MessengerOptions messenger_options = kDefaultServerMessengerOptions; |
146 | | size_t n_worker_threads = 3; |
147 | | Endpoint endpoint; |
148 | | }; |
149 | | |
150 | | class TestServer { |
151 | | public: |
152 | | TestServer(std::unique_ptr<Messenger>&& messenger, |
153 | | const TestServerOptions& options = TestServerOptions()); |
154 | | |
155 | 66 | TestServer(TestServer&& rhs) = default; |
156 | | |
157 | | ~TestServer(); |
158 | | |
159 | | void Shutdown(); |
160 | | |
161 | 91 | const Endpoint& bound_endpoint() const { return bound_endpoint_; } |
162 | 77 | Messenger* messenger() const { return messenger_.get(); } |
163 | 1 | ServicePool& service_pool() const { return *service_pool_; } |
164 | | |
165 | | CHECKED_STATUS Start(); |
166 | | |
167 | | CHECKED_STATUS RegisterService(std::unique_ptr<ServiceIf> service); |
168 | | |
169 | | private: |
170 | | std::unique_ptr<Messenger> messenger_; |
171 | | std::unique_ptr<ThreadPool> thread_pool_; |
172 | | scoped_refptr<ServicePool> service_pool_; |
173 | | Endpoint bound_endpoint_; |
174 | | }; |
175 | | |
176 | | class RpcTestBase : public YBTest { |
177 | | public: |
178 | | RpcTestBase(); |
179 | | |
180 | | void TearDown() override; |
181 | | |
182 | | std::unique_ptr<Messenger> CreateMessenger( |
183 | | const string &name, |
184 | | const MessengerOptions& options = kDefaultClientMessengerOptions); |
185 | | |
186 | | AutoShutdownMessengerHolder CreateAutoShutdownMessengerHolder( |
187 | | const string &name, |
188 | | const MessengerOptions& options = kDefaultClientMessengerOptions); |
189 | | |
190 | | MessengerBuilder CreateMessengerBuilder( |
191 | | const string &name, |
192 | | const MessengerOptions& options = kDefaultClientMessengerOptions); |
193 | | |
194 | | CHECKED_STATUS DoTestSyncCall(Proxy* proxy, const RemoteMethod *method); |
195 | | |
196 | | void DoTestSidecar(Proxy* proxy, |
197 | | std::vector<size_t> sizes, |
198 | | Status::Code expected_code = Status::Code::kOk); |
199 | | |
200 | | void DoTestExpectTimeout(Proxy* proxy, const MonoDelta &timeout); |
201 | | |
202 | | // Starts test server. |
203 | | void StartTestServer(HostPort* server_hostport, |
204 | | const TestServerOptions& options = TestServerOptions()); |
205 | | void StartTestServer(Endpoint* server_endpoint, |
206 | | const TestServerOptions& options = TestServerOptions()); |
207 | | TestServer StartTestServer( |
208 | | const TestServerOptions& options, const std::string& name = std::string(), |
209 | | std::unique_ptr<Messenger> messenger = nullptr); |
210 | | void StartTestServerWithGeneratedCode(HostPort* server_hostport, |
211 | | const TestServerOptions& options = TestServerOptions()); |
212 | | void StartTestServerWithGeneratedCode(std::unique_ptr<Messenger>&& messenger, |
213 | | HostPort* server_hostport, |
214 | | const TestServerOptions& options = TestServerOptions()); |
215 | | |
216 | | // Start a simple socket listening on a local port, returning the address. |
217 | | // This isn't an RPC server -- just a plain socket which can be helpful for testing. |
218 | | CHECKED_STATUS StartFakeServer(Socket *listen_sock, HostPort* listen_hostport); |
219 | | |
220 | 6 | Messenger* server_messenger() const { return server_->messenger(); } |
221 | 3 | TestServer& server() const { return *server_; } |
222 | 149 | const scoped_refptr<MetricEntity>& metric_entity() const { return metric_entity_; } |
223 | | |
224 | | private: |
225 | | MetricRegistry metric_registry_; |
226 | | scoped_refptr<MetricEntity> metric_entity_; |
227 | | std::unique_ptr<TestServer> server_; |
228 | | }; |
229 | | |
230 | | } // namespace rpc |
231 | | } // namespace yb |
232 | | |
233 | | #endif // YB_RPC_RPC_TEST_BASE_H |