/Users/deen/code/yugabyte-db/src/yb/yql/cql/ql/util/cql_message.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/yql/cql/ql/util/cql_message.h" |
15 | | |
16 | | #include <lz4.h> |
17 | | #include <snappy.h> |
18 | | |
19 | | #include "yb/common/ql_protocol.pb.h" |
20 | | #include "yb/common/ql_type.h" |
21 | | #include "yb/common/ql_value.h" |
22 | | #include "yb/common/schema.h" |
23 | | |
24 | | #include "yb/gutil/casts.h" |
25 | | #include "yb/gutil/endian.h" |
26 | | #include "yb/gutil/strings/substitute.h" |
27 | | |
28 | | #include "yb/util/logging.h" |
29 | | #include "yb/util/random_util.h" |
30 | | #include "yb/util/result.h" |
31 | | #include "yb/util/status_format.h" |
32 | | |
33 | | namespace yb { |
34 | | namespace ql { |
35 | | |
36 | | using std::shared_ptr; |
37 | | using std::unique_ptr; |
38 | | using std::string; |
39 | | using std::set; |
40 | | using std::vector; |
41 | | using std::unordered_map; |
42 | | using strings::Substitute; |
43 | | using snappy::GetUncompressedLength; |
44 | | using snappy::MaxCompressedLength; |
45 | | using snappy::RawUncompress; |
46 | | using snappy::RawCompress; |
47 | | |
48 | | #undef RETURN_NOT_ENOUGH |
49 | | #define RETURN_NOT_ENOUGH(sz) \ |
50 | 7.08M | do { \ |
51 | 7.08M | if (body_.size() < (sz)) { \ |
52 | 0 | return STATUS(NetworkError, "Truncated CQL message"); \ |
53 | 0 | } \ |
54 | 7.08M | } while (0) |
55 | | |
56 | | constexpr char CQLMessage::kCQLVersionOption[]; |
57 | | constexpr char CQLMessage::kCompressionOption[]; |
58 | | constexpr char CQLMessage::kNoCompactOption[]; |
59 | | |
60 | | constexpr char CQLMessage::kLZ4Compression[]; |
61 | | constexpr char CQLMessage::kSnappyCompression[]; |
62 | | |
63 | | constexpr char CQLMessage::kTopologyChangeEvent[]; |
64 | | constexpr char CQLMessage::kStatusChangeEvent[]; |
65 | | constexpr char CQLMessage::kSchemaChangeEvent[]; |
66 | | |
67 | | CHECKED_STATUS CQLMessage::QueryParameters::GetBindVariableValue(const std::string& name, |
68 | | const size_t pos, |
69 | 10.7M | const Value** value) const { |
70 | 10.7M | if (!value_map.empty()) { |
71 | 148 | const auto itr = value_map.find(name); |
72 | 148 | if (itr == value_map.end()) { |
73 | 2 | return STATUS_SUBSTITUTE(RuntimeError, "Bind variable \"$0\" not found", name); |
74 | 2 | } |
75 | 146 | *value = &values[itr->second]; |
76 | 10.7M | } else { |
77 | 10.7M | if (pos >= values.size()) { |
78 | | // Return error with 1-based position. |
79 | 1 | return STATUS_SUBSTITUTE(RuntimeError, "Bind variable at position $0 not found", pos + 1); |
80 | 1 | } |
81 | 10.7M | *value = &values[pos]; |
82 | 10.7M | } |
83 | | |
84 | 10.7M | return Status::OK(); |
85 | 10.7M | } |
86 | | |
87 | | Result<bool> CQLMessage::QueryParameters::IsBindVariableUnset(const std::string& name, |
88 | 3.55M | const int64_t pos) const { |
89 | 3.55M | const Value* value = nullptr; |
90 | 3.55M | RETURN_NOT_OK(GetBindVariableValue(name, pos, &value)); |
91 | 3.55M | return (value->kind == Value::Kind::NOT_SET); |
92 | 3.55M | } |
93 | | |
94 | | Status CQLMessage::QueryParameters::GetBindVariable(const std::string& name, |
95 | | const int64_t pos, |
96 | | const shared_ptr<QLType>& type, |
97 | 7.17M | QLValue* value) const { |
98 | 7.17M | const Value* v = nullptr; |
99 | 7.17M | RETURN_NOT_OK(GetBindVariableValue(name, pos, &v)); |
100 | 7.17M | switch (v->kind) { |
101 | 7.17M | case Value::Kind::NOT_NULL: { |
102 | 7.17M | if (v->value.empty()) { |
103 | 3 | switch (type->main()) { |
104 | 1 | case DataType::STRING: |
105 | 1 | value->set_string_value(""); |
106 | 1 | return Status::OK(); |
107 | 2 | case DataType::BINARY: |
108 | 2 | value->set_binary_value(""); |
109 | 2 | return Status::OK(); |
110 | 0 | case DataType::NULL_VALUE_TYPE: FALLTHROUGH_INTENDED; |
111 | 0 | case DataType::INT8: FALLTHROUGH_INTENDED; |
112 | 0 | case DataType::INT16: FALLTHROUGH_INTENDED; |
113 | 0 | case DataType::INT32: FALLTHROUGH_INTENDED; |
114 | 0 | case DataType::INT64: FALLTHROUGH_INTENDED; |
115 | 0 | case DataType::BOOL: FALLTHROUGH_INTENDED; |
116 | 0 | case DataType::FLOAT: FALLTHROUGH_INTENDED; |
117 | 0 | case DataType::DOUBLE: FALLTHROUGH_INTENDED; |
118 | 0 | case DataType::TIMESTAMP: FALLTHROUGH_INTENDED; |
119 | 0 | case DataType::DECIMAL: FALLTHROUGH_INTENDED; |
120 | 0 | case DataType::VARINT: FALLTHROUGH_INTENDED; |
121 | 0 | case DataType::INET: FALLTHROUGH_INTENDED; |
122 | 0 | case DataType::JSONB: FALLTHROUGH_INTENDED; |
123 | 0 | case DataType::LIST: FALLTHROUGH_INTENDED; |
124 | 0 | case DataType::MAP: FALLTHROUGH_INTENDED; |
125 | 0 | case DataType::SET: FALLTHROUGH_INTENDED; |
126 | 0 | case DataType::UUID: FALLTHROUGH_INTENDED; |
127 | 0 | case DataType::TIMEUUID: |
128 | 0 | value->SetNull(); |
129 | 0 | return Status::OK(); |
130 | 0 | case DataType::UNKNOWN_DATA: FALLTHROUGH_INTENDED; |
131 | 0 | case DataType::TUPLE: FALLTHROUGH_INTENDED; |
132 | 0 | case DataType::TYPEARGS: FALLTHROUGH_INTENDED; |
133 | 0 | case DataType::USER_DEFINED_TYPE: FALLTHROUGH_INTENDED; |
134 | 0 | case DataType::FROZEN: FALLTHROUGH_INTENDED; |
135 | 0 | case DataType::DATE: FALLTHROUGH_INTENDED; |
136 | 0 | case DataType::TIME: FALLTHROUGH_INTENDED; |
137 | 0 | QL_INVALID_TYPES_IN_SWITCH: |
138 | 0 | break; |
139 | 0 | } |
140 | 0 | return STATUS_SUBSTITUTE( |
141 | 0 | NotSupported, "Unsupported datatype $0", static_cast<int>(type->main())); |
142 | 0 | } |
143 | 7.17M | Slice data(v->value); |
144 | 7.17M | return value->Deserialize(type, YQL_CLIENT_CQL, &data); |
145 | 7.17M | } |
146 | 29 | case Value::Kind::IS_NULL: |
147 | 29 | value->SetNull(); |
148 | 29 | return Status::OK(); |
149 | 0 | case Value::Kind::NOT_SET: |
150 | 0 | return Status::OK(); |
151 | 0 | } |
152 | 0 | return STATUS_SUBSTITUTE( |
153 | 0 | RuntimeError, "Invalid bind variable kind $0", static_cast<int>(v->kind)); |
154 | 0 | } |
155 | | |
156 | 4.52M | Status CQLMessage::QueryParameters::ValidateConsistency() { |
157 | 4.52M | switch (consistency) { |
158 | 29.0k | case Consistency::LOCAL_ONE: FALLTHROUGH_INTENDED; |
159 | 4.45M | case Consistency::QUORUM: { |
160 | | // We are repurposing cassandra's "QUORUM" consistency level to indicate "STRONG" |
161 | | // consistency for YB. Although, by default the datastax client uses "LOCAL_ONE" as its |
162 | | // consistency level and since by default we want to support STRONG consistency, we use |
163 | | // STRONG consistency even for LOCAL_ONE. This way existing cassandra clients don't need |
164 | | // any changes. |
165 | 4.45M | set_yb_consistency_level(YBConsistencyLevel::STRONG); |
166 | 4.45M | break; |
167 | 29.0k | } |
168 | 79.9k | case Consistency::ONE: { |
169 | | // Here we repurpose cassandra's ONE consistency level to be CONSISTENT_PREFIX for us since |
170 | | // that seems to be the most appropriate. |
171 | 79.9k | set_yb_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX); |
172 | 79.9k | break; |
173 | 29.0k | } |
174 | 56 | default: |
175 | 56 | YB_LOG_EVERY_N_SECS(WARNING, 10) << "Consistency level " << static_cast<uint16_t>(consistency) |
176 | 16 | << " is not supported, defaulting to strong consistency"; |
177 | 56 | set_yb_consistency_level(YBConsistencyLevel::STRONG); |
178 | 4.52M | } |
179 | 4.53M | return Status::OK(); |
180 | 4.52M | } |
181 | | |
182 | | namespace { |
183 | | |
184 | | template<class Type> |
185 | 13.6M | Type LoadByte(const Slice& slice, size_t offset) { |
186 | 13.6M | return static_cast<Type>(Load8(slice.data() + offset)); |
187 | 13.6M | } cql_message.cc:_ZN2yb2ql12_GLOBAL__N_18LoadByteIhEET_RKNS_5SliceEm Line | Count | Source | 185 | 9.11M | Type LoadByte(const Slice& slice, size_t offset) { | 186 | 9.11M | return static_cast<Type>(Load8(slice.data() + offset)); | 187 | 9.11M | } |
cql_message.cc:_ZN2yb2ql12_GLOBAL__N_18LoadByteINS0_10CQLMessage6OpcodeEEET_RKNS_5SliceEm Line | Count | Source | 185 | 4.55M | Type LoadByte(const Slice& slice, size_t offset) { | 186 | 4.55M | return static_cast<Type>(Load8(slice.data() + offset)); | 187 | 4.55M | } |
|
188 | | |
189 | | template<class Type> |
190 | | Type LoadShort(const Slice& slice, size_t offset) { |
191 | | return static_cast<Type>(NetworkByteOrder::Load16(slice.data() + offset)); |
192 | | } |
193 | | |
194 | | template<class Type> |
195 | 4.55M | Type LoadInt(const Slice& slice, size_t offset) { |
196 | 4.55M | return static_cast<Type>(NetworkByteOrder::Load32(slice.data() + offset)); |
197 | 4.55M | } |
198 | | |
199 | | } // namespace |
200 | | |
201 | | // ------------------------------------ CQL request ----------------------------------- |
202 | | bool CQLRequest::ParseRequest( |
203 | | const Slice& mesg, const CompressionScheme compression_scheme, |
204 | 4.56M | unique_ptr<CQLRequest>* request, unique_ptr<CQLResponse>* error_response) { |
205 | | |
206 | 4.56M | *request = nullptr; |
207 | 4.56M | *error_response = nullptr; |
208 | | |
209 | 4.56M | if (mesg.size() < kMessageHeaderLength) { |
210 | 0 | error_response->reset( |
211 | 0 | new ErrorResponse( |
212 | 0 | static_cast<StreamId>(0), ErrorResponse::Code::PROTOCOL_ERROR, "Incomplete header")); |
213 | 0 | return false; |
214 | 0 | } |
215 | | |
216 | 4.56M | Header header( |
217 | 4.56M | LoadByte<Version>(mesg, kHeaderPosVersion), |
218 | 4.56M | LoadByte<Flags>(mesg, kHeaderPosFlags), |
219 | 4.56M | ParseStreamId(mesg), |
220 | 4.56M | LoadByte<Opcode>(mesg, kHeaderPosOpcode)); |
221 | | |
222 | 4.56M | uint32_t length = LoadInt<uint32_t>(mesg, kHeaderPosLength); |
223 | 3.43k | DVLOG(4) << "CQL message " |
224 | 3.43k | << "version 0x" << std::hex << static_cast<uint32_t>(header.version) << " " |
225 | 3.43k | << "flags 0x" << std::hex << static_cast<uint32_t>(header.flags) << " " |
226 | 3.43k | << "stream id " << std::dec << static_cast<uint32_t>(header.stream_id) << " " |
227 | 3.43k | << "opcode 0x" << std::hex << static_cast<uint32_t>(header.opcode) << " " |
228 | 3.43k | << "length " << std::dec << static_cast<uint32_t>(length); |
229 | | |
230 | | // Verify proper version that the response bit is not set in the request protocol version and |
231 | | // the protocol version is one we support. |
232 | 4.56M | if (header.version & kResponseVersion) { |
233 | 0 | error_response->reset( |
234 | 0 | new ErrorResponse( |
235 | 0 | header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR, "Not a request")); |
236 | 0 | return false; |
237 | 0 | } |
238 | 4.56M | if (header.version < kMinimumVersion || header.version > kCurrentVersion) { |
239 | 7 | error_response->reset( |
240 | 7 | new ErrorResponse( |
241 | 7 | header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR, |
242 | 7 | Substitute("Invalid or unsupported protocol version $0. Supported versions are between " |
243 | 7 | "$1 and $2.", header.version, kMinimumVersion, kCurrentVersion))); |
244 | 7 | return false; |
245 | 7 | } |
246 | | |
247 | 4.56M | size_t body_size = mesg.size() - kMessageHeaderLength; |
248 | 4.55M | const uint8_t* body_data = body_size > 0 ? mesg.data() + kMessageHeaderLength : to_uchar_ptr(""); |
249 | 4.56M | unique_ptr<uint8_t[]> buffer; |
250 | | |
251 | | // If the message body is compressed, uncompress it. |
252 | 4.56M | if (body_size > 0 && (header.flags & kCompressionFlag)) { |
253 | 306 | if (header.opcode == Opcode::STARTUP) { |
254 | 0 | error_response->reset( |
255 | 0 | new ErrorResponse( |
256 | 0 | header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR, |
257 | 0 | "STARTUP request should not be compressed")); |
258 | 0 | return false; |
259 | 0 | } |
260 | 306 | switch (compression_scheme) { |
261 | 202 | case CompressionScheme::kLz4: { |
262 | 202 | if (body_size < sizeof(uint32_t)) { |
263 | 0 | error_response->reset( |
264 | 0 | new ErrorResponse( |
265 | 0 | header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR, |
266 | 0 | "Insufficient compressed data")); |
267 | 0 | return false; |
268 | 0 | } |
269 | | |
270 | 202 | const uint32_t uncomp_size = NetworkByteOrder::Load32(body_data); |
271 | 202 | buffer = std::make_unique<uint8_t[]>(uncomp_size); |
272 | 202 | body_data += sizeof(uncomp_size); |
273 | 202 | body_size -= sizeof(uncomp_size); |
274 | 202 | const int size = LZ4_decompress_safe( |
275 | 202 | to_char_ptr(body_data), to_char_ptr(buffer.get()), narrow_cast<int>(body_size), |
276 | 202 | uncomp_size); |
277 | 203 | if (size < 0 || static_cast<uint32_t>(size) != uncomp_size) { |
278 | 0 | error_response->reset( |
279 | 0 | new ErrorResponse( |
280 | 0 | header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR, |
281 | 0 | "Error occurred when uncompressing CQL message")); |
282 | 0 | return false; |
283 | 0 | } |
284 | 202 | body_data = buffer.get(); |
285 | 202 | body_size = uncomp_size; |
286 | 202 | break; |
287 | 202 | } |
288 | 104 | case CompressionScheme::kSnappy: { |
289 | 104 | size_t uncomp_size = 0; |
290 | 104 | if (GetUncompressedLength(to_char_ptr(body_data), body_size, &uncomp_size)) { |
291 | 104 | buffer = std::make_unique<uint8_t[]>(uncomp_size); |
292 | 104 | if (RawUncompress(to_char_ptr(body_data), body_size, to_char_ptr(buffer.get()))) { |
293 | 103 | body_data = buffer.get(); |
294 | 103 | body_size = uncomp_size; |
295 | 103 | break; |
296 | 103 | } |
297 | 1 | } |
298 | 1 | error_response->reset( |
299 | 1 | new ErrorResponse( |
300 | 1 | header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR, |
301 | 1 | "Error occurred when uncompressing CQL message")); |
302 | 1 | break; |
303 | 1 | } |
304 | 0 | case CompressionScheme::kNone: |
305 | 0 | error_response->reset( |
306 | 0 | new ErrorResponse( |
307 | 0 | header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR, |
308 | 0 | "No compression scheme specified")); |
309 | 0 | return false; |
310 | 4.56M | } |
311 | 4.56M | } |
312 | | |
313 | 4.56M | const Slice body = (body_size == 0) ? Slice() : Slice(body_data, body_size); |
314 | | |
315 | | // Construct the skeleton request by the opcode |
316 | 4.56M | switch (header.opcode) { |
317 | 9.39k | case Opcode::STARTUP: |
318 | 9.39k | request->reset(new StartupRequest(header, body)); |
319 | 9.39k | break; |
320 | 6.28k | case Opcode::AUTH_RESPONSE: |
321 | 6.28k | request->reset(new AuthResponseRequest(header, body)); |
322 | 6.28k | break; |
323 | 2.50k | case Opcode::OPTIONS: |
324 | 2.50k | request->reset(new OptionsRequest(header, body)); |
325 | 2.50k | break; |
326 | 159k | case Opcode::QUERY: |
327 | 159k | request->reset(new QueryRequest(header, body)); |
328 | 159k | break; |
329 | 5.03k | case Opcode::PREPARE: |
330 | 5.03k | request->reset(new PrepareRequest(header, body)); |
331 | 5.03k | break; |
332 | 4.37M | case Opcode::EXECUTE: |
333 | 4.37M | request->reset(new ExecuteRequest(header, body)); |
334 | 4.37M | break; |
335 | 1.65k | case Opcode::BATCH: |
336 | 1.65k | request->reset(new BatchRequest(header, body)); |
337 | 1.65k | break; |
338 | 2.23k | case Opcode::REGISTER: |
339 | 2.23k | request->reset(new RegisterRequest(header, body)); |
340 | 2.23k | break; |
341 | | |
342 | | // These are not request but response opcodes |
343 | 0 | case Opcode::ERROR: |
344 | 0 | case Opcode::READY: |
345 | 0 | case Opcode::AUTHENTICATE: |
346 | 0 | case Opcode::SUPPORTED: |
347 | 0 | case Opcode::RESULT: |
348 | 0 | case Opcode::EVENT: |
349 | 0 | case Opcode::AUTH_CHALLENGE: |
350 | 0 | case Opcode::AUTH_SUCCESS: |
351 | 0 | error_response->reset( |
352 | 0 | new ErrorResponse( |
353 | 0 | header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR, "Not a request opcode")); |
354 | 0 | return false; |
355 | | |
356 | | // default: -> fall through |
357 | 4.56M | } |
358 | | |
359 | 4.56M | if (*request == nullptr) { |
360 | 0 | error_response->reset( |
361 | 0 | new ErrorResponse( |
362 | 0 | header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR, "Unknown opcode")); |
363 | 0 | return false; |
364 | 0 | } |
365 | | |
366 | | // Parse the request body |
367 | 4.56M | const Status status = (*request)->ParseBody(); |
368 | 4.56M | if (!status.ok()) { |
369 | 0 | error_response->reset( |
370 | 0 | new ErrorResponse( |
371 | 0 | *(*request), ErrorResponse::Code::PROTOCOL_ERROR, status.message().ToString())); |
372 | 4.56M | } else if (!(*request)->body_.empty()) { |
373 | | // Flag error when there are bytes remaining after we have parsed the whole request body |
374 | | // according to the protocol. Either the request's length field from the client is |
375 | | // wrong. Or we could have a bug in our parser. |
376 | 0 | error_response->reset( |
377 | 0 | new ErrorResponse( |
378 | 0 | *(*request), ErrorResponse::Code::PROTOCOL_ERROR, "Request length too long")); |
379 | 0 | } |
380 | | |
381 | | // If there is any error, free the partially parsed request and return. |
382 | 4.56M | if (*error_response != nullptr) { |
383 | 0 | *request = nullptr; |
384 | 0 | return false; |
385 | 0 | } |
386 | | |
387 | | // Clear and release the body after parsing. |
388 | 4.56M | (*request)->body_.clear(); |
389 | | |
390 | 4.56M | return true; |
391 | 4.56M | } |
392 | | |
393 | 4.55M | CQLRequest::CQLRequest(const Header& header, const Slice& body) : CQLMessage(header), body_(body) { |
394 | 4.55M | } |
395 | | |
396 | 4.56M | CQLRequest::~CQLRequest() { |
397 | 4.56M | } |
398 | | |
399 | | //---------------------------------------------------------------------------------------- |
400 | | |
401 | 0 | Status CQLRequest::ParseUUID(string* value) { |
402 | 0 | RETURN_NOT_ENOUGH(kUUIDSize); |
403 | 0 | *value = string(to_char_ptr(body_.data()), kUUIDSize); |
404 | 0 | body_.remove_prefix(kUUIDSize); |
405 | 0 | DVLOG(4) << "CQL uuid " << *value; |
406 | 0 | return Status::OK(); |
407 | 0 | } |
408 | | |
409 | 0 | Status CQLRequest::ParseTimeUUID(string* value) { |
410 | 0 | RETURN_NOT_ENOUGH(kUUIDSize); |
411 | 0 | *value = string(to_char_ptr(body_.data()), kUUIDSize); |
412 | 0 | body_.remove_prefix(kUUIDSize); |
413 | 0 | DVLOG(4) << "CQL timeuuid " << *value; |
414 | 0 | return Status::OK(); |
415 | 0 | } |
416 | | |
417 | 2.23k | Status CQLRequest::ParseStringList(vector<string>* list) { |
418 | 0 | DVLOG(4) << "CQL string list ..."; |
419 | 2.23k | uint16_t count = 0; |
420 | 2.23k | RETURN_NOT_OK(ParseShort(&count)); |
421 | 2.23k | list->resize(count); |
422 | 8.94k | for (uint16_t i = 0; i < count; ++i) { |
423 | 6.70k | RETURN_NOT_OK(ParseString(&list->at(i))); |
424 | 6.70k | } |
425 | 2.23k | return Status::OK(); |
426 | 2.23k | } |
427 | | |
428 | 0 | Status CQLRequest::ParseInet(Endpoint* value) { |
429 | 0 | std::string ipaddr; |
430 | 0 | int32_t port = 0; |
431 | 0 | RETURN_NOT_OK(ParseBytes("CQL ipaddr", &CQLRequest::ParseByte, &ipaddr)); |
432 | 0 | RETURN_NOT_OK(ParseInt(&port)); |
433 | 0 | if (port < 0 || port > 65535) { |
434 | 0 | return STATUS(NetworkError, "Invalid inet port"); |
435 | 0 | } |
436 | 0 | IpAddress address; |
437 | 0 | if (ipaddr.size() == boost::asio::ip::address_v4::bytes_type().size()) { |
438 | 0 | boost::asio::ip::address_v4::bytes_type bytes; |
439 | 0 | memcpy(bytes.data(), ipaddr.data(), ipaddr.size()); |
440 | 0 | address = boost::asio::ip::address_v4(bytes); |
441 | 0 | } else if (ipaddr.size() == boost::asio::ip::address_v6::bytes_type().size()) { |
442 | 0 | boost::asio::ip::address_v6::bytes_type bytes; |
443 | 0 | memcpy(bytes.data(), ipaddr.data(), ipaddr.size()); |
444 | 0 | address = boost::asio::ip::address_v6(bytes); |
445 | 0 | } else { |
446 | 0 | return STATUS_SUBSTITUTE(NetworkError, "Invalid size of ipaddr: $0", ipaddr.size()); |
447 | 0 | } |
448 | 0 | *value = Endpoint(address, port); |
449 | 0 | DVLOG(4) << "CQL inet " << *value; |
450 | 0 | return Status::OK(); |
451 | 0 | } |
452 | | |
453 | 9.39k | Status CQLRequest::ParseStringMap(unordered_map<string, string>* map) { |
454 | 0 | DVLOG(4) << "CQL string map ..."; |
455 | 9.39k | uint16_t count = 0; |
456 | 9.39k | RETURN_NOT_OK(ParseShort(&count)); |
457 | 36.1k | for (uint16_t i = 0; i < count; ++i) { |
458 | 26.7k | string name, value; |
459 | 26.7k | RETURN_NOT_OK(ParseString(&name)); |
460 | 26.7k | RETURN_NOT_OK(ParseString(&value)); |
461 | 26.7k | (*map)[name] = value; |
462 | 26.7k | } |
463 | 9.39k | return Status::OK(); |
464 | 9.39k | } |
465 | | |
466 | 0 | Status CQLRequest::ParseStringMultiMap(unordered_map<string, vector<string>>* map) { |
467 | 0 | DVLOG(4) << "CQL string multimap ..."; |
468 | 0 | uint16_t count = 0; |
469 | 0 | RETURN_NOT_OK(ParseShort(&count)); |
470 | 0 | for (uint16_t i = 0; i < count; ++i) { |
471 | 0 | string name; |
472 | 0 | vector<string> value; |
473 | 0 | RETURN_NOT_OK(ParseString(&name)); |
474 | 0 | RETURN_NOT_OK(ParseStringList(&value)); |
475 | 0 | (*map)[name] = value; |
476 | 0 | } |
477 | 0 | return Status::OK(); |
478 | 0 | } |
479 | | |
480 | 0 | Status CQLRequest::ParseBytesMap(unordered_map<string, string>* map) { |
481 | 0 | DVLOG(4) << "CQL bytes map ..."; |
482 | 0 | uint16_t count = 0; |
483 | 0 | RETURN_NOT_OK(ParseShort(&count)); |
484 | 0 | for (uint16_t i = 0; i < count; ++i) { |
485 | 0 | string name, value; |
486 | 0 | RETURN_NOT_OK(ParseString(&name)); |
487 | 0 | RETURN_NOT_OK(ParseBytes(&value)); |
488 | 0 | (*map)[name] = value; |
489 | 0 | } |
490 | 0 | return Status::OK(); |
491 | 0 | } |
492 | | |
493 | 7.08M | Status CQLRequest::ParseValue(const bool with_name, Value* value) { |
494 | 2.05k | DVLOG(4) << "CQL value ..."; |
495 | 7.08M | if (with_name) { |
496 | 111 | RETURN_NOT_OK(ParseString(&value->name)); |
497 | 111 | } |
498 | | // Save data pointer to assign the value with length bytes below. |
499 | 7.08M | const uint8_t* data = body_.data(); |
500 | 7.08M | int32_t length = 0; |
501 | 7.08M | RETURN_NOT_OK(ParseInt(&length)); |
502 | 7.08M | if (length >= 0) { |
503 | 7.08M | value->kind = Value::Kind::NOT_NULL; |
504 | 7.08M | if (length > 0) { |
505 | 7.08M | uint32_t unsigned_length = length; |
506 | 7.08M | RETURN_NOT_ENOUGH(unsigned_length); |
507 | 7.08M | value->value.assign(to_char_ptr(data), kIntSize + length); |
508 | 7.08M | body_.remove_prefix(length); |
509 | 11.4k | DVLOG(4) << "CQL value bytes " << value->value; |
510 | 7.08M | } |
511 | 356 | } else if (VersionIsCompatible(kV4Version)) { |
512 | 32 | switch (length) { |
513 | 29 | case -1: |
514 | 29 | value->kind = Value::Kind::IS_NULL; |
515 | 29 | break; |
516 | 3 | case -2: |
517 | 3 | value->kind = Value::Kind::NOT_SET; |
518 | 3 | break; |
519 | 0 | default: |
520 | 0 | return STATUS(NetworkError, "Invalid length in value"); |
521 | 0 | break; |
522 | 324 | } |
523 | 324 | } else { |
524 | 324 | value->kind = Value::Kind::IS_NULL; |
525 | 324 | } |
526 | 7.08M | return Status::OK(); |
527 | 7.08M | } |
528 | | |
529 | 4.53M | Status CQLRequest::ParseQueryParameters(QueryParameters* params) { |
530 | 3.38k | DVLOG(4) << "CQL query parameters ..."; |
531 | 4.53M | RETURN_NOT_OK(ParseConsistency(¶ms->consistency)); |
532 | 4.53M | RETURN_NOT_OK(params->ValidateConsistency()); |
533 | 4.53M | RETURN_NOT_OK(ParseByte(¶ms->flags)); |
534 | 4.53M | params->set_request_id(RandomUniformInt<uint64_t>()); |
535 | 4.53M | if (params->flags & CQLMessage::QueryParameters::kWithValuesFlag) { |
536 | 4.37M | const bool with_name = (params->flags & CQLMessage::QueryParameters::kWithNamesForValuesFlag); |
537 | 4.37M | uint16_t count = 0; |
538 | 4.37M | RETURN_NOT_OK(ParseShort(&count)); |
539 | 4.37M | params->values.resize(count); |
540 | 10.3M | for (uint16_t i = 0; i < count; ++i) { |
541 | 5.92M | Value& value = params->values[i]; |
542 | 5.92M | RETURN_NOT_OK(ParseValue(with_name, &value)); |
543 | 5.92M | if (with_name) { |
544 | 111 | params->value_map[value.name] = i; |
545 | 111 | } |
546 | 5.92M | } |
547 | 4.37M | } |
548 | 4.53M | if (params->flags & CQLMessage::QueryParameters::kWithPageSizeFlag) { |
549 | 4.42M | int32_t page_size = 0; |
550 | 4.42M | RETURN_NOT_OK(ParseInt(&page_size)); |
551 | 4.42M | params->set_page_size(page_size); |
552 | 4.42M | } |
553 | 4.53M | if (params->flags & CQLMessage::QueryParameters::kWithPagingStateFlag) { |
554 | 279 | string paging_state; |
555 | 279 | RETURN_NOT_OK(ParseBytes(&paging_state)); |
556 | 279 | RETURN_NOT_OK(params->SetPagingState(paging_state)); |
557 | 279 | } |
558 | 4.53M | if (params->flags & CQLMessage::QueryParameters::kWithSerialConsistencyFlag) { |
559 | 0 | RETURN_NOT_OK(ParseConsistency(¶ms->serial_consistency)); |
560 | 0 | } |
561 | 4.53M | if (params->flags & CQLMessage::QueryParameters::kWithDefaultTimestampFlag) { |
562 | 4.42M | RETURN_NOT_OK(ParseLong(¶ms->default_timestamp)); |
563 | 4.42M | } |
564 | 4.53M | return Status::OK(); |
565 | 4.53M | } |
566 | | |
567 | 4.92M | CHECKED_STATUS CQLRequest::ParseByte(uint8_t* value) { |
568 | 4.92M | static_assert(sizeof(*value) == kByteSize, "inconsistent byte size"); |
569 | 4.92M | return ParseNum("CQL byte", Load8, value); |
570 | 4.92M | } |
571 | | |
572 | 9.60M | CHECKED_STATUS CQLRequest::ParseShort(uint16_t* value) { |
573 | 9.60M | static_assert(sizeof(*value) == kShortSize, "inconsistent short size"); |
574 | 9.60M | return ParseNum("CQL byte", NetworkByteOrder::Load16, value); |
575 | 9.60M | } |
576 | | |
577 | 11.6M | CHECKED_STATUS CQLRequest::ParseInt(int32_t* value) { |
578 | 11.6M | static_assert(sizeof(*value) == kIntSize, "inconsistent int size"); |
579 | 11.6M | return ParseNum("CQL int", NetworkByteOrder::Load32, value); |
580 | 11.6M | } |
581 | | |
582 | 4.42M | CHECKED_STATUS CQLRequest::ParseLong(int64_t* value) { |
583 | 4.42M | static_assert(sizeof(*value) == kLongSize, "inconsistent long size"); |
584 | 4.42M | return ParseNum("CQL long", NetworkByteOrder::Load64, value); |
585 | 4.42M | } |
586 | | |
587 | 60.3k | CHECKED_STATUS CQLRequest::ParseString(std::string* value) { |
588 | 60.3k | return ParseBytes("CQL string", &CQLRequest::ParseShort, value); |
589 | 60.3k | } |
590 | | |
591 | 165k | CHECKED_STATUS CQLRequest::ParseLongString(std::string* value) { |
592 | 165k | return ParseBytes("CQL long string", &CQLRequest::ParseInt, value); |
593 | 165k | } |
594 | | |
595 | 4.76M | CHECKED_STATUS CQLRequest::ParseShortBytes(std::string* value) { |
596 | 4.76M | return ParseBytes("CQL short bytes", &CQLRequest::ParseShort, value); |
597 | 4.76M | } |
598 | | |
599 | 6.56k | CHECKED_STATUS CQLRequest::ParseBytes(std::string* value) { |
600 | 6.56k | return ParseBytes("CQL bytes", &CQLRequest::ParseInt, value); |
601 | 6.56k | } |
602 | | |
603 | 4.53M | CHECKED_STATUS CQLRequest::ParseConsistency(Consistency* consistency) { |
604 | 4.53M | static_assert(sizeof(*consistency) == kConsistencySize, "inconsistent consistency size"); |
605 | 4.53M | return ParseNum("CQL consistency", NetworkByteOrder::Load16, consistency); |
606 | 4.53M | } |
607 | | |
608 | | // ------------------------------ Individual CQL requests ----------------------------------- |
609 | | StartupRequest::StartupRequest(const Header& header, const Slice& body) |
610 | 9.38k | : CQLRequest(header, body) { |
611 | 9.38k | } |
612 | | |
613 | 9.38k | StartupRequest::~StartupRequest() { |
614 | 9.38k | } |
615 | | |
616 | 9.39k | Status StartupRequest::ParseBody() { |
617 | 9.39k | return ParseStringMap(&options_); |
618 | 9.39k | } |
619 | | |
620 | | //---------------------------------------------------------------------------------------- |
621 | | AuthResponseRequest::AuthResponseRequest(const Header& header, const Slice& body) |
622 | 6.28k | : CQLRequest(header, body) { |
623 | 6.28k | } |
624 | | |
625 | 6.28k | AuthResponseRequest::~AuthResponseRequest() { |
626 | 6.28k | } |
627 | | |
628 | 6.28k | Status AuthResponseRequest::ParseBody() { |
629 | 6.28k | RETURN_NOT_OK(ParseBytes(&token_)); |
630 | 6.28k | string error_msg; |
631 | 6.28k | do { |
632 | 6.28k | if (token_.empty()) { |
633 | 0 | error_msg = "Invalid empty token!"; |
634 | 0 | break; |
635 | 0 | } |
636 | 6.28k | if (token_[0] != '\0') { |
637 | 0 | error_msg = "Invalid format. Message must begin with \\0"; |
638 | 0 | break; |
639 | 0 | } |
640 | 6.28k | size_t next_delim = token_.find_first_of('\0', 1); |
641 | 6.28k | if (next_delim == std::string::npos) { |
642 | 0 | error_msg = "Invalid format. Message must contain \\0 after username"; |
643 | 0 | break; |
644 | 0 | } |
645 | | // Start from token_[1], read all the username. |
646 | 6.28k | params_.username = token_.substr(1, next_delim - 1); |
647 | | // Start from after the delimiter, go to the end. |
648 | 6.28k | params_.password = token_.substr(next_delim + 1); |
649 | 6.28k | return Status::OK(); |
650 | 0 | } while (0); |
651 | 0 | return STATUS(InvalidArgument, error_msg); |
652 | 6.28k | } |
653 | | |
654 | | CHECKED_STATUS AuthResponseRequest::AuthQueryParameters::GetBindVariable( |
655 | | const std::string& name, |
656 | | int64_t pos, |
657 | | const std::shared_ptr<QLType>& type, |
658 | 5.89k | QLValue* value) const { |
659 | 5.89k | if (pos == 0) { |
660 | 5.89k | value->set_string_value(username); |
661 | 5.89k | return Status::OK(); |
662 | 0 | } else { |
663 | 0 | return STATUS(InvalidArgument, Substitute("Bind variable position $0 out of range: ", pos)); |
664 | 0 | } |
665 | 5.89k | } |
666 | | |
667 | | //---------------------------------------------------------------------------------------- |
668 | 2.50k | OptionsRequest::OptionsRequest(const Header& header, const Slice& body) : CQLRequest(header, body) { |
669 | 2.50k | } |
670 | | |
671 | 2.50k | OptionsRequest::~OptionsRequest() { |
672 | 2.50k | } |
673 | | |
674 | 2.50k | Status OptionsRequest::ParseBody() { |
675 | | // Options body is empty |
676 | 2.50k | return Status::OK(); |
677 | 2.50k | } |
678 | | |
679 | | //---------------------------------------------------------------------------------------- |
680 | 159k | QueryRequest::QueryRequest(const Header& header, const Slice& body) : CQLRequest(header, body) { |
681 | 159k | } |
682 | | |
683 | 160k | QueryRequest::~QueryRequest() { |
684 | 160k | } |
685 | | |
686 | 160k | Status QueryRequest::ParseBody() { |
687 | 160k | RETURN_NOT_OK(ParseLongString(&query_)); |
688 | 160k | RETURN_NOT_OK(ParseQueryParameters(¶ms_)); |
689 | 160k | return Status::OK(); |
690 | 160k | } |
691 | | |
692 | | //---------------------------------------------------------------------------------------- |
693 | 5.02k | PrepareRequest::PrepareRequest(const Header& header, const Slice& body) : CQLRequest(header, body) { |
694 | 5.02k | } |
695 | | |
696 | 5.03k | PrepareRequest::~PrepareRequest() { |
697 | 5.03k | } |
698 | | |
699 | 5.03k | Status PrepareRequest::ParseBody() { |
700 | 5.03k | RETURN_NOT_OK(ParseLongString(&query_)); |
701 | 5.03k | return Status::OK(); |
702 | 5.03k | } |
703 | | |
704 | | //---------------------------------------------------------------------------------------- |
705 | 4.37M | ExecuteRequest::ExecuteRequest(const Header& header, const Slice& body) : CQLRequest(header, body) { |
706 | 4.37M | } |
707 | | |
708 | 4.37M | ExecuteRequest::~ExecuteRequest() { |
709 | 4.37M | } |
710 | | |
711 | 4.37M | Status ExecuteRequest::ParseBody() { |
712 | 4.37M | RETURN_NOT_OK(ParseShortBytes(&query_id_)); |
713 | 4.37M | RETURN_NOT_OK(ParseQueryParameters(¶ms_)); |
714 | 4.37M | return Status::OK(); |
715 | 4.37M | } |
716 | | |
717 | | //---------------------------------------------------------------------------------------- |
718 | 1.65k | BatchRequest::BatchRequest(const Header& header, const Slice& body) : CQLRequest(header, body) { |
719 | 1.65k | } |
720 | | |
721 | 1.65k | BatchRequest::~BatchRequest() { |
722 | 1.65k | } |
723 | | |
724 | 1.65k | Status BatchRequest::ParseBody() { |
725 | 1.65k | uint8_t type = 0; |
726 | 1.65k | RETURN_NOT_OK(ParseByte(&type)); |
727 | 1.65k | type_ = static_cast<Type>(type); |
728 | 1.65k | uint16_t query_count = 0; |
729 | 1.65k | RETURN_NOT_OK(ParseShort(&query_count)); |
730 | 1.65k | queries_.resize(query_count); |
731 | 390k | for (uint16_t i = 0; i < query_count; ++i) { |
732 | 389k | Query& query = queries_[i]; |
733 | 389k | uint8_t is_prepared_query = 0; |
734 | 389k | RETURN_NOT_OK(ParseByte(&is_prepared_query)); |
735 | 389k | switch (is_prepared_query) { |
736 | 218 | case 0: |
737 | 218 | query.is_prepared = false; |
738 | 218 | RETURN_NOT_OK(ParseLongString(&query.query)); |
739 | 218 | break; |
740 | 389k | case 1: |
741 | 389k | query.is_prepared = true; |
742 | 389k | RETURN_NOT_OK(ParseShortBytes(&query.query_id)); |
743 | 389k | break; |
744 | 0 | default: |
745 | 0 | return STATUS(NetworkError, "Invalid is_prepared_query byte in batch request"); |
746 | 0 | break; |
747 | 389k | } |
748 | 389k | uint16_t value_count = 0; |
749 | 389k | RETURN_NOT_OK(ParseShort(&value_count)); |
750 | 389k | query.params.values.resize(value_count); |
751 | 1.55M | for (uint16_t j = 0; j < value_count; ++j) { |
752 | | // with_name is not possible in the protocol due to a design flaw. See JIRA CASSANDRA-10246. |
753 | 1.16M | RETURN_NOT_OK(ParseValue(false /* with_name */, &query.params.values[j])); |
754 | 1.16M | } |
755 | 389k | } |
756 | | |
757 | 1.52k | Consistency consistency = Consistency::ANY; |
758 | 1.52k | QueryParameters::Flags flags = 0; |
759 | 1.52k | Consistency serial_consistency = Consistency::ANY; |
760 | 1.52k | int64_t default_timestamp = 0; |
761 | 1.52k | RETURN_NOT_OK(ParseConsistency(&consistency)); |
762 | 1.52k | RETURN_NOT_OK(ParseByte(&flags)); |
763 | 1.52k | if (flags & CQLMessage::QueryParameters::kWithSerialConsistencyFlag) { |
764 | 0 | RETURN_NOT_OK(ParseConsistency(&serial_consistency)); |
765 | 0 | } |
766 | 1.52k | if (flags & CQLMessage::QueryParameters::kWithDefaultTimestampFlag) { |
767 | 255 | RETURN_NOT_OK(ParseLong(&default_timestamp)); |
768 | 255 | } |
769 | | |
770 | 389k | for (Query& query : queries_) { |
771 | 389k | QueryParameters& params = query.params; |
772 | 389k | params.consistency = consistency; |
773 | 389k | params.flags = flags; |
774 | 389k | params.serial_consistency = serial_consistency; |
775 | 389k | params.default_timestamp = default_timestamp; |
776 | 389k | } |
777 | 1.52k | return Status::OK(); |
778 | 1.52k | } |
779 | | |
780 | | //---------------------------------------------------------------------------------------- |
781 | | RegisterRequest::RegisterRequest(const Header& header, const Slice& body) |
782 | 2.23k | : CQLRequest(header, body) { |
783 | 2.23k | } |
784 | | |
785 | 2.23k | RegisterRequest::~RegisterRequest() { |
786 | 2.23k | } |
787 | | |
788 | 2.23k | Status RegisterRequest::ParseBody() { |
789 | 2.23k | vector<string> event_types; |
790 | 2.23k | RETURN_NOT_OK(ParseStringList(&event_types)); |
791 | 2.23k | events_ = kNoEvents; |
792 | | |
793 | 6.70k | for (const string& event_type : event_types) { |
794 | 6.70k | if (event_type == kTopologyChangeEvent) { |
795 | 2.23k | events_ |= kTopologyChange; |
796 | 4.47k | } else if (event_type == kStatusChangeEvent) { |
797 | 2.23k | events_ |= kStatusChange; |
798 | 2.23k | } else if (event_type == kSchemaChangeEvent) { |
799 | 2.23k | events_ |= kSchemaChange; |
800 | 0 | } else { |
801 | 0 | return STATUS(NetworkError, "Invalid event type in register request"); |
802 | 0 | } |
803 | 6.70k | } |
804 | | |
805 | 2.23k | return Status::OK(); |
806 | 2.23k | } |
807 | | |
808 | | // --------------------------- Serialization utility functions ------------------------------- |
809 | | namespace { |
810 | | |
811 | | // Serialize a CQL number (8, 16, 32 and 64-bit integer). <num_type> is the integer type. |
812 | | // <converter> converts the number from machine byte-order to network order and <data_type> |
813 | | // is the coverter's return type. The converter's return type <data_type> is unsigned while |
814 | | // <num_type> may be signed or unsigned. |
815 | | template<typename num_type, typename data_type> |
816 | 14.1M | void SerializeNum(void (*converter)(void *, data_type), const num_type val, faststring* mesg) { |
817 | 14.1M | data_type byte_value; |
818 | 14.1M | (*converter)(&byte_value, static_cast<data_type>(val)); |
819 | 14.1M | mesg->append(&byte_value, sizeof(byte_value)); |
820 | 14.1M | } cql_message.cc:_ZN2yb2ql12_GLOBAL__N_112SerializeNumIijEEvPFvPvT0_ET_PNS_10faststringE Line | Count | Source | 816 | 12.0M | void SerializeNum(void (*converter)(void *, data_type), const num_type val, faststring* mesg) { | 817 | 12.0M | data_type byte_value; | 818 | 12.0M | (*converter)(&byte_value, static_cast<data_type>(val)); | 819 | 12.0M | mesg->append(&byte_value, sizeof(byte_value)); | 820 | 12.0M | } |
cql_message.cc:_ZN2yb2ql12_GLOBAL__N_112SerializeNumIttEEvPFvPvT0_ET_PNS_10faststringE Line | Count | Source | 816 | 2.04M | void SerializeNum(void (*converter)(void *, data_type), const num_type val, faststring* mesg) { | 817 | 2.04M | data_type byte_value; | 818 | 2.04M | (*converter)(&byte_value, static_cast<data_type>(val)); | 819 | 2.04M | mesg->append(&byte_value, sizeof(byte_value)); | 820 | 2.04M | } |
cql_message.cc:_ZN2yb2ql12_GLOBAL__N_112SerializeNumIhhEEvPFvPvT0_ET_PNS_10faststringE Line | Count | Source | 816 | 52.8k | void SerializeNum(void (*converter)(void *, data_type), const num_type val, faststring* mesg) { | 817 | 52.8k | data_type byte_value; | 818 | 52.8k | (*converter)(&byte_value, static_cast<data_type>(val)); | 819 | 52.8k | mesg->append(&byte_value, sizeof(byte_value)); | 820 | 52.8k | } |
|
821 | | |
822 | | // Serialize a CQL byte stream (string or bytes). <len_type> is the length type. |
823 | | // <len_serializer> serializes the byte length from machine byte-order to network order. |
824 | | template<typename len_type> |
825 | | inline void SerializeBytes( |
826 | | void (*len_serializer)(len_type, faststring* mesg), string val, |
827 | 1.10M | faststring* mesg) { |
828 | 1.10M | (*len_serializer)(static_cast<len_type>(val.size()), mesg); |
829 | 1.10M | mesg->append(val); |
830 | 1.10M | } cql_message.cc:_ZN2yb2ql12_GLOBAL__N_114SerializeBytesItEEvPFvT_PNS_10faststringEENSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEES5_ Line | Count | Source | 827 | 1.10M | faststring* mesg) { | 828 | 1.10M | (*len_serializer)(static_cast<len_type>(val.size()), mesg); | 829 | 1.10M | mesg->append(val); | 830 | 1.10M | } |
cql_message.cc:_ZN2yb2ql12_GLOBAL__N_114SerializeBytesIiEEvPFvT_PNS_10faststringEENSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEES5_ Line | Count | Source | 827 | 6.44k | faststring* mesg) { | 828 | 6.44k | (*len_serializer)(static_cast<len_type>(val.size()), mesg); | 829 | 6.44k | mesg->append(val); | 830 | 6.44k | } |
|
831 | | |
832 | 52.8k | inline void SerializeByte(const uint8_t value, faststring* mesg) { |
833 | 52.8k | static_assert(sizeof(value) == CQLMessage::kByteSize, "inconsistent byte size"); |
834 | 52.8k | SerializeNum(Store8, value, mesg); |
835 | 52.8k | } |
836 | | |
837 | 2.04M | inline void SerializeShort(const uint16_t value, faststring* mesg) { |
838 | 2.04M | static_assert(sizeof(value) == CQLMessage::kShortSize, "inconsistent short size"); |
839 | 2.04M | SerializeNum(NetworkByteOrder::Store16, value, mesg); |
840 | 2.04M | } |
841 | | |
842 | 12.0M | inline void SerializeInt(const int32_t value, faststring* mesg) { |
843 | 12.0M | static_assert(sizeof(value) == CQLMessage::kIntSize, "inconsistent int size"); |
844 | 12.0M | SerializeNum(NetworkByteOrder::Store32, value, mesg); |
845 | 12.0M | } |
846 | | |
847 | | #if 0 // Save this function for future use |
848 | | inline void SerializeLong(const int64_t value, faststring* mesg) { |
849 | | static_assert(sizeof(value) == CQLMessage::kLongSize, "inconsistent long size"); |
850 | | SerializeNum(NetworkByteOrder::Store64, value, mesg); |
851 | | } |
852 | | #endif |
853 | | |
854 | 1.09M | inline void SerializeString(const string& value, faststring* mesg) { |
855 | 1.09M | SerializeBytes(&SerializeShort, value, mesg); |
856 | 1.09M | } |
857 | | |
858 | | #if 0 // Save these functions for future use |
859 | | inline void SerializeLongString(const string& value, faststring* mesg) { |
860 | | SerializeBytes(&SerializeInt, value, mesg); |
861 | | } |
862 | | #endif |
863 | | |
864 | 6.05k | inline void SerializeShortBytes(const string& value, faststring* mesg) { |
865 | 6.05k | SerializeBytes(&SerializeShort, value, mesg); |
866 | 6.05k | } |
867 | | |
868 | 6.44k | inline void SerializeBytes(const string& value, faststring* mesg) { |
869 | 6.44k | SerializeBytes(&SerializeInt, value, mesg); |
870 | 6.44k | } |
871 | | |
872 | | #if 0 // Save these functions for future use |
873 | | inline void SerializeConsistency(const CQLMessage::Consistency consistency, faststring* mesg) { |
874 | | static_assert( |
875 | | sizeof(consistency) == CQLMessage::kConsistencySize, |
876 | | "inconsistent consistency size"); |
877 | | SerializeNum(NetworkByteOrder::Store16, consistency, mesg); |
878 | | } |
879 | | |
880 | | void SerializeUUID(const string& value, faststring* mesg) { |
881 | | if (value.size() == CQLMessage::kUUIDSize) { |
882 | | mesg->append(value); |
883 | | } else { |
884 | | LOG(ERROR) << "Internal error: inconsistent UUID size: " << value.size(); |
885 | | uint8_t empty_uuid[CQLMessage::kUUIDSize] = {0}; |
886 | | mesg->append(empty_uuid, sizeof(empty_uuid)); |
887 | | } |
888 | | } |
889 | | |
890 | | void SerializeTimeUUID(const string& value, faststring* mesg) { |
891 | | if (value.size() == CQLMessage::kUUIDSize) { |
892 | | mesg->append(value); |
893 | | } else { |
894 | | LOG(ERROR) << "Internal error: inconsistent TimeUUID size: " << value.size(); |
895 | | uint8_t empty_uuid[CQLMessage::kUUIDSize] = {0}; |
896 | | mesg->append(empty_uuid, sizeof(empty_uuid)); |
897 | | } |
898 | | } |
899 | | #endif |
900 | | |
901 | 5.00k | void SerializeStringList(const vector<string>& list, faststring* mesg) { |
902 | 5.00k | SerializeShort(list.size(), mesg); |
903 | 10.0k | for (const auto& entry : list) { |
904 | 10.0k | SerializeString(entry, mesg); |
905 | 10.0k | } |
906 | 5.00k | } |
907 | | |
908 | 52.8k | void SerializeInet(const Endpoint& value, faststring* mesg) { |
909 | 52.8k | auto address = value.address(); |
910 | 52.8k | if (address.is_v4()) { |
911 | 52.8k | auto bytes = address.to_v4().to_bytes(); |
912 | 52.8k | SerializeByte(bytes.size(), mesg); |
913 | 52.8k | mesg->append(bytes.data(), bytes.size()); |
914 | 0 | } else { |
915 | 0 | auto bytes = address.to_v6().to_bytes(); |
916 | 0 | SerializeByte(bytes.size(), mesg); |
917 | 0 | mesg->append(bytes.data(), bytes.size()); |
918 | 0 | } |
919 | 52.8k | const uint16_t port = value.port(); |
920 | 52.8k | SerializeInt(port, mesg); |
921 | 52.8k | } |
922 | | |
923 | | #if 0 // Save these functions for future use |
924 | | void SerializeStringMap(const unordered_map<string, string>& map, faststring* mesg) { |
925 | | SerializeShort(map.size(), mesg); |
926 | | for (const auto& element : map) { |
927 | | SerializeString(element.first, mesg); |
928 | | SerializeString(element.second, mesg); |
929 | | } |
930 | | } |
931 | | #endif |
932 | | |
933 | 2.50k | void SerializeStringMultiMap(const unordered_map<string, vector<string>>& map, faststring* mesg) { |
934 | 2.50k | SerializeShort(map.size(), mesg); |
935 | 5.00k | for (const auto& element : map) { |
936 | 5.00k | SerializeString(element.first, mesg); |
937 | 5.00k | SerializeStringList(element.second, mesg); |
938 | 5.00k | } |
939 | 2.50k | } |
940 | | |
941 | | #if 0 // Save these functions for future use |
942 | | void SerializeBytesMap(const unordered_map<string, string>& map, faststring* mesg) { |
943 | | SerializeShort(map.size(), mesg); |
944 | | for (const auto& element : map) { |
945 | | SerializeString(element.first, mesg); |
946 | | SerializeBytes(element.second, mesg); |
947 | | } |
948 | | } |
949 | | |
950 | | void SerializeValue(const CQLMessage::Value& value, faststring* mesg) { |
951 | | switch (value.kind) { |
952 | | case CQLMessage::Value::Kind::NOT_NULL: |
953 | | SerializeInt(value.value.size(), mesg); |
954 | | mesg->append(value.value); |
955 | | return; |
956 | | case CQLMessage::Value::Kind::IS_NULL: |
957 | | SerializeInt(-1, mesg); |
958 | | return; |
959 | | case CQLMessage::Value::Kind::NOT_SET: // NOT_SET value kind should appear in request msg only. |
960 | | break; |
961 | | // default: fall through |
962 | | } |
963 | | LOG(ERROR) << "Internal error: invalid/unknown value kind " << static_cast<uint32_t>(value.kind); |
964 | | SerializeInt(-1, mesg); |
965 | | } |
966 | | #endif |
967 | | |
968 | | } // namespace |
969 | | |
970 | | // ------------------------------------ CQL response ----------------------------------- |
971 | | CQLResponse::CQLResponse(const CQLRequest& request, const Opcode opcode) |
972 | 4.55M | : CQLMessage(Header(request.version() | kResponseVersion, 0, request.stream_id(), opcode)) { |
973 | 4.55M | } |
974 | | |
975 | | CQLResponse::CQLResponse(const StreamId stream_id, const Opcode opcode) |
976 | 52.8k | : CQLMessage(Header(kCurrentVersion | kResponseVersion, 0, stream_id, opcode)) { |
977 | 52.8k | } |
978 | | |
979 | 4.61M | CQLResponse::~CQLResponse() { |
980 | 4.61M | } |
981 | | |
982 | | // Short-hand macros for serializing fields from the message header |
983 | | #define SERIALIZE_BYTE(buf, pos, value) \ |
984 | 13.7M | Store8(&(buf)[pos], static_cast<uint8_t>(value)) |
985 | | #define SERIALIZE_SHORT(buf, pos, value) \ |
986 | 4.59M | NetworkByteOrder::Store16(&(buf)[pos], static_cast<uint16_t>(value)) |
987 | | #define SERIALIZE_INT(buf, pos, value) \ |
988 | 9.21M | NetworkByteOrder::Store32(&(buf)[pos], static_cast<int32_t>(value)) |
989 | | #define SERIALIZE_LONG(buf, pos, value) \ |
990 | | NetworkByteOrder::Store64(&(buf)[pos], static_cast<int64_t>(value)) |
991 | | |
992 | 4.61M | void CQLResponse::Serialize(const CompressionScheme compression_scheme, faststring* mesg) const { |
993 | 4.61M | const size_t start_pos = mesg->size(); // save the start position |
994 | 4.61M | const bool compress = (compression_scheme != CQLMessage::CompressionScheme::kNone); |
995 | 4.61M | SerializeHeader(compress, mesg); |
996 | 4.61M | if (compress) { |
997 | 347 | faststring body; |
998 | 347 | SerializeBody(&body); |
999 | 347 | switch (compression_scheme) { |
1000 | 225 | case CQLMessage::CompressionScheme::kLz4: { |
1001 | 225 | SerializeInt(static_cast<int32_t>(body.size()), mesg); |
1002 | 225 | const size_t curr_size = mesg->size(); |
1003 | 225 | const int max_comp_size = LZ4_compressBound(narrow_cast<int>(body.size())); |
1004 | 225 | mesg->resize(curr_size + max_comp_size); |
1005 | 225 | const int comp_size = LZ4_compress_default(to_char_ptr(body.data()), |
1006 | 225 | to_char_ptr(mesg->data() + curr_size), |
1007 | 225 | narrow_cast<int>(body.size()), |
1008 | 225 | max_comp_size); |
1009 | 0 | CHECK_NE(comp_size, 0) << "LZ4 compression failed"; |
1010 | 225 | mesg->resize(curr_size + comp_size); |
1011 | 225 | break; |
1012 | 0 | } |
1013 | 126 | case CQLMessage::CompressionScheme::kSnappy: { |
1014 | 126 | const size_t curr_size = mesg->size(); |
1015 | 126 | const size_t max_comp_size = MaxCompressedLength(body.size()); |
1016 | 126 | size_t comp_size = 0; |
1017 | 126 | mesg->resize(curr_size + max_comp_size); |
1018 | 126 | RawCompress(to_char_ptr(body.data()), body.size(), |
1019 | 126 | to_char_ptr(mesg->data() + curr_size), &comp_size); |
1020 | 126 | mesg->resize(curr_size + comp_size); |
1021 | 126 | break; |
1022 | 0 | } |
1023 | 0 | case CQLMessage::CompressionScheme::kNone: |
1024 | 0 | LOG(FATAL) << "No compression scheme"; |
1025 | 0 | break; |
1026 | 4.61M | } |
1027 | 4.61M | } else { |
1028 | 4.61M | SerializeBody(mesg); |
1029 | 4.61M | } |
1030 | 4.61M | SERIALIZE_INT( |
1031 | 4.61M | mesg->data(), start_pos + kHeaderPosLength, mesg->size() - start_pos - kMessageHeaderLength); |
1032 | 4.61M | } |
1033 | | |
1034 | 4.59M | void CQLResponse::SerializeHeader(const bool compress, faststring* mesg) const { |
1035 | 4.59M | uint8_t buffer[kMessageHeaderLength]; |
1036 | 4.59M | SERIALIZE_BYTE(buffer, kHeaderPosVersion, version()); |
1037 | 4.59M | SERIALIZE_BYTE(buffer, kHeaderPosFlags, flags() | (compress ? kCompressionFlag : 0)); |
1038 | 4.59M | SERIALIZE_SHORT(buffer, kHeaderPosStreamId, stream_id()); |
1039 | 4.59M | SERIALIZE_INT(buffer, kHeaderPosLength, 0); |
1040 | 4.59M | SERIALIZE_BYTE(buffer, kHeaderPosOpcode, opcode()); |
1041 | 4.59M | mesg->append(buffer, sizeof(buffer)); |
1042 | 4.59M | } |
1043 | | |
1044 | | #undef SERIALIZE_BYTE |
1045 | | #undef SERIALIZE_SHORT |
1046 | | #undef SERIALIZE_INT |
1047 | | #undef SERIALIZE_LONG |
1048 | | |
1049 | | // ------------------------------ Individual CQL responses ----------------------------------- |
1050 | | ErrorResponse::ErrorResponse(const CQLRequest& request, const Code code, const string& message) |
1051 | 5.30k | : CQLResponse(request, Opcode::ERROR), code_(code), message_(message) { |
1052 | 5.30k | } |
1053 | | |
1054 | | ErrorResponse::ErrorResponse(const CQLRequest& request, const Code code, const Status& status) |
1055 | | : CQLResponse(request, Opcode::ERROR), code_(code), |
1056 | 0 | message_(to_char_ptr(status.message().data())) { |
1057 | 0 | } |
1058 | | |
1059 | | ErrorResponse::ErrorResponse(const StreamId stream_id, const Code code, const string& message) |
1060 | 8 | : CQLResponse(stream_id, Opcode::ERROR), code_(code), message_(message) { |
1061 | 8 | } |
1062 | | |
1063 | 5.31k | ErrorResponse::~ErrorResponse() { |
1064 | 5.31k | } |
1065 | | |
1066 | 5.31k | void ErrorResponse::SerializeBody(faststring* mesg) const { |
1067 | 5.31k | SerializeInt(static_cast<int32_t>(code_), mesg); |
1068 | 5.31k | SerializeString(message_, mesg); |
1069 | 5.31k | SerializeErrorBody(mesg); |
1070 | 5.31k | } |
1071 | | |
1072 | 4.10k | void ErrorResponse::SerializeErrorBody(faststring* mesg) const { |
1073 | 4.10k | } |
1074 | | |
1075 | | //---------------------------------------------------------------------------------------- |
1076 | | UnpreparedErrorResponse::UnpreparedErrorResponse(const CQLRequest& request, const QueryId& query_id) |
1077 | 1.20k | : ErrorResponse(request, Code::UNPREPARED, "Unprepared query"), query_id_(query_id) { |
1078 | 1.20k | } |
1079 | | |
1080 | 1.20k | UnpreparedErrorResponse::~UnpreparedErrorResponse() { |
1081 | 1.20k | } |
1082 | | |
1083 | 1.20k | void UnpreparedErrorResponse::SerializeErrorBody(faststring* mesg) const { |
1084 | 1.20k | SerializeShortBytes(query_id_, mesg); |
1085 | 1.20k | } |
1086 | | |
1087 | | //---------------------------------------------------------------------------------------- |
1088 | 5.33k | ReadyResponse::ReadyResponse(const CQLRequest& request) : CQLResponse(request, Opcode::READY) { |
1089 | 5.33k | } |
1090 | | |
1091 | 5.33k | ReadyResponse::~ReadyResponse() { |
1092 | 5.33k | } |
1093 | | |
1094 | 5.33k | void ReadyResponse::SerializeBody(faststring* mesg) const { |
1095 | | // Ready body is empty |
1096 | 5.33k | } |
1097 | | |
1098 | | //---------------------------------------------------------------------------------------- |
1099 | | AuthenticateResponse::AuthenticateResponse(const CQLRequest& request, const string& authenticator) |
1100 | 6.29k | : CQLResponse(request, Opcode::AUTHENTICATE), authenticator_(authenticator) { |
1101 | 6.29k | } |
1102 | | |
1103 | 6.29k | AuthenticateResponse::~AuthenticateResponse() { |
1104 | 6.29k | } |
1105 | | |
1106 | 6.29k | void AuthenticateResponse::SerializeBody(faststring* mesg) const { |
1107 | 6.29k | SerializeString(authenticator_, mesg); |
1108 | 6.29k | } |
1109 | | |
1110 | | //---------------------------------------------------------------------------------------- |
1111 | | SupportedResponse::SupportedResponse(const CQLRequest& request, |
1112 | | const unordered_map<string, vector<string>>* options) |
1113 | 2.50k | : CQLResponse(request, Opcode::SUPPORTED), options_(options) { |
1114 | 2.50k | } |
1115 | | |
1116 | 2.50k | SupportedResponse::~SupportedResponse() { |
1117 | 2.50k | } |
1118 | | |
1119 | 2.50k | void SupportedResponse::SerializeBody(faststring* mesg) const { |
1120 | 2.50k | SerializeStringMultiMap(*options_, mesg); |
1121 | 2.50k | } |
1122 | | |
1123 | | //---------------------------------------------------------------------------------------- |
1124 | | ResultResponse::ResultResponse(const CQLRequest& request, const Kind kind) |
1125 | 4.52M | : CQLResponse(request, Opcode::RESULT), kind_(kind) { |
1126 | 4.52M | } |
1127 | | |
1128 | 4.53M | ResultResponse::~ResultResponse() { |
1129 | 4.53M | } |
1130 | | |
1131 | 0 | ResultResponse::RowsMetadata::Type::Type(const Id id) : id(id) { |
1132 | 0 | switch (id) { |
1133 | | // Verify that the type id is a primitive type indeed. |
1134 | 0 | case Id::ASCII: |
1135 | 0 | case Id::BIGINT: |
1136 | 0 | case Id::BLOB: |
1137 | 0 | case Id::BOOLEAN: |
1138 | 0 | case Id::COUNTER: |
1139 | 0 | case Id::DECIMAL: |
1140 | 0 | case Id::DOUBLE: |
1141 | 0 | case Id::FLOAT: |
1142 | 0 | case Id::INT: |
1143 | 0 | case Id::TIMESTAMP: |
1144 | 0 | case Id::UUID: |
1145 | 0 | case Id::VARCHAR: |
1146 | 0 | case Id::VARINT: |
1147 | 0 | case Id::TIMEUUID: |
1148 | 0 | case Id::INET: |
1149 | 0 | case Id::JSONB: |
1150 | 0 | case Id::DATE: |
1151 | 0 | case Id::TIME: |
1152 | 0 | case Id::SMALLINT: |
1153 | 0 | case Id::TINYINT: |
1154 | 0 | return; |
1155 | | |
1156 | | // Non-primitive types |
1157 | 0 | case Id::CUSTOM: |
1158 | 0 | case Id::LIST: |
1159 | 0 | case Id::MAP: |
1160 | 0 | case Id::SET: |
1161 | 0 | case Id::UDT: |
1162 | 0 | case Id::TUPLE: |
1163 | 0 | break; |
1164 | | |
1165 | | // default: fall through |
1166 | 0 | } |
1167 | | |
1168 | 0 | LOG(ERROR) << "Internal error: invalid/unknown primitive type id " << static_cast<uint32_t>(id); |
1169 | 0 | } |
1170 | | |
1171 | | // These union members in Type below are not initialized by default. They need to be explicitly |
1172 | | // initialized using the new() operator in the Type constructors. |
1173 | 0 | ResultResponse::RowsMetadata::Type::Type(const string& custom_class_name) : id(Id::CUSTOM) { |
1174 | 0 | new(&this->custom_class_name) string(custom_class_name); |
1175 | 0 | } |
1176 | | |
1177 | | ResultResponse::RowsMetadata::Type::Type(const Id id, shared_ptr<const Type> element_type) |
1178 | 0 | : id(id) { |
1179 | 0 | switch (id) { |
1180 | 0 | case Id::LIST: |
1181 | 0 | case Id::SET: |
1182 | 0 | new(&this->element_type) shared_ptr<const Type>(element_type); |
1183 | 0 | return; |
1184 | | |
1185 | | // Not list nor map |
1186 | 0 | case Id::CUSTOM: |
1187 | 0 | case Id::ASCII: |
1188 | 0 | case Id::BIGINT: |
1189 | 0 | case Id::BLOB: |
1190 | 0 | case Id::BOOLEAN: |
1191 | 0 | case Id::COUNTER: |
1192 | 0 | case Id::DECIMAL: |
1193 | 0 | case Id::DOUBLE: |
1194 | 0 | case Id::FLOAT: |
1195 | 0 | case Id::INT: |
1196 | 0 | case Id::TIMESTAMP: |
1197 | 0 | case Id::UUID: |
1198 | 0 | case Id::VARCHAR: |
1199 | 0 | case Id::VARINT: |
1200 | 0 | case Id::TIMEUUID: |
1201 | 0 | case Id::INET: |
1202 | 0 | case Id::JSONB: |
1203 | 0 | case Id::DATE: |
1204 | 0 | case Id::TIME: |
1205 | 0 | case Id::SMALLINT: |
1206 | 0 | case Id::TINYINT: |
1207 | 0 | case Id::MAP: |
1208 | 0 | case Id::UDT: |
1209 | 0 | case Id::TUPLE: |
1210 | 0 | break; |
1211 | | |
1212 | | // default: fall through |
1213 | 0 | } |
1214 | | |
1215 | 0 | LOG(ERROR) << "Internal error: invalid/unknown list/map type id " << static_cast<uint32_t>(id); |
1216 | 0 | } |
1217 | | |
1218 | 0 | ResultResponse::RowsMetadata::Type::Type(shared_ptr<const MapType> map_type) : id(Id::MAP) { |
1219 | 0 | new(&this->map_type) shared_ptr<const MapType>(map_type); |
1220 | 0 | } |
1221 | | |
1222 | 0 | ResultResponse::RowsMetadata::Type::Type(shared_ptr<const UDTType> udt_type) : id(Id::UDT) { |
1223 | 0 | new(&this->udt_type) shared_ptr<const UDTType>(udt_type); |
1224 | 0 | } |
1225 | | |
1226 | | ResultResponse::RowsMetadata::Type::Type( |
1227 | 0 | shared_ptr<const TupleComponentTypes> tuple_component_types) : id(Id::TUPLE) { |
1228 | 0 | new(&this->tuple_component_types) shared_ptr<const TupleComponentTypes>(tuple_component_types); |
1229 | 0 | } |
1230 | | |
1231 | 936k | ResultResponse::RowsMetadata::Type::Type(const Type& t) : id(t.id) { |
1232 | 936k | switch (id) { |
1233 | 0 | case Id::CUSTOM: |
1234 | 0 | new(&this->custom_class_name) string(t.custom_class_name); |
1235 | 0 | return; |
1236 | 0 | case Id::ASCII: |
1237 | 2.21k | case Id::BIGINT: |
1238 | 42.5k | case Id::BLOB: |
1239 | 76.3k | case Id::BOOLEAN: |
1240 | 76.3k | case Id::COUNTER: |
1241 | 78.1k | case Id::DECIMAL: |
1242 | 123k | case Id::DOUBLE: |
1243 | 124k | case Id::FLOAT: |
1244 | 242k | case Id::INT: |
1245 | 243k | case Id::TIMESTAMP: |
1246 | 322k | case Id::UUID: |
1247 | 768k | case Id::VARCHAR: |
1248 | 769k | case Id::VARINT: |
1249 | 769k | case Id::TIMEUUID: |
1250 | 824k | case Id::INET: |
1251 | 824k | case Id::JSONB: |
1252 | 824k | case Id::DATE: |
1253 | 824k | case Id::TIME: |
1254 | 825k | case Id::SMALLINT: |
1255 | 825k | case Id::TINYINT: |
1256 | 825k | return; |
1257 | 21.3k | case Id::LIST: |
1258 | 38.4k | case Id::SET: |
1259 | 38.4k | new(&element_type) shared_ptr<const Type>(t.element_type); |
1260 | 38.4k | return; |
1261 | 80.7k | case Id::MAP: |
1262 | 80.7k | new(&map_type) shared_ptr<const MapType>(t.map_type); |
1263 | 80.7k | return; |
1264 | 185 | case Id::UDT: |
1265 | 185 | new(&udt_type) shared_ptr<const UDTType>(t.udt_type); |
1266 | 185 | return; |
1267 | 0 | case Id::TUPLE: |
1268 | 0 | new(&tuple_component_types) shared_ptr<const TupleComponentTypes>(t.tuple_component_types); |
1269 | 0 | return; |
1270 | | |
1271 | | // default: fall through |
1272 | 0 | } |
1273 | | |
1274 | 0 | LOG(ERROR) << "Internal error: unknown type id " << static_cast<uint32_t>(id); |
1275 | 0 | } |
1276 | | |
1277 | 936k | ResultResponse::RowsMetadata::Type::Type(const shared_ptr<QLType>& ql_type) { |
1278 | 936k | auto type = ql_type; |
1279 | 936k | if (type->IsFrozen()) { |
1280 | 275 | type = ql_type->param_type(0); |
1281 | 275 | } |
1282 | 936k | switch (type->main()) { |
1283 | 122 | case DataType::INT8: |
1284 | 122 | id = Id::TINYINT; |
1285 | 122 | return; |
1286 | 132 | case DataType::INT16: |
1287 | 132 | id = Id::SMALLINT; |
1288 | 132 | return; |
1289 | 118k | case DataType::INT32: |
1290 | 118k | id = Id::INT; |
1291 | 118k | return; |
1292 | 2.21k | case DataType::INT64: |
1293 | 2.21k | id = Id::BIGINT; |
1294 | 2.21k | return; |
1295 | 922 | case DataType::VARINT: |
1296 | 922 | id = Id::VARINT; |
1297 | 922 | return; |
1298 | 243 | case DataType::FLOAT: |
1299 | 243 | id = Id::FLOAT; |
1300 | 243 | return; |
1301 | 45.6k | case DataType::DOUBLE: |
1302 | 45.6k | id = Id::DOUBLE; |
1303 | 45.6k | return; |
1304 | 445k | case DataType::STRING: |
1305 | 445k | id = Id::VARCHAR; |
1306 | 445k | return; |
1307 | 33.7k | case DataType::BOOL: |
1308 | 33.7k | id = Id::BOOLEAN; |
1309 | 33.7k | return; |
1310 | 1.07k | case DataType::TIMESTAMP: |
1311 | 1.07k | id = Id::TIMESTAMP; |
1312 | 1.07k | return; |
1313 | 44 | case DataType::DATE: |
1314 | 44 | id = Id::DATE; |
1315 | 44 | return; |
1316 | 34 | case DataType::TIME: |
1317 | 34 | id = Id::TIME; |
1318 | 34 | return; |
1319 | 55.3k | case DataType::INET: |
1320 | 55.3k | id = Id::INET; |
1321 | 55.3k | return; |
1322 | 438 | case DataType::JSONB: |
1323 | 438 | id = Id::JSONB; |
1324 | 438 | return; |
1325 | 78.8k | case DataType::UUID: |
1326 | 78.8k | id = Id::UUID; |
1327 | 78.8k | return; |
1328 | 34 | case DataType::TIMEUUID: |
1329 | 34 | id = Id::TIMEUUID; |
1330 | 34 | return; |
1331 | 40.3k | case DataType::BINARY: |
1332 | 40.3k | id = Id::BLOB; |
1333 | 40.3k | return; |
1334 | 1.79k | case DataType::DECIMAL: |
1335 | 1.79k | id = Id::DECIMAL; |
1336 | 1.79k | return; |
1337 | 21.1k | case DataType::LIST: |
1338 | 21.1k | id = Id::LIST; |
1339 | 21.1k | new (&element_type) shared_ptr<const Type>( |
1340 | 21.1k | std::make_shared<const Type>(Type(type->param_type(0)))); |
1341 | 21.1k | return; |
1342 | 17.0k | case DataType::SET: |
1343 | 17.0k | id = Id::SET; |
1344 | 17.0k | new (&element_type) shared_ptr<const Type>( |
1345 | 17.0k | std::make_shared<const Type>(Type(type->param_type(0)))); |
1346 | 17.0k | return; |
1347 | 80.6k | case DataType::MAP: { |
1348 | 80.6k | id = Id::MAP; |
1349 | 80.6k | auto key = std::make_shared<const Type>(Type(type->param_type(0))); |
1350 | 80.6k | auto value = std::make_shared<const Type>(Type(type->param_type(1))); |
1351 | 80.6k | new (&map_type) shared_ptr<const MapType>(std::make_shared<MapType>(MapType{key, value})); |
1352 | 80.6k | return; |
1353 | 0 | } |
1354 | 185 | case DataType::USER_DEFINED_TYPE: { |
1355 | 185 | id = Id::UDT; |
1356 | 185 | std::vector<UDTType::Field> fields; |
1357 | 537 | for (size_t i = 0; i < type->params().size(); i++) { |
1358 | 352 | auto field_type = std::make_shared<const Type>(Type(type->param_type(i))); |
1359 | 352 | UDTType::Field field{type->udtype_field_name(i), field_type}; |
1360 | 352 | fields.push_back(std::move(field)); |
1361 | 352 | } |
1362 | 185 | new(&udt_type) shared_ptr<const UDTType>(std::make_shared<UDTType>( |
1363 | 185 | UDTType{type->udtype_keyspace_name(), type->udtype_name(), fields})); |
1364 | 185 | return; |
1365 | 0 | } |
1366 | 0 | case DataType::FROZEN: FALLTHROUGH_INTENDED; |
1367 | 0 | QL_UNSUPPORTED_TYPES_IN_SWITCH: FALLTHROUGH_INTENDED; |
1368 | 0 | QL_INVALID_TYPES_IN_SWITCH: |
1369 | 0 | break; |
1370 | | |
1371 | | // default: fall through |
1372 | 0 | } |
1373 | | |
1374 | 0 | LOG(ERROR) << "Internal error: invalid/unsupported type " << type->ToString(); |
1375 | 0 | } |
1376 | | |
1377 | 1.87M | ResultResponse::RowsMetadata::Type::~Type() { |
1378 | 1.87M | switch (id) { |
1379 | 0 | case Id::CUSTOM: |
1380 | 0 | custom_class_name.~basic_string(); |
1381 | 0 | return; |
1382 | 0 | case Id::ASCII: |
1383 | 4.42k | case Id::BIGINT: |
1384 | 85.1k | case Id::BLOB: |
1385 | 152k | case Id::BOOLEAN: |
1386 | 152k | case Id::COUNTER: |
1387 | 156k | case Id::DECIMAL: |
1388 | 247k | case Id::DOUBLE: |
1389 | 247k | case Id::FLOAT: |
1390 | 484k | case Id::INT: |
1391 | 487k | case Id::TIMESTAMP: |
1392 | 644k | case Id::UUID: |
1393 | 1.53M | case Id::VARCHAR: |
1394 | 1.53M | case Id::VARINT: |
1395 | 1.53M | case Id::TIMEUUID: |
1396 | 1.64M | case Id::INET: |
1397 | 1.64M | case Id::JSONB: |
1398 | 1.64M | case Id::DATE: |
1399 | 1.64M | case Id::TIME: |
1400 | 1.64M | case Id::SMALLINT: |
1401 | 1.64M | case Id::TINYINT: |
1402 | 1.64M | return; |
1403 | 42.4k | case Id::LIST: |
1404 | 76.3k | case Id::SET: |
1405 | 76.3k | element_type.reset(); |
1406 | 76.3k | return; |
1407 | 161k | case Id::MAP: |
1408 | 161k | map_type.reset(); |
1409 | 161k | return; |
1410 | 370 | case Id::UDT: |
1411 | 370 | udt_type.reset(); |
1412 | 370 | return; |
1413 | 0 | case Id::TUPLE: |
1414 | 0 | tuple_component_types.reset(); |
1415 | 0 | return; |
1416 | | |
1417 | | // default: fall through |
1418 | 0 | } |
1419 | | |
1420 | 0 | LOG(ERROR) << "Internal error: unknown type id " << static_cast<uint32_t>(id); |
1421 | 0 | } |
1422 | | |
1423 | | ResultResponse::RowsMetadata::RowsMetadata() |
1424 | | : flags(kNoMetadata), |
1425 | | paging_state(""), |
1426 | | global_table_spec("" /* keyspace */, "" /* table_name */), |
1427 | 4.52k | col_count(0) { |
1428 | 4.52k | } |
1429 | | |
1430 | | ResultResponse::RowsMetadata::RowsMetadata(const client::YBTableName& table_name, |
1431 | | const vector<ColumnSchema>& columns, |
1432 | | const string& paging_state, |
1433 | | bool no_metadata) |
1434 | | : flags((no_metadata ? kNoMetadata : kHasGlobalTableSpec) | |
1435 | | (!paging_state.empty() ? kHasMorePages : 0)), |
1436 | | paging_state(paging_state), |
1437 | | global_table_spec(no_metadata ? "" : table_name.namespace_name(), |
1438 | | no_metadata ? "" : table_name.table_name()), |
1439 | 3.70M | col_count(narrow_cast<int>(columns.size())) { |
1440 | 3.70M | if (!no_metadata) { |
1441 | 89.6k | col_specs.reserve(col_count); |
1442 | 728k | for (const auto& column : columns) { |
1443 | 728k | col_specs.emplace_back(column.name(), Type(column.type())); |
1444 | 728k | } |
1445 | 89.6k | } |
1446 | 3.70M | } |
1447 | | |
1448 | 4.52M | void ResultResponse::SerializeBody(faststring* mesg) const { |
1449 | 4.52M | SerializeInt(static_cast<int32_t>(kind_), mesg); |
1450 | 4.52M | SerializeResultBody(mesg); |
1451 | 4.52M | } |
1452 | | |
1453 | 941k | void ResultResponse::SerializeType(const RowsMetadata::Type* type, faststring* mesg) const { |
1454 | 941k | SerializeShort(static_cast<uint16_t>(type->id), mesg); |
1455 | 941k | switch (type->id) { |
1456 | 0 | case RowsMetadata::Type::Id::CUSTOM: |
1457 | 0 | SerializeString(type->custom_class_name, mesg); |
1458 | 0 | return; |
1459 | 0 | case RowsMetadata::Type::Id::ASCII: |
1460 | 2.21k | case RowsMetadata::Type::Id::BIGINT: |
1461 | 42.5k | case RowsMetadata::Type::Id::BLOB: |
1462 | 76.3k | case RowsMetadata::Type::Id::BOOLEAN: |
1463 | 76.3k | case RowsMetadata::Type::Id::COUNTER: |
1464 | 78.1k | case RowsMetadata::Type::Id::DECIMAL: |
1465 | 123k | case RowsMetadata::Type::Id::DOUBLE: |
1466 | 123k | case RowsMetadata::Type::Id::FLOAT: |
1467 | 242k | case RowsMetadata::Type::Id::INT: |
1468 | 243k | case RowsMetadata::Type::Id::TIMESTAMP: |
1469 | 322k | case RowsMetadata::Type::Id::UUID: |
1470 | 767k | case RowsMetadata::Type::Id::VARCHAR: |
1471 | 768k | case RowsMetadata::Type::Id::VARINT: |
1472 | 768k | case RowsMetadata::Type::Id::TIMEUUID: |
1473 | 824k | case RowsMetadata::Type::Id::INET: |
1474 | 824k | case RowsMetadata::Type::Id::JSONB: |
1475 | 824k | case RowsMetadata::Type::Id::DATE: |
1476 | 824k | case RowsMetadata::Type::Id::TIME: |
1477 | 824k | case RowsMetadata::Type::Id::SMALLINT: |
1478 | 824k | case RowsMetadata::Type::Id::TINYINT: |
1479 | 824k | return; |
1480 | 21.3k | case RowsMetadata::Type::Id::LIST: |
1481 | 38.4k | case RowsMetadata::Type::Id::SET: |
1482 | 38.4k | SerializeType(type->element_type.get(), mesg); |
1483 | 38.4k | return; |
1484 | 80.7k | case RowsMetadata::Type::Id::MAP: |
1485 | 80.7k | SerializeType(type->map_type->key_type.get(), mesg); |
1486 | 80.7k | SerializeType(type->map_type->value_type.get(), mesg); |
1487 | 80.7k | return; |
1488 | 185 | case RowsMetadata::Type::Id::UDT: |
1489 | 185 | SerializeString(type->udt_type->keyspace, mesg); |
1490 | 185 | SerializeString(type->udt_type->name, mesg); |
1491 | 185 | SerializeShort(type->udt_type->fields.size(), mesg); |
1492 | 352 | for (const auto& field : type->udt_type->fields) { |
1493 | 352 | SerializeString(field.name, mesg); |
1494 | 352 | SerializeType(field.type.get(), mesg); |
1495 | 352 | } |
1496 | 185 | return; |
1497 | 0 | case RowsMetadata::Type::Id::TUPLE: |
1498 | 0 | SerializeShort(type->tuple_component_types->size(), mesg); |
1499 | 0 | for (const auto& component_type : *type->tuple_component_types) { |
1500 | 0 | SerializeType(component_type.get(), mesg); |
1501 | 0 | } |
1502 | 0 | return; |
1503 | | |
1504 | | // default: fall through |
1505 | 0 | } |
1506 | | |
1507 | 0 | LOG(ERROR) << "Internal error: unknown type id " << static_cast<uint32_t>(type->id); |
1508 | 0 | } |
1509 | | |
1510 | | void ResultResponse::SerializeColSpecs( |
1511 | | const bool has_global_table_spec, const RowsMetadata::GlobalTableSpec& global_table_spec, |
1512 | 99.8k | const vector<RowsMetadata::ColSpec>& col_specs, faststring* mesg) const { |
1513 | 99.8k | if (has_global_table_spec) { |
1514 | 99.4k | SerializeString(global_table_spec.keyspace, mesg); |
1515 | 99.4k | SerializeString(global_table_spec.table, mesg); |
1516 | 99.4k | } |
1517 | 740k | for (const auto& col_spec : col_specs) { |
1518 | 740k | if (!has_global_table_spec) { |
1519 | 105 | SerializeString(col_spec.keyspace, mesg); |
1520 | 105 | SerializeString(col_spec.table, mesg); |
1521 | 105 | } |
1522 | 740k | SerializeString(col_spec.column, mesg); |
1523 | 740k | SerializeType(&col_spec.type, mesg); |
1524 | 740k | } |
1525 | 99.8k | } |
1526 | | |
1527 | 3.71M | void ResultResponse::SerializeRowsMetadata(const RowsMetadata& metadata, faststring* mesg) const { |
1528 | 3.71M | SerializeInt(metadata.flags, mesg); |
1529 | 3.71M | SerializeInt(metadata.col_count, mesg); |
1530 | 3.71M | if (metadata.flags & RowsMetadata::kHasMorePages) { |
1531 | 279 | SerializeBytes(metadata.paging_state, mesg); |
1532 | 279 | } |
1533 | 3.71M | if (metadata.flags & RowsMetadata::kNoMetadata) { |
1534 | 3.62M | return; |
1535 | 3.62M | } |
1536 | 87.5k | CHECK_EQ(metadata.col_count, metadata.col_specs.size()); |
1537 | 87.5k | SerializeColSpecs( |
1538 | 87.5k | metadata.flags & RowsMetadata::kHasGlobalTableSpec, metadata.global_table_spec, |
1539 | 87.5k | metadata.col_specs, mesg); |
1540 | 87.5k | } |
1541 | | |
1542 | | //---------------------------------------------------------------------------------------- |
1543 | | VoidResultResponse::VoidResultResponse(const CQLRequest& request) |
1544 | 797k | : ResultResponse(request, Kind::VOID) { |
1545 | 797k | } |
1546 | | |
1547 | 797k | VoidResultResponse::~VoidResultResponse() { |
1548 | 797k | } |
1549 | | |
1550 | 796k | void VoidResultResponse::SerializeResultBody(faststring* mesg) const { |
1551 | | // Void result response body is empty |
1552 | 796k | } |
1553 | | |
1554 | | //---------------------------------------------------------------------------------------- |
1555 | | RowsResultResponse::RowsResultResponse( |
1556 | | const QueryRequest& request, const ql::RowsResult::SharedPtr& result) |
1557 | | : ResultResponse(request, Kind::ROWS), result_(result), |
1558 | 92.4k | skip_metadata_(request.params().flags & CQLMessage::QueryParameters::kSkipMetadataFlag) { |
1559 | 92.4k | } |
1560 | | |
1561 | | RowsResultResponse::RowsResultResponse( |
1562 | | const ExecuteRequest& request, const ql::RowsResult::SharedPtr& result) |
1563 | | : ResultResponse(request, Kind::ROWS), result_(result), |
1564 | 3.62M | skip_metadata_(request.params().flags & CQLMessage::QueryParameters::kSkipMetadataFlag) { |
1565 | 3.62M | } |
1566 | | |
1567 | | RowsResultResponse::RowsResultResponse( |
1568 | | const BatchRequest& request, const ql::RowsResult::SharedPtr& result) |
1569 | | : ResultResponse(request, Kind::ROWS), result_(result), |
1570 | 52 | skip_metadata_(false) { // Batches don't have the skip_metadata flag. |
1571 | 52 | } |
1572 | | |
1573 | 3.72M | RowsResultResponse::~RowsResultResponse() { |
1574 | 3.72M | } |
1575 | | |
1576 | 3.71M | void RowsResultResponse::SerializeResultBody(faststring* mesg) const { |
1577 | | // CQL ROWS Response = <metadata><rows_count><rows_content> |
1578 | 3.71M | SerializeRowsMetadata( |
1579 | 3.71M | RowsMetadata(result_->table_name(), result_->column_schemas(), |
1580 | 3.71M | result_->paging_state(), skip_metadata_), mesg); |
1581 | | |
1582 | | // The <rows_count> (4 bytes) must be in the response in any case, so the 'rows_data()' |
1583 | | // string in the result must contain it. |
1584 | 18.4E | LOG_IF(DFATAL, result_->rows_data().size() < 4) |
1585 | 18.4E | << "Absent rows_count for the CQL ROWS Result Response (rows_data: " |
1586 | 18.4E | << result_->rows_data().size() << " bytes, expected >= 4)"; |
1587 | 3.71M | mesg->append(result_->rows_data()); |
1588 | 3.71M | } |
1589 | | |
1590 | | //---------------------------------------------------------------------------------------- |
1591 | 337 | PreparedResultResponse::PreparedMetadata::PreparedMetadata() { |
1592 | 337 | } |
1593 | | |
1594 | | PreparedResultResponse::PreparedMetadata::PreparedMetadata( |
1595 | | const client::YBTableName& table_name, const std::vector<int64_t>& hash_col_indices, |
1596 | | const vector<client::YBTableName>& bind_table_names, |
1597 | | const vector<ColumnSchema>& bind_variable_schemas) |
1598 | | : flags(table_name.empty() ? 0 : kHasGlobalTableSpec), |
1599 | 4.52k | global_table_spec(table_name.namespace_name(), table_name.table_name()) { |
1600 | 4.52k | this->pk_indices.reserve(hash_col_indices.size()); |
1601 | 6.02k | for (const size_t index : hash_col_indices) { |
1602 | 6.02k | this->pk_indices.emplace_back(static_cast<uint16_t>(index)); |
1603 | 6.02k | } |
1604 | 4.52k | col_specs.reserve(bind_variable_schemas.size()); |
1605 | 16.9k | for (size_t i = 0; i < bind_variable_schemas.size(); i++) { |
1606 | 12.4k | const ColumnSchema& var = bind_variable_schemas[i]; |
1607 | 12.4k | if (flags & kHasGlobalTableSpec) { |
1608 | 12.3k | col_specs.emplace_back(var.name(), RowsMetadata::Type(var.type())); |
1609 | 106 | } else { |
1610 | 106 | col_specs.emplace_back(bind_table_names[i], var.name(), RowsMetadata::Type(var.type())); |
1611 | 106 | } |
1612 | 12.4k | } |
1613 | 4.52k | } |
1614 | | |
1615 | | PreparedResultResponse::PreparedResultResponse(const CQLRequest& request, const QueryId& query_id) |
1616 | 337 | : ResultResponse(request, Kind::PREPARED), query_id_(query_id) { |
1617 | 337 | } |
1618 | | |
1619 | | PreparedResultResponse::PreparedResultResponse( |
1620 | | const CQLRequest& request, const QueryId& query_id, const ql::PreparedResult& result) |
1621 | | : ResultResponse(request, Kind::PREPARED), query_id_(query_id), |
1622 | | prepared_metadata_(result.table_name(), result.hash_col_indices(), |
1623 | | result.bind_table_names(), result.bind_variable_schemas()), |
1624 | | rows_metadata_(!result.column_schemas().empty() ? |
1625 | | RowsMetadata( |
1626 | | result.table_name(), result.column_schemas(), |
1627 | | "" /* paging_state */, false /* no_metadata */) : |
1628 | 4.52k | RowsMetadata()) { |
1629 | 4.52k | } |
1630 | | |
1631 | 4.86k | PreparedResultResponse::~PreparedResultResponse() { |
1632 | 4.86k | } |
1633 | | |
1634 | | void PreparedResultResponse::SerializePreparedMetadata( |
1635 | 4.85k | const PreparedMetadata& metadata, faststring* mesg) const { |
1636 | 4.85k | SerializeInt(metadata.flags, mesg); |
1637 | 4.85k | SerializeInt(narrow_cast<int32_t>(metadata.col_specs.size()), mesg); |
1638 | 4.85k | if (VersionIsCompatible(kV4Version)) { |
1639 | 4.85k | SerializeInt(narrow_cast<int32_t>(metadata.pk_indices.size()), mesg); |
1640 | 6.03k | for (const auto& pk_index : metadata.pk_indices) { |
1641 | 6.03k | SerializeShort(pk_index, mesg); |
1642 | 6.03k | } |
1643 | 4.85k | } |
1644 | 4.85k | SerializeColSpecs( |
1645 | 4.85k | metadata.flags & PreparedMetadata::kHasGlobalTableSpec, metadata.global_table_spec, |
1646 | 4.85k | metadata.col_specs, mesg); |
1647 | 4.85k | } |
1648 | | |
1649 | 4.85k | void PreparedResultResponse::SerializeResultBody(faststring* mesg) const { |
1650 | 4.85k | SerializeShortBytes(query_id_, mesg); |
1651 | 4.85k | SerializePreparedMetadata(prepared_metadata_, mesg); |
1652 | 4.85k | SerializeRowsMetadata(rows_metadata_, mesg); |
1653 | 4.85k | } |
1654 | | |
1655 | | //---------------------------------------------------------------------------------------- |
1656 | | SetKeyspaceResultResponse::SetKeyspaceResultResponse( |
1657 | | const CQLRequest& request, const ql::SetKeyspaceResult& result) |
1658 | 4.18k | : ResultResponse(request, Kind::SET_KEYSPACE), keyspace_(result.keyspace()) { |
1659 | 4.18k | } |
1660 | | |
1661 | 4.18k | SetKeyspaceResultResponse::~SetKeyspaceResultResponse() { |
1662 | 4.18k | } |
1663 | | |
1664 | 4.18k | void SetKeyspaceResultResponse::SerializeResultBody(faststring* mesg) const { |
1665 | 4.18k | SerializeString(keyspace_, mesg); |
1666 | 4.18k | } |
1667 | | |
1668 | | //---------------------------------------------------------------------------------------- |
1669 | | SchemaChangeResultResponse::SchemaChangeResultResponse( |
1670 | | const CQLRequest& request, const ql::SchemaChangeResult& result) |
1671 | | : ResultResponse(request, Kind::SCHEMA_CHANGE), |
1672 | | change_type_(result.change_type()), target_(result.object_type()), |
1673 | 6.05k | keyspace_(result.keyspace_name()), object_(result.object_name()) { |
1674 | 6.05k | } |
1675 | | |
1676 | 6.05k | SchemaChangeResultResponse::~SchemaChangeResultResponse() { |
1677 | 6.05k | } |
1678 | | |
1679 | | void SchemaChangeResultResponse::Serialize(const CompressionScheme compression_scheme, |
1680 | 6.05k | faststring* mesg) const { |
1681 | 6.05k | ResultResponse::Serialize(compression_scheme, mesg); |
1682 | | |
1683 | 6.05k | if (registered_events() & kSchemaChange) { |
1684 | | // TODO: Replace this hack that piggybacks a SCHEMA_CHANGE event along a SCHEMA_CHANGE result |
1685 | | // response with a formal event notification mechanism. |
1686 | 4 | SchemaChangeEventResponse event(change_type_, target_, keyspace_, object_, argument_types_); |
1687 | 4 | event.Serialize(compression_scheme, mesg); |
1688 | 4 | } |
1689 | 6.05k | } |
1690 | | |
1691 | 6.05k | void SchemaChangeResultResponse::SerializeResultBody(faststring* mesg) const { |
1692 | 6.05k | SerializeString(change_type_, mesg); |
1693 | 6.05k | SerializeString(target_, mesg); |
1694 | 6.05k | if (target_ == "KEYSPACE") { |
1695 | 3.12k | SerializeString(keyspace_, mesg); |
1696 | 2.92k | } else if (target_ == "TABLE" || target_ == "TYPE") { |
1697 | 2.92k | SerializeString(keyspace_, mesg); |
1698 | 2.92k | SerializeString(object_, mesg); |
1699 | 0 | } else if (target_ == "FUNCTION" || target_ == "AGGREGATE") { |
1700 | 0 | SerializeString(keyspace_, mesg); |
1701 | 0 | SerializeString(object_, mesg); |
1702 | 0 | SerializeStringList(argument_types_, mesg); |
1703 | 0 | } |
1704 | 6.05k | } |
1705 | | |
1706 | | //---------------------------------------------------------------------------------------- |
1707 | | EventResponse::EventResponse(const string& event_type) |
1708 | 52.8k | : CQLResponse(kEventStreamId, Opcode::EVENT), event_type_(event_type) { |
1709 | 52.8k | } |
1710 | | |
1711 | 52.8k | EventResponse::~EventResponse() { |
1712 | 52.8k | } |
1713 | | |
1714 | 52.8k | void EventResponse::SerializeBody(faststring* mesg) const { |
1715 | 52.8k | SerializeString(event_type_, mesg); |
1716 | 52.8k | SerializeEventBody(mesg); |
1717 | 52.8k | } |
1718 | | |
1719 | 0 | std::string EventResponse::ToString() const { |
1720 | 0 | return event_type_ + ":" + BodyToString(); |
1721 | 0 | } |
1722 | | |
1723 | | //---------------------------------------------------------------------------------------- |
1724 | | TopologyChangeEventResponse::TopologyChangeEventResponse(const string& topology_change_type, |
1725 | | const Endpoint& node) |
1726 | | : EventResponse(kTopologyChangeEvent), topology_change_type_(topology_change_type), |
1727 | 52.8k | node_(node) { |
1728 | 52.8k | } |
1729 | | |
1730 | 52.8k | TopologyChangeEventResponse::~TopologyChangeEventResponse() { |
1731 | 52.8k | } |
1732 | | |
1733 | 52.8k | void TopologyChangeEventResponse::SerializeEventBody(faststring* mesg) const { |
1734 | 52.8k | SerializeString(topology_change_type_, mesg); |
1735 | 52.8k | SerializeInet(node_, mesg); |
1736 | 52.8k | } |
1737 | | |
1738 | 0 | std::string TopologyChangeEventResponse::BodyToString() const { |
1739 | 0 | return topology_change_type_; |
1740 | 0 | } |
1741 | | |
1742 | | //---------------------------------------------------------------------------------------- |
1743 | | StatusChangeEventResponse::StatusChangeEventResponse(const string& status_change_type, |
1744 | | const Endpoint& node) |
1745 | | : EventResponse(kStatusChangeEvent), status_change_type_(status_change_type), |
1746 | 0 | node_(node) { |
1747 | 0 | } |
1748 | | |
1749 | 0 | StatusChangeEventResponse::~StatusChangeEventResponse() { |
1750 | 0 | } |
1751 | | |
1752 | 0 | void StatusChangeEventResponse::SerializeEventBody(faststring* mesg) const { |
1753 | 0 | SerializeString(status_change_type_, mesg); |
1754 | 0 | SerializeInet(node_, mesg); |
1755 | 0 | } |
1756 | | |
1757 | 0 | std::string StatusChangeEventResponse::BodyToString() const { |
1758 | 0 | return status_change_type_; |
1759 | 0 | } |
1760 | | |
1761 | | //---------------------------------------------------------------------------------------- |
1762 | | const vector<string> SchemaChangeEventResponse::kEmptyArgumentTypes = {}; |
1763 | | |
1764 | | SchemaChangeEventResponse::SchemaChangeEventResponse( |
1765 | | const string& change_type, const string& target, |
1766 | | const string& keyspace, const string& object, const vector<string>& argument_types) |
1767 | | : EventResponse(kSchemaChangeEvent), change_type_(change_type), target_(target), |
1768 | 4 | keyspace_(keyspace), object_(object), argument_types_(argument_types) { |
1769 | 4 | } |
1770 | | |
1771 | 4 | SchemaChangeEventResponse::~SchemaChangeEventResponse() { |
1772 | 4 | } |
1773 | | |
1774 | 0 | std::string SchemaChangeEventResponse::BodyToString() const { |
1775 | 0 | return change_type_; |
1776 | 0 | } |
1777 | | |
1778 | 4 | void SchemaChangeEventResponse::SerializeEventBody(faststring* mesg) const { |
1779 | 4 | SerializeString(change_type_, mesg); |
1780 | 4 | SerializeString(target_, mesg); |
1781 | 4 | if (target_ == "KEYSPACE") { |
1782 | 2 | SerializeString(keyspace_, mesg); |
1783 | 2 | } else if (target_ == "TABLE" || target_ == "TYPE") { |
1784 | 2 | SerializeString(keyspace_, mesg); |
1785 | 2 | SerializeString(object_, mesg); |
1786 | 0 | } else if (target_ == "FUNCTION" || target_ == "AGGREGATE") { |
1787 | 0 | SerializeString(keyspace_, mesg); |
1788 | 0 | SerializeString(object_, mesg); |
1789 | 0 | SerializeStringList(argument_types_, mesg); |
1790 | 0 | } |
1791 | 4 | } |
1792 | | |
1793 | | //---------------------------------------------------------------------------------------- |
1794 | | AuthChallengeResponse::AuthChallengeResponse(const CQLRequest& request, const string& token) |
1795 | 0 | : CQLResponse(request, Opcode::AUTH_CHALLENGE), token_(token) { |
1796 | 0 | } |
1797 | | |
1798 | 0 | AuthChallengeResponse::~AuthChallengeResponse() { |
1799 | 0 | } |
1800 | | |
1801 | 0 | void AuthChallengeResponse::SerializeBody(faststring* mesg) const { |
1802 | 0 | SerializeBytes(token_, mesg); |
1803 | 0 | } |
1804 | | |
1805 | | //---------------------------------------------------------------------------------------- |
1806 | | AuthSuccessResponse::AuthSuccessResponse(const CQLRequest& request, const string& token) |
1807 | 6.16k | : CQLResponse(request, Opcode::AUTH_SUCCESS), token_(token) { |
1808 | 6.16k | } |
1809 | | |
1810 | 6.16k | AuthSuccessResponse::~AuthSuccessResponse() { |
1811 | 6.16k | } |
1812 | | |
1813 | 6.16k | void AuthSuccessResponse::SerializeBody(faststring* mesg) const { |
1814 | 6.16k | SerializeBytes(token_, mesg); |
1815 | 6.16k | } |
1816 | | |
1817 | | CQLServerEvent::CQLServerEvent(std::unique_ptr<EventResponse> event_response) |
1818 | 52.8k | : event_response_(std::move(event_response)) { |
1819 | 52.8k | CHECK_NOTNULL(event_response_.get()); |
1820 | 52.8k | faststring temp; |
1821 | 52.8k | event_response_->Serialize(CQLMessage::CompressionScheme::kNone, &temp); |
1822 | 52.8k | serialized_response_ = RefCntBuffer(temp); |
1823 | 52.8k | } |
1824 | | |
1825 | 94.0k | void CQLServerEvent::Serialize(boost::container::small_vector_base<RefCntBuffer>* output) const { |
1826 | 94.0k | output->push_back(serialized_response_); |
1827 | 94.0k | } |
1828 | | |
1829 | 0 | std::string CQLServerEvent::ToString() const { |
1830 | 0 | return event_response_->ToString(); |
1831 | 0 | } |
1832 | | |
1833 | 10.8k | CQLServerEventList::CQLServerEventList() { |
1834 | 10.8k | } |
1835 | | |
1836 | 18.9k | void CQLServerEventList::Transferred(const Status& status, rpc::Connection*) { |
1837 | 18.9k | if (!status.ok()) { |
1838 | 0 | LOG(WARNING) << "Transfer of CQL server event failed: " << status.ToString(); |
1839 | 0 | } |
1840 | 18.9k | } |
1841 | | |
1842 | | void CQLServerEventList::Serialize( |
1843 | 19.4k | boost::container::small_vector_base<RefCntBuffer>* output) { |
1844 | 94.3k | for (const auto& cql_server_event : cql_server_events_) { |
1845 | 94.3k | cql_server_event->Serialize(output); |
1846 | 94.3k | } |
1847 | 19.4k | } |
1848 | | |
1849 | 0 | std::string CQLServerEventList::ToString() const { |
1850 | 0 | std::string ret = ""; |
1851 | 0 | for (const auto& cql_server_event : cql_server_events_) { |
1852 | 0 | if (!ret.empty()) { |
1853 | 0 | ret += ", "; |
1854 | 0 | } |
1855 | 0 | ret += cql_server_event->ToString(); |
1856 | 0 | } |
1857 | 0 | return ret; |
1858 | 0 | } |
1859 | | |
1860 | 52.8k | void CQLServerEventList::AddEvent(std::unique_ptr<CQLServerEvent> event) { |
1861 | 52.8k | cql_server_events_.push_back(std::move(event)); |
1862 | 52.8k | } |
1863 | | |
1864 | | } // namespace ql |
1865 | | } // namespace yb |