YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/lightweight_message.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/rpc/lightweight_message.h"
15
16
#include <google/protobuf/message.h>
17
18
#include "yb/util/pb_util.h"
19
#include "yb/util/size_literals.h"
20
21
using namespace yb::size_literals;
22
23
// Maximum size of an RPC should be larger than the size of a consensus batch.
24
// At each layer, we embed the "message" from the previous layer.
25
// In order to send three strings of 64MB, the request from cql/redis will be larger
26
// than that because we will have overheads from that layer.
27
// Hence, we have a limit of 254MB at the consensus layer.
28
// The rpc layer adds its own headers, so we limit the rpc message size to 255MB.
29
DEFINE_uint64(rpc_max_message_size, 255_MB,
30
              "The maximum size of a message of any RPC that the server will accept.");
31
32
using google::protobuf::internal::WireFormatLite;
33
using google::protobuf::io::CodedOutputStream;
34
35
namespace yb {
36
namespace rpc {
37
38
namespace {
39
40
178
// Reads a length-delimited field body (a varint32 length followed by that many payload bytes)
// from `input`. On success `*out` is built from a pointer into the stream's current direct buffer,
// so no payload bytes are copied.
//
// Returns false, leaving `*out` untouched, when the length prefix cannot be read, when the payload
// is not entirely contained in the contiguous buffer exposed by the stream, or when the stream
// fails to skip over the payload.
inline bool SliceRead(google::protobuf::io::CodedInputStream* input, Slice* out) {
  uint32_t length;
  if (!input->ReadVarint32(&length)) {
    return false;
  }
  const void* data;
  int size;
  input->GetDirectBufferPointerInline(&data, &size);
  // The slice aliases the direct buffer, so the whole payload must live inside it. A successful
  // Skip() only proves the bytes exist somewhere in the stream, not that they are contiguous with
  // `data`, hence the explicit bound check. It also keeps `length` within int range for Skip().
  if (size < 0 || length > static_cast<uint32_t>(size)) {
    return false;
  }
  if (!input->Skip(static_cast<int>(length))) {
    return false;
  }
  *out = Slice(static_cast<const char*>(data), length);
  return true;
}
54
55
502
// Number of bytes needed to encode `value` as a length-delimited field body: the varint-encoded
// 32-bit length followed by the bytes of the slice.
inline size_t SliceSize(Slice value) {
  const uint32_t payload_bytes = narrow_cast<uint32_t>(value.size());
  const auto prefix_bytes = CodedOutputStream::VarintSize32(payload_bytes);
  return prefix_bytes + payload_bytes;
}
59
60
251
// Writes `value` at `out` as a varint32 length followed by the slice bytes and returns the
// position just past the last byte written. The caller must provide enough room at `out`.
inline uint8_t* SliceWrite(Slice value, uint8_t* out) {
  const uint32_t payload_bytes = narrow_cast<uint32_t>(value.size());
  uint8_t* const payload_begin = CodedOutputStream::WriteVarint32ToArray(payload_bytes, out);
  memcpy(payload_begin, value.data(), payload_bytes);
  return payload_begin + payload_bytes;
}
66
67
} // namespace
68
69
1
// Parses this message from the bytes referenced by `slice`. A coded input stream is created over
// the slice, the RPC size limit is installed on it via SetupLimit(), and the actual parsing is
// delegated to ParseFromCodedStream().
Status LightweightMessage::ParseFromSlice(const Slice& slice) {
  const int input_size = narrow_cast<int>(slice.size());
  google::protobuf::io::CodedInputStream stream(slice.data(), input_size);
  SetupLimit(&stream);
  return ParseFromCodedStream(&stream);
}
74
75
0
std::string LightweightMessage::SerializeAsString() const {
76
0
  size_t size = SerializedSize();
77
0
  std::string result;
78
0
  result.resize(size);
79
0
  SerializeToArray(pointer_cast<uint8_t*>(const_cast<char*>(result.data())));
80
0
  return result;
81
0
}
82
83
5
std::string LightweightMessage::ShortDebugString() const {
84
5
  std::string result;
85
5
  AppendToDebugString(&result);
86
5
  return result;
87
5
}
88
89
72.2M
// Parses the wrapped message from `slice`.
//
// A lightweight message parses itself. A protobuf message is parsed from a coded input stream
// created over the slice, with the RPC size limit installed by SetupLimit(). If the protobuf parse
// fails, an InvalidArgument status carrying the message's InitializationErrorString() is returned.
Status AnyMessagePtr::ParseFromSlice(const Slice& slice) {
  if (is_lightweight()) {
    return lightweight()->ParseFromSlice(slice);
  }

  const int input_size = narrow_cast<int>(slice.size());
  google::protobuf::io::CodedInputStream stream(slice.data(), input_size);
  SetupLimit(&stream);

  auto* message = protobuf();
  const bool parsed = message->ParseFromCodedStream(&stream);
  if (PREDICT_FALSE(!parsed)) {
    return STATUS(InvalidArgument, message->InitializationErrorString());
  }
  return Status::OK();
}
102
103
292M
size_t AnyMessageConstPtr::SerializedSize() const {
104
292M
  return is_lightweight() ? 
lightweight()->SerializedSize()2
:
protobuf()->ByteSizeLong()292M
;
105
292M
}
106
107
146M
// Serializes the wrapped message at `out` and returns the position just past the written bytes.
//
// A lightweight message serializes itself. A protobuf message must be fully initialized;
// otherwise an InvalidArgument status naming the missing fields is returned and nothing is
// written. The protobuf path uses the cached-sizes serializer.
// NOTE(review): that presumably requires SerializedSize() to have been called first - confirm
// against callers.
Result<uint8_t*> AnyMessageConstPtr::SerializeToArray(uint8_t* out) const {
  if (is_lightweight()) {
    return lightweight()->SerializeToArray(out);
  }

  auto* message = protobuf();
  const bool initialized = message->IsInitialized();
  if (PREDICT_FALSE(!initialized)) {
    return STATUS_FORMAT(InvalidArgument, "RPC argument missing required fields: $0 ($1)",
                         message->InitializationErrorString(), message->ShortDebugString());
  }

  return message->SerializeWithCachedSizesToArray(out);
}
120
121
template <>
122
bool LightweightSerialization<WireFormatLite::TYPE_DOUBLE, double>::Read(
123
1
    google::protobuf::io::CodedInputStream* input, double* out) {
124
1
  return input->ReadLittleEndian64(reinterpret_cast<uint64_t*>(out));
125
1
}
126
127
template <>
128
uint8_t* LightweightSerialization<WireFormatLite::TYPE_DOUBLE, double>::Write(
129
1
    double value, uint8_t* out) {
130
1
  return CodedOutputStream::WriteLittleEndian64ToArray(*reinterpret_cast<uint64*>(&value), out);
131
1
}
132
133
template <>
134
bool LightweightSerialization<WireFormatLite::TYPE_FLOAT, float>::Read(
135
1
    google::protobuf::io::CodedInputStream* input, float* out) {
136
1
  return input->ReadLittleEndian32(reinterpret_cast<uint32_t*>(out));
137
1
}
138
139
template <>
140
uint8_t* LightweightSerialization<WireFormatLite::TYPE_FLOAT, float>::Write(
141
1
    float value, uint8_t* out) {
142
1
  return CodedOutputStream::WriteLittleEndian32ToArray(*reinterpret_cast<uint32*>(&value), out);
143
1
}
144
145
template <>
146
bool LightweightSerialization<WireFormatLite::TYPE_INT64, int64_t>::Read(
147
12
    google::protobuf::io::CodedInputStream* input, int64_t* out) {
148
12
  return input->ReadVarint64(reinterpret_cast<uint64_t*>(out));
149
12
}
150
151
template <>
152
uint8_t* LightweightSerialization<WireFormatLite::TYPE_INT64, int64_t>::Write(
153
12
    int64_t value, uint8_t* out) {
154
12
  return CodedOutputStream::WriteVarint64ToArray(value, out);
155
12
}
156
157
template <>
158
24
size_t LightweightSerialization<WireFormatLite::TYPE_INT64, int64_t>::Size(int64_t value) {
159
24
  return CodedOutputStream::VarintSize64(value);
160
24
}
161
162
template <>
163
bool LightweightSerialization<WireFormatLite::TYPE_UINT64, uint64_t>::Read(
164
128
    google::protobuf::io::CodedInputStream* input, uint64_t* out) {
165
128
  return input->ReadVarint64(out);
166
128
}
167
168
template <>
169
uint8_t* LightweightSerialization<WireFormatLite::TYPE_UINT64, uint64_t>::Write(
170
128
    uint64_t value, uint8_t* out) {
171
128
  return CodedOutputStream::WriteVarint64ToArray(value, out);
172
128
}
173
174
template <>
175
256
size_t LightweightSerialization<WireFormatLite::TYPE_UINT64, uint64_t>::Size(uint64_t value) {
176
256
  return CodedOutputStream::VarintSize64(value);
177
256
}
178
179
template <>
180
bool LightweightSerialization<WireFormatLite::TYPE_INT32, int32_t>::Read(
181
1
    google::protobuf::io::CodedInputStream* input, int32_t* out) {
182
1
  return input->ReadVarint32(reinterpret_cast<uint32_t*>(out));
183
1
}
184
185
template <>
186
uint8_t* LightweightSerialization<WireFormatLite::TYPE_INT32, int32_t>::Write(
187
1
    int32_t value, uint8_t* out) {
188
1
  return CodedOutputStream::WriteVarint32ToArray(value, out);
189
1
}
190
191
template <>
192
2
size_t LightweightSerialization<WireFormatLite::TYPE_INT32, int32_t>::Size(int32_t value) {
193
2
  return CodedOutputStream::VarintSize32(value);
194
2
}
195
196
template <>
197
bool LightweightSerialization<WireFormatLite::TYPE_FIXED64, uint64_t>::Read(
198
1
    google::protobuf::io::CodedInputStream* input, uint64_t* out) {
199
1
  return input->ReadLittleEndian64(out);
200
1
}
201
202
template <>
203
uint8_t* LightweightSerialization<WireFormatLite::TYPE_FIXED64, uint64_t>::Write(
204
1
    uint64_t value, uint8_t* out) {
205
1
  return CodedOutputStream::WriteLittleEndian64ToArray(value, out);
206
1
}
207
208
template <>
209
bool LightweightSerialization<WireFormatLite::TYPE_FIXED32, uint32_t>::Read(
210
58
    google::protobuf::io::CodedInputStream* input, uint32_t* out) {
211
58
  return input->ReadLittleEndian32(out);
212
58
}
213
214
template <>
215
uint8_t* LightweightSerialization<WireFormatLite::TYPE_FIXED32, uint32_t>::Write(
216
48
    uint32_t value, uint8_t* out) {
217
48
  return CodedOutputStream::WriteLittleEndian32ToArray(value, out);
218
48
}
219
220
template <>
221
bool LightweightSerialization<WireFormatLite::TYPE_BOOL, bool>::Read(
222
0
    google::protobuf::io::CodedInputStream* input, bool* out) {
223
0
  uint64 temp;
224
0
  if (!input->ReadVarint64(&temp)) {
225
0
    return false;
226
0
  }
227
0
  *out = temp != 0;
228
0
  return true;
229
0
}
230
231
template <>
232
uint8_t* LightweightSerialization<WireFormatLite::TYPE_BOOL, bool>::Write(
233
0
    bool value, uint8_t* out) {
234
0
  return CodedOutputStream::WriteVarint64ToArray(value, out);
235
0
}
236
237
template <>
238
bool LightweightSerialization<WireFormatLite::TYPE_UINT32, uint32_t>::Read(
239
12
    google::protobuf::io::CodedInputStream* input, uint32_t* out) {
240
12
  return input->ReadVarint32(out);
241
12
}
242
243
template <>
244
uint8_t* LightweightSerialization<WireFormatLite::TYPE_UINT32, uint32_t>::Write(
245
22
    uint32_t value, uint8_t* out) {
246
22
  return CodedOutputStream::WriteVarint32ToArray(value, out);
247
22
}
248
249
template <>
250
44
size_t LightweightSerialization<WireFormatLite::TYPE_UINT32, uint32_t>::Size(uint32_t value) {
251
44
  return CodedOutputStream::VarintSize32(value);
252
44
}
253
254
template <>
255
bool LightweightSerialization<WireFormatLite::TYPE_BYTES, Slice>::Read(
256
122
    google::protobuf::io::CodedInputStream* input, Slice* out) {
257
122
  return SliceRead(input, out);
258
122
}
259
260
template <>
261
uint8_t* LightweightSerialization<WireFormatLite::TYPE_BYTES, Slice>::Write(
262
188
    Slice value, uint8_t* out) {
263
188
  return SliceWrite(value, out);
264
188
}
265
266
template <>
267
376
size_t LightweightSerialization<WireFormatLite::TYPE_BYTES, Slice>::Size(Slice value) {
268
376
  return SliceSize(value);
269
376
}
270
271
template <>
272
bool LightweightSerialization<WireFormatLite::TYPE_STRING, Slice>::Read(
273
56
    google::protobuf::io::CodedInputStream* input, Slice* out) {
274
56
  return SliceRead(input, out);
275
56
}
276
277
template <>
278
uint8_t* LightweightSerialization<WireFormatLite::TYPE_STRING, Slice>::Write(
279
63
    Slice value, uint8_t* out) {
280
63
  return SliceWrite(value, out);
281
63
}
282
283
template <>
284
size_t LightweightSerialization<WireFormatLite::TYPE_STRING, Slice>::Size(
285
126
    Slice value) {
286
126
  return SliceSize(value);
287
126
}
288
289
template <>
290
bool LightweightSerialization<WireFormatLite::TYPE_SFIXED32, int32_t>::Read(
291
12
    google::protobuf::io::CodedInputStream* input, int32_t* out) {
292
12
  return input->ReadLittleEndian32(reinterpret_cast<uint32_t*>(out));
293
12
}
294
295
template <>
296
uint8_t* LightweightSerialization<WireFormatLite::TYPE_SFIXED32, int32_t>::Write(
297
18
    int32_t value, uint8_t* out) {
298
18
  return CodedOutputStream::WriteLittleEndian32ToArray(value, out);
299
18
}
300
301
template <>
302
bool LightweightSerialization<WireFormatLite::TYPE_SFIXED64, int64_t>::Read(
303
1
    google::protobuf::io::CodedInputStream* input, int64_t* out) {
304
1
  return input->ReadLittleEndian64(reinterpret_cast<uint64_t*>(out));
305
1
}
306
307
template <>
308
uint8_t* LightweightSerialization<WireFormatLite::TYPE_SFIXED64, int64_t>::Write(
309
1
    int64_t value, uint8_t* out) {
310
1
  return CodedOutputStream::WriteLittleEndian64ToArray(value, out);
311
1
}
312
313
template <>
314
bool LightweightSerialization<WireFormatLite::TYPE_SINT32, int32_t>::Read(
315
144
    google::protobuf::io::CodedInputStream* input, int32_t* out) {
316
144
  uint32_t temp;
317
144
  if (!input->ReadVarint32(&temp)) {
318
0
    return false;
319
0
  }
320
144
  *out = WireFormatLite::ZigZagDecode32(temp);
321
144
  return true;
322
144
}
323
324
template <>
325
uint8_t* LightweightSerialization<WireFormatLite::TYPE_SINT32, int32_t>::Write(
326
222
    int32_t value, uint8_t* out) {
327
222
  return CodedOutputStream::WriteVarint32ToArray(WireFormatLite::ZigZagEncode32(value), out);
328
222
}
329
330
template <>
331
444
size_t LightweightSerialization<WireFormatLite::TYPE_SINT32, int32_t>::Size(int32_t value) {
332
444
  return CodedOutputStream::VarintSize32(WireFormatLite::ZigZagEncode32(value));
333
444
}
334
335
template <>
336
bool LightweightSerialization<WireFormatLite::TYPE_SINT64, int64_t>::Read(
337
1
    google::protobuf::io::CodedInputStream* input, int64_t* out) {
338
1
  uint64_t temp;
339
1
  if (!input->ReadVarint64(&temp)) {
340
0
    return false;
341
0
  }
342
1
  *out = WireFormatLite::ZigZagDecode64(temp);
343
1
  return true;
344
1
}
345
346
template <>
347
uint8_t* LightweightSerialization<WireFormatLite::TYPE_SINT64, int64_t>::Write(
348
1
    int64_t value, uint8_t* out) {
349
1
  return CodedOutputStream::WriteVarint64ToArray(WireFormatLite::ZigZagEncode64(value), out);
350
1
}
351
352
template <>
353
2
size_t LightweightSerialization<WireFormatLite::TYPE_SINT64, int64_t>::Size(int64_t value) {
354
2
  return CodedOutputStream::VarintSize64(WireFormatLite::ZigZagEncode64(value));
355
2
}
356
357
3.08k
// Appends `name` immediately followed by `suffix` to `*out`. Every title except the first one is
// preceded by a single space. `*first` tracks whether a title has been emitted yet and is false
// after the call.
void AppendFieldTitle(const char* name, const char* suffix, bool* first, std::string* out) {
  const bool needs_separator = !*first;
  *first = false;
  if (needs_separator) {
    out->push_back(' ');
  }
  out->append(name);
  out->append(suffix);
}
366
367
0
// Builds the Corruption status returned when the field called `field_name` could not be parsed.
CHECKED_STATUS ParseFailed(const char* field_name) {
  return STATUS_FORMAT(
      Corruption, "Failed to parse '$0'", field_name);
}
370
371
285M
// Applies the --rpc_max_message_size flag to `in`: the flag value is passed as the total bytes
// limit and three quarters of it as the second argument of SetTotalBytesLimit().
void SetupLimit(google::protobuf::io::CodedInputStream* in) {
  const int total_bytes_limit = narrow_cast<int>(FLAGS_rpc_max_message_size);
  const int second_limit = narrow_cast<int>(FLAGS_rpc_max_message_size * 3 / 4);
  in->SetTotalBytesLimit(total_bytes_limit, second_limit);
}
375
376
} // namespace rpc
377
} // namespace yb