YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/cql/cqlserver/cql_processor.cc
Line
Count
Source (jump to first uncovered line)
1
//--------------------------------------------------------------------------------------------------
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//--------------------------------------------------------------------------------------------------
15
16
#include "yb/yql/cql/cqlserver/cql_processor.h"
17
18
#include <ldap.h>
19
20
#include <boost/algorithm/string.hpp>
21
22
#include "yb/common/ql_rowblock.h"
23
#include "yb/common/ql_value.h"
24
#include "yb/common/schema.h"
25
26
#include "yb/gutil/bind.h"
27
#include "yb/gutil/casts.h"
28
#include "yb/gutil/strings/escaping.h"
29
30
#include "yb/rpc/connection.h"
31
#include "yb/rpc/messenger.h"
32
33
#include "yb/util/crypt.h"
34
#include "yb/util/flag_tags.h"
35
#include "yb/util/format.h"
36
#include "yb/util/logging.h"
37
#include "yb/util/metrics.h"
38
#include "yb/util/result.h"
39
#include "yb/util/status_format.h"
40
#include "yb/util/status_log.h"
41
42
#include "yb/yql/cql/cqlserver/cql_service.h"
43
#include "yb/yql/cql/ql/util/errcodes.h"
44
45
using namespace std::literals;
46
47
METRIC_DEFINE_histogram_with_percentiles(
48
    server, handler_latency_yb_cqlserver_CQLServerService_GetProcessor,
49
    "Time spent to get a processor for processing a CQL query request.",
50
    yb::MetricUnit::kMicroseconds,
51
    "Time spent to get a processor for processing a CQL query request.", 60000000LU, 2);
52
METRIC_DEFINE_histogram_with_percentiles(
53
    server, handler_latency_yb_cqlserver_CQLServerService_ProcessRequest,
54
    "Time spent processing a CQL query request. From parsing till executing",
55
    yb::MetricUnit::kMicroseconds,
56
    "Time spent processing a CQL query request. From parsing till executing", 60000000LU, 2);
57
METRIC_DEFINE_histogram_with_percentiles(
58
    server, handler_latency_yb_cqlserver_CQLServerService_ParseRequest,
59
    "Time spent parsing CQL query request", yb::MetricUnit::kMicroseconds,
60
    "Time spent parsing CQL query request", 60000000LU, 2);
61
METRIC_DEFINE_histogram_with_percentiles(
62
    server, handler_latency_yb_cqlserver_CQLServerService_QueueResponse,
63
    "Time spent to queue the response for a CQL query request back on the network",
64
    yb::MetricUnit::kMicroseconds,
65
    "Time spent after computing the CQL response to queue it onto the connection.", 60000000LU, 2);
66
METRIC_DEFINE_histogram_with_percentiles(
67
    server, handler_latency_yb_cqlserver_CQLServerService_ExecuteRequest,
68
    "Time spent executing the CQL query request in the handler", yb::MetricUnit::kMicroseconds,
69
    "Time spent executing the CQL query request in the handler", 60000000LU, 2);
70
METRIC_DEFINE_counter(
71
    server, yb_cqlserver_CQLServerService_ParsingErrors, "Errors encountered when parsing ",
72
    yb::MetricUnit::kRequests, "Errors encountered when parsing ");
73
METRIC_DEFINE_histogram_with_percentiles(
74
    server, handler_latency_yb_cqlserver_CQLServerService_Any,
75
    "yb.cqlserver.CQLServerService.AnyMethod RPC Time", yb::MetricUnit::kMicroseconds,
76
    "Microseconds spent handling "
77
    "yb.cqlserver.CQLServerService.AnyMethod() "
78
    "RPC requests",
79
    60000000LU, 2);
80
81
METRIC_DEFINE_gauge_int64(server, cql_processors_alive,
82
                          "Number of alive CQL Processors.",
83
                          yb::MetricUnit::kUnits,
84
                          "Number of alive CQL Processors.");
85
86
METRIC_DEFINE_counter(server, cql_processors_created,
87
                      "Number of created CQL Processors.",
88
                      yb::MetricUnit::kUnits,
89
                      "Number of created CQL Processors.");
90
91
METRIC_DEFINE_gauge_int64(server, cql_parsers_alive,
92
                          "Number of alive CQL Parsers.",
93
                          yb::MetricUnit::kUnits,
94
                          "Number of alive CQL Parsers.");
95
96
METRIC_DEFINE_counter(server, cql_parsers_created,
97
                      "Number of created CQL Parsers.",
98
                      yb::MetricUnit::kUnits,
99
                      "Number of created CQL Parsers.");
100
101
DECLARE_bool(use_cassandra_authentication);
102
DECLARE_bool(ycql_cache_login_info);
103
DECLARE_int32(client_read_write_timeout_ms);
104
105
// LDAP specific flags
106
DEFINE_bool(ycql_use_ldap, false, "Use LDAP for user logins");
107
DEFINE_string(ycql_ldap_users_to_skip_csv, "", "Users that are authenticated via the local password"
108
  " check instead of LDAP (if ycql_use_ldap=true). This is a comma separated list");
109
TAG_FLAG(ycql_ldap_users_to_skip_csv, sensitive_info);
110
DEFINE_string(ycql_ldap_server, "", "LDAP server of the form <scheme>://<ip>:<port>");
111
DEFINE_bool(ycql_ldap_tls, false, "Connect to LDAP server using TLS encryption.");
112
113
// LDAP flags for simple bind mode
114
DEFINE_string(ycql_ldap_user_prefix, "", "String used for prepending the user name when forming "
115
  "the DN for binding to the LDAP server");
116
DEFINE_string(ycql_ldap_user_suffix, "", "String used for appending the user name when forming the "
117
  "DN for binding to the LDAP Server.");
118
119
// Flags for LDAP search + bind mode
120
DEFINE_string(ycql_ldap_base_dn, "", "Specifies the base directory to begin the user name search");
121
DEFINE_string(ycql_ldap_bind_dn, "", "Specifies the username to perform the initial search when "
122
  "doing search + bind authentication");
123
TAG_FLAG(ycql_ldap_bind_dn, sensitive_info);
124
DEFINE_string(ycql_ldap_bind_passwd, "", "Password for username being used to perform the initial "
125
  "search when doing search + bind authentication");
126
TAG_FLAG(ycql_ldap_bind_passwd, sensitive_info);
127
DEFINE_string(ycql_ldap_search_attribute, "", "Attribute to match against the username in the "
128
  "search when doing search + bind authentication. If no attribute is specified, the uid attribute "
129
  "is used.");
130
DEFINE_string(ycql_ldap_search_filter, "", "The search filter to use when doing search + bind "
131
  "authentication.");
132
133
namespace yb {
134
namespace cqlserver {
135
136
constexpr const char* const kCassandraPasswordAuthenticator =
137
    "org.apache.cassandra.auth.PasswordAuthenticator";
138
139
extern const char* const kRoleColumnNameSaltedHash;
140
extern const char* const kRoleColumnNameCanLogin;
141
142
using namespace yb::ql; // NOLINT
143
144
using std::make_unique;
145
using std::shared_ptr;
146
using std::unique_ptr;
147
148
using client::YBClient;
149
using client::YBSession;
150
using ql::ExecutedResult;
151
using ql::PreparedResult;
152
using ql::RowsResult;
153
using ql::SetKeyspaceResult;
154
using ql::SchemaChangeResult;
155
using ql::QLProcessor;
156
using ql::ParseTree;
157
using ql::Statement;
158
using ql::StatementBatch;
159
using ql::ErrorCode;
160
using ql::GetErrorCode;
161
162
using ql::audit::IsPrepare;
163
using ql::audit::ErrorIsFormatted;
164
165
using strings::Substitute;
166
167
using yb::util::bcrypt_checkpw;
168
169
//------------------------------------------------------------------------------------------------
170
CQLMetrics::CQLMetrics(const scoped_refptr<yb::MetricEntity>& metric_entity)
171
4.54k
    : QLMetrics(metric_entity) {
172
4.54k
  time_to_process_request_ =
173
4.54k
      METRIC_handler_latency_yb_cqlserver_CQLServerService_ProcessRequest.Instantiate(
174
4.54k
          metric_entity);
175
4.54k
  time_to_get_cql_processor_ =
176
4.54k
      METRIC_handler_latency_yb_cqlserver_CQLServerService_GetProcessor.Instantiate(metric_entity);
177
4.54k
  time_to_parse_cql_wrapper_ =
178
4.54k
      METRIC_handler_latency_yb_cqlserver_CQLServerService_ParseRequest.Instantiate(metric_entity);
179
4.54k
  time_to_execute_cql_request_ =
180
4.54k
      METRIC_handler_latency_yb_cqlserver_CQLServerService_ExecuteRequest.Instantiate(
181
4.54k
          metric_entity);
182
4.54k
  time_to_queue_cql_response_ =
183
4.54k
      METRIC_handler_latency_yb_cqlserver_CQLServerService_QueueResponse.Instantiate(metric_entity);
184
4.54k
  rpc_method_metrics_.handler_latency =
185
4.54k
      METRIC_handler_latency_yb_cqlserver_CQLServerService_Any.Instantiate(metric_entity);
186
4.54k
  num_errors_parsing_cql_ =
187
4.54k
      METRIC_yb_cqlserver_CQLServerService_ParsingErrors.Instantiate(metric_entity);
188
4.54k
  cql_processors_alive_ = METRIC_cql_processors_alive.Instantiate(metric_entity, 0);
189
4.54k
  cql_processors_created_ = METRIC_cql_processors_created.Instantiate(metric_entity);
190
4.54k
  parsers_alive_ = METRIC_cql_parsers_alive.Instantiate(metric_entity, 0);
191
4.54k
  parsers_created_ = METRIC_cql_parsers_created.Instantiate(metric_entity);
192
4.54k
}
193
194
//------------------------------------------------------------------------------------------------
195
CQLProcessor::CQLProcessor(CQLServiceImpl* service_impl, const CQLProcessorListPos& pos)
196
    : QLProcessor(service_impl->client(), service_impl->metadata_cache(),
197
                  service_impl->cql_metrics().get(),
198
                  &service_impl->parser_pool(),
199
                  service_impl->clock(),
200
                  std::bind(&CQLServiceImpl::TransactionPool, service_impl)),
201
      service_impl_(service_impl),
202
      cql_metrics_(service_impl->cql_metrics()),
203
      pos_(pos),
204
      statement_executed_cb_(Bind(&CQLProcessor::StatementExecuted, Unretained(this))),
205
17.1k
      consumption_(service_impl->processors_mem_tracker(), sizeof(*this)) {
206
17.1k
  IncrementCounter(cql_metrics_->cql_processors_created_);
207
17.1k
  IncrementGauge(cql_metrics_->cql_processors_alive_);
208
17.1k
}
209
210
0
CQLProcessor::~CQLProcessor() {
211
0
  DecrementGauge(cql_metrics_->cql_processors_alive_);
212
0
}
213
214
0
void CQLProcessor::Shutdown() {
215
0
  executor_.Shutdown();
216
0
  auto call = std::move(call_);
217
0
  if (call) {
218
0
    call->RespondFailure(
219
0
        rpc::ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, STATUS(Aborted, "Aborted"));
220
0
  }
221
0
}
222
223
4.56M
void CQLProcessor::ProcessCall(rpc::InboundCallPtr call) {
224
4.56M
  call_ = std::dynamic_pointer_cast<CQLInboundCall>(std::move(call));
225
4.56M
  call_->SetRpcMethodMetrics(cql_metrics_->rpc_method_metrics_);
226
4.56M
  is_rescheduled_.store(IsRescheduled::kFalse, std::memory_order_release);
227
4.56M
  audit_logger_.SetConnection(call_->connection());
228
4.56M
  unique_ptr<CQLRequest> request;
229
4.56M
  unique_ptr<CQLResponse> response;
230
231
  // Parse the CQL request. If the parser failed, it sets the error message in response.
232
4.56M
  parse_begin_ = MonoTime::Now();
233
4.56M
  const auto& context = static_cast<const CQLConnectionContext&>(call_->connection()->context());
234
4.56M
  const auto compression_scheme = context.compression_scheme();
235
4.56M
  if (!CQLRequest::ParseRequest(call_->serialized_request(), compression_scheme,
236
7
                                &request, &response)) {
237
7
    cql_metrics_->num_errors_parsing_cql_->Increment();
238
7
    PrepareAndSendResponse(response);
239
7
    return;
240
7
  }
241
242
4.56M
  execute_begin_ = MonoTime::Now();
243
4.56M
  cql_metrics_->time_to_parse_cql_wrapper_->Increment(
244
4.56M
      execute_begin_.GetDeltaSince(parse_begin_).ToMicroseconds());
245
246
  // Execute the request (perhaps asynchronously).
247
4.56M
  SetCurrentSession(call_->ql_session());
248
4.56M
  request_ = std::move(request);
249
4.56M
  call_->SetRequest(request_, service_impl_);
250
4.56M
  retry_count_ = 0;
251
4.56M
  response = ProcessRequest(*request_);
252
4.56M
  PrepareAndSendResponse(response);
253
4.56M
}
254
255
4.74M
void CQLProcessor::Release() {
256
4.74M
  call_ = nullptr;
257
4.74M
  request_ = nullptr;
258
4.74M
  stmts_.clear();
259
4.74M
  parse_trees_.clear();
260
4.74M
  SetCurrentSession(nullptr);
261
4.74M
  is_rescheduled_.store(IsRescheduled::kFalse, std::memory_order_release);
262
4.74M
  audit_logger_.SetConnection(nullptr);
263
4.74M
  service_impl_->ReturnProcessor(pos_);
264
4.74M
}
265
266
9.07M
void CQLProcessor::PrepareAndSendResponse(const unique_ptr<CQLResponse>& response) {
267
9.07M
  if (response) {
268
4.54M
    const CQLConnectionContext& context =
269
4.54M
        static_cast<const CQLConnectionContext&>(call_->connection()->context());
270
4.54M
    response->set_registered_events(context.registered_events());
271
4.54M
    SendResponse(*response);
272
4.54M
  }
273
9.07M
}
274
275
4.55M
void CQLProcessor::SendResponse(const CQLResponse& response) {
276
  // Serialize the response to return to the CQL client. In case of error, an error response
277
  // should still be present.
278
4.55M
  MonoTime response_begin = MonoTime::Now();
279
4.55M
  const auto& context = static_cast<const CQLConnectionContext&>(call_->connection()->context());
280
4.55M
  const auto compression_scheme = context.compression_scheme();
281
4.55M
  faststring msg;
282
4.55M
  response.Serialize(compression_scheme, &msg);
283
4.55M
  call_->RespondSuccess(RefCntBuffer(msg));
284
285
4.55M
  MonoTime response_done = MonoTime::Now();
286
4.55M
  cql_metrics_->time_to_process_request_->Increment(
287
4.55M
      response_done.GetDeltaSince(parse_begin_).ToMicroseconds());
288
4.56M
  if (request_ != nullptr) {
289
4.56M
    cql_metrics_->time_to_execute_cql_request_->Increment(
290
4.56M
        response_begin.GetDeltaSince(execute_begin_).ToMicroseconds());
291
4.56M
  }
292
4.55M
  cql_metrics_->time_to_queue_cql_response_->Increment(
293
4.55M
      response_done.GetDeltaSince(response_begin).ToMicroseconds());
294
295
4.55M
  Release();
296
4.55M
}
297
298
67.4k
bool CQLProcessor::CheckAuthentication(const CQLRequest& req) const {
299
67.4k
  return call_->ql_session()->is_user_authenticated() ||
300
      // CQL requests which do not need authorization.
301
12.5k
      req.opcode() == CQLMessage::Opcode::STARTUP ||
302
      // Some drivers issue OPTIONS prior to AUTHENTICATE
303
6.28k
      req.opcode() == CQLMessage::Opcode::OPTIONS ||
304
6.28k
      req.opcode() == CQLMessage::Opcode::AUTH_RESPONSE;
305
67.4k
}
306
307
4.56M
unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const CQLRequest& req) {
308
4.56M
  if (FLAGS_use_cassandra_authentication && !CheckAuthentication(req)) {
309
0
    LOG(ERROR) << "Could not execute statement by not authenticated user!";
310
0
    return make_unique<ErrorResponse>(
311
0
        req, ErrorResponse::Code::SERVER_ERROR,
312
0
        "Could not execute statement by not authenticated user");
313
0
  }
314
315
4.56M
  switch (req.opcode()) {
316
2.50k
    case CQLMessage::Opcode::OPTIONS:
317
2.50k
      return ProcessRequest(static_cast<const OptionsRequest&>(req));
318
9.39k
    case CQLMessage::Opcode::STARTUP:
319
9.39k
      return ProcessRequest(static_cast<const StartupRequest&>(req));
320
5.02k
    case CQLMessage::Opcode::PREPARE:
321
5.02k
      return ProcessRequest(static_cast<const PrepareRequest&>(req));
322
4.37M
    case CQLMessage::Opcode::EXECUTE:
323
4.37M
      return ProcessRequest(static_cast<const ExecuteRequest&>(req));
324
159k
    case CQLMessage::Opcode::QUERY:
325
159k
      return ProcessRequest(static_cast<const QueryRequest&>(req));
326
1.66k
    case CQLMessage::Opcode::BATCH:
327
1.66k
      return ProcessRequest(static_cast<const BatchRequest&>(req));
328
6.28k
    case CQLMessage::Opcode::AUTH_RESPONSE:
329
6.28k
      return ProcessRequest(static_cast<const AuthResponseRequest&>(req));
330
2.23k
    case CQLMessage::Opcode::REGISTER:
331
2.23k
      return ProcessRequest(static_cast<const RegisterRequest&>(req));
332
333
0
    case CQLMessage::Opcode::ERROR: FALLTHROUGH_INTENDED;
334
0
    case CQLMessage::Opcode::READY: FALLTHROUGH_INTENDED;
335
0
    case CQLMessage::Opcode::AUTHENTICATE: FALLTHROUGH_INTENDED;
336
0
    case CQLMessage::Opcode::SUPPORTED: FALLTHROUGH_INTENDED;
337
0
    case CQLMessage::Opcode::RESULT: FALLTHROUGH_INTENDED;
338
0
    case CQLMessage::Opcode::EVENT: FALLTHROUGH_INTENDED;
339
0
    case CQLMessage::Opcode::AUTH_CHALLENGE: FALLTHROUGH_INTENDED;
340
0
    case CQLMessage::Opcode::AUTH_SUCCESS:
341
0
      break;
342
0
  }
343
344
0
  LOG(FATAL) << "Invalid CQL request: opcode = " << static_cast<int>(req.opcode());
345
0
  return nullptr;
346
0
}
347
348
2.50k
unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const OptionsRequest& req) {
349
2.50k
  return make_unique<SupportedResponse>(req, &kSupportedOptions);
350
2.50k
}
351
352
9.39k
unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const StartupRequest& req) {
353
26.7k
  for (const auto& option : req.options()) {
354
26.7k
    const auto& name = option.first;
355
26.7k
    const auto& value = option.second;
356
26.7k
    const auto it = kSupportedOptions.find(name);
357
26.7k
    if (it == kSupportedOptions.end() ||
358
17.3k
        std::find(it->second.begin(), it->second.end(), value) == it->second.end()) {
359
17.3k
      YB_LOG_EVERY_N_SECS(WARNING, 60) << Format("Unsupported driver option $0 = $1", name, value);
360
17.3k
    }
361
26.7k
    if (name == CQLMessage::kCompressionOption) {
362
44
      auto& context = static_cast<CQLConnectionContext&>(call_->connection()->context());
363
44
      if (value == CQLMessage::kLZ4Compression) {
364
22
        context.set_compression_scheme(CQLMessage::CompressionScheme::kLz4);
365
22
      } else if (value == CQLMessage::kSnappyCompression) {
366
22
        context.set_compression_scheme(CQLMessage::CompressionScheme::kSnappy);
367
0
      } else {
368
0
        return make_unique<ErrorResponse>(
369
0
            req, ErrorResponse::Code::PROTOCOL_ERROR,
370
0
            Substitute("Unsupported compression scheme $0", value));
371
0
      }
372
44
    }
373
26.7k
  }
374
9.39k
  if (FLAGS_use_cassandra_authentication) {
375
6.29k
    return make_unique<AuthenticateResponse>(req, kCassandraPasswordAuthenticator);
376
3.09k
  } else {
377
3.09k
    return make_unique<ReadyResponse>(req);
378
3.09k
  }
379
9.39k
}
380
381
5.03k
unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const PrepareRequest& req) {
382
3
  VLOG(1) << "PREPARE " << req.query();
383
5.03k
  const CQLMessage::QueryId query_id = CQLStatement::GetQueryId(
384
5.03k
      ql_env_.CurrentKeyspace(), req.query());
385
  // To prevent multiple clients from preparing the same new statement in parallel and trying to
386
  // cache the same statement (a typical "login storm" scenario), each caller will try to allocate
387
  // the statement in the cached statement first. If it already exists, the existing one will be
388
  // returned instead. Then, each client will try to prepare the statement. The first one will do
389
  // the actual prepare while the rest wait. As the rest do the prepare afterwards, the statement
390
  // is already prepared so it will be an no-op (see Statement::Prepare).
391
5.03k
  shared_ptr<CQLStatement> stmt = service_impl_->AllocatePreparedStatement(
392
5.03k
      query_id, ql_env_.CurrentKeyspace(), req.query());
393
5.03k
  PreparedResult::UniPtr result;
394
5.03k
  Status s = stmt->Prepare(this, service_impl_->prepared_stmts_mem_tracker(),
395
5.03k
                           false /* internal */, &result);
396
397
5.03k
  if (s.ok()) {
398
4.85k
    auto pt_result = stmt->GetParseTree();
399
4.85k
    if (pt_result.ok()) {
400
4.85k
      s = audit_logger_.LogStatement(pt_result->root().get(), req.query(),
401
4.85k
                                     IsPrepare::kTrue);
402
0
    } else {
403
0
      s = pt_result.status();
404
0
    }
405
177
  } else {
406
177
    WARN_NOT_OK(audit_logger_.LogStatementError(req.query(), s,
407
177
                                                ErrorIsFormatted::kTrue),
408
177
                "Failed to log an audit record");
409
177
  }
410
411
5.03k
  if (!s.ok()) {
412
172
    service_impl_->DeletePreparedStatement(stmt);
413
172
    return ProcessError(s, stmt->query_id());
414
172
  }
415
416
4.85k
  return (result != nullptr) ? make_unique<PreparedResultResponse>(req, query_id, *result)
417
341
                             : make_unique<PreparedResultResponse>(req, query_id);
418
4.85k
}
419
420
4.37M
unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const ExecuteRequest& req) {
421
4.49k
  VLOG(1) << "EXECUTE " << b2a_hex(req.query_id());
422
4.37M
  const shared_ptr<const CQLStatement> stmt = GetPreparedStatement(req.query_id());
423
4.37M
  if (stmt == nullptr) {
424
125
    return ProcessError(ErrorStatus(ErrorCode::UNPREPARED_STATEMENT), req.query_id());
425
125
  }
426
4.37M
  const Status s = stmt->ExecuteAsync(this, req.params(), statement_executed_cb_);
427
4.36M
  return s.ok() ? nullptr : ProcessError(s, stmt->query_id());
428
4.37M
}
429
430
159k
unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const QueryRequest& req) {
431
1.75k
  VLOG(1) << "QUERY " << req.query();
432
159k
  if (service_impl_->system_cache() != nullptr) {
433
149k
    auto cached_response = service_impl_->system_cache()->Lookup(req.query());
434
149k
    if (cached_response) {
435
100
      VLOG(1) << "Using cached response for " << req.query();
436
9.91k
      statement_executed_cb_.Run(
437
9.91k
          Status::OK(),
438
9.91k
          std::static_pointer_cast<ExecutedResult>(*cached_response));
439
9.91k
      return nullptr;
440
9.91k
    }
441
149k
  }
442
149k
  RunAsync(req.query(), req.params(), statement_executed_cb_);
443
149k
  return nullptr;
444
149k
}
445
446
1.66k
unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const BatchRequest& req) {
447
18.4E
  VLOG(1) << "BATCH " << req.queries().size();
448
449
1.66k
  StatementBatch batch;
450
1.66k
  batch.reserve(req.queries().size());
451
452
  // If no errors happen, batch request started here will be ended by the executor.
453
1.66k
  Status s = audit_logger_.StartBatchRequest(req.queries().size(),
454
1.66k
                                             is_rescheduled_.load(std::memory_order_acquire));
455
1.66k
  if (PREDICT_FALSE(!s.ok())) {
456
0
    return ProcessError(s);
457
0
  }
458
459
1.66k
  unique_ptr<CQLResponse> result;
460
  // For each query in the batch, look up the query id if it is a prepared statement, or prepare the
461
  // query if it is not prepared. Then execute the parse trees with the parameters.
462
389k
  for (const BatchRequest::Query& query : req.queries()) {
463
389k
    if (query.is_prepared) {
464
0
      VLOG(1) << "BATCH EXECUTE " << b2a_hex(query.query_id);
465
389k
      const shared_ptr<const CQLStatement> stmt = GetPreparedStatement(query.query_id);
466
389k
      if (stmt == nullptr) {
467
3
        result = ProcessError(ErrorStatus(ErrorCode::UNPREPARED_STATEMENT), query.query_id);
468
3
        break;
469
3
      }
470
389k
      const Result<const ParseTree&> parse_tree = stmt->GetParseTree();
471
389k
      if (!parse_tree) {
472
0
        result = ProcessError(parse_tree.status(), query.query_id);
473
0
        break;
474
0
      }
475
389k
      batch.emplace_back(*parse_tree, query.params);
476
296
    } else {
477
1
      VLOG(1) << "BATCH QUERY " << query.query;
478
296
      ParseTree::UniPtr parse_tree;
479
296
      s = Prepare(query.query, &parse_tree);
480
296
      if (PREDICT_FALSE(!s.ok())) {
481
2
        result = ProcessError(s);
482
2
        break;
483
2
      }
484
294
      batch.emplace_back(*parse_tree, query.params);
485
294
      parse_trees_.insert(std::move(parse_tree));
486
294
    }
487
389k
  }
488
489
1.66k
  if (result) {
490
5
    s = audit_logger_.EndBatchRequest();
491
    // Otherwise, batch request will be ended by the executor or when processing an error.
492
5
    if (PREDICT_FALSE(!s.ok())) {
493
0
      result = ProcessError(s);
494
0
    }
495
5
    return result;
496
5
  }
497
498
1.65k
  ExecuteAsync(batch, statement_executed_cb_);
499
500
1.65k
  return nullptr;
501
1.65k
}
502
503
6.28k
unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const AuthResponseRequest& req) {
504
6.28k
  const auto& params = req.params();
505
6.28k
  if (FLAGS_ycql_cache_login_info) {
506
519
    auto salted_hash_result = ql_env_.RoleSaltedHash(params.username);
507
519
    if (salted_hash_result.ok()) {
508
398
      auto can_login_result = ql_env_.RoleCanLogin(params.username);
509
398
      if (can_login_result.ok()) {
510
398
        unique_ptr<CQLResponse> response =
511
398
          ProcessAuthResult(*salted_hash_result, *can_login_result);
512
0
        VLOG(1) << "Used cached authentication";
513
398
        Status s = audit_logger_.LogAuthResponse(*response);
514
398
        return response;
515
0
      } else {
516
0
        VLOG(1) << "Unable to get can_login for user " << params.username << ": "
517
0
                  << can_login_result;
518
0
      }
519
121
    } else {
520
0
      VLOG(1) << "Unable to get salted hash for user " << params.username << ": "
521
0
                << salted_hash_result;
522
121
    }
523
519
  }
524
5.89k
  shared_ptr<Statement> stmt = service_impl_->GetAuthPreparedStatement();
525
5.89k
  if (!stmt->Prepare(this, nullptr /* memtracker */, true /* internal */).ok()) {
526
0
    return make_unique<ErrorResponse>(
527
0
        req, ErrorResponse::Code::SERVER_ERROR,
528
0
        "Could not prepare statement for querying user " + params.username);
529
0
  }
530
5.89k
  if (!stmt->ExecuteAsync(this, params, statement_executed_cb_).ok()) {
531
0
    LOG(ERROR) << "Could not execute prepared statement to fetch login info!";
532
0
    return make_unique<ErrorResponse>(
533
0
        req, ErrorResponse::Code::SERVER_ERROR,
534
0
        "Could not execute prepared statement for querying roles for user " + params.username);
535
0
  }
536
5.89k
  return nullptr;
537
5.89k
}
538
539
2.23k
unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const RegisterRequest& req) {
540
2.23k
  CQLConnectionContext& context =
541
2.23k
      static_cast<CQLConnectionContext&>(call_->connection()->context());
542
2.23k
  context.add_registered_events(req.events());
543
2.23k
  return make_unique<ReadyResponse>(req);
544
2.23k
}
545
546
4.76M
shared_ptr<const CQLStatement> CQLProcessor::GetPreparedStatement(const CQLMessage::QueryId& id) {
547
4.76M
  shared_ptr<const CQLStatement> stmt = service_impl_->GetPreparedStatement(id);
548
4.76M
  if (stmt != nullptr) {
549
4.76M
    stmt->clear_reparsed();
550
4.76M
    stmts_.insert(stmt);
551
4.76M
  }
552
4.76M
  return stmt;
553
4.76M
}
554
555
4.53M
void CQLProcessor::StatementExecuted(const Status& s, const ExecutedResult::SharedPtr& result) {
556
4.51M
  unique_ptr<CQLResponse> response(s.ok() ? ProcessResult(result) : ProcessError(s));
557
4.53M
  if (response && !s.ok()) {
558
    // Error response means we're not going to be transparently restarting a query.
559
4.87k
    WARN_NOT_OK(audit_logger_.EndBatchRequest(), "Failed to end batch request");
560
4.87k
  }
561
4.53M
  PrepareAndSendResponse(response);
562
4.53M
}
563
564
unique_ptr<CQLResponse> CQLProcessor::ProcessError(const Status& s,
565
5.18k
                                                   boost::optional<CQLMessage::QueryId> query_id) {
566
5.18k
  if (s.IsQLError()) {
567
5.04k
    ErrorCode ql_errcode = GetErrorCode(s);
568
5.04k
    if (ql_errcode == ErrorCode::UNPREPARED_STATEMENT ||
569
4.92k
        ql_errcode == ErrorCode::STALE_METADATA) {
570
      // Delete all stale prepared statements from our cache. Since CQL protocol allows only one
571
      // unprepared query id to be returned, we will return just the last unprepared / stale one
572
      // we found.
573
1.07k
      for (auto stmt : stmts_) {
574
1.07k
        if (stmt->stale()) {
575
1.07k
          service_impl_->DeletePreparedStatement(stmt);
576
1.07k
        }
577
1.08k
        if (stmt->unprepared() || stmt->stale()) {
578
1.08k
          query_id = stmt->query_id();
579
1.08k
        }
580
1.07k
      }
581
1.21k
      if (query_id) {
582
1.20k
        return make_unique<UnpreparedErrorResponse>(*request_, *query_id);
583
1.20k
      }
584
      // When no unprepared query id is found, it means all statements we executed were queries
585
      // (non-prepared statements). In that case, just retry the request (once only). The retry
586
      // needs to be rescheduled in because this callback may not be executed in the RPC worker
587
      // thread. Also, rescheduling gives other calls a chance to execute first before we do.
588
7
      if (++retry_count_ == 1) {
589
7
        stmts_.clear();
590
7
        parse_trees_.clear();
591
7
        Reschedule(&process_request_task_.Bind(this));
592
7
        return nullptr;
593
7
      }
594
18.4E
      return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::INVALID,
595
18.4E
                                        "Query failed to execute due to stale metadata cache");
596
3.83k
    } else if (ql_errcode < ErrorCode::SUCCESS) {
597
3.83k
      if (ql_errcode == ErrorCode::UNAUTHORIZED) {
598
174
        return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::UNAUTHORIZED,
599
174
                                          s.ToUserMessage());
600
3.66k
      } else if (ql_errcode > ErrorCode::LIMITATION_ERROR) {
601
        // System errors, internal errors, or crashes.
602
2.09k
        return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::SERVER_ERROR,
603
2.09k
                                          s.ToUserMessage());
604
1.56k
      } else if (ql_errcode > ErrorCode::SEM_ERROR) {
605
        // Limitation, lexical, or parsing errors.
606
272
        return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::SYNTAX_ERROR,
607
272
                                          s.ToUserMessage());
608
1.29k
      } else {
609
        // Semantic or execution errors.
610
1.29k
        return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::INVALID,
611
1.29k
                                          s.ToUserMessage());
612
1.29k
      }
613
18.4E
    }
614
615
18.4E
    LOG(ERROR) << "Internal error: invalid error code " << static_cast<int64_t>(GetErrorCode(s));
616
18.4E
    return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::SERVER_ERROR,
617
18.4E
                                      "Invalid error code");
618
138
  } else if (s.IsNotAuthorized()) {
619
60
    return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::UNAUTHORIZED,
620
60
                                      s.ToUserMessage());
621
60
  }
622
623
78
  return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::SERVER_ERROR,
624
78
                                    s.ToUserMessage());
625
78
}
626
627
namespace {
628
629
struct LDAPMemoryDeleter {
630
33
  void operator()(void* ptr) const { ldap_memfree(ptr); }
631
};
632
633
struct LDAPMessageDeleter {
634
27
  void operator()(LDAPMessage* ptr) const { ldap_msgfree(ptr); }
635
};
636
637
struct LDAPDeleter {
638
64
  void operator()(LDAP* ptr) const { ldap_unbind_ext(ptr, NULL, NULL); }
639
};
640
641
using LDAPHolder = unique_ptr<LDAP, LDAPDeleter>;
642
using LDAPMessageHolder = unique_ptr<LDAPMessage, LDAPMessageDeleter>;
643
template<class T>
644
using LDAPMemoryHolder = unique_ptr<T, LDAPMemoryDeleter>;
645
646
class LDAPError {
647
 public:
648
0
  explicit LDAPError(int c) : code(c), ldap_(nullptr) {}
649
14
  LDAPError(int c, const LDAPHolder& l) : code(c), ldap_(&l) {}
650
651
14
  LDAP* GetLDAP() const { return ldap_ ? ldap_->get() : nullptr; }
652
  const int code;
653
654
 private:
655
  const LDAPHolder* ldap_;
656
};
657
658
/*
659
* Add a detail error message text to the current error if one can be
660
* constructed from the LDAP 'diagnostic message'.
661
*/
662
14
ostream& operator<<(ostream& str, const LDAPError& error) {
663
14
  str << ldap_err2string(error.code);
664
14
  auto ldap = error.GetLDAP();
665
14
  if (ldap) {
666
14
    char *message = nullptr;
667
14
    const auto rc = ldap_get_option(ldap, LDAP_OPT_DIAGNOSTIC_MESSAGE, &message);
668
14
    if (rc == LDAP_SUCCESS && message != nullptr) {
669
13
      LDAPMemoryHolder<char> holder(message);
670
13
      str << " LDAP diagnostics: " << message;
671
13
    }
672
14
  }
673
14
  return str;
674
14
}
675
676
64
Result<LDAPHolder> InitializeLDAPConnection(const char *uris) {
677
64
  LDAP* ldap_ptr = nullptr;
678
64
  auto r = ldap_initialize(&ldap_ptr, uris);
679
64
  if (r != LDAP_SUCCESS) {
680
0
    return STATUS_FORMAT(InternalError, "could not initialize LDAP: $0", LDAPError(r));
681
0
  }
682
64
  LDAPHolder ldap(ldap_ptr);
683
64
  VLOG(4) << "Successfully initialized LDAP struct";
684
685
64
  int ldapversion = LDAP_VERSION3;
686
64
  if ((r = ldap_set_option(ldap_ptr, LDAP_OPT_PROTOCOL_VERSION, &ldapversion)) != LDAP_SUCCESS) {
687
0
    return STATUS_FORMAT(InternalError, "could not set LDAP protocol version: $0",
688
0
        LDAPError(r, ldap));
689
0
  }
690
64
  VLOG(4) << "Successfully set protocol version option";
691
692
64
  if (FLAGS_ycql_ldap_tls && ((r = ldap_start_tls_s(ldap_ptr, NULL, NULL)) != LDAP_SUCCESS)) {
693
0
    return STATUS_FORMAT(InternalError, "could not start LDAP TLS session: $0", LDAPError(r, ldap));
694
0
  }
695
696
64
  return ldap;
697
64
}
698
699
/*
700
 * Return a newly allocated string copied from "pattern" with all
701
 * occurrences of the placeholder "$username" replaced with "user_name".
702
 */
703
14
std::string FormatSearchFilter(const std::string& pattern, const std::string& user_name) {
704
14
  return boost::replace_all_copy(pattern, "$username", user_name);
705
14
}
706
707
60
Result<bool> CheckLDAPAuth(const ql::AuthResponseRequest::AuthQueryParameters& params) {
708
60
  if (params.username.empty() || params.password.empty()) {
709
    // Refer https://datatracker.ietf.org/doc/html/rfc4513#section-6.3.1 for details. Applications
710
    // are required to explicitly have this check and can't rely on an LDAP server to report
711
    // invalid credentials error.
712
16
    return STATUS(InvalidArgument, "Empty username and/or password not allowed");
713
16
  }
714
715
44
  if (FLAGS_ycql_ldap_server.empty())
716
0
    return STATUS(InvalidArgument, "LDAP server not specified");
717
718
44
  VLOG(4) << "Attempting ldap_initialize() with " << FLAGS_ycql_ldap_server;
719
720
44
  const auto& uris = FLAGS_ycql_ldap_server;
721
44
  auto ldap = VERIFY_RESULT(InitializeLDAPConnection(uris.c_str()));
722
723
44
  int r;
724
44
  std::string fulluser;
725
44
  if (!FLAGS_ycql_ldap_base_dn.empty()) {
726
36
    if (FLAGS_ycql_ldap_bind_dn.empty() || FLAGS_ycql_ldap_bind_passwd.empty()) {
727
3
      return STATUS(InvalidArgument,
728
3
                    "Empty bind dn and/or bind password not allowed for search+bind mode");
729
3
    }
730
731
    /*
732
    * First perform an LDAP search to find the DN for the user we are
733
    * trying to log in as.
734
    */
735
33
    char ldap_no_attrs[sizeof(LDAP_NO_ATTRS)+1];
736
33
    strncpy(ldap_no_attrs, LDAP_NO_ATTRS, sizeof(ldap_no_attrs));
737
33
    char *attributes[] = {ldap_no_attrs, NULL};
738
739
    /*
740
    * Disallow any characters that we would otherwise need to escape,
741
    * since they aren't really reasonable in a username anyway. Allowing
742
    * them would make it possible to inject any kind of custom filters in
743
    * the LDAP filter.
744
    */
745
305
    for (const char& c : params.username) {
746
305
      switch (c) {
747
2
        case '*':
748
2
        case '(':
749
2
        case ')':
750
2
        case '\\':
751
2
        case '/':
752
2
          return STATUS(InvalidArgument, "invalid character in user name for LDAP authentication");
753
305
      }
754
305
    }
755
756
    /*
757
    * Bind with a pre-defined username/password (if available) for
758
    * searching. If none is specified, this turns into an anonymous bind.
759
    */
760
31
    struct berval cred;
761
31
    ber_str2bv(FLAGS_ycql_ldap_bind_passwd.c_str(), 0 /* len */, 0 /* duplicate */ , &cred);
762
31
    r = ldap_sasl_bind_s(ldap.get(), FLAGS_ycql_ldap_bind_dn.c_str(),
763
31
                         LDAP_SASL_SIMPLE, &cred,
764
31
                         NULL /* serverctrls */, NULL /* clientctrls */,
765
31
                         NULL /* servercredp */);
766
31
    if (r != LDAP_SUCCESS) {
767
4
      return STATUS_FORMAT(
768
4
        InvalidArgument,
769
4
        "could not perform initial LDAP bind for ldapbinddn '$0' on server '$1': $2",
770
4
        FLAGS_ycql_ldap_bind_dn, FLAGS_ycql_ldap_server, LDAPError(r, ldap));
771
4
    }
772
773
27
    std::string filter;
774
    /* Build a custom filter or a single attribute filter? */
775
27
    if (!FLAGS_ycql_ldap_search_filter.empty()) {
776
14
      filter = FormatSearchFilter(FLAGS_ycql_ldap_search_filter, params.username);
777
13
    } else if (!FLAGS_ycql_ldap_search_attribute.empty()) {
778
13
      filter = "(" + FLAGS_ycql_ldap_search_attribute + "=" + params.username + ")";
779
0
    } else {
780
0
      filter = "(uid=" + params.username + ")";
781
0
    }
782
783
27
    LDAPMessage *search_message;
784
27
    r = ldap_search_ext_s(ldap.get(), FLAGS_ycql_ldap_base_dn.c_str(), LDAP_SCOPE_SUBTREE,
785
27
                          filter.c_str(), attributes, 0, NULL, NULL, NULL, 0, &search_message);
786
27
    LDAPMessageHolder search_message_holder{search_message};
787
788
27
    if (r != LDAP_SUCCESS) {
789
2
      return STATUS_FORMAT(
790
2
          InternalError, "could not search LDAP for filter '$0' on server '$1': $2", filter,
791
2
          FLAGS_ycql_ldap_server, LDAPError(r, ldap));
792
2
    }
793
794
25
    auto count = ldap_count_entries(ldap.get(), search_message);
795
25
    switch(count) {
796
3
      case 0:
797
3
        return STATUS_FORMAT(
798
0
            NotFound,
799
0
            "LDAP user '$0' does not exist. "\
800
0
            "LDAP search for filter '$1' on server '$2' returned no entries.",
801
0
            params.username, filter, FLAGS_ycql_ldap_server);
802
20
      case 1:
803
20
        break;
804
2
      default:
805
2
        return STATUS_FORMAT(
806
20
            NotFound, "LDAP user '$0' is not unique, $1 entries exist.", params.username, count);
807
20
    }
808
809
    // No need to free entry pointer since it is a pointer to data in
810
    // search_message. Freeing search_message takes cares of it.
811
20
    auto *entry = ldap_first_entry(ldap.get(), search_message);
812
20
    char *dn = ldap_get_dn(ldap.get(), entry);
813
20
    if (dn == NULL) {
814
0
      int error;
815
0
      ldap_get_option(ldap.get(), LDAP_OPT_ERROR_NUMBER, &error);
816
0
      return STATUS_FORMAT(
817
0
          NotFound, "could not get dn for the first entry matching '$0' on server '$1': $2",
818
0
          filter, FLAGS_ycql_ldap_server, LDAPError(error, ldap));
819
0
    }
820
20
    LDAPMemoryHolder<char> dn_holder{dn};
821
20
    fulluser = dn;
822
823
    /*
824
    * Need to re-initialize the LDAP connection, so that we can bind to
825
    * it with a different username.
826
    */
827
20
    ldap = VERIFY_RESULT(InitializeLDAPConnection(uris.c_str()));
828
8
  } else {
829
8
    fulluser = FLAGS_ycql_ldap_user_prefix + params.username + FLAGS_ycql_ldap_user_suffix;
830
8
  }
831
832
28
  VLOG(4) << "Checking authentication using LDAP for user DN=" << fulluser;
833
834
28
  struct berval cred;
835
28
  ber_str2bv(params.password.c_str(), 0 /* len */, 0 /* duplicate */, &cred);
836
28
  r = ldap_sasl_bind_s(ldap.get(), fulluser.c_str(),
837
28
                       LDAP_SASL_SIMPLE, &cred,
838
28
                       NULL /* serverctrls */, NULL /* clientctrls */,
839
28
                       NULL /* servercredp */);
840
28
  VLOG(4) << "ldap_sasl_bind_s return value =" << r;
841
842
28
  if (r != LDAP_SUCCESS) {
843
8
    std::ostringstream str;
844
8
    str << "LDAP login failed for user '" << fulluser << "' on server '"
845
8
        << FLAGS_ycql_ldap_server << "': " << LDAPError(r, ldap);
846
8
    auto error_msg = str.str();
847
8
    if (r == LDAP_INVALID_CREDENTIALS) {
848
7
      LOG(ERROR) << error_msg;
849
7
      return false;
850
7
    }
851
852
1
    return STATUS(InternalError, error_msg);
853
1
  }
854
855
20
  return true;
856
20
}
857
858
140
static bool UserIn(const std::string& username, const std::string& users_to_skip) {
859
140
  size_t comma_index = 0;
860
140
  size_t prev_comma_index = -1;
861
862
  // TODO(Piyush): Store a static list of usernames from csv instead of traversing each time.
863
140
  while ((comma_index = users_to_skip.find(",", prev_comma_index + 1)) != std::string::npos) {
864
0
    if (users_to_skip.substr(prev_comma_index + 1,
865
0
                             comma_index - (prev_comma_index + 1)) == username)
866
0
      return true;
867
0
    VLOG(2) << "Check " << username << " with "
868
0
            << users_to_skip.substr(prev_comma_index + 1, comma_index - (prev_comma_index + 1));
869
0
    prev_comma_index = comma_index;
870
0
  }
871
140
  VLOG(2) << "Check " << username << " with "
872
140
          << users_to_skip.substr(
873
140
              prev_comma_index + 1, users_to_skip.size() - (prev_comma_index + 1));
874
140
  return users_to_skip.substr(prev_comma_index + 1, users_to_skip.size() - (prev_comma_index + 1))
875
140
    == username;
876
140
}
877
878
} // namespace
879
880
6.23k
unique_ptr<CQLResponse> CQLProcessor::ProcessAuthResult(const string& saved_hash, bool can_login) {
881
6.23k
  const auto& req = down_cast<const AuthResponseRequest&>(*request_);
882
6.23k
  const auto& params = req.params();
883
6.23k
  unique_ptr<CQLResponse> response = nullptr;
884
6.23k
  bool authenticated = false;
885
886
6.23k
  if (FLAGS_ycql_use_ldap && !UserIn(params.username, FLAGS_ycql_ldap_users_to_skip_csv)) {
887
60
    Result<bool> ldap_auth_result = CheckLDAPAuth(req.params());
888
60
    if (!ldap_auth_result.ok()) {
889
33
      return make_unique<ErrorResponse>(
890
33
          *request_, ErrorResponse::Code::SERVER_ERROR,
891
33
          "Failed to authenticate using LDAP: " + yb::ToString(ldap_auth_result));
892
27
    } else if (!*ldap_auth_result) {
893
7
      response = make_unique<ErrorResponse>(
894
7
          *request_, ErrorResponse::Code::BAD_CREDENTIALS,
895
7
          "Failed to authenticate using LDAP: Provided username '" + params.username +
896
7
          "' and/or password are incorrect");
897
20
    } else {
898
20
      authenticated = true;
899
20
      call_->ql_session()->set_current_role_name(params.username);
900
20
      response = make_unique<AuthSuccessResponse>(*request_,
901
20
                                                  "" /* this does not matter */);
902
20
    }
903
6.17k
  } else if (saved_hash.empty()) {
904
    // Username doesn't have a password, but one is required for authentication. Return an error.
905
6
    response = make_unique<ErrorResponse>(
906
6
        *request_, ErrorResponse::Code::BAD_CREDENTIALS,
907
6
        "Provided username '" + params.username + "' and/or password are incorrect");
908
6.16k
  } else {
909
6.16k
    if (!service_impl_->CheckPassword(params.password, saved_hash)) {
910
7
      response = make_unique<ErrorResponse>(
911
7
          *request_, ErrorResponse::Code::BAD_CREDENTIALS,
912
7
          "Provided username '" + params.username + "' and/or password are incorrect");
913
6.16k
    } else if (!can_login) {
914
18
      response = make_unique<ErrorResponse>(
915
18
          *request_, ErrorResponse::Code::BAD_CREDENTIALS,
916
18
          params.username + " is not permitted to log in");
917
6.14k
    } else {
918
6.14k
      call_->ql_session()->set_current_role_name(params.username);
919
6.14k
      response = make_unique<AuthSuccessResponse>(*request_, "" /* this does not matter */);
920
6.14k
      authenticated = true;
921
6.14k
    }
922
6.16k
  }
923
6.20k
  call_->ql_session()->set_user_authenticated(authenticated);
924
6.20k
  return response;
925
6.23k
}
926
927
4.52M
unique_ptr<CQLResponse> CQLProcessor::ProcessResult(const ExecutedResult::SharedPtr& result) {
928
4.52M
  if (result == nullptr) {
929
796k
    return make_unique<VoidResultResponse>(*request_);
930
796k
  }
931
3.72M
  switch (result->type()) {
932
4.18k
    case ExecutedResult::Type::SET_KEYSPACE: {
933
4.18k
      const auto& set_keyspace_result = static_cast<const SetKeyspaceResult&>(*result);
934
4.18k
      return make_unique<SetKeyspaceResultResponse>(*request_, set_keyspace_result);
935
0
    }
936
3.70M
    case ExecutedResult::Type::ROWS: {
937
3.70M
      const RowsResult::SharedPtr& rows_result = std::static_pointer_cast<RowsResult>(result);
938
3.70M
      if (request_->opcode() != CQLMessage::Opcode::AUTH_RESPONSE) {
939
3.69M
        cql_metrics_->ql_response_size_bytes_->Increment(rows_result->rows_data().size());
940
3.69M
      }
941
3.70M
      switch (request_->opcode()) {
942
3.62M
        case CQLMessage::Opcode::EXECUTE:
943
3.62M
          return make_unique<RowsResultResponse>(down_cast<const ExecuteRequest&>(*request_),
944
3.62M
                                                 rows_result);
945
92.8k
        case CQLMessage::Opcode::QUERY:
946
92.8k
          return make_unique<RowsResultResponse>(down_cast<const QueryRequest&>(*request_),
947
92.8k
                                                 rows_result);
948
52
        case CQLMessage::Opcode::BATCH:
949
52
          return make_unique<RowsResultResponse>(down_cast<const BatchRequest&>(*request_),
950
52
                                                 rows_result);
951
952
5.88k
        case CQLMessage::Opcode::AUTH_RESPONSE: {
953
5.88k
          const auto& req = down_cast<const AuthResponseRequest&>(*request_);
954
5.88k
          const auto& params = req.params();
955
5.88k
          const auto row_block = rows_result->GetRowBlock();
956
5.88k
          unique_ptr<CQLResponse> response = nullptr;
957
5.88k
          if (row_block->row_count() != 1) {
958
52
            response = make_unique<ErrorResponse>(
959
52
                *request_, ErrorResponse::Code::BAD_CREDENTIALS,
960
52
                "Provided username '" + params.username + "' and/or password are incorrect");
961
5.83k
          } else {
962
5.83k
            const auto& row = row_block->row(0);
963
5.83k
            const auto& schema = row_block->schema();
964
965
5.83k
            const QLValue& salted_hash_value =
966
5.83k
                row.column(schema.find_column(kRoleColumnNameSaltedHash));
967
5.83k
            const auto& can_login =
968
5.83k
                row.column(schema.find_column(kRoleColumnNameCanLogin)).bool_value();
969
            // Returning empty string is fine since it would error out as expected,
970
            // if the hash is empty
971
5.83k
            const string saved_hash = salted_hash_value.IsNull() ?
972
5.77k
                "" : salted_hash_value.string_value();
973
5.83k
            response = ProcessAuthResult(saved_hash, can_login);
974
5.83k
          }
975
5.88k
          Status s = audit_logger_.LogAuthResponse(*response);
976
5.88k
          if (!s.ok()) {
977
0
            return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::SERVER_ERROR,
978
0
                                              "Failed to write an audit log record");
979
0
          }
980
981
5.88k
          return response;
982
5.88k
        }
983
0
        case CQLMessage::Opcode::ERROR:   FALLTHROUGH_INTENDED;
984
0
        case CQLMessage::Opcode::STARTUP: FALLTHROUGH_INTENDED;
985
0
        case CQLMessage::Opcode::READY:   FALLTHROUGH_INTENDED;
986
0
        case CQLMessage::Opcode::AUTHENTICATE: FALLTHROUGH_INTENDED;
987
0
        case CQLMessage::Opcode::OPTIONS:   FALLTHROUGH_INTENDED;
988
0
        case CQLMessage::Opcode::SUPPORTED: FALLTHROUGH_INTENDED;
989
0
        case CQLMessage::Opcode::RESULT:    FALLTHROUGH_INTENDED;
990
0
        case CQLMessage::Opcode::PREPARE:   FALLTHROUGH_INTENDED;
991
0
        case CQLMessage::Opcode::REGISTER:  FALLTHROUGH_INTENDED;
992
0
        case CQLMessage::Opcode::EVENT:     FALLTHROUGH_INTENDED;
993
0
        case CQLMessage::Opcode::AUTH_CHALLENGE: FALLTHROUGH_INTENDED;
994
0
        case CQLMessage::Opcode::AUTH_SUCCESS:
995
0
          break;
996
        // default: fall through.
997
0
      }
998
0
      LOG(FATAL) << "Internal error: not a request that returns result "
999
0
                 << static_cast<int>(request_->opcode());
1000
0
      break;
1001
0
    }
1002
6.05k
    case ExecutedResult::Type::SCHEMA_CHANGE: {
1003
6.05k
      const auto& schema_change_result = static_cast<const SchemaChangeResult&>(*result);
1004
6.05k
      return make_unique<SchemaChangeResultResponse>(*request_, schema_change_result);
1005
0
    }
1006
1007
    // default: fall through.
1008
0
  }
1009
0
  LOG(ERROR) << "Internal error: unknown result type " << static_cast<int>(result->type());
1010
0
  return make_unique<ErrorResponse>(
1011
0
      *request_, ErrorResponse::Code::SERVER_ERROR, "Internal error: unknown result type");
1012
0
}
1013
1014
4.96M
bool CQLProcessor::NeedReschedule() {
1015
4.96M
  auto messenger = service_impl_->messenger();
1016
4.96M
  if (!messenger) {
1017
0
    return false;
1018
0
  }
1019
4.96M
  return !messenger->ThreadPool(rpc::ServicePriority::kNormal).OwnsThisThread();
1020
4.96M
}
1021
1022
4.94M
void CQLProcessor::Reschedule(rpc::ThreadPoolTask* task) {
1023
4.94M
  is_rescheduled_.store(IsRescheduled::kTrue, std::memory_order_release);
1024
4.94M
  auto messenger = service_impl_->messenger();
1025
12.5k
  DCHECK(messenger != nullptr) << "No messenger to reschedule CQL call";
1026
4.94M
  messenger->ThreadPool(rpc::ServicePriority::kNormal).Enqueue(task);
1027
4.94M
}
1028
1029
5.11M
CoarseTimePoint CQLProcessor::GetDeadline() const {
1030
4.92M
  return call_ ? call_->GetClientDeadline()
1031
185k
               : CoarseMonoClock::now() + FLAGS_client_read_write_timeout_ms * 1ms;
1032
5.11M
}
1033
1034
}  // namespace cqlserver
1035
}  // namespace yb