YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/proxy.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/rpc/proxy.h"
34
35
#include <cstdint>
36
#include <functional>
37
#include <memory>
38
#include <sstream>
39
#include <vector>
40
41
#include <glog/logging.h>
42
43
#include "yb/rpc/local_call.h"
44
#include "yb/rpc/lightweight_message.h"
45
#include "yb/rpc/proxy_context.h"
46
#include "yb/rpc/outbound_call.h"
47
#include "yb/rpc/remote_method.h"
48
#include "yb/rpc/rpc_controller.h"
49
#include "yb/rpc/rpc_header.pb.h"
50
51
#include "yb/util/backoff_waiter.h"
52
#include "yb/util/countdown_latch.h"
53
#include "yb/util/metrics.h"
54
#include "yb/util/net/dns_resolver.h"
55
#include "yb/util/net/sockaddr.h"
56
#include "yb/util/net/socket.h"
57
#include "yb/util/result.h"
58
#include "yb/util/status.h"
59
60
// Default number of underlying connections per remote server. Proxy reads its effective value
// through ProxyContext::num_connections_to_server() (presumably seeded from this flag — verify).
DEFINE_int32(num_connections_to_server, 8,
             "Number of underlying connections to each server");

// How long Proxy caches a resolved endpoint when the caller does not pass an initialized
// resolve_cache_timeout.
DEFINE_int32(proxy_resolve_cache_ms, 5000,
             "Time in milliseconds to cache resolution result in Proxy");
65
66
using namespace std::literals;
67
68
using google::protobuf::Message;
69
using std::string;
70
using std::shared_ptr;
71
72
namespace yb {
73
namespace rpc {
74
75
// Builds a proxy to `remote`.
//
// An empty HostPort marks the proxy as serving the local process (call_local_service_).
// A null `protocol` selects the context's default protocol. An uninitialized
// `resolve_cache_timeout` falls back to FLAGS_proxy_resolve_cache_ms for the resolved-endpoint
// cache.
Proxy::Proxy(ProxyContext* context,
             const HostPort& remote,
             const Protocol* protocol,
             const MonoDelta& resolve_cache_timeout)
    : context_(context),
      remote_(remote),
      protocol_(protocol != nullptr ? protocol : context_->DefaultProtocol()),
      outbound_call_metrics_(
          context_->metric_entity()
              ? std::make_shared<OutboundCallMetrics>(context_->metric_entity())
              : nullptr),
      call_local_service_(remote == HostPort()),
      resolve_waiters_(30),
      resolved_ep_(std::chrono::milliseconds(
          resolve_cache_timeout.Initialized() ? resolve_cache_timeout.ToMilliseconds()
                                              : FLAGS_proxy_resolve_cache_ms)),
      latency_hist_(ScopedDnsTracker::active_metric()),
      // Ask the context for the connection count instead of reading
      // FLAGS_num_connections_to_server directly: the flag may have changed since then.
      num_connections_to_server_(context_->num_connections_to_server()) {
  VLOG(1) << "Create proxy to " << remote << " with num_connections_to_server="
          << num_connections_to_server_;
  // Queue memory is only tracked when the context provides a parent tracker.
  if (!context_->parent_mem_tracker()) {
    return;
  }
  mem_tracker_ = MemTracker::FindOrCreateTracker("Queueing", context_->parent_mem_tracker());
}
100
101
31.2k
// Waits for any in-flight resolution to finish before the proxy goes away.
//
// Resolution callbacks capture `this`, so destruction must not race with them. The loop keeps
// trying to move resolve_state_ from kIdle to the terminal kFinished state; once there, Resolve()
// can no longer start. The wait is bounded: after ~5s it logs DFATAL and gives up.
Proxy::~Proxy() {
  constexpr auto kTimeout = 5s;
  constexpr auto kMaxWaitTime = 100ms;
  BackoffWaiter waiter(std::chrono::steady_clock::now() + kTimeout, kMaxWaitTime);
  for (auto idle_state = ResolveState::kIdle;
       !resolve_state_.compare_exchange_weak(
           idle_state, ResolveState::kFinished, std::memory_order_acq_rel);
       idle_state = ResolveState::kIdle) {
    // A resolve is still running (or the weak CAS failed spuriously); back off and retry.
    if (!waiter.Wait()) {
      LOG(DFATAL) << "Timeout to wait resolve to complete";
      break;
    }
  }
}
117
118
void Proxy::AsyncRequest(const RemoteMethod* method,
119
                         std::shared_ptr<const OutboundMethodMetrics> method_metrics,
120
                         const google::protobuf::Message& req,
121
                         google::protobuf::Message* resp,
122
                         RpcController* controller,
123
24.5M
                         ResponseCallback callback) {
124
24.5M
  DoAsyncRequest(
125
24.5M
      method, std::move(method_metrics), AnyMessageConstPtr(&req), AnyMessagePtr(resp), controller,
126
24.5M
      std::move(callback), false /* force_run_callback_on_reactor */);
127
24.5M
}
128
129
void Proxy::AsyncRequest(const RemoteMethod* method,
130
                         std::shared_ptr<const OutboundMethodMetrics> method_metrics,
131
                         const LightweightMessage& req,
132
                         LightweightMessage* resp,
133
                         RpcController* controller,
134
0
                         ResponseCallback callback) {
135
0
  DoAsyncRequest(
136
0
      method, std::move(method_metrics), AnyMessageConstPtr(&req), AnyMessagePtr(resp), controller,
137
0
      std::move(callback), false /* force_run_callback_on_reactor */);
138
0
}
139
140
// Chooses where an outbound call's response callback will run.
//
// Returns nullptr to run it on the reactor thread. That happens either because the caller forces
// it (DoSyncRequest does this for its cheap latch callback) or because the controller asks for
// InvokeCallbackMode::kReactorThread. Otherwise returns the context's callback pool of the
// requested priority. An out-of-range enum value is fatal.
ThreadPool* Proxy::GetCallbackThreadPool(
    bool force_run_callback_on_reactor, InvokeCallbackMode invoke_callback_mode) {
  if (force_run_callback_on_reactor) {
    return nullptr;
  }
  switch (invoke_callback_mode) {
    case InvokeCallbackMode::kReactorThread:
      return nullptr;
    case InvokeCallbackMode::kThreadPoolNormal:
      return &context_->CallbackThreadPool(ServicePriority::kNormal);
    case InvokeCallbackMode::kThreadPoolHigh:
      return &context_->CallbackThreadPool(ServicePriority::kHigh);
  }
  // Only reachable for a value outside the declared enumerators.
  FATAL_INVALID_ENUM_VALUE(InvokeCallbackMode, invoke_callback_mode);
}
156
157
void Proxy::DoAsyncRequest(const RemoteMethod* method,
158
                           std::shared_ptr<const OutboundMethodMetrics> method_metrics,
159
                           AnyMessageConstPtr req,
160
                           AnyMessagePtr resp,
161
                           RpcController* controller,
162
                           ResponseCallback callback,
163
25.5M
                           bool force_run_callback_on_reactor) {
164
1.95k
  CHECK(controller->call_.get() == nullptr) << "Controller should be reset";
165
166
25.5M
  controller->call_ =
167
25.5M
      call_local_service_ ?
168
5.58M
      std::make_shared<LocalOutboundCall>(method,
169
5.58M
                                          outbound_call_metrics_,
170
5.58M
                                          resp,
171
5.58M
                                          controller,
172
5.58M
                                          context_->rpc_metrics(),
173
5.58M
                                          std::move(callback)) :
174
19.9M
      std::make_shared<OutboundCall>(method,
175
19.9M
                                     outbound_call_metrics_,
176
19.9M
                                     std::move(method_metrics),
177
19.9M
                                     resp,
178
19.9M
                                     controller,
179
19.9M
                                     context_->rpc_metrics(),
180
19.9M
                                     std::move(callback),
181
19.9M
                                     GetCallbackThreadPool(
182
19.9M
                                         force_run_callback_on_reactor,
183
19.9M
                                         controller->invoke_callback_mode()));
184
25.5M
  auto call = controller->call_.get();
185
25.5M
  Status s = call->SetRequestParam(req, mem_tracker_);
186
25.5M
  if (PREDICT_FALSE(!s.ok())) {
187
    // Failed to serialize request: likely the request is missing a required
188
    // field.
189
651
    NotifyFailed(controller, s);
190
651
    return;
191
651
  }
192
193
25.5M
  if (controller->timeout().Initialized() && controller->timeout() > 3600s) {
194
0
    LOG(DFATAL) << "Too big timeout specified: " << controller->timeout();
195
0
  }
196
197
25.5M
  if (call_local_service_) {
198
    // For local call, the response message buffer is reused when an RPC call is retried. So clear
199
    // the buffer before calling the RPC method.
200
5.58M
    if (resp.is_lightweight()) {
201
0
      resp.lightweight()->Clear();
202
5.58M
    } else {
203
5.58M
      resp.protobuf()->Clear();
204
5.58M
    }
205
5.58M
    call->SetQueued();
206
5.58M
    call->SetSent();
207
    // If currrent thread is RPC worker thread, it is ok to call the handler in the current thread.
208
    // Otherwise, enqueue the call to be handled by the service's handler thread.
209
5.58M
    const shared_ptr<LocalYBInboundCall>& local_call =
210
5.58M
        static_cast<LocalOutboundCall*>(call)->CreateLocalInboundCall();
211
5.58M
    Queue queue(!controller->allow_local_calls_in_curr_thread() ||
212
4.45M
                !ThreadPool::IsCurrentThreadRpcWorker());
213
5.58M
    context_->Handle(local_call, queue);
214
19.9M
  } else {
215
19.9M
    auto ep = resolved_ep_.Load();
216
19.9M
    if (ep.address().is_unspecified()) {
217
238k
      CHECK(resolve_waiters_.push(controller));
218
238k
      Resolve();
219
19.7M
    } else {
220
19.7M
      QueueCall(controller, ep);
221
19.7M
    }
222
19.9M
  }
223
25.5M
}
224
225
239k
void Proxy::Resolve() {
226
239k
  auto expected = ResolveState::kIdle;
227
239k
  if (!resolve_state_.compare_exchange_strong(
228
3.93k
      expected, ResolveState::kResolving, std::memory_order_acq_rel)) {
229
3.93k
    return;
230
3.93k
  }
231
232
235k
  auto endpoint = resolved_ep_.Load();
233
235k
  if (!endpoint.address().is_unspecified()) {
234
6
    expected = ResolveState::kResolving;
235
6
    while (resolve_state_.compare_exchange_strong(
236
6
        expected, ResolveState::kNotifying, std::memory_order_acq_rel)) {
237
6
      RpcController* controller = nullptr;
238
12
      while (resolve_waiters_.pop(controller)) {
239
6
        QueueCall(controller, endpoint);
240
6
      }
241
6
      resolve_state_.store(ResolveState::kIdle, std::memory_order_release);
242
6
      if (resolve_waiters_.empty()) {
243
6
        break;
244
6
      }
245
0
      expected = ResolveState::kIdle;
246
0
    }
247
6
    return;
248
6
  }
249
250
235k
  const std::string kService = "";
251
252
235k
  auto address = TryFastResolve(remote_.host());
253
235k
  if (address) {
254
235k
    HandleResolve(*address);
255
235k
    return;
256
235k
  }
257
258
504
  auto latency_metric = std::make_shared<ScopedLatencyMetric>(latency_hist_, Auto::kFalse);
259
260
504
  context_->resolver().AsyncResolve(
261
504
      remote_.host(), [this, latency_metric = std::move(latency_metric)](
262
0
          const Result<IpAddress>& result) {
263
0
    latency_metric->Finish();
264
0
    HandleResolve(result);
265
0
  });
266
504
}
267
268
0
void Proxy::NotifyAllFailed(const Status& status) {
269
0
  RpcController* controller = nullptr;
270
0
  while (resolve_waiters_.pop(controller)) {
271
0
    NotifyFailed(controller, status);
272
0
  }
273
0
}
274
275
235k
void Proxy::HandleResolve(const Result<IpAddress>& result) {
276
235k
  auto expected = ResolveState::kResolving;
277
235k
  if (resolve_state_.compare_exchange_strong(
278
235k
      expected, ResolveState::kNotifying, std::memory_order_acq_rel)) {
279
235k
    ResolveDone(result);
280
235k
    resolve_state_.store(ResolveState::kIdle, std::memory_order_release);
281
235k
    if (!resolve_waiters_.empty()) {
282
0
      Resolve();
283
0
    }
284
235k
  }
285
235k
}
286
287
234k
void Proxy::ResolveDone(const Result<IpAddress>& result) {
288
234k
  if (!result.ok()) {
289
0
    LOG(WARNING) << "Resolve " << remote_.host() << " failed: " << result.status();
290
0
    NotifyAllFailed(result.status());
291
0
    return;
292
0
  }
293
294
234k
  Endpoint endpoint(*result, remote_.port());
295
234k
  resolved_ep_.Store(endpoint);
296
297
234k
  RpcController* controller = nullptr;
298
474k
  while (resolve_waiters_.pop(controller)) {
299
239k
    QueueCall(controller, endpoint);
300
239k
  }
301
234k
}
302
303
19.9M
// Binds the controller's call to one of the proxy's connections to `endpoint` and hands it to
// the context's outbound queue.
//
// The connection index is chosen round-robin from the num_calls_ counter, modulo the configured
// number of connections.
void Proxy::QueueCall(RpcController* controller, const Endpoint& endpoint) {
  // num_connections_to_server_ comes from ProxyContext and is presumably derived from the
  // runtime-settable FLAGS_num_connections_to_server. A zero value would make the modulo below
  // undefined behavior (typically SIGFPE), and a negative one can produce out-of-range indices.
  // Fall back to a single connection in both cases.
  const auto num_connections = num_connections_to_server_ > 0 ? num_connections_to_server_ : 1;
  uint8_t idx = num_calls_.fetch_add(1) % num_connections;
  ConnectionId conn_id(endpoint, idx, protocol_);
  controller->call_->SetConnectionId(conn_id, &remote_.host());
  context_->QueueOutboundCall(controller->call_);
}
309
310
651
// Marks the controller's call as failed with `status`.
void Proxy::NotifyFailed(RpcController* controller, const Status& status) {
  // Hold a local reference so the call stays alive for the duration of SetFailed, even if
  // controller->call_ is released as a consequence of the failure.
  const auto retained_call = controller->call_;
  retained_call->SetFailed(status);
}
315
316
// Blocking wrapper over DoAsyncRequest: issues the call, waits for its completion callback and
// returns the controller's final status.
Status Proxy::DoSyncRequest(const RemoteMethod* method,
                            std::shared_ptr<const OutboundMethodMetrics> method_metrics,
                            AnyMessageConstPtr request,
                            AnyMessagePtr resp,
                            RpcController* controller) {
  CountDownLatch done(1);
  // The callback only counts down the latch. That is cheap enough to run on the reactor thread
  // and avoids the overhead of dispatching through a callback thread pool.
  DoAsyncRequest(method, std::move(method_metrics), request, DCHECK_NOTNULL(resp), controller,
                 done.CountDownCallback(), /* force_run_callback_on_reactor= */ true);
  done.Wait();
  return controller->status();
}
330
331
// Blocking call with protobuf request/response messages.
Status Proxy::SyncRequest(const RemoteMethod* method,
                          std::shared_ptr<const OutboundMethodMetrics> method_metrics,
                          const google::protobuf::Message& req,
                          google::protobuf::Message* resp,
                          RpcController* controller) {
  AnyMessageConstPtr request_ptr(&req);
  AnyMessagePtr response_ptr(resp);
  return DoSyncRequest(method, std::move(method_metrics), request_ptr, response_ptr, controller);
}
339
340
// Blocking call with LightweightMessage request/response messages.
Status Proxy::SyncRequest(const RemoteMethod* method,
                          std::shared_ptr<const OutboundMethodMetrics> method_metrics,
                          const LightweightMessage& req,
                          LightweightMessage* resp,
                          RpcController* controller) {
  AnyMessageConstPtr request_ptr(&req);
  AnyMessagePtr response_ptr(resp);
  return DoSyncRequest(method, std::move(method_metrics), request_ptr, response_ptr, controller);
}
348
349
0
// Returns the metric entity of the ProxyContext this proxy was created with.
scoped_refptr<MetricEntity> Proxy::metric_entity() const {
  auto entity = context_->metric_entity();
  return entity;
}
352
353
// Returns the cached Proxy for (remote, protocol), creating it on first use.
//
// `resolve_cache_timeout` only affects a newly created proxy; a cached proxy keeps the setting it
// was built with. Lookup and insertion both happen under proxy_mutex_.
ProxyPtr ProxyCache::GetProxy(
    const HostPort& remote, const Protocol* protocol, const MonoDelta& resolve_cache_timeout) {
  const ProxyKey key(remote, protocol);
  std::lock_guard<std::mutex> lock(proxy_mutex_);
  auto existing = proxies_.find(key);
  if (existing != proxies_.end()) {
    return existing->second;
  }
  auto inserted = proxies_.emplace(
      key, std::make_unique<Proxy>(context_, remote, protocol, resolve_cache_timeout));
  return inserted.first->second;
}
364
365
// Returns the per-service proxy metrics, creating them on first request for `service_name`.
//
// When the context has no metric entity, nullptr is cached and returned instead of invoking
// `factory`. The factory runs while metrics_mutex_ is held.
ProxyMetricsPtr ProxyCache::GetMetrics(
    const std::string& service_name, ProxyMetricsFactory factory) {
  std::lock_guard<std::mutex> lock(metrics_mutex_);
  const auto cached = metrics_.find(service_name);
  if (cached != metrics_.end()) {
    return cached->second;
  }

  auto entity = context_->metric_entity();
  auto metrics = entity ? factory(entity) : nullptr;
  metrics_.emplace(service_name, metrics);
  return metrics;
}
378
379
// Both the underlying Proxy and the per-service metrics are fetched from `cache`. ProxyBase
// instances for the same (remote, protocol) therefore share one Proxy, and those for the same
// service name share one metrics object.
ProxyBase::ProxyBase(const std::string& service_name, ProxyMetricsFactory metrics_factory,
                     ProxyCache* cache, const HostPort& remote,
                     const Protocol* protocol,
                     const MonoDelta& resolve_cache_timeout)
    : proxy_(cache->GetProxy(remote, protocol, resolve_cache_timeout)),
      metrics_(cache->GetMetrics(service_name, metrics_factory)) {}
386
387
}  // namespace rpc
388
}  // namespace yb