/Users/deen/code/yugabyte-db/src/yb/rpc/proxy.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/rpc/proxy.h" |
34 | | |
35 | | #include <cstdint> |
36 | | #include <functional> |
37 | | #include <memory> |
38 | | #include <sstream> |
39 | | #include <vector> |
40 | | |
41 | | #include <glog/logging.h> |
42 | | |
43 | | #include "yb/rpc/local_call.h" |
44 | | #include "yb/rpc/lightweight_message.h" |
45 | | #include "yb/rpc/proxy_context.h" |
46 | | #include "yb/rpc/outbound_call.h" |
47 | | #include "yb/rpc/remote_method.h" |
48 | | #include "yb/rpc/rpc_controller.h" |
49 | | #include "yb/rpc/rpc_header.pb.h" |
50 | | |
51 | | #include "yb/util/backoff_waiter.h" |
52 | | #include "yb/util/countdown_latch.h" |
53 | | #include "yb/util/metrics.h" |
54 | | #include "yb/util/net/dns_resolver.h" |
55 | | #include "yb/util/net/sockaddr.h" |
56 | | #include "yb/util/net/socket.h" |
57 | | #include "yb/util/result.h" |
58 | | #include "yb/util/status.h" |
59 | | |
60 | | DEFINE_int32(num_connections_to_server, 8, |
61 | | "Number of underlying connections to each server"); |
62 | | |
63 | | DEFINE_int32(proxy_resolve_cache_ms, 5000, |
64 | | "Time in milliseconds to cache resolution result in Proxy"); |
65 | | |
66 | | using namespace std::literals; |
67 | | |
68 | | using google::protobuf::Message; |
69 | | using std::string; |
70 | | using std::shared_ptr; |
71 | | |
72 | | namespace yb { |
73 | | namespace rpc { |
74 | | |
// Constructs a proxy to `remote` using `protocol` (falling back to the
// context's default protocol when none is supplied).
//
// A default-constructed HostPort for `remote` means "this process": the proxy
// then short-circuits calls through the local service (call_local_service_)
// instead of going over the network. Resolution results are cached for
// `resolve_cache_timeout` when it is initialized, otherwise for
// FLAGS_proxy_resolve_cache_ms milliseconds.
Proxy::Proxy(ProxyContext* context,
             const HostPort& remote,
             const Protocol* protocol,
             const MonoDelta& resolve_cache_timeout)
    : context_(context),
      remote_(remote),
      protocol_(protocol ? protocol : context_->DefaultProtocol()),
      // Metrics are optional: a null metric entity disables outbound call metrics.
      outbound_call_metrics_(context_->metric_entity() ?
          std::make_shared<OutboundCallMetrics>(context_->metric_entity()) : nullptr),
      call_local_service_(remote == HostPort()),
      resolve_waiters_(30),  // Capacity of the queue of calls awaiting DNS resolution.
      resolved_ep_(std::chrono::milliseconds(
          resolve_cache_timeout.Initialized() ? resolve_cache_timeout.ToMilliseconds()
                                              : FLAGS_proxy_resolve_cache_ms)),
      latency_hist_(ScopedDnsTracker::active_metric()),
      // Use the context->num_connections_to_server() here as opposed to directly reading the
      // FLAGS_num_connections_to_server, because the flag value could have changed since then.
      num_connections_to_server_(context_->num_connections_to_server()) {
  VLOG(1) << "Create proxy to " << remote << " with num_connections_to_server="
          << num_connections_to_server_;
  if (context_->parent_mem_tracker()) {
    // Track memory consumed by queued outbound data under the context's tracker.
    mem_tracker_ = MemTracker::FindOrCreateTracker(
        "Queueing", context_->parent_mem_tracker());
  }
}
100 | | |
101 | 31.2k | Proxy::~Proxy() { |
102 | 31.2k | const auto kTimeout = 5s; |
103 | 31.2k | const auto kMaxWaitTime = 100ms; |
104 | 31.2k | BackoffWaiter waiter(std::chrono::steady_clock::now() + kTimeout, kMaxWaitTime); |
105 | 31.2k | for (;;) { |
106 | 31.2k | auto expected = ResolveState::kIdle; |
107 | 31.2k | if (resolve_state_.compare_exchange_weak( |
108 | 31.2k | expected, ResolveState::kFinished, std::memory_order_acq_rel)) { |
109 | 31.2k | break; |
110 | 31.2k | } |
111 | 18.4E | if (!waiter.Wait()) { |
112 | 0 | LOG(DFATAL) << "Timeout to wait resolve to complete"; |
113 | 0 | break; |
114 | 0 | } |
115 | 18.4E | } |
116 | 31.2k | } |
117 | | |
118 | | void Proxy::AsyncRequest(const RemoteMethod* method, |
119 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
120 | | const google::protobuf::Message& req, |
121 | | google::protobuf::Message* resp, |
122 | | RpcController* controller, |
123 | 24.5M | ResponseCallback callback) { |
124 | 24.5M | DoAsyncRequest( |
125 | 24.5M | method, std::move(method_metrics), AnyMessageConstPtr(&req), AnyMessagePtr(resp), controller, |
126 | 24.5M | std::move(callback), false /* force_run_callback_on_reactor */); |
127 | 24.5M | } |
128 | | |
129 | | void Proxy::AsyncRequest(const RemoteMethod* method, |
130 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
131 | | const LightweightMessage& req, |
132 | | LightweightMessage* resp, |
133 | | RpcController* controller, |
134 | 0 | ResponseCallback callback) { |
135 | 0 | DoAsyncRequest( |
136 | 0 | method, std::move(method_metrics), AnyMessageConstPtr(&req), AnyMessagePtr(resp), controller, |
137 | 0 | std::move(callback), false /* force_run_callback_on_reactor */); |
138 | 0 | } |
139 | | |
140 | | ThreadPool* Proxy::GetCallbackThreadPool( |
141 | 19.9M | bool force_run_callback_on_reactor, InvokeCallbackMode invoke_callback_mode) { |
142 | 19.9M | if (force_run_callback_on_reactor) { |
143 | 1.05M | return nullptr; |
144 | 1.05M | } |
145 | 18.9M | switch (invoke_callback_mode) { |
146 | 774k | case InvokeCallbackMode::kReactorThread: |
147 | 774k | return nullptr; |
148 | 0 | break; |
149 | 7.67M | case InvokeCallbackMode::kThreadPoolNormal: |
150 | 7.67M | return &context_->CallbackThreadPool(ServicePriority::kNormal); |
151 | 10.4M | case InvokeCallbackMode::kThreadPoolHigh: |
152 | 10.4M | return &context_->CallbackThreadPool(ServicePriority::kHigh); |
153 | 0 | } |
154 | 0 | FATAL_INVALID_ENUM_VALUE(InvokeCallbackMode, invoke_callback_mode); |
155 | 0 | } |
156 | | |
// Core implementation shared by the async and sync request entry points.
//
// Creates the outbound call object (a LocalOutboundCall when this proxy
// targets the local process, a regular OutboundCall otherwise), serializes
// the request into it, and then either dispatches the call to the local
// service or queues it on a connection — first waiting for address
// resolution when the remote endpoint is not resolved yet.
void Proxy::DoAsyncRequest(const RemoteMethod* method,
                           std::shared_ptr<const OutboundMethodMetrics> method_metrics,
                           AnyMessageConstPtr req,
                           AnyMessagePtr resp,
                           RpcController* controller,
                           ResponseCallback callback,
                           bool force_run_callback_on_reactor) {
  CHECK(controller->call_.get() == nullptr) << "Controller should be reset";

  controller->call_ =
      call_local_service_ ?
      std::make_shared<LocalOutboundCall>(method,
                                          outbound_call_metrics_,
                                          resp,
                                          controller,
                                          context_->rpc_metrics(),
                                          std::move(callback)) :
      std::make_shared<OutboundCall>(method,
                                     outbound_call_metrics_,
                                     std::move(method_metrics),
                                     resp,
                                     controller,
                                     context_->rpc_metrics(),
                                     std::move(callback),
                                     GetCallbackThreadPool(
                                         force_run_callback_on_reactor,
                                         controller->invoke_callback_mode()));
  auto call = controller->call_.get();
  Status s = call->SetRequestParam(req, mem_tracker_);
  if (PREDICT_FALSE(!s.ok())) {
    // Failed to serialize request: likely the request is missing a required
    // field.
    NotifyFailed(controller, s);
    return;
  }

  // A timeout longer than one hour almost certainly indicates a units
  // mistake in the caller; flag it loudly in debug builds.
  if (controller->timeout().Initialized() && controller->timeout() > 3600s) {
    LOG(DFATAL) << "Too big timeout specified: " << controller->timeout();
  }

  if (call_local_service_) {
    // For local call, the response message buffer is reused when an RPC call is retried. So clear
    // the buffer before calling the RPC method.
    if (resp.is_lightweight()) {
      resp.lightweight()->Clear();
    } else {
      resp.protobuf()->Clear();
    }
    call->SetQueued();
    call->SetSent();
    // If current thread is RPC worker thread, it is ok to call the handler in the current thread.
    // Otherwise, enqueue the call to be handled by the service's handler thread.
    const shared_ptr<LocalYBInboundCall>& local_call =
        static_cast<LocalOutboundCall*>(call)->CreateLocalInboundCall();
    Queue queue(!controller->allow_local_calls_in_curr_thread() ||
                !ThreadPool::IsCurrentThreadRpcWorker());
    context_->Handle(local_call, queue);
  } else {
    auto ep = resolved_ep_.Load();
    if (ep.address().is_unspecified()) {
      // Remote address is not resolved yet: park the call and kick off (or
      // piggyback on) an async resolution, which will queue the call later.
      CHECK(resolve_waiters_.push(controller));
      Resolve();
    } else {
      QueueCall(controller, ep);
    }
  }
}
224 | | |
225 | 239k | void Proxy::Resolve() { |
226 | 239k | auto expected = ResolveState::kIdle; |
227 | 239k | if (!resolve_state_.compare_exchange_strong( |
228 | 3.93k | expected, ResolveState::kResolving, std::memory_order_acq_rel)) { |
229 | 3.93k | return; |
230 | 3.93k | } |
231 | | |
232 | 235k | auto endpoint = resolved_ep_.Load(); |
233 | 235k | if (!endpoint.address().is_unspecified()) { |
234 | 6 | expected = ResolveState::kResolving; |
235 | 6 | while (resolve_state_.compare_exchange_strong( |
236 | 6 | expected, ResolveState::kNotifying, std::memory_order_acq_rel)) { |
237 | 6 | RpcController* controller = nullptr; |
238 | 12 | while (resolve_waiters_.pop(controller)) { |
239 | 6 | QueueCall(controller, endpoint); |
240 | 6 | } |
241 | 6 | resolve_state_.store(ResolveState::kIdle, std::memory_order_release); |
242 | 6 | if (resolve_waiters_.empty()) { |
243 | 6 | break; |
244 | 6 | } |
245 | 0 | expected = ResolveState::kIdle; |
246 | 0 | } |
247 | 6 | return; |
248 | 6 | } |
249 | | |
250 | 235k | const std::string kService = ""; |
251 | | |
252 | 235k | auto address = TryFastResolve(remote_.host()); |
253 | 235k | if (address) { |
254 | 235k | HandleResolve(*address); |
255 | 235k | return; |
256 | 235k | } |
257 | | |
258 | 504 | auto latency_metric = std::make_shared<ScopedLatencyMetric>(latency_hist_, Auto::kFalse); |
259 | | |
260 | 504 | context_->resolver().AsyncResolve( |
261 | 504 | remote_.host(), [this, latency_metric = std::move(latency_metric)]( |
262 | 0 | const Result<IpAddress>& result) { |
263 | 0 | latency_metric->Finish(); |
264 | 0 | HandleResolve(result); |
265 | 0 | }); |
266 | 504 | } |
267 | | |
268 | 0 | void Proxy::NotifyAllFailed(const Status& status) { |
269 | 0 | RpcController* controller = nullptr; |
270 | 0 | while (resolve_waiters_.pop(controller)) { |
271 | 0 | NotifyFailed(controller, status); |
272 | 0 | } |
273 | 0 | } |
274 | | |
275 | 235k | void Proxy::HandleResolve(const Result<IpAddress>& result) { |
276 | 235k | auto expected = ResolveState::kResolving; |
277 | 235k | if (resolve_state_.compare_exchange_strong( |
278 | 235k | expected, ResolveState::kNotifying, std::memory_order_acq_rel)) { |
279 | 235k | ResolveDone(result); |
280 | 235k | resolve_state_.store(ResolveState::kIdle, std::memory_order_release); |
281 | 235k | if (!resolve_waiters_.empty()) { |
282 | 0 | Resolve(); |
283 | 0 | } |
284 | 235k | } |
285 | 235k | } |
286 | | |
287 | 234k | void Proxy::ResolveDone(const Result<IpAddress>& result) { |
288 | 234k | if (!result.ok()) { |
289 | 0 | LOG(WARNING) << "Resolve " << remote_.host() << " failed: " << result.status(); |
290 | 0 | NotifyAllFailed(result.status()); |
291 | 0 | return; |
292 | 0 | } |
293 | | |
294 | 234k | Endpoint endpoint(*result, remote_.port()); |
295 | 234k | resolved_ep_.Store(endpoint); |
296 | | |
297 | 234k | RpcController* controller = nullptr; |
298 | 474k | while (resolve_waiters_.pop(controller)) { |
299 | 239k | QueueCall(controller, endpoint); |
300 | 239k | } |
301 | 234k | } |
302 | | |
303 | 19.9M | void Proxy::QueueCall(RpcController* controller, const Endpoint& endpoint) { |
304 | 19.9M | uint8_t idx = num_calls_.fetch_add(1) % num_connections_to_server_; |
305 | 19.9M | ConnectionId conn_id(endpoint, idx, protocol_); |
306 | 19.9M | controller->call_->SetConnectionId(conn_id, &remote_.host()); |
307 | 19.9M | context_->QueueOutboundCall(controller->call_); |
308 | 19.9M | } |
309 | | |
310 | 651 | void Proxy::NotifyFailed(RpcController* controller, const Status& status) { |
311 | | // We should retain reference to call, so it would not be destroyed during SetFailed. |
312 | 651 | auto call = controller->call_; |
313 | 651 | call->SetFailed(status); |
314 | 651 | } |
315 | | |
316 | | Status Proxy::DoSyncRequest(const RemoteMethod* method, |
317 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
318 | | AnyMessageConstPtr request, |
319 | | AnyMessagePtr resp, |
320 | 1.05M | RpcController* controller) { |
321 | 1.05M | CountDownLatch latch(1); |
322 | | // We want to execute this fast callback in reactor thread to avoid overhead on putting in |
323 | | // separate pool. |
324 | 1.05M | DoAsyncRequest( |
325 | 1.05M | method, std::move(method_metrics), request, DCHECK_NOTNULL(resp), controller, |
326 | 1.05M | latch.CountDownCallback(), true /* force_run_callback_on_reactor */); |
327 | 1.05M | latch.Wait(); |
328 | 1.05M | return controller->status(); |
329 | 1.05M | } |
330 | | |
331 | | Status Proxy::SyncRequest(const RemoteMethod* method, |
332 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
333 | | const google::protobuf::Message& req, |
334 | | google::protobuf::Message* resp, |
335 | 1.05M | RpcController* controller) { |
336 | 1.05M | return DoSyncRequest( |
337 | 1.05M | method, std::move(method_metrics), AnyMessageConstPtr(&req), AnyMessagePtr(resp), controller); |
338 | 1.05M | } |
339 | | |
340 | | Status Proxy::SyncRequest(const RemoteMethod* method, |
341 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
342 | | const LightweightMessage& req, |
343 | | LightweightMessage* resp, |
344 | 0 | RpcController* controller) { |
345 | 0 | return DoSyncRequest( |
346 | 0 | method, std::move(method_metrics), AnyMessageConstPtr(&req), AnyMessagePtr(resp), controller); |
347 | 0 | } |
348 | | |
// Returns the metric entity of the owning context (may be null when metrics
// are disabled).
scoped_refptr<MetricEntity> Proxy::metric_entity() const {
  return context_->metric_entity();
}
352 | | |
353 | | ProxyPtr ProxyCache::GetProxy( |
354 | 5.07M | const HostPort& remote, const Protocol* protocol, const MonoDelta& resolve_cache_timeout) { |
355 | 5.07M | ProxyKey key(remote, protocol); |
356 | 5.07M | std::lock_guard<std::mutex> lock(proxy_mutex_); |
357 | 5.07M | auto it = proxies_.find(key); |
358 | 5.07M | if (it == proxies_.end()) { |
359 | 144k | it = proxies_.emplace( |
360 | 144k | key, std::make_unique<Proxy>(context_, remote, protocol, resolve_cache_timeout)).first; |
361 | 144k | } |
362 | 5.07M | return it->second; |
363 | 5.07M | } |
364 | | |
365 | | ProxyMetricsPtr ProxyCache::GetMetrics( |
366 | 5.07M | const std::string& service_name, ProxyMetricsFactory factory) { |
367 | 5.07M | std::lock_guard<std::mutex> lock(metrics_mutex_); |
368 | 5.07M | auto it = metrics_.find(service_name); |
369 | 5.07M | if (it != metrics_.end()) { |
370 | 4.87M | return it->second; |
371 | 4.87M | } |
372 | | |
373 | 198k | auto entity = context_->metric_entity(); |
374 | 190k | auto metrics = entity ? factory(entity) : nullptr; |
375 | 198k | metrics_.emplace(service_name, metrics); |
376 | 198k | return metrics; |
377 | 198k | } |
378 | | |
// Wires a generated service proxy to the shared Proxy instance and the
// per-service metrics object, both obtained from (and cached in) `cache`.
ProxyBase::ProxyBase(const std::string& service_name, ProxyMetricsFactory metrics_factory,
                     ProxyCache* cache, const HostPort& remote,
                     const Protocol* protocol,
                     const MonoDelta& resolve_cache_timeout)
    : proxy_(cache->GetProxy(remote, protocol, resolve_cache_timeout)),
      metrics_(cache->GetMetrics(service_name, metrics_factory)) {
}
386 | | |
387 | | } // namespace rpc |
388 | | } // namespace yb |