YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/connection.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#ifndef YB_RPC_CONNECTION_H_
34
#define YB_RPC_CONNECTION_H_
35
36
#include <stdint.h>
37
38
#include <atomic>
39
#include <cstdint>
40
#include <limits>
41
#include <memory>
42
#include <queue>
43
#include <string>
44
#include <thread>
45
#include <type_traits>
46
#include <unordered_map>
47
#include <vector>
48
49
#include <boost/container/small_vector.hpp>
50
#include <boost/version.hpp>
51
#include <ev++.h>
52
#include <gflags/gflags_declare.h>
53
#include <glog/logging.h>
54
55
#include "yb/gutil/ref_counted.h"
56
57
#include "yb/rpc/rpc_fwd.h"
58
#include "yb/rpc/stream.h"
59
60
#include "yb/util/metrics_fwd.h"
61
#include "yb/util/enums.h"
62
#include "yb/util/ev_util.h"
63
#include "yb/util/locks.h"
64
#include "yb/util/monotime.h"
65
#include "yb/util/net/net_fwd.h"
66
#include "yb/util/net/sockaddr.h"
67
#include "yb/util/net/socket.h"
68
#include "yb/util/status.h"
69
#include "yb/util/strongly_typed_bool.h"
70
71
namespace yb {
72
namespace rpc {
73
74
YB_DEFINE_ENUM(ConnectionDirection, (CLIENT)(SERVER));
75
76
typedef boost::container::small_vector_base<OutboundDataPtr> OutboundDataBatch;
77
78
//
79
// A connection between an endpoint and us.
80
//
81
// Inbound connections are created by AcceptorPools, which eventually schedule
82
// RegisterConnection() to be called from the reactor thread.
83
//
84
// Outbound connections are created by the Reactor thread in order to service
85
// outbound calls.
86
//
87
// Once a Connection is created, it can be used both for sending messages and
88
// receiving them, but any given connection is explicitly a client or server.
89
// If a pair of servers are making bidirectional RPCs, they will use two separate
90
// TCP connections (and Connection objects).
91
//
92
// This class is not fully thread-safe. It is accessed only from the context of a
93
// single Reactor except where otherwise specified.
94
//
95
class Connection final : public StreamContext, public std::enable_shared_from_this<Connection> {
96
 public:
97
  typedef ConnectionDirection Direction;
98
99
  // Create a new Connection.
100
  // reactor: the reactor that owns us.
101
  // remote: the address of the remote end
102
  // socket: the socket to take ownership of.
103
  // direction: whether we are the client or server side
104
  // context: context for this connection. Context is used by connection to handle
105
  // protocol specific actions, such as parsing of incoming data into calls.
106
  Connection(Reactor* reactor,
107
             std::unique_ptr<Stream> stream,
108
             Direction direction,
109
             RpcMetrics* rpc_metrics,
110
             std::unique_ptr<ConnectionContext> context);
111
112
  ~Connection();
113
114
58.9M
  CoarseTimePoint last_activity_time() const {
115
58.9M
    return last_activity_time_;
116
58.9M
  }
117
118
  void UpdateLastActivity() override;
119
120
  // Returns true if we are not in the process of receiving or sending a
121
  // message, and we have no outstanding calls.
122
  // When reason_not_idle is specified it contains reason why this connection is not idle.
123
  bool Idle(std::string* reason_not_idle = nullptr) const;
124
125
  // A human-readable reason why the connection is not idle. Empty string if connection is idle.
126
  std::string ReasonNotIdle() const;
127
128
  // Fail any calls which are currently queued or awaiting response.
129
  // Prohibits any future calls (they will be failed immediately with this
130
  // same Status).
131
  void Shutdown(const Status& status);
132
133
  // Queue a new call to be made. If the queueing fails, the call will be
134
  // marked failed.
135
  // Takes ownership of the 'call' object regardless of whether it succeeds or fails.
136
  // This may be called from a non-reactor thread.
137
  void QueueOutboundCall(const OutboundCallPtr& call);
138
139
  // The address of the remote end of the connection.
140
  const Endpoint& remote() const;
141
142
  const Protocol* protocol() const;
143
144
  // The address of the local end of the connection.
145
  const Endpoint& local() const;
146
147
  void HandleTimeout(ev::timer& watcher, int revents); // NOLINT
148
149
  // Safe to be called from other threads.
150
  std::string ToString() const;
151
152
1.33M
  Direction direction() const { return direction_; }
153
154
  // Queue a call response back to the client on the server side.
155
  //
156
  // This is usually called by the IPC worker thread when the response is set, but in some
157
  // circumstances may also be called by the reactor thread (e.g. if the service has shut down).
158
  // In addition to this, its also called for processing events generated by the server.
159
  void QueueOutboundData(OutboundDataPtr outbound_data);
160
161
  void QueueOutboundDataBatch(const OutboundDataBatch& batch);
162
163
187M
  Reactor* reactor() const { return reactor_; }
164
165
  CHECKED_STATUS DumpPB(const DumpRunningRpcsRequestPB& req,
166
                        RpcConnectionPB* resp);
167
168
  // Do appropriate actions after adding outbound call.
169
  void OutboundQueued();
170
171
  // An incoming packet has completed on the client side. This parses the
172
  // call response, looks up the CallAwaitingResponse, and calls the
173
  // client callback.
174
  CHECKED_STATUS HandleCallResponse(CallData* call_data);
175
176
38.5M
  ConnectionContext& context() { return *context_; }
177
178
  void CallSent(OutboundCallPtr call);
179
180
  CHECKED_STATUS Start(ev::loop_ref* loop);
181
182
  // Try to parse already received data.
183
  void ParseReceived();
184
185
  void Close();
186
187
24.2M
  RpcMetrics& rpc_metrics() {
188
24.2M
    return *rpc_metrics_;
189
24.2M
  }
190
191
 private:
192
  CHECKED_STATUS DoWrite();
193
194
  // Does actual outbound data queueing. Invoked in appropriate reactor thread.
195
  size_t DoQueueOutboundData(OutboundDataPtr call, bool batch);
196
197
  void ProcessResponseQueue();
198
199
  // Stream context implementation
200
  void UpdateLastRead() override;
201
202
  void UpdateLastWrite() override;
203
204
  void Transferred(const OutboundDataPtr& data, const Status& status) override;
205
  void Destroy(const Status& status) override;
206
  Result<size_t> ProcessReceived(ReadBufferFull read_buffer_full) override;
207
  void Connected() override;
208
  StreamReadBuffer& ReadBuffer() override;
209
210
  std::string LogPrefix() const;
211
212
  // The reactor thread that created this connection.
213
  Reactor* const reactor_;
214
215
  std::unique_ptr<Stream> stream_;
216
217
  // whether we are client or server
218
  Direction direction_;
219
220
  // The last time we read or wrote from the socket.
221
  CoarseTimePoint last_activity_time_;
222
223
  // Calls which have been sent and are now waiting for a response.
224
  std::unordered_map<int32_t, OutboundCallPtr> awaiting_response_;
225
226
  // Starts as Status::OK, gets set to a shutdown status upon Shutdown(). Guarded by
227
  // outbound_data_queue_lock_.
228
  Status shutdown_status_;
229
230
  // We instantiate and store this metric instance at the level of connection, but not at the level
231
  // of the class emitting metrics (OutboundTransfer) as recommended in metrics.h. This is on
232
  // purpose, because OutboundTransfer is instantiated each time we need to send payload over a
233
  // connection and creating a metric instance each time could be a performance hit, because
234
  // it involves spin lock and search in a metrics map. Therefore we prepare metric instances
235
  // at connection level.
236
  scoped_refptr<Histogram> handler_latency_outbound_transfer_;
237
238
  struct ExpirationEntry {
239
    CoarseTimePoint time;
240
    std::weak_ptr<OutboundCall> call;
241
    // See Stream::Send for details.
242
    size_t handle;
243
  };
244
245
  struct CompareExpiration {
246
194M
    bool operator()(const ExpirationEntry& lhs, const ExpirationEntry& rhs) const {
247
194M
      return rhs.time < lhs.time;
248
194M
    }
249
  };
250
251
  std::priority_queue<ExpirationEntry,
252
                      std::vector<ExpirationEntry>,
253
                      CompareExpiration> expiration_queue_;
254
255
  EvTimerHolder timer_;
256
257
  simple_spinlock outbound_data_queue_lock_;
258
259
  // Responses we are going to process.
260
  std::vector<OutboundDataPtr> outbound_data_to_process_;
261
262
  // Responses that are currently being processed.
263
  // It could be in function variable, but declared as member for optimization.
264
  std::vector<OutboundDataPtr> outbound_data_being_processed_;
265
266
  std::shared_ptr<ReactorTask> process_response_queue_task_;
267
268
  // RPC related metrics.
269
  RpcMetrics* rpc_metrics_;
270
271
  // Connection is responsible for sending and receiving bytes.
272
  // Context is responsible for what to do with them.
273
  std::unique_ptr<ConnectionContext> context_;
274
275
  std::atomic<uint64_t> responded_call_count_{0};
276
};
277
278
}  // namespace rpc
279
}  // namespace yb
280
281
#endif  // YB_RPC_CONNECTION_H_