YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/util/net/socket.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/util/net/socket.h"
34
35
#include <netinet/in.h>
36
#include <sys/types.h>
37
38
#include <limits>
39
#include <string>
40
41
#include <glog/logging.h>
42
43
#include "yb/gutil/casts.h"
44
#include "yb/gutil/stringprintf.h"
45
46
#include "yb/util/debug/trace_event.h"
47
#include "yb/util/errno.h"
48
#include "yb/util/flag_tags.h"
49
#include "yb/util/monotime.h"
50
#include "yb/util/net/net_util.h"
51
#include "yb/util/net/sockaddr.h"
52
#include "yb/util/random.h"
53
#include "yb/util/random_util.h"
54
#include "yb/util/result.h"
55
#include "yb/util/status_format.h"
56
57
DEFINE_string(local_ip_for_outbound_sockets, "",
58
              "IP to bind to when making outgoing socket connections. "
59
              "This must be an IP address of the form A.B.C.D, not a hostname. "
60
              "Advanced parameter, subject to change.");
61
TAG_FLAG(local_ip_for_outbound_sockets, experimental);
62
63
DEFINE_bool(socket_inject_short_recvs, false,
64
            "Inject short recv() responses which return less data than "
65
            "requested");
66
TAG_FLAG(socket_inject_short_recvs, hidden);
67
TAG_FLAG(socket_inject_short_recvs, unsafe);
68
69
namespace yb {
70
71
159M
size_t IoVecsFullSize(const IoVecs& io_vecs) {
72
159M
  return std::accumulate(io_vecs.begin(), io_vecs.end(), 0ULL, [](size_t p, const iovec& v) {
73
159M
    return p + v.iov_len;
74
159M
  });
75
159M
}
76
77
163M
void IoVecsToBuffer(const IoVecs& io_vecs, size_t begin, size_t end, std::vector<char>* result) {
78
163M
  result->clear();
79
163M
  result->reserve(end - begin);
80
163M
  for (const auto& io_vec : io_vecs) {
81
163M
    if (begin == end) {
82
122
      break;
83
122
    }
84
163M
    
if (163M
io_vec.iov_len > begin163M
) {
85
163M
      size_t clen = std::min(io_vec.iov_len, end) - begin;
86
163M
      auto start = IoVecBegin(io_vec) + begin;
87
163M
      result->insert(result->end(), start, start + clen);
88
163M
      begin += clen;
89
163M
    }
90
163M
    begin -= io_vec.iov_len;
91
163M
    end -= io_vec.iov_len;
92
163M
  }
93
163M
}
94
95
152M
void IoVecsToBuffer(const IoVecs& io_vecs, size_t begin, size_t end, char* result) {
96
152M
  for (const auto& io_vec : io_vecs) {
97
152M
    if (begin == end) {
98
0
      break;
99
0
    }
100
152M
    
if (152M
io_vec.iov_len > begin152M
) {
101
152M
      size_t clen = std::min(io_vec.iov_len, end) - begin;
102
152M
      auto start = IoVecBegin(io_vec) + begin;
103
152M
      memcpy(result, start, clen);
104
152M
      result += clen;
105
152M
      begin += clen;
106
152M
    }
107
152M
    begin -= io_vec.iov_len;
108
152M
    end -= io_vec.iov_len;
109
152M
  }
110
152M
}
111
112
Socket::Socket()
113
4.37M
  : fd_(-1) {
114
4.37M
}
115
116
Socket::Socket(int fd)
117
0
  : fd_(fd) {
118
0
}
119
120
4.37M
void Socket::Reset(int fd) {
121
4.37M
  WARN_NOT_OK(Close(), "Close failed");
122
4.37M
  fd_ = fd;
123
4.37M
}
124
125
10.0M
int Socket::Release() {
126
10.0M
  int fd = fd_;
127
10.0M
  fd_ = -1;
128
10.0M
  return fd;
129
10.0M
}
130
131
13.8M
Socket::~Socket() {
132
13.8M
  auto status = Close();
133
13.8M
  if (!status.ok()) {
134
0
    LOG(WARNING) << "Failed to close socket: " << status.ToString();
135
0
  }
136
13.8M
}
137
138
24.5M
Status Socket::Close() {
139
24.5M
  if (fd_ < 0)
140
21.3M
    return Status::OK();
141
3.28M
  int fd = fd_;
142
3.28M
  fd_ = -1;
143
3.28M
  if (::close(fd) < 0) {
144
0
    return STATUS(NetworkError, "Close error", Errno(errno));
145
0
  }
146
3.28M
  return Status::OK();
147
3.28M
}
148
149
7.44k
Status Socket::Shutdown(bool shut_read, bool shut_write) {
150
7.44k
  DCHECK_GE(fd_, 0);
151
7.44k
  int flags = 0;
152
7.44k
  if (shut_read && 
shut_write7.41k
) {
153
7.41k
    flags |= SHUT_RDWR;
154
7.41k
  } else 
if (33
shut_read33
) {
155
0
    flags |= SHUT_RD;
156
33
  } else if (shut_write) {
157
0
    flags |= SHUT_WR;
158
0
  }
159
7.44k
  if (::shutdown(fd_, flags) < 0) {
160
5.23k
    return STATUS(NetworkError, "Shutdown error", Errno(errno));
161
5.23k
  }
162
2.21k
  return Status::OK();
163
7.44k
}
164
165
4.93M
int Socket::GetFd() const {
166
4.93M
  return fd_;
167
4.93M
}
168
169
165M
bool IsTemporarySocketError(int err) {
170
165M
  return err == EAGAIN || 
err == EWOULDBLOCK5.68M
||
err == EINTR5.68M
||
err == EINPROGRESS5.68M
;
171
165M
}
172
173
#if defined(__linux__)
174
175
Status Socket::Init(int flags) {
176
  auto family = flags & FLAG_IPV6 ? AF_INET6 : AF_INET;
177
  int nonblocking_flag = (flags & FLAG_NONBLOCKING) ? SOCK_NONBLOCK : 0;
178
  Reset(::socket(family, SOCK_STREAM | SOCK_CLOEXEC | nonblocking_flag, 0));
179
  if (fd_ < 0) {
180
    return STATUS(NetworkError, "Error opening socket", Errno(errno));
181
  }
182
183
  return Status::OK();
184
}
185
186
#else
187
188
3.19M
Status Socket::Init(int flags) {
189
3.19M
  Reset(::socket(flags & FLAG_IPV6 ? AF_INET6 : AF_INET, SOCK_STREAM, 0));
190
3.19M
  if (fd_ < 0) {
191
4.78k
    return STATUS(NetworkError, "Error opening socket", Errno(errno));
192
4.78k
  }
193
3.19M
  RETURN_NOT_OK(SetNonBlocking(flags & FLAG_NONBLOCKING));
194
3.19M
  RETURN_NOT_OK(SetCloseOnExec());
195
196
  // Disable SIGPIPE.
197
3.19M
  int set = 1;
198
3.19M
  if (setsockopt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof(set)) == -1) {
199
0
    return STATUS(NetworkError, "Failed to set SO_NOSIGPIPE", Errno(errno));
200
0
  }
201
202
3.19M
  return Status::OK();
203
3.19M
}
204
205
#endif // defined(__linux__)
206
207
7.50M
Status Socket::SetNoDelay(bool enabled) {
208
18.4E
  int flag = 
enabled7.50M
?
17.50M
: 0;
209
7.50M
  if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) == -1) {
210
1
    return STATUS(NetworkError, "Failed to set TCP_NODELAY", Errno(errno));
211
1
  }
212
7.50M
  return Status::OK();
213
7.50M
}
214
215
3.84M
Status Socket::SetNonBlocking(bool enabled) {
216
3.84M
  int curflags = ::fcntl(fd_, F_GETFL, 0);
217
3.84M
  if (curflags == -1) {
218
0
    return STATUS(
219
0
        NetworkError, StringPrintf("Failed to get file status flags on fd %d", fd_),
220
0
        Errno(errno));
221
0
  }
222
3.84M
  int newflags = (enabled) ? 
(curflags | O_NONBLOCK)3.79M
:
(curflags & ~O_NONBLOCK)52.8k
;
223
3.84M
  if (::fcntl(fd_, F_SETFL, newflags) == -1) {
224
0
    if (enabled) {
225
0
      return STATUS(
226
0
          NetworkError, StringPrintf("Failed to set O_NONBLOCK on fd %d", fd_), Errno(errno));
227
0
    } else {
228
0
      return STATUS(
229
0
          NetworkError, StringPrintf("Failed to clear O_NONBLOCK on fd %d", fd_), Errno(errno));
230
0
    }
231
0
  }
232
3.84M
  return Status::OK();
233
3.84M
}
234
235
0
Status Socket::IsNonBlocking(bool* is_nonblock) const {
236
0
  int curflags = ::fcntl(fd_, F_GETFL, 0);
237
0
  if (curflags == -1) {
238
0
    return STATUS(
239
0
        NetworkError, StringPrintf("Failed to get file status flags on fd %d", fd_), Errno(errno));
240
0
  }
241
0
  *is_nonblock = ((curflags & O_NONBLOCK) != 0);
242
0
  return Status::OK();
243
0
}
244
245
3.82M
Status Socket::SetCloseOnExec() {
246
3.82M
  int curflags = fcntl(fd_, F_GETFD, 0);
247
3.82M
  if (curflags == -1) {
248
0
    Reset(-1);
249
0
    return STATUS(NetworkError, "fcntl(F_GETFD) error", Errno(errno));
250
0
  }
251
3.82M
  if (fcntl(fd_, F_SETFD, curflags | FD_CLOEXEC) == -1) {
252
0
    Reset(-1);
253
0
    return STATUS(NetworkError, "fcntl(F_SETFD) error", Errno(errno));
254
0
  }
255
3.82M
  return Status::OK();
256
3.82M
}
257
258
3.74M
Status Socket::SetSendTimeout(const MonoDelta& timeout) {
259
3.74M
  return SetTimeout(SO_SNDTIMEO, "SO_SNDTIMEO", timeout);
260
3.74M
}
261
262
3.75M
Status Socket::SetRecvTimeout(const MonoDelta& timeout) {
263
3.75M
  return SetTimeout(SO_RCVTIMEO, "SO_RCVTIMEO", timeout);
264
3.75M
}
265
266
2.47M
Status Socket::SetReuseAddr(bool flag) {
267
2.47M
  int int_flag = flag ? 
12.47M
:
0798
;
268
2.47M
  if (setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &int_flag, sizeof(int_flag)) == -1) {
269
0
    return STATUS(NetworkError, "Failed to set SO_REUSEADDR", Errno(errno));
270
0
  }
271
2.47M
  return Status::OK();
272
2.47M
}
273
274
Status Socket::BindAndListen(const Endpoint& sockaddr,
275
4
                             int listenQueueSize) {
276
4
  RETURN_NOT_OK(SetReuseAddr(true));
277
4
  RETURN_NOT_OK(Bind(sockaddr));
278
4
  RETURN_NOT_OK(Listen(listenQueueSize));
279
4
  return Status::OK();
280
4
}
281
282
25.9k
Status Socket::Listen(int listen_queue_size) {
283
25.9k
  if (listen(fd_, listen_queue_size)) {
284
0
    return STATUS(NetworkError, "listen() error", Errno(errno));
285
0
  }
286
25.9k
  return Status::OK();
287
25.9k
}
288
289
namespace {
290
291
enum class EndpointType {
292
  REMOTE,
293
  LOCAL,
294
};
295
296
3.80M
Status GetEndpoint(EndpointType type, int fd, Endpoint* out) {
297
3.80M
  Endpoint temp;
298
3.80M
  DCHECK_GE(fd, 0);
299
3.80M
  socklen_t len = narrow_cast<socklen_t>(temp.capacity());
300
3.80M
  auto result = type == EndpointType::LOCAL ? 
getsockname(fd, temp.data(), &len)3.76M
301
3.80M
                                            : 
getpeername(fd, temp.data(), &len)35.1k
;
302
3.80M
  if (result == -1) {
303
0
    const std::string prefix = type == EndpointType::LOCAL ? "getsockname" : "getpeername";
304
0
    return STATUS(NetworkError, prefix + " error", Errno(errno));
305
0
  }
306
3.80M
  temp.resize(len);
307
3.80M
  *out = temp;
308
3.80M
  return Status::OK();
309
3.80M
}
310
311
} // namespace
312
313
3.80M
Status Socket::GetSocketAddress(Endpoint* out) const {
314
3.80M
  return GetEndpoint(EndpointType::LOCAL, fd_, out);
315
3.80M
}
316
317
12
Status Socket::GetPeerAddress(Endpoint* out) const {
318
12
  return GetEndpoint(EndpointType::REMOTE, fd_, out);
319
12
}
320
321
3.13M
Status Socket::Bind(const Endpoint& endpoint, bool explain_addr_in_use) {
322
3.13M
  DCHECK_GE(fd_, 0);
323
3.13M
  if (PREDICT_FALSE(::bind(fd_, endpoint.data(), narrow_cast<socklen_t>(endpoint.size())) != 0)) {
324
3.00k
    Errno err(errno);
325
3.00k
    Status s = STATUS(NetworkError, Format("Error binding socket to $0", endpoint), err);
326
327
3.00k
    if (err == EADDRINUSE && 
explain_addr_in_use3.00k
&&
endpoint.port() != 01
) {
328
1
      TryRunLsof(endpoint);
329
1
    }
330
3.00k
    return s;
331
3.00k
  }
332
333
3.13M
  return Status::OK();
334
3.13M
}
335
336
1.17M
Status CheckAcceptError(Socket *new_conn) {
337
1.17M
  if (new_conn->GetFd() < 0) {
338
546k
    if (IsTemporarySocketError(errno)) {
339
545k
      static const Status try_accept_again = STATUS(TryAgain, "Accept not yet ready");
340
545k
      return try_accept_again;
341
545k
    }
342
601
    return STATUS(NetworkError, "Accept failed", Errno(errno));
343
546k
  }
344
345
628k
  return Status::OK();
346
1.17M
}
347
348
1.17M
Status Socket::Accept(Socket *new_conn, Endpoint* remote, int flags) {
349
1.17M
  TRACE_EVENT0("net", "Socket::Accept");
350
1.17M
  Endpoint temp;
351
1.17M
  socklen_t olen = narrow_cast<socklen_t>(temp.capacity());
352
1.17M
  DCHECK_GE(fd_, 0);
353
#if defined(__linux__)
354
  int accept_flags = SOCK_CLOEXEC;
355
  if (flags & FLAG_NONBLOCKING) {
356
    accept_flags |= SOCK_NONBLOCK;
357
  }
358
  new_conn->Reset(::accept4(fd_, temp.data(), &olen, accept_flags));
359
  RETURN_NOT_OK(CheckAcceptError(new_conn));
360
#else
361
1.17M
  new_conn->Reset(::accept(fd_, temp.data(), &olen));
362
1.17M
  RETURN_NOT_OK(CheckAcceptError(new_conn));
363
628k
  RETURN_NOT_OK(new_conn->SetNonBlocking(flags & FLAG_NONBLOCKING));
364
628k
  RETURN_NOT_OK(new_conn->SetCloseOnExec());
365
628k
#endif // defined(__linux__)
366
628k
  temp.resize(olen);
367
368
628k
  *remote = temp;
369
628k
  TRACE_EVENT_INSTANT1("net", "Accepted", TRACE_EVENT_SCOPE_THREAD,
370
628k
                       "remote", ToString(*remote));
371
628k
  return Status::OK();
372
628k
}
373
374
638k
Status Socket::BindForOutgoingConnection() {
375
638k
  boost::system::error_code ec;
376
638k
  auto bind_address = IpAddress::from_string(FLAGS_local_ip_for_outbound_sockets, ec);
377
638k
  CHECK(!ec)
378
19.6k
    << "Invalid local IP set for 'local_ip_for_outbound_sockets': '"
379
19.6k
    << FLAGS_local_ip_for_outbound_sockets << "': " << ec;
380
381
638k
  RETURN_NOT_OK(Bind(Endpoint(bind_address, 0)));
382
638k
  return Status::OK();
383
638k
}
384
385
3.13M
Status Socket::Connect(const Endpoint& remote) {
386
3.13M
  TRACE_EVENT1("net", "Socket::Connect", "remote", ToString(remote));
387
388
3.13M
  if (PREDICT_FALSE(!FLAGS_local_ip_for_outbound_sockets.empty())) {
389
634k
    RETURN_NOT_OK(BindForOutgoingConnection());
390
634k
  }
391
392
3.13M
  DCHECK_GE(fd_, 0);
393
3.13M
  if (::connect(fd_, remote.data(), narrow_cast<socklen_t>(remote.size())) < 0) {
394
3.13M
    if (
IsTemporarySocketError(errno)3.12M
) {
395
3.13M
      static const Status try_connect_again = STATUS(TryAgain, "Connect not yet ready");
396
3.13M
      return try_connect_again;
397
3.13M
    }
398
18.4E
    return STATUS(NetworkError, "connect(2) error", Errno(errno));
399
3.12M
  }
400
6.08k
  return Status::OK();
401
3.13M
}
402
403
0
Status Socket::GetSockError() const {
404
0
  int val = 0, ret;
405
0
  socklen_t val_len = sizeof(val);
406
0
  DCHECK_GE(fd_, 0);
407
0
  ret = ::getsockopt(fd_, SOL_SOCKET, SO_ERROR, &val, &val_len);
408
0
  if (ret) {
409
0
    return STATUS(NetworkError, "getsockopt(SO_ERROR) failed", Errno(errno));
410
0
  }
411
0
  if (val != 0) {
412
0
    return STATUS(NetworkError, Errno(val));
413
0
  }
414
0
  return Status::OK();
415
0
}
416
417
0
Result<size_t> Socket::Write(const uint8_t *buf, ssize_t amt) {
418
0
  if (amt <= 0) {
419
0
    return STATUS_EC_FORMAT(NetworkError, Errno(EINVAL), "Invalid send of $0 bytes", amt);
420
0
  }
421
0
  DCHECK_GE(fd_, 0);
422
0
  auto res = ::send(fd_, buf, amt, MSG_NOSIGNAL);
423
0
  if (res < 0) {
424
0
    return STATUS(NetworkError, "Write error", Errno(errno));
425
0
  }
426
0
  return res;
427
0
}
428
429
160M
Result<size_t> Socket::Writev(const struct ::iovec *iov, int iov_len) {
430
160M
  if (PREDICT_FALSE(iov_len <= 0)) {
431
0
    return STATUS(NetworkError,
432
0
                  StringPrintf("Writev: invalid io vector length of %d", iov_len),
433
0
                  Slice() /* msg2 */, Errno(EINVAL));
434
0
  }
435
160M
  DCHECK_GE(fd_, 0);
436
437
160M
  struct msghdr msg;
438
160M
  memset(&msg, 0, sizeof(struct msghdr));
439
160M
  msg.msg_iov = const_cast<iovec *>(iov);
440
160M
  msg.msg_iovlen = iov_len;
441
160M
  auto res = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
442
160M
  if (PREDICT_FALSE(res < 0)) {
443
32.2k
    if (IsTemporarySocketError(errno)) {
444
30.7k
      static const Status try_write_again = STATUS(TryAgain, "Write not yet ready");
445
30.7k
      return try_write_again;
446
30.7k
    }
447
1.49k
    return STATUS(NetworkError, "sendmsg error", Errno(errno));
448
32.2k
  }
449
450
160M
  return res;
451
160M
}
452
453
// Mostly follows writen() from Stevens (2004) or Kerrisk (2010).
454
0
Status Socket::BlockingWrite(const uint8_t *buf, size_t buflen, const MonoTime& deadline) {
455
0
  DCHECK_LE(buflen, std::numeric_limits<int32_t>::max()) << "Writes > INT32_MAX not supported";
456
457
0
  const uint8_t* bufend = buf + buflen;
458
0
  while (buf < bufend) {
459
0
    auto num_to_write = bufend - buf;
460
0
    MonoDelta timeout = deadline.GetDeltaSince(MonoTime::Now());
461
0
    if (PREDICT_FALSE(timeout.ToNanoseconds() <= 0)) {
462
0
      return STATUS(TimedOut, "BlockingWrite timed out");
463
0
    }
464
0
    RETURN_NOT_OK(SetSendTimeout(timeout));
465
0
    auto inc_num_written = Write(buf, num_to_write);
466
467
0
    if (PREDICT_FALSE(!inc_num_written.ok())) {
468
0
      Errno err(inc_num_written.status());
469
      // Continue silently when the syscall is interrupted.
470
0
      if (err == EINTR) {
471
0
        continue;
472
0
      }
473
0
      if (err == EAGAIN) {
474
0
        return STATUS(TimedOut, "");
475
0
      }
476
0
      return inc_num_written.status().CloneAndPrepend("BlockingWrite error");
477
0
    }
478
0
    if (PREDICT_FALSE(*inc_num_written == 0)) {
479
      // Shouldn't happen on Linux with a blocking socket. Maybe other Unices.
480
0
      return STATUS_FORMAT(
481
0
          IOError, "Wrote zero bytes on a BlockingWrite() call. Transferred $0 of $1 bytes.",
482
0
          buflen - num_to_write, buflen);
483
0
    }
484
0
    buf += *inc_num_written;
485
0
  }
486
487
0
  return Status::OK();
488
0
}
489
490
9.35k
Result<size_t> Socket::Recv(uint8_t* buf, ssize_t amt) {
491
9.35k
  if (amt <= 0) {
492
0
    return STATUS_EC_FORMAT(NetworkError, Errno(EINVAL), "Invalid recv of $0 bytes", amt);
493
0
  }
494
495
  // The recv() call can return fewer than the requested number of bytes.
496
  // Especially when 'amt' is small, this is very unlikely to happen in
497
  // the context of unit tests. So, we provide an injection hook which
498
  // simulates the same behavior.
499
9.35k
  if (PREDICT_FALSE(FLAGS_socket_inject_short_recvs && amt > 1)) {
500
0
    amt = RandomUniformInt<ssize_t>(1, amt);
501
0
  }
502
503
9.35k
  DCHECK_GE(fd_, 0);
504
9.35k
  auto res = ::recv(fd_, buf, amt, 0);
505
9.35k
  if (res <= 0) {
506
2.10k
    if (res == 0) {
507
0
      return STATUS(NetworkError, "Recv() got EOF from remote", Slice(), Errno(ESHUTDOWN));
508
0
    }
509
2.10k
    if (IsTemporarySocketError(errno)) {
510
2.10k
      static const Status try_recv_again = STATUS(TryAgain, "Recv not yet ready");
511
2.10k
      return try_recv_again;
512
2.10k
    }
513
0
    return STATUS(NetworkError, "Recv error", Errno(errno));
514
2.10k
  }
515
516
7.25k
  return res;
517
9.35k
}
518
519
321M
Result<size_t> Socket::Recvv(IoVecs* vecs) {
520
321M
  if (PREDICT_FALSE(vecs->empty())) {
521
0
    return STATUS(NetworkError, "Recvv: receive to empty vecs");
522
0
  }
523
321M
  if (fd_ < 0) {
524
0
    return STATUS(NetworkError, "Recvv on closed socket");
525
0
  }
526
527
321M
  struct msghdr msg;
528
321M
  memset(&msg, 0, sizeof(struct msghdr));
529
321M
  msg.msg_iov = vecs->data();
530
321M
  msg.msg_iovlen = narrow_cast<int>(vecs->size());
531
321M
  auto res = recvmsg(fd_, &msg, MSG_NOSIGNAL);
532
321M
  if (PREDICT_FALSE(res <= 0)) {
533
161M
    if (res == 0) {
534
495k
      return STATUS(NetworkError, "recvmsg got EOF from remote", Slice(), Errno(ESHUTDOWN));
535
495k
    }
536
161M
    if (IsTemporarySocketError(errno)) {
537
158M
      static const Status try_recv_again = STATUS(TryAgain, "Recv not yet ready");
538
158M
      return try_recv_again;
539
158M
    }
540
2.50M
    return STATUS(NetworkError, "recvmsg error", Errno(errno));
541
161M
  }
542
543
159M
  return res;
544
321M
}
545
546
// Mostly follows readn() from Stevens (2004) or Kerrisk (2010).
547
// One place where we deviate: we consider EOF a failure if < amt bytes are read.
548
1
Result<size_t> Socket::BlockingRecv(uint8_t *buf, size_t amt, const MonoTime& deadline) {
549
1
  DCHECK_LE
(amt, std::numeric_limits<int32_t>::max()) << "Reads > INT32_MAX not supported"0
;
550
1
  size_t tot_read = 0;
551
552
  // We populate this with the full (initial) duration of the timeout on the first iteration of the
553
  // loop below.
554
1
  MonoDelta full_timeout;
555
556
3
  while (tot_read < amt) {
557
    // Read at most the max value of int32_t bytes at a time.
558
3
    const auto num_to_read = std::min<size_t>(amt - tot_read, std::numeric_limits<int32_t>::max());
559
3
    const MonoDelta timeout = deadline.GetDeltaSince(MonoTime::Now());
560
3
    if (!full_timeout.Initialized()) {
561
1
      full_timeout = timeout;
562
1
    }
563
3
    if (PREDICT_FALSE(timeout.ToNanoseconds() <= 0)) {
564
1
      VLOG
(4) << __func__ << " timed out in " << full_timeout.ToString()0
;
565
1
      return STATUS(TimedOut, "");
566
1
    }
567
2
    RETURN_NOT_OK(SetRecvTimeout(timeout));
568
2
    auto recv_res = Recv(buf, num_to_read);
569
2
    if (PREDICT_TRUE(recv_res.ok())) {
570
1
      auto inc_num_read = *recv_res;
571
1
      if (PREDICT_FALSE(inc_num_read == 0)) {
572
        // EOF.
573
0
        break;
574
1
      } else {
575
1
        tot_read += inc_num_read;
576
1
        buf += inc_num_read;
577
1
      }
578
1
    } else {
579
      // Continue silently when the syscall is interrupted.
580
      //
581
      // We used to treat EAGAIN as a timeout, and the reason for that is not entirely clear
582
      // to me (mbautin). http://man7.org/linux/man-pages/man2/recv.2.html says that EAGAIN and
583
      // EWOULDBLOCK could be used interchangeably, and these could happen on a nonblocking socket
584
      // that no data is available on. I think we should just retry in that case.
585
1
      if (recv_res.status().IsTryAgain()) {
586
1
        continue;
587
1
      }
588
0
      return recv_res.status().CloneAndPrepend("BlockingRecv error");
589
1
    }
590
2
  }
591
592
0
  if (PREDICT_FALSE(tot_read < amt)) {
593
0
    return STATUS(IOError, "Read zero bytes on a blocking Recv() call",
594
0
        StringPrintf("Transferred %zu of %zu bytes", tot_read, amt));
595
0
  }
596
597
0
  return tot_read;
598
0
}
599
600
7.49M
Status Socket::SetTimeout(int opt, std::string optname, const MonoDelta& timeout) {
601
7.49M
  if (PREDICT_FALSE(timeout.ToNanoseconds() < 0)) {
602
0
    return STATUS(InvalidArgument, "Timeout specified as negative to SetTimeout",
603
0
                                   timeout.ToString());
604
0
  }
605
7.49M
  struct timeval tv;
606
7.49M
  timeout.ToTimeVal(&tv);
607
7.49M
  socklen_t optlen = sizeof(tv);
608
7.49M
  if (::setsockopt(fd_, SOL_SOCKET, opt, &tv, optlen) == -1) {
609
0
    return STATUS(
610
0
        NetworkError,
611
0
        StringPrintf("Failed to set %s to %s", optname.c_str(), timeout.ToString().c_str()),
612
0
        Errno(errno));
613
0
  }
614
7.49M
  return Status::OK();
615
7.49M
}
616
617
3.74M
Result<int32_t> Socket::GetReceiveBufferSize() {
618
3.74M
  int32_t val = 0;
619
3.74M
  socklen_t val_len = sizeof(val);
620
3.74M
  DCHECK_GE(fd_, 0);
621
3.74M
  if (getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &val, &val_len)) {
622
0
    return STATUS(NetworkError, "Failed to get socket receive buffer", Errno(errno));
623
0
  }
624
3.74M
  return val;
625
3.74M
}
626
627
0
Status Socket::SetReceiveBufferSize(int32_t size) {
628
0
  int32_t val = size / 2; // Kernel will double this value
629
0
  DCHECK_GE(fd_, 0);
630
0
  if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val))) {
631
0
    return STATUS(
632
0
        NetworkError, "Failed to set socket receive buffer", Errno(errno));
633
0
  }
634
0
  return Status::OK();
635
0
}
636
637
} // namespace yb