/Users/deen/code/yugabyte-db/src/yb/rpc/lightweight_message.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/rpc/lightweight_message.h" |
15 | | |
16 | | #include <google/protobuf/message.h> |
17 | | |
18 | | #include "yb/util/pb_util.h" |
19 | | #include "yb/util/size_literals.h" |
20 | | |
21 | | using namespace yb::size_literals; |
22 | | |
23 | | // Maximum size of RPC should be larger than size of consensus batch |
24 | | // At each layer, we embed the "message" from the previous layer. |
25 | | // In order to send three strings of 64, the request from cql/redis will be larger |
26 | | // than that because we will have overheads from that layer. |
27 | | // Hence, we have a limit of 254MB at the consensus layer. |
28 | | // The rpc layer adds its own headers, so we limit the rpc message size to 255MB. |
29 | | DEFINE_uint64(rpc_max_message_size, 255_MB, |
30 | | "The maximum size of a message of any RPC that the server will accept."); |
31 | | |
32 | | using google::protobuf::internal::WireFormatLite; |
33 | | using google::protobuf::io::CodedOutputStream; |
34 | | |
35 | | namespace yb { |
36 | | namespace rpc { |
37 | | |
38 | | namespace { |
39 | | |
40 | 178 | inline bool SliceRead(google::protobuf::io::CodedInputStream* input, Slice* out) { |
41 | 178 | uint32_t length; |
42 | 178 | if (!input->ReadVarint32(&length)) { |
43 | 0 | return false; |
44 | 0 | } |
45 | 178 | const void* data; |
46 | 178 | int size; |
47 | 178 | input->GetDirectBufferPointerInline(&data, &size); |
48 | 178 | if (!input->Skip(length)) { |
49 | 0 | return false; |
50 | 0 | } |
51 | 178 | *out = Slice(static_cast<const char*>(data), length); |
52 | 178 | return true; |
53 | 178 | } |
54 | | |
55 | 502 | inline size_t SliceSize(Slice value) { |
56 | 502 | uint32_t size = narrow_cast<uint32_t>(value.size()); |
57 | 502 | return CodedOutputStream::VarintSize32(size) + size; |
58 | 502 | } |
59 | | |
60 | 251 | inline uint8_t* SliceWrite(Slice value, uint8_t* out) { |
61 | 251 | uint32_t size = narrow_cast<uint32_t>(value.size()); |
62 | 251 | out = CodedOutputStream::WriteVarint32ToArray(size, out); |
63 | 251 | memcpy(out, value.data(), size); |
64 | 251 | return out + size; |
65 | 251 | } |
66 | | |
67 | | } // namespace |
68 | | |
69 | 1 | Status LightweightMessage::ParseFromSlice(const Slice& slice) { |
70 | 1 | google::protobuf::io::CodedInputStream in(slice.data(), narrow_cast<int>(slice.size())); |
71 | 1 | SetupLimit(&in); |
72 | 1 | return ParseFromCodedStream(&in); |
73 | 1 | } |
74 | | |
75 | 0 | std::string LightweightMessage::SerializeAsString() const { |
76 | 0 | size_t size = SerializedSize(); |
77 | 0 | std::string result; |
78 | 0 | result.resize(size); |
79 | 0 | SerializeToArray(pointer_cast<uint8_t*>(const_cast<char*>(result.data()))); |
80 | 0 | return result; |
81 | 0 | } |
82 | | |
83 | 5 | std::string LightweightMessage::ShortDebugString() const { |
84 | 5 | std::string result; |
85 | 5 | AppendToDebugString(&result); |
86 | 5 | return result; |
87 | 5 | } |
88 | | |
89 | 72.2M | Status AnyMessagePtr::ParseFromSlice(const Slice& slice) { |
90 | 72.2M | if (is_lightweight()) { |
91 | 0 | return lightweight()->ParseFromSlice(slice); |
92 | 0 | } |
93 | | |
94 | 72.2M | google::protobuf::io::CodedInputStream in(slice.data(), narrow_cast<int>(slice.size())); |
95 | 72.2M | SetupLimit(&in); |
96 | 72.2M | auto* proto = protobuf(); |
97 | 72.2M | if (PREDICT_FALSE(!proto->ParseFromCodedStream(&in))) { |
98 | 0 | return STATUS(InvalidArgument, proto->InitializationErrorString()); |
99 | 0 | } |
100 | 72.2M | return Status::OK(); |
101 | 72.2M | } |
102 | | |
103 | 292M | size_t AnyMessageConstPtr::SerializedSize() const { |
104 | 292M | return is_lightweight() ? lightweight()->SerializedSize()2 : protobuf()->ByteSizeLong()292M ; |
105 | 292M | } |
106 | | |
107 | 146M | Result<uint8_t*> AnyMessageConstPtr::SerializeToArray(uint8_t* out) const { |
108 | 146M | if (is_lightweight()) { |
109 | 1 | return lightweight()->SerializeToArray(out); |
110 | 1 | } |
111 | | |
112 | 146M | auto* proto = protobuf(); |
113 | 146M | if (PREDICT_FALSE(!proto->IsInitialized())) { |
114 | 1 | return STATUS_FORMAT(InvalidArgument, "RPC argument missing required fields: $0 ($1)", |
115 | 1 | proto->InitializationErrorString(), proto->ShortDebugString()); |
116 | 1 | } |
117 | | |
118 | 146M | return proto->SerializeWithCachedSizesToArray(out); |
119 | 146M | } |
120 | | |
121 | | template <> |
122 | | bool LightweightSerialization<WireFormatLite::TYPE_DOUBLE, double>::Read( |
123 | 1 | google::protobuf::io::CodedInputStream* input, double* out) { |
124 | 1 | return input->ReadLittleEndian64(reinterpret_cast<uint64_t*>(out)); |
125 | 1 | } |
126 | | |
127 | | template <> |
128 | | uint8_t* LightweightSerialization<WireFormatLite::TYPE_DOUBLE, double>::Write( |
129 | 1 | double value, uint8_t* out) { |
130 | 1 | return CodedOutputStream::WriteLittleEndian64ToArray(*reinterpret_cast<uint64*>(&value), out); |
131 | 1 | } |
132 | | |
133 | | template <> |
134 | | bool LightweightSerialization<WireFormatLite::TYPE_FLOAT, float>::Read( |
135 | 1 | google::protobuf::io::CodedInputStream* input, float* out) { |
136 | 1 | return input->ReadLittleEndian32(reinterpret_cast<uint32_t*>(out)); |
137 | 1 | } |
138 | | |
139 | | template <> |
140 | | uint8_t* LightweightSerialization<WireFormatLite::TYPE_FLOAT, float>::Write( |
141 | 1 | float value, uint8_t* out) { |
142 | 1 | return CodedOutputStream::WriteLittleEndian32ToArray(*reinterpret_cast<uint32*>(&value), out); |
143 | 1 | } |
144 | | |
145 | | template <> |
146 | | bool LightweightSerialization<WireFormatLite::TYPE_INT64, int64_t>::Read( |
147 | 12 | google::protobuf::io::CodedInputStream* input, int64_t* out) { |
148 | 12 | return input->ReadVarint64(reinterpret_cast<uint64_t*>(out)); |
149 | 12 | } |
150 | | |
151 | | template <> |
152 | | uint8_t* LightweightSerialization<WireFormatLite::TYPE_INT64, int64_t>::Write( |
153 | 12 | int64_t value, uint8_t* out) { |
154 | 12 | return CodedOutputStream::WriteVarint64ToArray(value, out); |
155 | 12 | } |
156 | | |
157 | | template <> |
158 | 24 | size_t LightweightSerialization<WireFormatLite::TYPE_INT64, int64_t>::Size(int64_t value) { |
159 | 24 | return CodedOutputStream::VarintSize64(value); |
160 | 24 | } |
161 | | |
162 | | template <> |
163 | | bool LightweightSerialization<WireFormatLite::TYPE_UINT64, uint64_t>::Read( |
164 | 128 | google::protobuf::io::CodedInputStream* input, uint64_t* out) { |
165 | 128 | return input->ReadVarint64(out); |
166 | 128 | } |
167 | | |
168 | | template <> |
169 | | uint8_t* LightweightSerialization<WireFormatLite::TYPE_UINT64, uint64_t>::Write( |
170 | 128 | uint64_t value, uint8_t* out) { |
171 | 128 | return CodedOutputStream::WriteVarint64ToArray(value, out); |
172 | 128 | } |
173 | | |
174 | | template <> |
175 | 256 | size_t LightweightSerialization<WireFormatLite::TYPE_UINT64, uint64_t>::Size(uint64_t value) { |
176 | 256 | return CodedOutputStream::VarintSize64(value); |
177 | 256 | } |
178 | | |
179 | | template <> |
180 | | bool LightweightSerialization<WireFormatLite::TYPE_INT32, int32_t>::Read( |
181 | 1 | google::protobuf::io::CodedInputStream* input, int32_t* out) { |
182 | 1 | return input->ReadVarint32(reinterpret_cast<uint32_t*>(out)); |
183 | 1 | } |
184 | | |
185 | | template <> |
186 | | uint8_t* LightweightSerialization<WireFormatLite::TYPE_INT32, int32_t>::Write( |
187 | 1 | int32_t value, uint8_t* out) { |
188 | 1 | return CodedOutputStream::WriteVarint32ToArray(value, out); |
189 | 1 | } |
190 | | |
191 | | template <> |
192 | 2 | size_t LightweightSerialization<WireFormatLite::TYPE_INT32, int32_t>::Size(int32_t value) { |
193 | 2 | return CodedOutputStream::VarintSize32(value); |
194 | 2 | } |
195 | | |
196 | | template <> |
197 | | bool LightweightSerialization<WireFormatLite::TYPE_FIXED64, uint64_t>::Read( |
198 | 1 | google::protobuf::io::CodedInputStream* input, uint64_t* out) { |
199 | 1 | return input->ReadLittleEndian64(out); |
200 | 1 | } |
201 | | |
202 | | template <> |
203 | | uint8_t* LightweightSerialization<WireFormatLite::TYPE_FIXED64, uint64_t>::Write( |
204 | 1 | uint64_t value, uint8_t* out) { |
205 | 1 | return CodedOutputStream::WriteLittleEndian64ToArray(value, out); |
206 | 1 | } |
207 | | |
208 | | template <> |
209 | | bool LightweightSerialization<WireFormatLite::TYPE_FIXED32, uint32_t>::Read( |
210 | 58 | google::protobuf::io::CodedInputStream* input, uint32_t* out) { |
211 | 58 | return input->ReadLittleEndian32(out); |
212 | 58 | } |
213 | | |
214 | | template <> |
215 | | uint8_t* LightweightSerialization<WireFormatLite::TYPE_FIXED32, uint32_t>::Write( |
216 | 48 | uint32_t value, uint8_t* out) { |
217 | 48 | return CodedOutputStream::WriteLittleEndian32ToArray(value, out); |
218 | 48 | } |
219 | | |
220 | | template <> |
221 | | bool LightweightSerialization<WireFormatLite::TYPE_BOOL, bool>::Read( |
222 | 0 | google::protobuf::io::CodedInputStream* input, bool* out) { |
223 | 0 | uint64 temp; |
224 | 0 | if (!input->ReadVarint64(&temp)) { |
225 | 0 | return false; |
226 | 0 | } |
227 | 0 | *out = temp != 0; |
228 | 0 | return true; |
229 | 0 | } |
230 | | |
231 | | template <> |
232 | | uint8_t* LightweightSerialization<WireFormatLite::TYPE_BOOL, bool>::Write( |
233 | 0 | bool value, uint8_t* out) { |
234 | 0 | return CodedOutputStream::WriteVarint64ToArray(value, out); |
235 | 0 | } |
236 | | |
237 | | template <> |
238 | | bool LightweightSerialization<WireFormatLite::TYPE_UINT32, uint32_t>::Read( |
239 | 12 | google::protobuf::io::CodedInputStream* input, uint32_t* out) { |
240 | 12 | return input->ReadVarint32(out); |
241 | 12 | } |
242 | | |
243 | | template <> |
244 | | uint8_t* LightweightSerialization<WireFormatLite::TYPE_UINT32, uint32_t>::Write( |
245 | 22 | uint32_t value, uint8_t* out) { |
246 | 22 | return CodedOutputStream::WriteVarint32ToArray(value, out); |
247 | 22 | } |
248 | | |
249 | | template <> |
250 | 44 | size_t LightweightSerialization<WireFormatLite::TYPE_UINT32, uint32_t>::Size(uint32_t value) { |
251 | 44 | return CodedOutputStream::VarintSize32(value); |
252 | 44 | } |
253 | | |
254 | | template <> |
255 | | bool LightweightSerialization<WireFormatLite::TYPE_BYTES, Slice>::Read( |
256 | 122 | google::protobuf::io::CodedInputStream* input, Slice* out) { |
257 | 122 | return SliceRead(input, out); |
258 | 122 | } |
259 | | |
260 | | template <> |
261 | | uint8_t* LightweightSerialization<WireFormatLite::TYPE_BYTES, Slice>::Write( |
262 | 188 | Slice value, uint8_t* out) { |
263 | 188 | return SliceWrite(value, out); |
264 | 188 | } |
265 | | |
266 | | template <> |
267 | 376 | size_t LightweightSerialization<WireFormatLite::TYPE_BYTES, Slice>::Size(Slice value) { |
268 | 376 | return SliceSize(value); |
269 | 376 | } |
270 | | |
271 | | template <> |
272 | | bool LightweightSerialization<WireFormatLite::TYPE_STRING, Slice>::Read( |
273 | 56 | google::protobuf::io::CodedInputStream* input, Slice* out) { |
274 | 56 | return SliceRead(input, out); |
275 | 56 | } |
276 | | |
277 | | template <> |
278 | | uint8_t* LightweightSerialization<WireFormatLite::TYPE_STRING, Slice>::Write( |
279 | 63 | Slice value, uint8_t* out) { |
280 | 63 | return SliceWrite(value, out); |
281 | 63 | } |
282 | | |
283 | | template <> |
284 | | size_t LightweightSerialization<WireFormatLite::TYPE_STRING, Slice>::Size( |
285 | 126 | Slice value) { |
286 | 126 | return SliceSize(value); |
287 | 126 | } |
288 | | |
289 | | template <> |
290 | | bool LightweightSerialization<WireFormatLite::TYPE_SFIXED32, int32_t>::Read( |
291 | 12 | google::protobuf::io::CodedInputStream* input, int32_t* out) { |
292 | 12 | return input->ReadLittleEndian32(reinterpret_cast<uint32_t*>(out)); |
293 | 12 | } |
294 | | |
295 | | template <> |
296 | | uint8_t* LightweightSerialization<WireFormatLite::TYPE_SFIXED32, int32_t>::Write( |
297 | 18 | int32_t value, uint8_t* out) { |
298 | 18 | return CodedOutputStream::WriteLittleEndian32ToArray(value, out); |
299 | 18 | } |
300 | | |
301 | | template <> |
302 | | bool LightweightSerialization<WireFormatLite::TYPE_SFIXED64, int64_t>::Read( |
303 | 1 | google::protobuf::io::CodedInputStream* input, int64_t* out) { |
304 | 1 | return input->ReadLittleEndian64(reinterpret_cast<uint64_t*>(out)); |
305 | 1 | } |
306 | | |
307 | | template <> |
308 | | uint8_t* LightweightSerialization<WireFormatLite::TYPE_SFIXED64, int64_t>::Write( |
309 | 1 | int64_t value, uint8_t* out) { |
310 | 1 | return CodedOutputStream::WriteLittleEndian64ToArray(value, out); |
311 | 1 | } |
312 | | |
313 | | template <> |
314 | | bool LightweightSerialization<WireFormatLite::TYPE_SINT32, int32_t>::Read( |
315 | 144 | google::protobuf::io::CodedInputStream* input, int32_t* out) { |
316 | 144 | uint32_t temp; |
317 | 144 | if (!input->ReadVarint32(&temp)) { |
318 | 0 | return false; |
319 | 0 | } |
320 | 144 | *out = WireFormatLite::ZigZagDecode32(temp); |
321 | 144 | return true; |
322 | 144 | } |
323 | | |
324 | | template <> |
325 | | uint8_t* LightweightSerialization<WireFormatLite::TYPE_SINT32, int32_t>::Write( |
326 | 222 | int32_t value, uint8_t* out) { |
327 | 222 | return CodedOutputStream::WriteVarint32ToArray(WireFormatLite::ZigZagEncode32(value), out); |
328 | 222 | } |
329 | | |
330 | | template <> |
331 | 444 | size_t LightweightSerialization<WireFormatLite::TYPE_SINT32, int32_t>::Size(int32_t value) { |
332 | 444 | return CodedOutputStream::VarintSize32(WireFormatLite::ZigZagEncode32(value)); |
333 | 444 | } |
334 | | |
335 | | template <> |
336 | | bool LightweightSerialization<WireFormatLite::TYPE_SINT64, int64_t>::Read( |
337 | 1 | google::protobuf::io::CodedInputStream* input, int64_t* out) { |
338 | 1 | uint64_t temp; |
339 | 1 | if (!input->ReadVarint64(&temp)) { |
340 | 0 | return false; |
341 | 0 | } |
342 | 1 | *out = WireFormatLite::ZigZagDecode64(temp); |
343 | 1 | return true; |
344 | 1 | } |
345 | | |
346 | | template <> |
347 | | uint8_t* LightweightSerialization<WireFormatLite::TYPE_SINT64, int64_t>::Write( |
348 | 1 | int64_t value, uint8_t* out) { |
349 | 1 | return CodedOutputStream::WriteVarint64ToArray(WireFormatLite::ZigZagEncode64(value), out); |
350 | 1 | } |
351 | | |
352 | | template <> |
353 | 2 | size_t LightweightSerialization<WireFormatLite::TYPE_SINT64, int64_t>::Size(int64_t value) { |
354 | 2 | return CodedOutputStream::VarintSize64(WireFormatLite::ZigZagEncode64(value)); |
355 | 2 | } |
356 | | |
357 | 3.08k | void AppendFieldTitle(const char* name, const char* suffix, bool* first, std::string* out) { |
358 | 3.08k | if (*first) { |
359 | 137 | *first = false; |
360 | 2.94k | } else { |
361 | 2.94k | *out += ' '; |
362 | 2.94k | } |
363 | 3.08k | *out += name; |
364 | 3.08k | *out += suffix; |
365 | 3.08k | } |
366 | | |
367 | 0 | CHECKED_STATUS ParseFailed(const char* field_name) { |
368 | 0 | return STATUS_FORMAT(Corruption, "Failed to parse '$0'", field_name); |
369 | 0 | } |
370 | | |
371 | 285M | void SetupLimit(google::protobuf::io::CodedInputStream* in) { |
372 | 285M | in->SetTotalBytesLimit(narrow_cast<int>(FLAGS_rpc_max_message_size), |
373 | 285M | narrow_cast<int>(FLAGS_rpc_max_message_size * 3 / 4)); |
374 | 285M | } |
375 | | |
376 | | } // namespace rpc |
377 | | } // namespace yb |