YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/net/socket.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/util/net/socket.h"
34
35
#include <netinet/in.h>
36
#include <sys/types.h>
37
38
#include <limits>
39
#include <string>
40
41
#include <glog/logging.h>
42
43
#include "yb/gutil/casts.h"
44
#include "yb/gutil/stringprintf.h"
45
46
#include "yb/util/debug/trace_event.h"
47
#include "yb/util/errno.h"
48
#include "yb/util/flag_tags.h"
49
#include "yb/util/monotime.h"
50
#include "yb/util/net/net_util.h"
51
#include "yb/util/net/sockaddr.h"
52
#include "yb/util/random.h"
53
#include "yb/util/random_util.h"
54
#include "yb/util/result.h"
55
#include "yb/util/status_format.h"
56
57
// Optional local address for outgoing connections. When non-empty,
// Socket::Connect() binds the socket to this IP (with port 0) before
// connecting; an unparsable value trips a CHECK in BindForOutgoingConnection().
DEFINE_string(local_ip_for_outbound_sockets, "",
58
              "IP to bind to when making outgoing socket connections. "
59
              "This must be an IP address of the form A.B.C.D, not a hostname. "
60
              "Advanced parameter, subject to change.");
61
TAG_FLAG(local_ip_for_outbound_sockets, experimental);
62
63
// Test hook read by Socket::Recv(): when set and more than one byte is
// requested, the requested amount is replaced by a random smaller-or-equal one.
DEFINE_bool(socket_inject_short_recvs, false,
64
            "Inject short recv() responses which return less data than "
65
            "requested");
66
TAG_FLAG(socket_inject_short_recvs, hidden);
67
TAG_FLAG(socket_inject_short_recvs, unsafe);
68
69
namespace yb {
70
71
37.0M
size_t IoVecsFullSize(const IoVecs& io_vecs) {
72
37.0M
  return std::accumulate(io_vecs.begin(), io_vecs.end(), 0ULL, [](size_t p, const iovec& v) {
73
37.0M
    return p + v.iov_len;
74
37.0M
  });
75
37.0M
}
76
77
38.8M
void IoVecsToBuffer(const IoVecs& io_vecs, size_t begin, size_t end, std::vector<char>* result) {
78
38.8M
  result->clear();
79
38.8M
  result->reserve(end - begin);
80
38.8M
  for (const auto& io_vec : io_vecs) {
81
38.8M
    if (begin == end) {
82
0
      break;
83
0
    }
84
38.9M
    if (io_vec.iov_len > begin) {
85
38.9M
      size_t clen = std::min(io_vec.iov_len, end) - begin;
86
38.9M
      auto start = IoVecBegin(io_vec) + begin;
87
38.9M
      result->insert(result->end(), start, start + clen);
88
38.9M
      begin += clen;
89
38.9M
    }
90
38.8M
    begin -= io_vec.iov_len;
91
38.8M
    end -= io_vec.iov_len;
92
38.8M
  }
93
38.8M
}
94
95
37.7M
void IoVecsToBuffer(const IoVecs& io_vecs, size_t begin, size_t end, char* result) {
96
37.7M
  for (const auto& io_vec : io_vecs) {
97
37.7M
    if (begin == end) {
98
0
      break;
99
0
    }
100
37.7M
    if (io_vec.iov_len > begin) {
101
37.7M
      size_t clen = std::min(io_vec.iov_len, end) - begin;
102
37.7M
      auto start = IoVecBegin(io_vec) + begin;
103
37.7M
      memcpy(result, start, clen);
104
37.7M
      result += clen;
105
37.7M
      begin += clen;
106
37.7M
    }
107
37.7M
    begin -= io_vec.iov_len;
108
37.7M
    end -= io_vec.iov_len;
109
37.7M
  }
110
37.7M
}
111
112
// Constructs a socket object that holds no descriptor (fd_ == -1).
Socket::Socket() : fd_(-1) {}
115
116
// Constructs a socket object around an existing descriptor. The destructor
// closes whatever descriptor is still held at that point.
Socket::Socket(int fd) : fd_(fd) {}
119
120
1.24M
void Socket::Reset(int fd) {
121
1.24M
  WARN_NOT_OK(Close(), "Close failed");
122
1.24M
  fd_ = fd;
123
1.24M
}
124
125
2.23M
int Socket::Release() {
126
2.23M
  int fd = fd_;
127
2.23M
  fd_ = -1;
128
2.23M
  return fd;
129
2.23M
}
130
131
3.14M
// Closes the held descriptor, if any. A close failure is only logged.
Socket::~Socket() {
  const Status close_status = Close();
  if (close_status.ok()) {
    return;
  }
  LOG(WARNING) << "Failed to close socket: " << close_status.ToString();
}
137
138
5.60M
// Closes the held descriptor. Returns OK when there is nothing to close or the
// close succeeds, and NetworkError (carrying errno) when ::close() fails.
// fd_ is set to -1 before ::close() is called, so the object never keeps a
// descriptor whose close was attempted.
Status Socket::Close() {
  if (fd_ >= 0) {
    const int closing_fd = fd_;
    fd_ = -1;
    if (::close(closing_fd) < 0) {
      return STATUS(NetworkError, "Close error", Errno(errno));
    }
  }
  return Status::OK();
}
148
149
10.0k
// Shuts down the read side, the write side, or both sides of the connection.
//
// shut_read / shut_write select the directions. When neither is requested the
// call is a no-op and returns OK: SHUT_RD, SHUT_WR and SHUT_RDWR are distinct
// enumerated values rather than bit flags, and passing a zero "how" to
// shutdown(2) would mean SHUT_RD on platforms where SHUT_RD is 0, shutting
// down reads that nobody asked to shut down.
//
// Returns NetworkError (carrying errno) if ::shutdown() fails.
Status Socket::Shutdown(bool shut_read, bool shut_write) {
  DCHECK_GE(fd_, 0);
  if (!shut_read && !shut_write) {
    return Status::OK();
  }
  const int how = shut_read ? (shut_write ? SHUT_RDWR : SHUT_RD) : SHUT_WR;
  if (::shutdown(fd_, how) < 0) {
    return STATUS(NetworkError, "Shutdown error", Errno(errno));
  }
  return Status::OK();
}
164
165
1.54M
int Socket::GetFd() const {
166
1.54M
  return fd_;
167
1.54M
}
168
169
43.5M
// Returns true when err is one of the errno values this file treats as
// "not ready yet, try again": EAGAIN, EWOULDBLOCK, EINTR or EINPROGRESS.
bool IsTemporarySocketError(int err) {
  if (err == EINTR || err == EINPROGRESS) {
    return true;
  }
  // EAGAIN and EWOULDBLOCK may or may not be the same value; test both.
  return err == EAGAIN || err == EWOULDBLOCK;
}
172
173
#if defined(__linux__)
174
175
// Linux variant: opens a new TCP stream socket and makes this object hold it,
// closing any descriptor held before.
//
// flags: FLAG_IPV6 selects AF_INET6 (otherwise AF_INET); FLAG_NONBLOCKING adds
// SOCK_NONBLOCK. SOCK_CLOEXEC is always requested.
//
// Returns NetworkError (carrying the errno from socket()) on failure.
Status Socket::Init(int flags) {
  const int family = (flags & FLAG_IPV6) ? AF_INET6 : AF_INET;
  int type = SOCK_STREAM | SOCK_CLOEXEC;
  if (flags & FLAG_NONBLOCKING) {
    type |= SOCK_NONBLOCK;
  }
  const int new_fd = ::socket(family, type, 0);
  // Capture errno right away: Reset() may close a previously held descriptor,
  // and that close() is allowed to overwrite errno.
  const int socket_errno = errno;
  Reset(new_fd);
  if (fd_ < 0) {
    return STATUS(NetworkError, "Error opening socket", Errno(socket_errno));
  }

  return Status::OK();
}
185
186
#else
187
188
664k
// Non-Linux variant: opens a new TCP stream socket and makes this object hold
// it, closing any descriptor held before. The blocking mode, close-on-exec and
// SO_NOSIGPIPE are then applied with separate calls.
//
// flags: FLAG_IPV6 selects AF_INET6 (otherwise AF_INET); FLAG_NONBLOCKING
// selects non-blocking mode.
//
// Returns NetworkError (carrying the relevant errno) if opening the socket or
// setting SO_NOSIGPIPE fails, or the error from SetNonBlocking() /
// SetCloseOnExec().
Status Socket::Init(int flags) {
  const int new_fd = ::socket(flags & FLAG_IPV6 ? AF_INET6 : AF_INET, SOCK_STREAM, 0);
  // Capture errno right away: Reset() may close a previously held descriptor,
  // and that close() is allowed to overwrite errno.
  const int socket_errno = errno;
  Reset(new_fd);
  if (fd_ < 0) {
    return STATUS(NetworkError, "Error opening socket", Errno(socket_errno));
  }
  RETURN_NOT_OK(SetNonBlocking(flags & FLAG_NONBLOCKING));
  RETURN_NOT_OK(SetCloseOnExec());

  // Disable SIGPIPE.
  int set = 1;
  if (setsockopt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof(set)) == -1) {
    return STATUS(NetworkError, "Failed to set SO_NOSIGPIPE", Errno(errno));
  }

  return Status::OK();
}
204
205
#endif // defined(__linux__)
206
207
1.88M
// Sets the TCP_NODELAY option to 1 (enabled) or 0 (disabled).
// Returns NetworkError (carrying errno) if setsockopt() fails.
Status Socket::SetNoDelay(bool enabled) {
  int option_value = 0;
  if (enabled) {
    option_value = 1;
  }
  const int rc = setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &option_value, sizeof(option_value));
  if (rc == -1) {
    return STATUS(NetworkError, "Failed to set TCP_NODELAY", Errno(errno));
  }
  return Status::OK();
}
214
215
993k
// Sets (enabled == true) or clears (enabled == false) O_NONBLOCK on the
// descriptor, leaving the other file status flags as they were.
// Returns NetworkError (carrying errno) if reading or writing the flags fails.
Status Socket::SetNonBlocking(bool enabled) {
  const int old_flags = ::fcntl(fd_, F_GETFL, 0);
  if (old_flags == -1) {
    return STATUS(
        NetworkError, StringPrintf("Failed to get file status flags on fd %d", fd_),
        Errno(errno));
  }

  int new_flags = old_flags;
  if (enabled) {
    new_flags |= O_NONBLOCK;
  } else {
    new_flags &= ~O_NONBLOCK;
  }

  if (::fcntl(fd_, F_SETFL, new_flags) != -1) {
    return Status::OK();
  }
  // The message names the operation that was attempted.
  if (enabled) {
    return STATUS(
        NetworkError, StringPrintf("Failed to set O_NONBLOCK on fd %d", fd_), Errno(errno));
  }
  return STATUS(
      NetworkError, StringPrintf("Failed to clear O_NONBLOCK on fd %d", fd_), Errno(errno));
}
234
235
0
// Stores in *is_nonblock whether O_NONBLOCK is currently set on the descriptor.
// Returns NetworkError (carrying errno) if the flags cannot be read; in that
// case *is_nonblock is left untouched.
Status Socket::IsNonBlocking(bool* is_nonblock) const {
  const int status_flags = ::fcntl(fd_, F_GETFL, 0);
  if (status_flags != -1) {
    *is_nonblock = ((status_flags & O_NONBLOCK) != 0);
    return Status::OK();
  }
  return STATUS(
      NetworkError, StringPrintf("Failed to get file status flags on fd %d", fd_), Errno(errno));
}
244
245
975k
// Adds FD_CLOEXEC to the descriptor flags.
//
// On failure the descriptor is closed (the object ends up holding none) and a
// NetworkError carrying the errno of the failed fcntl() call is returned.
Status Socket::SetCloseOnExec() {
  const int fd_flags = fcntl(fd_, F_GETFD, 0);
  if (fd_flags == -1) {
    // Save errno before Reset(): closing the descriptor may overwrite it.
    const int fcntl_errno = errno;
    Reset(-1);
    return STATUS(NetworkError, "fcntl(F_GETFD) error", Errno(fcntl_errno));
  }
  if (fcntl(fd_, F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
    // Save errno before Reset(): closing the descriptor may overwrite it.
    const int fcntl_errno = errno;
    Reset(-1);
    return STATUS(NetworkError, "fcntl(F_SETFD) error", Errno(fcntl_errno));
  }
  return Status::OK();
}
257
258
942k
// Sets the SO_SNDTIMEO socket option to timeout; see SetTimeout() for the
// validation and error reporting.
Status Socket::SetSendTimeout(const MonoDelta& timeout) {
  const int option = SO_SNDTIMEO;
  return SetTimeout(option, "SO_SNDTIMEO", timeout);
}
261
262
945k
// Sets the SO_RCVTIMEO socket option to timeout; see SetTimeout() for the
// validation and error reporting.
Status Socket::SetRecvTimeout(const MonoDelta& timeout) {
  const int option = SO_RCVTIMEO;
  return SetTimeout(option, "SO_RCVTIMEO", timeout);
}
265
266
128k
// Sets the SO_REUSEADDR option to 1 (flag == true) or 0 (flag == false).
// Returns NetworkError (carrying errno) if setsockopt() fails.
Status Socket::SetReuseAddr(bool flag) {
  int option_value = 0;
  if (flag) {
    option_value = 1;
  }
  const int rc = setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(option_value));
  if (rc == -1) {
    return STATUS(NetworkError, "Failed to set SO_REUSEADDR", Errno(errno));
  }
  return Status::OK();
}
273
274
// Convenience wrapper: enables SO_REUSEADDR, binds to sockaddr and starts
// listening with a backlog of listenQueueSize. Stops at, and returns, the
// first step that fails.
Status Socket::BindAndListen(const Endpoint& sockaddr, int listenQueueSize) {
  RETURN_NOT_OK(SetReuseAddr(true));
  RETURN_NOT_OK(Bind(sockaddr));
  return Listen(listenQueueSize);
}
281
282
17.2k
// Marks the socket as a listening socket with the given backlog size.
// Returns NetworkError (carrying errno) if listen() reports failure.
Status Socket::Listen(int listen_queue_size) {
  const int rc = listen(fd_, listen_queue_size);
  if (rc != 0) {
    return STATUS(NetworkError, "listen() error", Errno(errno));
  }
  return Status::OK();
}
288
289
namespace {
290
291
enum class EndpointType {
292
  REMOTE,
293
  LOCAL,
294
};
295
296
978k
Status GetEndpoint(EndpointType type, int fd, Endpoint* out) {
297
978k
  Endpoint temp;
298
978k
  DCHECK_GE(fd, 0);
299
978k
  socklen_t len = narrow_cast<socklen_t>(temp.capacity());
300
964k
  auto result = type == EndpointType::LOCAL ? getsockname(fd, temp.data(), &len)
301
13.5k
                                            : getpeername(fd, temp.data(), &len);
302
978k
  if (result == -1) {
303
0
    const std::string prefix = type == EndpointType::LOCAL ? "getsockname" : "getpeername";
304
0
    return STATUS(NetworkError, prefix + " error", Errno(errno));
305
0
  }
306
978k
  temp.resize(len);
307
978k
  *out = temp;
308
978k
  return Status::OK();
309
978k
}
310
311
} // namespace
312
313
978k
// Stores the local address of this socket in *out (via getsockname()).
Status Socket::GetSocketAddress(Endpoint* out) const {
  const EndpointType which = EndpointType::LOCAL;
  return GetEndpoint(which, fd_, out);
}
316
317
0
// Stores the address of the connected peer in *out (via getpeername()).
Status Socket::GetPeerAddress(Endpoint* out) const {
  const EndpointType which = EndpointType::REMOTE;
  return GetEndpoint(which, fd_, out);
}
320
321
610k
// Binds the socket to endpoint.
//
// On failure returns a NetworkError naming the endpoint and carrying errno.
// If the failure is EADDRINUSE, explain_addr_in_use is set and a specific
// (non-zero) port was requested, TryRunLsof() is additionally invoked for the
// endpoint before returning.
Status Socket::Bind(const Endpoint& endpoint, bool explain_addr_in_use) {
  DCHECK_GE(fd_, 0);
  const int rc = ::bind(fd_, endpoint.data(), narrow_cast<socklen_t>(endpoint.size()));
  if (PREDICT_TRUE(rc == 0)) {
    return Status::OK();
  }

  // Capture errno before anything else can disturb it.
  Errno bind_errno(errno);
  Status failure = STATUS(NetworkError, Format("Error binding socket to $0", endpoint), bind_errno);

  if (bind_errno == EADDRINUSE && explain_addr_in_use && endpoint.port() != 0) {
    TryRunLsof(endpoint);
  }
  return failure;
}
335
336
579k
// Translates the outcome of an accept call into a Status.
//
// new_conn holds the descriptor returned by accept; errno must still be the
// value set by that call. Returns OK for a valid descriptor, TryAgain when
// errno is a temporary error (see IsTemporarySocketError), and NetworkError
// carrying errno otherwise.
Status CheckAcceptError(Socket *new_conn) {
  if (new_conn->GetFd() >= 0) {
    return Status::OK();
  }

  if (!IsTemporarySocketError(errno)) {
    return STATUS(NetworkError, "Accept failed", Errno(errno));
  }

  // Built once and reused for every "not ready" result.
  static const Status try_accept_again = STATUS(TryAgain, "Accept not yet ready");
  return try_accept_again;
}
347
348
579k
// Accepts one pending connection on this listening socket.
//
// new_conn: receives the accepted descriptor; any descriptor it held before is
//           closed first.
// remote:   receives the peer address on success.
// flags:    FLAG_NONBLOCKING makes the accepted socket non-blocking.
//           Close-on-exec is always requested.
//
// Returns TryAgain when no connection is ready (temporary errno), NetworkError
// for other accept failures, or the error from configuring the new socket on
// platforms without accept4().
Status Socket::Accept(Socket *new_conn, Endpoint* remote, int flags) {
  TRACE_EVENT0("net", "Socket::Accept");
  Endpoint temp;
  socklen_t olen = narrow_cast<socklen_t>(temp.capacity());
  DCHECK_GE(fd_, 0);
#if defined(__linux__)
  int accept_flags = SOCK_CLOEXEC;
  if (flags & FLAG_NONBLOCKING) {
    accept_flags |= SOCK_NONBLOCK;
  }
  const int accepted_fd = ::accept4(fd_, temp.data(), &olen, accept_flags);
#else
  const int accepted_fd = ::accept(fd_, temp.data(), &olen);
#endif // defined(__linux__)
  // CheckAcceptError() inspects errno, but Reset() may close a descriptor that
  // new_conn held before and thereby overwrite it. Preserve the accept errno
  // across the Reset() call.
  const int accept_errno = errno;
  new_conn->Reset(accepted_fd);
  errno = accept_errno;
  RETURN_NOT_OK(CheckAcceptError(new_conn));
#if !defined(__linux__)
  // Without accept4() the socket options have to be applied after the fact.
  RETURN_NOT_OK(new_conn->SetNonBlocking(flags & FLAG_NONBLOCKING));
  RETURN_NOT_OK(new_conn->SetCloseOnExec());
#endif // !defined(__linux__)
  temp.resize(olen);

  *remote = temp;
  TRACE_EVENT_INSTANT1("net", "Accepted", TRACE_EVENT_SCOPE_THREAD,
                       "remote", ToString(*remote));
  return Status::OK();
}
373
374
467k
// Binds the socket to the address given by --local_ip_for_outbound_sockets,
// with port 0. A flag value that does not parse as an IP address fails a CHECK.
// Returns the result of Bind().
Status Socket::BindForOutgoingConnection() {
  boost::system::error_code parse_error;
  const auto local_address =
      IpAddress::from_string(FLAGS_local_ip_for_outbound_sockets, parse_error);
  CHECK(!parse_error)
    << "Invalid local IP set for 'local_ip_for_outbound_sockets': '"
    << FLAGS_local_ip_for_outbound_sockets << "': " << parse_error;

  return Bind(Endpoint(local_address, 0));
}
384
385
631k
// Starts connecting the socket to remote.
//
// If --local_ip_for_outbound_sockets is set, the socket is first bound to that
// address. Returns OK when connect() succeeds immediately, TryAgain when it
// reports a temporary errno (see IsTemporarySocketError; this includes
// EINPROGRESS), and NetworkError carrying errno for any other failure.
Status Socket::Connect(const Endpoint& remote) {
  TRACE_EVENT1("net", "Socket::Connect", "remote", ToString(remote));

  if (PREDICT_FALSE(!FLAGS_local_ip_for_outbound_sockets.empty())) {
    RETURN_NOT_OK(BindForOutgoingConnection());
  }

  DCHECK_GE(fd_, 0);
  const int rc = ::connect(fd_, remote.data(), narrow_cast<socklen_t>(remote.size()));
  if (rc >= 0) {
    return Status::OK();
  }

  if (!IsTemporarySocketError(errno)) {
    return STATUS(NetworkError, "connect(2) error", Errno(errno));
  }
  // Built once and reused for every "not ready" result.
  static const Status try_connect_again = STATUS(TryAgain, "Connect not yet ready");
  return try_connect_again;
}
402
403
0
// Reads the SO_ERROR option of the socket and converts it to a Status:
// OK when the option value is 0, otherwise a NetworkError carrying that value
// as its errno. A failure of getsockopt() itself is reported as a NetworkError
// carrying errno.
Status Socket::GetSockError() const {
  int pending_error = 0;
  socklen_t pending_error_len = sizeof(pending_error);
  DCHECK_GE(fd_, 0);
  if (::getsockopt(fd_, SOL_SOCKET, SO_ERROR, &pending_error, &pending_error_len)) {
    return STATUS(NetworkError, "getsockopt(SO_ERROR) failed", Errno(errno));
  }
  if (pending_error == 0) {
    return Status::OK();
  }
  return STATUS(NetworkError, Errno(pending_error));
}
416
417
0
// Sends up to amt bytes from buf with a single send() call (MSG_NOSIGNAL).
//
// Returns the number of bytes actually sent, which may be less than amt.
// A non-positive amt is rejected with NetworkError/EINVAL; any send() failure,
// including temporary ones, is returned as NetworkError carrying errno.
Result<size_t> Socket::Write(const uint8_t *buf, ssize_t amt) {
  if (amt <= 0) {
    return STATUS_EC_FORMAT(NetworkError, Errno(EINVAL), "Invalid send of $0 bytes", amt);
  }
  DCHECK_GE(fd_, 0);
  auto bytes_sent = ::send(fd_, buf, amt, MSG_NOSIGNAL);
  if (bytes_sent >= 0) {
    return bytes_sent;
  }
  return STATUS(NetworkError, "Write error", Errno(errno));
}
428
429
43.0M
// Sends the buffers described by the iov array (iov_len entries) with a single
// sendmsg() call (MSG_NOSIGNAL).
//
// Returns the number of bytes sent, which may be less than the total size.
// A non-positive iov_len is rejected with NetworkError/EINVAL. A temporary
// errno (see IsTemporarySocketError) yields TryAgain; other failures yield
// NetworkError carrying errno.
Result<size_t> Socket::Writev(const struct ::iovec *iov, int iov_len) {
  if (PREDICT_FALSE(iov_len <= 0)) {
    return STATUS(NetworkError,
                  StringPrintf("Writev: invalid io vector length of %d", iov_len),
                  Slice() /* msg2 */, Errno(EINVAL));
  }
  DCHECK_GE(fd_, 0);

  // Only the iovec fields are used; everything else stays zeroed.
  struct msghdr message;
  memset(&message, 0, sizeof(struct msghdr));
  message.msg_iov = const_cast<iovec *>(iov);
  message.msg_iovlen = iov_len;

  auto bytes_sent = ::sendmsg(fd_, &message, MSG_NOSIGNAL);
  if (PREDICT_TRUE(bytes_sent >= 0)) {
    return bytes_sent;
  }

  if (!IsTemporarySocketError(errno)) {
    return STATUS(NetworkError, "sendmsg error", Errno(errno));
  }
  // Built once and reused for every "not ready" result.
  static const Status try_write_again = STATUS(TryAgain, "Write not yet ready");
  return try_write_again;
}
452
453
// Mostly follows writen() from Stevens (2004) or Kerrisk (2010).
454
0
// Writes all buflen bytes of buf, looping over Write() until everything has
// been sent, an error occurs, or deadline passes.
//
// Before every Write() the send timeout of the socket is set to the time
// remaining until deadline. Returns:
//   - OK once all bytes were written;
//   - TimedOut if the deadline has passed or Write() failed with EAGAIN;
//   - IOError if Write() reports zero bytes written;
//   - the Write() error, prefixed with "BlockingWrite error", otherwise.
// EINTR is retried silently.
Status Socket::BlockingWrite(const uint8_t *buf, size_t buflen, const MonoTime& deadline) {
  DCHECK_LE(buflen, std::numeric_limits<int32_t>::max()) << "Writes > INT32_MAX not supported";

  const uint8_t* const data_end = buf + buflen;
  while (buf < data_end) {
    const auto bytes_left = data_end - buf;
    const MonoDelta time_left = deadline.GetDeltaSince(MonoTime::Now());
    if (PREDICT_FALSE(time_left.ToNanoseconds() <= 0)) {
      return STATUS(TimedOut, "BlockingWrite timed out");
    }
    RETURN_NOT_OK(SetSendTimeout(time_left));
    auto write_result = Write(buf, bytes_left);

    if (PREDICT_TRUE(write_result.ok())) {
      if (PREDICT_FALSE(*write_result == 0)) {
        // Shouldn't happen on Linux with a blocking socket. Maybe other Unices.
        return STATUS_FORMAT(
            IOError, "Wrote zero bytes on a BlockingWrite() call. Transferred $0 of $1 bytes.",
            buflen - bytes_left, buflen);
      }
      buf += *write_result;
      continue;
    }

    Errno write_errno(write_result.status());
    // An interrupted system call is simply retried.
    if (write_errno == EINTR) {
      continue;
    }
    // With a send timeout configured, EAGAIN is reported as a timeout.
    if (write_errno == EAGAIN) {
      return STATUS(TimedOut, "");
    }
    return write_result.status().CloneAndPrepend("BlockingWrite error");
  }

  return Status::OK();
}
489
490
7.48k
// Receives up to amt bytes into buf with a single recv() call.
//
// Returns the number of bytes received (always > 0). A non-positive amt is
// rejected with NetworkError/EINVAL. End of stream is reported as NetworkError
// carrying ESHUTDOWN, a temporary errno (see IsTemporarySocketError) as
// TryAgain, and any other failure as NetworkError carrying errno.
Result<size_t> Socket::Recv(uint8_t* buf, ssize_t amt) {
  if (amt <= 0) {
    return STATUS_EC_FORMAT(NetworkError, Errno(EINVAL), "Invalid recv of $0 bytes", amt);
  }

  // recv() may legitimately return fewer bytes than requested, but with the
  // small reads typical of unit tests that almost never happens. This hook
  // lets tests provoke short reads by shrinking the request at random.
  if (PREDICT_FALSE(FLAGS_socket_inject_short_recvs && amt > 1)) {
    amt = RandomUniformInt<ssize_t>(1, amt);
  }

  DCHECK_GE(fd_, 0);
  auto bytes_received = ::recv(fd_, buf, amt, 0);
  if (bytes_received > 0) {
    return bytes_received;
  }

  if (bytes_received == 0) {
    return STATUS(NetworkError, "Recv() got EOF from remote", Slice(), Errno(ESHUTDOWN));
  }
  if (!IsTemporarySocketError(errno)) {
    return STATUS(NetworkError, "Recv error", Errno(errno));
  }
  // Built once and reused for every "not ready" result.
  static const Status try_recv_again = STATUS(TryAgain, "Recv not yet ready");
  return try_recv_again;
}
518
519
74.1M
// Receives data into the buffers described by *vecs with a single recvmsg()
// call.
//
// Returns the number of bytes received (always > 0). An empty *vecs or a
// socket without a descriptor is rejected with NetworkError. End of stream is
// reported as NetworkError carrying ESHUTDOWN, a temporary errno (see
// IsTemporarySocketError) as TryAgain, and any other failure as NetworkError
// carrying errno.
Result<size_t> Socket::Recvv(IoVecs* vecs) {
  if (PREDICT_FALSE(vecs->empty())) {
    return STATUS(NetworkError, "Recvv: receive to empty vecs");
  }
  if (fd_ < 0) {
    return STATUS(NetworkError, "Recvv on closed socket");
  }

  // Only the iovec fields are used; everything else stays zeroed.
  struct msghdr message;
  memset(&message, 0, sizeof(struct msghdr));
  message.msg_iov = vecs->data();
  message.msg_iovlen = narrow_cast<int>(vecs->size());

  auto bytes_received = recvmsg(fd_, &message, MSG_NOSIGNAL);
  if (PREDICT_TRUE(bytes_received > 0)) {
    return bytes_received;
  }

  if (bytes_received == 0) {
    return STATUS(NetworkError, "recvmsg got EOF from remote", Slice(), Errno(ESHUTDOWN));
  }
  if (!IsTemporarySocketError(errno)) {
    return STATUS(NetworkError, "recvmsg error", Errno(errno));
  }
  // Built once and reused for every "not ready" result.
  static const Status try_recv_again = STATUS(TryAgain, "Recv not yet ready");
  return try_recv_again;
}
545
546
// Mostly follows readn() from Stevens (2004) or Kerrisk (2010).
547
// One place where we deviate: we consider EOF a failure if < amt bytes are read.
548
1
// Reads exactly amt bytes into buf, looping over Recv() until everything has
// arrived, an error occurs, or deadline passes.
//
// Before every Recv() the receive timeout of the socket is set to the time
// remaining until deadline. Returns:
//   - the number of bytes read (amt) on success;
//   - TimedOut once the deadline has passed;
//   - IOError if a Recv() reports zero bytes before amt bytes were read;
//   - the Recv() error, prefixed with "BlockingRecv error", otherwise.
// TryAgain results from Recv() are retried until the deadline check fires.
Result<size_t> Socket::BlockingRecv(uint8_t *buf, size_t amt, const MonoTime& deadline) {
  DCHECK_LE(amt, std::numeric_limits<int32_t>::max()) << "Reads > INT32_MAX not supported";
  size_t bytes_read = 0;

  // Remembers the timeout computed on the first loop iteration, i.e. the full
  // duration the caller allowed; only used for the verbose log message.
  MonoDelta initial_timeout;

  while (bytes_read < amt) {
    // Never ask for more than INT32_MAX bytes in one call.
    const auto chunk_size =
        std::min<size_t>(amt - bytes_read, std::numeric_limits<int32_t>::max());
    const MonoDelta time_left = deadline.GetDeltaSince(MonoTime::Now());
    if (!initial_timeout.Initialized()) {
      initial_timeout = time_left;
    }
    if (PREDICT_FALSE(time_left.ToNanoseconds() <= 0)) {
      VLOG(4) << __func__ << " timed out in " << initial_timeout.ToString();
      return STATUS(TimedOut, "");
    }
    RETURN_NOT_OK(SetRecvTimeout(time_left));

    auto recv_result = Recv(buf, chunk_size);
    if (PREDICT_FALSE(!recv_result.ok())) {
      // "Not ready" results (temporary errno values such as EINTR, EAGAIN or
      // EWOULDBLOCK) are retried rather than treated as a timeout; the
      // deadline check at the top of the loop bounds the retries.
      if (recv_result.status().IsTryAgain()) {
        continue;
      }
      return recv_result.status().CloneAndPrepend("BlockingRecv error");
    }

    const auto chunk_read = *recv_result;
    if (PREDICT_FALSE(chunk_read == 0)) {
      // EOF.
      break;
    }
    bytes_read += chunk_read;
    buf += chunk_read;
  }

  if (PREDICT_FALSE(bytes_read < amt)) {
    return STATUS(IOError, "Read zero bytes on a blocking Recv() call",
        StringPrintf("Transferred %zu of %zu bytes", bytes_read, amt));
  }

  return bytes_read;
}
599
600
1.88M
// Sets a timeval-valued SOL_SOCKET option (opt) to timeout.
//
// optname is the option's name, used only in the error message. A negative
// timeout is rejected with InvalidArgument; a setsockopt() failure is reported
// as NetworkError carrying errno.
Status Socket::SetTimeout(int opt, std::string optname, const MonoDelta& timeout) {
  if (PREDICT_FALSE(timeout.ToNanoseconds() < 0)) {
    return STATUS(InvalidArgument, "Timeout specified as negative to SetTimeout",
                                   timeout.ToString());
  }

  struct timeval timeout_value;
  timeout.ToTimeVal(&timeout_value);
  const socklen_t timeout_value_len = sizeof(timeout_value);
  if (::setsockopt(fd_, SOL_SOCKET, opt, &timeout_value, timeout_value_len) != -1) {
    return Status::OK();
  }
  return STATUS(
      NetworkError,
      StringPrintf("Failed to set %s to %s", optname.c_str(), timeout.ToString().c_str()),
      Errno(errno));
}
616
617
935k
// Returns the current value of the SO_RCVBUF option, or NetworkError
// (carrying errno) if getsockopt() fails.
Result<int32_t> Socket::GetReceiveBufferSize() {
  int32_t buffer_size = 0;
  socklen_t buffer_size_len = sizeof(buffer_size);
  DCHECK_GE(fd_, 0);
  const int rc = getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &buffer_size, &buffer_size_len);
  if (rc != 0) {
    return STATUS(NetworkError, "Failed to get socket receive buffer", Errno(errno));
  }
  return buffer_size;
}
626
627
0
// Requests a receive buffer of the given size by setting SO_RCVBUF to
// size / 2, on the expectation that the kernel doubles the value it is given.
// Returns NetworkError (carrying errno) if setsockopt() fails.
Status Socket::SetReceiveBufferSize(int32_t size) {
  // Kernel will double this value.
  int32_t requested_size = size / 2;
  DCHECK_GE(fd_, 0);
  const int rc = setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &requested_size, sizeof(requested_size));
  if (rc != 0) {
    return STATUS(
        NetworkError, "Failed to set socket receive buffer", Errno(errno));
  }
  return Status::OK();
}
636
637
} // namespace yb