YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/proxy.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#ifndef YB_RPC_PROXY_H_
34
#define YB_RPC_PROXY_H_
35
36
#include <atomic>
37
#include <memory>
38
#include <mutex>
39
#include <string>
40
41
#include <boost/asio/ip/tcp.hpp>
42
#include <boost/lockfree/queue.hpp>
43
44
#include "yb/gutil/atomicops.h"
45
#include "yb/rpc/rpc_fwd.h"
46
#include "yb/rpc/growable_buffer.h"
47
#include "yb/rpc/proxy_base.h"
48
#include "yb/rpc/rpc_controller.h"
49
#include "yb/rpc/rpc_header.pb.h"
50
#include "yb/gutil/thread_annotations.h"
51
52
#include "yb/util/concurrent_pod.h"
53
#include "yb/util/net/net_fwd.h"
54
#include "yb/util/net/net_util.h"
55
#include "yb/util/metrics_fwd.h"
56
#include "yb/util/monotime.h"
57
58
namespace google {
59
namespace protobuf {
60
class Message;
61
}  // namespace protobuf
62
}  // namespace google
63
64
namespace yb {
65
66
class MemTracker;
67
68
namespace rpc {
69
70
// Progress marker for resolving a proxy's remote address. Proxy keeps one in the atomic
// member resolve_state_, which is initialised to kIdle.
// NOTE(review): the transitions between the states are implemented outside this header.
YB_DEFINE_ENUM(ResolveState, (kIdle)(kResolving)(kNotifying)(kFinished));
71
72
// Interface to send calls to a remote or local service.
73
//
74
// Proxy objects do not map one-to-one with TCP connections.  The underlying TCP
75
// connection is not established until the first call, and may be torn down and
76
// re-established as necessary by the messenger. Additionally, the messenger is
77
// likely to multiplex many Proxy objects on the same connection. Or, split the
78
// requests sent over a single proxy across different connections to the server.
79
//
80
// When remote endpoint is blank (i.e. HostPort()), the proxy will attempt to
81
// call the service locally in the messenger instead.
82
//
83
// Proxy objects are thread-safe after initialization only.
84
// Setters on the Proxy are not thread-safe, and calling a setter after any RPC
85
// request has started will cause a fatal error.
86
//
87
// After initialization, multiple threads may make calls using the same proxy object.
88
class Proxy {
89
 public:
90
  Proxy(ProxyContext* context,
91
        const HostPort& remote,
92
        const Protocol* protocol = nullptr,
93
        const MonoDelta& resolve_cache_timeout = MonoDelta());
94
  ~Proxy();
95
96
  Proxy(const Proxy&) = delete;
97
  void operator=(const Proxy&) = delete;
98
99
  // Call a remote method asynchronously.
100
  //
101
  // Typically, users will not call this directly, but rather through
102
  // a generated Proxy subclass.
103
  //
104
  // method: the method name to invoke on the remote server.
105
  //
106
  // req:  the request protobuf. This will be serialized immediately,
107
  //       so the caller may free or otherwise mutate 'req' safely.
108
  //
109
  // resp: the response protobuf. This protobuf will be mutated upon
110
  //       completion of the call. The RPC system does not take ownership
111
  //       of this storage.
112
  //
113
  // NOTE: 'req' and 'resp' should be the appropriate protocol buffer implementation
114
  // class corresponding to the parameter and result types of the service method
115
  // defined in the service's '.proto' file.
116
  //
117
  // controller: the RpcController to associate with this call. Each call
118
  //             must use a unique controller object. Does not take ownership.
119
  //
120
  // callback: the callback to invoke upon call completion. This callback may be invoked before
121
  // AsyncRequest() itself returns, or any time thereafter. It may be invoked either on the
122
  // caller's thread or asynchronously. RpcController::set_invoke_callback_mode could be used to
123
  // specify on which thread to invoke callback in case of asynchronous invocation.
124
  void AsyncRequest(const RemoteMethod* method,
125
                    std::shared_ptr<const OutboundMethodMetrics> method_metrics,
126
                    const google::protobuf::Message& req,
127
                    google::protobuf::Message* resp,
128
                    RpcController* controller,
129
                    ResponseCallback callback);
130
131
  void AsyncRequest(const RemoteMethod* method,
132
                    std::shared_ptr<const OutboundMethodMetrics> method_metrics,
133
                    const LightweightMessage& req,
134
                    LightweightMessage* resp,
135
                    RpcController* controller,
136
                    ResponseCallback callback);
137
138
  // The same as AsyncRequest(), except that the call blocks until the call
139
  // finishes. If the call fails, returns a non-OK result.
140
  CHECKED_STATUS SyncRequest(const RemoteMethod* method,
141
                             std::shared_ptr<const OutboundMethodMetrics> method_metrics,
142
                             const google::protobuf::Message& req,
143
                             google::protobuf::Message* resp,
144
                             RpcController* controller);
145
146
  CHECKED_STATUS SyncRequest(const RemoteMethod* method,
147
                             std::shared_ptr<const OutboundMethodMetrics> method_metrics,
148
                             const LightweightMessage& request,
149
                             LightweightMessage* resp,
150
                             RpcController* controller);
151
152
  // Is the service local?
153
0
  bool IsServiceLocal() const { return call_local_service_; }
154
155
  scoped_refptr<MetricEntity> metric_entity() const;
156
157
 private:
158
  void Resolve();
159
  void HandleResolve(const Result<IpAddress>& result);
160
  void ResolveDone(const Result<IpAddress>& result);
161
  void NotifyAllFailed(const Status& status);
162
  void QueueCall(RpcController* controller, const Endpoint& endpoint);
163
  ThreadPool *GetCallbackThreadPool(
164
      bool force_run_callback_on_reactor, InvokeCallbackMode invoke_callback_mode);
165
166
  // Implements logic for AsyncRequest function, but allows to force to run callback on
167
  // reactor thread. This is an optimisation used by SyncRequest function.
168
  void DoAsyncRequest(const RemoteMethod* method,
169
                      std::shared_ptr<const OutboundMethodMetrics> method_metrics,
170
                      AnyMessageConstPtr req,
171
                      AnyMessagePtr resp,
172
                      RpcController* controller,
173
                      ResponseCallback callback,
174
                      bool force_run_callback_on_reactor);
175
176
  CHECKED_STATUS DoSyncRequest(const RemoteMethod* method,
177
                               std::shared_ptr<const OutboundMethodMetrics> method_metrics,
178
                               AnyMessageConstPtr req,
179
                               AnyMessagePtr resp,
180
                               RpcController* controller);
181
182
  static void NotifyFailed(RpcController* controller, const Status& status);
183
184
  ProxyContext* context_;
185
  HostPort remote_;
186
  const Protocol* const protocol_;
187
  mutable std::atomic<size_t> num_calls_{0};
188
  std::shared_ptr<OutboundCallMetrics> outbound_call_metrics_;
189
  const bool call_local_service_;
190
191
  std::atomic<ResolveState> resolve_state_{ResolveState::kIdle};
192
  boost::lockfree::queue<RpcController*> resolve_waiters_;
193
  ConcurrentPod<Endpoint> resolved_ep_;
194
195
  scoped_refptr<Histogram> latency_hist_;
196
197
  // Number of outbound connections to create per each destination server address.
198
  int num_connections_to_server_;
199
200
  std::shared_ptr<MemTracker> mem_tracker_;
201
};
202
203
class ProxyCache {
204
 public:
205
  explicit ProxyCache(ProxyContext* context)
206
80.0k
      : context_(context) {}
207
208
  ProxyPtr GetProxy(
209
      const HostPort& remote, const Protocol* protocol, const MonoDelta& resolve_cache_timeout);
210
211
  ProxyMetricsPtr GetMetrics(const std::string& service_name, ProxyMetricsFactory factory);
212
213
 private:
214
  typedef std::pair<HostPort, const Protocol*> ProxyKey;
215
216
  struct ProxyKeyHash {
217
36.9M
    size_t operator()(const ProxyKey& key) const {
218
36.9M
      size_t result = 0;
219
36.9M
      boost::hash_combine(result, HostPortHash()(key.first));
220
36.9M
      boost::hash_combine(result, key.second);
221
36.9M
      return result;
222
36.9M
    }
223
  };
224
225
  ProxyContext* context_;
226
227
  std::mutex proxy_mutex_;
228
  std::unordered_map<ProxyKey, ProxyPtr, ProxyKeyHash> proxies_ GUARDED_BY(proxy_mutex_);
229
230
  std::mutex metrics_mutex_;
231
  std::unordered_map<std::string , ProxyMetricsPtr> metrics_ GUARDED_BY(metrics_mutex_);
232
};
233
234
}  // namespace rpc
235
}  // namespace yb
236
237
#endif  // YB_RPC_PROXY_H_