/Users/deen/code/yugabyte-db/src/yb/rpc/proxy.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #ifndef YB_RPC_PROXY_H_ |
34 | | #define YB_RPC_PROXY_H_ |
35 | | |
36 | | #include <atomic> |
37 | | #include <memory> |
38 | | #include <mutex> |
39 | | #include <string> |
40 | | |
41 | | #include <boost/asio/ip/tcp.hpp> |
42 | | #include <boost/lockfree/queue.hpp> |
43 | | |
44 | | #include "yb/gutil/atomicops.h" |
45 | | #include "yb/rpc/rpc_fwd.h" |
46 | | #include "yb/rpc/growable_buffer.h" |
47 | | #include "yb/rpc/proxy_base.h" |
48 | | #include "yb/rpc/rpc_controller.h" |
49 | | #include "yb/rpc/rpc_header.pb.h" |
50 | | #include "yb/gutil/thread_annotations.h" |
51 | | |
52 | | #include "yb/util/concurrent_pod.h" |
53 | | #include "yb/util/net/net_fwd.h" |
54 | | #include "yb/util/net/net_util.h" |
55 | | #include "yb/util/metrics_fwd.h" |
56 | | #include "yb/util/monotime.h" |
57 | | |
58 | | namespace google { |
59 | | namespace protobuf { |
60 | | class Message; |
61 | | } // namespace protobuf |
62 | | } // namespace google |
63 | | |
64 | | namespace yb { |
65 | | |
66 | | class MemTracker; |
67 | | |
68 | | namespace rpc { |
69 | | |
70 | | YB_DEFINE_ENUM(ResolveState, (kIdle)(kResolving)(kNotifying)(kFinished)); |
71 | | |
72 | | // Interface to send calls to a remote or local service. |
73 | | // |
74 | | // Proxy objects do not map one-to-one with TCP connections. The underlying TCP |
75 | | // connection is not established until the first call, and may be torn down and |
76 | | // re-established as necessary by the messenger. Additionally, the messenger is |
77 | | // likely to multiplex many Proxy objects on the same connection. Or, split the |
78 | | // requests sent over a single proxy across different connections to the server. |
79 | | // |
80 | | // When remote endpoint is blank (i.e. HostPort()), the proxy will attempt to |
81 | | // call the service locally in the messenger instead. |
82 | | // |
83 | | // Proxy objects are thread-safe after initialization only. |
84 | | // Setters on the Proxy are not thread-safe, and calling a setter after any RPC |
85 | | // request has started will cause a fatal error. |
86 | | // |
87 | | // After initialization, multiple threads may make calls using the same proxy object. |
88 | | class Proxy { |
89 | | public: |
90 | | Proxy(ProxyContext* context, |
91 | | const HostPort& remote, |
92 | | const Protocol* protocol = nullptr, |
93 | | const MonoDelta& resolve_cache_timeout = MonoDelta()); |
94 | | ~Proxy(); |
95 | | |
96 | | Proxy(const Proxy&) = delete; |
97 | | void operator=(const Proxy&) = delete; |
98 | | |
99 | | // Call a remote method asynchronously. |
100 | | // |
101 | | // Typically, users will not call this directly, but rather through |
102 | | // a generated Proxy subclass. |
103 | | // |
104 | | // method: the method name to invoke on the remote server. |
105 | | // |
106 | | // req: the request protobuf. This will be serialized immediately, |
107 | | // so the caller may free or otherwise mutate 'req' safely. |
108 | | // |
109 | | // resp: the response protobuf. This protobuf will be mutated upon |
110 | | // completion of the call. The RPC system does not take ownership |
111 | | // of this storage. |
112 | | // |
113 | | // NOTE: 'req' and 'resp' should be the appropriate protocol buffer implementation |
114 | | // class corresponding to the parameter and result types of the service method |
115 | | // defined in the service's '.proto' file. |
116 | | // |
117 | | // controller: the RpcController to associate with this call. Each call |
118 | | // must use a unique controller object. Does not take ownership. |
119 | | // |
120 | | // callback: the callback to invoke upon call completion. This callback may be invoked before |
121 | | // AsyncRequest() itself returns, or any time thereafter. It may be invoked either on the |
122 | | // caller's thread or asynchronously. RpcController::set_invoke_callback_mode could be used to |
123 | | // specify on which thread to invoke callback in case of asynchronous invocation. |
124 | | void AsyncRequest(const RemoteMethod* method, |
125 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
126 | | const google::protobuf::Message& req, |
127 | | google::protobuf::Message* resp, |
128 | | RpcController* controller, |
129 | | ResponseCallback callback); |
130 | | |
131 | | void AsyncRequest(const RemoteMethod* method, |
132 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
133 | | const LightweightMessage& req, |
134 | | LightweightMessage* resp, |
135 | | RpcController* controller, |
136 | | ResponseCallback callback); |
137 | | |
138 | | // The same as AsyncRequest(), except that the call blocks until the call |
139 | | // finishes. If the call fails, returns a non-OK result. |
140 | | CHECKED_STATUS SyncRequest(const RemoteMethod* method, |
141 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
142 | | const google::protobuf::Message& req, |
143 | | google::protobuf::Message* resp, |
144 | | RpcController* controller); |
145 | | |
146 | | CHECKED_STATUS SyncRequest(const RemoteMethod* method, |
147 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
148 | | const LightweightMessage& request, |
149 | | LightweightMessage* resp, |
150 | | RpcController* controller); |
151 | | |
152 | | // Is the service local? |
153 | 0 | bool IsServiceLocal() const { return call_local_service_; } |
154 | | |
155 | | scoped_refptr<MetricEntity> metric_entity() const; |
156 | | |
157 | | private: |
158 | | void Resolve(); |
159 | | void HandleResolve(const Result<IpAddress>& result); |
160 | | void ResolveDone(const Result<IpAddress>& result); |
161 | | void NotifyAllFailed(const Status& status); |
162 | | void QueueCall(RpcController* controller, const Endpoint& endpoint); |
163 | | ThreadPool *GetCallbackThreadPool( |
164 | | bool force_run_callback_on_reactor, InvokeCallbackMode invoke_callback_mode); |
165 | | |
166 | | // Implements logic for AsyncRequest function, but allows to force to run callback on |
167 | | // reactor thread. This is an optimisation used by SyncRequest function. |
168 | | void DoAsyncRequest(const RemoteMethod* method, |
169 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
170 | | AnyMessageConstPtr req, |
171 | | AnyMessagePtr resp, |
172 | | RpcController* controller, |
173 | | ResponseCallback callback, |
174 | | bool force_run_callback_on_reactor); |
175 | | |
176 | | CHECKED_STATUS DoSyncRequest(const RemoteMethod* method, |
177 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
178 | | AnyMessageConstPtr req, |
179 | | AnyMessagePtr resp, |
180 | | RpcController* controller); |
181 | | |
182 | | static void NotifyFailed(RpcController* controller, const Status& status); |
183 | | |
184 | | ProxyContext* context_; |
185 | | HostPort remote_; |
186 | | const Protocol* const protocol_; |
187 | | mutable std::atomic<size_t> num_calls_{0}; |
188 | | std::shared_ptr<OutboundCallMetrics> outbound_call_metrics_; |
189 | | const bool call_local_service_; |
190 | | |
191 | | std::atomic<ResolveState> resolve_state_{ResolveState::kIdle}; |
192 | | boost::lockfree::queue<RpcController*> resolve_waiters_; |
193 | | ConcurrentPod<Endpoint> resolved_ep_; |
194 | | |
195 | | scoped_refptr<Histogram> latency_hist_; |
196 | | |
197 | | // Number of outbound connections to create per each destination server address. |
198 | | int num_connections_to_server_; |
199 | | |
200 | | std::shared_ptr<MemTracker> mem_tracker_; |
201 | | }; |
202 | | |
203 | | class ProxyCache { |
204 | | public: |
205 | | explicit ProxyCache(ProxyContext* context) |
206 | 80.0k | : context_(context) {} |
207 | | |
208 | | ProxyPtr GetProxy( |
209 | | const HostPort& remote, const Protocol* protocol, const MonoDelta& resolve_cache_timeout); |
210 | | |
211 | | ProxyMetricsPtr GetMetrics(const std::string& service_name, ProxyMetricsFactory factory); |
212 | | |
213 | | private: |
214 | | typedef std::pair<HostPort, const Protocol*> ProxyKey; |
215 | | |
216 | | struct ProxyKeyHash { |
217 | 36.9M | size_t operator()(const ProxyKey& key) const { |
218 | 36.9M | size_t result = 0; |
219 | 36.9M | boost::hash_combine(result, HostPortHash()(key.first)); |
220 | 36.9M | boost::hash_combine(result, key.second); |
221 | 36.9M | return result; |
222 | 36.9M | } |
223 | | }; |
224 | | |
225 | | ProxyContext* context_; |
226 | | |
227 | | std::mutex proxy_mutex_; |
228 | | std::unordered_map<ProxyKey, ProxyPtr, ProxyKeyHash> proxies_ GUARDED_BY(proxy_mutex_); |
229 | | |
230 | | std::mutex metrics_mutex_; |
231 | | std::unordered_map<std::string , ProxyMetricsPtr> metrics_ GUARDED_BY(metrics_mutex_); |
232 | | }; |
233 | | |
234 | | } // namespace rpc |
235 | | } // namespace yb |
236 | | |
237 | | #endif // YB_RPC_PROXY_H_ |