YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/tcp_stream.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/rpc/tcp_stream.h"
15
16
#include "yb/rpc/outbound_data.h"
17
#include "yb/rpc/rpc_introspection.pb.h"
18
#include "yb/rpc/rpc_util.h"
19
20
#include "yb/util/errno.h"
21
#include "yb/util/flag_tags.h"
22
#include "yb/util/logging.h"
23
#include "yb/util/memory/memory_usage.h"
24
#include "yb/util/metrics.h"
25
#include "yb/util/result.h"
26
#include "yb/util/status_log.h"
27
#include "yb/util/string_util.h"
28
29
using namespace std::literals;
30
31
DECLARE_uint64(rpc_connection_timeout_ms);
32
DEFINE_test_flag(int32, delay_connect_ms, 0,
33
                 "Delay connect in tests for specified amount of milliseconds.");
34
35
METRIC_DEFINE_simple_counter(
36
  server, tcp_bytes_sent, "Bytes sent over TCP connections", yb::MetricUnit::kBytes);
37
38
METRIC_DEFINE_simple_counter(
39
  server, tcp_bytes_received, "Bytes received via TCP connections", yb::MetricUnit::kBytes);
40
41
namespace yb {
42
namespace rpc {
43
44
namespace {
45
46
const size_t kMaxIov = 16;
47
48
}
49
50
TcpStream::TcpStream(const StreamCreateData& data)
51
    : socket_(std::move(*data.socket)),
52
3.74M
      remote_(data.remote) {
53
3.74M
  if (data.mem_tracker) {
54
3.73M
    mem_tracker_ = MemTracker::FindOrCreateTracker("Sending", data.mem_tracker);
55
3.73M
  }
56
3.74M
  if (data.metric_entity) {
57
3.70M
    bytes_received_counter_ = METRIC_tcp_bytes_received.Instantiate(data.metric_entity);
58
3.70M
    bytes_sent_counter_ = METRIC_tcp_bytes_sent.Instantiate(data.metric_entity);
59
3.70M
  }
60
3.74M
}
61
62
3.25M
TcpStream::~TcpStream() {
63
  // Must clear the sending_ list before deleting.
64
3.25M
  CHECK
(sending_.empty()) << ToString()1.22k
;
65
66
  // It's crucial that the stream is Shutdown first -- otherwise
67
  // our destructor will end up calling io_.stop()
68
  // from a possibly non-reactor thread context. This can then make all
69
  // hell break loose with libev.
70
3.25M
  CHECK
(!is_epoll_registered_) << ToString()2.62k
;
71
3.25M
}
72
73
3.75M
Status TcpStream::Start(bool connect, ev::loop_ref* loop, StreamContext* context) {
74
3.75M
  context_ = context;
75
3.75M
  connected_ = !connect;
76
77
3.75M
  RETURN_NOT_OK(socket_.SetNoDelay(true));
78
  // These timeouts don't affect non-blocking sockets:
79
3.75M
  RETURN_NOT_OK(socket_.SetSendTimeout(FLAGS_rpc_connection_timeout_ms * 1ms));
80
3.75M
  RETURN_NOT_OK(socket_.SetRecvTimeout(FLAGS_rpc_connection_timeout_ms * 1ms));
81
82
3.75M
  if (connect && 
FLAGS_TEST_delay_connect_ms3.13M
) {
83
10
    connect_delayer_.set(*loop);
84
10
    connect_delayer_.set<TcpStream, &TcpStream::DelayConnectHandler>(this);
85
10
    connect_delayer_.start(
86
10
        static_cast<double>(FLAGS_TEST_delay_connect_ms) / MonoTime::kMillisecondsPerSecond, 0);
87
10
    return Status::OK();
88
10
  }
89
90
3.75M
  return DoStart(loop, connect);
91
3.75M
}
92
93
3.75M
Status TcpStream::DoStart(ev::loop_ref* loop, bool connect) {
94
3.75M
  if (connect) {
95
3.13M
    auto status = socket_.Connect(remote_);
96
3.13M
    if (!status.ok() && 
!status.IsTryAgain()3.12M
) {
97
236
      LOG_WITH_PREFIX(WARNING) << "Connect failed: " << status;
98
236
      return status;
99
236
    }
100
3.13M
  }
101
102
3.75M
  RETURN_NOT_OK(socket_.GetSocketAddress(&local_));
103
3.75M
  log_prefix_.clear();
104
105
3.75M
  io_.set(*loop);
106
3.75M
  io_.set<TcpStream, &TcpStream::Handler>(this);
107
3.75M
  int events = ev::READ | (!connected_ ? 
ev::WRITE3.09M
:
0665k
);
108
3.75M
  io_.start(socket_.GetFd(), events);
109
110
3.75M
  
DVLOG_WITH_PREFIX23.9k
(3) << "Starting, listen events: " << events << ", fd: " << socket_.GetFd()23.9k
;
111
112
3.75M
  is_epoll_registered_ = true;
113
114
3.75M
  if (connected_) {
115
622k
    context_->Connected();
116
622k
  }
117
118
3.75M
  return Status::OK();
119
3.75M
}
120
121
10
void TcpStream::DelayConnectHandler(ev::timer& watcher, int revents) { // NOLINT
122
10
  if (EV_ERROR & revents) {
123
0
    LOG_WITH_PREFIX(WARNING) << "Got an error in handle delay connect";
124
0
    return;
125
0
  }
126
127
10
  auto status = DoStart(&watcher.loop, true /* connect */);
128
10
  if (!status.ok()) {
129
0
    Shutdown(status);
130
0
  }
131
10
}
132
133
7.50k
void TcpStream::Close() {
134
7.50k
  if (socket_.GetFd() >= 0) {
135
7.41k
    auto status = socket_.Shutdown(true, true);
136
7.41k
    LOG_IF
(INFO, !status.ok()) << "Failed to shutdown socket: " << status5.22k
;
137
7.41k
  }
138
7.50k
}
139
140
6.33M
void TcpStream::Shutdown(const Status& status) {
141
6.33M
  ClearSending(status);
142
143
6.33M
  if (!ReadBuffer().Empty()) {
144
4.76k
    LOG_WITH_PREFIX(WARNING) << "Shutting down with pending inbound data ("
145
4.76k
                             << ReadBuffer().ToString() << ", status = " << status << ")";
146
4.76k
  }
147
148
6.33M
  io_.stop();
149
6.33M
  is_epoll_registered_ = false;
150
151
6.33M
  ReadBuffer().Reset();
152
153
6.33M
  WARN_NOT_OK(socket_.Close(), "Error closing socket");
154
6.33M
}
155
156
165M
Status TcpStream::TryWrite() {
157
165M
  auto result = DoWrite();
158
165M
  if (
result.ok()165M
) {
159
165M
    UpdateEvents();
160
165M
  }
161
165M
  return result;
162
165M
}
163
164
159M
TcpStream::FillIovResult TcpStream::FillIov(iovec* out) {
165
159M
  int index = 0;
166
159M
  size_t offset = send_position_;
167
159M
  bool only_heartbeats = true;
168
162M
  for (auto& data : sending_) {
169
162M
    const auto wrapped_data = data.data;
170
162M
    if (wrapped_data && 
!wrapped_data->IsHeartbeat()162M
) {
171
153M
      only_heartbeats = false;
172
153M
    }
173
162M
    if (data.skipped || 
(162M
offset == 0162M
&&
wrapped_data162M
&&
wrapped_data->IsFinished()162M
)) {
174
3.77k
      queued_bytes_to_send_ -= data.bytes_size();
175
3.77k
      data.ClearBytes();
176
3.77k
      data.skipped = true;
177
3.77k
      continue;
178
3.77k
    }
179
166M
    
for (const auto& bytes : data.bytes)162M
{
180
166M
      if (offset >= bytes.size()) {
181
55.9k
        offset -= bytes.size();
182
55.9k
        continue;
183
55.9k
      }
184
185
166M
      out[index].iov_base = bytes.data() + offset;
186
166M
      out[index].iov_len = bytes.size() - offset;
187
166M
      offset = 0;
188
166M
      if (++index == kMaxIov) {
189
5.77k
        return FillIovResult{index, only_heartbeats};
190
5.77k
      }
191
166M
    }
192
162M
  }
193
194
159M
  return FillIovResult{index, only_heartbeats};
195
159M
}
196
197
166M
Status TcpStream::DoWrite() {
198
166M
  
DVLOG_WITH_PREFIX255k
(5) << "sending_.size(): " << sending_.size()255k
;
199
166M
  if (!connected_ || 
waiting_write_ready_159M
||
!is_epoll_registered_159M
) {
200
6.32M
    
DVLOG_WITH_PREFIX9.04k
(5)
201
9.04k
        << "connected_: " << connected_
202
9.04k
        << " waiting_write_ready_: " << waiting_write_ready_
203
9.04k
        << " is_epoll_registered_: " << is_epoll_registered_;
204
6.32M
    return Status::OK();
205
6.32M
  }
206
207
  // If we are not waiting for the socket to become writable, try to write data to it now.
208
319M
  
while (159M
!sending_.empty()) {
209
159M
    iovec iov[kMaxIov];
210
159M
    auto fill_result = FillIov(iov);
211
212
159M
    if (!fill_result.only_heartbeats) {
213
151M
      context_->UpdateLastActivity();
214
151M
    }
215
216
159M
    auto result = fill_result.len != 0
217
159M
        ? socket_.Writev(iov, fill_result.len)
218
18.4E
        : 0;
219
18.4E
    DVLOG_WITH_PREFIX(4) << "Queued writes " << queued_bytes_to_send_ << " bytes. Result "
220
18.4E
                         << result << ", sending_.size(): " << sending_.size();
221
222
159M
    if (PREDICT_FALSE(!result.ok())) {
223
32.2k
      if (!result.status().IsTryAgain()) {
224
1.49k
        
YB_LOG_WITH_PREFIX_EVERY_N229
(WARNING, 50) << "Send failed: " << result.status()229
;
225
1.49k
        return result.status();
226
30.7k
      } else {
227
30.7k
        
VLOG_WITH_PREFIX0
(3) << "Send temporary failed: " << result.status()0
;
228
30.7k
        return Status::OK();
229
30.7k
      }
230
32.2k
    }
231
232
159M
    context_->UpdateLastWrite();
233
234
159M
    IncrementCounterBy(bytes_sent_counter_, *result);
235
236
159M
    send_position_ += *result;
237
321M
    while (!sending_.empty()) {
238
162M
      auto& front = sending_.front();
239
162M
      size_t full_size = front.bytes_size();
240
162M
      if (front.skipped) {
241
3.77k
        PopSending();
242
3.77k
        continue;
243
3.77k
      }
244
162M
      if (send_position_ < full_size) {
245
39.6k
        break;
246
39.6k
      }
247
162M
      auto data = front.data;
248
162M
      send_position_ -= full_size;
249
162M
      PopSending();
250
162M
      if (
data162M
) {
251
162M
        context_->Transferred(data, Status::OK());
252
162M
      }
253
162M
    }
254
159M
  }
255
256
159M
  return Status::OK();
257
159M
}
258
259
162M
void TcpStream::PopSending() {
260
162M
  queued_bytes_to_send_ -= sending_.front().bytes_size();
261
162M
  sending_.pop_front();
262
162M
  ++data_blocks_sent_;
263
162M
}
264
265
162M
void TcpStream::Handler(ev::io& watcher, int revents) {  // NOLINT
266
162M
  
DVLOG_WITH_PREFIX20.2k
(4) << "Handler(revents=" << revents << ")"20.2k
;
267
162M
  Status status = Status::OK();
268
162M
  if (revents & ev::ERROR) {
269
0
    status = STATUS(NetworkError, ToString() + ": Handler encountered an error");
270
0
    VLOG_WITH_PREFIX(3) << status;
271
0
  }
272
273
162M
  if (
status.ok()162M
&& (revents & ev::READ)) {
274
162M
    status = ReadHandler();
275
162M
    if (!status.ok()) {
276
3.04M
      
VLOG_WITH_PREFIX139
(3) << "ReadHandler() returned error: " << status139
;
277
3.04M
    }
278
162M
  }
279
280
162M
  if (status.ok() && 
(revents & ev::WRITE)159M
) {
281
645k
    bool just_connected = !connected_;
282
645k
    if (just_connected) {
283
606k
      connected_ = true;
284
606k
      context_->Connected();
285
606k
    }
286
645k
    status = WriteHandler(just_connected);
287
645k
    if (!status.ok()) {
288
15
      
VLOG_WITH_PREFIX0
(3) << "WriteHandler() returned error: " << status0
;
289
15
    }
290
645k
  }
291
292
162M
  if (status.ok()) {
293
159M
    UpdateEvents();
294
159M
  } else {
295
3.12M
    context_->Destroy(status);
296
3.12M
  }
297
162M
}
298
299
325M
void TcpStream::UpdateEvents() {
300
325M
  int events = 0;
301
325M
  if (!read_buffer_full_) {
302
325M
    events |= ev::READ;
303
325M
  }
304
325M
  waiting_write_ready_ = !sending_.empty() || 
!connected_324M
;
305
325M
  if (waiting_write_ready_) {
306
6.34M
    events |= ev::WRITE;
307
6.34M
  }
308
325M
  if (events) {
309
325M
    io_.set(events);
310
325M
  }
311
325M
}
312
313
162M
Status TcpStream::ReadHandler() {
314
162M
  context_->UpdateLastRead();
315
316
321M
  for (;;) {
317
321M
    auto received = Receive();
318
321M
    if (PREDICT_FALSE(!received.ok())) {
319
3.03M
      if (Errno(received.status()) == ESHUTDOWN) {
320
18.4E
        VLOG_WITH_PREFIX(1) << "Shut down by remote end.";
321
2.54M
      } else {
322
2.54M
        
YB_LOG_WITH_PREFIX_EVERY_N55.1k
(INFO, 50) << " Recv failed: " << received55.1k
;
323
2.54M
      }
324
3.03M
      return received.status();
325
3.03M
    }
326
    // Exit the loop if we did not receive anything.
327
318M
    if (!received.get()) {
328
158M
      return Status::OK();
329
158M
    }
330
    // If we were not able to process the next call, exit the loop.
331
    // If status is ok, it means that we just do not have enough data to process yet.
332
159M
    auto continue_receiving = TryProcessReceived();
333
159M
    if (!continue_receiving.ok()) {
334
2.89k
      return continue_receiving.status();
335
2.89k
    }
336
159M
    if (!continue_receiving.get()) {
337
210k
      return Status::OK();
338
210k
    }
339
159M
  }
340
162M
}
341
342
321M
Result<bool> TcpStream::Receive() {
343
321M
  auto iov = ReadBuffer().PrepareAppend();
344
321M
  if (!iov.ok()) {
345
0
    VLOG_WITH_PREFIX(3) << "ReadBuffer().PrepareAppend() error: " << iov.status();
346
0
    if (iov.status().IsBusy()) {
347
0
      read_buffer_full_ = true;
348
0
      return false;
349
0
    }
350
0
    return iov.status();
351
0
  }
352
321M
  read_buffer_full_ = false;
353
354
321M
  if (inbound_bytes_to_skip_ > 0) {
355
2.18k
    auto global_skip_buffer = GetGlobalSkipBuffer();
356
9.35k
    do {
357
9.35k
      
VLOG_WITH_PREFIX0
(3) << "inbound_bytes_to_skip_: " << inbound_bytes_to_skip_0
;
358
9.35k
      auto nread = socket_.Recv(
359
9.35k
          global_skip_buffer.mutable_data(),
360
9.35k
          std::min(global_skip_buffer.size(), inbound_bytes_to_skip_));
361
9.35k
      if (!nread.ok()) {
362
2.10k
        
VLOG_WITH_PREFIX0
(3) << "socket_.Recv() error: " << nread.status()0
;
363
2.10k
        if (nread.status().IsTryAgain()) {
364
2.10k
          return false;
365
2.10k
        }
366
0
        return nread.status();
367
2.10k
      }
368
7.24k
      IncrementCounterBy(bytes_received_counter_, *nread);
369
7.24k
      inbound_bytes_to_skip_ -= *nread;
370
7.24k
    } while (inbound_bytes_to_skip_ > 0);
371
2.18k
  }
372
373
321M
  auto nread = socket_.Recvv(iov.get_ptr());
374
321M
  if (!nread.ok()) {
375
161M
    
DVLOG_WITH_PREFIX3.25k
(3) << "socket_.Recvv() error: " << nread.status()3.25k
;
376
161M
    if (nread.status().IsTryAgain()) {
377
158M
      return false;
378
158M
    }
379
3.31M
    return nread.status();
380
161M
  }
381
18.4E
  DVLOG_WITH_PREFIX(4) << "socket_.Recvv() bytes: " << *nread;
382
383
159M
  IncrementCounterBy(bytes_received_counter_, *nread);
384
159M
  ReadBuffer().DataAppended(*nread);
385
159M
  return *nread != 0;
386
321M
}
387
388
0
void TcpStream::ParseReceived() {
389
0
  auto result = TryProcessReceived();
390
0
  if (!result.ok()) {
391
0
    context_->Destroy(result.status());
392
0
    return;
393
0
  }
394
0
  if (read_buffer_full_) {
395
0
    read_buffer_full_ = false;
396
0
    UpdateEvents();
397
0
  }
398
0
}
399
400
159M
Result<bool> TcpStream::TryProcessReceived() {
401
159M
  auto& read_buffer = ReadBuffer();
402
159M
  if (!read_buffer.ReadyToRead()) {
403
210k
    return false;
404
210k
  }
405
406
158M
  auto result = 
VERIFY_RESULT158M
(context_->ProcessReceived(ReadBufferFull(read_buffer.Full())));158M
407
18.4E
  DVLOG_WITH_PREFIX(5) << "context_->ProcessReceived result: " << AsString(result);
408
409
158M
  LOG_IF(DFATAL, inbound_bytes_to_skip_ != 0)
410
4.82k
      << "Expected inbound_bytes_to_skip_ to be 0 instead of " << inbound_bytes_to_skip_;
411
158M
  inbound_bytes_to_skip_ = result;
412
158M
  return true;
413
158M
}
414
415
644k
Status TcpStream::WriteHandler(bool just_connected) {
416
644k
  waiting_write_ready_ = false;
417
644k
  if (sending_.empty()) {
418
0
    LOG_IF_WITH_PREFIX(WARNING, !just_connected) <<
419
0
        "Got a ready-to-write callback, but there is nothing to write.";
420
0
    return Status::OK();
421
0
  }
422
423
644k
  return DoWrite();
424
644k
}
425
426
914M
bool TcpStream::Idle(std::string* reason_not_idle) {
427
914M
  bool result = true;
428
  // Check if we're in the middle of receiving something.
429
914M
  if (!ReadBuffer().Empty()) {
430
1.68k
    if (reason_not_idle) {
431
0
      AppendWithSeparator("read buffer not empty", reason_not_idle);
432
0
    }
433
1.68k
    result = false;
434
1.68k
  }
435
436
  // Check if we still need to send something.
437
914M
  if (!sending_.empty()) {
438
334
    if (reason_not_idle) {
439
0
      AppendWithSeparator("still sending", reason_not_idle);
440
0
    }
441
334
    result = false;
442
334
  }
443
444
914M
  return result;
445
914M
}
446
447
6.33M
void TcpStream::ClearSending(const Status& status) {
448
  // Clear any outbound transfers.
449
6.33M
  for (auto& data : sending_) {
450
83.9k
    if (data.data) {
451
83.9k
      context_->Transferred(data.data, status);
452
83.9k
    }
453
83.9k
  }
454
6.33M
  sending_.clear();
455
6.33M
  queued_bytes_to_send_ = 0;
456
6.33M
}
457
458
162M
Result<size_t> TcpStream::Send(OutboundDataPtr data) {
459
  // For TcpStream, the handle is the absolute index of the data block since the stream started.
460
  // So it could be calculated as index in sending_ plus number of data blocks that were already
461
  // transferred.
462
162M
  size_t result = data_blocks_sent_ + sending_.size();
463
464
162M
  
DVLOG_WITH_PREFIX269k
(6) << "TcpStream::Send queueing: " << AsString(*data)269k
;
465
  // Serialize the actual bytes to be put on the wire.
466
162M
  sending_.emplace_back(std::move(data), mem_tracker_);
467
162M
  queued_bytes_to_send_ += sending_.back().bytes_size();
468
162M
  
DVLOG_WITH_PREFIX659k
(4) << "Queued data, sending_.size(): " << sending_.size()
469
659k
                       << ", queued_bytes_to_send_: " << queued_bytes_to_send_;
470
471
162M
  return result;
472
162M
}
473
474
29.5k
void TcpStream::Cancelled(size_t handle) {
475
29.5k
  if (handle < data_blocks_sent_) {
476
25.7k
    return;
477
25.7k
  }
478
3.78k
  handle -= data_blocks_sent_;
479
3.78k
  
LOG_IF_WITH_PREFIX3
(DFATAL, !sending_[handle].data->IsFinished())
480
3
      << "Cancelling not finished data: " << sending_[handle].data->ToString();
481
3.78k
  auto& entry = sending_[handle];
482
3.78k
  if (handle == 0 && 
send_position_ > 011
) {
483
    // Transfer already started, cannot drop it.
484
11
    return;
485
11
  }
486
487
3.77k
  queued_bytes_to_send_ -= entry.bytes_size();
488
3.77k
  entry.ClearBytes();
489
3.77k
}
490
491
10
void TcpStream::DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) {
492
10
  auto call_in_flight = resp->add_calls_in_flight();
493
10
  uint64_t sending_bytes = 0;
494
10
  for (auto& entry : sending_) {
495
0
    auto entry_bytes_size = entry.bytes_size();;
496
0
    sending_bytes += entry_bytes_size;
497
0
    if (!entry.data) {
498
0
      continue;
499
0
    }
500
0
    if (entry.data->DumpPB(req, call_in_flight)) {
501
0
      call_in_flight->set_sending_bytes(entry_bytes_size);
502
0
      call_in_flight = resp->add_calls_in_flight();
503
0
    }
504
0
  }
505
10
  resp->set_sending_bytes(sending_bytes);
506
10
  resp->mutable_calls_in_flight()->DeleteSubrange(resp->calls_in_flight_size() - 1, 1);
507
10
}
508
509
411k
const Protocol* TcpStream::StaticProtocol() {
510
411k
  static Protocol result("tcp");
511
411k
  return &result;
512
411k
}
513
514
58.8k
StreamFactoryPtr TcpStream::Factory() {
515
58.8k
  class TcpStreamFactory : public StreamFactory {
516
58.8k
   private:
517
3.74M
    std::unique_ptr<Stream> Create(const StreamCreateData& data) override {
518
3.74M
      return std::make_unique<TcpStream>(data);
519
3.74M
    }
520
58.8k
  };
521
522
58.8k
  return std::make_shared<TcpStreamFactory>();
523
58.8k
}
524
525
TcpStreamSendingData::TcpStreamSendingData(OutboundDataPtr data_, const MemTrackerPtr& mem_tracker)
526
162M
    : data(std::move(data_)) {
527
162M
  data->Serialize(&bytes);
528
162M
  if (
mem_tracker162M
) {
529
162M
    size_t memory_used = sizeof(*this);
530
162M
    memory_used += DynamicMemoryUsageOf(data);
531
    // We don't need to account `bytes` dynamic memory usage, because it stores RefCntBuffer
532
    // instance in internal memory and RefCntBuffer instance is referring to the same dynamic memory
533
    // as `data`.
534
162M
    consumption = ScopedTrackedConsumption(mem_tracker, memory_used);
535
162M
  }
536
162M
}
537
538
} // namespace rpc
539
} // namespace yb