/Users/deen/code/yugabyte-db/src/yb/rpc/proxy.cc
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/rpc/proxy.h" |
34 | | |
35 | | #include <cstdint> |
36 | | #include <functional> |
37 | | #include <memory> |
38 | | #include <sstream> |
39 | | #include <vector> |
40 | | |
41 | | #include <glog/logging.h> |
42 | | |
43 | | #include "yb/rpc/local_call.h" |
44 | | #include "yb/rpc/lightweight_message.h" |
45 | | #include "yb/rpc/proxy_context.h" |
46 | | #include "yb/rpc/outbound_call.h" |
47 | | #include "yb/rpc/remote_method.h" |
48 | | #include "yb/rpc/rpc_controller.h" |
49 | | #include "yb/rpc/rpc_header.pb.h" |
50 | | |
51 | | #include "yb/util/backoff_waiter.h" |
52 | | #include "yb/util/countdown_latch.h" |
53 | | #include "yb/util/metrics.h" |
54 | | #include "yb/util/net/dns_resolver.h" |
55 | | #include "yb/util/net/sockaddr.h" |
56 | | #include "yb/util/net/socket.h" |
57 | | #include "yb/util/result.h" |
58 | | #include "yb/util/status.h" |
59 | | |
60 | | DEFINE_int32(num_connections_to_server, 8, |
61 | | "Number of underlying connections to each server"); |
62 | | |
63 | | DEFINE_int32(proxy_resolve_cache_ms, 5000, |
64 | | "Time in milliseconds to cache resolution result in Proxy"); |
65 | | |
66 | | using namespace std::literals; |
67 | | |
68 | | using google::protobuf::Message; |
69 | | using std::string; |
70 | | using std::shared_ptr; |
71 | | |
72 | | namespace yb { |
73 | | namespace rpc { |
74 | | |
75 | | Proxy::Proxy(ProxyContext* context, |
76 | | const HostPort& remote, |
77 | | const Protocol* protocol, |
78 | | const MonoDelta& resolve_cache_timeout) |
79 | | : context_(context), |
80 | | remote_(remote), |
81 | | protocol_(protocol ? protocol : context_->DefaultProtocol()), |
82 | | outbound_call_metrics_(context_->metric_entity() ? |
83 | | std::make_shared<OutboundCallMetrics>(context_->metric_entity()) : nullptr), |
84 | | call_local_service_(remote == HostPort()), |
85 | | resolve_waiters_(30), |
86 | | resolved_ep_(std::chrono::milliseconds( |
87 | | resolve_cache_timeout.Initialized() ? resolve_cache_timeout.ToMilliseconds() |
88 | | : FLAGS_proxy_resolve_cache_ms)), |
89 | | latency_hist_(ScopedDnsTracker::active_metric()), |
90 | | // Use context_->num_connections_to_server() here rather than reading the flag directly, |
91 | | // because FLAGS_num_connections_to_server could have changed since the context captured it. |
92 | 212k | num_connections_to_server_(context_->num_connections_to_server()) { |
93 | 212k | VLOG(1) << "Create proxy to " << remote << " with num_connections_to_server=" |
94 | 47 | << num_connections_to_server_; |
95 | 212k | if (context_->parent_mem_tracker()) { |
96 | 212k | mem_tracker_ = MemTracker::FindOrCreateTracker( |
97 | 212k | "Queueing", context_->parent_mem_tracker()); |
98 | 212k | } |
99 | 212k | } |
100 | | |
101 | 44.6k | Proxy::~Proxy() { |
102 | 44.6k | const auto kTimeout = 5s; |
103 | 44.6k | const auto kMaxWaitTime = 100ms; |
104 | 44.6k | BackoffWaiter waiter(std::chrono::steady_clock::now() + kTimeout, kMaxWaitTime); |
105 | 44.6k | for (;;) { |
106 | 44.5k | auto expected = ResolveState::kIdle; |
107 | 44.5k | if (resolve_state_.compare_exchange_weak( |
108 | 44.5k | expected, ResolveState::kFinished, std::memory_order_acq_rel)) { |
109 | 44.5k | break; |
110 | 44.5k | } |
111 | 18.4E | if (!waiter.Wait()) { |
112 | 0 | LOG(DFATAL) << "Timed out waiting for resolve to complete";
113 | 0 | break; |
114 | 0 | } |
115 | 18.4E | } |
116 | 44.6k | } |
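
The destructor above loops on a compare-and-swap until any in-flight DNS resolution has settled, using BackoffWaiter to pause between attempts and giving up after five seconds. Below is a minimal standalone sketch of that wait-with-backoff shape; the enum and function names are illustrative, and a plain sleep loop with a capped exponential delay stands in for BackoffWaiter, so this is not the YugabyteDB API.

#include <algorithm>
#include <atomic>
#include <chrono>
#include <thread>

enum class ResolveState { kIdle, kResolving, kNotifying, kFinished };

// Wait for the state machine to return to kIdle, then claim it as kFinished so
// no further resolve can start. Returns false if the deadline passes first.
bool WaitUntilIdleThenFinish(std::atomic<ResolveState>& state,
                             std::chrono::steady_clock::duration timeout) {
  const auto deadline = std::chrono::steady_clock::now() + timeout;
  auto backoff = std::chrono::milliseconds(1);
  for (;;) {
    auto expected = ResolveState::kIdle;
    if (state.compare_exchange_weak(
            expected, ResolveState::kFinished, std::memory_order_acq_rel)) {
      return true;  // Transitioned kIdle -> kFinished; nothing is in flight.
    }
    if (std::chrono::steady_clock::now() >= deadline) {
      return false;  // A resolve callback is still running past the deadline.
    }
    std::this_thread::sleep_for(backoff);
    backoff = std::min(backoff * 2, std::chrono::milliseconds(100));  // Capped exponential backoff.
  }
}

The actual destructor reports the timeout with LOG(DFATAL) rather than returning a flag, but the control flow is the same.
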
117 | | |
118 | | void Proxy::AsyncRequest(const RemoteMethod* method, |
119 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
120 | | const google::protobuf::Message& req, |
121 | | google::protobuf::Message* resp, |
122 | | RpcController* controller, |
123 | 79.8M | ResponseCallback callback) { |
124 | 79.8M | DoAsyncRequest( |
125 | 79.8M | method, std::move(method_metrics), AnyMessageConstPtr(&req), AnyMessagePtr(resp), controller, |
126 | 79.8M | std::move(callback), false /* force_run_callback_on_reactor */); |
127 | 79.8M | } |
128 | | |
129 | | void Proxy::AsyncRequest(const RemoteMethod* method, |
130 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
131 | | const LightweightMessage& req, |
132 | | LightweightMessage* resp, |
133 | | RpcController* controller, |
134 | 0 | ResponseCallback callback) { |
135 | 0 | DoAsyncRequest( |
136 | 0 | method, std::move(method_metrics), AnyMessageConstPtr(&req), AnyMessagePtr(resp), controller, |
137 | 0 | std::move(callback), false /* force_run_callback_on_reactor */); |
138 | 0 | } |
139 | | |
140 | | ThreadPool* Proxy::GetCallbackThreadPool( |
141 | 75.6M | bool force_run_callback_on_reactor, InvokeCallbackMode invoke_callback_mode) { |
142 | 75.6M | if (force_run_callback_on_reactor) { |
143 | 6.63M | return nullptr; |
144 | 6.63M | } |
145 | 68.9M | switch (invoke_callback_mode) { |
146 | 2.16M | case InvokeCallbackMode::kReactorThread: |
147 | 2.16M | return nullptr; |
148 | 0 | break; |
149 | 39.5M | case InvokeCallbackMode::kThreadPoolNormal: |
150 | 39.5M | return &context_->CallbackThreadPool(ServicePriority::kNormal); |
151 | 27.2M | case InvokeCallbackMode::kThreadPoolHigh: |
152 | 27.2M | return &context_->CallbackThreadPool(ServicePriority::kHigh); |
153 | 68.9M | } |
154 | 0 | FATAL_INVALID_ENUM_VALUE(InvokeCallbackMode, invoke_callback_mode); |
155 | 0 | } |
156 | | |
157 | | void Proxy::DoAsyncRequest(const RemoteMethod* method, |
158 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
159 | | AnyMessageConstPtr req, |
160 | | AnyMessagePtr resp, |
161 | | RpcController* controller, |
162 | | ResponseCallback callback, |
163 | 86.4M | bool force_run_callback_on_reactor) { |
164 | 86.4M | CHECK(controller->call_.get() == nullptr) << "Controller should be reset";
165 | | |
166 | 86.4M | controller->call_ = |
167 | 86.4M | call_local_service_ ? |
168 | 10.7M | std::make_shared<LocalOutboundCall>(method, |
169 | 10.7M | outbound_call_metrics_, |
170 | 10.7M | resp, |
171 | 10.7M | controller, |
172 | 10.7M | context_->rpc_metrics(), |
173 | 10.7M | std::move(callback)) : |
174 | 86.4M | std::make_shared<OutboundCall>(method, |
175 | 75.6M | outbound_call_metrics_, |
176 | 75.6M | std::move(method_metrics), |
177 | 75.6M | resp, |
178 | 75.6M | controller, |
179 | 75.6M | context_->rpc_metrics(), |
180 | 75.6M | std::move(callback), |
181 | 75.6M | GetCallbackThreadPool( |
182 | 75.6M | force_run_callback_on_reactor, |
183 | 75.6M | controller->invoke_callback_mode())); |
184 | 86.4M | auto call = controller->call_.get(); |
185 | 86.4M | Status s = call->SetRequestParam(req, mem_tracker_); |
186 | 86.4M | if (PREDICT_FALSE(!s.ok())) { |
187 | | // Failed to serialize request: likely the request is missing a required |
188 | | // field. |
189 | 1.37k | NotifyFailed(controller, s); |
190 | 1.37k | return; |
191 | 1.37k | } |
192 | | |
193 | 86.4M | if (controller->timeout().Initialized() && controller->timeout() > 3600s) {
194 | 0 | LOG(DFATAL) << "Too big timeout specified: " << controller->timeout(); |
195 | 0 | } |
196 | | |
197 | 86.4M | if (call_local_service_) { |
198 | | // For a local call, the response message buffer is reused when the RPC is retried, so clear |
199 | | // the buffer before invoking the RPC method. |
200 | 10.7M | if (resp.is_lightweight()) { |
201 | 0 | resp.lightweight()->Clear(); |
202 | 10.7M | } else { |
203 | 10.7M | resp.protobuf()->Clear(); |
204 | 10.7M | } |
205 | 10.7M | call->SetQueued(); |
206 | 10.7M | call->SetSent(); |
207 | | // If the current thread is an RPC worker thread, it is OK to invoke the handler inline. |
208 | | // Otherwise, enqueue the call to be handled by the service's handler thread. |
209 | 10.7M | const shared_ptr<LocalYBInboundCall>& local_call = |
210 | 10.7M | static_cast<LocalOutboundCall*>(call)->CreateLocalInboundCall(); |
211 | 10.7M | Queue queue(!controller->allow_local_calls_in_curr_thread() || |
212 | 10.7M | !ThreadPool::IsCurrentThreadRpcWorker());
213 | 10.7M | context_->Handle(local_call, queue); |
214 | 75.6M | } else { |
215 | 75.6M | auto ep = resolved_ep_.Load(); |
216 | 75.6M | if (ep.address().is_unspecified()) { |
217 | 2.25M | CHECK(resolve_waiters_.push(controller)); |
218 | 2.25M | Resolve(); |
219 | 73.3M | } else { |
220 | 73.3M | QueueCall(controller, ep); |
221 | 73.3M | } |
222 | 75.6M | } |
223 | 86.4M | } |
224 | | |
225 | 2.25M | void Proxy::Resolve() { |
226 | 2.25M | auto expected = ResolveState::kIdle; |
227 | 2.25M | if (!resolve_state_.compare_exchange_strong( |
228 | 2.25M | expected, ResolveState::kResolving, std::memory_order_acq_rel)) { |
229 | 8.36k | return; |
230 | 8.36k | } |
231 | | |
232 | 2.25M | auto endpoint = resolved_ep_.Load(); |
233 | 2.25M | if (!endpoint.address().is_unspecified()) { |
234 | 49 | expected = ResolveState::kResolving; |
235 | 49 | while (resolve_state_.compare_exchange_strong( |
236 | 49 | expected, ResolveState::kNotifying, std::memory_order_acq_rel)) { |
237 | 49 | RpcController* controller = nullptr; |
238 | 89 | while (resolve_waiters_.pop(controller)) { |
239 | 40 | QueueCall(controller, endpoint); |
240 | 40 | } |
241 | 49 | resolve_state_.store(ResolveState::kIdle, std::memory_order_release); |
242 | 49 | if (resolve_waiters_.empty()) { |
243 | 49 | break; |
244 | 49 | } |
245 | 0 | expected = ResolveState::kIdle; |
246 | 0 | } |
247 | 49 | return; |
248 | 49 | } |
249 | | |
250 | 2.25M | const std::string kService = ""; |
251 | | |
252 | 2.25M | auto address = TryFastResolve(remote_.host()); |
253 | 2.25M | if (address) { |
254 | 2.24M | HandleResolve(*address); |
255 | 2.24M | return; |
256 | 2.24M | } |
257 | | |
258 | 1.66k | auto latency_metric = std::make_shared<ScopedLatencyMetric>(latency_hist_, Auto::kFalse); |
259 | | |
260 | 1.66k | context_->resolver().AsyncResolve( |
261 | 1.66k | remote_.host(), [this, latency_metric = std::move(latency_metric)]( |
262 | 1.66k | const Result<IpAddress>& result) { |
263 | 0 | latency_metric->Finish(); |
264 | 0 | HandleResolve(result); |
265 | 0 | }); |
266 | 1.66k | } |
267 | | |
268 | 0 | void Proxy::NotifyAllFailed(const Status& status) { |
269 | 0 | RpcController* controller = nullptr; |
270 | 0 | while (resolve_waiters_.pop(controller)) { |
271 | 0 | NotifyFailed(controller, status); |
272 | 0 | } |
273 | 0 | } |
274 | | |
275 | 2.24M | void Proxy::HandleResolve(const Result<IpAddress>& result) { |
276 | 2.24M | auto expected = ResolveState::kResolving; |
277 | 2.24M | if (resolve_state_.compare_exchange_strong( |
278 | 2.25M | expected, ResolveState::kNotifying, std::memory_order_acq_rel)) { |
279 | 2.25M | ResolveDone(result); |
280 | 2.25M | resolve_state_.store(ResolveState::kIdle, std::memory_order_release); |
281 | 2.25M | if (!resolve_waiters_.empty()) { |
282 | 1 | Resolve(); |
283 | 1 | } |
284 | 2.25M | } |
285 | 2.24M | } |
286 | | |
287 | 2.25M | void Proxy::ResolveDone(const Result<IpAddress>& result) { |
288 | 2.25M | if (!result.ok()) { |
289 | 0 | LOG(WARNING) << "Resolve " << remote_.host() << " failed: " << result.status(); |
290 | 0 | NotifyAllFailed(result.status()); |
291 | 0 | return; |
292 | 0 | } |
293 | | |
294 | 2.25M | Endpoint endpoint(*result, remote_.port()); |
295 | 2.25M | resolved_ep_.Store(endpoint); |
296 | | |
297 | 2.25M | RpcController* controller = nullptr; |
298 | 4.50M | while (resolve_waiters_.pop(controller)) { |
299 | 2.25M | QueueCall(controller, endpoint); |
300 | 2.25M | } |
301 | 2.25M | } |
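
Resolve(), HandleResolve(), and ResolveDone() coordinate through a small compare-and-swap state machine: only one resolution runs at a time, callers that arrive mid-resolution park in a waiter queue, and the completing thread drains that queue and re-checks for stragglers before returning to idle. The sketch below shows the same coordination pattern in isolation; the names (ResolveOnce, Request, Done) are invented for illustration, an int stands in for the resolved address, and a mutex-guarded std::queue replaces the lock-free waiter queue, so this is not the Proxy API itself.

#include <atomic>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

enum class State { kIdle, kResolving, kNotifying };

// One-at-a-time async resolution with parked waiters, in the spirit of
// Resolve()/HandleResolve()/ResolveDone().
class ResolveOnce {
 public:
  // Park the waiter, then make sure exactly one resolution is in flight.
  void Request(std::function<void(int)> waiter) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      waiters_.push(std::move(waiter));
    }
    auto expected = State::kIdle;
    if (state_.compare_exchange_strong(
            expected, State::kResolving, std::memory_order_acq_rel)) {
      StartResolve();  // Only the thread that wins the CAS starts the work.
    }
  }

  // Completion path: drain the parked waiters, then return to kIdle.
  void Done(int address) {
    auto expected = State::kResolving;
    if (!state_.compare_exchange_strong(
            expected, State::kNotifying, std::memory_order_acq_rel)) {
      return;
    }
    DrainWaiters(address);
    state_.store(State::kIdle, std::memory_order_release);
    // A waiter enqueued after the drain but before the store above lost its
    // CAS in Request(); restart the resolution on its behalf, mirroring the
    // re-check at the end of HandleResolve().
    if (HaveWaiters()) {
      auto again = State::kIdle;
      if (state_.compare_exchange_strong(
              again, State::kResolving, std::memory_order_acq_rel)) {
        StartResolve();
      }
    }
  }

 private:
  void StartResolve() {
    // Placeholder: a real implementation hands off to an async resolver whose
    // completion callback invokes Done(). Here we complete inline with a dummy value.
    Done(42);
  }

  void DrainWaiters(int address) {
    std::queue<std::function<void(int)>> pending;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      pending.swap(waiters_);
    }
    while (!pending.empty()) {
      pending.front()(address);
      pending.pop();
    }
  }

  bool HaveWaiters() {
    std::lock_guard<std::mutex> lock(mutex_);
    return !waiters_.empty();
  }

  std::atomic<State> state_{State::kIdle};
  std::mutex mutex_;
  std::queue<std::function<void(int)>> waiters_;
};
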
302 | | |
303 | 75.5M | void Proxy::QueueCall(RpcController* controller, const Endpoint& endpoint) { |
304 | 75.5M | uint8_t idx = num_calls_.fetch_add(1) % num_connections_to_server_; |
305 | 75.5M | ConnectionId conn_id(endpoint, idx, protocol_); |
306 | 75.5M | controller->call_->SetConnectionId(conn_id, &remote_.host()); |
307 | 75.5M | context_->QueueOutboundCall(controller->call_); |
308 | 75.5M | } |
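
QueueCall spreads outbound calls across the configured connections by taking an atomic call counter modulo num_connections_to_server_ and baking the resulting index into the ConnectionId. A tiny standalone sketch of that round-robin selection follows; the class and method names are hypothetical.

#include <atomic>
#include <cstdint>

// Round-robin selection of a connection index: a monotonically increasing
// counter taken modulo the connection count spreads calls evenly.
// num_connections must be greater than zero.
class ConnectionPicker {
 public:
  explicit ConnectionPicker(uint8_t num_connections)
      : num_connections_(num_connections) {}

  uint8_t NextIndex() {
    return static_cast<uint8_t>(num_calls_.fetch_add(1) % num_connections_);
  }

 private:
  const uint8_t num_connections_;
  std::atomic<uint64_t> num_calls_{0};
};
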
309 | | |
310 | 1.37k | void Proxy::NotifyFailed(RpcController* controller, const Status& status) { |
311 | | // Retain a reference to the call so it is not destroyed during SetFailed. |
312 | 1.37k | auto call = controller->call_; |
313 | 1.37k | call->SetFailed(status); |
314 | 1.37k | } |
315 | | |
316 | | Status Proxy::DoSyncRequest(const RemoteMethod* method, |
317 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
318 | | AnyMessageConstPtr request, |
319 | | AnyMessagePtr resp, |
320 | 6.63M | RpcController* controller) { |
321 | 6.63M | CountDownLatch latch(1); |
322 | | // Execute this fast callback on the reactor thread to avoid the overhead of queueing it on a |
323 | | // separate thread pool. |
324 | 6.63M | DoAsyncRequest( |
325 | 6.63M | method, std::move(method_metrics), request, DCHECK_NOTNULL(resp), controller, |
326 | 6.63M | latch.CountDownCallback(), true /* force_run_callback_on_reactor */); |
327 | 6.63M | latch.Wait(); |
328 | 6.63M | return controller->status(); |
329 | 6.63M | } |
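
DoSyncRequest layers the blocking call on top of the async path: it issues the async request with a callback that counts down a CountDownLatch, waits on the latch, and then reads the status out of the controller. The sketch below shows the same sync-over-async shape with std::promise/std::future and an invented AsyncCall function; it is an illustration of the pattern, not the YugabyteDB API.

#include <functional>
#include <future>
#include <string>
#include <utility>

// Hypothetical async API: fills *response and invokes `done` with a status
// string when the call completes. Here it completes inline; a real transport
// would complete from an I/O thread.
void AsyncCall(const std::string& request, std::string* response,
               std::function<void(std::string)> done) {
  *response = "echo: " + request;
  done("OK");
}

// Blocking wrapper in the style of DoSyncRequest: start the async call with a
// completion callback, then block until that callback fires.
std::string SyncCall(const std::string& request, std::string* response) {
  std::promise<std::string> finished;
  auto status = finished.get_future();
  AsyncCall(request, response, [&finished](std::string s) {
    finished.set_value(std::move(s));  // Plays the role of latch.CountDown().
  });
  return status.get();  // Plays the role of latch.Wait() + controller->status().
}
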
330 | | |
331 | | Status Proxy::SyncRequest(const RemoteMethod* method, |
332 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
333 | | const google::protobuf::Message& req, |
334 | | google::protobuf::Message* resp, |
335 | 6.63M | RpcController* controller) { |
336 | 6.63M | return DoSyncRequest( |
337 | 6.63M | method, std::move(method_metrics), AnyMessageConstPtr(&req), AnyMessagePtr(resp), controller); |
338 | 6.63M | } |
339 | | |
340 | | Status Proxy::SyncRequest(const RemoteMethod* method, |
341 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
342 | | const LightweightMessage& req, |
343 | | LightweightMessage* resp, |
344 | 0 | RpcController* controller) { |
345 | 0 | return DoSyncRequest( |
346 | 0 | method, std::move(method_metrics), AnyMessageConstPtr(&req), AnyMessagePtr(resp), controller); |
347 | 0 | } |
348 | | |
349 | 0 | scoped_refptr<MetricEntity> Proxy::metric_entity() const { |
350 | 0 | return context_->metric_entity(); |
351 | 0 | } |
352 | | |
353 | | ProxyPtr ProxyCache::GetProxy( |
354 | 36.7M | const HostPort& remote, const Protocol* protocol, const MonoDelta& resolve_cache_timeout) { |
355 | 36.7M | ProxyKey key(remote, protocol); |
356 | 36.7M | std::lock_guard<std::mutex> lock(proxy_mutex_); |
357 | 36.7M | auto it = proxies_.find(key); |
358 | 36.7M | if (it == proxies_.end()) { |
359 | 211k | it = proxies_.emplace( |
360 | 211k | key, std::make_unique<Proxy>(context_, remote, protocol, resolve_cache_timeout)).first; |
361 | 211k | } |
362 | 36.7M | return it->second; |
363 | 36.7M | } |
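
ProxyCache::GetProxy is a get-or-create cache: it looks up the (host, protocol) key under a mutex and constructs the Proxy only on a miss, so concurrent callers for the same endpoint share one instance. A minimal standalone sketch of that pattern, with invented Widget/WidgetCache names and a string key in place of ProxyKey:

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

struct Widget {  // Stand-in for Proxy.
  explicit Widget(std::string name) : name(std::move(name)) {}
  std::string name;
};

class WidgetCache {
 public:
  std::shared_ptr<Widget> GetOrCreate(const std::string& key) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = cache_.find(key);
    if (it == cache_.end()) {
      // Construct only on a miss; later callers reuse the same instance.
      it = cache_.emplace(key, std::make_shared<Widget>(key)).first;
    }
    return it->second;
  }

 private:
  std::mutex mutex_;
  std::unordered_map<std::string, std::shared_ptr<Widget>> cache_;
};
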
364 | | |
365 | | ProxyMetricsPtr ProxyCache::GetMetrics( |
366 | 36.7M | const std::string& service_name, ProxyMetricsFactory factory) { |
367 | 36.7M | std::lock_guard<std::mutex> lock(metrics_mutex_); |
368 | 36.7M | auto it = metrics_.find(service_name); |
369 | 36.7M | if (it != metrics_.end()) { |
370 | 36.4M | return it->second; |
371 | 36.4M | } |
372 | | |
373 | 282k | auto entity = context_->metric_entity(); |
374 | 282k | auto metrics = entity ? factory(entity) : nullptr;
375 | 282k | metrics_.emplace(service_name, metrics); |
376 | 282k | return metrics; |
377 | 36.7M | } |
378 | | |
379 | | ProxyBase::ProxyBase(const std::string& service_name, ProxyMetricsFactory metrics_factory, |
380 | | ProxyCache* cache, const HostPort& remote, |
381 | | const Protocol* protocol, |
382 | | const MonoDelta& resolve_cache_timeout) |
383 | | : proxy_(cache->GetProxy(remote, protocol, resolve_cache_timeout)), |
384 | 36.7M | metrics_(cache->GetMetrics(service_name, metrics_factory)) { |
385 | 36.7M | } |
386 | | |
387 | | } // namespace rpc |
388 | | } // namespace yb |