/Users/deen/code/yugabyte-db/src/yb/rpc/binary_call_parser.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/rpc/binary_call_parser.h" |
15 | | |
16 | | #include "yb/gutil/endian.h" |
17 | | |
18 | | #include "yb/rpc/connection.h" |
19 | | #include "yb/rpc/connection_context.h" |
20 | | #include "yb/rpc/stream.h" |
21 | | |
22 | | #include "yb/util/logging.h" |
23 | | #include "yb/util/result.h" |
24 | | #include "yb/util/size_literals.h" |
25 | | #include "yb/util/status_format.h" |
26 | | |
27 | | using yb::operator"" _MB; |
28 | | |
29 | | DEFINE_bool( |
30 | | binary_call_parser_reject_on_mem_tracker_hard_limit, true, |
31 | | "Whether to reject/ignore calls on hitting mem tracker hard limit."); |
32 | | |
33 | | DEFINE_int64( |
34 | | rpc_throttle_threshold_bytes, 1048576, |
35 | | "Throttle inbound RPC calls larger than specified size on hitting mem tracker soft limit. " |
36 | | "Throttling is disabled if negative value is specified."); |
37 | | |
38 | | DECLARE_int32(memory_limit_warn_threshold_percentage); |
39 | | |
40 | | namespace yb { |
41 | | namespace rpc { |
42 | | |
43 | | bool ShouldThrottleRpc( |
44 | 38.9M | const MemTrackerPtr& throttle_tracker, ssize_t call_data_size, const char* throttle_message) { |
45 | 38.9M | return (FLAGS_rpc_throttle_threshold_bytes >= 0 && |
46 | 38.9M | call_data_size > FLAGS_rpc_throttle_threshold_bytes && |
47 | 1.24k | !CheckMemoryPressureWithLogging(throttle_tracker, 0 /* score */, throttle_message)); |
48 | 38.9M | } |
49 | | |
50 | | BinaryCallParser::BinaryCallParser( |
51 | | const MemTrackerPtr& parent_tracker, size_t header_size, size_t size_offset, |
52 | | size_t max_message_length, IncludeHeader include_header, SkipEmptyMessages skip_empty_messages, |
53 | | BinaryCallParserListener* listener) |
54 | | : call_header_buffer_(header_size), |
55 | | size_offset_(size_offset), |
56 | | max_message_length_(max_message_length), |
57 | | include_header_(include_header), |
58 | | skip_empty_messages_(skip_empty_messages), |
59 | 942k | listener_(listener) { |
60 | 942k | buffer_tracker_ = MemTracker::FindOrCreateTracker("Reading", parent_tracker); |
61 | 942k | } |
62 | | |
63 | | Result<ProcessCallsResult> BinaryCallParser::Parse( |
64 | | const rpc::ConnectionPtr& connection, const IoVecs& data, ReadBufferFull read_buffer_full, |
65 | 36.7M | const MemTrackerPtr* tracker_for_throttle) { |
66 | 36.7M | if (call_data_.should_reject()) { |
67 | | // We can't properly respond with error, because we don't have enough call data since we |
68 | | // have ignored it. So, we will just ignore this call and client will have timeout. |
69 | | // We can implement partial higher level call header parsing to be able to respond with proper |
70 | | // error if we will need to detect case when server is hitting memory limit at client side. |
71 | 1.53M | call_data_.Reset(); |
72 | 1.53M | call_data_consumption_ = ScopedTrackedConsumption(); |
73 | 35.2M | } else if (!call_data_.empty()) { |
74 | 8.19k | RETURN_NOT_OK(listener_->HandleCall(connection, &call_data_)); |
75 | 8.19k | call_data_.Reset(); |
76 | 8.19k | call_data_consumption_ = ScopedTrackedConsumption(); |
77 | 8.19k | } |
78 | | |
79 | 36.7M | const auto full_input_size = IoVecsFullSize(data); |
80 | 14.4k | VLOG(4) << "BinaryCallParser::Parse, full_input_size: " << full_input_size; |
81 | 36.7M | size_t consumed = 0; |
82 | 36.7M | const size_t header_size = call_header_buffer_.size(); |
83 | 32.8M | const size_t body_offset = include_header_ ? 0 : header_size; |
84 | 25.0k | VLOG(4) << "BinaryCallParser::Parse, header_size: " << header_size |
85 | 25.0k | << " body_offset: " << body_offset; |
86 | 74.6M | while (full_input_size >= consumed + header_size) { |
87 | 37.9M | IoVecsToBuffer(data, consumed, consumed + header_size, &call_header_buffer_); |
88 | | |
89 | 37.9M | const size_t data_length = NetworkByteOrder::Load32(call_header_buffer_.data() + size_offset_); |
90 | 37.9M | const size_t total_length = data_length + header_size; |
91 | 60.7k | VLOG(4) << "BinaryCallParser::Parse, consumed: " << consumed << " data_length: " << data_length |
92 | 60.7k | << " total_length: " << total_length; |
93 | 37.9M | if (total_length > max_message_length_) { |
94 | 0 | return STATUS_FORMAT( |
95 | 0 | NetworkError, |
96 | 0 | "The frame had a length of $0, but we only support messages up to $1 bytes long.", |
97 | 0 | total_length, max_message_length_); |
98 | 0 | } |
99 | 37.9M | const size_t call_data_size = total_length - body_offset; |
100 | 42.8k | VLOG(4) << "BinaryCallParser::Parse, call_data_size: " << call_data_size; |
101 | 37.9M | if (consumed + total_length > full_input_size) { |
102 | | // `data` only contains beginning of the current call. Here we allocate memory buffer for the |
103 | | // whole call data and fill beginning with `data`, the rest of the data for the call will be |
104 | | // received and appended by the caller into the same buffer. |
105 | 8.19k | const size_t call_received_size = full_input_size - (consumed + body_offset); |
106 | 1 | VLOG(4) << "BinaryCallParser::Parse, call_received_size: " << call_received_size; |
107 | | |
108 | 8.19k | if (tracker_for_throttle) { |
109 | 0 | VLOG(4) << "BinaryCallParser::Parse, tracker_for_throttle memory usage: " |
110 | 0 | << (*tracker_for_throttle)->LogUsage(""); |
111 | 920 | if (ShouldThrottleRpc(*tracker_for_throttle, call_data_size, "Ignoring RPC call: ")) { |
112 | 0 | call_data_ = CallData(call_data_size, CallData::ShouldRejectTag()); |
113 | 0 | return ProcessCallsResult{ |
114 | 0 | .consumed = full_input_size, |
115 | 0 | .buffer = Slice(), |
116 | 0 | .bytes_to_skip = call_data_size - call_received_size |
117 | 0 | }; |
118 | 0 | } |
119 | 8.19k | } |
120 | | |
121 | 8.19k | MemTracker* blocking_mem_tracker = nullptr; |
122 | 8.19k | if (buffer_tracker_->TryConsume(call_data_size, &blocking_mem_tracker)) { |
123 | 8.19k | call_data_consumption_ = ScopedTrackedConsumption( |
124 | 8.19k | buffer_tracker_, call_data_size, AlreadyConsumed::kTrue); |
125 | 8.19k | call_data_ = CallData(call_data_size); |
126 | 8.19k | IoVecsToBuffer(data, consumed + body_offset, full_input_size, call_data_.data()); |
127 | 8.19k | Slice buffer(call_data_.data() + call_received_size, call_data_size - call_received_size); |
128 | 0 | VLOG(4) << "BinaryCallParser::Parse, consumed: " << consumed |
129 | 0 | << " returning: { full_input_size: " << full_input_size |
130 | 0 | << " buffer.size(): " << buffer.size() << " }"; |
131 | 8.19k | return ProcessCallsResult{ |
132 | 8.19k | .consumed = full_input_size, |
133 | 8.19k | .buffer = buffer, |
134 | 8.19k | }; |
135 | 18.4E | } else if (read_buffer_full && consumed == 0) { |
136 | 0 | auto consumption = blocking_mem_tracker ? blocking_mem_tracker->consumption() : -1; |
137 | 0 | auto limit = blocking_mem_tracker ? blocking_mem_tracker->limit() : -1; |
138 | 0 | if (FLAGS_binary_call_parser_reject_on_mem_tracker_hard_limit) { |
139 | 0 | YB_LOG_EVERY_N_SECS(WARNING, 3) |
140 | 0 | << "Unable to allocate read buffer because of limit, required: " << call_data_size |
141 | 0 | << ", blocked by: " << AsString(blocking_mem_tracker) |
142 | 0 | << ", consumption: " << consumption << " of " << limit << ". Call will be ignored.\n" |
143 | 0 | << DumpMemoryUsage(); |
144 | 0 | call_data_ = CallData(call_data_size, CallData::ShouldRejectTag()); |
145 | 0 | return ProcessCallsResult{ |
146 | 0 | .consumed = full_input_size, |
147 | 0 | .buffer = Slice(), |
148 | 0 | .bytes_to_skip = call_data_size - call_received_size |
149 | 0 | }; |
150 | 0 | } else { |
151 | | // For backward compatibility in behavior until we fix |
152 | | // https://github.com/yugabyte/yugabyte-db/issues/2563. |
153 | 0 | LOG(WARNING) << "Unable to allocate read buffer because of limit, required: " |
154 | 0 | << call_data_size << ", blocked by: " << AsString(blocking_mem_tracker) |
155 | 0 | << ", consumption: " << consumption << " of " << limit << "\n" |
156 | 0 | << DumpMemoryUsage(); |
157 | 0 | } |
158 | 0 | } |
159 | 18.4E | break; |
160 | 37.9M | } |
161 | | |
162 | | // We might need to skip empty messages (we use them as low level heartbeats for inter-YB RPC |
163 | | // connections, don't confuse with RAFT heartbeats which are higher level non-empty messages). |
164 | 37.9M | if (!skip_empty_messages_ || data_length > 0) { |
165 | 37.6M | connection->UpdateLastActivity(); |
166 | 37.6M | CallData call_data(call_data_size); |
167 | 37.6M | IoVecsToBuffer(data, consumed + body_offset, consumed + total_length, call_data.data()); |
168 | 37.6M | RETURN_NOT_OK(listener_->HandleCall(connection, &call_data)); |
169 | 37.6M | } |
170 | | |
171 | 37.9M | consumed += total_length; |
172 | 37.9M | } |
173 | 18.4E | VLOG(4) << "BinaryCallParser::Parse, returning: { consumed: " << consumed << " buffer: empty }"; |
174 | 36.7M | return ProcessCallsResult { |
175 | 36.7M | .consumed = consumed, |
176 | 36.7M | .buffer = Slice(), |
177 | 36.7M | }; |
178 | 36.7M | } |
179 | | |
180 | | } // namespace rpc |
181 | | } // namespace yb |