/Users/deen/code/yugabyte-db/src/yb/rpc/tcp_stream.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #ifndef YB_RPC_TCP_STREAM_H |
15 | | #define YB_RPC_TCP_STREAM_H |
16 | | |
17 | | #include <ev++.h> |
18 | | |
19 | | #include "yb/rpc/stream.h" |
20 | | |
21 | | #include "yb/util/net/socket.h" |
22 | | #include "yb/util/mem_tracker.h" |
23 | | #include "yb/util/ref_cnt_buffer.h" |
24 | | |
25 | | namespace yb { |
26 | | |
27 | | class Counter; |
28 | | |
29 | | namespace rpc { |
30 | | |
31 | | struct TcpStreamSendingData { |
32 | | typedef boost::container::small_vector<RefCntBuffer, 4> SendingBytes; |
33 | | |
34 | | TcpStreamSendingData(OutboundDataPtr data_, const MemTrackerPtr& mem_tracker); |
35 | | |
36 | 133M | size_t bytes_size() const { |
37 | 133M | size_t result = 0; |
38 | 139M | for (const auto& entry : bytes) { |
39 | 139M | result += entry.size(); |
40 | 139M | } |
41 | 133M | return result; |
42 | 133M | } |
43 | | |
44 | 9.35k | void ClearBytes() { |
45 | 9.35k | bytes.clear(); |
46 | 9.35k | consumption = ScopedTrackedConsumption(); |
47 | 9.35k | } |
48 | | |
49 | | OutboundDataPtr data; |
50 | | SendingBytes bytes; |
51 | | ScopedTrackedConsumption consumption; |
52 | | bool skipped = false; |
53 | | }; |
54 | | |
55 | | class TcpStream : public Stream { |
56 | | public: |
57 | | explicit TcpStream(const StreamCreateData& data); |
58 | | ~TcpStream(); |
59 | | |
60 | 0 | Socket* socket() { return &socket_; } |
61 | | |
62 | 90.4M | size_t GetPendingWriteBytes() override { |
63 | 90.4M | return queued_bytes_to_send_ - send_position_; |
64 | 90.4M | } |
65 | | |
66 | | static const rpc::Protocol* StaticProtocol(); |
67 | | static StreamFactoryPtr Factory(); |
68 | | |
69 | | private: |
70 | | struct FillIovResult { |
71 | | int len; |
72 | | bool only_heartbeats; |
73 | | }; |
74 | | |
75 | | CHECKED_STATUS Start(bool connect, ev::loop_ref* loop, StreamContext* context) override; |
76 | | void Close() override; |
77 | | void Shutdown(const Status& status) override; |
78 | | Result<size_t> Send(OutboundDataPtr data) override; |
79 | | CHECKED_STATUS TryWrite() override; |
80 | | void Cancelled(size_t handle) override; |
81 | | |
82 | | bool Idle(std::string* reason_not_idle) override; |
83 | 1.21M | bool IsConnected() override { return connected_; } |
84 | | void DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) override; |
85 | | |
86 | 15.5M | const Endpoint& Remote() const override { return remote_; } |
87 | 33.1k | const Endpoint& Local() const override { return local_; } |
88 | | |
89 | 366k | const Protocol* GetProtocol() override { |
90 | 366k | return StaticProtocol(); |
91 | 366k | } |
92 | | |
93 | | void ParseReceived() override; |
94 | | |
95 | | CHECKED_STATUS DoWrite(); |
96 | | void HandleOutcome(const Status& status, bool enqueue); |
97 | | void ClearSending(const Status& status); |
98 | | |
99 | | void Handler(ev::io& watcher, int revents); // NOLINT |
100 | | CHECKED_STATUS ReadHandler(); |
101 | | CHECKED_STATUS WriteHandler(bool just_connected); |
102 | | |
103 | | Result<bool> Receive(); |
104 | | // Try to parse received data and process it. |
105 | | Result<bool> TryProcessReceived(); |
106 | | |
107 | | // Updates listening events. |
108 | | void UpdateEvents(); |
109 | | |
110 | | FillIovResult FillIov(iovec* out); |
111 | | |
112 | | void DelayConnectHandler(ev::timer& watcher, int revents); // NOLINT |
113 | | |
114 | | CHECKED_STATUS DoStart(ev::loop_ref* loop, bool connect); |
115 | | |
116 | 231M | StreamReadBuffer& ReadBuffer() { |
117 | 231M | return context_->ReadBuffer(); |
118 | 231M | } |
119 | | |
120 | | void PopSending(); |
121 | | |
122 | | // The socket we're communicating on. |
123 | | Socket socket_; |
124 | | |
125 | | // The remote address we're talking from. |
126 | | Endpoint local_; |
127 | | |
128 | | // The remote address we're talking to. |
129 | | const Endpoint remote_; |
130 | | |
131 | | StreamContext* context_ = nullptr; |
132 | | |
133 | | // Notifies us when our socket is readable or writable. |
134 | | ev::io io_; |
135 | | |
136 | | ev::timer connect_delayer_; |
137 | | |
138 | | // Set to true when the connection is registered on a loop. |
139 | | // This is used for a sanity check in the destructor that we are properly |
140 | | // un-registered before shutting down. |
141 | | bool is_epoll_registered_ = false; |
142 | | |
143 | | bool connected_ = false; |
144 | | |
145 | | bool read_buffer_full_ = false; |
146 | | |
147 | | std::deque<TcpStreamSendingData> sending_; |
148 | | size_t data_blocks_sent_ = 0; |
149 | | size_t send_position_ = 0; |
150 | | size_t queued_bytes_to_send_ = 0; |
151 | | size_t inbound_bytes_to_skip_ = 0; |
152 | | bool waiting_write_ready_ = false; |
153 | | MemTrackerPtr mem_tracker_; |
154 | | scoped_refptr<Counter> bytes_sent_counter_; |
155 | | scoped_refptr<Counter> bytes_received_counter_; |
156 | | }; |
157 | | |
158 | | } // namespace rpc |
159 | | } // namespace yb |
160 | | |
161 | | #endif // YB_RPC_TCP_STREAM_H |