YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/util/net/tunnel.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/util/net/tunnel.h"
15
16
#include <boost/asio/ip/tcp.hpp>
17
#include <boost/asio/strand.hpp>
18
#include <boost/asio/write.hpp>
19
#include <boost/optional.hpp>
20
21
#include "yb/util/logging.h"
22
#include "yb/util/size_literals.h"
23
#include "yb/util/status.h"
24
#include "yb/util/status_format.h"
25
26
using namespace std::placeholders;
27
28
namespace yb {
29
30
class TunnelConnection;
31
32
// Shared-ownership handle to a TunnelConnection.
using TunnelConnectionPtr = std::shared_ptr<class TunnelConnection>;
33
34
struct SemiTunnel {
35
  boost::asio::ip::tcp::socket* input;
36
  boost::asio::ip::tcp::socket* output;
37
  std::vector<char>* buffer;
38
  TunnelConnectionPtr self;
39
};
40
41
class TunnelConnection : public std::enable_shared_from_this<TunnelConnection> {
42
 public:
43
  explicit TunnelConnection(IoService* io_service, boost::asio::ip::tcp::socket* socket)
44
3.47k
      : inbound_socket_(std::move(*socket)), outbound_socket_(*io_service), strand_(*io_service) {
45
3.47k
  }
46
47
3.47k
  void Start(const Endpoint& dest) {
48
3.47k
    boost::system::error_code ec;
49
3.47k
    auto remote = inbound_socket_.remote_endpoint(ec);
50
3.47k
    auto inbound = inbound_socket_.local_endpoint(ec);
51
3.47k
    log_prefix_ = Format("$0 => $1 => $2: ", remote, inbound, dest);
52
3.47k
    outbound_socket_.async_connect(
53
3.47k
        dest,
54
3.47k
        strand_.wrap(std::bind(&TunnelConnection::HandleConnect, this, _1, shared_from_this())));
55
3.47k
  }
56
57
94
  void Shutdown() {
58
94
    strand_.dispatch([this, shared_self = shared_from_this()] {
59
94
      boost::system::error_code ec;
60
94
      inbound_socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_type::shutdown_both, ec);
61
94
      
LOG_IF_WITH_PREFIX88
(INFO, ec) << "Shutdown failed: " << ec.message()88
;
62
94
      outbound_socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_type::shutdown_both, ec);
63
94
      
LOG_IF_WITH_PREFIX0
(INFO, ec) << "Shutdown failed: " << ec.message()0
;
64
94
    });
65
94
  }
66
67
 private:
68
3.47k
  void HandleConnect(const boost::system::error_code& ec, const TunnelConnectionPtr& self) {
69
3.47k
    if (ec) {
70
66
      LOG_WITH_PREFIX(WARNING) << "Connect failed: " << ec.message();
71
66
      return;
72
66
    }
73
74
3.41k
    if (VLOG_IS_ON(2)) {
75
0
      boost::system::error_code endpoint_ec;
76
0
      VLOG_WITH_PREFIX(2) << "Connected: " << outbound_socket_.local_endpoint(endpoint_ec);
77
0
    }
78
79
3.41k
    in2out_buffer_.resize(4_KB);
80
3.41k
    out2in_buffer_.resize(4_KB);
81
3.41k
    StartRead({&inbound_socket_, &outbound_socket_, &in2out_buffer_, self});
82
3.41k
    StartRead({&outbound_socket_, &inbound_socket_, &out2in_buffer_, self});
83
3.41k
  }
84
85
15.1k
  void StartRead(const SemiTunnel& semi_tunnel) {
86
15.1k
    semi_tunnel.input->async_read_some(
87
15.1k
        boost::asio::buffer(*semi_tunnel.buffer),
88
15.1k
        strand_.wrap(std::bind(&TunnelConnection::HandleRead, this, _1, _2, semi_tunnel)));
89
15.1k
  }
90
91
  void HandleRead(const boost::system::error_code& ec, size_t transferred,
92
10.5k
                  const SemiTunnel& semi_tunnel) {
93
10.5k
    if (ec) {
94
18.4E
      VLOG_WITH_PREFIX(1) << "Read failed: " << ec.message();
95
2.20k
      return;
96
2.20k
    }
97
98
8.34k
    async_write(
99
8.34k
        *semi_tunnel.output, boost::asio::buffer(semi_tunnel.buffer->data(), transferred),
100
8.34k
        strand_.wrap(std::bind(&TunnelConnection::HandleWrite, this, _1, _2, semi_tunnel)));
101
8.34k
  }
102
103
  void HandleWrite(const boost::system::error_code& ec, size_t transferred,
104
8.34k
                   const SemiTunnel& semi_tunnel) {
105
8.34k
    if (ec) {
106
18.4E
      VLOG_WITH_PREFIX(1) << "Write failed: " << ec.message();
107
31
      return;
108
31
    }
109
110
8.31k
    StartRead(semi_tunnel);
111
8.31k
  }
112
113
154
  const std::string& LogPrefix() const {
114
154
    return log_prefix_;
115
154
  }
116
117
  boost::asio::ip::tcp::socket inbound_socket_;
118
  boost::asio::ip::tcp::socket outbound_socket_;
119
  boost::asio::io_context::strand strand_;
120
  std::vector<char> in2out_buffer_;
121
  std::vector<char> out2in_buffer_;
122
  std::string log_prefix_;
123
};
124
125
// Implementation of Tunnel: listens on a local endpoint and, for every accepted connection,
// creates a TunnelConnection that forwards traffic to the remote endpoint.
//
// local_, remote_, address_checker_, acceptor_, socket_ and connections_ are only touched from
// handlers dispatched on or wrapped in strand_. closing_ is atomic and may be read anywhere.
//
// NOTE(review): handlers capture a raw `this`; nothing in this class keeps it alive while they
// are pending, so the owner presumably has to outlive them - confirm against callers.
class Tunnel::Impl {
 public:
  explicit Impl(boost::asio::io_context* io_context)
      : io_context_(*io_context), strand_(*io_context) {}

  // Reports (DFATAL) destruction of a tunnel for which Shutdown() was never called.
  ~Impl() {
    LOG_IF(DFATAL, !closing_.load(std::memory_order_acquire))
        << "Tunnel shutdown has not been started";
  }

  // Opens, binds and starts listening on `local` synchronously, returning NetworkError if any of
  // these steps fails. On success the acceptor is handed over to the strand, where accepting
  // starts; connections that pass `address_checker` (if set) are forwarded to `remote`.
  CHECKED_STATUS Start(const Endpoint& local, const Endpoint& remote,
                       AddressChecker address_checker) {
    // Held by shared_ptr so that it can be captured by the lambda below and moved out later.
    auto acceptor = std::make_shared<boost::asio::ip::tcp::acceptor>(io_context_);
    boost::system::error_code ec;

    LOG(INFO) << "Starting tunnel: " << local << " => " << remote;

    acceptor->open(local.protocol(), ec);
    if (ec) {
      return STATUS_FORMAT(NetworkError, "Open failed: $0", ec.message());
    }
    acceptor->set_option(boost::asio::socket_base::reuse_address(true), ec);
    if (ec) {
      return STATUS_FORMAT(NetworkError, "Reuse address failed: $0", ec.message());
    }
    acceptor->bind(local, ec);
    if (ec) {
      return STATUS_FORMAT(NetworkError, "Bind failed: $0", ec.message());
    }
    acceptor->listen(boost::asio::ip::tcp::socket::max_listen_connections, ec);
    if (ec) {
      return STATUS_FORMAT(NetworkError, "Listen failed: $0", ec.message());
    }
    strand_.dispatch([
        this, acceptor, local, remote, address_checker]() {
      local_ = local;
      remote_ = remote;
      address_checker_ = address_checker;
      acceptor_.emplace(std::move(*acceptor));
      StartAccept();
    });
    return Status::OK();
  }

  // Marks the tunnel as closing and, on the strand, cancels and closes the acceptor and shuts
  // down every connection that is still alive.
  void Shutdown() {
    closing_.store(true, std::memory_order_release);
    strand_.dispatch([this] {
      LOG(INFO) << "Shutdown tunnel: " << local_ << " => " << remote_;
      if (acceptor_) {
        boost::system::error_code ec;
        acceptor_->cancel(ec);
        LOG_IF(WARNING, ec) << "Cancel failed: " << ec.message();
        acceptor_->close(ec);
        LOG_IF(WARNING, ec) << "Close failed: " << ec.message();
      }

      for (auto& connection : connections_) {
        auto shared_connection = connection.lock();
        if (shared_connection) {
          shared_connection->Shutdown();
        }
      }
      connections_.clear();
    });
  }

 private:
  // Replaces socket_ with a fresh socket and starts an asynchronous accept into it.
  void StartAccept() {
    socket_.emplace(io_context_);
    acceptor_->async_accept(
        *socket_,
        strand_.wrap([this](const boost::system::error_code& accept_ec) {
          HandleAccept(accept_ec);
        }));
  }

  // Closes the socket currently held in socket_, logging a failure to do so.
  void CloseAcceptedSocket() {
    boost::system::error_code close_ec;
    socket_->close(close_ec);
    LOG_IF(WARNING, close_ec) << "Close failed: " << close_ec.message();
  }

  // Completion handler of an accept. Creates and starts a TunnelConnection for the accepted
  // socket, remembers it for Shutdown(), and starts the next accept.
  void HandleAccept(const boost::system::error_code& ec) {
    if (ec) {
      LOG_IF(WARNING, ec != boost::asio::error::operation_aborted)
          << "Accept failed: " << ec.message();
      return;
    }

    if (closing_.load(std::memory_order_acquire)) {
      // A successful accept can be delivered after Shutdown() has begun. Starting a connection
      // now would leave it outside of the shutdown, and re-arming the accept would operate on a
      // closed acceptor, so drop the socket and stop accepting.
      CloseAcceptedSocket();
      return;
    }

    if (!CheckAddress()) {
      CloseAcceptedSocket();
      StartAccept();
      return;
    }

    auto connection = std::make_shared<TunnelConnection>(&io_context_, socket_.get_ptr());
    connection->Start(remote_);

    // Reuse the slot of a connection that no longer exists, if there is one, so the vector does
    // not grow with every accepted connection.
    bool found = false;
    for (auto& weak_connection : connections_) {
      if (weak_connection.expired()) {
        found = true;
        weak_connection = connection;
        break;
      }
    }
    if (!found) {
      connections_.push_back(connection);
    }
    StartAccept();
  }

  // Returns whether the connection held in socket_ may be tunnelled. Always true when no address
  // checker is configured.
  bool CheckAddress() {
    if (!address_checker_) {
      return true;
    }

    boost::system::error_code ec;
    auto endpoint = socket_->remote_endpoint(ec);

    if (ec) {
      // NOTE(review): the connection is accepted when its remote endpoint cannot be determined,
      // i.e. the checker is bypassed - confirm that failing open is intended.
      LOG(WARNING) << "Cannot get remote endpoint: " << ec.message();
      return true;
    }

    return address_checker_(endpoint.address());
  }

  boost::asio::io_context& io_context_;
  boost::asio::io_context::strand strand_;
  AddressChecker address_checker_;
  Endpoint local_;
  Endpoint remote_;
  boost::optional<boost::asio::ip::tcp::acceptor> acceptor_;
  // Socket that the pending accept will fill; moved into the TunnelConnection once accepted.
  boost::optional<boost::asio::ip::tcp::socket> socket_;
  // Non-owning references to the started connections; used only to shut them down.
  std::vector<std::weak_ptr<TunnelConnection>> connections_;
  // Set by Shutdown() before any shutdown work is dispatched.
  std::atomic<bool> closing_{false};
};
255
256
1.92k
// Creates the implementation object bound to the given io_context. Nothing is opened until
// Start() is called.
Tunnel::Tunnel(boost::asio::io_context* io_context)
    : impl_(new Impl(io_context)) {}
258
259
230
// Defined out of line, in the translation unit where Impl is a complete type.
Tunnel::~Tunnel() = default;
261
262
// Forwards to Impl::Start: begins listening on `local` and tunnelling accepted connections to
// `remote`. The address checker is moved into the call.
Status Tunnel::Start(
    const Endpoint& local, const Endpoint& remote, AddressChecker address_checker) {
  return impl_->Start(local, remote, std::move(address_checker));
}
266
267
195
void Tunnel::Shutdown() {
268
195
  impl_->Shutdown();
269
195
}
270
271
} // namespace yb