YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/integration-tests/cql_test_util.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/integration-tests/cql_test_util.h"
15
16
#include <cassandra.h>
17
18
#include <thread>
19
20
#include "yb/gutil/casts.h"
21
#include "yb/gutil/strings/join.h"
22
23
#include "yb/util/enums.h"
24
#include "yb/util/status_log.h"
25
#include "yb/util/tsan_util.h"
26
27
using namespace std::literals;
28
29
namespace yb {
30
31
// Supported types - read value:
32
template <>
33
4.36k
CassError GetCassandraValue<std::string>::Apply(const CassValue* val, std::string* v) {
34
4.36k
  const char* s = nullptr;
35
4.36k
  size_t sz = 0;
36
4.36k
  auto result = cass_value_get_string(val, &s, &sz);
37
4.36k
  if (result != CASS_OK) {
38
0
    return result;
39
0
  }
40
4.36k
  *v = std::string(s, sz);
41
4.36k
  return result;
42
4.36k
}
43
44
template <>
45
2.86k
CassError GetCassandraValue<Slice>::Apply(const CassValue* val, Slice* v) {
46
2.86k
  const cass_byte_t* data = nullptr;
47
2.86k
  size_t size = 0;
48
2.86k
  auto result = cass_value_get_bytes(val, &data, &size);
49
2.86k
  *v = Slice(data, size);
50
2.86k
  return result;
51
2.86k
}
52
53
template <>
54
7
CassError GetCassandraValue<cass_bool_t>::Apply(const CassValue* val, cass_bool_t* v) {
55
7
  return cass_value_get_bool(val, v);
56
7
}
57
58
template <>
59
3
CassError GetCassandraValue<cass_float_t>::Apply(const CassValue* val, cass_float_t* v) {
60
3
  return cass_value_get_float(val, v);
61
3
}
62
63
template <>
64
17
CassError GetCassandraValue<cass_double_t>::Apply(const CassValue* val, cass_double_t* v) {
65
17
  return cass_value_get_double(val, v);
66
17
}
67
68
template <>
69
79
CassError GetCassandraValue<cass_int32_t>::Apply(const CassValue* val, cass_int32_t* v) {
70
79
  return cass_value_get_int32(val, v);
71
79
}
72
73
template <>
74
34
CassError GetCassandraValue<cass_int64_t>::Apply(const CassValue* val, cass_int64_t* v) {
75
34
  return cass_value_get_int64(val, v);
76
34
}
77
78
template <>
79
4
CassError GetCassandraValue<CassandraJson>::Apply(const CassValue* val, CassandraJson* v) {
80
4
  std::string temp;
81
4
  auto result = GetCassandraValue<std::string>::Apply(val, &temp);
82
4
  *v = CassandraJson(std::move(temp));
83
4
  return result;
84
4
}
85
86
template <>
87
1.43k
CassError GetCassandraValue<CassUuid>::Apply(const CassValue* val, CassUuid* v) {
88
1.43k
  return cass_value_get_uuid(val, v);
89
1.43k
}
90
91
template <>
92
1.48k
CassError GetCassandraValue<CassInet>::Apply(const CassValue* val, CassInet* v) {
93
1.48k
  return cass_value_get_inet(val, v);
94
1.48k
}
95
96
11.5k
bool CassandraValue::IsNull() const {
97
11.5k
  return cass_value_is_null(value_);
98
11.5k
}
99
100
11.5k
std::string CassandraValue::ToString() const {
101
11.5k
  auto value_type = cass_value_type(value_);
102
11.5k
  if (IsNull()) {
103
0
    return "NULL";
104
0
  }
105
11.5k
  switch (value_type) {
106
2.86k
    case CASS_VALUE_TYPE_BLOB:
107
2.86k
      return As<Slice>().ToDebugHexString();
108
4.34k
    case CASS_VALUE_TYPE_VARCHAR:
109
4.34k
      return As<std::string>();
110
0
    case CASS_VALUE_TYPE_BIGINT:
111
0
      return std::to_string(As<cass_int64_t>());
112
2
    case CASS_VALUE_TYPE_INT:
113
2
      return std::to_string(As<cass_int32_t>());
114
1.43k
    case CASS_VALUE_TYPE_UUID: {
115
1.43k
      char buffer[CASS_UUID_STRING_LENGTH];
116
1.43k
      cass_uuid_string(As<CassUuid>(), buffer);
117
1.43k
      return buffer;
118
0
    }
119
1.48k
    case CASS_VALUE_TYPE_INET: {
120
1.48k
      char buffer[CASS_INET_STRING_LENGTH];
121
1.48k
      cass_inet_string(As<CassInet>(), buffer);
122
1.48k
      return buffer;
123
0
    }
124
1.43k
    case CASS_VALUE_TYPE_MAP: {
125
1.43k
      std::string result = "{";
126
1.43k
      CassIteratorPtr iterator(cass_iterator_from_map(value_));
127
1.43k
      bool first = true;
128
2.91k
      while (cass_iterator_next(iterator.get())) {
129
1.48k
        if (first) {
130
1.43k
          first = false;
131
1.43k
        } else {
132
48
          result += ", ";
133
48
        }
134
1.48k
        result += CassandraValue(cass_iterator_get_map_key(iterator.get())).ToString();
135
1.48k
        result += " => ";
136
1.48k
        result += CassandraValue(cass_iterator_get_map_value(iterator.get())).ToString();
137
1.48k
      }
138
1.43k
      result += "}";
139
1.43k
      return result;
140
0
    }
141
0
    case CASS_VALUE_TYPE_LIST: {
142
0
      std::string result = "[";
143
0
      CassIteratorPtr iterator(cass_iterator_from_collection(value_));
144
0
      bool first = true;
145
0
      while (cass_iterator_next(iterator.get())) {
146
0
        if (!first) {
147
0
          result += ", ";
148
0
        }
149
0
        first = false;
150
0
        result += CassandraValue(cass_iterator_get_value(iterator.get())).ToString();
151
0
      }
152
0
      result += "]";
153
0
      return result;
154
0
    }
155
0
    default:
156
0
      return "Not supported: " + std::to_string(to_underlying(value_type));
157
11.5k
  }
158
11.5k
}
159
160
10.0k
bool CassandraRowIterator::Next() {
161
10.0k
  return cass_iterator_next(cass_iterator_.get()) != cass_false;
162
10.0k
}
163
164
8.59k
CassandraValue CassandraRowIterator::Value() const {
165
8.59k
  return CassandraValue(cass_iterator_get_column(cass_iterator_.get()));
166
8.59k
}
167
168
161
CassandraValue CassandraRow::Value(size_t index) const {
169
161
  return CassandraValue(cass_row_get_column(cass_row_, index));
170
161
}
171
172
1.43k
CassandraRowIterator CassandraRow::CreateIterator() const {
173
1.43k
  return CassandraRowIterator(cass_iterator_from_row(cass_row_));
174
1.43k
}
175
176
1
std::string CassandraRow::RenderToString(const std::string& separator) {
177
1
  std::string result;
178
1
  auto iter = CreateIterator();
179
3
  while (iter.Next()) {
180
2
    if (!result.empty()) {
181
1
      result += separator;
182
1
    }
183
2
    result += iter.Value().ToString();
184
2
  }
185
1
  return result;
186
1
}
187
188
0
void CassandraRow::TakeIterator(CassIteratorPtr iterator) {
189
0
  cass_iterator_ = std::move(iterator);
190
0
}
191
192
1.66k
bool CassandraIterator::Next() {
193
1.66k
  return cass_iterator_next(cass_iterator_.get()) != cass_false;
194
1.66k
}
195
196
1.52k
CassandraRow CassandraIterator::Row() {
197
1.52k
  return CassandraRow(cass_iterator_get_row(cass_iterator_.get()));
198
1.52k
}
199
200
0
void CassandraIterator::MoveToRow(CassandraRow* row) {
201
0
  row->TakeIterator(std::move(cass_iterator_));
202
0
}
203
204
146
CassandraIterator CassandraResult::CreateIterator() const {
205
146
  return CassandraIterator(cass_iterator_from_result(cass_result_.get()));
206
146
}
207
208
std::string CassandraResult::RenderToString(
209
1
    const std::string& line_separator, const std::string& value_separator) const {
210
1
  std::string result;
211
1
  auto iter = CreateIterator();
212
2
  while (iter.Next()) {
213
1
    auto row = iter.Row();
214
1
    if (!result.empty()) {
215
0
      result += ";";
216
0
    }
217
1
    result += row.RenderToString();
218
1
  }
219
1
  return result;
220
1
}
221
222
3
bool CassandraFuture::Ready() const {
223
3
  return cass_future_ready(future_.get());
224
3
}
225
226
32.3k
CHECKED_STATUS CassandraFuture::Wait() {
227
32.3k
  cass_future_wait(future_.get());
228
32.3k
  return CheckErrorCode();
229
32.3k
}
230
231
0
CHECKED_STATUS CassandraFuture::WaitFor(MonoDelta duration) {
232
0
  if (!cass_future_wait_timed(future_.get(), duration.ToMicroseconds())) {
233
0
    return STATUS(TimedOut, "Future timed out");
234
0
  }
235
236
0
  return CheckErrorCode();
237
0
}
238
239
148
CassandraResult CassandraFuture::Result() {
240
148
  return CassandraResult(cass_future_get_result(future_.get()));
241
148
}
242
243
620
CassandraPrepared CassandraFuture::Prepared() {
244
620
  return CassandraPrepared(cass_future_get_prepared(future_.get()));
245
620
}
246
247
32.3k
Status CassandraFuture::CheckErrorCode() {
248
32.3k
  const CassError rc = cass_future_error_code(future_.get());
249
32.3k
  VLOG
(2) << "Last operation RC: " << rc6
;
250
251
32.3k
  if (rc != CASS_OK) {
252
61
    const char* message = nullptr;
253
61
    size_t message_sz = 0;
254
61
    cass_future_error_message(future_.get(), &message, &message_sz);
255
61
    if (message_sz == 0) {
256
1
      message = cass_error_desc(rc);
257
1
      message_sz = strlen(message);
258
1
    }
259
61
    Slice message_slice(message, message_sz);
260
61
    switch (rc) {
261
2
      case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE: FALLTHROUGH_INTENDED;
262
3
      case CASS_ERROR_SERVER_OVERLOADED:
263
3
        return STATUS(ServiceUnavailable, message_slice);
264
3
      case CASS_ERROR_LIB_REQUEST_TIMED_OUT:
265
3
        return STATUS(TimedOut, message_slice);
266
3
      case CASS_ERROR_SERVER_INVALID_QUERY:
267
3
        return STATUS(QLError, message_slice);
268
52
      default:
269
52
        LOG(INFO) << "Cassandra error code: " << rc << ": " << message;
270
52
        return STATUS(RuntimeError, message_slice);
271
61
    }
272
61
  }
273
274
32.2k
  return Status::OK();
275
32.3k
}
276
277
namespace {
278
279
1.12M
void CheckErrorCode(const CassError& error_code) {
280
1.12M
  CHECK_EQ
(CASS_OK, error_code) << ": " << cass_error_desc(error_code)0
;
281
1.12M
}
282
283
} // namespace
284
285
2
void CassandraStatement::SetKeyspace(const string& keyspace) {
286
2
  CheckErrorCode(cass_statement_set_keyspace(cass_statement_.get(), keyspace.c_str()));
287
2
}
288
289
728k
void CassandraStatement::Bind(size_t index, const string& v) {
290
728k
  CheckErrorCode(cass_statement_bind_string(cass_statement_.get(), index, v.c_str()));
291
728k
}
292
293
7
void CassandraStatement::Bind(size_t index, const cass_bool_t& v) {
294
7
  CheckErrorCode(cass_statement_bind_bool(cass_statement_.get(), index, v));
295
7
}
296
297
3
void CassandraStatement::Bind(size_t index, const cass_float_t& v) {
298
3
  CheckErrorCode(cass_statement_bind_float(cass_statement_.get(), index, v));
299
3
}
300
301
10
void CassandraStatement::Bind(size_t index, const cass_double_t& v) {
302
10
  CheckErrorCode(cass_statement_bind_double(cass_statement_.get(), index, v));
303
10
}
304
305
17.2k
void CassandraStatement::Bind(size_t index, const cass_int32_t& v) {
306
17.2k
  CheckErrorCode(cass_statement_bind_int32(cass_statement_.get(), index, v));
307
17.2k
}
308
309
375k
void CassandraStatement::Bind(size_t index, const cass_int64_t& v) {
310
375k
  CheckErrorCode(cass_statement_bind_int64(cass_statement_.get(), index, v));
311
375k
}
312
313
3
void CassandraStatement::Bind(size_t index, const CassandraJson& v) {
314
3
  CheckErrorCode(cass_statement_bind_string(cass_statement_.get(), index, v.value().c_str()));
315
3
}
316
317
9
CassStatement* CassandraStatement::get() const {
318
9
  return cass_statement_.get();
319
9
}
320
321
360k
void CassandraBatch::Add(CassandraStatement* statement) {
322
360k
  cass_batch_add_statement(cass_batch_.get(), statement->cass_statement_.get());
323
360k
}
324
325
137
void DeleteSession::operator()(CassSession* session) const {
326
137
  if (session != nullptr) {
327
137
    WARN_NOT_OK(CassandraFuture(cass_session_close(session)).Wait(), "Close session");
328
137
    cass_session_free(session);
329
137
  }
330
137
}
331
332
141
CHECKED_STATUS CassandraSession::Connect(CassCluster* cluster) {
333
141
  cass_session_.reset(CHECK_NOTNULL(cass_session_new()));
334
141
  return CassandraFuture(cass_session_connect(cass_session_.get(), cluster)).Wait();
335
141
}
336
337
140
Result<CassandraSession> CassandraSession::Create(CassCluster* cluster) {
338
140
  LOG(INFO) << "Create new session ...";
339
140
  CassandraSession result;
340
140
  RETURN_NOT_OK(result.Connect(cluster));
341
114
  LOG(INFO) << "Create new session - DONE";
342
114
  return result;
343
140
}
344
345
29.9k
CHECKED_STATUS CassandraSession::Execute(const CassandraStatement& statement) {
346
29.9k
  CassandraFuture future(cass_session_execute(
347
29.9k
      cass_session_.get(), statement.cass_statement_.get()));
348
29.9k
  return future.Wait();
349
29.9k
}
350
351
149
Result<CassandraResult> CassandraSession::ExecuteWithResult(const CassandraStatement& statement) {
352
149
  CassandraFuture future(cass_session_execute(
353
149
      cass_session_.get(), statement.cass_statement_.get()));
354
149
  RETURN_NOT_OK(future.Wait());
355
148
  return future.Result();
356
149
}
357
358
1
Result<std::string> CassandraSession::ExecuteAndRenderToString(const std::string& statement) {
359
1
  return VERIFY_RESULT(ExecuteWithResult(statement)).RenderToString();
360
1
}
361
362
646
CassandraFuture CassandraSession::ExecuteGetFuture(const CassandraStatement& statement) {
363
646
  return CassandraFuture(
364
646
      cass_session_execute(cass_session_.get(), statement.cass_statement_.get()));
365
646
}
366
367
46
CassandraFuture CassandraSession::ExecuteGetFuture(const string& query) {
368
46
  LOG(INFO) << "Execute query: " << query;
369
46
  return ExecuteGetFuture(CassandraStatement(query));
370
46
}
371
372
1.61k
CHECKED_STATUS CassandraSession::ExecuteQuery(const string& query) {
373
1.61k
  LOG(INFO) << "Execute query: " << query;
374
1.61k
  return Execute(CassandraStatement(query));
375
1.61k
}
376
377
3
Result<CassandraResult> CassandraSession::ExecuteWithResult(const string& query) {
378
3
  LOG(INFO) << "Execute query: " << query;
379
3
  return ExecuteWithResult(CassandraStatement(query));
380
3
}
381
382
697
CHECKED_STATUS CassandraSession::ExecuteBatch(const CassandraBatch& batch) {
383
697
  return SubmitBatch(batch).Wait();
384
697
}
385
386
1.29k
CassandraFuture CassandraSession::SubmitBatch(const CassandraBatch& batch) {
387
1.29k
  return CassandraFuture(
388
1.29k
      cass_session_execute_batch(cass_session_.get(), batch.cass_batch_.get()));
389
1.29k
}
390
391
Result<CassandraPrepared> CassandraSession::Prepare(
392
621
    const string& prepare_query, MonoDelta timeout, const string& local_keyspace) {
393
621
  VLOG(2) << "Execute prepare request: " << prepare_query << ", timeout: " << timeout
394
0
          << ", keyspace: " << local_keyspace;
395
621
  auto deadline = CoarseMonoClock::now() + timeout;
396
621
  for (;;) {
397
621
    CassFuture* cass_future_ptr = nullptr;
398
621
    if (local_keyspace.empty()) {
399
619
        cass_future_ptr = cass_session_prepare(cass_session_.get(), prepare_query.c_str());
400
619
    } else {
401
2
        CassandraStatement statement(prepare_query);
402
2
        statement.SetKeyspace(local_keyspace);
403
2
        cass_future_ptr = cass_session_prepare_from_existing(cass_session_.get(), statement.get());
404
2
    }
405
406
621
    CassandraFuture future(cass_future_ptr);
407
621
    auto wait_result = future.Wait();
408
621
    if (wait_result.ok()) {
409
620
      return future.Prepared();
410
620
    }
411
412
1
    if (timeout == MonoDelta::kZero || 
CoarseMonoClock::now() > deadline0
) {
413
1
      return wait_result;
414
1
    }
415
0
    std::this_thread::sleep_for(100ms);
416
0
  }
417
621
}
418
419
53
void CassandraSession::Reset() {
420
53
  cass_session_.reset();
421
53
}
422
423
389k
CassandraStatement CassandraPrepared::Bind() {
424
389k
  return CassandraStatement(cass_prepared_bind(prepared_.get()));
425
389k
}
426
427
const MonoDelta kCassandraTimeOut = 20s * kTimeMultiplier;
428
const std::string kCqlTestKeyspace = "test";
429
430
CppCassandraDriver::CppCassandraDriver(
431
    const std::vector<std::string>& hosts, uint16_t port,
432
81
    UsePartitionAwareRouting use_partition_aware_routing) {
433
434
  // Enable detailed tracing inside driver.
435
81
  if (VLOG_IS_ON(4)) {
436
0
    cass_log_set_level(CASS_LOG_TRACE);
437
81
  } else if (VLOG_IS_ON(3)) {
438
0
    cass_log_set_level(CASS_LOG_DEBUG);
439
81
  } else if (VLOG_IS_ON(2)) {
440
0
    cass_log_set_level(CASS_LOG_INFO);
441
81
  } else if (VLOG_IS_ON(1)) {
442
0
    cass_log_set_level(CASS_LOG_WARN);
443
0
  }
444
445
81
  auto hosts_str = JoinStrings(hosts, ",");
446
81
  LOG(INFO) << "Create Cassandra cluster to " << hosts_str << " :" << port << " ...";
447
81
  cass_cluster_ = CHECK_NOTNULL(cass_cluster_new());
448
81
  CheckErrorCode(cass_cluster_set_contact_points(cass_cluster_, hosts_str.c_str()));
449
81
  CheckErrorCode(cass_cluster_set_port(cass_cluster_, port));
450
81
  cass_cluster_set_request_timeout(
451
81
      cass_cluster_, narrow_cast<uint32_t>(kCassandraTimeOut.ToMilliseconds()));
452
453
  // Setup cluster configuration: partitions metadata refresh timer = 3 seconds.
454
81
  cass_cluster_set_partition_aware_routing(
455
81
      cass_cluster_, use_partition_aware_routing ? 
cass_true79
:
cass_false2
, 3);
456
81
}
457
458
80
CppCassandraDriver::~CppCassandraDriver() {
459
80
  LOG(INFO) << "Terminating driver...";
460
461
80
  if (cass_cluster_) {
462
80
    cass_cluster_free(cass_cluster_);
463
80
    cass_cluster_ = nullptr;
464
80
  }
465
466
80
  LOG(INFO) << "Terminating driver - DONE";
467
80
}
468
469
141
Result<CassandraSession> CppCassandraDriver::CreateSession() {
470
141
  return CassandraSession::Create(cass_cluster_);
471
141
}
472
473
3
Result<CassandraSession> EstablishSession(CppCassandraDriver* driver) {
474
3
  auto session = VERIFY_RESULT(driver->CreateSession());
475
3
  RETURN_NOT_OK(
476
3
      session.ExecuteQuery(Format("CREATE KEYSPACE IF NOT EXISTS $0;", kCqlTestKeyspace)));
477
3
  RETURN_NOT_OK(session.ExecuteQuery(Format("USE $0;", kCqlTestKeyspace)));
478
3
  return session;
479
3
}
480
481
} // namespace yb