/Users/deen/code/yugabyte-db/src/yb/rpc/tcp_stream.cc
Line | Count | Source |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/rpc/tcp_stream.h" |
15 | | |
16 | | #include "yb/rpc/outbound_data.h" |
17 | | #include "yb/rpc/rpc_introspection.pb.h" |
18 | | #include "yb/rpc/rpc_util.h" |
19 | | |
20 | | #include "yb/util/errno.h" |
21 | | #include "yb/util/flag_tags.h" |
22 | | #include "yb/util/logging.h" |
23 | | #include "yb/util/memory/memory_usage.h" |
24 | | #include "yb/util/metrics.h" |
25 | | #include "yb/util/result.h" |
26 | | #include "yb/util/status_log.h" |
27 | | #include "yb/util/string_util.h" |
28 | | |
29 | | using namespace std::literals; |
30 | | |
31 | | DECLARE_uint64(rpc_connection_timeout_ms); |
32 | | DEFINE_test_flag(int32, delay_connect_ms, 0, |
33 | | "Delay connect in tests for specified amount of milliseconds."); |
34 | | |
35 | | METRIC_DEFINE_simple_counter( |
36 | | server, tcp_bytes_sent, "Bytes sent over TCP connections", yb::MetricUnit::kBytes); |
37 | | |
38 | | METRIC_DEFINE_simple_counter( |
39 | | server, tcp_bytes_received, "Bytes received via TCP connections", yb::MetricUnit::kBytes); |
40 | | |
41 | | namespace yb { |
42 | | namespace rpc { |
43 | | |
44 | | namespace { |
45 | | |
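 | | // Maximum number of iovec entries FillIov() below packs into a single Writev() call.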
46 | | const size_t kMaxIov = 16; |
47 | | |
48 | | } |
49 | | |
50 | | TcpStream::TcpStream(const StreamCreateData& data) |
51 | | : socket_(std::move(*data.socket)), |
52 | 3.74M | remote_(data.remote) { |
53 | 3.74M | if (data.mem_tracker) { |
54 | 3.73M | mem_tracker_ = MemTracker::FindOrCreateTracker("Sending", data.mem_tracker); |
55 | 3.73M | } |
56 | 3.74M | if (data.metric_entity) { |
57 | 3.70M | bytes_received_counter_ = METRIC_tcp_bytes_received.Instantiate(data.metric_entity); |
58 | 3.70M | bytes_sent_counter_ = METRIC_tcp_bytes_sent.Instantiate(data.metric_entity); |
59 | 3.70M | } |
60 | 3.74M | } |
61 | | |
62 | 3.25M | TcpStream::~TcpStream() { |
63 | | // Must clear the sending_ list before deleting.
64 | 3.25M | CHECK(sending_.empty()) << ToString();
65 | | |
66 | | // It's crucial that the stream is Shutdown first -- otherwise |
67 | | // our destructor will end up calling io_.stop() |
68 | | // from a possibly non-reactor thread context. This can then make all |
69 | | // hell break loose with libev. |
70 | 3.25M | CHECK(!is_epoll_registered_) << ToString();
71 | 3.25M | } |
72 | | |
73 | 3.75M | Status TcpStream::Start(bool connect, ev::loop_ref* loop, StreamContext* context) { |
74 | 3.75M | context_ = context; |
75 | 3.75M | connected_ = !connect; |
76 | | |
77 | 3.75M | RETURN_NOT_OK(socket_.SetNoDelay(true)); |
78 | | // These timeouts don't affect non-blocking sockets: |
79 | 3.75M | RETURN_NOT_OK(socket_.SetSendTimeout(FLAGS_rpc_connection_timeout_ms * 1ms)); |
80 | 3.75M | RETURN_NOT_OK(socket_.SetRecvTimeout(FLAGS_rpc_connection_timeout_ms * 1ms)); |
81 | | |
82 | 3.75M | if (connect && FLAGS_TEST_delay_connect_ms) {
83 | 10 | connect_delayer_.set(*loop); |
84 | 10 | connect_delayer_.set<TcpStream, &TcpStream::DelayConnectHandler>(this); |
85 | 10 | connect_delayer_.start( |
86 | 10 | static_cast<double>(FLAGS_TEST_delay_connect_ms) / MonoTime::kMillisecondsPerSecond, 0); |
87 | 10 | return Status::OK(); |
88 | 10 | } |
89 | | |
90 | 3.75M | return DoStart(loop, connect); |
91 | 3.75M | } |
92 | | |
93 | 3.75M | Status TcpStream::DoStart(ev::loop_ref* loop, bool connect) { |
94 | 3.75M | if (connect) { |
95 | 3.13M | auto status = socket_.Connect(remote_); |
96 | 3.13M | if (!status.ok() && !status.IsTryAgain()) {
97 | 236 | LOG_WITH_PREFIX(WARNING) << "Connect failed: " << status; |
98 | 236 | return status; |
99 | 236 | } |
100 | 3.13M | } |
101 | | |
102 | 3.75M | RETURN_NOT_OK(socket_.GetSocketAddress(&local_)); |
103 | 3.75M | log_prefix_.clear(); |
104 | | |
105 | 3.75M | io_.set(*loop); |
106 | 3.75M | io_.set<TcpStream, &TcpStream::Handler>(this); |
107 | 3.75M | int events = ev::READ | (!connected_ ? ev::WRITE : 0);
108 | 3.75M | io_.start(socket_.GetFd(), events); |
109 | | |
110 | 3.75M | DVLOG_WITH_PREFIX(3) << "Starting, listen events: " << events << ", fd: " << socket_.GetFd();
111 | | |
112 | 3.75M | is_epoll_registered_ = true; |
113 | | |
114 | 3.75M | if (connected_) { |
115 | 622k | context_->Connected(); |
116 | 622k | } |
117 | | |
118 | 3.75M | return Status::OK(); |
119 | 3.75M | } |
120 | | |
121 | 10 | void TcpStream::DelayConnectHandler(ev::timer& watcher, int revents) { // NOLINT |
122 | 10 | if (EV_ERROR & revents) { |
123 | 0 | LOG_WITH_PREFIX(WARNING) << "Got an error in handle delay connect"; |
124 | 0 | return; |
125 | 0 | } |
126 | | |
127 | 10 | auto status = DoStart(&watcher.loop, true /* connect */); |
128 | 10 | if (!status.ok()) { |
129 | 0 | Shutdown(status); |
130 | 0 | } |
131 | 10 | } |
132 | | |
133 | 7.50k | void TcpStream::Close() { |
134 | 7.50k | if (socket_.GetFd() >= 0) { |
135 | 7.41k | auto status = socket_.Shutdown(true, true); |
136 | 7.41k | LOG_IF(INFO, !status.ok()) << "Failed to shutdown socket: " << status;
137 | 7.41k | } |
138 | 7.50k | } |
139 | | |
140 | 6.33M | void TcpStream::Shutdown(const Status& status) { |
141 | 6.33M | ClearSending(status); |
142 | | |
143 | 6.33M | if (!ReadBuffer().Empty()) { |
144 | 4.76k | LOG_WITH_PREFIX(WARNING) << "Shutting down with pending inbound data (" |
145 | 4.76k | << ReadBuffer().ToString() << ", status = " << status << ")"; |
146 | 4.76k | } |
147 | | |
148 | 6.33M | io_.stop(); |
149 | 6.33M | is_epoll_registered_ = false; |
150 | | |
151 | 6.33M | ReadBuffer().Reset(); |
152 | | |
153 | 6.33M | WARN_NOT_OK(socket_.Close(), "Error closing socket"); |
154 | 6.33M | } |
155 | | |
156 | 165M | Status TcpStream::TryWrite() { |
157 | 165M | auto result = DoWrite(); |
158 | 165M | if (result.ok()) {
159 | 165M | UpdateEvents(); |
160 | 165M | } |
161 | 165M | return result; |
162 | 165M | } |
163 | | |
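 | | // Packs pending buffers from sending_ into `out`, skipping the first send_position_ bytes
 | | // that were already written, dropping cancelled (skipped) entries, and stopping once kMaxIov
 | | // entries are filled. Also reports whether only heartbeats are pending, so DoWrite() can avoid
 | | // refreshing the last-activity timestamp for heartbeat-only writes.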
164 | 159M | TcpStream::FillIovResult TcpStream::FillIov(iovec* out) { |
165 | 159M | int index = 0; |
166 | 159M | size_t offset = send_position_; |
167 | 159M | bool only_heartbeats = true; |
168 | 162M | for (auto& data : sending_) { |
169 | 162M | const auto wrapped_data = data.data; |
170 | 162M | if (wrapped_data && !wrapped_data->IsHeartbeat()) {
171 | 153M | only_heartbeats = false; |
172 | 153M | } |
173 | 162M | if (data.skipped || (offset == 0 && wrapped_data && wrapped_data->IsFinished())) {
174 | 3.77k | queued_bytes_to_send_ -= data.bytes_size(); |
175 | 3.77k | data.ClearBytes(); |
176 | 3.77k | data.skipped = true; |
177 | 3.77k | continue; |
178 | 3.77k | } |
179 | 166M | for (const auto& bytes : data.bytes) {
180 | 166M | if (offset >= bytes.size()) { |
181 | 55.9k | offset -= bytes.size(); |
182 | 55.9k | continue; |
183 | 55.9k | } |
184 | | |
185 | 166M | out[index].iov_base = bytes.data() + offset; |
186 | 166M | out[index].iov_len = bytes.size() - offset; |
187 | 166M | offset = 0; |
188 | 166M | if (++index == kMaxIov) { |
189 | 5.77k | return FillIovResult{index, only_heartbeats}; |
190 | 5.77k | } |
191 | 166M | } |
192 | 162M | } |
193 | | |
194 | 159M | return FillIovResult{index, only_heartbeats}; |
195 | 159M | } |
196 | | |
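 | | // Writes as much queued data as the socket accepts. Returns OK and leaves the rest queued
 | | // when the socket reports try-again; completed transfers are popped from sending_ and
 | | // reported to the context via Transferred().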
197 | 166M | Status TcpStream::DoWrite() { |
198 | 166M | DVLOG_WITH_PREFIX(5) << "sending_.size(): " << sending_.size();
199 | 166M | if (!connected_ || waiting_write_ready_ || !is_epoll_registered_) {
200 | 6.32M | DVLOG_WITH_PREFIX(5)
201 | 9.04k | << "connected_: " << connected_ |
202 | 9.04k | << " waiting_write_ready_: " << waiting_write_ready_ |
203 | 9.04k | << " is_epoll_registered_: " << is_epoll_registered_; |
204 | 6.32M | return Status::OK(); |
205 | 6.32M | } |
206 | | |
207 | | // Since we are not waiting for the socket to become writable, try to write data to it now.
208 | 319M | while (!sending_.empty()) {
209 | 159M | iovec iov[kMaxIov]; |
210 | 159M | auto fill_result = FillIov(iov); |
211 | | |
212 | 159M | if (!fill_result.only_heartbeats) { |
213 | 151M | context_->UpdateLastActivity(); |
214 | 151M | } |
215 | | |
216 | 159M | auto result = fill_result.len != 0 |
217 | 159M | ? socket_.Writev(iov, fill_result.len) |
218 | 18.4E | : 0; |
219 | 18.4E | DVLOG_WITH_PREFIX(4) << "Queued writes " << queued_bytes_to_send_ << " bytes. Result " |
220 | 18.4E | << result << ", sending_.size(): " << sending_.size(); |
221 | | |
222 | 159M | if (PREDICT_FALSE(!result.ok())) { |
223 | 32.2k | if (!result.status().IsTryAgain()) { |
224 | 1.49k | YB_LOG_WITH_PREFIX_EVERY_N(WARNING, 50) << "Send failed: " << result.status();
225 | 1.49k | return result.status(); |
226 | 30.7k | } else { |
227 | 30.7k | VLOG_WITH_PREFIX(3) << "Send temporary failed: " << result.status();
228 | 30.7k | return Status::OK(); |
229 | 30.7k | } |
230 | 32.2k | } |
231 | | |
232 | 159M | context_->UpdateLastWrite(); |
233 | | |
234 | 159M | IncrementCounterBy(bytes_sent_counter_, *result); |
235 | | |
236 | 159M | send_position_ += *result; |
237 | 321M | while (!sending_.empty()) { |
238 | 162M | auto& front = sending_.front(); |
239 | 162M | size_t full_size = front.bytes_size(); |
240 | 162M | if (front.skipped) { |
241 | 3.77k | PopSending(); |
242 | 3.77k | continue; |
243 | 3.77k | } |
244 | 162M | if (send_position_ < full_size) { |
245 | 39.6k | break; |
246 | 39.6k | } |
247 | 162M | auto data = front.data; |
248 | 162M | send_position_ -= full_size; |
249 | 162M | PopSending(); |
250 | 162M | if (data) {
251 | 162M | context_->Transferred(data, Status::OK()); |
252 | 162M | } |
253 | 162M | } |
254 | 159M | } |
255 | | |
256 | 159M | return Status::OK(); |
257 | 159M | } |
258 | | |
259 | 162M | void TcpStream::PopSending() { |
260 | 162M | queued_bytes_to_send_ -= sending_.front().bytes_size(); |
261 | 162M | sending_.pop_front(); |
262 | 162M | ++data_blocks_sent_; |
263 | 162M | } |
264 | | |
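 | | // libev I/O callback: handles READ and WRITE readiness, finishes a pending connect on the
 | | // first WRITE event, and destroys the connection via the context on any error.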
265 | 162M | void TcpStream::Handler(ev::io& watcher, int revents) { // NOLINT |
266 | 162M | DVLOG_WITH_PREFIX(4) << "Handler(revents=" << revents << ")";
267 | 162M | Status status = Status::OK(); |
268 | 162M | if (revents & ev::ERROR) { |
269 | 0 | status = STATUS(NetworkError, ToString() + ": Handler encountered an error"); |
270 | 0 | VLOG_WITH_PREFIX(3) << status; |
271 | 0 | } |
272 | | |
273 | 162M | if (status.ok() && (revents & ev::READ)) {
274 | 162M | status = ReadHandler(); |
275 | 162M | if (!status.ok()) { |
276 | 3.04M | VLOG_WITH_PREFIX(3) << "ReadHandler() returned error: " << status;
277 | 3.04M | } |
278 | 162M | } |
279 | | |
280 | 162M | if (status.ok() && (revents & ev::WRITE)) {
281 | 645k | bool just_connected = !connected_; |
282 | 645k | if (just_connected) { |
283 | 606k | connected_ = true; |
284 | 606k | context_->Connected(); |
285 | 606k | } |
286 | 645k | status = WriteHandler(just_connected); |
287 | 645k | if (!status.ok()) { |
288 | 15 | VLOG_WITH_PREFIX(3) << "WriteHandler() returned error: " << status;
289 | 15 | } |
290 | 645k | } |
291 | | |
292 | 162M | if (status.ok()) { |
293 | 159M | UpdateEvents(); |
294 | 159M | } else { |
295 | 3.12M | context_->Destroy(status); |
296 | 3.12M | } |
297 | 162M | } |
298 | | |
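 | | // Recomputes the libev event mask: READ while the read buffer has room, WRITE while there
 | | // is queued outbound data or the connect has not completed yet.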
299 | 325M | void TcpStream::UpdateEvents() { |
300 | 325M | int events = 0; |
301 | 325M | if (!read_buffer_full_) { |
302 | 325M | events |= ev::READ; |
303 | 325M | } |
304 | 325M | waiting_write_ready_ = !sending_.empty() || !connected_;
305 | 325M | if (waiting_write_ready_) { |
306 | 6.34M | events |= ev::WRITE; |
307 | 6.34M | } |
308 | 325M | if (events) { |
309 | 325M | io_.set(events); |
310 | 325M | } |
311 | 325M | } |
312 | | |
313 | 162M | Status TcpStream::ReadHandler() { |
314 | 162M | context_->UpdateLastRead(); |
315 | | |
316 | 321M | for (;;) { |
317 | 321M | auto received = Receive(); |
318 | 321M | if (PREDICT_FALSE(!received.ok())) { |
319 | 3.03M | if (Errno(received.status()) == ESHUTDOWN) { |
320 | 18.4E | VLOG_WITH_PREFIX(1) << "Shut down by remote end."; |
321 | 2.54M | } else { |
322 | 2.54M | YB_LOG_WITH_PREFIX_EVERY_N(INFO, 50) << " Recv failed: " << received;
323 | 2.54M | } |
324 | 3.03M | return received.status(); |
325 | 3.03M | } |
326 | | // Exit the loop if we did not receive anything. |
327 | 318M | if (!received.get()) { |
328 | 158M | return Status::OK(); |
329 | 158M | } |
330 | | // If we were not able to process the next call, exit the loop.
331 | | // If the status is OK, it just means we do not have enough data to process yet.
332 | 159M | auto continue_receiving = TryProcessReceived(); |
333 | 159M | if (!continue_receiving.ok()) { |
334 | 2.89k | return continue_receiving.status(); |
335 | 2.89k | } |
336 | 159M | if (!continue_receiving.get()) { |
337 | 210k | return Status::OK(); |
338 | 210k | } |
339 | 159M | } |
340 | 162M | } |
341 | | |
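 | | // Reads available data from the socket into the read buffer. First drains any bytes the upper
 | | // layer asked to skip (inbound_bytes_to_skip_) through the global skip buffer, then appends
 | | // the rest. Returns false when no more data is currently available.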
342 | 321M | Result<bool> TcpStream::Receive() { |
343 | 321M | auto iov = ReadBuffer().PrepareAppend(); |
344 | 321M | if (!iov.ok()) { |
345 | 0 | VLOG_WITH_PREFIX(3) << "ReadBuffer().PrepareAppend() error: " << iov.status(); |
346 | 0 | if (iov.status().IsBusy()) { |
347 | 0 | read_buffer_full_ = true; |
348 | 0 | return false; |
349 | 0 | } |
350 | 0 | return iov.status(); |
351 | 0 | } |
352 | 321M | read_buffer_full_ = false; |
353 | | |
354 | 321M | if (inbound_bytes_to_skip_ > 0) { |
355 | 2.18k | auto global_skip_buffer = GetGlobalSkipBuffer(); |
356 | 9.35k | do { |
357 | 9.35k | VLOG_WITH_PREFIX(3) << "inbound_bytes_to_skip_: " << inbound_bytes_to_skip_;
358 | 9.35k | auto nread = socket_.Recv( |
359 | 9.35k | global_skip_buffer.mutable_data(), |
360 | 9.35k | std::min(global_skip_buffer.size(), inbound_bytes_to_skip_)); |
361 | 9.35k | if (!nread.ok()) { |
362 | 2.10k | VLOG_WITH_PREFIX(3) << "socket_.Recv() error: " << nread.status();
363 | 2.10k | if (nread.status().IsTryAgain()) { |
364 | 2.10k | return false; |
365 | 2.10k | } |
366 | 0 | return nread.status(); |
367 | 2.10k | } |
368 | 7.24k | IncrementCounterBy(bytes_received_counter_, *nread); |
369 | 7.24k | inbound_bytes_to_skip_ -= *nread; |
370 | 7.24k | } while (inbound_bytes_to_skip_ > 0); |
371 | 2.18k | } |
372 | | |
373 | 321M | auto nread = socket_.Recvv(iov.get_ptr()); |
374 | 321M | if (!nread.ok()) { |
375 | 161M | DVLOG_WITH_PREFIX(3) << "socket_.Recvv() error: " << nread.status();
376 | 161M | if (nread.status().IsTryAgain()) { |
377 | 158M | return false; |
378 | 158M | } |
379 | 3.31M | return nread.status(); |
380 | 161M | } |
381 | 18.4E | DVLOG_WITH_PREFIX(4) << "socket_.Recvv() bytes: " << *nread; |
382 | | |
383 | 159M | IncrementCounterBy(bytes_received_counter_, *nread); |
384 | 159M | ReadBuffer().DataAppended(*nread); |
385 | 159M | return *nread != 0; |
386 | 321M | } |
387 | | |
388 | 0 | void TcpStream::ParseReceived() { |
389 | 0 | auto result = TryProcessReceived(); |
390 | 0 | if (!result.ok()) { |
391 | 0 | context_->Destroy(result.status()); |
392 | 0 | return; |
393 | 0 | } |
394 | 0 | if (read_buffer_full_) { |
395 | 0 | read_buffer_full_ = false; |
396 | 0 | UpdateEvents(); |
397 | 0 | } |
398 | 0 | } |
399 | | |
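 | | // Asks the context to process whatever is currently in the read buffer. Returns false when
 | | // there is not yet enough data to process; on success records how many inbound bytes the
 | | // context asked to skip.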
400 | 159M | Result<bool> TcpStream::TryProcessReceived() { |
401 | 159M | auto& read_buffer = ReadBuffer(); |
402 | 159M | if (!read_buffer.ReadyToRead()) { |
403 | 210k | return false; |
404 | 210k | } |
405 | | |
406 | 158M | auto result = VERIFY_RESULT(context_->ProcessReceived(ReadBufferFull(read_buffer.Full())));
407 | 18.4E | DVLOG_WITH_PREFIX(5) << "context_->ProcessReceived result: " << AsString(result); |
408 | | |
409 | 158M | LOG_IF(DFATAL, inbound_bytes_to_skip_ != 0) |
410 | 4.82k | << "Expected inbound_bytes_to_skip_ to be 0 instead of " << inbound_bytes_to_skip_; |
411 | 158M | inbound_bytes_to_skip_ = result; |
412 | 158M | return true; |
413 | 158M | } |
414 | | |
415 | 644k | Status TcpStream::WriteHandler(bool just_connected) { |
416 | 644k | waiting_write_ready_ = false; |
417 | 644k | if (sending_.empty()) { |
418 | 0 | LOG_IF_WITH_PREFIX(WARNING, !just_connected) << |
419 | 0 | "Got a ready-to-write callback, but there is nothing to write."; |
420 | 0 | return Status::OK(); |
421 | 0 | } |
422 | | |
423 | 644k | return DoWrite(); |
424 | 644k | } |
425 | | |
426 | 914M | bool TcpStream::Idle(std::string* reason_not_idle) { |
427 | 914M | bool result = true; |
428 | | // Check if we're in the middle of receiving something. |
429 | 914M | if (!ReadBuffer().Empty()) { |
430 | 1.68k | if (reason_not_idle) { |
431 | 0 | AppendWithSeparator("read buffer not empty", reason_not_idle); |
432 | 0 | } |
433 | 1.68k | result = false; |
434 | 1.68k | } |
435 | | |
436 | | // Check if we still need to send something. |
437 | 914M | if (!sending_.empty()) { |
438 | 334 | if (reason_not_idle) { |
439 | 0 | AppendWithSeparator("still sending", reason_not_idle); |
440 | 0 | } |
441 | 334 | result = false; |
442 | 334 | } |
443 | | |
444 | 914M | return result; |
445 | 914M | } |
446 | | |
447 | 6.33M | void TcpStream::ClearSending(const Status& status) { |
448 | | // Clear any outbound transfers. |
449 | 6.33M | for (auto& data : sending_) { |
450 | 83.9k | if (data.data) { |
451 | 83.9k | context_->Transferred(data.data, status); |
452 | 83.9k | } |
453 | 83.9k | } |
454 | 6.33M | sending_.clear(); |
455 | 6.33M | queued_bytes_to_send_ = 0; |
456 | 6.33M | } |
457 | | |
458 | 162M | Result<size_t> TcpStream::Send(OutboundDataPtr data) { |
459 | | // For TcpStream, the handle is the absolute index of the data block since the stream started,
460 | | // so it can be calculated as the index in sending_ plus the number of data blocks that were
461 | | // already transferred.
462 | 162M | size_t result = data_blocks_sent_ + sending_.size(); |
463 | | |
464 | 162M | DVLOG_WITH_PREFIX(6) << "TcpStream::Send queueing: " << AsString(*data);
465 | | // Serialize the actual bytes to be put on the wire. |
466 | 162M | sending_.emplace_back(std::move(data), mem_tracker_); |
467 | 162M | queued_bytes_to_send_ += sending_.back().bytes_size(); |
468 | 162M | DVLOG_WITH_PREFIX(4) << "Queued data, sending_.size(): " << sending_.size()
469 | 659k | << ", queued_bytes_to_send_: " << queued_bytes_to_send_; |
470 | | |
471 | 162M | return result; |
472 | 162M | } |
473 | | |
474 | 29.5k | void TcpStream::Cancelled(size_t handle) { |
475 | 29.5k | if (handle < data_blocks_sent_) { |
476 | 25.7k | return; |
477 | 25.7k | } |
478 | 3.78k | handle -= data_blocks_sent_; |
479 | 3.78k | LOG_IF_WITH_PREFIX(DFATAL, !sending_[handle].data->IsFinished())
480 | 3 | << "Cancelling not finished data: " << sending_[handle].data->ToString(); |
481 | 3.78k | auto& entry = sending_[handle]; |
482 | 3.78k | if (handle == 0 && send_position_ > 0) {
483 | | // Transfer already started, cannot drop it. |
484 | 11 | return; |
485 | 11 | } |
486 | | |
487 | 3.77k | queued_bytes_to_send_ -= entry.bytes_size(); |
488 | 3.77k | entry.ClearBytes(); |
489 | 3.77k | } |
490 | | |
491 | 10 | void TcpStream::DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) { |
492 | 10 | auto call_in_flight = resp->add_calls_in_flight(); |
493 | 10 | uint64_t sending_bytes = 0; |
494 | 10 | for (auto& entry : sending_) { |
495 | 0 | auto entry_bytes_size = entry.bytes_size();
496 | 0 | sending_bytes += entry_bytes_size; |
497 | 0 | if (!entry.data) { |
498 | 0 | continue; |
499 | 0 | } |
500 | 0 | if (entry.data->DumpPB(req, call_in_flight)) { |
501 | 0 | call_in_flight->set_sending_bytes(entry_bytes_size); |
502 | 0 | call_in_flight = resp->add_calls_in_flight(); |
503 | 0 | } |
504 | 0 | } |
505 | 10 | resp->set_sending_bytes(sending_bytes); |
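 | | // The loop above always leaves one extra, unfilled entry at the tail; drop it.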
506 | 10 | resp->mutable_calls_in_flight()->DeleteSubrange(resp->calls_in_flight_size() - 1, 1); |
507 | 10 | } |
508 | | |
509 | 411k | const Protocol* TcpStream::StaticProtocol() { |
510 | 411k | static Protocol result("tcp"); |
511 | 411k | return &result; |
512 | 411k | } |
513 | | |
514 | 58.8k | StreamFactoryPtr TcpStream::Factory() { |
515 | 58.8k | class TcpStreamFactory : public StreamFactory { |
516 | 58.8k | private: |
517 | 3.74M | std::unique_ptr<Stream> Create(const StreamCreateData& data) override { |
518 | 3.74M | return std::make_unique<TcpStream>(data); |
519 | 3.74M | } |
520 | 58.8k | }; |
521 | | |
522 | 58.8k | return std::make_shared<TcpStreamFactory>(); |
523 | 58.8k | } |
524 | | |
525 | | TcpStreamSendingData::TcpStreamSendingData(OutboundDataPtr data_, const MemTrackerPtr& mem_tracker) |
526 | 162M | : data(std::move(data_)) { |
527 | 162M | data->Serialize(&bytes); |
528 | 162M | if (mem_tracker) {
529 | 162M | size_t memory_used = sizeof(*this); |
530 | 162M | memory_used += DynamicMemoryUsageOf(data); |
531 | | // We don't need to account for the dynamic memory usage of `bytes`, because it stores the
532 | | // RefCntBuffer instance in internal memory, and that RefCntBuffer refers to the same dynamic
533 | | // memory as `data`.
534 | 162M | consumption = ScopedTrackedConsumption(mem_tracker, memory_used); |
535 | 162M | } |
536 | 162M | } |
537 | | |
538 | | } // namespace rpc |
539 | | } // namespace yb |