YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/tcp_stream.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_RPC_TCP_STREAM_H
15
#define YB_RPC_TCP_STREAM_H
16
17
#include <ev++.h>
18
19
#include "yb/rpc/stream.h"
20
21
#include "yb/util/net/socket.h"
22
#include "yb/util/mem_tracker.h"
23
#include "yb/util/ref_cnt_buffer.h"
24
25
namespace yb {
26
27
class Counter;
28
29
namespace rpc {
30
31
// Element type of TcpStream::sending_: bundles an outbound data handle with the
// buffers to be sent for it and a ScopedTrackedConsumption member.
struct TcpStreamSendingData {
32
  // List of buffers for this item; small_vector with an inline capacity of 4 entries.
  typedef boost::container::small_vector<RefCntBuffer, 4> SendingBytes;
33
34
  // Defined out of line.
  // NOTE(review): presumably fills `bytes` from `data_` and charges `mem_tracker` via
  // `consumption` - confirm in the .cc file.
  TcpStreamSendingData(OutboundDataPtr data_, const MemTrackerPtr& mem_tracker);
35
36
133M
  // Returns the sum of size() over every entry currently held in `bytes`.
  size_t bytes_size() const {
37
133M
    size_t result = 0;
38
139M
    for (const auto& entry : bytes) {
39
139M
      result += entry.size();
40
139M
    }
41
133M
    return result;
42
133M
  }
43
44
9.35k
  // Empties `bytes` and replaces `consumption` with a default-constructed
  // ScopedTrackedConsumption. `data` and `skipped` are left untouched.
  // NOTE(review): presumably the reassignment releases the previously tracked memory -
  // confirm against ScopedTrackedConsumption.
  void ClearBytes() {
45
9.35k
    bytes.clear();
46
9.35k
    consumption = ScopedTrackedConsumption();
47
9.35k
  }
48
49
  OutboundDataPtr data;
50
  SendingBytes bytes;
51
  ScopedTrackedConsumption consumption;
52
  bool skipped = false;
53
};
54
55
// Stream subclass that owns a Socket together with an ev::io watcher and an ev::timer,
// and keeps a deque of TcpStreamSendingData items plus byte counters for outbound data.
class TcpStream : public Stream {
56
 public:
57
  explicit TcpStream(const StreamCreateData& data);
58
  ~TcpStream();
59
60
0
  // Returns a pointer to the socket_ member (never null; the socket is held by value).
  Socket* socket() { return &socket_; }
61
62
90.4M
  // Returns queued_bytes_to_send_ minus send_position_.
  // NOTE(review): both operands are size_t, so the result wraps around if send_position_
  // ever exceeds queued_bytes_to_send_ - presumably an invariant kept by the .cc file; confirm.
  size_t GetPendingWriteBytes() override {
63
90.4M
    return queued_bytes_to_send_ - send_position_;
64
90.4M
  }
65
66
  static const rpc::Protocol* StaticProtocol();
67
  static StreamFactoryPtr Factory();
68
69
 private:
70
  // Return type of FillIov().
  // NOTE(review): presumably `len` is the number of iovec entries filled and
  // `only_heartbeats` tells whether all of them are heartbeats - confirm in the .cc file.
  struct FillIovResult {
71
    int len;
72
    bool only_heartbeats;
73
  };
74
75
  CHECKED_STATUS Start(bool connect, ev::loop_ref* loop, StreamContext* context) override;
76
  void Close() override;
77
  void Shutdown(const Status& status) override;
78
  Result<size_t> Send(OutboundDataPtr data) override;
79
  CHECKED_STATUS TryWrite() override;
80
  void Cancelled(size_t handle) override;
81
82
  bool Idle(std::string* reason_not_idle) override;
83
1.21M
  // Returns the current value of the connected_ flag.
  bool IsConnected() override { return connected_; }
84
  void DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) override;
85
86
15.5M
  // Returns a reference to the remote_ endpoint member.
  const Endpoint& Remote() const override { return remote_; }
87
33.1k
  // Returns a reference to the local_ endpoint member.
  const Endpoint& Local() const override { return local_; }
88
89
366k
  // Returns the same protocol object as the static StaticProtocol().
  const Protocol* GetProtocol() override {
90
366k
    return StaticProtocol();
91
366k
  }
92
93
  void ParseReceived() override;
94
95
  CHECKED_STATUS DoWrite();
96
  void HandleOutcome(const Status& status, bool enqueue);
97
  void ClearSending(const Status& status);
98
99
  void Handler(ev::io& watcher, int revents); // NOLINT
100
  CHECKED_STATUS ReadHandler();
101
  CHECKED_STATUS WriteHandler(bool just_connected);
102
103
  Result<bool> Receive();
104
  // Try to parse received data and process it.
105
  Result<bool> TryProcessReceived();
106
107
  // Updates listening events.
108
  void UpdateEvents();
109
110
  FillIovResult FillIov(iovec* out);
111
112
  void DelayConnectHandler(ev::timer& watcher, int revents); // NOLINT
113
114
  CHECKED_STATUS DoStart(ev::loop_ref* loop, bool connect);
115
116
231M
  // Forwards to context_->ReadBuffer().
  // NOTE(review): context_ is initialized to nullptr and dereferenced here without a check;
  // presumably it is set before any read happens - confirm in the .cc file.
  StreamReadBuffer& ReadBuffer() {
117
231M
    return context_->ReadBuffer();
118
231M
  }
119
120
  void PopSending();
121
122
  // The socket we're communicating on.
123
  Socket socket_;
124
125
  // The local address we're talking from.
126
  Endpoint local_;
127
128
  // The remote address we're talking to.
129
  const Endpoint remote_;
130
131
  StreamContext* context_ = nullptr;
132
133
  // Notifies us when our socket is readable or writable.
134
  ev::io io_;
135
136
  ev::timer connect_delayer_;
137
138
  // Set to true when the connection is registered on a loop.
139
  // This is used for a sanity check in the destructor that we are properly
140
  // un-registered before shutting down.
141
  bool is_epoll_registered_ = false;
142
143
  bool connected_ = false;
144
145
  bool read_buffer_full_ = false;
146
147
  std::deque<TcpStreamSendingData> sending_;
148
  size_t data_blocks_sent_ = 0;
149
  size_t send_position_ = 0;
150
  size_t queued_bytes_to_send_ = 0;
151
  size_t inbound_bytes_to_skip_ = 0;
152
  bool waiting_write_ready_ = false;
153
  MemTrackerPtr mem_tracker_;
154
  scoped_refptr<Counter> bytes_sent_counter_;
155
  scoped_refptr<Counter> bytes_received_counter_;
156
};
157
158
} // namespace rpc
159
} // namespace yb
160
161
#endif // YB_RPC_TCP_STREAM_H