YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/proxy.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/rpc/proxy.h"
34
35
#include <cstdint>
36
#include <functional>
37
#include <memory>
38
#include <sstream>
39
#include <vector>
40
41
#include <glog/logging.h>
42
43
#include "yb/rpc/local_call.h"
44
#include "yb/rpc/lightweight_message.h"
45
#include "yb/rpc/proxy_context.h"
46
#include "yb/rpc/outbound_call.h"
47
#include "yb/rpc/remote_method.h"
48
#include "yb/rpc/rpc_controller.h"
49
#include "yb/rpc/rpc_header.pb.h"
50
51
#include "yb/util/backoff_waiter.h"
52
#include "yb/util/countdown_latch.h"
53
#include "yb/util/metrics.h"
54
#include "yb/util/net/dns_resolver.h"
55
#include "yb/util/net/sockaddr.h"
56
#include "yb/util/net/socket.h"
57
#include "yb/util/result.h"
58
#include "yb/util/status.h"
59
60
// Number of connections a Proxy spreads its calls to one remote server over. Proxy itself reads the
// effective value through ProxyContext::num_connections_to_server() rather than this flag.
DEFINE_int32(num_connections_to_server, 8, "Number of underlying connections to each server");

// Lifetime of a cached resolution result in a Proxy. Used only when the Proxy is constructed with
// an uninitialized resolve_cache_timeout.
DEFINE_int32(proxy_resolve_cache_ms, 5000,
    "Time in milliseconds to cache resolution result in Proxy");
65
66
using namespace std::literals;
67
68
using google::protobuf::Message;
69
using std::string;
70
using std::shared_ptr;
71
72
namespace yb {
73
namespace rpc {
74
75
// Creates a proxy for `remote`.
//
// An empty HostPort selects the local service: calls are then dispatched in-process through
// LocalOutboundCall instead of being sent over a connection. A null `protocol` falls back to the
// context's default protocol, and an uninitialized `resolve_cache_timeout` falls back to
// FLAGS_proxy_resolve_cache_ms for the lifetime of cached resolution results.
Proxy::Proxy(ProxyContext* context,
             const HostPort& remote,
             const Protocol* protocol,
             const MonoDelta& resolve_cache_timeout)
    : context_(context),
      remote_(remote),
      protocol_(protocol ? protocol : context_->DefaultProtocol()),
      outbound_call_metrics_(
          context_->metric_entity()
              ? std::make_shared<OutboundCallMetrics>(context_->metric_entity())
              : nullptr),
      call_local_service_(remote == HostPort()),
      resolve_waiters_(30),
      resolved_ep_(std::chrono::milliseconds(
          resolve_cache_timeout.Initialized()
              ? resolve_cache_timeout.ToMilliseconds()
              : FLAGS_proxy_resolve_cache_ms)),
      latency_hist_(ScopedDnsTracker::active_metric()),
      // Take the value from the context rather than FLAGS_num_connections_to_server directly: the
      // flag value could have changed since the context captured it.
      num_connections_to_server_(context_->num_connections_to_server()) {
  VLOG(1) << "Create proxy to " << remote << " with num_connections_to_server="
          << num_connections_to_server_;

  // Queued calls are accounted under a "Queueing" child tracker when the context provides a parent.
  auto parent_tracker = context_->parent_mem_tracker();
  if (parent_tracker) {
    mem_tracker_ = MemTracker::FindOrCreateTracker("Queueing", parent_tracker);
  }
}
100
101
44.6k
// Blocks destruction until no resolution is in flight.
//
// The proxy is retired by moving resolve_state_ from kIdle to kFinished. While another thread holds
// the state (resolving or notifying waiters), we back off and retry for up to kTimeout. If that
// expires, the failure is logged as DFATAL and destruction proceeds anyway.
Proxy::~Proxy() {
  constexpr auto kTimeout = 5s;
  constexpr auto kMaxWaitTime = 100ms;
  BackoffWaiter waiter(std::chrono::steady_clock::now() + kTimeout, kMaxWaitTime);

  auto try_finish = [this] {
    auto idle = ResolveState::kIdle;
    return resolve_state_.compare_exchange_weak(
        idle, ResolveState::kFinished, std::memory_order_acq_rel);
  };

  while (!try_finish()) {
    if (!waiter.Wait()) {
      LOG(DFATAL) << "Timeout to wait resolve to complete";
      break;
    }
  }
}
117
118
void Proxy::AsyncRequest(const RemoteMethod* method,
119
                         std::shared_ptr<const OutboundMethodMetrics> method_metrics,
120
                         const google::protobuf::Message& req,
121
                         google::protobuf::Message* resp,
122
                         RpcController* controller,
123
79.8M
                         ResponseCallback callback) {
124
79.8M
  DoAsyncRequest(
125
79.8M
      method, std::move(method_metrics), AnyMessageConstPtr(&req), AnyMessagePtr(resp), controller,
126
79.8M
      std::move(callback), false /* force_run_callback_on_reactor */);
127
79.8M
}
128
129
void Proxy::AsyncRequest(const RemoteMethod* method,
130
                         std::shared_ptr<const OutboundMethodMetrics> method_metrics,
131
                         const LightweightMessage& req,
132
                         LightweightMessage* resp,
133
                         RpcController* controller,
134
0
                         ResponseCallback callback) {
135
0
  DoAsyncRequest(
136
0
      method, std::move(method_metrics), AnyMessageConstPtr(&req), AnyMessagePtr(resp), controller,
137
0
      std::move(callback), false /* force_run_callback_on_reactor */);
138
0
}
139
140
// Chooses the thread pool that will run the response callback of a remote call.
//
// Returns nullptr for the reactor-thread cases: when the caller forces it (DoSyncRequest passes
// force_run_callback_on_reactor = true) or when the controller asked for
// InvokeCallbackMode::kReactorThread. Otherwise returns the context's callback pool of the
// matching priority.
ThreadPool* Proxy::GetCallbackThreadPool(
    bool force_run_callback_on_reactor, InvokeCallbackMode invoke_callback_mode) {
  if (force_run_callback_on_reactor) {
    return nullptr;
  }
  switch (invoke_callback_mode) {
    case InvokeCallbackMode::kReactorThread:
      return nullptr;
    case InvokeCallbackMode::kThreadPoolNormal:
      return &context_->CallbackThreadPool(ServicePriority::kNormal);
    case InvokeCallbackMode::kThreadPoolHigh:
      return &context_->CallbackThreadPool(ServicePriority::kHigh);
  }
  // Only reached for a value outside the enumerators handled above.
  FATAL_INVALID_ENUM_VALUE(InvokeCallbackMode, invoke_callback_mode);
}
156
157
void Proxy::DoAsyncRequest(const RemoteMethod* method,
158
                           std::shared_ptr<const OutboundMethodMetrics> method_metrics,
159
                           AnyMessageConstPtr req,
160
                           AnyMessagePtr resp,
161
                           RpcController* controller,
162
                           ResponseCallback callback,
163
86.4M
                           bool force_run_callback_on_reactor) {
164
86.4M
  CHECK
(controller->call_.get() == nullptr) << "Controller should be reset"31.1k
;
165
166
86.4M
  controller->call_ =
167
86.4M
      call_local_service_ ?
168
10.7M
      std::make_shared<LocalOutboundCall>(method,
169
10.7M
                                          outbound_call_metrics_,
170
10.7M
                                          resp,
171
10.7M
                                          controller,
172
10.7M
                                          context_->rpc_metrics(),
173
10.7M
                                          std::move(callback)) :
174
86.4M
      std::make_shared<OutboundCall>(method,
175
75.6M
                                     outbound_call_metrics_,
176
75.6M
                                     std::move(method_metrics),
177
75.6M
                                     resp,
178
75.6M
                                     controller,
179
75.6M
                                     context_->rpc_metrics(),
180
75.6M
                                     std::move(callback),
181
75.6M
                                     GetCallbackThreadPool(
182
75.6M
                                         force_run_callback_on_reactor,
183
75.6M
                                         controller->invoke_callback_mode()));
184
86.4M
  auto call = controller->call_.get();
185
86.4M
  Status s = call->SetRequestParam(req, mem_tracker_);
186
86.4M
  if (PREDICT_FALSE(!s.ok())) {
187
    // Failed to serialize request: likely the request is missing a required
188
    // field.
189
1.37k
    NotifyFailed(controller, s);
190
1.37k
    return;
191
1.37k
  }
192
193
86.4M
  
if (86.4M
controller->timeout().Initialized()86.4M
&& controller->timeout() > 3600s) {
194
0
    LOG(DFATAL) << "Too big timeout specified: " << controller->timeout();
195
0
  }
196
197
86.4M
  if (call_local_service_) {
198
    // For local call, the response message buffer is reused when an RPC call is retried. So clear
199
    // the buffer before calling the RPC method.
200
10.7M
    if (resp.is_lightweight()) {
201
0
      resp.lightweight()->Clear();
202
10.7M
    } else {
203
10.7M
      resp.protobuf()->Clear();
204
10.7M
    }
205
10.7M
    call->SetQueued();
206
10.7M
    call->SetSent();
207
    // If currrent thread is RPC worker thread, it is ok to call the handler in the current thread.
208
    // Otherwise, enqueue the call to be handled by the service's handler thread.
209
10.7M
    const shared_ptr<LocalYBInboundCall>& local_call =
210
10.7M
        static_cast<LocalOutboundCall*>(call)->CreateLocalInboundCall();
211
10.7M
    Queue queue(!controller->allow_local_calls_in_curr_thread() ||
212
10.7M
                
!ThreadPool::IsCurrentThreadRpcWorker()8.80M
);
213
10.7M
    context_->Handle(local_call, queue);
214
75.6M
  } else {
215
75.6M
    auto ep = resolved_ep_.Load();
216
75.6M
    if (ep.address().is_unspecified()) {
217
2.25M
      CHECK(resolve_waiters_.push(controller));
218
2.25M
      Resolve();
219
73.3M
    } else {
220
73.3M
      QueueCall(controller, ep);
221
73.3M
    }
222
75.6M
  }
223
86.4M
}
224
225
2.25M
void Proxy::Resolve() {
226
2.25M
  auto expected = ResolveState::kIdle;
227
2.25M
  if (!resolve_state_.compare_exchange_strong(
228
2.25M
      expected, ResolveState::kResolving, std::memory_order_acq_rel)) {
229
8.36k
    return;
230
8.36k
  }
231
232
2.25M
  auto endpoint = resolved_ep_.Load();
233
2.25M
  if (!endpoint.address().is_unspecified()) {
234
49
    expected = ResolveState::kResolving;
235
49
    while (resolve_state_.compare_exchange_strong(
236
49
        expected, ResolveState::kNotifying, std::memory_order_acq_rel)) {
237
49
      RpcController* controller = nullptr;
238
89
      while (resolve_waiters_.pop(controller)) {
239
40
        QueueCall(controller, endpoint);
240
40
      }
241
49
      resolve_state_.store(ResolveState::kIdle, std::memory_order_release);
242
49
      if (resolve_waiters_.empty()) {
243
49
        break;
244
49
      }
245
0
      expected = ResolveState::kIdle;
246
0
    }
247
49
    return;
248
49
  }
249
250
2.25M
  const std::string kService = "";
251
252
2.25M
  auto address = TryFastResolve(remote_.host());
253
2.25M
  if (address) {
254
2.24M
    HandleResolve(*address);
255
2.24M
    return;
256
2.24M
  }
257
258
1.66k
  auto latency_metric = std::make_shared<ScopedLatencyMetric>(latency_hist_, Auto::kFalse);
259
260
1.66k
  context_->resolver().AsyncResolve(
261
1.66k
      remote_.host(), [this, latency_metric = std::move(latency_metric)](
262
1.66k
          const Result<IpAddress>& result) {
263
0
    latency_metric->Finish();
264
0
    HandleResolve(result);
265
0
  });
266
1.66k
}
267
268
0
void Proxy::NotifyAllFailed(const Status& status) {
269
0
  RpcController* controller = nullptr;
270
0
  while (resolve_waiters_.pop(controller)) {
271
0
    NotifyFailed(controller, status);
272
0
  }
273
0
}
274
275
2.24M
void Proxy::HandleResolve(const Result<IpAddress>& result) {
276
2.24M
  auto expected = ResolveState::kResolving;
277
2.24M
  if (resolve_state_.compare_exchange_strong(
278
2.25M
      expected, ResolveState::kNotifying, std::memory_order_acq_rel)) {
279
2.25M
    ResolveDone(result);
280
2.25M
    resolve_state_.store(ResolveState::kIdle, std::memory_order_release);
281
2.25M
    if (!resolve_waiters_.empty()) {
282
1
      Resolve();
283
1
    }
284
2.25M
  }
285
2.24M
}
286
287
2.25M
void Proxy::ResolveDone(const Result<IpAddress>& result) {
288
2.25M
  if (!result.ok()) {
289
0
    LOG(WARNING) << "Resolve " << remote_.host() << " failed: " << result.status();
290
0
    NotifyAllFailed(result.status());
291
0
    return;
292
0
  }
293
294
2.25M
  Endpoint endpoint(*result, remote_.port());
295
2.25M
  resolved_ep_.Store(endpoint);
296
297
2.25M
  RpcController* controller = nullptr;
298
4.50M
  while (resolve_waiters_.pop(controller)) {
299
2.25M
    QueueCall(controller, endpoint);
300
2.25M
  }
301
2.25M
}
302
303
75.5M
// Binds the controller's call to one of the num_connections_to_server_ connections to `endpoint`,
// picked by a shared call counter modulo the connection count, and hands it to the context.
void Proxy::QueueCall(RpcController* controller, const Endpoint& endpoint) {
  const uint8_t connection_idx = num_calls_.fetch_add(1) % num_connections_to_server_;
  ConnectionId connection_id(endpoint, connection_idx, protocol_);

  const auto& call = controller->call_;
  call->SetConnectionId(connection_id, &remote_.host());
  context_->QueueOutboundCall(call);
}
309
310
1.37k
// Marks the controller's call as failed with `status`.
void Proxy::NotifyFailed(RpcController* controller, const Status& status) {
  // Hold our own reference to the call so that it cannot be destroyed while SetFailed runs.
  const auto keep_alive = controller->call_;
  keep_alive->SetFailed(status);
}
315
316
// Synchronous wrapper over DoAsyncRequest(): blocks the calling thread until the response callback
// has fired, then returns the controller's final status.
Status Proxy::DoSyncRequest(const RemoteMethod* method,
                            std::shared_ptr<const OutboundMethodMetrics> method_metrics,
                            AnyMessageConstPtr request,
                            AnyMessagePtr resp,
                            RpcController* controller) {
  CountDownLatch done(1);
  // The callback only counts the latch down, so run it on the reactor thread rather than paying for
  // a hop to a separate callback pool.
  DoAsyncRequest(method,
                 std::move(method_metrics),
                 request,
                 DCHECK_NOTNULL(resp),
                 controller,
                 done.CountDownCallback(),
                 /* force_run_callback_on_reactor= */ true);
  done.Wait();
  return controller->status();
}
330
331
// Blocking call of `method` with a protobuf request; returns the call's final status.
Status Proxy::SyncRequest(const RemoteMethod* method,
                          std::shared_ptr<const OutboundMethodMetrics> method_metrics,
                          const google::protobuf::Message& req,
                          google::protobuf::Message* resp,
                          RpcController* controller) {
  return DoSyncRequest(method,
                       std::move(method_metrics),
                       AnyMessageConstPtr(&req),
                       AnyMessagePtr(resp),
                       controller);
}
339
340
// Blocking call of `method` with a LightweightMessage request; returns the call's final status.
Status Proxy::SyncRequest(const RemoteMethod* method,
                          std::shared_ptr<const OutboundMethodMetrics> method_metrics,
                          const LightweightMessage& req,
                          LightweightMessage* resp,
                          RpcController* controller) {
  return DoSyncRequest(method,
                       std::move(method_metrics),
                       AnyMessageConstPtr(&req),
                       AnyMessagePtr(resp),
                       controller);
}
348
349
0
// Returns the metric entity of the owning ProxyContext.
scoped_refptr<MetricEntity> Proxy::metric_entity() const {
  return context_->metric_entity();
}
352
353
// Returns the Proxy cached for (remote, protocol), creating it on first use.
//
// Lookup and insertion both happen under proxy_mutex_, so concurrent callers share one Proxy per
// key. `resolve_cache_timeout` only matters for the call that creates the Proxy.
ProxyPtr ProxyCache::GetProxy(
    const HostPort& remote, const Protocol* protocol, const MonoDelta& resolve_cache_timeout) {
  const ProxyKey key(remote, protocol);
  std::lock_guard<std::mutex> lock(proxy_mutex_);

  auto existing = proxies_.find(key);
  if (existing != proxies_.end()) {
    return existing->second;
  }

  auto inserted = proxies_.emplace(
      key, std::make_unique<Proxy>(context_, remote, protocol, resolve_cache_timeout));
  return inserted.first->second;
}
364
365
// Returns the metrics shared by all proxies of `service_name`, building them with `factory` the
// first time the service is seen.
//
// If the context has no metric entity, nullptr is cached and returned instead. The result is cached
// either way, so `factory` runs at most once per service name.
ProxyMetricsPtr ProxyCache::GetMetrics(
    const std::string& service_name, ProxyMetricsFactory factory) {
  std::lock_guard<std::mutex> lock(metrics_mutex_);

  auto cached = metrics_.find(service_name);
  if (cached != metrics_.end()) {
    return cached->second;
  }

  auto metric_entity = context_->metric_entity();
  auto created = metric_entity ? factory(metric_entity) : nullptr;
  metrics_.emplace(service_name, created);
  return created;
}
378
379
// Attaches this proxy base to two objects obtained from `cache`: the shared Proxy for
// (remote, protocol) and the per-service metrics for `service_name`.
ProxyBase::ProxyBase(const std::string& service_name,
                     ProxyMetricsFactory metrics_factory,
                     ProxyCache* cache,
                     const HostPort& remote,
                     const Protocol* protocol,
                     const MonoDelta& resolve_cache_timeout)
    : proxy_(cache->GetProxy(remote, protocol, resolve_cache_timeout)),
      metrics_(cache->GetMetrics(service_name, metrics_factory)) {}
386
387
}  // namespace rpc
388
}  // namespace yb