YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/cql/ql/util/cql_message.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/yql/cql/ql/util/cql_message.h"
15
16
#include <lz4.h>
17
#include <snappy.h>
18
19
#include "yb/common/ql_protocol.pb.h"
20
#include "yb/common/ql_type.h"
21
#include "yb/common/ql_value.h"
22
#include "yb/common/schema.h"
23
24
#include "yb/gutil/casts.h"
25
#include "yb/gutil/endian.h"
26
#include "yb/gutil/strings/substitute.h"
27
28
#include "yb/util/logging.h"
29
#include "yb/util/random_util.h"
30
#include "yb/util/result.h"
31
#include "yb/util/status_format.h"
32
33
namespace yb {
34
namespace ql {
35
36
using std::shared_ptr;
37
using std::unique_ptr;
38
using std::string;
39
using std::set;
40
using std::vector;
41
using std::unordered_map;
42
using strings::Substitute;
43
using snappy::GetUncompressedLength;
44
using snappy::MaxCompressedLength;
45
using snappy::RawUncompress;
46
using snappy::RawCompress;
47
48
#undef RETURN_NOT_ENOUGH
49
#define RETURN_NOT_ENOUGH(sz)                               \
50
7.08M
  do {                                                      \
51
7.08M
    if (body_.size() < (sz)) {                              \
52
0
      return STATUS(NetworkError, "Truncated CQL message"); \
53
0
    }                                                       \
54
7.08M
  } while (0)
55
56
constexpr char CQLMessage::kCQLVersionOption[];
57
constexpr char CQLMessage::kCompressionOption[];
58
constexpr char CQLMessage::kNoCompactOption[];
59
60
constexpr char CQLMessage::kLZ4Compression[];
61
constexpr char CQLMessage::kSnappyCompression[];
62
63
constexpr char CQLMessage::kTopologyChangeEvent[];
64
constexpr char CQLMessage::kStatusChangeEvent[];
65
constexpr char CQLMessage::kSchemaChangeEvent[];
66
67
CHECKED_STATUS CQLMessage::QueryParameters::GetBindVariableValue(const std::string& name,
68
                                                                 const size_t pos,
69
10.7M
                                                                 const Value** value) const {
70
10.7M
  if (!value_map.empty()) {
71
148
    const auto itr = value_map.find(name);
72
148
    if (itr == value_map.end()) {
73
2
      return STATUS_SUBSTITUTE(RuntimeError, "Bind variable \"$0\" not found", name);
74
2
    }
75
146
    *value = &values[itr->second];
76
10.7M
  } else {
77
10.7M
    if (pos >= values.size()) {
78
      // Return error with 1-based position.
79
1
      return STATUS_SUBSTITUTE(RuntimeError, "Bind variable at position $0 not found", pos + 1);
80
1
    }
81
10.7M
    *value = &values[pos];
82
10.7M
  }
83
84
10.7M
  return Status::OK();
85
10.7M
}
86
87
Result<bool> CQLMessage::QueryParameters::IsBindVariableUnset(const std::string& name,
88
3.55M
                                                              const int64_t pos) const {
89
3.55M
  const Value* value = nullptr;
90
3.55M
  RETURN_NOT_OK(GetBindVariableValue(name, pos, &value));
91
3.55M
  return (value->kind == Value::Kind::NOT_SET);
92
3.55M
}
93
94
Status CQLMessage::QueryParameters::GetBindVariable(const std::string& name,
95
                                                    const int64_t pos,
96
                                                    const shared_ptr<QLType>& type,
97
7.17M
                                                    QLValue* value) const {
98
7.17M
  const Value* v = nullptr;
99
7.17M
  RETURN_NOT_OK(GetBindVariableValue(name, pos, &v));
100
7.17M
  switch (v->kind) {
101
7.17M
    case Value::Kind::NOT_NULL: {
102
7.17M
      if (v->value.empty()) {
103
3
        switch (type->main()) {
104
1
          case DataType::STRING:
105
1
            value->set_string_value("");
106
1
            return Status::OK();
107
2
          case DataType::BINARY:
108
2
            value->set_binary_value("");
109
2
            return Status::OK();
110
0
          case DataType::NULL_VALUE_TYPE: FALLTHROUGH_INTENDED;
111
0
          case DataType::INT8: FALLTHROUGH_INTENDED;
112
0
          case DataType::INT16: FALLTHROUGH_INTENDED;
113
0
          case DataType::INT32: FALLTHROUGH_INTENDED;
114
0
          case DataType::INT64: FALLTHROUGH_INTENDED;
115
0
          case DataType::BOOL: FALLTHROUGH_INTENDED;
116
0
          case DataType::FLOAT: FALLTHROUGH_INTENDED;
117
0
          case DataType::DOUBLE: FALLTHROUGH_INTENDED;
118
0
          case DataType::TIMESTAMP: FALLTHROUGH_INTENDED;
119
0
          case DataType::DECIMAL: FALLTHROUGH_INTENDED;
120
0
          case DataType::VARINT: FALLTHROUGH_INTENDED;
121
0
          case DataType::INET: FALLTHROUGH_INTENDED;
122
0
          case DataType::JSONB: FALLTHROUGH_INTENDED;
123
0
          case DataType::LIST: FALLTHROUGH_INTENDED;
124
0
          case DataType::MAP: FALLTHROUGH_INTENDED;
125
0
          case DataType::SET: FALLTHROUGH_INTENDED;
126
0
          case DataType::UUID: FALLTHROUGH_INTENDED;
127
0
          case DataType::TIMEUUID:
128
0
            value->SetNull();
129
0
            return Status::OK();
130
0
          case DataType::UNKNOWN_DATA: FALLTHROUGH_INTENDED;
131
0
          case DataType::TUPLE: FALLTHROUGH_INTENDED;
132
0
          case DataType::TYPEARGS: FALLTHROUGH_INTENDED;
133
0
          case DataType::USER_DEFINED_TYPE: FALLTHROUGH_INTENDED;
134
0
          case DataType::FROZEN: FALLTHROUGH_INTENDED;
135
0
          case DataType::DATE: FALLTHROUGH_INTENDED;
136
0
          case DataType::TIME: FALLTHROUGH_INTENDED;
137
0
          QL_INVALID_TYPES_IN_SWITCH:
138
0
            break;
139
0
        }
140
0
        return STATUS_SUBSTITUTE(
141
0
            NotSupported, "Unsupported datatype $0", static_cast<int>(type->main()));
142
0
      }
143
7.17M
      Slice data(v->value);
144
7.17M
      return value->Deserialize(type, YQL_CLIENT_CQL, &data);
145
7.17M
    }
146
29
    case Value::Kind::IS_NULL:
147
29
      value->SetNull();
148
29
      return Status::OK();
149
0
    case Value::Kind::NOT_SET:
150
0
      return Status::OK();
151
0
  }
152
0
  return STATUS_SUBSTITUTE(
153
0
      RuntimeError, "Invalid bind variable kind $0", static_cast<int>(v->kind));
154
0
}
155
156
4.52M
Status CQLMessage::QueryParameters::ValidateConsistency() {
157
4.52M
  switch (consistency) {
158
29.0k
    case Consistency::LOCAL_ONE: FALLTHROUGH_INTENDED;
159
4.45M
    case Consistency::QUORUM: {
160
      // We are repurposing cassandra's "QUORUM" consistency level to indicate "STRONG"
161
      // consistency for YB. Although, by default the datastax client uses "LOCAL_ONE" as its
162
      // consistency level and since by default we want to support STRONG consistency, we use
163
      // STRONG consistency even for LOCAL_ONE. This way existing cassandra clients don't need
164
      // any changes.
165
4.45M
      set_yb_consistency_level(YBConsistencyLevel::STRONG);
166
4.45M
      break;
167
29.0k
    }
168
79.9k
    case Consistency::ONE: {
169
      // Here we repurpose cassandra's ONE consistency level to be CONSISTENT_PREFIX for us since
170
      // that seems to be the most appropriate.
171
79.9k
      set_yb_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX);
172
79.9k
      break;
173
29.0k
    }
174
56
    default:
175
56
      YB_LOG_EVERY_N_SECS(WARNING, 10) << "Consistency level " << static_cast<uint16_t>(consistency)
176
16
                                       << " is not supported, defaulting to strong consistency";
177
56
      set_yb_consistency_level(YBConsistencyLevel::STRONG);
178
4.52M
  }
179
4.53M
  return Status::OK();
180
4.52M
}
181
182
namespace {
183
184
template<class Type>
185
13.6M
Type LoadByte(const Slice& slice, size_t offset) {
186
13.6M
  return static_cast<Type>(Load8(slice.data() + offset));
187
13.6M
}
cql_message.cc:_ZN2yb2ql12_GLOBAL__N_18LoadByteIhEET_RKNS_5SliceEm
Line
Count
Source
185
9.11M
Type LoadByte(const Slice& slice, size_t offset) {
186
9.11M
  return static_cast<Type>(Load8(slice.data() + offset));
187
9.11M
}
cql_message.cc:_ZN2yb2ql12_GLOBAL__N_18LoadByteINS0_10CQLMessage6OpcodeEEET_RKNS_5SliceEm
Line
Count
Source
185
4.55M
Type LoadByte(const Slice& slice, size_t offset) {
186
4.55M
  return static_cast<Type>(Load8(slice.data() + offset));
187
4.55M
}
188
189
template<class Type>
190
Type LoadShort(const Slice& slice, size_t offset) {
191
  return static_cast<Type>(NetworkByteOrder::Load16(slice.data() + offset));
192
}
193
194
template<class Type>
195
4.55M
Type LoadInt(const Slice& slice, size_t offset) {
196
4.55M
  return static_cast<Type>(NetworkByteOrder::Load32(slice.data() + offset));
197
4.55M
}
198
199
} // namespace
200
201
// ------------------------------------ CQL request -----------------------------------
202
bool CQLRequest::ParseRequest(
203
  const Slice& mesg, const CompressionScheme compression_scheme,
204
4.56M
  unique_ptr<CQLRequest>* request, unique_ptr<CQLResponse>* error_response) {
205
206
4.56M
  *request = nullptr;
207
4.56M
  *error_response = nullptr;
208
209
4.56M
  if (mesg.size() < kMessageHeaderLength) {
210
0
    error_response->reset(
211
0
        new ErrorResponse(
212
0
            static_cast<StreamId>(0), ErrorResponse::Code::PROTOCOL_ERROR, "Incomplete header"));
213
0
    return false;
214
0
  }
215
216
4.56M
  Header header(
217
4.56M
      LoadByte<Version>(mesg, kHeaderPosVersion),
218
4.56M
      LoadByte<Flags>(mesg, kHeaderPosFlags),
219
4.56M
      ParseStreamId(mesg),
220
4.56M
      LoadByte<Opcode>(mesg, kHeaderPosOpcode));
221
222
4.56M
  uint32_t length = LoadInt<uint32_t>(mesg, kHeaderPosLength);
223
3.43k
  DVLOG(4) << "CQL message "
224
3.43k
           << "version 0x" << std::hex << static_cast<uint32_t>(header.version)   << " "
225
3.43k
           << "flags 0x"   << std::hex << static_cast<uint32_t>(header.flags)     << " "
226
3.43k
           << "stream id " << std::dec << static_cast<uint32_t>(header.stream_id) << " "
227
3.43k
           << "opcode 0x"  << std::hex << static_cast<uint32_t>(header.opcode)    << " "
228
3.43k
           << "length "    << std::dec << static_cast<uint32_t>(length);
229
230
  // Verify proper version that the response bit is not set in the request protocol version and
231
  // the protocol version is one we support.
232
4.56M
  if (header.version & kResponseVersion) {
233
0
    error_response->reset(
234
0
        new ErrorResponse(
235
0
            header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR, "Not a request"));
236
0
    return false;
237
0
  }
238
4.56M
  if (header.version < kMinimumVersion || header.version > kCurrentVersion) {
239
7
    error_response->reset(
240
7
        new ErrorResponse(
241
7
            header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR,
242
7
            Substitute("Invalid or unsupported protocol version $0. Supported versions are between "
243
7
                       "$1 and $2.", header.version, kMinimumVersion, kCurrentVersion)));
244
7
    return false;
245
7
  }
246
247
4.56M
  size_t body_size = mesg.size() - kMessageHeaderLength;
248
4.55M
  const uint8_t* body_data = body_size > 0 ? mesg.data() + kMessageHeaderLength : to_uchar_ptr("");
249
4.56M
  unique_ptr<uint8_t[]> buffer;
250
251
  // If the message body is compressed, uncompress it.
252
4.56M
  if (body_size > 0 && (header.flags & kCompressionFlag)) {
253
306
    if (header.opcode == Opcode::STARTUP) {
254
0
      error_response->reset(
255
0
          new ErrorResponse(
256
0
              header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR,
257
0
              "STARTUP request should not be compressed"));
258
0
      return false;
259
0
    }
260
306
    switch (compression_scheme) {
261
202
      case CompressionScheme::kLz4: {
262
202
        if (body_size < sizeof(uint32_t)) {
263
0
          error_response->reset(
264
0
              new ErrorResponse(
265
0
                  header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR,
266
0
                  "Insufficient compressed data"));
267
0
          return false;
268
0
        }
269
270
202
        const uint32_t uncomp_size = NetworkByteOrder::Load32(body_data);
271
202
        buffer = std::make_unique<uint8_t[]>(uncomp_size);
272
202
        body_data += sizeof(uncomp_size);
273
202
        body_size -= sizeof(uncomp_size);
274
202
        const int size = LZ4_decompress_safe(
275
202
            to_char_ptr(body_data), to_char_ptr(buffer.get()), narrow_cast<int>(body_size),
276
202
            uncomp_size);
277
203
        if (size < 0 || static_cast<uint32_t>(size) != uncomp_size) {
278
0
          error_response->reset(
279
0
              new ErrorResponse(
280
0
                  header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR,
281
0
                  "Error occurred when uncompressing CQL message"));
282
0
          return false;
283
0
        }
284
202
        body_data = buffer.get();
285
202
        body_size = uncomp_size;
286
202
        break;
287
202
      }
288
104
      case CompressionScheme::kSnappy: {
289
104
        size_t uncomp_size = 0;
290
104
        if (GetUncompressedLength(to_char_ptr(body_data), body_size, &uncomp_size)) {
291
104
          buffer = std::make_unique<uint8_t[]>(uncomp_size);
292
104
          if (RawUncompress(to_char_ptr(body_data), body_size, to_char_ptr(buffer.get()))) {
293
103
            body_data = buffer.get();
294
103
            body_size = uncomp_size;
295
103
            break;
296
103
          }
297
1
        }
298
1
        error_response->reset(
299
1
            new ErrorResponse(
300
1
                header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR,
301
1
                "Error occurred when uncompressing CQL message"));
302
1
        break;
303
1
      }
304
0
      case CompressionScheme::kNone:
305
0
        error_response->reset(
306
0
            new ErrorResponse(
307
0
                header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR,
308
0
                "No compression scheme specified"));
309
0
        return false;
310
4.56M
    }
311
4.56M
  }
312
313
4.56M
  const Slice body = (body_size == 0) ? Slice() : Slice(body_data, body_size);
314
315
  // Construct the skeleton request by the opcode
316
4.56M
  switch (header.opcode) {
317
9.39k
    case Opcode::STARTUP:
318
9.39k
      request->reset(new StartupRequest(header, body));
319
9.39k
      break;
320
6.28k
    case Opcode::AUTH_RESPONSE:
321
6.28k
      request->reset(new AuthResponseRequest(header, body));
322
6.28k
      break;
323
2.50k
    case Opcode::OPTIONS:
324
2.50k
      request->reset(new OptionsRequest(header, body));
325
2.50k
      break;
326
159k
    case Opcode::QUERY:
327
159k
      request->reset(new QueryRequest(header, body));
328
159k
      break;
329
5.03k
    case Opcode::PREPARE:
330
5.03k
      request->reset(new PrepareRequest(header, body));
331
5.03k
      break;
332
4.37M
    case Opcode::EXECUTE:
333
4.37M
      request->reset(new ExecuteRequest(header, body));
334
4.37M
      break;
335
1.65k
    case Opcode::BATCH:
336
1.65k
      request->reset(new BatchRequest(header, body));
337
1.65k
      break;
338
2.23k
    case Opcode::REGISTER:
339
2.23k
      request->reset(new RegisterRequest(header, body));
340
2.23k
      break;
341
342
    // These are not request but response opcodes
343
0
    case Opcode::ERROR:
344
0
    case Opcode::READY:
345
0
    case Opcode::AUTHENTICATE:
346
0
    case Opcode::SUPPORTED:
347
0
    case Opcode::RESULT:
348
0
    case Opcode::EVENT:
349
0
    case Opcode::AUTH_CHALLENGE:
350
0
    case Opcode::AUTH_SUCCESS:
351
0
      error_response->reset(
352
0
          new ErrorResponse(
353
0
              header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR, "Not a request opcode"));
354
0
      return false;
355
356
    // default: -> fall through
357
4.56M
  }
358
359
4.56M
  if (*request == nullptr) {
360
0
    error_response->reset(
361
0
        new ErrorResponse(
362
0
            header.stream_id, ErrorResponse::Code::PROTOCOL_ERROR, "Unknown opcode"));
363
0
    return false;
364
0
  }
365
366
  // Parse the request body
367
4.56M
  const Status status = (*request)->ParseBody();
368
4.56M
  if (!status.ok()) {
369
0
    error_response->reset(
370
0
        new ErrorResponse(
371
0
            *(*request), ErrorResponse::Code::PROTOCOL_ERROR, status.message().ToString()));
372
4.56M
  } else if (!(*request)->body_.empty()) {
373
    // Flag error when there are bytes remaining after we have parsed the whole request body
374
    // according to the protocol. Either the request's length field from the client is
375
    // wrong. Or we could have a bug in our parser.
376
0
    error_response->reset(
377
0
        new ErrorResponse(
378
0
            *(*request), ErrorResponse::Code::PROTOCOL_ERROR, "Request length too long"));
379
0
  }
380
381
  // If there is any error, free the partially parsed request and return.
382
4.56M
  if (*error_response != nullptr) {
383
0
    *request = nullptr;
384
0
    return false;
385
0
  }
386
387
  // Clear and release the body after parsing.
388
4.56M
  (*request)->body_.clear();
389
390
4.56M
  return true;
391
4.56M
}
392
393
4.55M
CQLRequest::CQLRequest(const Header& header, const Slice& body) : CQLMessage(header), body_(body) {
394
4.55M
}
395
396
4.56M
CQLRequest::~CQLRequest() {
397
4.56M
}
398
399
//----------------------------------------------------------------------------------------
400
401
0
Status CQLRequest::ParseUUID(string* value) {
402
0
  RETURN_NOT_ENOUGH(kUUIDSize);
403
0
  *value = string(to_char_ptr(body_.data()), kUUIDSize);
404
0
  body_.remove_prefix(kUUIDSize);
405
0
  DVLOG(4) << "CQL uuid " << *value;
406
0
  return Status::OK();
407
0
}
408
409
0
Status CQLRequest::ParseTimeUUID(string* value) {
410
0
  RETURN_NOT_ENOUGH(kUUIDSize);
411
0
  *value = string(to_char_ptr(body_.data()), kUUIDSize);
412
0
  body_.remove_prefix(kUUIDSize);
413
0
  DVLOG(4) << "CQL timeuuid " << *value;
414
0
  return Status::OK();
415
0
}
416
417
2.23k
Status CQLRequest::ParseStringList(vector<string>* list) {
418
0
  DVLOG(4) << "CQL string list ...";
419
2.23k
  uint16_t count = 0;
420
2.23k
  RETURN_NOT_OK(ParseShort(&count));
421
2.23k
  list->resize(count);
422
8.94k
  for (uint16_t i = 0; i < count; ++i) {
423
6.70k
    RETURN_NOT_OK(ParseString(&list->at(i)));
424
6.70k
  }
425
2.23k
  return Status::OK();
426
2.23k
}
427
428
0
Status CQLRequest::ParseInet(Endpoint* value) {
429
0
  std::string ipaddr;
430
0
  int32_t port = 0;
431
0
  RETURN_NOT_OK(ParseBytes("CQL ipaddr", &CQLRequest::ParseByte, &ipaddr));
432
0
  RETURN_NOT_OK(ParseInt(&port));
433
0
  if (port < 0 || port > 65535) {
434
0
    return STATUS(NetworkError, "Invalid inet port");
435
0
  }
436
0
  IpAddress address;
437
0
  if (ipaddr.size() == boost::asio::ip::address_v4::bytes_type().size()) {
438
0
    boost::asio::ip::address_v4::bytes_type bytes;
439
0
    memcpy(bytes.data(), ipaddr.data(), ipaddr.size());
440
0
    address = boost::asio::ip::address_v4(bytes);
441
0
  } else if (ipaddr.size() == boost::asio::ip::address_v6::bytes_type().size()) {
442
0
    boost::asio::ip::address_v6::bytes_type bytes;
443
0
    memcpy(bytes.data(), ipaddr.data(), ipaddr.size());
444
0
    address = boost::asio::ip::address_v6(bytes);
445
0
  } else {
446
0
    return STATUS_SUBSTITUTE(NetworkError, "Invalid size of ipaddr: $0", ipaddr.size());
447
0
  }
448
0
  *value = Endpoint(address, port);
449
0
  DVLOG(4) << "CQL inet " << *value;
450
0
  return Status::OK();
451
0
}
452
453
9.39k
Status CQLRequest::ParseStringMap(unordered_map<string, string>* map) {
454
0
  DVLOG(4) << "CQL string map ...";
455
9.39k
  uint16_t count = 0;
456
9.39k
  RETURN_NOT_OK(ParseShort(&count));
457
36.1k
  for (uint16_t i = 0; i < count; ++i) {
458
26.7k
    string name, value;
459
26.7k
    RETURN_NOT_OK(ParseString(&name));
460
26.7k
    RETURN_NOT_OK(ParseString(&value));
461
26.7k
    (*map)[name] = value;
462
26.7k
  }
463
9.39k
  return Status::OK();
464
9.39k
}
465
466
0
Status CQLRequest::ParseStringMultiMap(unordered_map<string, vector<string>>* map) {
467
0
  DVLOG(4) << "CQL string multimap ...";
468
0
  uint16_t count = 0;
469
0
  RETURN_NOT_OK(ParseShort(&count));
470
0
  for (uint16_t i = 0; i < count; ++i) {
471
0
    string name;
472
0
    vector<string> value;
473
0
    RETURN_NOT_OK(ParseString(&name));
474
0
    RETURN_NOT_OK(ParseStringList(&value));
475
0
    (*map)[name] = value;
476
0
  }
477
0
  return Status::OK();
478
0
}
479
480
0
Status CQLRequest::ParseBytesMap(unordered_map<string, string>* map) {
481
0
  DVLOG(4) << "CQL bytes map ...";
482
0
  uint16_t count = 0;
483
0
  RETURN_NOT_OK(ParseShort(&count));
484
0
  for (uint16_t i = 0; i < count; ++i) {
485
0
    string name, value;
486
0
    RETURN_NOT_OK(ParseString(&name));
487
0
    RETURN_NOT_OK(ParseBytes(&value));
488
0
    (*map)[name] = value;
489
0
  }
490
0
  return Status::OK();
491
0
}
492
493
7.08M
Status CQLRequest::ParseValue(const bool with_name, Value* value) {
494
2.05k
  DVLOG(4) << "CQL value ...";
495
7.08M
  if (with_name) {
496
111
    RETURN_NOT_OK(ParseString(&value->name));
497
111
  }
498
  // Save data pointer to assign the value with length bytes below.
499
7.08M
  const uint8_t* data = body_.data();
500
7.08M
  int32_t length = 0;
501
7.08M
  RETURN_NOT_OK(ParseInt(&length));
502
7.08M
  if (length >= 0) {
503
7.08M
    value->kind = Value::Kind::NOT_NULL;
504
7.08M
    if (length > 0) {
505
7.08M
      uint32_t unsigned_length = length;
506
7.08M
      RETURN_NOT_ENOUGH(unsigned_length);
507
7.08M
      value->value.assign(to_char_ptr(data), kIntSize + length);
508
7.08M
      body_.remove_prefix(length);
509
11.4k
      DVLOG(4) << "CQL value bytes " << value->value;
510
7.08M
    }
511
356
  } else if (VersionIsCompatible(kV4Version)) {
512
32
    switch (length) {
513
29
      case -1:
514
29
        value->kind = Value::Kind::IS_NULL;
515
29
        break;
516
3
      case -2:
517
3
        value->kind = Value::Kind::NOT_SET;
518
3
        break;
519
0
      default:
520
0
        return STATUS(NetworkError, "Invalid length in value");
521
0
        break;
522
324
    }
523
324
  } else {
524
324
    value->kind = Value::Kind::IS_NULL;
525
324
  }
526
7.08M
  return Status::OK();
527
7.08M
}
528
529
4.53M
Status CQLRequest::ParseQueryParameters(QueryParameters* params) {
530
3.38k
  DVLOG(4) << "CQL query parameters ...";
531
4.53M
  RETURN_NOT_OK(ParseConsistency(&params->consistency));
532
4.53M
  RETURN_NOT_OK(params->ValidateConsistency());
533
4.53M
  RETURN_NOT_OK(ParseByte(&params->flags));
534
4.53M
  params->set_request_id(RandomUniformInt<uint64_t>());
535
4.53M
  if (params->flags & CQLMessage::QueryParameters::kWithValuesFlag) {
536
4.37M
    const bool with_name = (params->flags & CQLMessage::QueryParameters::kWithNamesForValuesFlag);
537
4.37M
    uint16_t count = 0;
538
4.37M
    RETURN_NOT_OK(ParseShort(&count));
539
4.37M
    params->values.resize(count);
540
10.3M
    for (uint16_t i = 0; i < count; ++i) {
541
5.92M
      Value& value = params->values[i];
542
5.92M
      RETURN_NOT_OK(ParseValue(with_name, &value));
543
5.92M
      if (with_name) {
544
111
        params->value_map[value.name] = i;
545
111
      }
546
5.92M
    }
547
4.37M
  }
548
4.53M
  if (params->flags & CQLMessage::QueryParameters::kWithPageSizeFlag) {
549
4.42M
    int32_t page_size = 0;
550
4.42M
    RETURN_NOT_OK(ParseInt(&page_size));
551
4.42M
    params->set_page_size(page_size);
552
4.42M
  }
553
4.53M
  if (params->flags & CQLMessage::QueryParameters::kWithPagingStateFlag) {
554
279
    string paging_state;
555
279
    RETURN_NOT_OK(ParseBytes(&paging_state));
556
279
    RETURN_NOT_OK(params->SetPagingState(paging_state));
557
279
  }
558
4.53M
  if (params->flags & CQLMessage::QueryParameters::kWithSerialConsistencyFlag) {
559
0
    RETURN_NOT_OK(ParseConsistency(&params->serial_consistency));
560
0
  }
561
4.53M
  if (params->flags & CQLMessage::QueryParameters::kWithDefaultTimestampFlag) {
562
4.42M
    RETURN_NOT_OK(ParseLong(&params->default_timestamp));
563
4.42M
  }
564
4.53M
  return Status::OK();
565
4.53M
}
566
567
4.92M
CHECKED_STATUS CQLRequest::ParseByte(uint8_t* value) {
568
4.92M
  static_assert(sizeof(*value) == kByteSize, "inconsistent byte size");
569
4.92M
  return ParseNum("CQL byte", Load8, value);
570
4.92M
}
571
572
9.60M
CHECKED_STATUS CQLRequest::ParseShort(uint16_t* value) {
573
9.60M
  static_assert(sizeof(*value) == kShortSize, "inconsistent short size");
574
9.60M
  return ParseNum("CQL byte", NetworkByteOrder::Load16, value);
575
9.60M
}
576
577
11.6M
CHECKED_STATUS CQLRequest::ParseInt(int32_t* value) {
578
11.6M
  static_assert(sizeof(*value) == kIntSize, "inconsistent int size");
579
11.6M
  return ParseNum("CQL int", NetworkByteOrder::Load32, value);
580
11.6M
}
581
582
4.42M
CHECKED_STATUS CQLRequest::ParseLong(int64_t* value) {
583
4.42M
  static_assert(sizeof(*value) == kLongSize, "inconsistent long size");
584
4.42M
  return ParseNum("CQL long", NetworkByteOrder::Load64, value);
585
4.42M
}
586
587
60.3k
CHECKED_STATUS CQLRequest::ParseString(std::string* value)  {
588
60.3k
  return ParseBytes("CQL string", &CQLRequest::ParseShort, value);
589
60.3k
}
590
591
165k
CHECKED_STATUS CQLRequest::ParseLongString(std::string* value)  {
592
165k
  return ParseBytes("CQL long string", &CQLRequest::ParseInt, value);
593
165k
}
594
595
4.76M
CHECKED_STATUS CQLRequest::ParseShortBytes(std::string* value) {
596
4.76M
  return ParseBytes("CQL short bytes", &CQLRequest::ParseShort, value);
597
4.76M
}
598
599
6.56k
CHECKED_STATUS CQLRequest::ParseBytes(std::string* value) {
600
6.56k
  return ParseBytes("CQL bytes", &CQLRequest::ParseInt, value);
601
6.56k
}
602
603
4.53M
CHECKED_STATUS CQLRequest::ParseConsistency(Consistency* consistency) {
604
4.53M
  static_assert(sizeof(*consistency) == kConsistencySize, "inconsistent consistency size");
605
4.53M
  return ParseNum("CQL consistency", NetworkByteOrder::Load16, consistency);
606
4.53M
}
607
608
// ------------------------------ Individual CQL requests -----------------------------------
609
StartupRequest::StartupRequest(const Header& header, const Slice& body)
610
9.38k
    : CQLRequest(header, body) {
611
9.38k
}
612
613
9.38k
StartupRequest::~StartupRequest() {
614
9.38k
}
615
616
9.39k
Status StartupRequest::ParseBody() {
617
9.39k
  return ParseStringMap(&options_);
618
9.39k
}
619
620
//----------------------------------------------------------------------------------------
621
AuthResponseRequest::AuthResponseRequest(const Header& header, const Slice& body)
622
6.28k
    : CQLRequest(header, body) {
623
6.28k
}
624
625
6.28k
AuthResponseRequest::~AuthResponseRequest() {
626
6.28k
}
627
628
6.28k
Status AuthResponseRequest::ParseBody() {
629
6.28k
  RETURN_NOT_OK(ParseBytes(&token_));
630
6.28k
  string error_msg;
631
6.28k
  do {
632
6.28k
    if (token_.empty()) {
633
0
      error_msg = "Invalid empty token!";
634
0
      break;
635
0
    }
636
6.28k
    if (token_[0] != '\0') {
637
0
      error_msg = "Invalid format. Message must begin with \\0";
638
0
      break;
639
0
    }
640
6.28k
    size_t next_delim = token_.find_first_of('\0', 1);
641
6.28k
    if (next_delim == std::string::npos) {
642
0
      error_msg = "Invalid format. Message must contain \\0 after username";
643
0
      break;
644
0
    }
645
    // Start from token_[1], read all the username.
646
6.28k
    params_.username = token_.substr(1, next_delim - 1);
647
    // Start from after the delimiter, go to the end.
648
6.28k
    params_.password = token_.substr(next_delim + 1);
649
6.28k
    return Status::OK();
650
0
  } while (0);
651
0
  return STATUS(InvalidArgument, error_msg);
652
6.28k
}
653
654
CHECKED_STATUS AuthResponseRequest::AuthQueryParameters::GetBindVariable(
655
    const std::string& name,
656
    int64_t pos,
657
    const std::shared_ptr<QLType>& type,
658
5.89k
    QLValue* value) const {
659
5.89k
  if (pos == 0) {
660
5.89k
    value->set_string_value(username);
661
5.89k
    return Status::OK();
662
0
  } else {
663
0
    return STATUS(InvalidArgument, Substitute("Bind variable position $0 out of range: ", pos));
664
0
  }
665
5.89k
}
666
667
//----------------------------------------------------------------------------------------
668
2.50k
OptionsRequest::OptionsRequest(const Header& header, const Slice& body) : CQLRequest(header, body) {
669
2.50k
}
670
671
2.50k
OptionsRequest::~OptionsRequest() {
672
2.50k
}
673
674
2.50k
Status OptionsRequest::ParseBody() {
675
  // Options body is empty
676
2.50k
  return Status::OK();
677
2.50k
}
678
679
//----------------------------------------------------------------------------------------
680
159k
QueryRequest::QueryRequest(const Header& header, const Slice& body) : CQLRequest(header, body) {
681
159k
}
682
683
160k
QueryRequest::~QueryRequest() {
684
160k
}
685
686
160k
Status QueryRequest::ParseBody() {
687
160k
  RETURN_NOT_OK(ParseLongString(&query_));
688
160k
  RETURN_NOT_OK(ParseQueryParameters(&params_));
689
160k
  return Status::OK();
690
160k
}
691
692
//----------------------------------------------------------------------------------------
693
5.02k
PrepareRequest::PrepareRequest(const Header& header, const Slice& body) : CQLRequest(header, body) {
694
5.02k
}
695
696
5.03k
PrepareRequest::~PrepareRequest() {
697
5.03k
}
698
699
5.03k
Status PrepareRequest::ParseBody() {
700
5.03k
  RETURN_NOT_OK(ParseLongString(&query_));
701
5.03k
  return Status::OK();
702
5.03k
}
703
704
//----------------------------------------------------------------------------------------
705
4.37M
ExecuteRequest::ExecuteRequest(const Header& header, const Slice& body) : CQLRequest(header, body) {
706
4.37M
}
707
708
4.37M
ExecuteRequest::~ExecuteRequest() {
709
4.37M
}
710
711
4.37M
Status ExecuteRequest::ParseBody() {
712
4.37M
  RETURN_NOT_OK(ParseShortBytes(&query_id_));
713
4.37M
  RETURN_NOT_OK(ParseQueryParameters(&params_));
714
4.37M
  return Status::OK();
715
4.37M
}
716
717
//----------------------------------------------------------------------------------------
718
1.65k
BatchRequest::BatchRequest(const Header& header, const Slice& body) : CQLRequest(header, body) {
719
1.65k
}
720
721
1.65k
BatchRequest::~BatchRequest() {
722
1.65k
}
723
724
1.65k
Status BatchRequest::ParseBody() {
725
1.65k
  uint8_t type = 0;
726
1.65k
  RETURN_NOT_OK(ParseByte(&type));
727
1.65k
  type_ = static_cast<Type>(type);
728
1.65k
  uint16_t query_count = 0;
729
1.65k
  RETURN_NOT_OK(ParseShort(&query_count));
730
1.65k
  queries_.resize(query_count);
731
390k
  for (uint16_t i = 0; i < query_count; ++i) {
732
389k
    Query& query = queries_[i];
733
389k
    uint8_t is_prepared_query = 0;
734
389k
    RETURN_NOT_OK(ParseByte(&is_prepared_query));
735
389k
    switch (is_prepared_query) {
736
218
      case 0:
737
218
        query.is_prepared = false;
738
218
        RETURN_NOT_OK(ParseLongString(&query.query));
739
218
        break;
740
389k
      case 1:
741
389k
        query.is_prepared = true;
742
389k
        RETURN_NOT_OK(ParseShortBytes(&query.query_id));
743
389k
        break;
744
0
      default:
745
0
        return STATUS(NetworkError, "Invalid is_prepared_query byte in batch request");
746
0
        break;
747
389k
    }
748
389k
    uint16_t value_count = 0;
749
389k
    RETURN_NOT_OK(ParseShort(&value_count));
750
389k
    query.params.values.resize(value_count);
751
1.55M
    for (uint16_t j = 0; j < value_count; ++j) {
752
      // with_name is not possible in the protocol due to a design flaw. See JIRA CASSANDRA-10246.
753
1.16M
      RETURN_NOT_OK(ParseValue(false /* with_name */, &query.params.values[j]));
754
1.16M
    }
755
389k
  }
756
757
1.52k
  Consistency consistency = Consistency::ANY;
758
1.52k
  QueryParameters::Flags flags = 0;
759
1.52k
  Consistency serial_consistency = Consistency::ANY;
760
1.52k
  int64_t default_timestamp = 0;
761
1.52k
  RETURN_NOT_OK(ParseConsistency(&consistency));
762
1.52k
  RETURN_NOT_OK(ParseByte(&flags));
763
1.52k
  if (flags & CQLMessage::QueryParameters::kWithSerialConsistencyFlag) {
764
0
    RETURN_NOT_OK(ParseConsistency(&serial_consistency));
765
0
  }
766
1.52k
  if (flags & CQLMessage::QueryParameters::kWithDefaultTimestampFlag) {
767
255
    RETURN_NOT_OK(ParseLong(&default_timestamp));
768
255
  }
769
770
389k
  for (Query& query : queries_) {
771
389k
    QueryParameters& params = query.params;
772
389k
    params.consistency = consistency;
773
389k
    params.flags = flags;
774
389k
    params.serial_consistency = serial_consistency;
775
389k
    params.default_timestamp = default_timestamp;
776
389k
  }
777
1.52k
  return Status::OK();
778
1.52k
}
779
780
//----------------------------------------------------------------------------------------
781
RegisterRequest::RegisterRequest(const Header& header, const Slice& body)
782
2.23k
    : CQLRequest(header, body) {
783
2.23k
}
784
785
2.23k
RegisterRequest::~RegisterRequest() {
786
2.23k
}
787
788
2.23k
Status RegisterRequest::ParseBody() {
789
2.23k
  vector<string> event_types;
790
2.23k
  RETURN_NOT_OK(ParseStringList(&event_types));
791
2.23k
  events_ = kNoEvents;
792
793
6.70k
  for (const string& event_type : event_types) {
794
6.70k
    if (event_type == kTopologyChangeEvent) {
795
2.23k
      events_ |= kTopologyChange;
796
4.47k
    } else if (event_type == kStatusChangeEvent) {
797
2.23k
      events_ |= kStatusChange;
798
2.23k
    } else if (event_type == kSchemaChangeEvent) {
799
2.23k
      events_ |= kSchemaChange;
800
0
    } else {
801
0
      return STATUS(NetworkError, "Invalid event type in register request");
802
0
    }
803
6.70k
  }
804
805
2.23k
  return Status::OK();
806
2.23k
}
807
808
// --------------------------- Serialization utility functions -------------------------------
809
namespace {
810
811
// Serialize a CQL number (8, 16, 32 and 64-bit integer). <num_type> is the integer type.
812
// <converter> converts the number from machine byte-order to network order and <data_type>
813
// is the coverter's return type. The converter's return type <data_type> is unsigned while
814
// <num_type> may be signed or unsigned.
815
template<typename num_type, typename data_type>
816
14.1M
void SerializeNum(void (*converter)(void *, data_type), const num_type val, faststring* mesg) {
817
14.1M
  data_type byte_value;
818
14.1M
  (*converter)(&byte_value, static_cast<data_type>(val));
819
14.1M
  mesg->append(&byte_value, sizeof(byte_value));
820
14.1M
}
cql_message.cc:_ZN2yb2ql12_GLOBAL__N_112SerializeNumIijEEvPFvPvT0_ET_PNS_10faststringE
Line
Count
Source
816
12.0M
void SerializeNum(void (*converter)(void *, data_type), const num_type val, faststring* mesg) {
817
12.0M
  data_type byte_value;
818
12.0M
  (*converter)(&byte_value, static_cast<data_type>(val));
819
12.0M
  mesg->append(&byte_value, sizeof(byte_value));
820
12.0M
}
cql_message.cc:_ZN2yb2ql12_GLOBAL__N_112SerializeNumIttEEvPFvPvT0_ET_PNS_10faststringE
Line
Count
Source
816
2.04M
void SerializeNum(void (*converter)(void *, data_type), const num_type val, faststring* mesg) {
817
2.04M
  data_type byte_value;
818
2.04M
  (*converter)(&byte_value, static_cast<data_type>(val));
819
2.04M
  mesg->append(&byte_value, sizeof(byte_value));
820
2.04M
}
cql_message.cc:_ZN2yb2ql12_GLOBAL__N_112SerializeNumIhhEEvPFvPvT0_ET_PNS_10faststringE
Line
Count
Source
816
52.8k
void SerializeNum(void (*converter)(void *, data_type), const num_type val, faststring* mesg) {
817
52.8k
  data_type byte_value;
818
52.8k
  (*converter)(&byte_value, static_cast<data_type>(val));
819
52.8k
  mesg->append(&byte_value, sizeof(byte_value));
820
52.8k
}
821
822
// Serialize a CQL byte stream (string or bytes). <len_type> is the length type.
823
// <len_serializer> serializes the byte length from machine byte-order to network order.
824
template<typename len_type>
825
inline void SerializeBytes(
826
    void (*len_serializer)(len_type, faststring* mesg), string val,
827
1.10M
    faststring* mesg) {
828
1.10M
  (*len_serializer)(static_cast<len_type>(val.size()), mesg);
829
1.10M
  mesg->append(val);
830
1.10M
}
cql_message.cc:_ZN2yb2ql12_GLOBAL__N_114SerializeBytesItEEvPFvT_PNS_10faststringEENSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEES5_
Line
Count
Source
827
1.10M
    faststring* mesg) {
828
1.10M
  (*len_serializer)(static_cast<len_type>(val.size()), mesg);
829
1.10M
  mesg->append(val);
830
1.10M
}
cql_message.cc:_ZN2yb2ql12_GLOBAL__N_114SerializeBytesIiEEvPFvT_PNS_10faststringEENSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEES5_
Line
Count
Source
827
6.44k
    faststring* mesg) {
828
6.44k
  (*len_serializer)(static_cast<len_type>(val.size()), mesg);
829
6.44k
  mesg->append(val);
830
6.44k
}
831
832
52.8k
inline void SerializeByte(const uint8_t value, faststring* mesg) {
833
52.8k
  static_assert(sizeof(value) == CQLMessage::kByteSize, "inconsistent byte size");
834
52.8k
  SerializeNum(Store8, value, mesg);
835
52.8k
}
836
837
2.04M
inline void SerializeShort(const uint16_t value, faststring* mesg) {
838
2.04M
  static_assert(sizeof(value) == CQLMessage::kShortSize, "inconsistent short size");
839
2.04M
  SerializeNum(NetworkByteOrder::Store16, value, mesg);
840
2.04M
}
841
842
12.0M
inline void SerializeInt(const int32_t value, faststring* mesg) {
843
12.0M
  static_assert(sizeof(value) == CQLMessage::kIntSize, "inconsistent int size");
844
12.0M
  SerializeNum(NetworkByteOrder::Store32, value, mesg);
845
12.0M
}
846
847
#if 0 // Save this function for future use
848
inline void SerializeLong(const int64_t value, faststring* mesg) {
849
  static_assert(sizeof(value) == CQLMessage::kLongSize, "inconsistent long size");
850
  SerializeNum(NetworkByteOrder::Store64, value, mesg);
851
}
852
#endif
853
854
1.09M
inline void SerializeString(const string& value, faststring* mesg) {
855
1.09M
  SerializeBytes(&SerializeShort, value, mesg);
856
1.09M
}
857
858
#if 0 // Save these functions for future use
859
inline void SerializeLongString(const string& value, faststring* mesg) {
860
  SerializeBytes(&SerializeInt, value, mesg);
861
}
862
#endif
863
864
6.05k
inline void SerializeShortBytes(const string& value, faststring* mesg) {
865
6.05k
  SerializeBytes(&SerializeShort, value, mesg);
866
6.05k
}
867
868
6.44k
inline void SerializeBytes(const string& value, faststring* mesg) {
869
6.44k
  SerializeBytes(&SerializeInt, value, mesg);
870
6.44k
}
871
872
#if 0 // Save these functions for future use
873
inline void SerializeConsistency(const CQLMessage::Consistency consistency, faststring* mesg) {
874
  static_assert(
875
      sizeof(consistency) == CQLMessage::kConsistencySize,
876
      "inconsistent consistency size");
877
  SerializeNum(NetworkByteOrder::Store16, consistency, mesg);
878
}
879
880
void SerializeUUID(const string& value, faststring* mesg) {
881
  if (value.size() == CQLMessage::kUUIDSize) {
882
    mesg->append(value);
883
  } else {
884
    LOG(ERROR) << "Internal error: inconsistent UUID size: " << value.size();
885
    uint8_t empty_uuid[CQLMessage::kUUIDSize] = {0};
886
    mesg->append(empty_uuid, sizeof(empty_uuid));
887
  }
888
}
889
890
void SerializeTimeUUID(const string& value, faststring* mesg) {
891
  if (value.size() == CQLMessage::kUUIDSize) {
892
    mesg->append(value);
893
  } else {
894
    LOG(ERROR) << "Internal error: inconsistent TimeUUID size: " << value.size();
895
    uint8_t empty_uuid[CQLMessage::kUUIDSize] = {0};
896
    mesg->append(empty_uuid, sizeof(empty_uuid));
897
  }
898
}
899
#endif
900
901
5.00k
void SerializeStringList(const vector<string>& list, faststring* mesg) {
902
5.00k
  SerializeShort(list.size(), mesg);
903
10.0k
  for (const auto& entry : list) {
904
10.0k
    SerializeString(entry, mesg);
905
10.0k
  }
906
5.00k
}
907
908
52.8k
void SerializeInet(const Endpoint& value, faststring* mesg) {
909
52.8k
  auto address = value.address();
910
52.8k
  if (address.is_v4()) {
911
52.8k
    auto bytes = address.to_v4().to_bytes();
912
52.8k
    SerializeByte(bytes.size(), mesg);
913
52.8k
    mesg->append(bytes.data(), bytes.size());
914
0
  } else {
915
0
    auto bytes = address.to_v6().to_bytes();
916
0
    SerializeByte(bytes.size(), mesg);
917
0
    mesg->append(bytes.data(), bytes.size());
918
0
  }
919
52.8k
  const uint16_t port = value.port();
920
52.8k
  SerializeInt(port, mesg);
921
52.8k
}
922
923
#if 0 // Save these functions for future use
924
void SerializeStringMap(const unordered_map<string, string>& map, faststring* mesg) {
925
  SerializeShort(map.size(), mesg);
926
  for (const auto& element : map) {
927
    SerializeString(element.first, mesg);
928
    SerializeString(element.second, mesg);
929
  }
930
}
931
#endif
932
933
2.50k
void SerializeStringMultiMap(const unordered_map<string, vector<string>>& map, faststring* mesg) {
934
2.50k
  SerializeShort(map.size(), mesg);
935
5.00k
  for (const auto& element : map) {
936
5.00k
    SerializeString(element.first, mesg);
937
5.00k
    SerializeStringList(element.second, mesg);
938
5.00k
  }
939
2.50k
}
940
941
#if 0 // Save these functions for future use
942
void SerializeBytesMap(const unordered_map<string, string>& map, faststring* mesg) {
943
  SerializeShort(map.size(), mesg);
944
  for (const auto& element : map) {
945
    SerializeString(element.first, mesg);
946
    SerializeBytes(element.second, mesg);
947
  }
948
}
949
950
void SerializeValue(const CQLMessage::Value& value, faststring* mesg) {
951
  switch (value.kind) {
952
    case CQLMessage::Value::Kind::NOT_NULL:
953
      SerializeInt(value.value.size(), mesg);
954
      mesg->append(value.value);
955
      return;
956
    case CQLMessage::Value::Kind::IS_NULL:
957
      SerializeInt(-1, mesg);
958
      return;
959
    case CQLMessage::Value::Kind::NOT_SET: // NOT_SET value kind should appear in request msg only.
960
      break;
961
      // default: fall through
962
  }
963
  LOG(ERROR) << "Internal error: invalid/unknown value kind " << static_cast<uint32_t>(value.kind);
964
  SerializeInt(-1, mesg);
965
}
966
#endif
967
968
} // namespace
969
970
// ------------------------------------ CQL response -----------------------------------
971
CQLResponse::CQLResponse(const CQLRequest& request, const Opcode opcode)
972
4.55M
    : CQLMessage(Header(request.version() | kResponseVersion, 0, request.stream_id(), opcode)) {
973
4.55M
}
974
975
CQLResponse::CQLResponse(const StreamId stream_id, const Opcode opcode)
976
52.8k
    : CQLMessage(Header(kCurrentVersion | kResponseVersion, 0, stream_id, opcode)) {
977
52.8k
}
978
979
4.61M
CQLResponse::~CQLResponse() {
980
4.61M
}
981
982
// Short-hand macros for serializing fields from the message header
983
#define SERIALIZE_BYTE(buf, pos, value) \
984
13.7M
  Store8(&(buf)[pos], static_cast<uint8_t>(value))
985
#define SERIALIZE_SHORT(buf, pos, value) \
986
4.59M
  NetworkByteOrder::Store16(&(buf)[pos], static_cast<uint16_t>(value))
987
#define SERIALIZE_INT(buf, pos, value) \
988
9.21M
  NetworkByteOrder::Store32(&(buf)[pos], static_cast<int32_t>(value))
989
#define SERIALIZE_LONG(buf, pos, value) \
990
  NetworkByteOrder::Store64(&(buf)[pos], static_cast<int64_t>(value))
991
992
4.61M
void CQLResponse::Serialize(const CompressionScheme compression_scheme, faststring* mesg) const {
993
4.61M
  const size_t start_pos = mesg->size(); // save the start position
994
4.61M
  const bool compress = (compression_scheme != CQLMessage::CompressionScheme::kNone);
995
4.61M
  SerializeHeader(compress, mesg);
996
4.61M
  if (compress) {
997
347
    faststring body;
998
347
    SerializeBody(&body);
999
347
    switch (compression_scheme) {
1000
225
      case CQLMessage::CompressionScheme::kLz4: {
1001
225
        SerializeInt(static_cast<int32_t>(body.size()), mesg);
1002
225
        const size_t curr_size = mesg->size();
1003
225
        const int max_comp_size = LZ4_compressBound(narrow_cast<int>(body.size()));
1004
225
        mesg->resize(curr_size + max_comp_size);
1005
225
        const int comp_size = LZ4_compress_default(to_char_ptr(body.data()),
1006
225
                                                   to_char_ptr(mesg->data() + curr_size),
1007
225
                                                   narrow_cast<int>(body.size()),
1008
225
                                                   max_comp_size);
1009
0
        CHECK_NE(comp_size, 0) << "LZ4 compression failed";
1010
225
        mesg->resize(curr_size + comp_size);
1011
225
        break;
1012
0
      }
1013
126
      case CQLMessage::CompressionScheme::kSnappy: {
1014
126
        const size_t curr_size = mesg->size();
1015
126
        const size_t max_comp_size = MaxCompressedLength(body.size());
1016
126
        size_t comp_size = 0;
1017
126
        mesg->resize(curr_size + max_comp_size);
1018
126
        RawCompress(to_char_ptr(body.data()), body.size(),
1019
126
                    to_char_ptr(mesg->data() + curr_size), &comp_size);
1020
126
        mesg->resize(curr_size + comp_size);
1021
126
        break;
1022
0
      }
1023
0
      case CQLMessage::CompressionScheme::kNone:
1024
0
        LOG(FATAL) << "No compression scheme";
1025
0
        break;
1026
4.61M
    }
1027
4.61M
  } else {
1028
4.61M
    SerializeBody(mesg);
1029
4.61M
  }
1030
4.61M
  SERIALIZE_INT(
1031
4.61M
      mesg->data(), start_pos + kHeaderPosLength, mesg->size() - start_pos - kMessageHeaderLength);
1032
4.61M
}
1033
1034
4.59M
void CQLResponse::SerializeHeader(const bool compress, faststring* mesg) const {
1035
4.59M
  uint8_t buffer[kMessageHeaderLength];
1036
4.59M
  SERIALIZE_BYTE(buffer, kHeaderPosVersion, version());
1037
4.59M
  SERIALIZE_BYTE(buffer, kHeaderPosFlags, flags() | (compress ? kCompressionFlag : 0));
1038
4.59M
  SERIALIZE_SHORT(buffer, kHeaderPosStreamId, stream_id());
1039
4.59M
  SERIALIZE_INT(buffer, kHeaderPosLength, 0);
1040
4.59M
  SERIALIZE_BYTE(buffer, kHeaderPosOpcode, opcode());
1041
4.59M
  mesg->append(buffer, sizeof(buffer));
1042
4.59M
}
1043
1044
#undef SERIALIZE_BYTE
1045
#undef SERIALIZE_SHORT
1046
#undef SERIALIZE_INT
1047
#undef SERIALIZE_LONG
1048
1049
// ------------------------------ Individual CQL responses -----------------------------------
1050
ErrorResponse::ErrorResponse(const CQLRequest& request, const Code code, const string& message)
1051
5.30k
    : CQLResponse(request, Opcode::ERROR), code_(code), message_(message) {
1052
5.30k
}
1053
1054
ErrorResponse::ErrorResponse(const CQLRequest& request, const Code code, const Status& status)
1055
    : CQLResponse(request, Opcode::ERROR), code_(code),
1056
0
      message_(to_char_ptr(status.message().data())) {
1057
0
}
1058
1059
ErrorResponse::ErrorResponse(const StreamId stream_id, const Code code, const string& message)
1060
8
    : CQLResponse(stream_id, Opcode::ERROR), code_(code), message_(message) {
1061
8
}
1062
1063
5.31k
ErrorResponse::~ErrorResponse() {
1064
5.31k
}
1065
1066
5.31k
void ErrorResponse::SerializeBody(faststring* mesg) const {
1067
5.31k
  SerializeInt(static_cast<int32_t>(code_), mesg);
1068
5.31k
  SerializeString(message_, mesg);
1069
5.31k
  SerializeErrorBody(mesg);
1070
5.31k
}
1071
1072
4.10k
void ErrorResponse::SerializeErrorBody(faststring* mesg) const {
1073
4.10k
}
1074
1075
//----------------------------------------------------------------------------------------
1076
UnpreparedErrorResponse::UnpreparedErrorResponse(const CQLRequest& request, const QueryId& query_id)
1077
1.20k
    : ErrorResponse(request, Code::UNPREPARED, "Unprepared query"), query_id_(query_id) {
1078
1.20k
}
1079
1080
1.20k
UnpreparedErrorResponse::~UnpreparedErrorResponse() {
1081
1.20k
}
1082
1083
1.20k
void UnpreparedErrorResponse::SerializeErrorBody(faststring* mesg) const {
1084
1.20k
  SerializeShortBytes(query_id_, mesg);
1085
1.20k
}
1086
1087
//----------------------------------------------------------------------------------------
1088
5.33k
ReadyResponse::ReadyResponse(const CQLRequest& request) : CQLResponse(request, Opcode::READY) {
1089
5.33k
}
1090
1091
5.33k
ReadyResponse::~ReadyResponse() {
1092
5.33k
}
1093
1094
5.33k
void ReadyResponse::SerializeBody(faststring* mesg) const {
1095
  // Ready body is empty
1096
5.33k
}
1097
1098
//----------------------------------------------------------------------------------------
1099
AuthenticateResponse::AuthenticateResponse(const CQLRequest& request, const string& authenticator)
1100
6.29k
    : CQLResponse(request, Opcode::AUTHENTICATE), authenticator_(authenticator) {
1101
6.29k
}
1102
1103
6.29k
AuthenticateResponse::~AuthenticateResponse() {
1104
6.29k
}
1105
1106
6.29k
void AuthenticateResponse::SerializeBody(faststring* mesg) const {
1107
6.29k
  SerializeString(authenticator_, mesg);
1108
6.29k
}
1109
1110
//----------------------------------------------------------------------------------------
1111
SupportedResponse::SupportedResponse(const CQLRequest& request,
1112
                                     const unordered_map<string, vector<string>>* options)
1113
2.50k
    : CQLResponse(request, Opcode::SUPPORTED), options_(options) {
1114
2.50k
}
1115
1116
2.50k
SupportedResponse::~SupportedResponse() {
1117
2.50k
}
1118
1119
2.50k
void SupportedResponse::SerializeBody(faststring* mesg) const {
1120
2.50k
  SerializeStringMultiMap(*options_, mesg);
1121
2.50k
}
1122
1123
//----------------------------------------------------------------------------------------
1124
ResultResponse::ResultResponse(const CQLRequest& request, const Kind kind)
1125
4.52M
  : CQLResponse(request, Opcode::RESULT), kind_(kind) {
1126
4.52M
}
1127
1128
4.53M
ResultResponse::~ResultResponse() {
1129
4.53M
}
1130
1131
0
ResultResponse::RowsMetadata::Type::Type(const Id id) : id(id) {
1132
0
  switch (id) {
1133
    // Verify that the type id is a primitive type indeed.
1134
0
    case Id::ASCII:
1135
0
    case Id::BIGINT:
1136
0
    case Id::BLOB:
1137
0
    case Id::BOOLEAN:
1138
0
    case Id::COUNTER:
1139
0
    case Id::DECIMAL:
1140
0
    case Id::DOUBLE:
1141
0
    case Id::FLOAT:
1142
0
    case Id::INT:
1143
0
    case Id::TIMESTAMP:
1144
0
    case Id::UUID:
1145
0
    case Id::VARCHAR:
1146
0
    case Id::VARINT:
1147
0
    case Id::TIMEUUID:
1148
0
    case Id::INET:
1149
0
    case Id::JSONB:
1150
0
    case Id::DATE:
1151
0
    case Id::TIME:
1152
0
    case Id::SMALLINT:
1153
0
    case Id::TINYINT:
1154
0
      return;
1155
1156
    // Non-primitive types
1157
0
    case Id::CUSTOM:
1158
0
    case Id::LIST:
1159
0
    case Id::MAP:
1160
0
    case Id::SET:
1161
0
    case Id::UDT:
1162
0
    case Id::TUPLE:
1163
0
      break;
1164
1165
    // default: fall through
1166
0
  }
1167
1168
0
  LOG(ERROR) << "Internal error: invalid/unknown primitive type id " << static_cast<uint32_t>(id);
1169
0
}
1170
1171
// These union members in Type below are not initialized by default. They need to be explicitly
1172
// initialized using the new() operator in the Type constructors.
1173
0
ResultResponse::RowsMetadata::Type::Type(const string& custom_class_name) : id(Id::CUSTOM) {
1174
0
  new(&this->custom_class_name) string(custom_class_name);
1175
0
}
1176
1177
ResultResponse::RowsMetadata::Type::Type(const Id id, shared_ptr<const Type> element_type)
1178
0
    : id(id) {
1179
0
  switch (id) {
1180
0
    case Id::LIST:
1181
0
    case Id::SET:
1182
0
      new(&this->element_type) shared_ptr<const Type>(element_type);
1183
0
      return;
1184
1185
    // Not list nor map
1186
0
    case Id::CUSTOM:
1187
0
    case Id::ASCII:
1188
0
    case Id::BIGINT:
1189
0
    case Id::BLOB:
1190
0
    case Id::BOOLEAN:
1191
0
    case Id::COUNTER:
1192
0
    case Id::DECIMAL:
1193
0
    case Id::DOUBLE:
1194
0
    case Id::FLOAT:
1195
0
    case Id::INT:
1196
0
    case Id::TIMESTAMP:
1197
0
    case Id::UUID:
1198
0
    case Id::VARCHAR:
1199
0
    case Id::VARINT:
1200
0
    case Id::TIMEUUID:
1201
0
    case Id::INET:
1202
0
    case Id::JSONB:
1203
0
    case Id::DATE:
1204
0
    case Id::TIME:
1205
0
    case Id::SMALLINT:
1206
0
    case Id::TINYINT:
1207
0
    case Id::MAP:
1208
0
    case Id::UDT:
1209
0
    case Id::TUPLE:
1210
0
      break;
1211
1212
    // default: fall through
1213
0
  }
1214
1215
0
  LOG(ERROR) << "Internal error: invalid/unknown list/map type id " << static_cast<uint32_t>(id);
1216
0
}
1217
1218
0
ResultResponse::RowsMetadata::Type::Type(shared_ptr<const MapType> map_type) : id(Id::MAP) {
1219
0
  new(&this->map_type) shared_ptr<const MapType>(map_type);
1220
0
}
1221
1222
0
ResultResponse::RowsMetadata::Type::Type(shared_ptr<const UDTType> udt_type) : id(Id::UDT) {
1223
0
  new(&this->udt_type) shared_ptr<const UDTType>(udt_type);
1224
0
}
1225
1226
ResultResponse::RowsMetadata::Type::Type(
1227
0
    shared_ptr<const TupleComponentTypes> tuple_component_types) : id(Id::TUPLE) {
1228
0
  new(&this->tuple_component_types) shared_ptr<const TupleComponentTypes>(tuple_component_types);
1229
0
}
1230
1231
936k
ResultResponse::RowsMetadata::Type::Type(const Type& t) : id(t.id) {
1232
936k
  switch (id) {
1233
0
    case Id::CUSTOM:
1234
0
      new(&this->custom_class_name) string(t.custom_class_name);
1235
0
      return;
1236
0
    case Id::ASCII:
1237
2.21k
    case Id::BIGINT:
1238
42.5k
    case Id::BLOB:
1239
76.3k
    case Id::BOOLEAN:
1240
76.3k
    case Id::COUNTER:
1241
78.1k
    case Id::DECIMAL:
1242
123k
    case Id::DOUBLE:
1243
124k
    case Id::FLOAT:
1244
242k
    case Id::INT:
1245
243k
    case Id::TIMESTAMP:
1246
322k
    case Id::UUID:
1247
768k
    case Id::VARCHAR:
1248
769k
    case Id::VARINT:
1249
769k
    case Id::TIMEUUID:
1250
824k
    case Id::INET:
1251
824k
    case Id::JSONB:
1252
824k
    case Id::DATE:
1253
824k
    case Id::TIME:
1254
825k
    case Id::SMALLINT:
1255
825k
    case Id::TINYINT:
1256
825k
      return;
1257
21.3k
    case Id::LIST:
1258
38.4k
    case Id::SET:
1259
38.4k
      new(&element_type) shared_ptr<const Type>(t.element_type);
1260
38.4k
      return;
1261
80.7k
    case Id::MAP:
1262
80.7k
      new(&map_type) shared_ptr<const MapType>(t.map_type);
1263
80.7k
      return;
1264
185
    case Id::UDT:
1265
185
      new(&udt_type) shared_ptr<const UDTType>(t.udt_type);
1266
185
      return;
1267
0
    case Id::TUPLE:
1268
0
      new(&tuple_component_types) shared_ptr<const TupleComponentTypes>(t.tuple_component_types);
1269
0
      return;
1270
1271
    // default: fall through
1272
0
  }
1273
1274
0
  LOG(ERROR) << "Internal error: unknown type id " << static_cast<uint32_t>(id);
1275
0
}
1276
1277
936k
ResultResponse::RowsMetadata::Type::Type(const shared_ptr<QLType>& ql_type) {
1278
936k
  auto type = ql_type;
1279
936k
  if (type->IsFrozen()) {
1280
275
    type = ql_type->param_type(0);
1281
275
  }
1282
936k
  switch (type->main()) {
1283
122
    case DataType::INT8:
1284
122
      id = Id::TINYINT;
1285
122
      return;
1286
132
    case DataType::INT16:
1287
132
      id = Id::SMALLINT;
1288
132
      return;
1289
118k
    case DataType::INT32:
1290
118k
      id = Id::INT;
1291
118k
      return;
1292
2.21k
    case DataType::INT64:
1293
2.21k
      id = Id::BIGINT;
1294
2.21k
      return;
1295
922
    case DataType::VARINT:
1296
922
      id = Id::VARINT;
1297
922
      return;
1298
243
    case DataType::FLOAT:
1299
243
      id = Id::FLOAT;
1300
243
      return;
1301
45.6k
    case DataType::DOUBLE:
1302
45.6k
      id = Id::DOUBLE;
1303
45.6k
      return;
1304
445k
    case DataType::STRING:
1305
445k
      id = Id::VARCHAR;
1306
445k
      return;
1307
33.7k
    case DataType::BOOL:
1308
33.7k
      id = Id::BOOLEAN;
1309
33.7k
      return;
1310
1.07k
    case DataType::TIMESTAMP:
1311
1.07k
      id = Id::TIMESTAMP;
1312
1.07k
      return;
1313
44
    case DataType::DATE:
1314
44
      id = Id::DATE;
1315
44
      return;
1316
34
    case DataType::TIME:
1317
34
      id = Id::TIME;
1318
34
      return;
1319
55.3k
    case DataType::INET:
1320
55.3k
      id = Id::INET;
1321
55.3k
      return;
1322
438
    case DataType::JSONB:
1323
438
      id = Id::JSONB;
1324
438
      return;
1325
78.8k
    case DataType::UUID:
1326
78.8k
      id = Id::UUID;
1327
78.8k
      return;
1328
34
    case DataType::TIMEUUID:
1329
34
      id = Id::TIMEUUID;
1330
34
      return;
1331
40.3k
    case DataType::BINARY:
1332
40.3k
      id = Id::BLOB;
1333
40.3k
      return;
1334
1.79k
    case DataType::DECIMAL:
1335
1.79k
      id = Id::DECIMAL;
1336
1.79k
      return;
1337
21.1k
    case DataType::LIST:
1338
21.1k
      id = Id::LIST;
1339
21.1k
      new (&element_type) shared_ptr<const Type>(
1340
21.1k
          std::make_shared<const Type>(Type(type->param_type(0))));
1341
21.1k
      return;
1342
17.0k
    case DataType::SET:
1343
17.0k
      id = Id::SET;
1344
17.0k
      new (&element_type) shared_ptr<const Type>(
1345
17.0k
          std::make_shared<const Type>(Type(type->param_type(0))));
1346
17.0k
      return;
1347
80.6k
    case DataType::MAP: {
1348
80.6k
      id = Id::MAP;
1349
80.6k
      auto key = std::make_shared<const Type>(Type(type->param_type(0)));
1350
80.6k
      auto value = std::make_shared<const Type>(Type(type->param_type(1)));
1351
80.6k
      new (&map_type) shared_ptr<const MapType>(std::make_shared<MapType>(MapType{key, value}));
1352
80.6k
      return;
1353
0
    }
1354
185
    case DataType::USER_DEFINED_TYPE: {
1355
185
      id = Id::UDT;
1356
185
      std::vector<UDTType::Field> fields;
1357
537
      for (size_t i = 0; i < type->params().size(); i++) {
1358
352
        auto field_type = std::make_shared<const Type>(Type(type->param_type(i)));
1359
352
        UDTType::Field field{type->udtype_field_name(i), field_type};
1360
352
        fields.push_back(std::move(field));
1361
352
      }
1362
185
      new(&udt_type) shared_ptr<const UDTType>(std::make_shared<UDTType>(
1363
185
          UDTType{type->udtype_keyspace_name(), type->udtype_name(), fields}));
1364
185
      return;
1365
0
    }
1366
0
    case DataType::FROZEN: FALLTHROUGH_INTENDED;
1367
0
    QL_UNSUPPORTED_TYPES_IN_SWITCH: FALLTHROUGH_INTENDED;
1368
0
    QL_INVALID_TYPES_IN_SWITCH:
1369
0
      break;
1370
1371
    // default: fall through
1372
0
  }
1373
1374
0
  LOG(ERROR) << "Internal error: invalid/unsupported type " << type->ToString();
1375
0
}
1376
1377
1.87M
ResultResponse::RowsMetadata::Type::~Type() {
1378
1.87M
  switch (id) {
1379
0
    case Id::CUSTOM:
1380
0
      custom_class_name.~basic_string();
1381
0
      return;
1382
0
    case Id::ASCII:
1383
4.42k
    case Id::BIGINT:
1384
85.1k
    case Id::BLOB:
1385
152k
    case Id::BOOLEAN:
1386
152k
    case Id::COUNTER:
1387
156k
    case Id::DECIMAL:
1388
247k
    case Id::DOUBLE:
1389
247k
    case Id::FLOAT:
1390
484k
    case Id::INT:
1391
487k
    case Id::TIMESTAMP:
1392
644k
    case Id::UUID:
1393
1.53M
    case Id::VARCHAR:
1394
1.53M
    case Id::VARINT:
1395
1.53M
    case Id::TIMEUUID:
1396
1.64M
    case Id::INET:
1397
1.64M
    case Id::JSONB:
1398
1.64M
    case Id::DATE:
1399
1.64M
    case Id::TIME:
1400
1.64M
    case Id::SMALLINT:
1401
1.64M
    case Id::TINYINT:
1402
1.64M
      return;
1403
42.4k
    case Id::LIST:
1404
76.3k
    case Id::SET:
1405
76.3k
      element_type.reset();
1406
76.3k
      return;
1407
161k
    case Id::MAP:
1408
161k
      map_type.reset();
1409
161k
      return;
1410
370
    case Id::UDT:
1411
370
      udt_type.reset();
1412
370
      return;
1413
0
    case Id::TUPLE:
1414
0
      tuple_component_types.reset();
1415
0
      return;
1416
1417
    // default: fall through
1418
0
  }
1419
1420
0
  LOG(ERROR) << "Internal error: unknown type id " << static_cast<uint32_t>(id);
1421
0
}
1422
1423
ResultResponse::RowsMetadata::RowsMetadata()
1424
    : flags(kNoMetadata),
1425
      paging_state(""),
1426
      global_table_spec("" /* keyspace */, "" /* table_name */),
1427
4.52k
      col_count(0) {
1428
4.52k
}
1429
1430
ResultResponse::RowsMetadata::RowsMetadata(const client::YBTableName& table_name,
1431
                                           const vector<ColumnSchema>& columns,
1432
                                           const string& paging_state,
1433
                                           bool no_metadata)
1434
    : flags((no_metadata ? kNoMetadata : kHasGlobalTableSpec) |
1435
            (!paging_state.empty() ? kHasMorePages : 0)),
1436
      paging_state(paging_state),
1437
      global_table_spec(no_metadata ? "" : table_name.namespace_name(),
1438
                        no_metadata ? "" : table_name.table_name()),
1439
3.70M
      col_count(narrow_cast<int>(columns.size())) {
1440
3.70M
  if (!no_metadata) {
1441
89.6k
    col_specs.reserve(col_count);
1442
728k
    for (const auto& column : columns) {
1443
728k
      col_specs.emplace_back(column.name(), Type(column.type()));
1444
728k
    }
1445
89.6k
  }
1446
3.70M
}
1447
1448
4.52M
void ResultResponse::SerializeBody(faststring* mesg) const {
1449
4.52M
  SerializeInt(static_cast<int32_t>(kind_), mesg);
1450
4.52M
  SerializeResultBody(mesg);
1451
4.52M
}
1452
1453
941k
void ResultResponse::SerializeType(const RowsMetadata::Type* type, faststring* mesg) const {
1454
941k
  SerializeShort(static_cast<uint16_t>(type->id), mesg);
1455
941k
  switch (type->id) {
1456
0
    case RowsMetadata::Type::Id::CUSTOM:
1457
0
      SerializeString(type->custom_class_name, mesg);
1458
0
      return;
1459
0
    case RowsMetadata::Type::Id::ASCII:
1460
2.21k
    case RowsMetadata::Type::Id::BIGINT:
1461
42.5k
    case RowsMetadata::Type::Id::BLOB:
1462
76.3k
    case RowsMetadata::Type::Id::BOOLEAN:
1463
76.3k
    case RowsMetadata::Type::Id::COUNTER:
1464
78.1k
    case RowsMetadata::Type::Id::DECIMAL:
1465
123k
    case RowsMetadata::Type::Id::DOUBLE:
1466
123k
    case RowsMetadata::Type::Id::FLOAT:
1467
242k
    case RowsMetadata::Type::Id::INT:
1468
243k
    case RowsMetadata::Type::Id::TIMESTAMP:
1469
322k
    case RowsMetadata::Type::Id::UUID:
1470
767k
    case RowsMetadata::Type::Id::VARCHAR:
1471
768k
    case RowsMetadata::Type::Id::VARINT:
1472
768k
    case RowsMetadata::Type::Id::TIMEUUID:
1473
824k
    case RowsMetadata::Type::Id::INET:
1474
824k
    case RowsMetadata::Type::Id::JSONB:
1475
824k
    case RowsMetadata::Type::Id::DATE:
1476
824k
    case RowsMetadata::Type::Id::TIME:
1477
824k
    case RowsMetadata::Type::Id::SMALLINT:
1478
824k
    case RowsMetadata::Type::Id::TINYINT:
1479
824k
      return;
1480
21.3k
    case RowsMetadata::Type::Id::LIST:
1481
38.4k
    case RowsMetadata::Type::Id::SET:
1482
38.4k
      SerializeType(type->element_type.get(), mesg);
1483
38.4k
      return;
1484
80.7k
    case RowsMetadata::Type::Id::MAP:
1485
80.7k
      SerializeType(type->map_type->key_type.get(), mesg);
1486
80.7k
      SerializeType(type->map_type->value_type.get(), mesg);
1487
80.7k
      return;
1488
185
    case RowsMetadata::Type::Id::UDT:
1489
185
      SerializeString(type->udt_type->keyspace, mesg);
1490
185
      SerializeString(type->udt_type->name, mesg);
1491
185
      SerializeShort(type->udt_type->fields.size(), mesg);
1492
352
      for (const auto& field : type->udt_type->fields) {
1493
352
        SerializeString(field.name, mesg);
1494
352
        SerializeType(field.type.get(), mesg);
1495
352
      }
1496
185
      return;
1497
0
    case RowsMetadata::Type::Id::TUPLE:
1498
0
      SerializeShort(type->tuple_component_types->size(), mesg);
1499
0
      for (const auto& component_type : *type->tuple_component_types) {
1500
0
        SerializeType(component_type.get(), mesg);
1501
0
      }
1502
0
      return;
1503
1504
    // default: fall through
1505
0
  }
1506
1507
0
  LOG(ERROR) << "Internal error: unknown type id " << static_cast<uint32_t>(type->id);
1508
0
}
1509
1510
void ResultResponse::SerializeColSpecs(
1511
      const bool has_global_table_spec, const RowsMetadata::GlobalTableSpec& global_table_spec,
1512
99.8k
      const vector<RowsMetadata::ColSpec>& col_specs, faststring* mesg) const {
1513
99.8k
  if (has_global_table_spec) {
1514
99.4k
    SerializeString(global_table_spec.keyspace, mesg);
1515
99.4k
    SerializeString(global_table_spec.table, mesg);
1516
99.4k
  }
1517
740k
  for (const auto& col_spec : col_specs) {
1518
740k
    if (!has_global_table_spec) {
1519
105
      SerializeString(col_spec.keyspace, mesg);
1520
105
      SerializeString(col_spec.table, mesg);
1521
105
    }
1522
740k
    SerializeString(col_spec.column, mesg);
1523
740k
    SerializeType(&col_spec.type, mesg);
1524
740k
  }
1525
99.8k
}
1526
1527
3.71M
void ResultResponse::SerializeRowsMetadata(const RowsMetadata& metadata, faststring* mesg) const {
1528
3.71M
  SerializeInt(metadata.flags, mesg);
1529
3.71M
  SerializeInt(metadata.col_count, mesg);
1530
3.71M
  if (metadata.flags & RowsMetadata::kHasMorePages) {
1531
279
    SerializeBytes(metadata.paging_state, mesg);
1532
279
  }
1533
3.71M
  if (metadata.flags & RowsMetadata::kNoMetadata) {
1534
3.62M
    return;
1535
3.62M
  }
1536
87.5k
  CHECK_EQ(metadata.col_count, metadata.col_specs.size());
1537
87.5k
  SerializeColSpecs(
1538
87.5k
      metadata.flags & RowsMetadata::kHasGlobalTableSpec, metadata.global_table_spec,
1539
87.5k
      metadata.col_specs, mesg);
1540
87.5k
}
1541
1542
//----------------------------------------------------------------------------------------
1543
VoidResultResponse::VoidResultResponse(const CQLRequest& request)
1544
797k
    : ResultResponse(request, Kind::VOID) {
1545
797k
}
1546
1547
797k
VoidResultResponse::~VoidResultResponse() {
1548
797k
}
1549
1550
796k
void VoidResultResponse::SerializeResultBody(faststring* mesg) const {
1551
  // Void result response body is empty
1552
796k
}
1553
1554
//----------------------------------------------------------------------------------------
1555
RowsResultResponse::RowsResultResponse(
1556
    const QueryRequest& request, const ql::RowsResult::SharedPtr& result)
1557
    : ResultResponse(request, Kind::ROWS), result_(result),
1558
92.4k
      skip_metadata_(request.params().flags & CQLMessage::QueryParameters::kSkipMetadataFlag) {
1559
92.4k
}
1560
1561
RowsResultResponse::RowsResultResponse(
1562
    const ExecuteRequest& request, const ql::RowsResult::SharedPtr& result)
1563
    : ResultResponse(request, Kind::ROWS), result_(result),
1564
3.62M
      skip_metadata_(request.params().flags & CQLMessage::QueryParameters::kSkipMetadataFlag) {
1565
3.62M
}
1566
1567
RowsResultResponse::RowsResultResponse(
1568
    const BatchRequest& request, const ql::RowsResult::SharedPtr& result)
1569
    : ResultResponse(request, Kind::ROWS), result_(result),
1570
52
      skip_metadata_(false) { // Batches don't have the skip_metadata flag.
1571
52
}
1572
1573
3.72M
RowsResultResponse::~RowsResultResponse() {
1574
3.72M
}
1575
1576
3.71M
void RowsResultResponse::SerializeResultBody(faststring* mesg) const {
1577
  // CQL ROWS Response = <metadata><rows_count><rows_content>
1578
3.71M
  SerializeRowsMetadata(
1579
3.71M
      RowsMetadata(result_->table_name(), result_->column_schemas(),
1580
3.71M
                   result_->paging_state(), skip_metadata_), mesg);
1581
1582
  // The <rows_count> (4 bytes) must be in the response in any case, so the 'rows_data()'
1583
  // string in the result must contain it.
1584
18.4E
  LOG_IF(DFATAL, result_->rows_data().size() < 4)
1585
18.4E
      << "Absent rows_count for the CQL ROWS Result Response (rows_data: "
1586
18.4E
      << result_->rows_data().size() << " bytes, expected >= 4)";
1587
3.71M
  mesg->append(result_->rows_data());
1588
3.71M
}
1589
1590
//----------------------------------------------------------------------------------------
1591
337
PreparedResultResponse::PreparedMetadata::PreparedMetadata() {
1592
337
}
1593
1594
PreparedResultResponse::PreparedMetadata::PreparedMetadata(
1595
    const client::YBTableName& table_name, const std::vector<int64_t>& hash_col_indices,
1596
    const vector<client::YBTableName>& bind_table_names,
1597
    const vector<ColumnSchema>& bind_variable_schemas)
1598
    : flags(table_name.empty() ? 0 : kHasGlobalTableSpec),
1599
4.52k
      global_table_spec(table_name.namespace_name(), table_name.table_name()) {
1600
4.52k
  this->pk_indices.reserve(hash_col_indices.size());
1601
6.02k
  for (const size_t index : hash_col_indices) {
1602
6.02k
    this->pk_indices.emplace_back(static_cast<uint16_t>(index));
1603
6.02k
  }
1604
4.52k
  col_specs.reserve(bind_variable_schemas.size());
1605
16.9k
  for (size_t i = 0; i < bind_variable_schemas.size(); i++) {
1606
12.4k
    const ColumnSchema& var = bind_variable_schemas[i];
1607
12.4k
    if (flags & kHasGlobalTableSpec) {
1608
12.3k
      col_specs.emplace_back(var.name(), RowsMetadata::Type(var.type()));
1609
106
    } else {
1610
106
      col_specs.emplace_back(bind_table_names[i], var.name(), RowsMetadata::Type(var.type()));
1611
106
    }
1612
12.4k
  }
1613
4.52k
}
1614
1615
PreparedResultResponse::PreparedResultResponse(const CQLRequest& request, const QueryId& query_id)
1616
337
    : ResultResponse(request, Kind::PREPARED), query_id_(query_id) {
1617
337
}
1618
1619
PreparedResultResponse::PreparedResultResponse(
1620
    const CQLRequest& request, const QueryId& query_id, const ql::PreparedResult& result)
1621
    : ResultResponse(request, Kind::PREPARED), query_id_(query_id),
1622
      prepared_metadata_(result.table_name(), result.hash_col_indices(),
1623
                         result.bind_table_names(), result.bind_variable_schemas()),
1624
      rows_metadata_(!result.column_schemas().empty() ?
1625
                     RowsMetadata(
1626
                         result.table_name(), result.column_schemas(),
1627
                         "" /* paging_state */, false /* no_metadata */) :
1628
4.52k
                     RowsMetadata()) {
1629
4.52k
}
1630
1631
4.86k
PreparedResultResponse::~PreparedResultResponse() {
1632
4.86k
}
1633
1634
void PreparedResultResponse::SerializePreparedMetadata(
1635
4.85k
    const PreparedMetadata& metadata, faststring* mesg) const {
1636
4.85k
  SerializeInt(metadata.flags, mesg);
1637
4.85k
  SerializeInt(narrow_cast<int32_t>(metadata.col_specs.size()), mesg);
1638
4.85k
  if (VersionIsCompatible(kV4Version)) {
1639
4.85k
    SerializeInt(narrow_cast<int32_t>(metadata.pk_indices.size()), mesg);
1640
6.03k
    for (const auto& pk_index : metadata.pk_indices) {
1641
6.03k
      SerializeShort(pk_index, mesg);
1642
6.03k
    }
1643
4.85k
  }
1644
4.85k
  SerializeColSpecs(
1645
4.85k
      metadata.flags & PreparedMetadata::kHasGlobalTableSpec, metadata.global_table_spec,
1646
4.85k
      metadata.col_specs, mesg);
1647
4.85k
}
1648
1649
4.85k
void PreparedResultResponse::SerializeResultBody(faststring* mesg) const {
1650
4.85k
  SerializeShortBytes(query_id_, mesg);
1651
4.85k
  SerializePreparedMetadata(prepared_metadata_, mesg);
1652
4.85k
  SerializeRowsMetadata(rows_metadata_, mesg);
1653
4.85k
}
1654
1655
//----------------------------------------------------------------------------------------
1656
SetKeyspaceResultResponse::SetKeyspaceResultResponse(
1657
    const CQLRequest& request, const ql::SetKeyspaceResult& result)
1658
4.18k
    : ResultResponse(request, Kind::SET_KEYSPACE), keyspace_(result.keyspace()) {
1659
4.18k
}
1660
1661
4.18k
SetKeyspaceResultResponse::~SetKeyspaceResultResponse() {
1662
4.18k
}
1663
1664
4.18k
void SetKeyspaceResultResponse::SerializeResultBody(faststring* mesg) const {
1665
4.18k
  SerializeString(keyspace_, mesg);
1666
4.18k
}
1667
1668
//----------------------------------------------------------------------------------------
1669
SchemaChangeResultResponse::SchemaChangeResultResponse(
1670
    const CQLRequest& request, const ql::SchemaChangeResult& result)
1671
    : ResultResponse(request, Kind::SCHEMA_CHANGE),
1672
      change_type_(result.change_type()), target_(result.object_type()),
1673
6.05k
      keyspace_(result.keyspace_name()), object_(result.object_name()) {
1674
6.05k
}
1675
1676
6.05k
SchemaChangeResultResponse::~SchemaChangeResultResponse() {
1677
6.05k
}
1678
1679
void SchemaChangeResultResponse::Serialize(const CompressionScheme compression_scheme,
1680
6.05k
                                           faststring* mesg) const {
1681
6.05k
  ResultResponse::Serialize(compression_scheme, mesg);
1682
1683
6.05k
  if (registered_events() & kSchemaChange) {
1684
    // TODO: Replace this hack that piggybacks a SCHEMA_CHANGE event along a SCHEMA_CHANGE result
1685
    // response with a formal event notification mechanism.
1686
4
    SchemaChangeEventResponse event(change_type_, target_, keyspace_, object_, argument_types_);
1687
4
    event.Serialize(compression_scheme, mesg);
1688
4
  }
1689
6.05k
}
1690
1691
6.05k
void SchemaChangeResultResponse::SerializeResultBody(faststring* mesg) const {
1692
6.05k
  SerializeString(change_type_, mesg);
1693
6.05k
  SerializeString(target_, mesg);
1694
6.05k
  if (target_ == "KEYSPACE") {
1695
3.12k
    SerializeString(keyspace_, mesg);
1696
2.92k
  } else if (target_ == "TABLE" || target_ == "TYPE") {
1697
2.92k
    SerializeString(keyspace_, mesg);
1698
2.92k
    SerializeString(object_, mesg);
1699
0
  } else if (target_ == "FUNCTION" || target_ == "AGGREGATE") {
1700
0
    SerializeString(keyspace_, mesg);
1701
0
    SerializeString(object_, mesg);
1702
0
    SerializeStringList(argument_types_, mesg);
1703
0
  }
1704
6.05k
}
1705
1706
//----------------------------------------------------------------------------------------
1707
EventResponse::EventResponse(const string& event_type)
1708
52.8k
    : CQLResponse(kEventStreamId, Opcode::EVENT), event_type_(event_type) {
1709
52.8k
}
1710
1711
52.8k
EventResponse::~EventResponse() {
1712
52.8k
}
1713
1714
52.8k
void EventResponse::SerializeBody(faststring* mesg) const {
1715
52.8k
  SerializeString(event_type_, mesg);
1716
52.8k
  SerializeEventBody(mesg);
1717
52.8k
}
1718
1719
0
std::string EventResponse::ToString() const {
1720
0
  return event_type_ + ":" + BodyToString();
1721
0
}
1722
1723
//----------------------------------------------------------------------------------------
1724
TopologyChangeEventResponse::TopologyChangeEventResponse(const string& topology_change_type,
1725
                                                         const Endpoint& node)
1726
    : EventResponse(kTopologyChangeEvent), topology_change_type_(topology_change_type),
1727
52.8k
      node_(node) {
1728
52.8k
}
1729
1730
52.8k
TopologyChangeEventResponse::~TopologyChangeEventResponse() {
1731
52.8k
}
1732
1733
52.8k
void TopologyChangeEventResponse::SerializeEventBody(faststring* mesg) const {
1734
52.8k
  SerializeString(topology_change_type_, mesg);
1735
52.8k
  SerializeInet(node_, mesg);
1736
52.8k
}
1737
1738
0
std::string TopologyChangeEventResponse::BodyToString() const {
1739
0
  return topology_change_type_;
1740
0
}
1741
1742
//----------------------------------------------------------------------------------------
1743
StatusChangeEventResponse::StatusChangeEventResponse(const string& status_change_type,
1744
                                                     const Endpoint& node)
1745
    : EventResponse(kStatusChangeEvent), status_change_type_(status_change_type),
1746
0
      node_(node) {
1747
0
}
1748
1749
0
StatusChangeEventResponse::~StatusChangeEventResponse() {
1750
0
}
1751
1752
0
void StatusChangeEventResponse::SerializeEventBody(faststring* mesg) const {
1753
0
  SerializeString(status_change_type_, mesg);
1754
0
  SerializeInet(node_, mesg);
1755
0
}
1756
1757
0
std::string StatusChangeEventResponse::BodyToString() const {
1758
0
  return status_change_type_;
1759
0
}
1760
1761
//----------------------------------------------------------------------------------------
1762
const vector<string> SchemaChangeEventResponse::kEmptyArgumentTypes = {};
1763
1764
SchemaChangeEventResponse::SchemaChangeEventResponse(
1765
    const string& change_type, const string& target,
1766
    const string& keyspace, const string& object, const vector<string>& argument_types)
1767
    : EventResponse(kSchemaChangeEvent), change_type_(change_type), target_(target),
1768
4
      keyspace_(keyspace), object_(object), argument_types_(argument_types) {
1769
4
}
1770
1771
4
SchemaChangeEventResponse::~SchemaChangeEventResponse() {
1772
4
}
1773
1774
0
std::string SchemaChangeEventResponse::BodyToString() const {
1775
0
  return change_type_;
1776
0
}
1777
1778
4
void SchemaChangeEventResponse::SerializeEventBody(faststring* mesg) const {
1779
4
  SerializeString(change_type_, mesg);
1780
4
  SerializeString(target_, mesg);
1781
4
  if (target_ == "KEYSPACE") {
1782
2
    SerializeString(keyspace_, mesg);
1783
2
  } else if (target_ == "TABLE" || target_ == "TYPE") {
1784
2
    SerializeString(keyspace_, mesg);
1785
2
    SerializeString(object_, mesg);
1786
0
  } else if (target_ == "FUNCTION" || target_ == "AGGREGATE") {
1787
0
    SerializeString(keyspace_, mesg);
1788
0
    SerializeString(object_, mesg);
1789
0
    SerializeStringList(argument_types_, mesg);
1790
0
  }
1791
4
}
1792
1793
//----------------------------------------------------------------------------------------
1794
AuthChallengeResponse::AuthChallengeResponse(const CQLRequest& request, const string& token)
1795
0
    : CQLResponse(request, Opcode::AUTH_CHALLENGE), token_(token) {
1796
0
}
1797
1798
0
AuthChallengeResponse::~AuthChallengeResponse() {
1799
0
}
1800
1801
0
void AuthChallengeResponse::SerializeBody(faststring* mesg) const {
1802
0
  SerializeBytes(token_, mesg);
1803
0
}
1804
1805
//----------------------------------------------------------------------------------------
1806
AuthSuccessResponse::AuthSuccessResponse(const CQLRequest& request, const string& token)
1807
6.16k
    : CQLResponse(request, Opcode::AUTH_SUCCESS), token_(token) {
1808
6.16k
}
1809
1810
6.16k
AuthSuccessResponse::~AuthSuccessResponse() {
1811
6.16k
}
1812
1813
6.16k
void AuthSuccessResponse::SerializeBody(faststring* mesg) const {
1814
6.16k
  SerializeBytes(token_, mesg);
1815
6.16k
}
1816
1817
CQLServerEvent::CQLServerEvent(std::unique_ptr<EventResponse> event_response)
1818
52.8k
    : event_response_(std::move(event_response)) {
1819
52.8k
  CHECK_NOTNULL(event_response_.get());
1820
52.8k
  faststring temp;
1821
52.8k
  event_response_->Serialize(CQLMessage::CompressionScheme::kNone, &temp);
1822
52.8k
  serialized_response_ = RefCntBuffer(temp);
1823
52.8k
}
1824
1825
94.0k
void CQLServerEvent::Serialize(boost::container::small_vector_base<RefCntBuffer>* output) const {
1826
94.0k
  output->push_back(serialized_response_);
1827
94.0k
}
1828
1829
0
std::string CQLServerEvent::ToString() const {
1830
0
  return event_response_->ToString();
1831
0
}
1832
1833
10.8k
CQLServerEventList::CQLServerEventList() {
1834
10.8k
}
1835
1836
18.9k
void CQLServerEventList::Transferred(const Status& status, rpc::Connection*) {
1837
18.9k
  if (!status.ok()) {
1838
0
    LOG(WARNING) << "Transfer of CQL server event failed: " << status.ToString();
1839
0
  }
1840
18.9k
}
1841
1842
void CQLServerEventList::Serialize(
1843
19.4k
    boost::container::small_vector_base<RefCntBuffer>* output) {
1844
94.3k
  for (const auto& cql_server_event : cql_server_events_) {
1845
94.3k
    cql_server_event->Serialize(output);
1846
94.3k
  }
1847
19.4k
}
1848
1849
0
std::string CQLServerEventList::ToString() const {
1850
0
  std::string ret = "";
1851
0
  for (const auto& cql_server_event : cql_server_events_) {
1852
0
    if (!ret.empty()) {
1853
0
      ret += ", ";
1854
0
    }
1855
0
    ret += cql_server_event->ToString();
1856
0
  }
1857
0
  return ret;
1858
0
}
1859
1860
52.8k
void CQLServerEventList::AddEvent(std::unique_ptr<CQLServerEvent> event) {
1861
52.8k
  cql_server_events_.push_back(std::move(event));
1862
52.8k
}
1863
1864
}  // namespace ql
1865
}  // namespace yb