YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/cql/ql/util/cql_message.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/yql/cql/ql/util/cql_message.h"
15
16
#include <lz4.h>
17
#include <snappy.h>
18
19
#include "yb/common/ql_protocol.pb.h"
20
#include "yb/common/ql_type.h"
21
#include "yb/common/ql_value.h"
22
#include "yb/common/schema.h"
23
24
#include "yb/gutil/casts.h"
25
#include "yb/gutil/endian.h"
26
#include "yb/gutil/strings/substitute.h"
27
28
#include "yb/util/logging.h"
29
#include "yb/util/random_util.h"
30
#include "yb/util/result.h"
31
#include "yb/util/status_format.h"
32
33
namespace yb {
34
namespace ql {
35
36
using std::shared_ptr;
37
using std::unique_ptr;
38
using std::string;
39
using std::set;
40
using std::vector;
41
using std::unordered_map;
42
using strings::Substitute;
43
using snappy::GetUncompressedLength;
44
using snappy::MaxCompressedLength;
45
using snappy::RawUncompress;
46
using snappy::RawCompress;
47
48
// Guard used by the CQLRequest parsing helpers below: returns a NetworkError status from the
// enclosing function when fewer than `sz` bytes remain in the member `body_`.
#undef RETURN_NOT_ENOUGH
#define RETURN_NOT_ENOUGH(sz)                                 \
  do {                                                        \
    if ((sz) > body_.size()) {                                \
      return STATUS(NetworkError, "Truncated CQL message");   \
    }                                                         \
  } while (0)
55
56
// Namespace-scope definitions of the static constexpr char-array members of CQLMessage
// (option names, compression scheme names and event type names). The initializers are not
// given here; presumably they live with the in-class declarations in cql_message.h.
constexpr char CQLMessage::kCQLVersionOption[];
57
constexpr char CQLMessage::kCompressionOption[];
58
constexpr char CQLMessage::kNoCompactOption[];
59
60
constexpr char CQLMessage::kLZ4Compression[];
61
constexpr char CQLMessage::kSnappyCompression[];
62
63
constexpr char CQLMessage::kTopologyChangeEvent[];
64
constexpr char CQLMessage::kStatusChangeEvent[];
65
constexpr char CQLMessage::kSchemaChangeEvent[];
66
67
// Resolves a bind variable to its raw Value and stores a pointer to it in `*value`.
//
// When `value_map` is non-empty the variable is looked up by `name`; otherwise the 0-based
// `pos` indexes `values` directly. Returns RuntimeError if the name is unknown or the
// position is out of range.
CHECKED_STATUS CQLMessage::QueryParameters::GetBindVariableValue(const std::string& name,
                                                                 const size_t pos,
                                                                 const Value** value) const {
  // No name -> index mapping recorded: positional lookup.
  if (value_map.empty()) {
    if (pos >= values.size()) {
      // Return error with 1-based position.
      return STATUS_SUBSTITUTE(RuntimeError, "Bind variable at position $0 not found", pos + 1);
    }
    *value = &values[pos];
    return Status::OK();
  }

  // Named lookup.
  const auto found = value_map.find(name);
  if (found == value_map.end()) {
    return STATUS_SUBSTITUTE(RuntimeError, "Bind variable \"$0\" not found", name);
  }
  *value = &values[found->second];
  return Status::OK();
}
86
87
// Reports whether the bind variable addressed by `name` / `pos` has kind NOT_SET.
// Propagates the lookup error from GetBindVariableValue() when the variable does not exist.
Result<bool> CQLMessage::QueryParameters::IsBindVariableUnset(const std::string& name,
                                                              const int64_t pos) const {
  const Value* bound = nullptr;
  RETURN_NOT_OK(GetBindVariableValue(name, pos, &bound));
  return bound->kind == Value::Kind::NOT_SET;
}
93
94
// Decodes the bind variable addressed by `name` / `pos` into `value`, interpreting it as `type`.
//
//  - IS_NULL  : `value` is set to null.
//  - NOT_SET  : `value` is left untouched.
//  - NOT_NULL : a non-empty payload is handed to QLValue::Deserialize(); an empty payload
//               becomes an empty string / binary for STRING / BINARY, null for the other
//               listed scalar and collection types, and NotSupported for the rest.
Status CQLMessage::QueryParameters::GetBindVariable(const std::string& name,
                                                    const int64_t pos,
                                                    const shared_ptr<QLType>& type,
                                                    QLValue* value) const {
  const Value* bound = nullptr;
  RETURN_NOT_OK(GetBindVariableValue(name, pos, &bound));

  switch (bound->kind) {
    case Value::Kind::IS_NULL:
      value->SetNull();
      return Status::OK();

    case Value::Kind::NOT_SET:
      return Status::OK();

    case Value::Kind::NOT_NULL: {
      // Common case: there are bytes to decode.
      if (!bound->value.empty()) {
        Slice data(bound->value);
        return value->Deserialize(type, YQL_CLIENT_CQL, &data);
      }

      // Empty payload: the result depends only on the declared type.
      switch (type->main()) {
        case DataType::STRING:
          value->set_string_value("");
          return Status::OK();
        case DataType::BINARY:
          value->set_binary_value("");
          return Status::OK();
        case DataType::NULL_VALUE_TYPE: FALLTHROUGH_INTENDED;
        case DataType::INT8: FALLTHROUGH_INTENDED;
        case DataType::INT16: FALLTHROUGH_INTENDED;
        case DataType::INT32: FALLTHROUGH_INTENDED;
        case DataType::INT64: FALLTHROUGH_INTENDED;
        case DataType::BOOL: FALLTHROUGH_INTENDED;
        case DataType::FLOAT: FALLTHROUGH_INTENDED;
        case DataType::DOUBLE: FALLTHROUGH_INTENDED;
        case DataType::TIMESTAMP: FALLTHROUGH_INTENDED;
        case DataType::DECIMAL: FALLTHROUGH_INTENDED;
        case DataType::VARINT: FALLTHROUGH_INTENDED;
        case DataType::INET: FALLTHROUGH_INTENDED;
        case DataType::JSONB: FALLTHROUGH_INTENDED;
        case DataType::LIST: FALLTHROUGH_INTENDED;
        case DataType::MAP: FALLTHROUGH_INTENDED;
        case DataType::SET: FALLTHROUGH_INTENDED;
        case DataType::UUID: FALLTHROUGH_INTENDED;
        case DataType::TIMEUUID:
          value->SetNull();
          return Status::OK();
        case DataType::UNKNOWN_DATA: FALLTHROUGH_INTENDED;
        case DataType::TUPLE: FALLTHROUGH_INTENDED;
        case DataType::TYPEARGS: FALLTHROUGH_INTENDED;
        case DataType::USER_DEFINED_TYPE: FALLTHROUGH_INTENDED;
        case DataType::FROZEN: FALLTHROUGH_INTENDED;
        case DataType::DATE: FALLTHROUGH_INTENDED;
        case DataType::TIME: FALLTHROUGH_INTENDED;
        QL_INVALID_TYPES_IN_SWITCH:
          break;
      }
      return STATUS_SUBSTITUTE(
          NotSupported, "Unsupported datatype $0", static_cast<int>(type->main()));
    }
  }

  // Only reachable when `kind` holds a value outside the enumerators handled above.
  return STATUS_SUBSTITUTE(
      RuntimeError, "Invalid bind variable kind $0", static_cast<int>(bound->kind));
}
155
156
8.91M
// Translates the client-supplied Cassandra consistency level into a YB consistency level.
//
//  - ONE                -> CONSISTENT_PREFIX (the closest match for Cassandra's ONE).
//  - QUORUM / LOCAL_ONE -> STRONG. QUORUM is repurposed to mean STRONG; LOCAL_ONE is the
//                          datastax client default and strong consistency is the desired
//                          default, so existing Cassandra clients need no changes.
//  - anything else      -> STRONG, with a rate-limited warning.
// Always returns OK.
Status CQLMessage::QueryParameters::ValidateConsistency() {
  if (consistency == Consistency::ONE) {
    set_yb_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX);
    return Status::OK();
  }

  const bool maps_to_strong =
      consistency == Consistency::LOCAL_ONE || consistency == Consistency::QUORUM;
  if (!maps_to_strong) {
    YB_LOG_EVERY_N_SECS(WARNING, 10) << "Consistency level " << static_cast<uint16_t>(consistency)
                                     << " is not supported, defaulting to strong consistency";
  }
  set_yb_consistency_level(YBConsistencyLevel::STRONG);
  return Status::OK();
}
181
182
namespace {
183
184
template<class Type>
185
26.8M
Type LoadByte(const Slice& slice, size_t offset) {
186
26.8M
  return static_cast<Type>(Load8(slice.data() + offset));
187
26.8M
}
cql_message.cc:unsigned char yb::ql::(anonymous namespace)::LoadByte<unsigned char>(yb::Slice const&, unsigned long)
Line
Count
Source
185
17.8M
Type LoadByte(const Slice& slice, size_t offset) {
186
17.8M
  return static_cast<Type>(Load8(slice.data() + offset));
187
17.8M
}
cql_message.cc:yb::ql::CQLMessage::Opcode yb::ql::(anonymous namespace)::LoadByte<yb::ql::CQLMessage::Opcode>(yb::Slice const&, unsigned long)
Line
Count
Source
185
8.93M
Type LoadByte(const Slice& slice, size_t offset) {
186
8.93M
  return static_cast<Type>(Load8(slice.data() + offset));
187
8.93M
}
188
189
template<class Type>
190
Type LoadShort(const Slice& slice, size_t offset) {
191
  return static_cast<Type>(NetworkByteOrder::Load16(slice.data() + offset));
192
}
193
194
template<class Type>
195
8.93M
Type LoadInt(const Slice& slice, size_t offset) {
196
8.93M
  return static_cast<Type>(NetworkByteOrder::Load32(slice.data() + offset));
197
8.93M
}
198
199
} // namespace
200
201
// ------------------------------------ CQL request -----------------------------------
202
bool CQLRequest::ParseRequest(
203
  const Slice& mesg, const CompressionScheme compression_scheme,
204
8.94M
  unique_ptr<CQLRequest>* request, unique_ptr<CQLResponse>* error_response) {
205
206
8.94M
  *request = nullptr;
207
8.94M
  *error_response = nullptr;
208
209
8.94M
  if (mesg.size() < kMessageHeaderLength) {
210
0
    error_response->reset(
211
0
        new ErrorResponse(
212
0
            static_cast<StreamId>(0), ErrorResponse::Code::PROTOCOL_ERROR, "Incomplete header"));
213
0
    return false;
214
0
  }
215
216
8.94M
  Header header(
217
8.94M
      LoadByte<Version>(mesg, kHeaderPosVersion),
218
8.94M
      LoadByte<Flags>(mesg, kHeaderPosFlags),
219
8.94M
      ParseStreamId(mesg),
220
8.94M
      LoadByte<Opcode>(mesg, kHeaderPosOpcode));
221
222
8.94M
  uint32_t length = LoadInt<uint32_t>(mesg, kHeaderPosLength);
223
8.94M
  DVLOG(4) << "CQL message "
224
8.66k
           << "version 0x" << std::hex << static_cast<uint32_t>(header.version)   << " "
225
8.66k
           << "flags 0x"   << std::hex << static_cast<uint32_t>(header.flags)     << " "
226
8.66k
           << "stream id " << std::dec << static_cast<uint32_t>(header.stream_id) << " "
227
8.66k
           << "opcode 0x"  << std::hex << static_cast<uint32_t>(header.opcode)    << " "
228
8.66k
           << "length "    << std::dec << static_cast<uint32_t>(length);
229
230
  // Verify proper version that the response bit is not set in the request protocol version and
231
  // the protocol version is one we support.
232
8.94M
  if (header.version & kResponseVersion) {
233
0
    error_response->reset(
234
0
        new ErrorResponse(
235
0
            header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR, "Not a request"));
236
0
    return false;
237
0
  }
238
8.94M
  if (header.version < kMinimumVersion || 
header.version > kCurrentVersion8.93M
) {
239
31
    error_response->reset(
240
31
        new ErrorResponse(
241
31
            header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR,
242
31
            Substitute("Invalid or unsupported protocol version $0. Supported versions are between "
243
31
                       "$1 and $2.", header.version, kMinimumVersion, kCurrentVersion)));
244
31
    return false;
245
31
  }
246
247
8.94M
  size_t body_size = mesg.size() - kMessageHeaderLength;
248
8.94M
  const uint8_t* body_data = body_size > 0 ? 
mesg.data() + kMessageHeaderLength8.93M
:
to_uchar_ptr("")9.52k
;
249
8.94M
  unique_ptr<uint8_t[]> buffer;
250
251
  // If the message body is compressed, uncompress it.
252
8.94M
  if (body_size > 0 && 
(header.flags & kCompressionFlag)8.93M
) {
253
318
    if (header.opcode == Opcode::STARTUP) {
254
0
      error_response->reset(
255
0
          new ErrorResponse(
256
0
              header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR,
257
0
              "STARTUP request should not be compressed"));
258
0
      return false;
259
0
    }
260
318
    switch (compression_scheme) {
261
206
      case CompressionScheme::kLz4: {
262
206
        if (body_size < sizeof(uint32_t)) {
263
0
          error_response->reset(
264
0
              new ErrorResponse(
265
0
                  header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR,
266
0
                  "Insufficient compressed data"));
267
0
          return false;
268
0
        }
269
270
206
        const uint32_t uncomp_size = NetworkByteOrder::Load32(body_data);
271
206
        buffer = std::make_unique<uint8_t[]>(uncomp_size);
272
206
        body_data += sizeof(uncomp_size);
273
206
        body_size -= sizeof(uncomp_size);
274
206
        const int size = LZ4_decompress_safe(
275
206
            to_char_ptr(body_data), to_char_ptr(buffer.get()), narrow_cast<int>(body_size),
276
206
            uncomp_size);
277
206
        if (size < 0 || static_cast<uint32_t>(size) != uncomp_size) {
278
0
          error_response->reset(
279
0
              new ErrorResponse(
280
0
                  header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR,
281
0
                  "Error occurred when uncompressing CQL message"));
282
0
          return false;
283
0
        }
284
206
        body_data = buffer.get();
285
206
        body_size = uncomp_size;
286
206
        break;
287
206
      }
288
111
      case CompressionScheme::kSnappy: {
289
111
        size_t uncomp_size = 0;
290
111
        if (GetUncompressedLength(to_char_ptr(body_data), body_size, &uncomp_size)) {
291
111
          buffer = std::make_unique<uint8_t[]>(uncomp_size);
292
113
          if (
RawUncompress(to_char_ptr(body_data), body_size, to_char_ptr(buffer.get()))111
) {
293
113
            body_data = buffer.get();
294
113
            body_size = uncomp_size;
295
113
            break;
296
113
          }
297
111
        }
298
18.4E
        error_response->reset(
299
18.4E
            new ErrorResponse(
300
18.4E
                header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR,
301
18.4E
                "Error occurred when uncompressing CQL message"));
302
18.4E
        break;
303
111
      }
304
0
      case CompressionScheme::kNone:
305
0
        error_response->reset(
306
0
            new ErrorResponse(
307
0
                header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR,
308
0
                "No compression scheme specified"));
309
0
        return false;
310
318
    }
311
318
  }
312
313
8.94M
  const Slice body = (body_size == 0) ? 
Slice()2.57k
:
Slice(body_data, body_size)8.94M
;
314
315
  // Construct the skeleton request by the opcode
316
8.94M
  switch (header.opcode) {
317
10.0k
    case Opcode::STARTUP:
318
10.0k
      request->reset(new StartupRequest(header, body));
319
10.0k
      break;
320
6.29k
    case Opcode::AUTH_RESPONSE:
321
6.29k
      request->reset(new AuthResponseRequest(header, body));
322
6.29k
      break;
323
2.57k
    case Opcode::OPTIONS:
324
2.57k
      request->reset(new OptionsRequest(header, body));
325
2.57k
      break;
326
169k
    case Opcode::QUERY:
327
169k
      request->reset(new QueryRequest(header, body));
328
169k
      break;
329
5.69k
    case Opcode::PREPARE:
330
5.69k
      request->reset(new PrepareRequest(header, body));
331
5.69k
      break;
332
8.74M
    case Opcode::EXECUTE:
333
8.74M
      request->reset(new ExecuteRequest(header, body));
334
8.74M
      break;
335
1.62k
    case Opcode::BATCH:
336
1.62k
      request->reset(new BatchRequest(header, body));
337
1.62k
      break;
338
2.31k
    case Opcode::REGISTER:
339
2.31k
      request->reset(new RegisterRequest(header, body));
340
2.31k
      break;
341
342
    // These are not request but response opcodes
343
0
    case Opcode::ERROR:
344
0
    case Opcode::READY:
345
0
    case Opcode::AUTHENTICATE:
346
0
    case Opcode::SUPPORTED:
347
0
    case Opcode::RESULT:
348
0
    case Opcode::EVENT:
349
0
    case Opcode::AUTH_CHALLENGE:
350
0
    case Opcode::AUTH_SUCCESS:
351
0
      error_response->reset(
352
0
          new ErrorResponse(
353
0
              header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR, "Not a request opcode"));
354
0
      return false;
355
356
    // default: -> fall through
357
8.94M
  }
358
359
8.94M
  if (*request == nullptr) {
360
0
    error_response->reset(
361
0
        new ErrorResponse(
362
0
            header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR, "Unknown opcode"));
363
0
    return false;
364
0
  }
365
366
  // Parse the request body
367
8.94M
  const Status status = (*request)->ParseBody();
368
8.94M
  if (!status.ok()) {
369
0
    error_response->reset(
370
0
        new ErrorResponse(
371
0
            *(*request), ErrorResponse::Code::PROTOCOL_ERROR, status.message().ToString()));
372
8.94M
  } else if (!(*request)->body_.empty()) {
373
    // Flag error when there are bytes remaining after we have parsed the whole request body
374
    // according to the protocol. Either the request's length field from the client is
375
    // wrong. Or we could have a bug in our parser.
376
0
    error_response->reset(
377
0
        new ErrorResponse(
378
0
            *(*request), ErrorResponse::Code::PROTOCOL_ERROR, "Request length too long"));
379
0
  }
380
381
  // If there is any error, free the partially parsed request and return.
382
8.94M
  if (*error_response != nullptr) {
383
0
    *request = nullptr;
384
0
    return false;
385
0
  }
386
387
  // Clear and release the body after parsing.
388
8.94M
  (*request)->body_.clear();
389
390
8.94M
  return true;
391
8.94M
}
392
393
8.93M
// Stores the frame header in the CQLMessage base and keeps `body` as the unparsed remainder
// that the Parse*() helpers consume.
CQLRequest::CQLRequest(const Header& header, const Slice& body)
    : CQLMessage(header),
      body_(body) {}

CQLRequest::~CQLRequest() {}
398
399
//----------------------------------------------------------------------------------------
400
401
0
// Consumes kUUIDSize raw bytes from the front of the body and stores them in `*value`.
// Returns NetworkError when the body is shorter than that.
Status CQLRequest::ParseUUID(string* value) {
  RETURN_NOT_ENOUGH(kUUIDSize);
  const char* const raw = to_char_ptr(body_.data());
  *value = string(raw, kUUIDSize);
  body_.remove_prefix(kUUIDSize);
  DVLOG(4) << "CQL uuid " << *value;
  return Status::OK();
}
408
409
0
// Consumes kUUIDSize raw bytes from the front of the body and stores them in `*value`.
// Identical to ParseUUID() except for the trace label.
Status CQLRequest::ParseTimeUUID(string* value) {
  RETURN_NOT_ENOUGH(kUUIDSize);
  const char* const raw = to_char_ptr(body_.data());
  *value = string(raw, kUUIDSize);
  body_.remove_prefix(kUUIDSize);
  DVLOG(4) << "CQL timeuuid " << *value;
  return Status::OK();
}
416
417
2.31k
// Parses a 16-bit element count followed by that many strings into `*list`.
// `*list` is resized to the count before the elements are read, so on a mid-list failure it
// keeps the already-parsed prefix followed by empty strings.
Status CQLRequest::ParseStringList(vector<string>* list) {
  DVLOG(4) << "CQL string list ...";
  uint16_t num_strings = 0;
  RETURN_NOT_OK(ParseShort(&num_strings));
  list->resize(num_strings);
  for (string& element : *list) {
    RETURN_NOT_OK(ParseString(&element));
  }
  return Status::OK();
}
427
428
0
// Parses an inet: a byte-length-prefixed raw address followed by a 32-bit port.
// The address must have the size of an IPv4 or IPv6 address and the port must lie in
// [0, 65535]; otherwise a NetworkError is returned.
Status CQLRequest::ParseInet(Endpoint* value) {
  using V4Bytes = boost::asio::ip::address_v4::bytes_type;
  using V6Bytes = boost::asio::ip::address_v6::bytes_type;

  std::string raw_addr;
  RETURN_NOT_OK(ParseBytes("CQL ipaddr", &CQLRequest::ParseByte, &raw_addr));

  int32_t port = 0;
  RETURN_NOT_OK(ParseInt(&port));
  const bool port_in_range = port >= 0 && port <= 65535;
  if (!port_in_range) {
    return STATUS(NetworkError, "Invalid inet port");
  }

  // The address family is inferred from the number of address bytes.
  IpAddress address;
  if (raw_addr.size() == V4Bytes().size()) {
    V4Bytes v4;
    memcpy(v4.data(), raw_addr.data(), raw_addr.size());
    address = boost::asio::ip::address_v4(v4);
  } else if (raw_addr.size() == V6Bytes().size()) {
    V6Bytes v6;
    memcpy(v6.data(), raw_addr.data(), raw_addr.size());
    address = boost::asio::ip::address_v6(v6);
  } else {
    return STATUS_SUBSTITUTE(NetworkError, "Invalid size of ipaddr: $0", raw_addr.size());
  }

  *value = Endpoint(address, port);
  DVLOG(4) << "CQL inet " << *value;
  return Status::OK();
}
452
453
10.0k
// Parses a 16-bit entry count followed by that many (string key, string value) pairs and
// stores them in `*map`. A repeated key overwrites the earlier value; entries already present
// in `*map` are kept.
Status CQLRequest::ParseStringMap(unordered_map<string, string>* map) {
  DVLOG(4) << "CQL string map ...";
  uint16_t num_entries = 0;
  RETURN_NOT_OK(ParseShort(&num_entries));
  for (uint16_t entry = 0; entry != num_entries; ++entry) {
    string key;
    string mapped;
    RETURN_NOT_OK(ParseString(&key));
    RETURN_NOT_OK(ParseString(&mapped));
    (*map)[key] = mapped;
  }
  return Status::OK();
}
465
466
0
// Parses a 16-bit entry count followed by that many (string key, string list) pairs and
// stores them in `*map`. A repeated key overwrites the earlier list.
Status CQLRequest::ParseStringMultiMap(unordered_map<string, vector<string>>* map) {
  DVLOG(4) << "CQL string multimap ...";
  uint16_t num_entries = 0;
  RETURN_NOT_OK(ParseShort(&num_entries));
  for (uint16_t entry = 0; entry != num_entries; ++entry) {
    string key;
    vector<string> items;
    RETURN_NOT_OK(ParseString(&key));
    RETURN_NOT_OK(ParseStringList(&items));
    (*map)[key] = items;
  }
  return Status::OK();
}
479
480
0
// Parses a 16-bit entry count followed by that many (string key, bytes value) pairs and
// stores them in `*map`. A repeated key overwrites the earlier value.
Status CQLRequest::ParseBytesMap(unordered_map<string, string>* map) {
  DVLOG(4) << "CQL bytes map ...";
  uint16_t num_entries = 0;
  RETURN_NOT_OK(ParseShort(&num_entries));
  for (uint16_t entry = 0; entry != num_entries; ++entry) {
    string key;
    string payload;
    RETURN_NOT_OK(ParseString(&key));
    RETURN_NOT_OK(ParseBytes(&payload));
    (*map)[key] = payload;
  }
  return Status::OK();
}
492
493
12.9M
// Parses one bind value: an optional name (when `with_name`), a signed 32-bit length and, for
// a positive length, that many payload bytes.
//
//  - length > 0 : kind = NOT_NULL; `value->value` receives the 4 length bytes followed by the
//                 payload (the length prefix is deliberately kept in the stored bytes).
//  - length == 0: kind = NOT_NULL; `value->value` is left as it was.
//  - length < 0 : when VersionIsCompatible(kV4Version), -1 means IS_NULL, -2 means NOT_SET and
//                 anything else is a NetworkError; otherwise every negative length is IS_NULL.
Status CQLRequest::ParseValue(const bool with_name, Value* value) {
  DVLOG(4) << "CQL value ...";
  if (with_name) {
    RETURN_NOT_OK(ParseString(&value->name));
  }

  // Remember where the length field starts so it can be copied together with the payload.
  const uint8_t* const length_start = body_.data();
  int32_t length = 0;
  RETURN_NOT_OK(ParseInt(&length));

  if (length < 0) {
    if (!VersionIsCompatible(kV4Version)) {
      value->kind = Value::Kind::IS_NULL;
      return Status::OK();
    }
    switch (length) {
      case -1:
        value->kind = Value::Kind::IS_NULL;
        break;
      case -2:
        value->kind = Value::Kind::NOT_SET;
        break;
      default:
        return STATUS(NetworkError, "Invalid length in value");
    }
    return Status::OK();
  }

  value->kind = Value::Kind::NOT_NULL;
  if (length == 0) {
    return Status::OK();
  }

  const uint32_t unsigned_length = length;
  RETURN_NOT_ENOUGH(unsigned_length);
  value->value.assign(to_char_ptr(length_start), kIntSize + length);
  body_.remove_prefix(length);
  DVLOG(4) << "CQL value bytes " << value->value;
  return Status::OK();
}
528
529
8.91M
// Parses the query-parameters section of a QUERY / EXECUTE body into `*params`.
//
// Layout, in order: consistency, flags byte, then each optional part whose flag bit is set:
// bind values, page size, paging state, serial consistency and default timestamp.
// A random request id is also assigned to `*params`.
Status CQLRequest::ParseQueryParameters(QueryParameters* params) {
  DVLOG(4) << "CQL query parameters ...";

  RETURN_NOT_OK(ParseConsistency(&params->consistency));
  RETURN_NOT_OK(params->ValidateConsistency());
  RETURN_NOT_OK(ParseByte(&params->flags));
  params->set_request_id(RandomUniformInt<uint64_t>());

  // Bind values, optionally each preceded by its name.
  if (params->flags & CQLMessage::QueryParameters::kWithValuesFlag) {
    const bool named_values =
        (params->flags & CQLMessage::QueryParameters::kWithNamesForValuesFlag);
    uint16_t num_values = 0;
    RETURN_NOT_OK(ParseShort(&num_values));
    params->values.resize(num_values);
    for (uint16_t idx = 0; idx < num_values; ++idx) {
      Value& current = params->values[idx];
      RETURN_NOT_OK(ParseValue(named_values, &current));
      if (named_values) {
        // Record name -> position so the value can later be looked up by name.
        params->value_map[current.name] = idx;
      }
    }
  }

  if (params->flags & CQLMessage::QueryParameters::kWithPageSizeFlag) {
    int32_t requested_page_size = 0;
    RETURN_NOT_OK(ParseInt(&requested_page_size));
    params->set_page_size(requested_page_size);
  }

  if (params->flags & CQLMessage::QueryParameters::kWithPagingStateFlag) {
    string serialized_state;
    RETURN_NOT_OK(ParseBytes(&serialized_state));
    RETURN_NOT_OK(params->SetPagingState(serialized_state));
  }

  if (params->flags & CQLMessage::QueryParameters::kWithSerialConsistencyFlag) {
    RETURN_NOT_OK(ParseConsistency(&params->serial_consistency));
  }

  if (params->flags & CQLMessage::QueryParameters::kWithDefaultTimestampFlag) {
    RETURN_NOT_OK(ParseLong(&params->default_timestamp));
  }

  return Status::OK();
}
566
567
9.28M
// Parses a single byte into `*value` by delegating to ParseNum() with Load8.
CHECKED_STATUS CQLRequest::ParseByte(uint8_t* value) {
  static_assert(sizeof(uint8_t) == kByteSize, "inconsistent byte size");
  return ParseNum("CQL byte", Load8, value);
}
571
572
18.3M
// Parses a 16-bit value into `*value` by delegating to ParseNum() with
// NetworkByteOrder::Load16.
//
// The label handed to ParseNum() used to be the copy-pasted "CQL byte"; it now reads
// "CQL short", in line with the labels used by ParseInt() / ParseLong().
// NOTE(review): ParseNum() is defined outside this file section; the label is presumably only
// used for trace logging — confirm nothing matches on the old text.
CHECKED_STATUS CQLRequest::ParseShort(uint16_t* value) {
  static_assert(sizeof(*value) == kShortSize, "inconsistent short size");
  return ParseNum("CQL short", NetworkByteOrder::Load16, value);
}
576
577
21.8M
// Parses a 32-bit value into `*value` by delegating to ParseNum() with
// NetworkByteOrder::Load32.
CHECKED_STATUS CQLRequest::ParseInt(int32_t* value) {
  static_assert(sizeof(int32_t) == kIntSize, "inconsistent int size");
  return ParseNum("CQL int", NetworkByteOrder::Load32, value);
}
581
582
8.79M
// Parses a 64-bit value into `*value` by delegating to ParseNum() with
// NetworkByteOrder::Load64.
CHECKED_STATUS CQLRequest::ParseLong(int64_t* value) {
  static_assert(sizeof(int64_t) == kLongSize, "inconsistent long size");
  return ParseNum("CQL long", NetworkByteOrder::Load64, value);
}
586
587
62.8k
// Length-prefixed byte-sequence parsers. Each forwards to the three-argument ParseBytes()
// overload, passing the member function that reads the length prefix:
//   ParseString / ParseShortBytes -> ParseShort
//   ParseLongString / ParseBytes  -> ParseInt

// Parses a string whose length prefix is read with ParseShort().
CHECKED_STATUS CQLRequest::ParseString(std::string* value) {
  return ParseBytes("CQL string", &CQLRequest::ParseShort, value);
}

// Parses a long string whose length prefix is read with ParseInt().
CHECKED_STATUS CQLRequest::ParseLongString(std::string* value) {
  return ParseBytes("CQL long string", &CQLRequest::ParseInt, value);
}

// Parses a byte sequence whose length prefix is read with ParseShort().
CHECKED_STATUS CQLRequest::ParseShortBytes(std::string* value) {
  return ParseBytes("CQL short bytes", &CQLRequest::ParseShort, value);
}

// Parses a byte sequence whose length prefix is read with ParseInt().
CHECKED_STATUS CQLRequest::ParseBytes(std::string* value) {
  return ParseBytes("CQL bytes", &CQLRequest::ParseInt, value);
}
602
603
8.91M
// Parses a 16-bit consistency level into `*consistency` by delegating to ParseNum() with
// NetworkByteOrder::Load16. The value is not validated here.
CHECKED_STATUS CQLRequest::ParseConsistency(Consistency* consistency) {
  static_assert(sizeof(Consistency) == kConsistencySize, "inconsistent consistency size");
  return ParseNum("CQL consistency", NetworkByteOrder::Load16, consistency);
}
607
608
// ------------------------------ Individual CQL requests -----------------------------------
609
StartupRequest::StartupRequest(const Header& header, const Slice& body)
610
10.0k
    : CQLRequest(header, body) {
611
10.0k
}
612
613
10.0k
StartupRequest::~StartupRequest() {
614
10.0k
}
615
616
10.0k
Status StartupRequest::ParseBody() {
617
10.0k
  return ParseStringMap(&options_);
618
10.0k
}
619
620
//----------------------------------------------------------------------------------------
621
// AUTH_RESPONSE request: forwards header and body to the CQLRequest base.
AuthResponseRequest::AuthResponseRequest(const Header& header, const Slice& body)
    : CQLRequest(header, body) {}

AuthResponseRequest::~AuthResponseRequest() {}
627
628
6.29k
// Parses the authentication token and splits it into username and password.
//
// The token must have the form "\0<username>\0<password>": it starts with a NUL byte, the
// username runs up to the next NUL byte and the password is everything after it. Any other
// shape yields InvalidArgument.
Status AuthResponseRequest::ParseBody() {
  RETURN_NOT_OK(ParseBytes(&token_));

  if (token_.empty()) {
    return STATUS(InvalidArgument, "Invalid empty token!");
  }
  if (token_[0] != '\0') {
    return STATUS(InvalidArgument, "Invalid format. Message must begin with \\0");
  }

  // Locate the NUL byte that terminates the username (searching after the leading NUL).
  const size_t username_end = token_.find('\0', 1);
  if (username_end == std::string::npos) {
    return STATUS(InvalidArgument, "Invalid format. Message must contain \\0 after username");
  }

  // Username: from token_[1] up to, but excluding, the delimiter.
  params_.username = token_.substr(1, username_end - 1);
  // Password: from just after the delimiter to the end of the token.
  params_.password = token_.substr(username_end + 1);
  return Status::OK();
}
653
654
// Supplies the single bind variable of the authentication query: position 0 yields the
// username as a string value. Any other position is InvalidArgument. `name` and `type` are
// not consulted.
CHECKED_STATUS AuthResponseRequest::AuthQueryParameters::GetBindVariable(
    const std::string& name,
    int64_t pos,
    const std::shared_ptr<QLType>& type,
    QLValue* value) const {
  if (pos != 0) {
    return STATUS(InvalidArgument, Substitute("Bind variable position $0 out of range: ", pos));
  }
  value->set_string_value(username);
  return Status::OK();
}
666
667
//----------------------------------------------------------------------------------------
668
2.57k
OptionsRequest::OptionsRequest(const Header& header, const Slice& body) : CQLRequest(header, body) {
669
2.57k
}
670
671
2.57k
OptionsRequest::~OptionsRequest() {
672
2.57k
}
673
674
2.57k
Status OptionsRequest::ParseBody() {
675
  // Options body is empty
676
2.57k
  return Status::OK();
677
2.57k
}
678
679
//----------------------------------------------------------------------------------------
680
168k
QueryRequest::QueryRequest(const Header& header, const Slice& body) : CQLRequest(header, body) {
681
168k
}
682
683
169k
QueryRequest::~QueryRequest() {
684
169k
}
685
686
169k
Status QueryRequest::ParseBody() {
687
169k
  RETURN_NOT_OK(ParseLongString(&query_));
688
169k
  RETURN_NOT_OK(ParseQueryParameters(&params_));
689
169k
  return Status::OK();
690
169k
}
691
692
//----------------------------------------------------------------------------------------
693
5.69k
PrepareRequest::PrepareRequest(const Header& header, const Slice& body) : CQLRequest(header, body) {
694
5.69k
}
695
696
5.69k
PrepareRequest::~PrepareRequest() {
697
5.69k
}
698
699
5.69k
Status PrepareRequest::ParseBody() {
700
5.69k
  RETURN_NOT_OK(ParseLongString(&query_));
701
5.69k
  return Status::OK();
702
5.69k
}
703
704
//----------------------------------------------------------------------------------------
705
8.73M
ExecuteRequest::ExecuteRequest(const Header& header, const Slice& body) : CQLRequest(header, body) {
706
8.73M
}
707
708
8.74M
ExecuteRequest::~ExecuteRequest() {
709
8.74M
}
710
711
8.74M
Status ExecuteRequest::ParseBody() {
712
8.74M
  RETURN_NOT_OK(ParseShortBytes(&query_id_));
713
8.74M
  RETURN_NOT_OK(ParseQueryParameters(&params_));
714
8.74M
  return Status::OK();
715
8.74M
}
716
717
//----------------------------------------------------------------------------------------
718
1.62k
BatchRequest::BatchRequest(const Header& header, const Slice& body) : CQLRequest(header, body) {
719
1.62k
}
720
721
1.62k
BatchRequest::~BatchRequest() {
722
1.62k
}
723
724
1.62k
// Parses the body of a BATCH request: a type byte, a [short] query count, the queries themselves
// (each either a query string or a prepared-query id, followed by its bind values), and finally
// the batch-level consistency, flags and optional serial-consistency / default-timestamp fields.
// Returns the first parse error encountered, or NetworkError for an unrecognized query kind byte.
Status BatchRequest::ParseBody() {
  uint8_t type = 0;
  RETURN_NOT_OK(ParseByte(&type));
  type_ = static_cast<Type>(type);

  uint16_t query_count = 0;
  RETURN_NOT_OK(ParseShort(&query_count));
  queries_.resize(query_count);
  for (Query& query : queries_) {
    uint8_t is_prepared_query = 0;
    RETURN_NOT_OK(ParseByte(&is_prepared_query));
    switch (is_prepared_query) {
      case 0:
        query.is_prepared = false;
        RETURN_NOT_OK(ParseLongString(&query.query));
        break;
      case 1:
        query.is_prepared = true;
        RETURN_NOT_OK(ParseShortBytes(&query.query_id));
        break;
      default:
        return STATUS(NetworkError, "Invalid is_prepared_query byte in batch request");
    }

    uint16_t value_count = 0;
    RETURN_NOT_OK(ParseShort(&value_count));
    query.params.values.resize(value_count);
    for (auto& value : query.params.values) {
      // with_name is not possible in the protocol due to a design flaw. See JIRA CASSANDRA-10246.
      RETURN_NOT_OK(ParseValue(false /* with_name */, &value));
    }
  }

  // Batch-level parameters follow the query list and apply to every query in the batch.
  Consistency consistency = Consistency::ANY;
  QueryParameters::Flags flags = 0;
  Consistency serial_consistency = Consistency::ANY;
  int64_t default_timestamp = 0;
  RETURN_NOT_OK(ParseConsistency(&consistency));
  RETURN_NOT_OK(ParseByte(&flags));
  if (flags & CQLMessage::QueryParameters::kWithSerialConsistencyFlag) {
    RETURN_NOT_OK(ParseConsistency(&serial_consistency));
  }
  if (flags & CQLMessage::QueryParameters::kWithDefaultTimestampFlag) {
    RETURN_NOT_OK(ParseLong(&default_timestamp));
  }

  // Copy the shared parameters into each query only after all of them parsed successfully.
  for (Query& query : queries_) {
    QueryParameters& params = query.params;
    params.consistency = consistency;
    params.flags = flags;
    params.serial_consistency = serial_consistency;
    params.default_timestamp = default_timestamp;
  }
  return Status::OK();
}
779
780
//----------------------------------------------------------------------------------------
781
RegisterRequest::RegisterRequest(const Header& header, const Slice& body)
782
2.31k
    : CQLRequest(header, body) {
783
2.31k
}
784
785
2.31k
RegisterRequest::~RegisterRequest() {
786
2.31k
}
787
788
2.31k
// Parses the body of a REGISTER request: a string list of event type names. events_ is reset and
// then gets one bit OR-ed in per recognized name; any unrecognized name yields NetworkError.
Status RegisterRequest::ParseBody() {
  vector<string> requested_events;
  RETURN_NOT_OK(ParseStringList(&requested_events));

  events_ = kNoEvents;
  for (const string& name : requested_events) {
    if (name == kTopologyChangeEvent) {
      events_ |= kTopologyChange;
      continue;
    }
    if (name == kStatusChangeEvent) {
      events_ |= kStatusChange;
      continue;
    }
    if (name == kSchemaChangeEvent) {
      events_ |= kSchemaChange;
      continue;
    }
    return STATUS(NetworkError, "Invalid event type in register request");
  }

  return Status::OK();
}
807
808
// --------------------------- Serialization utility functions -------------------------------
809
namespace {
810
811
// Serialize a CQL number (8, 16, 32 and 64-bit integer). <num_type> is the integer type.
812
// <converter> converts the number from machine byte-order to network order and <data_type>
813
// is the converter's return type. The converter's return type <data_type> is unsigned while
814
// <num_type> may be signed or unsigned.
815
template<typename num_type, typename data_type>
816
26.3M
void SerializeNum(void (*converter)(void *, data_type), const num_type val, faststring* mesg) {
817
26.3M
  data_type byte_value;
818
26.3M
  (*converter)(&byte_value, static_cast<data_type>(val));
819
26.3M
  mesg->append(&byte_value, sizeof(byte_value));
820
26.3M
}
cql_message.cc:void yb::ql::(anonymous namespace)::SerializeNum<int, unsigned int>(void (*)(void*, unsigned int), int, yb::faststring*)
Line
Count
Source
816
23.7M
void SerializeNum(void (*converter)(void *, data_type), const num_type val, faststring* mesg) {
817
23.7M
  data_type byte_value;
818
23.7M
  (*converter)(&byte_value, static_cast<data_type>(val));
819
23.7M
  mesg->append(&byte_value, sizeof(byte_value));
820
23.7M
}
cql_message.cc:void yb::ql::(anonymous namespace)::SerializeNum<unsigned short, unsigned short>(void (*)(void*, unsigned short), unsigned short, yb::faststring*)
Line
Count
Source
816
2.43M
void SerializeNum(void (*converter)(void *, data_type), const num_type val, faststring* mesg) {
817
2.43M
  data_type byte_value;
818
2.43M
  (*converter)(&byte_value, static_cast<data_type>(val));
819
2.43M
  mesg->append(&byte_value, sizeof(byte_value));
820
2.43M
}
cql_message.cc:void yb::ql::(anonymous namespace)::SerializeNum<unsigned char, unsigned char>(void (*)(void*, unsigned char), unsigned char, yb::faststring*)
Line
Count
Source
816
176k
void SerializeNum(void (*converter)(void *, data_type), const num_type val, faststring* mesg) {
817
176k
  data_type byte_value;
818
176k
  (*converter)(&byte_value, static_cast<data_type>(val));
819
176k
  mesg->append(&byte_value, sizeof(byte_value));
820
176k
}
821
822
// Serialize a CQL byte stream (string or bytes). <len_type> is the length type.
823
// <len_serializer> serializes the byte length from machine byte-order to network order.
824
template<typename len_type>
825
inline void SerializeBytes(
826
    void (*len_serializer)(len_type, faststring* mesg), string val,
827
1.43M
    faststring* mesg) {
828
1.43M
  (*len_serializer)(static_cast<len_type>(val.size()), mesg);
829
1.43M
  mesg->append(val);
830
1.43M
}
cql_message.cc:void yb::ql::(anonymous namespace)::SerializeBytes<unsigned short>(void (*)(unsigned short, yb::faststring*), std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::faststring*)
Line
Count
Source
827
1.42M
    faststring* mesg) {
828
1.42M
  (*len_serializer)(static_cast<len_type>(val.size()), mesg);
829
1.42M
  mesg->append(val);
830
1.42M
}
cql_message.cc:void yb::ql::(anonymous namespace)::SerializeBytes<int>(void (*)(int, yb::faststring*), std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::faststring*)
Line
Count
Source
827
6.44k
    faststring* mesg) {
828
6.44k
  (*len_serializer)(static_cast<len_type>(val.size()), mesg);
829
6.44k
  mesg->append(val);
830
6.44k
}
831
832
176k
inline void SerializeByte(const uint8_t value, faststring* mesg) {
833
176k
  static_assert(sizeof(value) == CQLMessage::kByteSize, "inconsistent byte size");
834
176k
  SerializeNum(Store8, value, mesg);
835
176k
}
836
837
2.43M
inline void SerializeShort(const uint16_t value, faststring* mesg) {
838
2.43M
  static_assert(sizeof(value) == CQLMessage::kShortSize, "inconsistent short size");
839
2.43M
  SerializeNum(NetworkByteOrder::Store16, value, mesg);
840
2.43M
}
841
842
23.6M
inline void SerializeInt(const int32_t value, faststring* mesg) {
843
23.6M
  static_assert(sizeof(value) == CQLMessage::kIntSize, "inconsistent int size");
844
23.6M
  SerializeNum(NetworkByteOrder::Store32, value, mesg);
845
23.6M
}
846
847
#if 0 // Save this function for future use
848
inline void SerializeLong(const int64_t value, faststring* mesg) {
849
  static_assert(sizeof(value) == CQLMessage::kLongSize, "inconsistent long size");
850
  SerializeNum(NetworkByteOrder::Store64, value, mesg);
851
}
852
#endif
853
854
1.41M
inline void SerializeString(const string& value, faststring* mesg) {
855
1.41M
  SerializeBytes(&SerializeShort, value, mesg);
856
1.41M
}
857
858
#if 0 // Save these functions for future use
859
inline void SerializeLongString(const string& value, faststring* mesg) {
860
  SerializeBytes(&SerializeInt, value, mesg);
861
}
862
#endif
863
864
6.70k
inline void SerializeShortBytes(const string& value, faststring* mesg) {
865
6.70k
  SerializeBytes(&SerializeShort, value, mesg);
866
6.70k
}
867
868
6.44k
inline void SerializeBytes(const string& value, faststring* mesg) {
869
6.44k
  SerializeBytes(&SerializeInt, value, mesg);
870
6.44k
}
871
872
#if 0 // Save these functions for future use
873
inline void SerializeConsistency(const CQLMessage::Consistency consistency, faststring* mesg) {
874
  static_assert(
875
      sizeof(consistency) == CQLMessage::kConsistencySize,
876
      "inconsistent consistency size");
877
  SerializeNum(NetworkByteOrder::Store16, consistency, mesg);
878
}
879
880
void SerializeUUID(const string& value, faststring* mesg) {
881
  if (value.size() == CQLMessage::kUUIDSize) {
882
    mesg->append(value);
883
  } else {
884
    LOG(ERROR) << "Internal error: inconsistent UUID size: " << value.size();
885
    uint8_t empty_uuid[CQLMessage::kUUIDSize] = {0};
886
    mesg->append(empty_uuid, sizeof(empty_uuid));
887
  }
888
}
889
890
void SerializeTimeUUID(const string& value, faststring* mesg) {
891
  if (value.size() == CQLMessage::kUUIDSize) {
892
    mesg->append(value);
893
  } else {
894
    LOG(ERROR) << "Internal error: inconsistent TimeUUID size: " << value.size();
895
    uint8_t empty_uuid[CQLMessage::kUUIDSize] = {0};
896
    mesg->append(empty_uuid, sizeof(empty_uuid));
897
  }
898
}
899
#endif
900
901
5.15k
void SerializeStringList(const vector<string>& list, faststring* mesg) {
902
5.15k
  SerializeShort(list.size(), mesg);
903
10.3k
  for (const auto& entry : list) {
904
10.3k
    SerializeString(entry, mesg);
905
10.3k
  }
906
5.15k
}
907
908
176k
void SerializeInet(const Endpoint& value, faststring* mesg) {
909
176k
  auto address = value.address();
910
176k
  if (address.is_v4()) {
911
176k
    auto bytes = address.to_v4().to_bytes();
912
176k
    SerializeByte(bytes.size(), mesg);
913
176k
    mesg->append(bytes.data(), bytes.size());
914
176k
  } else {
915
0
    auto bytes = address.to_v6().to_bytes();
916
0
    SerializeByte(bytes.size(), mesg);
917
0
    mesg->append(bytes.data(), bytes.size());
918
0
  }
919
176k
  const uint16_t port = value.port();
920
176k
  SerializeInt(port, mesg);
921
176k
}
922
923
#if 0 // Save these functions for future use
924
void SerializeStringMap(const unordered_map<string, string>& map, faststring* mesg) {
925
  SerializeShort(map.size(), mesg);
926
  for (const auto& element : map) {
927
    SerializeString(element.first, mesg);
928
    SerializeString(element.second, mesg);
929
  }
930
}
931
#endif
932
933
2.57k
void SerializeStringMultiMap(const unordered_map<string, vector<string>>& map, faststring* mesg) {
934
2.57k
  SerializeShort(map.size(), mesg);
935
5.15k
  for (const auto& element : map) {
936
5.15k
    SerializeString(element.first, mesg);
937
5.15k
    SerializeStringList(element.second, mesg);
938
5.15k
  }
939
2.57k
}
940
941
#if 0 // Save these functions for future use
942
void SerializeBytesMap(const unordered_map<string, string>& map, faststring* mesg) {
943
  SerializeShort(map.size(), mesg);
944
  for (const auto& element : map) {
945
    SerializeString(element.first, mesg);
946
    SerializeBytes(element.second, mesg);
947
  }
948
}
949
950
void SerializeValue(const CQLMessage::Value& value, faststring* mesg) {
951
  switch (value.kind) {
952
    case CQLMessage::Value::Kind::NOT_NULL:
953
      SerializeInt(value.value.size(), mesg);
954
      mesg->append(value.value);
955
      return;
956
    case CQLMessage::Value::Kind::IS_NULL:
957
      SerializeInt(-1, mesg);
958
      return;
959
    case CQLMessage::Value::Kind::NOT_SET: // NOT_SET value kind should appear in request msg only.
960
      break;
961
      // default: fall through
962
  }
963
  LOG(ERROR) << "Internal error: invalid/unknown value kind " << static_cast<uint32_t>(value.kind);
964
  SerializeInt(-1, mesg);
965
}
966
#endif
967
968
} // namespace
969
970
// ------------------------------------ CQL response -----------------------------------
971
CQLResponse::CQLResponse(const CQLRequest& request, const Opcode opcode)
972
8.93M
    : CQLMessage(Header(request.version() | kResponseVersion, 0, request.stream_id(), opcode)) {
973
8.93M
}
974
975
CQLResponse::CQLResponse(const StreamId stream_id, const Opcode opcode)
976
176k
    : CQLMessage(Header(kCurrentVersion | kResponseVersion, 0, stream_id, opcode)) {
977
176k
}
978
979
9.12M
CQLResponse::~CQLResponse() {
980
9.12M
}
981
982
// Short-hand macros for serializing fields from the message header
983
#define SERIALIZE_BYTE(buf, pos, value) \
984
27.2M
  Store8(&(buf)[pos], static_cast<uint8_t>(
value18.1M
))
985
#define SERIALIZE_SHORT(buf, pos, value) \
986
9.09M
  NetworkByteOrder::Store16(&(buf)[pos], static_cast<uint16_t>(value))
987
#define SERIALIZE_INT(buf, pos, value) \
988
18.2M
  NetworkByteOrder::Store32(&(buf)[pos], static_cast<int32_t>(value))
989
#define SERIALIZE_LONG(buf, pos, value) \
990
  NetworkByteOrder::Store64(&(buf)[pos], static_cast<int64_t>(value))
991
992
9.11M
void CQLResponse::Serialize(const CompressionScheme compression_scheme, faststring* mesg) const {
993
9.11M
  const size_t start_pos = mesg->size(); // save the start position
994
9.11M
  const bool compress = (compression_scheme != CQLMessage::CompressionScheme::kNone);
995
9.11M
  SerializeHeader(compress, mesg);
996
9.11M
  if (compress) {
997
357
    faststring body;
998
357
    SerializeBody(&body);
999
357
    switch (compression_scheme) {
1000
228
      case CQLMessage::CompressionScheme::kLz4: {
1001
228
        SerializeInt(static_cast<int32_t>(body.size()), mesg);
1002
228
        const size_t curr_size = mesg->size();
1003
228
        const int max_comp_size = LZ4_compressBound(narrow_cast<int>(body.size()));
1004
228
        mesg->resize(curr_size + max_comp_size);
1005
228
        const int comp_size = LZ4_compress_default(to_char_ptr(body.data()),
1006
228
                                                   to_char_ptr(mesg->data() + curr_size),
1007
228
                                                   narrow_cast<int>(body.size()),
1008
228
                                                   max_comp_size);
1009
228
        CHECK_NE
(comp_size, 0) << "LZ4 compression failed"0
;
1010
228
        mesg->resize(curr_size + comp_size);
1011
228
        break;
1012
0
      }
1013
135
      case CQLMessage::CompressionScheme::kSnappy: {
1014
135
        const size_t curr_size = mesg->size();
1015
135
        const size_t max_comp_size = MaxCompressedLength(body.size());
1016
135
        size_t comp_size = 0;
1017
135
        mesg->resize(curr_size + max_comp_size);
1018
135
        RawCompress(to_char_ptr(body.data()), body.size(),
1019
135
                    to_char_ptr(mesg->data() + curr_size), &comp_size);
1020
135
        mesg->resize(curr_size + comp_size);
1021
135
        break;
1022
0
      }
1023
0
      case CQLMessage::CompressionScheme::kNone:
1024
0
        LOG(FATAL) << "No compression scheme";
1025
0
        break;
1026
357
    }
1027
9.11M
  } else {
1028
9.11M
    SerializeBody(mesg);
1029
9.11M
  }
1030
9.11M
  SERIALIZE_INT(
1031
9.11M
      mesg->data(), start_pos + kHeaderPosLength, mesg->size() - start_pos - kMessageHeaderLength);
1032
9.11M
}
1033
1034
9.09M
void CQLResponse::SerializeHeader(const bool compress, faststring* mesg) const {
1035
9.09M
  uint8_t buffer[kMessageHeaderLength];
1036
9.09M
  SERIALIZE_BYTE(buffer, kHeaderPosVersion, version());
1037
9.09M
  SERIALIZE_BYTE(buffer, kHeaderPosFlags, flags() | (compress ? kCompressionFlag : 0));
1038
9.09M
  SERIALIZE_SHORT(buffer, kHeaderPosStreamId, stream_id());
1039
9.09M
  SERIALIZE_INT(buffer, kHeaderPosLength, 0);
1040
9.09M
  SERIALIZE_BYTE(buffer, kHeaderPosOpcode, opcode());
1041
9.09M
  mesg->append(buffer, sizeof(buffer));
1042
9.09M
}
1043
1044
#undef SERIALIZE_BYTE
1045
#undef SERIALIZE_SHORT
1046
#undef SERIALIZE_INT
1047
#undef SERIALIZE_LONG
1048
1049
// ------------------------------ Individual CQL responses -----------------------------------
1050
ErrorResponse::ErrorResponse(const CQLRequest& request, const Code code, const string& message)
1051
5.26k
    : CQLResponse(request, Opcode::ERROR), code_(code), message_(message) {
1052
5.26k
}
1053
1054
// Builds an ERROR response whose message text is taken from <status>.
// The message bytes are copied using their explicit length, so the result does not depend on the
// status message being NUL-terminated.
ErrorResponse::ErrorResponse(const CQLRequest& request, const Code code, const Status& status)
    : CQLResponse(request, Opcode::ERROR), code_(code),
      message_(to_char_ptr(status.message().data()), status.message().size()) {
}
1058
1059
ErrorResponse::ErrorResponse(const StreamId stream_id, const Code code, const string& message)
1060
32
    : CQLResponse(stream_id, Opcode::ERROR), code_(code), message_(message) {
1061
32
}
1062
1063
5.30k
ErrorResponse::~ErrorResponse() {
1064
5.30k
}
1065
1066
5.29k
void ErrorResponse::SerializeBody(faststring* mesg) const {
1067
5.29k
  SerializeInt(static_cast<int32_t>(code_), mesg);
1068
5.29k
  SerializeString(message_, mesg);
1069
5.29k
  SerializeErrorBody(mesg);
1070
5.29k
}
1071
1072
4.10k
void ErrorResponse::SerializeErrorBody(faststring* mesg) const {
1073
4.10k
}
1074
1075
//----------------------------------------------------------------------------------------
1076
UnpreparedErrorResponse::UnpreparedErrorResponse(const CQLRequest& request, const QueryId& query_id)
1077
1.19k
    : ErrorResponse(request, Code::UNPREPARED, "Unprepared query"), query_id_(query_id) {
1078
1.19k
}
1079
1080
1.19k
UnpreparedErrorResponse::~UnpreparedErrorResponse() {
1081
1.19k
}
1082
1083
1.19k
void UnpreparedErrorResponse::SerializeErrorBody(faststring* mesg) const {
1084
1.19k
  SerializeShortBytes(query_id_, mesg);
1085
1.19k
}
1086
1087
//----------------------------------------------------------------------------------------
1088
6.05k
ReadyResponse::ReadyResponse(const CQLRequest& request) : CQLResponse(request, Opcode::READY) {
1089
6.05k
}
1090
1091
6.05k
ReadyResponse::~ReadyResponse() {
1092
6.05k
}
1093
1094
6.04k
void ReadyResponse::SerializeBody(faststring* mesg) const {
1095
  // Ready body is empty
1096
6.04k
}
1097
1098
//----------------------------------------------------------------------------------------
1099
AuthenticateResponse::AuthenticateResponse(const CQLRequest& request, const string& authenticator)
1100
6.30k
    : CQLResponse(request, Opcode::AUTHENTICATE), authenticator_(authenticator) {
1101
6.30k
}
1102
1103
6.30k
AuthenticateResponse::~AuthenticateResponse() {
1104
6.30k
}
1105
1106
6.30k
void AuthenticateResponse::SerializeBody(faststring* mesg) const {
1107
6.30k
  SerializeString(authenticator_, mesg);
1108
6.30k
}
1109
1110
//----------------------------------------------------------------------------------------
1111
SupportedResponse::SupportedResponse(const CQLRequest& request,
1112
                                     const unordered_map<string, vector<string>>* options)
1113
2.57k
    : CQLResponse(request, Opcode::SUPPORTED), options_(options) {
1114
2.57k
}
1115
1116
2.57k
SupportedResponse::~SupportedResponse() {
1117
2.57k
}
1118
1119
2.57k
void SupportedResponse::SerializeBody(faststring* mesg) const {
1120
2.57k
  SerializeStringMultiMap(*options_, mesg);
1121
2.57k
}
1122
1123
//----------------------------------------------------------------------------------------
1124
ResultResponse::ResultResponse(const CQLRequest& request, const Kind kind)
1125
8.90M
  : CQLResponse(request, Opcode::RESULT), kind_(kind) {
1126
8.90M
}
1127
1128
8.92M
ResultResponse::~ResultResponse() {
1129
8.92M
}
1130
1131
0
ResultResponse::RowsMetadata::Type::Type(const Id id) : id(id) {
1132
0
  switch (id) {
1133
    // Verify that the type id is a primitive type indeed.
1134
0
    case Id::ASCII:
1135
0
    case Id::BIGINT:
1136
0
    case Id::BLOB:
1137
0
    case Id::BOOLEAN:
1138
0
    case Id::COUNTER:
1139
0
    case Id::DECIMAL:
1140
0
    case Id::DOUBLE:
1141
0
    case Id::FLOAT:
1142
0
    case Id::INT:
1143
0
    case Id::TIMESTAMP:
1144
0
    case Id::UUID:
1145
0
    case Id::VARCHAR:
1146
0
    case Id::VARINT:
1147
0
    case Id::TIMEUUID:
1148
0
    case Id::INET:
1149
0
    case Id::JSONB:
1150
0
    case Id::DATE:
1151
0
    case Id::TIME:
1152
0
    case Id::SMALLINT:
1153
0
    case Id::TINYINT:
1154
0
      return;
1155
1156
    // Non-primitive types
1157
0
    case Id::CUSTOM:
1158
0
    case Id::LIST:
1159
0
    case Id::MAP:
1160
0
    case Id::SET:
1161
0
    case Id::UDT:
1162
0
    case Id::TUPLE:
1163
0
      break;
1164
1165
    // default: fall through
1166
0
  }
1167
1168
0
  LOG(ERROR) << "Internal error: invalid/unknown primitive type id " << static_cast<uint32_t>(id);
1169
0
}
1170
1171
// These union members in Type below are not initialized by default. They need to be explicitly
1172
// initialized using the new() operator in the Type constructors.
1173
0
ResultResponse::RowsMetadata::Type::Type(const string& custom_class_name) : id(Id::CUSTOM) {
1174
0
  new(&this->custom_class_name) string(custom_class_name);
1175
0
}
1176
1177
// Constructs a LIST or SET type with the given element type. The union member is constructed
// with placement new because union members are not initialized by default. For any other id an
// error is logged and no union member is constructed.
ResultResponse::RowsMetadata::Type::Type(const Id id, shared_ptr<const Type> element_type)
    : id(id) {
  switch (id) {
    case Id::LIST:
    case Id::SET:
      // <element_type> is a by-value sink parameter: move it instead of copying it again.
      new(&this->element_type) shared_ptr<const Type>(std::move(element_type));
      return;

    // Neither list nor set
    case Id::CUSTOM:
    case Id::ASCII:
    case Id::BIGINT:
    case Id::BLOB:
    case Id::BOOLEAN:
    case Id::COUNTER:
    case Id::DECIMAL:
    case Id::DOUBLE:
    case Id::FLOAT:
    case Id::INT:
    case Id::TIMESTAMP:
    case Id::UUID:
    case Id::VARCHAR:
    case Id::VARINT:
    case Id::TIMEUUID:
    case Id::INET:
    case Id::JSONB:
    case Id::DATE:
    case Id::TIME:
    case Id::SMALLINT:
    case Id::TINYINT:
    case Id::MAP:
    case Id::UDT:
    case Id::TUPLE:
      break;

    // default: fall through
  }

  LOG(ERROR) << "Internal error: invalid/unknown list/map type id " << static_cast<uint32_t>(id);
}
1217
1218
0
// Constructs a MAP type. The by-value sink parameter is moved into the union member, which is
// constructed with placement new.
ResultResponse::RowsMetadata::Type::Type(shared_ptr<const MapType> map_type) : id(Id::MAP) {
  new(&this->map_type) shared_ptr<const MapType>(std::move(map_type));
}
1221
1222
0
// Constructs a UDT type. The by-value sink parameter is moved into the union member, which is
// constructed with placement new.
ResultResponse::RowsMetadata::Type::Type(shared_ptr<const UDTType> udt_type) : id(Id::UDT) {
  new(&this->udt_type) shared_ptr<const UDTType>(std::move(udt_type));
}
1225
1226
// Constructs a TUPLE type. The by-value sink parameter is moved into the union member, which is
// constructed with placement new.
ResultResponse::RowsMetadata::Type::Type(
    shared_ptr<const TupleComponentTypes> tuple_component_types) : id(Id::TUPLE) {
  new(&this->tuple_component_types) shared_ptr<const TupleComponentTypes>(
      std::move(tuple_component_types));
}
1230
1231
1.00M
ResultResponse::RowsMetadata::Type::Type(const Type& t) : id(t.id) {
1232
1.00M
  switch (id) {
1233
0
    case Id::CUSTOM:
1234
0
      new(&this->custom_class_name) string(t.custom_class_name);
1235
0
      return;
1236
0
    case Id::ASCII:
1237
2.39k
    case Id::BIGINT:
1238
46.8k
    case Id::BLOB:
1239
81.8k
    case Id::BOOLEAN:
1240
81.8k
    case Id::COUNTER:
1241
83.6k
    case Id::DECIMAL:
1242
131k
    case Id::DOUBLE:
1243
131k
    case Id::FLOAT:
1244
254k
    case Id::INT:
1245
255k
    case Id::TIMESTAMP:
1246
341k
    case Id::UUID:
1247
820k
    case Id::VARCHAR:
1248
821k
    case Id::VARINT:
1249
821k
    case Id::TIMEUUID:
1250
884k
    case Id::INET:
1251
885k
    case Id::JSONB:
1252
885k
    case Id::DATE:
1253
885k
    case Id::TIME:
1254
885k
    case Id::SMALLINT:
1255
885k
    case Id::TINYINT:
1256
885k
      return;
1257
22.2k
    case Id::LIST:
1258
41.6k
    case Id::SET:
1259
41.6k
      new(&element_type) shared_ptr<const Type>(t.element_type);
1260
41.6k
      return;
1261
85.4k
    case Id::MAP:
1262
85.4k
      new(&map_type) shared_ptr<const MapType>(t.map_type);
1263
85.4k
      return;
1264
189
    case Id::UDT:
1265
189
      new(&udt_type) shared_ptr<const UDTType>(t.udt_type);
1266
189
      return;
1267
0
    case Id::TUPLE:
1268
0
      new(&tuple_component_types) shared_ptr<const TupleComponentTypes>(t.tuple_component_types);
1269
0
      return;
1270
1271
    // default: fall through
1272
1.00M
  }
1273
1274
0
  LOG(ERROR) << "Internal error: unknown type id " << static_cast<uint32_t>(id);
1275
0
}
1276
1277
1.00M
// Builds the CQL result-metadata type that corresponds to <ql_type>. A frozen type is unwrapped
// to its first parameter before mapping. Collection and user-defined types recurse into their
// parameter types; nested Type objects are constructed in place inside make_shared rather than
// built as a temporary and then copied.
// NOTE(review): on the unsupported-type path below <id> is not assigned in this constructor;
// confirm the member has a default initializer in the header.
ResultResponse::RowsMetadata::Type::Type(const shared_ptr<QLType>& ql_type) {
  auto type = ql_type;
  if (type->IsFrozen()) {
    type = ql_type->param_type(0);
  }
  switch (type->main()) {
    case DataType::INT8:
      id = Id::TINYINT;
      return;
    case DataType::INT16:
      id = Id::SMALLINT;
      return;
    case DataType::INT32:
      id = Id::INT;
      return;
    case DataType::INT64:
      id = Id::BIGINT;
      return;
    case DataType::VARINT:
      id = Id::VARINT;
      return;
    case DataType::FLOAT:
      id = Id::FLOAT;
      return;
    case DataType::DOUBLE:
      id = Id::DOUBLE;
      return;
    case DataType::STRING:
      id = Id::VARCHAR;
      return;
    case DataType::BOOL:
      id = Id::BOOLEAN;
      return;
    case DataType::TIMESTAMP:
      id = Id::TIMESTAMP;
      return;
    case DataType::DATE:
      id = Id::DATE;
      return;
    case DataType::TIME:
      id = Id::TIME;
      return;
    case DataType::INET:
      id = Id::INET;
      return;
    case DataType::JSONB:
      id = Id::JSONB;
      return;
    case DataType::UUID:
      id = Id::UUID;
      return;
    case DataType::TIMEUUID:
      id = Id::TIMEUUID;
      return;
    case DataType::BINARY:
      id = Id::BLOB;
      return;
    case DataType::DECIMAL:
      id = Id::DECIMAL;
      return;
    case DataType::LIST:
      id = Id::LIST;
      new (&element_type) shared_ptr<const Type>(
          std::make_shared<const Type>(type->param_type(0)));
      return;
    case DataType::SET:
      id = Id::SET;
      new (&element_type) shared_ptr<const Type>(
          std::make_shared<const Type>(type->param_type(0)));
      return;
    case DataType::MAP: {
      id = Id::MAP;
      auto key = std::make_shared<const Type>(type->param_type(0));
      auto value = std::make_shared<const Type>(type->param_type(1));
      new (&map_type) shared_ptr<const MapType>(
          std::make_shared<MapType>(MapType{std::move(key), std::move(value)}));
      return;
    }
    case DataType::USER_DEFINED_TYPE: {
      id = Id::UDT;
      std::vector<UDTType::Field> fields;
      fields.reserve(type->params().size());
      for (size_t i = 0; i < type->params().size(); i++) {
        auto field_type = std::make_shared<const Type>(type->param_type(i));
        fields.push_back(UDTType::Field{type->udtype_field_name(i), std::move(field_type)});
      }
      new(&udt_type) shared_ptr<const UDTType>(std::make_shared<UDTType>(
          UDTType{type->udtype_keyspace_name(), type->udtype_name(), std::move(fields)}));
      return;
    }
    case DataType::FROZEN: FALLTHROUGH_INTENDED;
    QL_UNSUPPORTED_TYPES_IN_SWITCH: FALLTHROUGH_INTENDED;
    QL_INVALID_TYPES_IN_SWITCH:
      break;

    // default: fall through
  }

  LOG(ERROR) << "Internal error: invalid/unsupported type " << type->ToString();
}
1376
1377
2.01M
ResultResponse::RowsMetadata::Type::~Type() {
1378
2.01M
  switch (id) {
1379
0
    case Id::CUSTOM:
1380
0
      custom_class_name.~basic_string();
1381
0
      return;
1382
0
    case Id::ASCII:
1383
4.79k
    case Id::BIGINT:
1384
93.6k
    case Id::BLOB:
1385
163k
    case Id::BOOLEAN:
1386
163k
    case Id::COUNTER:
1387
167k
    case Id::DECIMAL:
1388
262k
    case Id::DOUBLE:
1389
262k
    case Id::FLOAT:
1390
508k
    case Id::INT:
1391
510k
    case Id::TIMESTAMP:
1392
682k
    case Id::UUID:
1393
1.64M
    case Id::VARCHAR:
1394
1.64M
    case Id::VARINT:
1395
1.64M
    case Id::TIMEUUID:
1396
1.76M
    case Id::INET:
1397
1.77M
    case Id::JSONB:
1398
1.77M
    case Id::DATE:
1399
1.77M
    case Id::TIME:
1400
1.77M
    case Id::SMALLINT:
1401
1.77M
    case Id::TINYINT:
1402
1.77M
      return;
1403
44.3k
    case Id::LIST:
1404
83.1k
    case Id::SET:
1405
83.1k
      element_type.reset();
1406
83.1k
      return;
1407
170k
    case Id::MAP:
1408
170k
      map_type.reset();
1409
170k
      return;
1410
378
    case Id::UDT:
1411
378
      udt_type.reset();
1412
378
      return;
1413
0
    case Id::TUPLE:
1414
0
      tuple_component_types.reset();
1415
0
      return;
1416
1417
    // default: fall through
1418
2.01M
  }
1419
1420
0
  LOG(ERROR) << "Internal error: unknown type id " << static_cast<uint32_t>(id);
1421
0
}
1422
1423
ResultResponse::RowsMetadata::RowsMetadata()
1424
    : flags(kNoMetadata),
1425
      paging_state(""),
1426
      global_table_spec("" /* keyspace */, "" /* table_name */),
1427
4.94k
      col_count(0) {
1428
4.94k
}
1429
1430
ResultResponse::RowsMetadata::RowsMetadata(const client::YBTableName& table_name,
1431
                                           const vector<ColumnSchema>& columns,
1432
                                           const string& paging_state,
1433
                                           bool no_metadata)
1434
    : flags((no_metadata ? kNoMetadata : kHasGlobalTableSpec) |
1435
            (!paging_state.empty() ? kHasMorePages : 0)),
1436
      paging_state(paging_state),
1437
      global_table_spec(no_metadata ? "" : table_name.namespace_name(),
1438
                        no_metadata ? "" : table_name.table_name()),
1439
7.31M
      col_count(narrow_cast<int>(columns.size())) {
1440
7.31M
  if (!no_metadata) {
1441
96.0k
    col_specs.reserve(col_count);
1442
781k
    for (const auto& column : columns) {
1443
781k
      col_specs.emplace_back(column.name(), Type(column.type()));
1444
781k
    }
1445
96.0k
  }
1446
7.31M
}
1447
1448
8.90M
void ResultResponse::SerializeBody(faststring* mesg) const {
1449
8.90M
  SerializeInt(static_cast<int32_t>(kind_), mesg);
1450
8.90M
  SerializeResultBody(mesg);
1451
8.90M
}
1452
1453
1.01M
// Serializes a column type descriptor into `mesg`.
//
// The type id is always written first as a short. What follows depends on the id:
//   - CUSTOM:   the custom class name.
//   - LIST/SET: the element type (recursively).
//   - MAP:      the key type, then the value type (recursively).
//   - UDT:      keyspace, type name, field count (short), then each field's name and type.
//   - TUPLE:    component count (short), then each component type.
//   - all other known ids: nothing further.
// An id that matches no case is logged as an internal error after the id has been written.
//
// The UDT field count and tuple component count are size_t values written as a short; they go
// through narrow_cast so the narrowing is explicit instead of an implicit conversion at the call.
void ResultResponse::SerializeType(const RowsMetadata::Type* type, faststring* mesg) const {
  using Id = RowsMetadata::Type::Id;

  SerializeShort(static_cast<uint16_t>(type->id), mesg);
  switch (type->id) {
    case Id::CUSTOM:
      SerializeString(type->custom_class_name, mesg);
      return;
    case Id::ASCII:
    case Id::BIGINT:
    case Id::BLOB:
    case Id::BOOLEAN:
    case Id::COUNTER:
    case Id::DECIMAL:
    case Id::DOUBLE:
    case Id::FLOAT:
    case Id::INT:
    case Id::TIMESTAMP:
    case Id::UUID:
    case Id::VARCHAR:
    case Id::VARINT:
    case Id::TIMEUUID:
    case Id::INET:
    case Id::JSONB:
    case Id::DATE:
    case Id::TIME:
    case Id::SMALLINT:
    case Id::TINYINT:
      // Primitive types carry no payload beyond the id.
      return;
    case Id::LIST:
    case Id::SET:
      SerializeType(type->element_type.get(), mesg);
      return;
    case Id::MAP:
      SerializeType(type->map_type->key_type.get(), mesg);
      SerializeType(type->map_type->value_type.get(), mesg);
      return;
    case Id::UDT:
      SerializeString(type->udt_type->keyspace, mesg);
      SerializeString(type->udt_type->name, mesg);
      SerializeShort(narrow_cast<uint16_t>(type->udt_type->fields.size()), mesg);
      for (const auto& field : type->udt_type->fields) {
        SerializeString(field.name, mesg);
        SerializeType(field.type.get(), mesg);
      }
      return;
    case Id::TUPLE:
      SerializeShort(narrow_cast<uint16_t>(type->tuple_component_types->size()), mesg);
      for (const auto& component_type : *type->tuple_component_types) {
        SerializeType(component_type.get(), mesg);
      }
      return;

    // default: fall through
  }

  LOG(ERROR) << "Internal error: unknown type id " << static_cast<uint32_t>(type->id);
}
1509
1510
void ResultResponse::SerializeColSpecs(
1511
      const bool has_global_table_spec, const RowsMetadata::GlobalTableSpec& global_table_spec,
1512
107k
      const vector<RowsMetadata::ColSpec>& col_specs, faststring* mesg) const {
1513
107k
  if (has_global_table_spec) {
1514
106k
    SerializeString(global_table_spec.keyspace, mesg);
1515
106k
    SerializeString(global_table_spec.table, mesg);
1516
106k
  }
1517
797k
  for (const auto& col_spec : col_specs) {
1518
797k
    if (!has_global_table_spec) {
1519
105
      SerializeString(col_spec.keyspace, mesg);
1520
105
      SerializeString(col_spec.table, mesg);
1521
105
    }
1522
797k
    SerializeString(col_spec.column, mesg);
1523
797k
    SerializeType(&col_spec.type, mesg);
1524
797k
  }
1525
107k
}
1526
1527
7.30M
void ResultResponse::SerializeRowsMetadata(const RowsMetadata& metadata, faststring* mesg) const {
1528
7.30M
  SerializeInt(metadata.flags, mesg);
1529
7.30M
  SerializeInt(metadata.col_count, mesg);
1530
7.30M
  if (metadata.flags & RowsMetadata::kHasMorePages) {
1531
279
    SerializeBytes(metadata.paging_state, mesg);
1532
279
  }
1533
7.30M
  if (metadata.flags & RowsMetadata::kNoMetadata) {
1534
7.21M
    return;
1535
7.21M
  }
1536
85.5k
  CHECK_EQ(metadata.col_count, metadata.col_specs.size());
1537
85.5k
  SerializeColSpecs(
1538
85.5k
      metadata.flags & RowsMetadata::kHasGlobalTableSpec, metadata.global_table_spec,
1539
85.5k
      metadata.col_specs, mesg);
1540
85.5k
}
1541
1542
//----------------------------------------------------------------------------------------
1543
VoidResultResponse::VoidResultResponse(const CQLRequest& request)
1544
1.56M
    : ResultResponse(request, Kind::VOID) {
1545
1.56M
}
1546
1547
1.56M
VoidResultResponse::~VoidResultResponse() {
1548
1.56M
}
1549
1550
1.56M
void VoidResultResponse::SerializeResultBody(faststring* mesg) const {
1551
  // Void result response body is empty
1552
1.56M
}
1553
1554
//----------------------------------------------------------------------------------------
1555
// ROWS response to a QUERY request. Metadata is skipped when the request's parameters carry the
// kSkipMetadataFlag bit.
RowsResultResponse::RowsResultResponse(
    const QueryRequest& request, const ql::RowsResult::SharedPtr& result)
    : ResultResponse(request, Kind::ROWS),
      result_(result),
      skip_metadata_(request.params().flags & CQLMessage::QueryParameters::kSkipMetadataFlag) {}

// ROWS response to an EXECUTE request. Metadata is skipped when the request's parameters carry
// the kSkipMetadataFlag bit.
RowsResultResponse::RowsResultResponse(
    const ExecuteRequest& request, const ql::RowsResult::SharedPtr& result)
    : ResultResponse(request, Kind::ROWS),
      result_(result),
      skip_metadata_(request.params().flags & CQLMessage::QueryParameters::kSkipMetadataFlag) {}

// ROWS response to a BATCH request. Batches don't have the skip_metadata flag, so metadata is
// always included.
RowsResultResponse::RowsResultResponse(
    const BatchRequest& request, const ql::RowsResult::SharedPtr& result)
    : ResultResponse(request, Kind::ROWS),
      result_(result),
      skip_metadata_(false) {}

RowsResultResponse::~RowsResultResponse() = default;
1575
1576
7.30M
// Serializes the ROWS payload.
//
// CQL ROWS Response = <metadata><rows_count><rows_content>
void RowsResultResponse::SerializeResultBody(faststring* mesg) const {
  const RowsMetadata metadata(result_->table_name(), result_->column_schemas(),
                              result_->paging_state(), skip_metadata_);
  SerializeRowsMetadata(metadata, mesg);

  // The <rows_count> (4 bytes) must be in the response in any case, so the 'rows_data()'
  // string in the result must contain it.
  const auto& rows = result_->rows_data();
  LOG_IF(DFATAL, rows.size() < 4)
      << "Absent rows_count for the CQL ROWS Result Response (rows_data: "
      << rows.size() << " bytes, expected >= 4)";
  mesg->append(rows);
}
1589
1590
//----------------------------------------------------------------------------------------
1591
515
// Default prepared metadata; no members are set explicitly here.
PreparedResultResponse::PreparedMetadata::PreparedMetadata() = default;
1593
1594
// Builds the metadata describing a prepared statement's bind variables.
//
// - flags: kHasGlobalTableSpec when `table_name` is non-empty, 0 otherwise.
// - global_table_spec: namespace and table name taken from `table_name`.
// - pk_indices: `hash_col_indices` converted to 16-bit values, in order.
// - col_specs: one entry per bind variable. With a global table spec each entry is just
//   (name, type); without one, each entry also carries the table from `bind_table_names` at the
//   same position, so `bind_table_names` must then have at least as many entries as
//   `bind_variable_schemas`.
PreparedResultResponse::PreparedMetadata::PreparedMetadata(
    const client::YBTableName& table_name, const std::vector<int64_t>& hash_col_indices,
    const vector<client::YBTableName>& bind_table_names,
    const vector<ColumnSchema>& bind_variable_schemas)
    : flags(table_name.empty() ? 0 : kHasGlobalTableSpec),
      global_table_spec(table_name.namespace_name(), table_name.table_name()) {
  this->pk_indices.reserve(hash_col_indices.size());
  // Iterate with the element's own type and narrow explicitly, rather than converting through
  // size_t and truncating with an unchecked static_cast.
  for (const int64_t index : hash_col_indices) {
    this->pk_indices.emplace_back(narrow_cast<uint16_t>(index));
  }

  const bool has_global_table_spec = (flags & kHasGlobalTableSpec) != 0;
  col_specs.reserve(bind_variable_schemas.size());
  for (size_t i = 0; i < bind_variable_schemas.size(); i++) {
    const ColumnSchema& var = bind_variable_schemas[i];
    if (has_global_table_spec) {
      col_specs.emplace_back(var.name(), RowsMetadata::Type(var.type()));
    } else {
      // Per-variable table names are only consulted on this path; guard the index.
      DCHECK_LT(i, bind_table_names.size());
      col_specs.emplace_back(bind_table_names[i], var.name(), RowsMetadata::Type(var.type()));
    }
  }
}
1614
1615
// PREPARED response that carries only the query id; both metadata members are
// default-constructed.
PreparedResultResponse::PreparedResultResponse(const CQLRequest& request, const QueryId& query_id)
    : ResultResponse(request, Kind::PREPARED),
      query_id_(query_id) {}

// PREPARED response built from an analyzed statement. The prepared (bind-variable) metadata is
// always populated from `result`; the rows metadata is populated only when the statement has
// result columns, and is default-constructed otherwise.
PreparedResultResponse::PreparedResultResponse(
    const CQLRequest& request, const QueryId& query_id, const ql::PreparedResult& result)
    : ResultResponse(request, Kind::PREPARED),
      query_id_(query_id),
      prepared_metadata_(result.table_name(), result.hash_col_indices(),
                         result.bind_table_names(), result.bind_variable_schemas()),
      rows_metadata_(!result.column_schemas().empty() ?
                     RowsMetadata(
                         result.table_name(), result.column_schemas(),
                         "" /* paging_state */, false /* no_metadata */) :
                     RowsMetadata()) {}

PreparedResultResponse::~PreparedResultResponse() = default;
1633
1634
void PreparedResultResponse::SerializePreparedMetadata(
1635
5.51k
    const PreparedMetadata& metadata, faststring* mesg) const {
1636
5.51k
  SerializeInt(metadata.flags, mesg);
1637
5.51k
  SerializeInt(narrow_cast<int32_t>(metadata.col_specs.size()), mesg);
1638
5.51k
  if (
VersionIsCompatible(kV4Version)5.51k
) {
1639
5.51k
    SerializeInt(narrow_cast<int32_t>(metadata.pk_indices.size()), mesg);
1640
6.48k
    for (const auto& pk_index : metadata.pk_indices) {
1641
6.48k
      SerializeShort(pk_index, mesg);
1642
6.48k
    }
1643
5.51k
  }
1644
5.51k
  SerializeColSpecs(
1645
5.51k
      metadata.flags & PreparedMetadata::kHasGlobalTableSpec, metadata.global_table_spec,
1646
5.51k
      metadata.col_specs, mesg);
1647
5.51k
}
1648
1649
5.51k
// Serializes the PREPARED payload in order: the query id (as short bytes), the bind-variable
// metadata, then the result-set metadata.
void PreparedResultResponse::SerializeResultBody(faststring* mesg) const {
  SerializeShortBytes(query_id_, mesg);

  SerializePreparedMetadata(prepared_metadata_, mesg);

  SerializeRowsMetadata(rows_metadata_, mesg);
}
1654
1655
//----------------------------------------------------------------------------------------
1656
SetKeyspaceResultResponse::SetKeyspaceResultResponse(
1657
    const CQLRequest& request, const ql::SetKeyspaceResult& result)
1658
4.42k
    : ResultResponse(request, Kind::SET_KEYSPACE), keyspace_(result.keyspace()) {
1659
4.42k
}
1660
1661
4.43k
SetKeyspaceResultResponse::~SetKeyspaceResultResponse() {
1662
4.43k
}
1663
1664
4.42k
void SetKeyspaceResultResponse::SerializeResultBody(faststring* mesg) const {
1665
4.42k
  SerializeString(keyspace_, mesg);
1666
4.42k
}
1667
1668
//----------------------------------------------------------------------------------------
1669
// SCHEMA_CHANGE response; copies the change type, target object type, keyspace name and object
// name out of `result`.
SchemaChangeResultResponse::SchemaChangeResultResponse(
    const CQLRequest& request, const ql::SchemaChangeResult& result)
    : ResultResponse(request, Kind::SCHEMA_CHANGE),
      change_type_(result.change_type()),
      target_(result.object_type()),
      keyspace_(result.keyspace_name()),
      object_(result.object_name()) {}

SchemaChangeResultResponse::~SchemaChangeResultResponse() = default;
1678
1679
void SchemaChangeResultResponse::Serialize(const CompressionScheme compression_scheme,
1680
6.28k
                                           faststring* mesg) const {
1681
6.28k
  ResultResponse::Serialize(compression_scheme, mesg);
1682
1683
6.28k
  if (registered_events() & kSchemaChange) {
1684
    // TODO: Replace this hack that piggybacks a SCHEMA_CHANGE event along a SCHEMA_CHANGE result
1685
    // response with a formal event notification mechanism.
1686
4
    SchemaChangeEventResponse event(change_type_, target_, keyspace_, object_, argument_types_);
1687
4
    event.Serialize(compression_scheme, mesg);
1688
4
  }
1689
6.28k
}
1690
1691
6.28k
// Serializes the SCHEMA_CHANGE payload: change type and target, followed by target-dependent
// identifiers:
//   - KEYSPACE:           keyspace
//   - TABLE / TYPE:       keyspace, object
//   - FUNCTION/AGGREGATE: keyspace, object, argument types
// Any other target writes nothing after the target string.
void SchemaChangeResultResponse::SerializeResultBody(faststring* mesg) const {
  SerializeString(change_type_, mesg);
  SerializeString(target_, mesg);

  if (target_ == "KEYSPACE") {
    SerializeString(keyspace_, mesg);
    return;
  }

  if (target_ == "TABLE" || target_ == "TYPE") {
    SerializeString(keyspace_, mesg);
    SerializeString(object_, mesg);
    return;
  }

  if (target_ == "FUNCTION" || target_ == "AGGREGATE") {
    SerializeString(keyspace_, mesg);
    SerializeString(object_, mesg);
    SerializeStringList(argument_types_, mesg);
  }
}
1705
1706
//----------------------------------------------------------------------------------------
1707
EventResponse::EventResponse(const string& event_type)
1708
176k
    : CQLResponse(kEventStreamId, Opcode::EVENT), event_type_(event_type) {
1709
176k
}
1710
1711
176k
EventResponse::~EventResponse() {
1712
176k
}
1713
1714
176k
void EventResponse::SerializeBody(faststring* mesg) const {
1715
176k
  SerializeString(event_type_, mesg);
1716
176k
  SerializeEventBody(mesg);
1717
176k
}
1718
1719
0
std::string EventResponse::ToString() const {
1720
0
  return event_type_ + ":" + BodyToString();
1721
0
}
1722
1723
//----------------------------------------------------------------------------------------
1724
TopologyChangeEventResponse::TopologyChangeEventResponse(const string& topology_change_type,
1725
                                                         const Endpoint& node)
1726
    : EventResponse(kTopologyChangeEvent), topology_change_type_(topology_change_type),
1727
176k
      node_(node) {
1728
176k
}
1729
1730
176k
TopologyChangeEventResponse::~TopologyChangeEventResponse() {
1731
176k
}
1732
1733
176k
void TopologyChangeEventResponse::SerializeEventBody(faststring* mesg) const {
1734
176k
  SerializeString(topology_change_type_, mesg);
1735
176k
  SerializeInet(node_, mesg);
1736
176k
}
1737
1738
0
std::string TopologyChangeEventResponse::BodyToString() const {
1739
0
  return topology_change_type_;
1740
0
}
1741
1742
//----------------------------------------------------------------------------------------
1743
StatusChangeEventResponse::StatusChangeEventResponse(const string& status_change_type,
1744
                                                     const Endpoint& node)
1745
    : EventResponse(kStatusChangeEvent), status_change_type_(status_change_type),
1746
0
      node_(node) {
1747
0
}
1748
1749
0
StatusChangeEventResponse::~StatusChangeEventResponse() {
1750
0
}
1751
1752
0
void StatusChangeEventResponse::SerializeEventBody(faststring* mesg) const {
1753
0
  SerializeString(status_change_type_, mesg);
1754
0
  SerializeInet(node_, mesg);
1755
0
}
1756
1757
0
std::string StatusChangeEventResponse::BodyToString() const {
1758
0
  return status_change_type_;
1759
0
}
1760
1761
//----------------------------------------------------------------------------------------
1762
const vector<string> SchemaChangeEventResponse::kEmptyArgumentTypes = {};
1763
1764
SchemaChangeEventResponse::SchemaChangeEventResponse(
1765
    const string& change_type, const string& target,
1766
    const string& keyspace, const string& object, const vector<string>& argument_types)
1767
    : EventResponse(kSchemaChangeEvent), change_type_(change_type), target_(target),
1768
4
      keyspace_(keyspace), object_(object), argument_types_(argument_types) {
1769
4
}
1770
1771
4
SchemaChangeEventResponse::~SchemaChangeEventResponse() {
1772
4
}
1773
1774
0
std::string SchemaChangeEventResponse::BodyToString() const {
1775
0
  return change_type_;
1776
0
}
1777
1778
4
// Serializes the SCHEMA_CHANGE event payload: change type and target, followed by
// target-dependent identifiers:
//   - KEYSPACE:           keyspace
//   - TABLE / TYPE:       keyspace, object
//   - FUNCTION/AGGREGATE: keyspace, object, argument types
// Any other target writes nothing after the target string.
void SchemaChangeEventResponse::SerializeEventBody(faststring* mesg) const {
  SerializeString(change_type_, mesg);
  SerializeString(target_, mesg);

  if (target_ == "KEYSPACE") {
    SerializeString(keyspace_, mesg);
    return;
  }

  if (target_ == "TABLE" || target_ == "TYPE") {
    SerializeString(keyspace_, mesg);
    SerializeString(object_, mesg);
    return;
  }

  if (target_ == "FUNCTION" || target_ == "AGGREGATE") {
    SerializeString(keyspace_, mesg);
    SerializeString(object_, mesg);
    SerializeStringList(argument_types_, mesg);
  }
}
1792
1793
//----------------------------------------------------------------------------------------
1794
AuthChallengeResponse::AuthChallengeResponse(const CQLRequest& request, const string& token)
1795
0
    : CQLResponse(request, Opcode::AUTH_CHALLENGE), token_(token) {
1796
0
}
1797
1798
0
AuthChallengeResponse::~AuthChallengeResponse() {
1799
0
}
1800
1801
0
void AuthChallengeResponse::SerializeBody(faststring* mesg) const {
1802
0
  SerializeBytes(token_, mesg);
1803
0
}
1804
1805
//----------------------------------------------------------------------------------------
1806
AuthSuccessResponse::AuthSuccessResponse(const CQLRequest& request, const string& token)
1807
6.16k
    : CQLResponse(request, Opcode::AUTH_SUCCESS), token_(token) {
1808
6.16k
}
1809
1810
6.16k
AuthSuccessResponse::~AuthSuccessResponse() {
1811
6.16k
}
1812
1813
6.16k
void AuthSuccessResponse::SerializeBody(faststring* mesg) const {
1814
6.16k
  SerializeBytes(token_, mesg);
1815
6.16k
}
1816
1817
CQLServerEvent::CQLServerEvent(std::unique_ptr<EventResponse> event_response)
1818
176k
    : event_response_(std::move(event_response)) {
1819
176k
  CHECK_NOTNULL(event_response_.get());
1820
176k
  faststring temp;
1821
176k
  event_response_->Serialize(CQLMessage::CompressionScheme::kNone, &temp);
1822
176k
  serialized_response_ = RefCntBuffer(temp);
1823
176k
}
1824
1825
148k
void CQLServerEvent::Serialize(boost::container::small_vector_base<RefCntBuffer>* output) const {
1826
148k
  output->push_back(serialized_response_);
1827
148k
}
1828
1829
0
std::string CQLServerEvent::ToString() const {
1830
0
  return event_response_->ToString();
1831
0
}
1832
1833
37.0k
// Creates a list with no explicit member initialization.
CQLServerEventList::CQLServerEventList() = default;
1835
1836
27.7k
void CQLServerEventList::Transferred(const Status& status, rpc::Connection*) {
1837
27.7k
  if (!status.ok()) {
1838
0
    LOG(WARNING) << "Transfer of CQL server event failed: " << status.ToString();
1839
0
  }
1840
27.7k
}
1841
1842
void CQLServerEventList::Serialize(
1843
28.3k
    boost::container::small_vector_base<RefCntBuffer>* output) {
1844
148k
  for (const auto& cql_server_event : cql_server_events_) {
1845
148k
    cql_server_event->Serialize(output);
1846
148k
  }
1847
28.3k
}
1848
1849
0
std::string CQLServerEventList::ToString() const {
1850
0
  std::string ret = "";
1851
0
  for (const auto& cql_server_event : cql_server_events_) {
1852
0
    if (!ret.empty()) {
1853
0
      ret += ", ";
1854
0
    }
1855
0
    ret += cql_server_event->ToString();
1856
0
  }
1857
0
  return ret;
1858
0
}
1859
1860
176k
void CQLServerEventList::AddEvent(std::unique_ptr<CQLServerEvent> event) {
1861
176k
  cql_server_events_.push_back(std::move(event));
1862
176k
}
1863
1864
}  // namespace ql
1865
}  // namespace yb