YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/tcp_stream.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/rpc/tcp_stream.h"
15
16
#include "yb/rpc/outbound_data.h"
17
#include "yb/rpc/rpc_introspection.pb.h"
18
#include "yb/rpc/rpc_util.h"
19
20
#include "yb/util/errno.h"
21
#include "yb/util/flag_tags.h"
22
#include "yb/util/logging.h"
23
#include "yb/util/memory/memory_usage.h"
24
#include "yb/util/metrics.h"
25
#include "yb/util/result.h"
26
#include "yb/util/status_log.h"
27
#include "yb/util/string_util.h"
28
29
using namespace std::literals;
30
31
DECLARE_uint64(rpc_connection_timeout_ms);
32
DEFINE_test_flag(int32, delay_connect_ms, 0,
33
                 "Delay connect in tests for specified amount of milliseconds.");
34
35
METRIC_DEFINE_simple_counter(
36
  server, tcp_bytes_sent, "Bytes sent over TCP connections", yb::MetricUnit::kBytes);
37
38
METRIC_DEFINE_simple_counter(
39
  server, tcp_bytes_received, "Bytes received via TCP connections", yb::MetricUnit::kBytes);
40
41
namespace yb {
42
namespace rpc {
43
44
namespace {
45
46
const size_t kMaxIov = 16;
47
48
}
49
50
// Takes over the socket supplied in `data` and remembers the remote endpoint.
//
// The "Sending" memory tracker is created only when a parent tracker is supplied, and the
// sent/received byte counters are instantiated only when a metric entity is supplied.
TcpStream::TcpStream(const StreamCreateData& data)
    : socket_(std::move(*data.socket)),
      remote_(data.remote) {
  const auto& parent_tracker = data.mem_tracker;
  if (parent_tracker) {
    mem_tracker_ = MemTracker::FindOrCreateTracker("Sending", parent_tracker);
  }

  const auto& entity = data.metric_entity;
  if (entity) {
    bytes_received_counter_ = METRIC_tcp_bytes_received.Instantiate(entity);
    bytes_sent_counter_ = METRIC_tcp_bytes_sent.Instantiate(entity);
  }
}
61
62
621k
// Verifies that the stream was fully torn down before it is destroyed.
TcpStream::~TcpStream() {
  // The queue of outbound data (sending_) must already be empty at this point.
  CHECK(sending_.empty()) << ToString();

  // The stream has to be Shutdown before destruction. Otherwise the destructor would end up
  // calling io_.stop() from a possibly non-reactor thread context, which libev does not
  // tolerate.
  CHECK(!is_epoll_registered_) << ToString();
}
72
73
944k
// Prepares the socket and starts the stream on the given event loop.
//
// connect - when true the stream has to establish the connection itself; when false the stream
//           is treated as already connected.
// loop    - event loop used for the watchers of this stream.
// context - stream context that is notified about stream events.
//
// When connecting and the TEST_delay_connect_ms flag is non-zero, the actual start is postponed
// via a timer and OK is returned immediately.
Status TcpStream::Start(bool connect, ev::loop_ref* loop, StreamContext* context) {
  context_ = context;
  connected_ = !connect;

  RETURN_NOT_OK(socket_.SetNoDelay(true));
  // These timeouts don't affect non-blocking sockets:
  const auto io_timeout = FLAGS_rpc_connection_timeout_ms * 1ms;
  RETURN_NOT_OK(socket_.SetSendTimeout(io_timeout));
  RETURN_NOT_OK(socket_.SetRecvTimeout(io_timeout));

  const bool delay_connect = connect && FLAGS_TEST_delay_connect_ms;
  if (!delay_connect) {
    return DoStart(loop, connect);
  }

  // Test-only path: DelayConnectHandler performs the real start once the timer fires.
  const double delay_seconds =
      static_cast<double>(FLAGS_TEST_delay_connect_ms) / MonoTime::kMillisecondsPerSecond;
  connect_delayer_.set(*loop);
  connect_delayer_.set<TcpStream, &TcpStream::DelayConnectHandler>(this);
  connect_delayer_.start(delay_seconds, 0);
  return Status::OK();
}
92
93
946k
// Optionally initiates the connection and registers the I/O watcher for the socket.
//
// A TryAgain status from Connect is not an error: the stream then waits for the socket to become
// writable. Any other connect failure is logged and returned.
Status TcpStream::DoStart(ev::loop_ref* loop, bool connect) {
  if (connect) {
    auto connect_status = socket_.Connect(remote_);
    const bool acceptable = connect_status.ok() || connect_status.IsTryAgain();
    if (!acceptable) {
      LOG_WITH_PREFIX(WARNING) << "Connect failed: " << connect_status;
      return connect_status;
    }
  }

  RETURN_NOT_OK(socket_.GetSocketAddress(&local_));
  // NOTE(review): presumably resets a cached log prefix so that it is rebuilt with the local
  // address obtained above - confirm against the LogPrefix implementation.
  log_prefix_.clear();

  io_.set(*loop);
  io_.set<TcpStream, &TcpStream::Handler>(this);

  // Always listen for reads; additionally listen for writes until the connection is established.
  int events = ev::READ;
  if (!connected_) {
    events |= ev::WRITE;
  }
  io_.start(socket_.GetFd(), events);

  DVLOG_WITH_PREFIX(3) << "Starting, listen events: " << events << ", fd: " << socket_.GetFd();

  is_epoll_registered_ = true;

  if (connected_) {
    context_->Connected();
  }

  return Status::OK();
}
120
121
10
// Timer callback for the delayed connect set up in Start: performs the postponed DoStart and
// shuts the stream down if it fails.
void TcpStream::DelayConnectHandler(ev::timer& watcher, int revents) { // NOLINT
  if ((revents & EV_ERROR) != 0) {
    LOG_WITH_PREFIX(WARNING) << "Got an error in handle delay connect";
    return;
  }

  const auto start_status = DoStart(&watcher.loop, true /* connect */);
  if (start_status.ok()) {
    return;
  }
  Shutdown(start_status);
}
132
133
10.0k
void TcpStream::Close() {
134
10.0k
  if (socket_.GetFd() >= 0) {
135
10.0k
    auto status = socket_.Shutdown(true, true);
136
8.90k
    LOG_IF(INFO, !status.ok()) << "Failed to shutdown socket: " << status;
137
10.0k
  }
138
10.0k
}
139
140
1.21M
void TcpStream::Shutdown(const Status& status) {
141
1.21M
  ClearSending(status);
142
143
1.21M
  if (!ReadBuffer().Empty()) {
144
124
    LOG_WITH_PREFIX(WARNING) << "Shutting down with pending inbound data ("
145
124
                             << ReadBuffer().ToString() << ", status = " << status << ")";
146
124
  }
147
148
1.21M
  io_.stop();
149
1.21M
  is_epoll_registered_ = false;
150
151
1.21M
  ReadBuffer().Reset();
152
153
1.21M
  WARN_NOT_OK(socket_.Close(), "Error closing socket");
154
1.21M
}
155
156
44.0M
// Attempts to write queued data right away. On success the set of watched events is refreshed;
// on failure the write status is returned untouched.
Status TcpStream::TryWrite() {
  auto write_status = DoWrite();
  if (!write_status.ok()) {
    return write_status;
  }
  UpdateEvents();
  return write_status;
}
163
164
43.0M
// Fills `out` with up to kMaxIov buffers describing the not yet sent part of the outbound queue.
//
// out - array with room for at least kMaxIov entries.
//
// Returns the number of filled entries together with a flag telling whether every visited queue
// entry was a heartbeat (entries without data count as heartbeats for this purpose).
//
// Side effects: entries that are already marked as skipped, or whose data is finished before any
// of their bytes were sent, get their bytes released, are marked as skipped and are removed from
// queued_bytes_to_send_.
TcpStream::FillIovResult TcpStream::FillIov(iovec* out) {
  int index = 0;
  // Number of bytes at the beginning of the queue that were already written to the socket.
  size_t offset = send_position_;
  bool only_heartbeats = true;
  for (auto& data : sending_) {
    // Bind by reference: copying the owning pointer for every queued entry is needless work on
    // this hot path. The reference is not used after the entry is modified below.
    const auto& wrapped_data = data.data;
    if (wrapped_data && !wrapped_data->IsHeartbeat()) {
      only_heartbeats = false;
    }
    // An entry may be dropped only if none of its bytes were sent yet (offset == 0).
    if (data.skipped || (offset == 0 && wrapped_data && wrapped_data->IsFinished())) {
      queued_bytes_to_send_ -= data.bytes_size();
      data.ClearBytes();
      data.skipped = true;
      continue;
    }
    for (const auto& bytes : data.bytes) {
      // Skip buffers that were completely sent by previous writes.
      if (offset >= bytes.size()) {
        offset -= bytes.size();
        continue;
      }

      out[index].iov_base = bytes.data() + offset;
      out[index].iov_len = bytes.size() - offset;
      // Only the first unsent buffer can be partially sent.
      offset = 0;
      if (++index == static_cast<int>(kMaxIov)) {
        return FillIovResult{index, only_heartbeats};
      }
    }
  }

  return FillIovResult{index, only_heartbeats};
}
196
197
44.3M
// Writes queued outbound data to the socket until the queue is empty or the socket stops
// accepting data.
//
// Returns OK without writing anything while the stream is not connected, while a write-ready
// notification is pending, or while the I/O watcher is not registered. A TryAgain result from the
// socket is also reported as OK; any other send failure is returned.
Status TcpStream::DoWrite() {
  DVLOG_WITH_PREFIX(5) << "sending_.size(): " << sending_.size();
  if (!connected_ || waiting_write_ready_ || !is_epoll_registered_) {
    DVLOG_WITH_PREFIX(5)
        << "connected_: " << connected_
        << " waiting_write_ready_: " << waiting_write_ready_
        << " is_epoll_registered_: " << is_epoll_registered_;
    return Status::OK();
  }

  // If we weren't waiting write to be ready, we could try to write data to socket.
  while (!sending_.empty()) {
    iovec iov[kMaxIov];
    auto filled = FillIov(iov);

    if (!filled.only_heartbeats) {
      context_->UpdateLastActivity();
    }

    // When FillIov produced no buffers there is nothing to hand to the socket; this is handled
    // as a successful write of zero bytes.
    auto written = filled.len != 0
        ? socket_.Writev(iov, filled.len)
        : 0;
    DVLOG_WITH_PREFIX(4) << "Queued writes " << queued_bytes_to_send_ << " bytes. Result "
                         << written << ", sending_.size(): " << sending_.size();

    if (PREDICT_FALSE(!written.ok())) {
      const auto& write_status = written.status();
      if (write_status.IsTryAgain()) {
        VLOG_WITH_PREFIX(3) << "Send temporary failed: " << write_status;
        return Status::OK();
      }
      YB_LOG_WITH_PREFIX_EVERY_N(WARNING, 50) << "Send failed: " << write_status;
      return write_status;
    }

    context_->UpdateLastWrite();

    IncrementCounterBy(bytes_sent_counter_, *written);

    // Release every queue entry that is now completely sent (or was skipped).
    send_position_ += *written;
    while (!sending_.empty()) {
      auto& head = sending_.front();
      size_t head_size = head.bytes_size();
      if (head.skipped) {
        PopSending();
        continue;
      }
      if (send_position_ < head_size) {
        // The head entry is only partially sent.
        break;
      }
      // Keep the data alive past PopSending, which destroys the queue entry.
      auto sent_data = head.data;
      send_position_ -= head_size;
      PopSending();
      if (sent_data) {
        context_->Transferred(sent_data, Status::OK());
      }
    }
  }

  return Status::OK();
}
258
259
44.5M
void TcpStream::PopSending() {
260
44.5M
  queued_bytes_to_send_ -= sending_.front().bytes_size();
261
44.5M
  sending_.pop_front();
262
44.5M
  ++data_blocks_sent_;
263
44.5M
}
264
265
43.2M
// I/O watcher callback: dispatches read and write readiness to the corresponding handlers.
//
// Processing stops at the first failure. On success the watched events are refreshed, otherwise
// the context is asked to destroy the stream with the failure status.
void TcpStream::Handler(ev::io& watcher, int revents) {  // NOLINT
  DVLOG_WITH_PREFIX(4) << "Handler(revents=" << revents << ")";
  Status status = Status::OK();
  if (revents & ev::ERROR) {
    status = STATUS(NetworkError, ToString() + ": Handler encountered an error");
    VLOG_WITH_PREFIX(3) << status;
  }

  if (status.ok() && (revents & ev::READ)) {
    status = ReadHandler();
    if (!status.ok()) {
      VLOG_WITH_PREFIX(3) << "ReadHandler() returned error: " << status;
    }
  }

  if (status.ok() && (revents & ev::WRITE)) {
    // The first write readiness of a stream that is not connected yet means that the connection
    // has been established.
    const bool just_connected = !connected_;
    if (just_connected) {
      connected_ = true;
      context_->Connected();
    }
    status = WriteHandler(just_connected);
    if (!status.ok()) {
      VLOG_WITH_PREFIX(3) << "WriteHandler() returned error: " << status;
    }
  }

  if (!status.ok()) {
    context_->Destroy(status);
    return;
  }
  UpdateEvents();
}
298
299
86.6M
void TcpStream::UpdateEvents() {
300
86.6M
  int events = 0;
301
86.6M
  if (!read_buffer_full_) {
302
86.6M
    events |= ev::READ;
303
86.6M
  }
304
86.6M
  waiting_write_ready_ = !sending_.empty() || !connected_;
305
86.6M
  if (waiting_write_ready_) {
306
1.32M
    events |= ev::WRITE;
307
1.32M
  }
308
86.6M
  if (events) {
309
86.6M
    io_.set(events);
310
86.6M
  }
311
86.6M
}
312
313
42.9M
// Reads from the socket and processes received data until nothing more is available, the
// received data cannot be processed yet, or a failure occurs.
Status TcpStream::ReadHandler() {
  context_->UpdateLastRead();

  while (true) {
    auto received = Receive();
    if (PREDICT_FALSE(!received.ok())) {
      if (Errno(received.status()) == ESHUTDOWN) {
        VLOG_WITH_PREFIX(1) << "Shut down by remote end.";
      } else {
        YB_LOG_WITH_PREFIX_EVERY_N(INFO, 50) << " Recv failed: " << received;
      }
      return received.status();
    }
    // Stop once nothing was received.
    if (!*received) {
      return Status::OK();
    }
    // Stop when the next call could not be processed. An OK status with a false value means
    // that there is just not enough data to process yet.
    auto keep_receiving = TryProcessReceived();
    if (!keep_receiving.ok()) {
      return keep_receiving.status();
    }
    if (!*keep_receiving) {
      return Status::OK();
    }
  }
}
341
342
85.3M
// Receives data from the socket into the read buffer.
//
// Returns true when new bytes were appended to the read buffer. Returns false when nothing was
// appended: the read buffer reported Busy (it is then flagged as full) or the socket reported
// TryAgain. Any other failure is returned as an error.
//
// Bytes that were announced as "to skip" are drained from the socket into a scratch buffer
// before anything is appended to the read buffer.
Result<bool> TcpStream::Receive() {
  auto iov = ReadBuffer().PrepareAppend();
  if (!iov.ok()) {
    const auto& prepare_status = iov.status();
    VLOG_WITH_PREFIX(3) << "ReadBuffer().PrepareAppend() error: " << prepare_status;
    if (prepare_status.IsBusy()) {
      read_buffer_full_ = true;
      return false;
    }
    return prepare_status;
  }
  read_buffer_full_ = false;

  if (inbound_bytes_to_skip_ > 0) {
    auto skip_buffer = GetGlobalSkipBuffer();
    do {
      VLOG_WITH_PREFIX(3) << "inbound_bytes_to_skip_: " << inbound_bytes_to_skip_;
      auto skipped = socket_.Recv(
          skip_buffer.mutable_data(),
          std::min(skip_buffer.size(), inbound_bytes_to_skip_));
      if (!skipped.ok()) {
        const auto& recv_status = skipped.status();
        VLOG_WITH_PREFIX(3) << "socket_.Recv() error: " << recv_status;
        if (recv_status.IsTryAgain()) {
          return false;
        }
        return recv_status;
      }
      // Skipped bytes still count as received traffic.
      IncrementCounterBy(bytes_received_counter_, *skipped);
      inbound_bytes_to_skip_ -= *skipped;
    } while (inbound_bytes_to_skip_ > 0);
  }

  auto appended = socket_.Recvv(iov.get_ptr());
  if (!appended.ok()) {
    const auto& recv_status = appended.status();
    DVLOG_WITH_PREFIX(3) << "socket_.Recvv() error: " << recv_status;
    if (recv_status.IsTryAgain()) {
      return false;
    }
    return recv_status;
  }
  DVLOG_WITH_PREFIX(4) << "socket_.Recvv() bytes: " << *appended;

  IncrementCounterBy(bytes_received_counter_, *appended);
  ReadBuffer().DataAppended(*appended);
  return *appended != 0;
}
387
388
0
void TcpStream::ParseReceived() {
389
0
  auto result = TryProcessReceived();
390
0
  if (!result.ok()) {
391
0
    context_->Destroy(result.status());
392
0
    return;
393
0
  }
394
0
  if (read_buffer_full_) {
395
0
    read_buffer_full_ = false;
396
0
    UpdateEvents();
397
0
  }
398
0
}
399
400
42.5M
// Hands the content of the read buffer to the context for processing.
//
// Returns false when the read buffer is not ready to be read, true after the context processed
// the data, and an error when processing failed. The value produced by the context is stored as
// the number of inbound bytes to skip.
Result<bool> TcpStream::TryProcessReceived() {
  auto& buffer = ReadBuffer();
  if (!buffer.ReadyToRead()) {
    return false;
  }

  auto bytes_to_skip = VERIFY_RESULT(context_->ProcessReceived(ReadBufferFull(buffer.Full())));
  DVLOG_WITH_PREFIX(5) << "context_->ProcessReceived result: " << AsString(bytes_to_skip);

  // Any previously requested skip must be finished before a new one is recorded.
  LOG_IF(DFATAL, inbound_bytes_to_skip_ != 0)
      << "Expected inbound_bytes_to_skip_ to be 0 instead of " << inbound_bytes_to_skip_;
  inbound_bytes_to_skip_ = bytes_to_skip;
  return true;
}
414
415
316k
// Handles write readiness of the socket by writing queued data.
//
// just_connected - true when this readiness notification is the one that completed the
//                  connection; an empty queue is expected then and is not reported.
Status TcpStream::WriteHandler(bool just_connected) {
  waiting_write_ready_ = false;
  if (!sending_.empty()) {
    return DoWrite();
  }

  LOG_IF_WITH_PREFIX(WARNING, !just_connected) <<
      "Got a ready-to-write callback, but there is nothing to write.";
  return Status::OK();
}
425
426
59.0M
bool TcpStream::Idle(std::string* reason_not_idle) {
427
59.0M
  bool result = true;
428
  // Check if we're in the middle of receiving something.
429
59.0M
  if (!ReadBuffer().Empty()) {
430
1.68k
    if (reason_not_idle) {
431
0
      AppendWithSeparator("read buffer not empty", reason_not_idle);
432
0
    }
433
1.68k
    result = false;
434
1.68k
  }
435
436
  // Check if we still need to send something.
437
59.0M
  if (!sending_.empty()) {
438
92
    if (reason_not_idle) {
439
0
      AppendWithSeparator("still sending", reason_not_idle);
440
0
    }
441
92
    result = false;
442
92
  }
443
444
59.0M
  return result;
445
59.0M
}
446
447
1.21M
void TcpStream::ClearSending(const Status& status) {
448
  // Clear any outbound transfers.
449
94.2k
  for (auto& data : sending_) {
450
94.2k
    if (data.data) {
451
94.2k
      context_->Transferred(data.data, status);
452
94.2k
    }
453
94.2k
  }
454
1.21M
  sending_.clear();
455
1.21M
  queued_bytes_to_send_ = 0;
456
1.21M
}
457
458
44.7M
// Queues `data` for sending and returns its handle.
//
// For TcpStream the handle is the absolute index of the data block since the stream started. It
// is calculated as the index of the block in sending_ plus the number of data blocks that were
// already transferred.
Result<size_t> TcpStream::Send(OutboundDataPtr data) {
  const size_t handle = data_blocks_sent_ + sending_.size();

  DVLOG_WITH_PREFIX(6) << "TcpStream::Send queueing: " << AsString(*data);
  // Constructing the queue entry serializes the actual bytes to be put on the wire.
  sending_.emplace_back(std::move(data), mem_tracker_);
  const auto& queued = sending_.back();
  queued_bytes_to_send_ += queued.bytes_size();
  DVLOG_WITH_PREFIX(4) << "Queued data, sending_.size(): " << sending_.size()
                       << ", queued_bytes_to_send_: " << queued_bytes_to_send_;

  return handle;
}
473
474
14.1k
void TcpStream::Cancelled(size_t handle) {
475
14.1k
  if (handle < data_blocks_sent_) {
476
9.37k
    return;
477
9.37k
  }
478
4.74k
  handle -= data_blocks_sent_;
479
3
  LOG_IF_WITH_PREFIX(DFATAL, !sending_[handle].data->IsFinished())
480
3
      << "Cancelling not finished data: " << sending_[handle].data->ToString();
481
4.74k
  auto& entry = sending_[handle];
482
4.74k
  if (handle == 0 && send_position_ > 0) {
483
    // Transfer already started, cannot drop it.
484
43
    return;
485
43
  }
486
487
4.69k
  queued_bytes_to_send_ -= entry.bytes_size();
488
4.69k
  entry.ClearBytes();
489
4.69k
}
490
491
10
void TcpStream::DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) {
492
10
  auto call_in_flight = resp->add_calls_in_flight();
493
10
  uint64_t sending_bytes = 0;
494
1
  for (auto& entry : sending_) {
495
1
    auto entry_bytes_size = entry.bytes_size();;
496
1
    sending_bytes += entry_bytes_size;
497
1
    if (!entry.data) {
498
0
      continue;
499
0
    }
500
1
    if (entry.data->DumpPB(req, call_in_flight)) {
501
0
      call_in_flight->set_sending_bytes(entry_bytes_size);
502
0
      call_in_flight = resp->add_calls_in_flight();
503
0
    }
504
1
  }
505
10
  resp->set_sending_bytes(sending_bytes);
506
10
  resp->mutable_calls_in_flight()->DeleteSubrange(resp->calls_in_flight_size() - 1, 1);
507
10
}
508
509
408k
// Returns the process-wide Protocol instance named "tcp".
const Protocol* TcpStream::StaticProtocol() {
  static Protocol tcp_protocol("tcp");
  return &tcp_protocol;
}
513
514
37.5k
// Returns a new factory whose Create produces TcpStream instances.
StreamFactoryPtr TcpStream::Factory() {
  class TcpStreamFactory : public StreamFactory {
   private:
    std::unique_ptr<Stream> Create(const StreamCreateData& create_data) override {
      return std::make_unique<TcpStream>(create_data);
    }
  };

  auto factory = std::make_shared<TcpStreamFactory>();
  return factory;
}
524
525
// Takes ownership of the outbound data, serializes it into `bytes` and, when a memory tracker is
// given, registers the memory used by this entry with it.
TcpStreamSendingData::TcpStreamSendingData(OutboundDataPtr data_, const MemTrackerPtr& mem_tracker)
    : data(std::move(data_)) {
  data->Serialize(&bytes);
  if (!mem_tracker) {
    return;
  }

  // We don't need to account `bytes` dynamic memory usage, because it stores RefCntBuffer
  // instance in internal memory and RefCntBuffer instance is referring to the same dynamic memory
  // as `data`.
  const size_t memory_used = sizeof(*this) + DynamicMemoryUsageOf(data);
  consumption = ScopedTrackedConsumption(mem_tracker, memory_used);
}
537
538
} // namespace rpc
539
} // namespace yb