/Users/deen/code/yugabyte-db/src/yb/util/net/socket.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/util/net/socket.h" |
34 | | |
35 | | #include <netinet/in.h> |
36 | | #include <sys/types.h> |
37 | | |
38 | | #include <limits> |
39 | | #include <string> |
40 | | |
41 | | #include <glog/logging.h> |
42 | | |
43 | | #include "yb/gutil/casts.h" |
44 | | #include "yb/gutil/stringprintf.h" |
45 | | |
46 | | #include "yb/util/debug/trace_event.h" |
47 | | #include "yb/util/errno.h" |
48 | | #include "yb/util/flag_tags.h" |
49 | | #include "yb/util/monotime.h" |
50 | | #include "yb/util/net/net_util.h" |
51 | | #include "yb/util/net/sockaddr.h" |
52 | | #include "yb/util/random.h" |
53 | | #include "yb/util/random_util.h" |
54 | | #include "yb/util/result.h" |
55 | | #include "yb/util/status_format.h" |
56 | | |
57 | | DEFINE_string(local_ip_for_outbound_sockets, "", |
58 | | "IP to bind to when making outgoing socket connections. " |
59 | | "This must be an IP address of the form A.B.C.D, not a hostname. " |
60 | | "Advanced parameter, subject to change."); |
61 | | TAG_FLAG(local_ip_for_outbound_sockets, experimental); |
62 | | |
63 | | DEFINE_bool(socket_inject_short_recvs, false, |
64 | | "Inject short recv() responses which return less data than " |
65 | | "requested"); |
66 | | TAG_FLAG(socket_inject_short_recvs, hidden); |
67 | | TAG_FLAG(socket_inject_short_recvs, unsafe); |
68 | | |
69 | | namespace yb { |
70 | | |
71 | 159M | size_t IoVecsFullSize(const IoVecs& io_vecs) { |
72 | 159M | return std::accumulate(io_vecs.begin(), io_vecs.end(), 0ULL, [](size_t p, const iovec& v) { |
73 | 159M | return p + v.iov_len; |
74 | 159M | }); |
75 | 159M | } |
76 | | |
77 | 163M | void IoVecsToBuffer(const IoVecs& io_vecs, size_t begin, size_t end, std::vector<char>* result) { |
78 | 163M | result->clear(); |
79 | 163M | result->reserve(end - begin); |
80 | 163M | for (const auto& io_vec : io_vecs) { |
81 | 163M | if (begin == end) { |
82 | 122 | break; |
83 | 122 | } |
84 | 163M | if (163M io_vec.iov_len > begin163M ) { |
85 | 163M | size_t clen = std::min(io_vec.iov_len, end) - begin; |
86 | 163M | auto start = IoVecBegin(io_vec) + begin; |
87 | 163M | result->insert(result->end(), start, start + clen); |
88 | 163M | begin += clen; |
89 | 163M | } |
90 | 163M | begin -= io_vec.iov_len; |
91 | 163M | end -= io_vec.iov_len; |
92 | 163M | } |
93 | 163M | } |
94 | | |
95 | 152M | void IoVecsToBuffer(const IoVecs& io_vecs, size_t begin, size_t end, char* result) { |
96 | 152M | for (const auto& io_vec : io_vecs) { |
97 | 152M | if (begin == end) { |
98 | 0 | break; |
99 | 0 | } |
100 | 152M | if (152M io_vec.iov_len > begin152M ) { |
101 | 152M | size_t clen = std::min(io_vec.iov_len, end) - begin; |
102 | 152M | auto start = IoVecBegin(io_vec) + begin; |
103 | 152M | memcpy(result, start, clen); |
104 | 152M | result += clen; |
105 | 152M | begin += clen; |
106 | 152M | } |
107 | 152M | begin -= io_vec.iov_len; |
108 | 152M | end -= io_vec.iov_len; |
109 | 152M | } |
110 | 152M | } |
111 | | |
112 | | Socket::Socket() |
113 | 4.37M | : fd_(-1) { |
114 | 4.37M | } |
115 | | |
116 | | Socket::Socket(int fd) |
117 | 0 | : fd_(fd) { |
118 | 0 | } |
119 | | |
120 | 4.37M | void Socket::Reset(int fd) { |
121 | 4.37M | WARN_NOT_OK(Close(), "Close failed"); |
122 | 4.37M | fd_ = fd; |
123 | 4.37M | } |
124 | | |
125 | 10.0M | int Socket::Release() { |
126 | 10.0M | int fd = fd_; |
127 | 10.0M | fd_ = -1; |
128 | 10.0M | return fd; |
129 | 10.0M | } |
130 | | |
131 | 13.8M | Socket::~Socket() { |
132 | 13.8M | auto status = Close(); |
133 | 13.8M | if (!status.ok()) { |
134 | 0 | LOG(WARNING) << "Failed to close socket: " << status.ToString(); |
135 | 0 | } |
136 | 13.8M | } |
137 | | |
138 | 24.5M | Status Socket::Close() { |
139 | 24.5M | if (fd_ < 0) |
140 | 21.3M | return Status::OK(); |
141 | 3.28M | int fd = fd_; |
142 | 3.28M | fd_ = -1; |
143 | 3.28M | if (::close(fd) < 0) { |
144 | 0 | return STATUS(NetworkError, "Close error", Errno(errno)); |
145 | 0 | } |
146 | 3.28M | return Status::OK(); |
147 | 3.28M | } |
148 | | |
149 | 7.44k | Status Socket::Shutdown(bool shut_read, bool shut_write) { |
150 | 7.44k | DCHECK_GE(fd_, 0); |
151 | 7.44k | int flags = 0; |
152 | 7.44k | if (shut_read && shut_write7.41k ) { |
153 | 7.41k | flags |= SHUT_RDWR; |
154 | 7.41k | } else if (33 shut_read33 ) { |
155 | 0 | flags |= SHUT_RD; |
156 | 33 | } else if (shut_write) { |
157 | 0 | flags |= SHUT_WR; |
158 | 0 | } |
159 | 7.44k | if (::shutdown(fd_, flags) < 0) { |
160 | 5.23k | return STATUS(NetworkError, "Shutdown error", Errno(errno)); |
161 | 5.23k | } |
162 | 2.21k | return Status::OK(); |
163 | 7.44k | } |
164 | | |
165 | 4.93M | int Socket::GetFd() const { |
166 | 4.93M | return fd_; |
167 | 4.93M | } |
168 | | |
// Returns true if `err` (an errno value) means the operation should simply be retried later
// rather than treated as a hard failure.
bool IsTemporarySocketError(int err) {
  // EAGAIN and EWOULDBLOCK may or may not be the same value; both are checked for portability.
  const bool would_block = (err == EAGAIN) || (err == EWOULDBLOCK);
  const bool interrupted_or_pending = (err == EINTR) || (err == EINPROGRESS);
  return would_block || interrupted_or_pending;
}
172 | | |
173 | | #if defined(__linux__) |
174 | | |
175 | | Status Socket::Init(int flags) { |
176 | | auto family = flags & FLAG_IPV6 ? AF_INET6 : AF_INET; |
177 | | int nonblocking_flag = (flags & FLAG_NONBLOCKING) ? SOCK_NONBLOCK : 0; |
178 | | Reset(::socket(family, SOCK_STREAM | SOCK_CLOEXEC | nonblocking_flag, 0)); |
179 | | if (fd_ < 0) { |
180 | | return STATUS(NetworkError, "Error opening socket", Errno(errno)); |
181 | | } |
182 | | |
183 | | return Status::OK(); |
184 | | } |
185 | | |
186 | | #else |
187 | | |
188 | 3.19M | Status Socket::Init(int flags) { |
189 | 3.19M | Reset(::socket(flags & FLAG_IPV6 ? AF_INET6 : AF_INET, SOCK_STREAM, 0)); |
190 | 3.19M | if (fd_ < 0) { |
191 | 4.78k | return STATUS(NetworkError, "Error opening socket", Errno(errno)); |
192 | 4.78k | } |
193 | 3.19M | RETURN_NOT_OK(SetNonBlocking(flags & FLAG_NONBLOCKING)); |
194 | 3.19M | RETURN_NOT_OK(SetCloseOnExec()); |
195 | | |
196 | | // Disable SIGPIPE. |
197 | 3.19M | int set = 1; |
198 | 3.19M | if (setsockopt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof(set)) == -1) { |
199 | 0 | return STATUS(NetworkError, "Failed to set SO_NOSIGPIPE", Errno(errno)); |
200 | 0 | } |
201 | | |
202 | 3.19M | return Status::OK(); |
203 | 3.19M | } |
204 | | |
205 | | #endif // defined(__linux__) |
206 | | |
207 | 7.50M | Status Socket::SetNoDelay(bool enabled) { |
208 | 18.4E | int flag = enabled7.50M ? 17.50M : 0; |
209 | 7.50M | if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) == -1) { |
210 | 1 | return STATUS(NetworkError, "Failed to set TCP_NODELAY", Errno(errno)); |
211 | 1 | } |
212 | 7.50M | return Status::OK(); |
213 | 7.50M | } |
214 | | |
215 | 3.84M | Status Socket::SetNonBlocking(bool enabled) { |
216 | 3.84M | int curflags = ::fcntl(fd_, F_GETFL, 0); |
217 | 3.84M | if (curflags == -1) { |
218 | 0 | return STATUS( |
219 | 0 | NetworkError, StringPrintf("Failed to get file status flags on fd %d", fd_), |
220 | 0 | Errno(errno)); |
221 | 0 | } |
222 | 3.84M | int newflags = (enabled) ? (curflags | O_NONBLOCK)3.79M : (curflags & ~O_NONBLOCK)52.8k ; |
223 | 3.84M | if (::fcntl(fd_, F_SETFL, newflags) == -1) { |
224 | 0 | if (enabled) { |
225 | 0 | return STATUS( |
226 | 0 | NetworkError, StringPrintf("Failed to set O_NONBLOCK on fd %d", fd_), Errno(errno)); |
227 | 0 | } else { |
228 | 0 | return STATUS( |
229 | 0 | NetworkError, StringPrintf("Failed to clear O_NONBLOCK on fd %d", fd_), Errno(errno)); |
230 | 0 | } |
231 | 0 | } |
232 | 3.84M | return Status::OK(); |
233 | 3.84M | } |
234 | | |
235 | 0 | Status Socket::IsNonBlocking(bool* is_nonblock) const { |
236 | 0 | int curflags = ::fcntl(fd_, F_GETFL, 0); |
237 | 0 | if (curflags == -1) { |
238 | 0 | return STATUS( |
239 | 0 | NetworkError, StringPrintf("Failed to get file status flags on fd %d", fd_), Errno(errno)); |
240 | 0 | } |
241 | 0 | *is_nonblock = ((curflags & O_NONBLOCK) != 0); |
242 | 0 | return Status::OK(); |
243 | 0 | } |
244 | | |
245 | 3.82M | Status Socket::SetCloseOnExec() { |
246 | 3.82M | int curflags = fcntl(fd_, F_GETFD, 0); |
247 | 3.82M | if (curflags == -1) { |
248 | 0 | Reset(-1); |
249 | 0 | return STATUS(NetworkError, "fcntl(F_GETFD) error", Errno(errno)); |
250 | 0 | } |
251 | 3.82M | if (fcntl(fd_, F_SETFD, curflags | FD_CLOEXEC) == -1) { |
252 | 0 | Reset(-1); |
253 | 0 | return STATUS(NetworkError, "fcntl(F_SETFD) error", Errno(errno)); |
254 | 0 | } |
255 | 3.82M | return Status::OK(); |
256 | 3.82M | } |
257 | | |
258 | 3.74M | Status Socket::SetSendTimeout(const MonoDelta& timeout) { |
259 | 3.74M | return SetTimeout(SO_SNDTIMEO, "SO_SNDTIMEO", timeout); |
260 | 3.74M | } |
261 | | |
262 | 3.75M | Status Socket::SetRecvTimeout(const MonoDelta& timeout) { |
263 | 3.75M | return SetTimeout(SO_RCVTIMEO, "SO_RCVTIMEO", timeout); |
264 | 3.75M | } |
265 | | |
266 | 2.47M | Status Socket::SetReuseAddr(bool flag) { |
267 | 2.47M | int int_flag = flag ? 12.47M : 0798 ; |
268 | 2.47M | if (setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &int_flag, sizeof(int_flag)) == -1) { |
269 | 0 | return STATUS(NetworkError, "Failed to set SO_REUSEADDR", Errno(errno)); |
270 | 0 | } |
271 | 2.47M | return Status::OK(); |
272 | 2.47M | } |
273 | | |
274 | | Status Socket::BindAndListen(const Endpoint& sockaddr, |
275 | 4 | int listenQueueSize) { |
276 | 4 | RETURN_NOT_OK(SetReuseAddr(true)); |
277 | 4 | RETURN_NOT_OK(Bind(sockaddr)); |
278 | 4 | RETURN_NOT_OK(Listen(listenQueueSize)); |
279 | 4 | return Status::OK(); |
280 | 4 | } |
281 | | |
282 | 25.9k | Status Socket::Listen(int listen_queue_size) { |
283 | 25.9k | if (listen(fd_, listen_queue_size)) { |
284 | 0 | return STATUS(NetworkError, "listen() error", Errno(errno)); |
285 | 0 | } |
286 | 25.9k | return Status::OK(); |
287 | 25.9k | } |
288 | | |
289 | | namespace { |
290 | | |
291 | | enum class EndpointType { |
292 | | REMOTE, |
293 | | LOCAL, |
294 | | }; |
295 | | |
296 | 3.80M | Status GetEndpoint(EndpointType type, int fd, Endpoint* out) { |
297 | 3.80M | Endpoint temp; |
298 | 3.80M | DCHECK_GE(fd, 0); |
299 | 3.80M | socklen_t len = narrow_cast<socklen_t>(temp.capacity()); |
300 | 3.80M | auto result = type == EndpointType::LOCAL ? getsockname(fd, temp.data(), &len)3.76M |
301 | 3.80M | : getpeername(fd, temp.data(), &len)35.1k ; |
302 | 3.80M | if (result == -1) { |
303 | 0 | const std::string prefix = type == EndpointType::LOCAL ? "getsockname" : "getpeername"; |
304 | 0 | return STATUS(NetworkError, prefix + " error", Errno(errno)); |
305 | 0 | } |
306 | 3.80M | temp.resize(len); |
307 | 3.80M | *out = temp; |
308 | 3.80M | return Status::OK(); |
309 | 3.80M | } |
310 | | |
311 | | } // namespace |
312 | | |
313 | 3.80M | Status Socket::GetSocketAddress(Endpoint* out) const { |
314 | 3.80M | return GetEndpoint(EndpointType::LOCAL, fd_, out); |
315 | 3.80M | } |
316 | | |
317 | 12 | Status Socket::GetPeerAddress(Endpoint* out) const { |
318 | 12 | return GetEndpoint(EndpointType::REMOTE, fd_, out); |
319 | 12 | } |
320 | | |
321 | 3.13M | Status Socket::Bind(const Endpoint& endpoint, bool explain_addr_in_use) { |
322 | 3.13M | DCHECK_GE(fd_, 0); |
323 | 3.13M | if (PREDICT_FALSE(::bind(fd_, endpoint.data(), narrow_cast<socklen_t>(endpoint.size())) != 0)) { |
324 | 3.00k | Errno err(errno); |
325 | 3.00k | Status s = STATUS(NetworkError, Format("Error binding socket to $0", endpoint), err); |
326 | | |
327 | 3.00k | if (err == EADDRINUSE && explain_addr_in_use3.00k && endpoint.port() != 01 ) { |
328 | 1 | TryRunLsof(endpoint); |
329 | 1 | } |
330 | 3.00k | return s; |
331 | 3.00k | } |
332 | | |
333 | 3.13M | return Status::OK(); |
334 | 3.13M | } |
335 | | |
336 | 1.17M | Status CheckAcceptError(Socket *new_conn) { |
337 | 1.17M | if (new_conn->GetFd() < 0) { |
338 | 546k | if (IsTemporarySocketError(errno)) { |
339 | 545k | static const Status try_accept_again = STATUS(TryAgain, "Accept not yet ready"); |
340 | 545k | return try_accept_again; |
341 | 545k | } |
342 | 601 | return STATUS(NetworkError, "Accept failed", Errno(errno)); |
343 | 546k | } |
344 | | |
345 | 628k | return Status::OK(); |
346 | 1.17M | } |
347 | | |
348 | 1.17M | Status Socket::Accept(Socket *new_conn, Endpoint* remote, int flags) { |
349 | 1.17M | TRACE_EVENT0("net", "Socket::Accept"); |
350 | 1.17M | Endpoint temp; |
351 | 1.17M | socklen_t olen = narrow_cast<socklen_t>(temp.capacity()); |
352 | 1.17M | DCHECK_GE(fd_, 0); |
353 | | #if defined(__linux__) |
354 | | int accept_flags = SOCK_CLOEXEC; |
355 | | if (flags & FLAG_NONBLOCKING) { |
356 | | accept_flags |= SOCK_NONBLOCK; |
357 | | } |
358 | | new_conn->Reset(::accept4(fd_, temp.data(), &olen, accept_flags)); |
359 | | RETURN_NOT_OK(CheckAcceptError(new_conn)); |
360 | | #else |
361 | 1.17M | new_conn->Reset(::accept(fd_, temp.data(), &olen)); |
362 | 1.17M | RETURN_NOT_OK(CheckAcceptError(new_conn)); |
363 | 628k | RETURN_NOT_OK(new_conn->SetNonBlocking(flags & FLAG_NONBLOCKING)); |
364 | 628k | RETURN_NOT_OK(new_conn->SetCloseOnExec()); |
365 | 628k | #endif // defined(__linux__) |
366 | 628k | temp.resize(olen); |
367 | | |
368 | 628k | *remote = temp; |
369 | 628k | TRACE_EVENT_INSTANT1("net", "Accepted", TRACE_EVENT_SCOPE_THREAD, |
370 | 628k | "remote", ToString(*remote)); |
371 | 628k | return Status::OK(); |
372 | 628k | } |
373 | | |
374 | 638k | Status Socket::BindForOutgoingConnection() { |
375 | 638k | boost::system::error_code ec; |
376 | 638k | auto bind_address = IpAddress::from_string(FLAGS_local_ip_for_outbound_sockets, ec); |
377 | 638k | CHECK(!ec) |
378 | 19.6k | << "Invalid local IP set for 'local_ip_for_outbound_sockets': '" |
379 | 19.6k | << FLAGS_local_ip_for_outbound_sockets << "': " << ec; |
380 | | |
381 | 638k | RETURN_NOT_OK(Bind(Endpoint(bind_address, 0))); |
382 | 638k | return Status::OK(); |
383 | 638k | } |
384 | | |
385 | 3.13M | Status Socket::Connect(const Endpoint& remote) { |
386 | 3.13M | TRACE_EVENT1("net", "Socket::Connect", "remote", ToString(remote)); |
387 | | |
388 | 3.13M | if (PREDICT_FALSE(!FLAGS_local_ip_for_outbound_sockets.empty())) { |
389 | 634k | RETURN_NOT_OK(BindForOutgoingConnection()); |
390 | 634k | } |
391 | | |
392 | 3.13M | DCHECK_GE(fd_, 0); |
393 | 3.13M | if (::connect(fd_, remote.data(), narrow_cast<socklen_t>(remote.size())) < 0) { |
394 | 3.13M | if (IsTemporarySocketError(errno)3.12M ) { |
395 | 3.13M | static const Status try_connect_again = STATUS(TryAgain, "Connect not yet ready"); |
396 | 3.13M | return try_connect_again; |
397 | 3.13M | } |
398 | 18.4E | return STATUS(NetworkError, "connect(2) error", Errno(errno)); |
399 | 3.12M | } |
400 | 6.08k | return Status::OK(); |
401 | 3.13M | } |
402 | | |
403 | 0 | Status Socket::GetSockError() const { |
404 | 0 | int val = 0, ret; |
405 | 0 | socklen_t val_len = sizeof(val); |
406 | 0 | DCHECK_GE(fd_, 0); |
407 | 0 | ret = ::getsockopt(fd_, SOL_SOCKET, SO_ERROR, &val, &val_len); |
408 | 0 | if (ret) { |
409 | 0 | return STATUS(NetworkError, "getsockopt(SO_ERROR) failed", Errno(errno)); |
410 | 0 | } |
411 | 0 | if (val != 0) { |
412 | 0 | return STATUS(NetworkError, Errno(val)); |
413 | 0 | } |
414 | 0 | return Status::OK(); |
415 | 0 | } |
416 | | |
417 | 0 | Result<size_t> Socket::Write(const uint8_t *buf, ssize_t amt) { |
418 | 0 | if (amt <= 0) { |
419 | 0 | return STATUS_EC_FORMAT(NetworkError, Errno(EINVAL), "Invalid send of $0 bytes", amt); |
420 | 0 | } |
421 | 0 | DCHECK_GE(fd_, 0); |
422 | 0 | auto res = ::send(fd_, buf, amt, MSG_NOSIGNAL); |
423 | 0 | if (res < 0) { |
424 | 0 | return STATUS(NetworkError, "Write error", Errno(errno)); |
425 | 0 | } |
426 | 0 | return res; |
427 | 0 | } |
428 | | |
429 | 160M | Result<size_t> Socket::Writev(const struct ::iovec *iov, int iov_len) { |
430 | 160M | if (PREDICT_FALSE(iov_len <= 0)) { |
431 | 0 | return STATUS(NetworkError, |
432 | 0 | StringPrintf("Writev: invalid io vector length of %d", iov_len), |
433 | 0 | Slice() /* msg2 */, Errno(EINVAL)); |
434 | 0 | } |
435 | 160M | DCHECK_GE(fd_, 0); |
436 | | |
437 | 160M | struct msghdr msg; |
438 | 160M | memset(&msg, 0, sizeof(struct msghdr)); |
439 | 160M | msg.msg_iov = const_cast<iovec *>(iov); |
440 | 160M | msg.msg_iovlen = iov_len; |
441 | 160M | auto res = ::sendmsg(fd_, &msg, MSG_NOSIGNAL); |
442 | 160M | if (PREDICT_FALSE(res < 0)) { |
443 | 32.2k | if (IsTemporarySocketError(errno)) { |
444 | 30.7k | static const Status try_write_again = STATUS(TryAgain, "Write not yet ready"); |
445 | 30.7k | return try_write_again; |
446 | 30.7k | } |
447 | 1.49k | return STATUS(NetworkError, "sendmsg error", Errno(errno)); |
448 | 32.2k | } |
449 | | |
450 | 160M | return res; |
451 | 160M | } |
452 | | |
453 | | // Mostly follows writen() from Stevens (2004) or Kerrisk (2010). |
454 | 0 | Status Socket::BlockingWrite(const uint8_t *buf, size_t buflen, const MonoTime& deadline) { |
455 | 0 | DCHECK_LE(buflen, std::numeric_limits<int32_t>::max()) << "Writes > INT32_MAX not supported"; |
456 | |
|
457 | 0 | const uint8_t* bufend = buf + buflen; |
458 | 0 | while (buf < bufend) { |
459 | 0 | auto num_to_write = bufend - buf; |
460 | 0 | MonoDelta timeout = deadline.GetDeltaSince(MonoTime::Now()); |
461 | 0 | if (PREDICT_FALSE(timeout.ToNanoseconds() <= 0)) { |
462 | 0 | return STATUS(TimedOut, "BlockingWrite timed out"); |
463 | 0 | } |
464 | 0 | RETURN_NOT_OK(SetSendTimeout(timeout)); |
465 | 0 | auto inc_num_written = Write(buf, num_to_write); |
466 | |
|
467 | 0 | if (PREDICT_FALSE(!inc_num_written.ok())) { |
468 | 0 | Errno err(inc_num_written.status()); |
469 | | // Continue silently when the syscall is interrupted. |
470 | 0 | if (err == EINTR) { |
471 | 0 | continue; |
472 | 0 | } |
473 | 0 | if (err == EAGAIN) { |
474 | 0 | return STATUS(TimedOut, ""); |
475 | 0 | } |
476 | 0 | return inc_num_written.status().CloneAndPrepend("BlockingWrite error"); |
477 | 0 | } |
478 | 0 | if (PREDICT_FALSE(*inc_num_written == 0)) { |
479 | | // Shouldn't happen on Linux with a blocking socket. Maybe other Unices. |
480 | 0 | return STATUS_FORMAT( |
481 | 0 | IOError, "Wrote zero bytes on a BlockingWrite() call. Transferred $0 of $1 bytes.", |
482 | 0 | buflen - num_to_write, buflen); |
483 | 0 | } |
484 | 0 | buf += *inc_num_written; |
485 | 0 | } |
486 | | |
487 | 0 | return Status::OK(); |
488 | 0 | } |
489 | | |
490 | 9.35k | Result<size_t> Socket::Recv(uint8_t* buf, ssize_t amt) { |
491 | 9.35k | if (amt <= 0) { |
492 | 0 | return STATUS_EC_FORMAT(NetworkError, Errno(EINVAL), "Invalid recv of $0 bytes", amt); |
493 | 0 | } |
494 | | |
495 | | // The recv() call can return fewer than the requested number of bytes. |
496 | | // Especially when 'amt' is small, this is very unlikely to happen in |
497 | | // the context of unit tests. So, we provide an injection hook which |
498 | | // simulates the same behavior. |
499 | 9.35k | if (PREDICT_FALSE(FLAGS_socket_inject_short_recvs && amt > 1)) { |
500 | 0 | amt = RandomUniformInt<ssize_t>(1, amt); |
501 | 0 | } |
502 | | |
503 | 9.35k | DCHECK_GE(fd_, 0); |
504 | 9.35k | auto res = ::recv(fd_, buf, amt, 0); |
505 | 9.35k | if (res <= 0) { |
506 | 2.10k | if (res == 0) { |
507 | 0 | return STATUS(NetworkError, "Recv() got EOF from remote", Slice(), Errno(ESHUTDOWN)); |
508 | 0 | } |
509 | 2.10k | if (IsTemporarySocketError(errno)) { |
510 | 2.10k | static const Status try_recv_again = STATUS(TryAgain, "Recv not yet ready"); |
511 | 2.10k | return try_recv_again; |
512 | 2.10k | } |
513 | 0 | return STATUS(NetworkError, "Recv error", Errno(errno)); |
514 | 2.10k | } |
515 | | |
516 | 7.25k | return res; |
517 | 9.35k | } |
518 | | |
519 | 321M | Result<size_t> Socket::Recvv(IoVecs* vecs) { |
520 | 321M | if (PREDICT_FALSE(vecs->empty())) { |
521 | 0 | return STATUS(NetworkError, "Recvv: receive to empty vecs"); |
522 | 0 | } |
523 | 321M | if (fd_ < 0) { |
524 | 0 | return STATUS(NetworkError, "Recvv on closed socket"); |
525 | 0 | } |
526 | | |
527 | 321M | struct msghdr msg; |
528 | 321M | memset(&msg, 0, sizeof(struct msghdr)); |
529 | 321M | msg.msg_iov = vecs->data(); |
530 | 321M | msg.msg_iovlen = narrow_cast<int>(vecs->size()); |
531 | 321M | auto res = recvmsg(fd_, &msg, MSG_NOSIGNAL); |
532 | 321M | if (PREDICT_FALSE(res <= 0)) { |
533 | 161M | if (res == 0) { |
534 | 495k | return STATUS(NetworkError, "recvmsg got EOF from remote", Slice(), Errno(ESHUTDOWN)); |
535 | 495k | } |
536 | 161M | if (IsTemporarySocketError(errno)) { |
537 | 158M | static const Status try_recv_again = STATUS(TryAgain, "Recv not yet ready"); |
538 | 158M | return try_recv_again; |
539 | 158M | } |
540 | 2.50M | return STATUS(NetworkError, "recvmsg error", Errno(errno)); |
541 | 161M | } |
542 | | |
543 | 159M | return res; |
544 | 321M | } |
545 | | |
546 | | // Mostly follows readn() from Stevens (2004) or Kerrisk (2010). |
547 | | // One place where we deviate: we consider EOF a failure if < amt bytes are read. |
548 | 1 | Result<size_t> Socket::BlockingRecv(uint8_t *buf, size_t amt, const MonoTime& deadline) { |
549 | 1 | DCHECK_LE(amt, std::numeric_limits<int32_t>::max()) << "Reads > INT32_MAX not supported"0 ; |
550 | 1 | size_t tot_read = 0; |
551 | | |
552 | | // We populate this with the full (initial) duration of the timeout on the first iteration of the |
553 | | // loop below. |
554 | 1 | MonoDelta full_timeout; |
555 | | |
556 | 3 | while (tot_read < amt) { |
557 | | // Read at most the max value of int32_t bytes at a time. |
558 | 3 | const auto num_to_read = std::min<size_t>(amt - tot_read, std::numeric_limits<int32_t>::max()); |
559 | 3 | const MonoDelta timeout = deadline.GetDeltaSince(MonoTime::Now()); |
560 | 3 | if (!full_timeout.Initialized()) { |
561 | 1 | full_timeout = timeout; |
562 | 1 | } |
563 | 3 | if (PREDICT_FALSE(timeout.ToNanoseconds() <= 0)) { |
564 | 1 | VLOG(4) << __func__ << " timed out in " << full_timeout.ToString()0 ; |
565 | 1 | return STATUS(TimedOut, ""); |
566 | 1 | } |
567 | 2 | RETURN_NOT_OK(SetRecvTimeout(timeout)); |
568 | 2 | auto recv_res = Recv(buf, num_to_read); |
569 | 2 | if (PREDICT_TRUE(recv_res.ok())) { |
570 | 1 | auto inc_num_read = *recv_res; |
571 | 1 | if (PREDICT_FALSE(inc_num_read == 0)) { |
572 | | // EOF. |
573 | 0 | break; |
574 | 1 | } else { |
575 | 1 | tot_read += inc_num_read; |
576 | 1 | buf += inc_num_read; |
577 | 1 | } |
578 | 1 | } else { |
579 | | // Continue silently when the syscall is interrupted. |
580 | | // |
581 | | // We used to treat EAGAIN as a timeout, and the reason for that is not entirely clear |
582 | | // to me (mbautin). http://man7.org/linux/man-pages/man2/recv.2.html says that EAGAIN and |
583 | | // EWOULDBLOCK could be used interchangeably, and these could happen on a nonblocking socket |
584 | | // that no data is available on. I think we should just retry in that case. |
585 | 1 | if (recv_res.status().IsTryAgain()) { |
586 | 1 | continue; |
587 | 1 | } |
588 | 0 | return recv_res.status().CloneAndPrepend("BlockingRecv error"); |
589 | 1 | } |
590 | 2 | } |
591 | | |
592 | 0 | if (PREDICT_FALSE(tot_read < amt)) { |
593 | 0 | return STATUS(IOError, "Read zero bytes on a blocking Recv() call", |
594 | 0 | StringPrintf("Transferred %zu of %zu bytes", tot_read, amt)); |
595 | 0 | } |
596 | | |
597 | 0 | return tot_read; |
598 | 0 | } |
599 | | |
600 | 7.49M | Status Socket::SetTimeout(int opt, std::string optname, const MonoDelta& timeout) { |
601 | 7.49M | if (PREDICT_FALSE(timeout.ToNanoseconds() < 0)) { |
602 | 0 | return STATUS(InvalidArgument, "Timeout specified as negative to SetTimeout", |
603 | 0 | timeout.ToString()); |
604 | 0 | } |
605 | 7.49M | struct timeval tv; |
606 | 7.49M | timeout.ToTimeVal(&tv); |
607 | 7.49M | socklen_t optlen = sizeof(tv); |
608 | 7.49M | if (::setsockopt(fd_, SOL_SOCKET, opt, &tv, optlen) == -1) { |
609 | 0 | return STATUS( |
610 | 0 | NetworkError, |
611 | 0 | StringPrintf("Failed to set %s to %s", optname.c_str(), timeout.ToString().c_str()), |
612 | 0 | Errno(errno)); |
613 | 0 | } |
614 | 7.49M | return Status::OK(); |
615 | 7.49M | } |
616 | | |
617 | 3.74M | Result<int32_t> Socket::GetReceiveBufferSize() { |
618 | 3.74M | int32_t val = 0; |
619 | 3.74M | socklen_t val_len = sizeof(val); |
620 | 3.74M | DCHECK_GE(fd_, 0); |
621 | 3.74M | if (getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &val, &val_len)) { |
622 | 0 | return STATUS(NetworkError, "Failed to get socket receive buffer", Errno(errno)); |
623 | 0 | } |
624 | 3.74M | return val; |
625 | 3.74M | } |
626 | | |
627 | 0 | Status Socket::SetReceiveBufferSize(int32_t size) { |
628 | 0 | int32_t val = size / 2; // Kernel will double this value |
629 | 0 | DCHECK_GE(fd_, 0); |
630 | 0 | if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val))) { |
631 | 0 | return STATUS( |
632 | 0 | NetworkError, "Failed to set socket receive buffer", Errno(errno)); |
633 | 0 | } |
634 | 0 | return Status::OK(); |
635 | 0 | } |
636 | | |
637 | | } // namespace yb |