/Users/deen/code/yugabyte-db/src/yb/yql/cql/cqlserver/cql_processor.cc
Line | Count | Source (jump to first uncovered line) |
1 | | //-------------------------------------------------------------------------------------------------- |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | //-------------------------------------------------------------------------------------------------- |
15 | | |
16 | | #include "yb/yql/cql/cqlserver/cql_processor.h" |
17 | | |
18 | | #include <ldap.h> |
19 | | |
20 | | #include <boost/algorithm/string.hpp> |
21 | | |
22 | | #include "yb/common/ql_rowblock.h" |
23 | | #include "yb/common/ql_value.h" |
24 | | #include "yb/common/schema.h" |
25 | | |
26 | | #include "yb/gutil/bind.h" |
27 | | #include "yb/gutil/casts.h" |
28 | | #include "yb/gutil/strings/escaping.h" |
29 | | |
30 | | #include "yb/rpc/connection.h" |
31 | | #include "yb/rpc/messenger.h" |
32 | | |
33 | | #include "yb/util/crypt.h" |
34 | | #include "yb/util/flag_tags.h" |
35 | | #include "yb/util/format.h" |
36 | | #include "yb/util/logging.h" |
37 | | #include "yb/util/metrics.h" |
38 | | #include "yb/util/result.h" |
39 | | #include "yb/util/status_format.h" |
40 | | #include "yb/util/status_log.h" |
41 | | |
42 | | #include "yb/yql/cql/cqlserver/cql_service.h" |
43 | | #include "yb/yql/cql/ql/util/errcodes.h" |
44 | | |
45 | | using namespace std::literals; |
46 | | |
47 | | METRIC_DEFINE_histogram_with_percentiles( |
48 | | server, handler_latency_yb_cqlserver_CQLServerService_GetProcessor, |
49 | | "Time spent to get a processor for processing a CQL query request.", |
50 | | yb::MetricUnit::kMicroseconds, |
51 | | "Time spent to get a processor for processing a CQL query request.", 60000000LU, 2); |
52 | | METRIC_DEFINE_histogram_with_percentiles( |
53 | | server, handler_latency_yb_cqlserver_CQLServerService_ProcessRequest, |
54 | | "Time spent processing a CQL query request. From parsing till executing", |
55 | | yb::MetricUnit::kMicroseconds, |
56 | | "Time spent processing a CQL query request. From parsing till executing", 60000000LU, 2); |
57 | | METRIC_DEFINE_histogram_with_percentiles( |
58 | | server, handler_latency_yb_cqlserver_CQLServerService_ParseRequest, |
59 | | "Time spent parsing CQL query request", yb::MetricUnit::kMicroseconds, |
60 | | "Time spent parsing CQL query request", 60000000LU, 2); |
61 | | METRIC_DEFINE_histogram_with_percentiles( |
62 | | server, handler_latency_yb_cqlserver_CQLServerService_QueueResponse, |
63 | | "Time spent to queue the response for a CQL query request back on the network", |
64 | | yb::MetricUnit::kMicroseconds, |
65 | | "Time spent after computing the CQL response to queue it onto the connection.", 60000000LU, 2); |
66 | | METRIC_DEFINE_histogram_with_percentiles( |
67 | | server, handler_latency_yb_cqlserver_CQLServerService_ExecuteRequest, |
68 | | "Time spent executing the CQL query request in the handler", yb::MetricUnit::kMicroseconds, |
69 | | "Time spent executing the CQL query request in the handler", 60000000LU, 2); |
70 | | METRIC_DEFINE_counter( |
71 | | server, yb_cqlserver_CQLServerService_ParsingErrors, "Errors encountered when parsing ", |
72 | | yb::MetricUnit::kRequests, "Errors encountered when parsing "); |
73 | | METRIC_DEFINE_histogram_with_percentiles( |
74 | | server, handler_latency_yb_cqlserver_CQLServerService_Any, |
75 | | "yb.cqlserver.CQLServerService.AnyMethod RPC Time", yb::MetricUnit::kMicroseconds, |
76 | | "Microseconds spent handling " |
77 | | "yb.cqlserver.CQLServerService.AnyMethod() " |
78 | | "RPC requests", |
79 | | 60000000LU, 2); |
80 | | |
81 | | METRIC_DEFINE_gauge_int64(server, cql_processors_alive, |
82 | | "Number of alive CQL Processors.", |
83 | | yb::MetricUnit::kUnits, |
84 | | "Number of alive CQL Processors."); |
85 | | |
86 | | METRIC_DEFINE_counter(server, cql_processors_created, |
87 | | "Number of created CQL Processors.", |
88 | | yb::MetricUnit::kUnits, |
89 | | "Number of created CQL Processors."); |
90 | | |
91 | | METRIC_DEFINE_gauge_int64(server, cql_parsers_alive, |
92 | | "Number of alive CQL Parsers.", |
93 | | yb::MetricUnit::kUnits, |
94 | | "Number of alive CQL Parsers."); |
95 | | |
96 | | METRIC_DEFINE_counter(server, cql_parsers_created, |
97 | | "Number of created CQL Parsers.", |
98 | | yb::MetricUnit::kUnits, |
99 | | "Number of created CQL Parsers."); |
100 | | |
101 | | DECLARE_bool(use_cassandra_authentication); |
102 | | DECLARE_bool(ycql_cache_login_info); |
103 | | DECLARE_int32(client_read_write_timeout_ms); |
104 | | |
105 | | // LDAP specific flags |
106 | | DEFINE_bool(ycql_use_ldap, false, "Use LDAP for user logins"); |
107 | | DEFINE_string(ycql_ldap_users_to_skip_csv, "", "Users that are authenticated via the local password" |
108 | | " check instead of LDAP (if ycql_use_ldap=true). This is a comma separated list"); |
109 | | TAG_FLAG(ycql_ldap_users_to_skip_csv, sensitive_info); |
110 | | DEFINE_string(ycql_ldap_server, "", "LDAP server of the form <scheme>://<ip>:<port>"); |
111 | | DEFINE_bool(ycql_ldap_tls, false, "Connect to LDAP server using TLS encryption."); |
112 | | |
113 | | // LDAP flags for simple bind mode |
114 | | DEFINE_string(ycql_ldap_user_prefix, "", "String used for prepending the user name when forming " |
115 | | "the DN for binding to the LDAP server"); |
116 | | DEFINE_string(ycql_ldap_user_suffix, "", "String used for appending the user name when forming the " |
117 | | "DN for binding to the LDAP Server."); |
118 | | |
119 | | // Flags for LDAP search + bind mode |
120 | | DEFINE_string(ycql_ldap_base_dn, "", "Specifies the base directory to begin the user name search"); |
121 | | DEFINE_string(ycql_ldap_bind_dn, "", "Specifies the username to perform the initial search when " |
122 | | "doing search + bind authentication"); |
123 | | TAG_FLAG(ycql_ldap_bind_dn, sensitive_info); |
124 | | DEFINE_string(ycql_ldap_bind_passwd, "", "Password for username being used to perform the initial " |
125 | | "search when doing search + bind authentication"); |
126 | | TAG_FLAG(ycql_ldap_bind_passwd, sensitive_info); |
127 | | DEFINE_string(ycql_ldap_search_attribute, "", "Attribute to match against the username in the " |
128 | | "search when doing search + bind authentication. If no attribute is specified, the uid attribute " |
129 | | "is used."); |
130 | | DEFINE_string(ycql_ldap_search_filter, "", "The search filter to use when doing search + bind " |
131 | | "authentication."); |
132 | | |
133 | | namespace yb { |
134 | | namespace cqlserver { |
135 | | |
136 | | constexpr const char* const kCassandraPasswordAuthenticator = |
137 | | "org.apache.cassandra.auth.PasswordAuthenticator"; |
138 | | |
139 | | extern const char* const kRoleColumnNameSaltedHash; |
140 | | extern const char* const kRoleColumnNameCanLogin; |
141 | | |
142 | | using namespace yb::ql; // NOLINT |
143 | | |
144 | | using std::make_unique; |
145 | | using std::shared_ptr; |
146 | | using std::unique_ptr; |
147 | | |
148 | | using client::YBClient; |
149 | | using client::YBSession; |
150 | | using ql::ExecutedResult; |
151 | | using ql::PreparedResult; |
152 | | using ql::RowsResult; |
153 | | using ql::SetKeyspaceResult; |
154 | | using ql::SchemaChangeResult; |
155 | | using ql::QLProcessor; |
156 | | using ql::ParseTree; |
157 | | using ql::Statement; |
158 | | using ql::StatementBatch; |
159 | | using ql::ErrorCode; |
160 | | using ql::GetErrorCode; |
161 | | |
162 | | using ql::audit::IsPrepare; |
163 | | using ql::audit::ErrorIsFormatted; |
164 | | |
165 | | using strings::Substitute; |
166 | | |
167 | | using yb::util::bcrypt_checkpw; |
168 | | |
169 | | //------------------------------------------------------------------------------------------------ |
170 | | CQLMetrics::CQLMetrics(const scoped_refptr<yb::MetricEntity>& metric_entity) |
171 | 4.54k | : QLMetrics(metric_entity) { |
172 | 4.54k | time_to_process_request_ = |
173 | 4.54k | METRIC_handler_latency_yb_cqlserver_CQLServerService_ProcessRequest.Instantiate( |
174 | 4.54k | metric_entity); |
175 | 4.54k | time_to_get_cql_processor_ = |
176 | 4.54k | METRIC_handler_latency_yb_cqlserver_CQLServerService_GetProcessor.Instantiate(metric_entity); |
177 | 4.54k | time_to_parse_cql_wrapper_ = |
178 | 4.54k | METRIC_handler_latency_yb_cqlserver_CQLServerService_ParseRequest.Instantiate(metric_entity); |
179 | 4.54k | time_to_execute_cql_request_ = |
180 | 4.54k | METRIC_handler_latency_yb_cqlserver_CQLServerService_ExecuteRequest.Instantiate( |
181 | 4.54k | metric_entity); |
182 | 4.54k | time_to_queue_cql_response_ = |
183 | 4.54k | METRIC_handler_latency_yb_cqlserver_CQLServerService_QueueResponse.Instantiate(metric_entity); |
184 | 4.54k | rpc_method_metrics_.handler_latency = |
185 | 4.54k | METRIC_handler_latency_yb_cqlserver_CQLServerService_Any.Instantiate(metric_entity); |
186 | 4.54k | num_errors_parsing_cql_ = |
187 | 4.54k | METRIC_yb_cqlserver_CQLServerService_ParsingErrors.Instantiate(metric_entity); |
188 | 4.54k | cql_processors_alive_ = METRIC_cql_processors_alive.Instantiate(metric_entity, 0); |
189 | 4.54k | cql_processors_created_ = METRIC_cql_processors_created.Instantiate(metric_entity); |
190 | 4.54k | parsers_alive_ = METRIC_cql_parsers_alive.Instantiate(metric_entity, 0); |
191 | 4.54k | parsers_created_ = METRIC_cql_parsers_created.Instantiate(metric_entity); |
192 | 4.54k | } |
193 | | |
194 | | //------------------------------------------------------------------------------------------------ |
195 | | CQLProcessor::CQLProcessor(CQLServiceImpl* service_impl, const CQLProcessorListPos& pos) |
196 | | : QLProcessor(service_impl->client(), service_impl->metadata_cache(), |
197 | | service_impl->cql_metrics().get(), |
198 | | &service_impl->parser_pool(), |
199 | | service_impl->clock(), |
200 | | std::bind(&CQLServiceImpl::TransactionPool, service_impl)), |
201 | | service_impl_(service_impl), |
202 | | cql_metrics_(service_impl->cql_metrics()), |
203 | | pos_(pos), |
204 | | statement_executed_cb_(Bind(&CQLProcessor::StatementExecuted, Unretained(this))), |
205 | 17.1k | consumption_(service_impl->processors_mem_tracker(), sizeof(*this)) { |
206 | 17.1k | IncrementCounter(cql_metrics_->cql_processors_created_); |
207 | 17.1k | IncrementGauge(cql_metrics_->cql_processors_alive_); |
208 | 17.1k | } |
209 | | |
210 | 0 | CQLProcessor::~CQLProcessor() { |
211 | 0 | DecrementGauge(cql_metrics_->cql_processors_alive_); |
212 | 0 | } |
213 | | |
214 | 0 | void CQLProcessor::Shutdown() { |
215 | 0 | executor_.Shutdown(); |
216 | 0 | auto call = std::move(call_); |
217 | 0 | if (call) { |
218 | 0 | call->RespondFailure( |
219 | 0 | rpc::ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, STATUS(Aborted, "Aborted")); |
220 | 0 | } |
221 | 0 | } |
222 | | |
223 | 4.56M | void CQLProcessor::ProcessCall(rpc::InboundCallPtr call) { |
224 | 4.56M | call_ = std::dynamic_pointer_cast<CQLInboundCall>(std::move(call)); |
225 | 4.56M | call_->SetRpcMethodMetrics(cql_metrics_->rpc_method_metrics_); |
226 | 4.56M | is_rescheduled_.store(IsRescheduled::kFalse, std::memory_order_release); |
227 | 4.56M | audit_logger_.SetConnection(call_->connection()); |
228 | 4.56M | unique_ptr<CQLRequest> request; |
229 | 4.56M | unique_ptr<CQLResponse> response; |
230 | | |
231 | | // Parse the CQL request. If the parser failed, it sets the error message in response. |
232 | 4.56M | parse_begin_ = MonoTime::Now(); |
233 | 4.56M | const auto& context = static_cast<const CQLConnectionContext&>(call_->connection()->context()); |
234 | 4.56M | const auto compression_scheme = context.compression_scheme(); |
235 | 4.56M | if (!CQLRequest::ParseRequest(call_->serialized_request(), compression_scheme, |
236 | 7 | &request, &response)) { |
237 | 7 | cql_metrics_->num_errors_parsing_cql_->Increment(); |
238 | 7 | PrepareAndSendResponse(response); |
239 | 7 | return; |
240 | 7 | } |
241 | | |
242 | 4.56M | execute_begin_ = MonoTime::Now(); |
243 | 4.56M | cql_metrics_->time_to_parse_cql_wrapper_->Increment( |
244 | 4.56M | execute_begin_.GetDeltaSince(parse_begin_).ToMicroseconds()); |
245 | | |
246 | | // Execute the request (perhaps asynchronously). |
247 | 4.56M | SetCurrentSession(call_->ql_session()); |
248 | 4.56M | request_ = std::move(request); |
249 | 4.56M | call_->SetRequest(request_, service_impl_); |
250 | 4.56M | retry_count_ = 0; |
251 | 4.56M | response = ProcessRequest(*request_); |
252 | 4.56M | PrepareAndSendResponse(response); |
253 | 4.56M | } |
254 | | |
255 | 4.74M | void CQLProcessor::Release() { |
256 | 4.74M | call_ = nullptr; |
257 | 4.74M | request_ = nullptr; |
258 | 4.74M | stmts_.clear(); |
259 | 4.74M | parse_trees_.clear(); |
260 | 4.74M | SetCurrentSession(nullptr); |
261 | 4.74M | is_rescheduled_.store(IsRescheduled::kFalse, std::memory_order_release); |
262 | 4.74M | audit_logger_.SetConnection(nullptr); |
263 | 4.74M | service_impl_->ReturnProcessor(pos_); |
264 | 4.74M | } |
265 | | |
266 | 9.07M | void CQLProcessor::PrepareAndSendResponse(const unique_ptr<CQLResponse>& response) { |
267 | 9.07M | if (response) { |
268 | 4.54M | const CQLConnectionContext& context = |
269 | 4.54M | static_cast<const CQLConnectionContext&>(call_->connection()->context()); |
270 | 4.54M | response->set_registered_events(context.registered_events()); |
271 | 4.54M | SendResponse(*response); |
272 | 4.54M | } |
273 | 9.07M | } |
274 | | |
275 | 4.55M | void CQLProcessor::SendResponse(const CQLResponse& response) { |
276 | | // Serialize the response to return to the CQL client. In case of error, an error response |
277 | | // should still be present. |
278 | 4.55M | MonoTime response_begin = MonoTime::Now(); |
279 | 4.55M | const auto& context = static_cast<const CQLConnectionContext&>(call_->connection()->context()); |
280 | 4.55M | const auto compression_scheme = context.compression_scheme(); |
281 | 4.55M | faststring msg; |
282 | 4.55M | response.Serialize(compression_scheme, &msg); |
283 | 4.55M | call_->RespondSuccess(RefCntBuffer(msg)); |
284 | | |
285 | 4.55M | MonoTime response_done = MonoTime::Now(); |
286 | 4.55M | cql_metrics_->time_to_process_request_->Increment( |
287 | 4.55M | response_done.GetDeltaSince(parse_begin_).ToMicroseconds()); |
288 | 4.56M | if (request_ != nullptr) { |
289 | 4.56M | cql_metrics_->time_to_execute_cql_request_->Increment( |
290 | 4.56M | response_begin.GetDeltaSince(execute_begin_).ToMicroseconds()); |
291 | 4.56M | } |
292 | 4.55M | cql_metrics_->time_to_queue_cql_response_->Increment( |
293 | 4.55M | response_done.GetDeltaSince(response_begin).ToMicroseconds()); |
294 | | |
295 | 4.55M | Release(); |
296 | 4.55M | } |
297 | | |
298 | 67.4k | bool CQLProcessor::CheckAuthentication(const CQLRequest& req) const { |
299 | 67.4k | return call_->ql_session()->is_user_authenticated() || |
300 | | // CQL requests which do not need authorization. |
301 | 12.5k | req.opcode() == CQLMessage::Opcode::STARTUP || |
302 | | // Some drivers issue OPTIONS prior to AUTHENTICATE |
303 | 6.28k | req.opcode() == CQLMessage::Opcode::OPTIONS || |
304 | 6.28k | req.opcode() == CQLMessage::Opcode::AUTH_RESPONSE; |
305 | 67.4k | } |
306 | | |
307 | 4.56M | unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const CQLRequest& req) { |
308 | 4.56M | if (FLAGS_use_cassandra_authentication && !CheckAuthentication(req)) { |
309 | 0 | LOG(ERROR) << "Could not execute statement by not authenticated user!"; |
310 | 0 | return make_unique<ErrorResponse>( |
311 | 0 | req, ErrorResponse::Code::SERVER_ERROR, |
312 | 0 | "Could not execute statement by not authenticated user"); |
313 | 0 | } |
314 | | |
315 | 4.56M | switch (req.opcode()) { |
316 | 2.50k | case CQLMessage::Opcode::OPTIONS: |
317 | 2.50k | return ProcessRequest(static_cast<const OptionsRequest&>(req)); |
318 | 9.39k | case CQLMessage::Opcode::STARTUP: |
319 | 9.39k | return ProcessRequest(static_cast<const StartupRequest&>(req)); |
320 | 5.02k | case CQLMessage::Opcode::PREPARE: |
321 | 5.02k | return ProcessRequest(static_cast<const PrepareRequest&>(req)); |
322 | 4.37M | case CQLMessage::Opcode::EXECUTE: |
323 | 4.37M | return ProcessRequest(static_cast<const ExecuteRequest&>(req)); |
324 | 159k | case CQLMessage::Opcode::QUERY: |
325 | 159k | return ProcessRequest(static_cast<const QueryRequest&>(req)); |
326 | 1.66k | case CQLMessage::Opcode::BATCH: |
327 | 1.66k | return ProcessRequest(static_cast<const BatchRequest&>(req)); |
328 | 6.28k | case CQLMessage::Opcode::AUTH_RESPONSE: |
329 | 6.28k | return ProcessRequest(static_cast<const AuthResponseRequest&>(req)); |
330 | 2.23k | case CQLMessage::Opcode::REGISTER: |
331 | 2.23k | return ProcessRequest(static_cast<const RegisterRequest&>(req)); |
332 | | |
333 | 0 | case CQLMessage::Opcode::ERROR: FALLTHROUGH_INTENDED; |
334 | 0 | case CQLMessage::Opcode::READY: FALLTHROUGH_INTENDED; |
335 | 0 | case CQLMessage::Opcode::AUTHENTICATE: FALLTHROUGH_INTENDED; |
336 | 0 | case CQLMessage::Opcode::SUPPORTED: FALLTHROUGH_INTENDED; |
337 | 0 | case CQLMessage::Opcode::RESULT: FALLTHROUGH_INTENDED; |
338 | 0 | case CQLMessage::Opcode::EVENT: FALLTHROUGH_INTENDED; |
339 | 0 | case CQLMessage::Opcode::AUTH_CHALLENGE: FALLTHROUGH_INTENDED; |
340 | 0 | case CQLMessage::Opcode::AUTH_SUCCESS: |
341 | 0 | break; |
342 | 0 | } |
343 | | |
344 | 0 | LOG(FATAL) << "Invalid CQL request: opcode = " << static_cast<int>(req.opcode()); |
345 | 0 | return nullptr; |
346 | 0 | } |
347 | | |
348 | 2.50k | unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const OptionsRequest& req) { |
349 | 2.50k | return make_unique<SupportedResponse>(req, &kSupportedOptions); |
350 | 2.50k | } |
351 | | |
352 | 9.39k | unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const StartupRequest& req) { |
353 | 26.7k | for (const auto& option : req.options()) { |
354 | 26.7k | const auto& name = option.first; |
355 | 26.7k | const auto& value = option.second; |
356 | 26.7k | const auto it = kSupportedOptions.find(name); |
357 | 26.7k | if (it == kSupportedOptions.end() || |
358 | 17.3k | std::find(it->second.begin(), it->second.end(), value) == it->second.end()) { |
359 | 17.3k | YB_LOG_EVERY_N_SECS(WARNING, 60) << Format("Unsupported driver option $0 = $1", name, value); |
360 | 17.3k | } |
361 | 26.7k | if (name == CQLMessage::kCompressionOption) { |
362 | 44 | auto& context = static_cast<CQLConnectionContext&>(call_->connection()->context()); |
363 | 44 | if (value == CQLMessage::kLZ4Compression) { |
364 | 22 | context.set_compression_scheme(CQLMessage::CompressionScheme::kLz4); |
365 | 22 | } else if (value == CQLMessage::kSnappyCompression) { |
366 | 22 | context.set_compression_scheme(CQLMessage::CompressionScheme::kSnappy); |
367 | 0 | } else { |
368 | 0 | return make_unique<ErrorResponse>( |
369 | 0 | req, ErrorResponse::Code::PROTOCOL_ERROR, |
370 | 0 | Substitute("Unsupported compression scheme $0", value)); |
371 | 0 | } |
372 | 44 | } |
373 | 26.7k | } |
374 | 9.39k | if (FLAGS_use_cassandra_authentication) { |
375 | 6.29k | return make_unique<AuthenticateResponse>(req, kCassandraPasswordAuthenticator); |
376 | 3.09k | } else { |
377 | 3.09k | return make_unique<ReadyResponse>(req); |
378 | 3.09k | } |
379 | 9.39k | } |
380 | | |
381 | 5.03k | unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const PrepareRequest& req) { |
382 | 3 | VLOG(1) << "PREPARE " << req.query(); |
383 | 5.03k | const CQLMessage::QueryId query_id = CQLStatement::GetQueryId( |
384 | 5.03k | ql_env_.CurrentKeyspace(), req.query()); |
385 | | // To prevent multiple clients from preparing the same new statement in parallel and trying to |
386 | | // cache the same statement (a typical "login storm" scenario), each caller will try to allocate |
387 | | // the statement in the cached statement first. If it already exists, the existing one will be |
388 | | // returned instead. Then, each client will try to prepare the statement. The first one will do |
389 | | // the actual prepare while the rest wait. As the rest do the prepare afterwards, the statement |
390 | | // is already prepared so it will be an no-op (see Statement::Prepare). |
391 | 5.03k | shared_ptr<CQLStatement> stmt = service_impl_->AllocatePreparedStatement( |
392 | 5.03k | query_id, ql_env_.CurrentKeyspace(), req.query()); |
393 | 5.03k | PreparedResult::UniPtr result; |
394 | 5.03k | Status s = stmt->Prepare(this, service_impl_->prepared_stmts_mem_tracker(), |
395 | 5.03k | false /* internal */, &result); |
396 | | |
397 | 5.03k | if (s.ok()) { |
398 | 4.85k | auto pt_result = stmt->GetParseTree(); |
399 | 4.85k | if (pt_result.ok()) { |
400 | 4.85k | s = audit_logger_.LogStatement(pt_result->root().get(), req.query(), |
401 | 4.85k | IsPrepare::kTrue); |
402 | 0 | } else { |
403 | 0 | s = pt_result.status(); |
404 | 0 | } |
405 | 177 | } else { |
406 | 177 | WARN_NOT_OK(audit_logger_.LogStatementError(req.query(), s, |
407 | 177 | ErrorIsFormatted::kTrue), |
408 | 177 | "Failed to log an audit record"); |
409 | 177 | } |
410 | | |
411 | 5.03k | if (!s.ok()) { |
412 | 172 | service_impl_->DeletePreparedStatement(stmt); |
413 | 172 | return ProcessError(s, stmt->query_id()); |
414 | 172 | } |
415 | | |
416 | 4.85k | return (result != nullptr) ? make_unique<PreparedResultResponse>(req, query_id, *result) |
417 | 341 | : make_unique<PreparedResultResponse>(req, query_id); |
418 | 4.85k | } |
419 | | |
420 | 4.37M | unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const ExecuteRequest& req) { |
421 | 4.49k | VLOG(1) << "EXECUTE " << b2a_hex(req.query_id()); |
422 | 4.37M | const shared_ptr<const CQLStatement> stmt = GetPreparedStatement(req.query_id()); |
423 | 4.37M | if (stmt == nullptr) { |
424 | 125 | return ProcessError(ErrorStatus(ErrorCode::UNPREPARED_STATEMENT), req.query_id()); |
425 | 125 | } |
426 | 4.37M | const Status s = stmt->ExecuteAsync(this, req.params(), statement_executed_cb_); |
427 | 4.36M | return s.ok() ? nullptr : ProcessError(s, stmt->query_id()); |
428 | 4.37M | } |
429 | | |
430 | 159k | unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const QueryRequest& req) { |
431 | 1.75k | VLOG(1) << "QUERY " << req.query(); |
432 | 159k | if (service_impl_->system_cache() != nullptr) { |
433 | 149k | auto cached_response = service_impl_->system_cache()->Lookup(req.query()); |
434 | 149k | if (cached_response) { |
435 | 100 | VLOG(1) << "Using cached response for " << req.query(); |
436 | 9.91k | statement_executed_cb_.Run( |
437 | 9.91k | Status::OK(), |
438 | 9.91k | std::static_pointer_cast<ExecutedResult>(*cached_response)); |
439 | 9.91k | return nullptr; |
440 | 9.91k | } |
441 | 149k | } |
442 | 149k | RunAsync(req.query(), req.params(), statement_executed_cb_); |
443 | 149k | return nullptr; |
444 | 149k | } |
445 | | |
446 | 1.66k | unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const BatchRequest& req) { |
447 | 18.4E | VLOG(1) << "BATCH " << req.queries().size(); |
448 | | |
449 | 1.66k | StatementBatch batch; |
450 | 1.66k | batch.reserve(req.queries().size()); |
451 | | |
452 | | // If no errors happen, batch request started here will be ended by the executor. |
453 | 1.66k | Status s = audit_logger_.StartBatchRequest(req.queries().size(), |
454 | 1.66k | is_rescheduled_.load(std::memory_order_acquire)); |
455 | 1.66k | if (PREDICT_FALSE(!s.ok())) { |
456 | 0 | return ProcessError(s); |
457 | 0 | } |
458 | | |
459 | 1.66k | unique_ptr<CQLResponse> result; |
460 | | // For each query in the batch, look up the query id if it is a prepared statement, or prepare the |
461 | | // query if it is not prepared. Then execute the parse trees with the parameters. |
462 | 389k | for (const BatchRequest::Query& query : req.queries()) { |
463 | 389k | if (query.is_prepared) { |
464 | 0 | VLOG(1) << "BATCH EXECUTE " << b2a_hex(query.query_id); |
465 | 389k | const shared_ptr<const CQLStatement> stmt = GetPreparedStatement(query.query_id); |
466 | 389k | if (stmt == nullptr) { |
467 | 3 | result = ProcessError(ErrorStatus(ErrorCode::UNPREPARED_STATEMENT), query.query_id); |
468 | 3 | break; |
469 | 3 | } |
470 | 389k | const Result<const ParseTree&> parse_tree = stmt->GetParseTree(); |
471 | 389k | if (!parse_tree) { |
472 | 0 | result = ProcessError(parse_tree.status(), query.query_id); |
473 | 0 | break; |
474 | 0 | } |
475 | 389k | batch.emplace_back(*parse_tree, query.params); |
476 | 296 | } else { |
477 | 1 | VLOG(1) << "BATCH QUERY " << query.query; |
478 | 296 | ParseTree::UniPtr parse_tree; |
479 | 296 | s = Prepare(query.query, &parse_tree); |
480 | 296 | if (PREDICT_FALSE(!s.ok())) { |
481 | 2 | result = ProcessError(s); |
482 | 2 | break; |
483 | 2 | } |
484 | 294 | batch.emplace_back(*parse_tree, query.params); |
485 | 294 | parse_trees_.insert(std::move(parse_tree)); |
486 | 294 | } |
487 | 389k | } |
488 | | |
489 | 1.66k | if (result) { |
490 | 5 | s = audit_logger_.EndBatchRequest(); |
491 | | // Otherwise, batch request will be ended by the executor or when processing an error. |
492 | 5 | if (PREDICT_FALSE(!s.ok())) { |
493 | 0 | result = ProcessError(s); |
494 | 0 | } |
495 | 5 | return result; |
496 | 5 | } |
497 | | |
498 | 1.65k | ExecuteAsync(batch, statement_executed_cb_); |
499 | | |
500 | 1.65k | return nullptr; |
501 | 1.65k | } |
502 | | |
503 | 6.28k | unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const AuthResponseRequest& req) { |
504 | 6.28k | const auto& params = req.params(); |
505 | 6.28k | if (FLAGS_ycql_cache_login_info) { |
506 | 519 | auto salted_hash_result = ql_env_.RoleSaltedHash(params.username); |
507 | 519 | if (salted_hash_result.ok()) { |
508 | 398 | auto can_login_result = ql_env_.RoleCanLogin(params.username); |
509 | 398 | if (can_login_result.ok()) { |
510 | 398 | unique_ptr<CQLResponse> response = |
511 | 398 | ProcessAuthResult(*salted_hash_result, *can_login_result); |
512 | 0 | VLOG(1) << "Used cached authentication"; |
513 | 398 | Status s = audit_logger_.LogAuthResponse(*response); |
514 | 398 | return response; |
515 | 0 | } else { |
516 | 0 | VLOG(1) << "Unable to get can_login for user " << params.username << ": " |
517 | 0 | << can_login_result; |
518 | 0 | } |
519 | 121 | } else { |
520 | 0 | VLOG(1) << "Unable to get salted hash for user " << params.username << ": " |
521 | 0 | << salted_hash_result; |
522 | 121 | } |
523 | 519 | } |
524 | 5.89k | shared_ptr<Statement> stmt = service_impl_->GetAuthPreparedStatement(); |
525 | 5.89k | if (!stmt->Prepare(this, nullptr /* memtracker */, true /* internal */).ok()) { |
526 | 0 | return make_unique<ErrorResponse>( |
527 | 0 | req, ErrorResponse::Code::SERVER_ERROR, |
528 | 0 | "Could not prepare statement for querying user " + params.username); |
529 | 0 | } |
530 | 5.89k | if (!stmt->ExecuteAsync(this, params, statement_executed_cb_).ok()) { |
531 | 0 | LOG(ERROR) << "Could not execute prepared statement to fetch login info!"; |
532 | 0 | return make_unique<ErrorResponse>( |
533 | 0 | req, ErrorResponse::Code::SERVER_ERROR, |
534 | 0 | "Could not execute prepared statement for querying roles for user " + params.username); |
535 | 0 | } |
536 | 5.89k | return nullptr; |
537 | 5.89k | } |
538 | | |
539 | 2.23k | unique_ptr<CQLResponse> CQLProcessor::ProcessRequest(const RegisterRequest& req) { |
540 | 2.23k | CQLConnectionContext& context = |
541 | 2.23k | static_cast<CQLConnectionContext&>(call_->connection()->context()); |
542 | 2.23k | context.add_registered_events(req.events()); |
543 | 2.23k | return make_unique<ReadyResponse>(req); |
544 | 2.23k | } |
545 | | |
546 | 4.76M | shared_ptr<const CQLStatement> CQLProcessor::GetPreparedStatement(const CQLMessage::QueryId& id) { |
547 | 4.76M | shared_ptr<const CQLStatement> stmt = service_impl_->GetPreparedStatement(id); |
548 | 4.76M | if (stmt != nullptr) { |
549 | 4.76M | stmt->clear_reparsed(); |
550 | 4.76M | stmts_.insert(stmt); |
551 | 4.76M | } |
552 | 4.76M | return stmt; |
553 | 4.76M | } |
554 | | |
555 | 4.53M | void CQLProcessor::StatementExecuted(const Status& s, const ExecutedResult::SharedPtr& result) { |
556 | 4.51M | unique_ptr<CQLResponse> response(s.ok() ? ProcessResult(result) : ProcessError(s)); |
557 | 4.53M | if (response && !s.ok()) { |
558 | | // Error response means we're not going to be transparently restarting a query. |
559 | 4.87k | WARN_NOT_OK(audit_logger_.EndBatchRequest(), "Failed to end batch request"); |
560 | 4.87k | } |
561 | 4.53M | PrepareAndSendResponse(response); |
562 | 4.53M | } |
563 | | |
564 | | unique_ptr<CQLResponse> CQLProcessor::ProcessError(const Status& s, |
565 | 5.18k | boost::optional<CQLMessage::QueryId> query_id) { |
566 | 5.18k | if (s.IsQLError()) { |
567 | 5.04k | ErrorCode ql_errcode = GetErrorCode(s); |
568 | 5.04k | if (ql_errcode == ErrorCode::UNPREPARED_STATEMENT || |
569 | 4.92k | ql_errcode == ErrorCode::STALE_METADATA) { |
570 | | // Delete all stale prepared statements from our cache. Since CQL protocol allows only one |
571 | | // unprepared query id to be returned, we will return just the last unprepared / stale one |
572 | | // we found. |
573 | 1.07k | for (auto stmt : stmts_) { |
574 | 1.07k | if (stmt->stale()) { |
575 | 1.07k | service_impl_->DeletePreparedStatement(stmt); |
576 | 1.07k | } |
577 | 1.08k | if (stmt->unprepared() || stmt->stale()) { |
578 | 1.08k | query_id = stmt->query_id(); |
579 | 1.08k | } |
580 | 1.07k | } |
581 | 1.21k | if (query_id) { |
582 | 1.20k | return make_unique<UnpreparedErrorResponse>(*request_, *query_id); |
583 | 1.20k | } |
584 | | // When no unprepared query id is found, it means all statements we executed were queries |
585 | | // (non-prepared statements). In that case, just retry the request (once only). The retry |
586 | | // needs to be rescheduled in because this callback may not be executed in the RPC worker |
587 | | // thread. Also, rescheduling gives other calls a chance to execute first before we do. |
588 | 7 | if (++retry_count_ == 1) { |
589 | 7 | stmts_.clear(); |
590 | 7 | parse_trees_.clear(); |
591 | 7 | Reschedule(&process_request_task_.Bind(this)); |
592 | 7 | return nullptr; |
593 | 7 | } |
594 | 18.4E | return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::INVALID, |
595 | 18.4E | "Query failed to execute due to stale metadata cache"); |
596 | 3.83k | } else if (ql_errcode < ErrorCode::SUCCESS) { |
597 | 3.83k | if (ql_errcode == ErrorCode::UNAUTHORIZED) { |
598 | 174 | return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::UNAUTHORIZED, |
599 | 174 | s.ToUserMessage()); |
600 | 3.66k | } else if (ql_errcode > ErrorCode::LIMITATION_ERROR) { |
601 | | // System errors, internal errors, or crashes. |
602 | 2.09k | return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::SERVER_ERROR, |
603 | 2.09k | s.ToUserMessage()); |
604 | 1.56k | } else if (ql_errcode > ErrorCode::SEM_ERROR) { |
605 | | // Limitation, lexical, or parsing errors. |
606 | 272 | return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::SYNTAX_ERROR, |
607 | 272 | s.ToUserMessage()); |
608 | 1.29k | } else { |
609 | | // Semantic or execution errors. |
610 | 1.29k | return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::INVALID, |
611 | 1.29k | s.ToUserMessage()); |
612 | 1.29k | } |
613 | 18.4E | } |
614 | | |
615 | 18.4E | LOG(ERROR) << "Internal error: invalid error code " << static_cast<int64_t>(GetErrorCode(s)); |
616 | 18.4E | return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::SERVER_ERROR, |
617 | 18.4E | "Invalid error code"); |
618 | 138 | } else if (s.IsNotAuthorized()) { |
619 | 60 | return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::UNAUTHORIZED, |
620 | 60 | s.ToUserMessage()); |
621 | 60 | } |
622 | | |
623 | 78 | return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::SERVER_ERROR, |
624 | 78 | s.ToUserMessage()); |
625 | 78 | } |
626 | | |
627 | | namespace { |
628 | | |
629 | | struct LDAPMemoryDeleter { |
630 | 33 | void operator()(void* ptr) const { ldap_memfree(ptr); } |
631 | | }; |
632 | | |
633 | | struct LDAPMessageDeleter { |
634 | 27 | void operator()(LDAPMessage* ptr) const { ldap_msgfree(ptr); } |
635 | | }; |
636 | | |
637 | | struct LDAPDeleter { |
638 | 64 | void operator()(LDAP* ptr) const { ldap_unbind_ext(ptr, NULL, NULL); } |
639 | | }; |
640 | | |
641 | | using LDAPHolder = unique_ptr<LDAP, LDAPDeleter>; |
642 | | using LDAPMessageHolder = unique_ptr<LDAPMessage, LDAPMessageDeleter>; |
643 | | template<class T> |
644 | | using LDAPMemoryHolder = unique_ptr<T, LDAPMemoryDeleter>; |
645 | | |
646 | | class LDAPError { |
647 | | public: |
648 | 0 | explicit LDAPError(int c) : code(c), ldap_(nullptr) {} |
649 | 14 | LDAPError(int c, const LDAPHolder& l) : code(c), ldap_(&l) {} |
650 | | |
651 | 14 | LDAP* GetLDAP() const { return ldap_ ? ldap_->get() : nullptr; } |
652 | | const int code; |
653 | | |
654 | | private: |
655 | | const LDAPHolder* ldap_; |
656 | | }; |
657 | | |
658 | | /* |
659 | | * Add a detail error message text to the current error if one can be |
660 | | * constructed from the LDAP 'diagnostic message'. |
661 | | */ |
662 | 14 | ostream& operator<<(ostream& str, const LDAPError& error) { |
663 | 14 | str << ldap_err2string(error.code); |
664 | 14 | auto ldap = error.GetLDAP(); |
665 | 14 | if (ldap) { |
666 | 14 | char *message = nullptr; |
667 | 14 | const auto rc = ldap_get_option(ldap, LDAP_OPT_DIAGNOSTIC_MESSAGE, &message); |
668 | 14 | if (rc == LDAP_SUCCESS && message != nullptr) { |
669 | 13 | LDAPMemoryHolder<char> holder(message); |
670 | 13 | str << " LDAP diagnostics: " << message; |
671 | 13 | } |
672 | 14 | } |
673 | 14 | return str; |
674 | 14 | } |
675 | | |
676 | 64 | Result<LDAPHolder> InitializeLDAPConnection(const char *uris) { |
677 | 64 | LDAP* ldap_ptr = nullptr; |
678 | 64 | auto r = ldap_initialize(&ldap_ptr, uris); |
679 | 64 | if (r != LDAP_SUCCESS) { |
680 | 0 | return STATUS_FORMAT(InternalError, "could not initialize LDAP: $0", LDAPError(r)); |
681 | 0 | } |
682 | 64 | LDAPHolder ldap(ldap_ptr); |
683 | 64 | VLOG(4) << "Successfully initialized LDAP struct"; |
684 | | |
685 | 64 | int ldapversion = LDAP_VERSION3; |
686 | 64 | if ((r = ldap_set_option(ldap_ptr, LDAP_OPT_PROTOCOL_VERSION, &ldapversion)) != LDAP_SUCCESS) { |
687 | 0 | return STATUS_FORMAT(InternalError, "could not set LDAP protocol version: $0", |
688 | 0 | LDAPError(r, ldap)); |
689 | 0 | } |
690 | 64 | VLOG(4) << "Successfully set protocol version option"; |
691 | | |
692 | 64 | if (FLAGS_ycql_ldap_tls && ((r = ldap_start_tls_s(ldap_ptr, NULL, NULL)) != LDAP_SUCCESS)) { |
693 | 0 | return STATUS_FORMAT(InternalError, "could not start LDAP TLS session: $0", LDAPError(r, ldap)); |
694 | 0 | } |
695 | | |
696 | 64 | return ldap; |
697 | 64 | } |
698 | | |
699 | | /* |
700 | | * Return a newly allocated string copied from "pattern" with all |
701 | | * occurrences of the placeholder "$username" replaced with "user_name". |
702 | | */ |
703 | 14 | std::string FormatSearchFilter(const std::string& pattern, const std::string& user_name) { |
704 | 14 | return boost::replace_all_copy(pattern, "$username", user_name); |
705 | 14 | } |
706 | | |
707 | 60 | Result<bool> CheckLDAPAuth(const ql::AuthResponseRequest::AuthQueryParameters& params) { |
708 | 60 | if (params.username.empty() || params.password.empty()) { |
709 | | // Refer https://datatracker.ietf.org/doc/html/rfc4513#section-6.3.1 for details. Applications |
710 | | // are required to explicitly have this check and can't rely on an LDAP server to report |
711 | | // invalid credentials error. |
712 | 16 | return STATUS(InvalidArgument, "Empty username and/or password not allowed"); |
713 | 16 | } |
714 | | |
715 | 44 | if (FLAGS_ycql_ldap_server.empty()) |
716 | 0 | return STATUS(InvalidArgument, "LDAP server not specified"); |
717 | | |
718 | 44 | VLOG(4) << "Attempting ldap_initialize() with " << FLAGS_ycql_ldap_server; |
719 | | |
720 | 44 | const auto& uris = FLAGS_ycql_ldap_server; |
721 | 44 | auto ldap = VERIFY_RESULT(InitializeLDAPConnection(uris.c_str())); |
722 | | |
723 | 44 | int r; |
724 | 44 | std::string fulluser; |
725 | 44 | if (!FLAGS_ycql_ldap_base_dn.empty()) { |
726 | 36 | if (FLAGS_ycql_ldap_bind_dn.empty() || FLAGS_ycql_ldap_bind_passwd.empty()) { |
727 | 3 | return STATUS(InvalidArgument, |
728 | 3 | "Empty bind dn and/or bind password not allowed for search+bind mode"); |
729 | 3 | } |
730 | | |
731 | | /* |
732 | | * First perform an LDAP search to find the DN for the user we are |
733 | | * trying to log in as. |
734 | | */ |
735 | 33 | char ldap_no_attrs[sizeof(LDAP_NO_ATTRS)+1]; |
736 | 33 | strncpy(ldap_no_attrs, LDAP_NO_ATTRS, sizeof(ldap_no_attrs)); |
737 | 33 | char *attributes[] = {ldap_no_attrs, NULL}; |
738 | | |
739 | | /* |
740 | | * Disallow any characters that we would otherwise need to escape, |
741 | | * since they aren't really reasonable in a username anyway. Allowing |
742 | | * them would make it possible to inject any kind of custom filters in |
743 | | * the LDAP filter. |
744 | | */ |
745 | 305 | for (const char& c : params.username) { |
746 | 305 | switch (c) { |
747 | 2 | case '*': |
748 | 2 | case '(': |
749 | 2 | case ')': |
750 | 2 | case '\\': |
751 | 2 | case '/': |
752 | 2 | return STATUS(InvalidArgument, "invalid character in user name for LDAP authentication"); |
753 | 305 | } |
754 | 305 | } |
755 | | |
756 | | /* |
757 | | * Bind with a pre-defined username/password (if available) for |
758 | | * searching. If none is specified, this turns into an anonymous bind. |
759 | | */ |
760 | 31 | struct berval cred; |
761 | 31 | ber_str2bv(FLAGS_ycql_ldap_bind_passwd.c_str(), 0 /* len */, 0 /* duplicate */ , &cred); |
762 | 31 | r = ldap_sasl_bind_s(ldap.get(), FLAGS_ycql_ldap_bind_dn.c_str(), |
763 | 31 | LDAP_SASL_SIMPLE, &cred, |
764 | 31 | NULL /* serverctrls */, NULL /* clientctrls */, |
765 | 31 | NULL /* servercredp */); |
766 | 31 | if (r != LDAP_SUCCESS) { |
767 | 4 | return STATUS_FORMAT( |
768 | 4 | InvalidArgument, |
769 | 4 | "could not perform initial LDAP bind for ldapbinddn '$0' on server '$1': $2", |
770 | 4 | FLAGS_ycql_ldap_bind_dn, FLAGS_ycql_ldap_server, LDAPError(r, ldap)); |
771 | 4 | } |
772 | | |
773 | 27 | std::string filter; |
774 | | /* Build a custom filter or a single attribute filter? */ |
775 | 27 | if (!FLAGS_ycql_ldap_search_filter.empty()) { |
776 | 14 | filter = FormatSearchFilter(FLAGS_ycql_ldap_search_filter, params.username); |
777 | 13 | } else if (!FLAGS_ycql_ldap_search_attribute.empty()) { |
778 | 13 | filter = "(" + FLAGS_ycql_ldap_search_attribute + "=" + params.username + ")"; |
779 | 0 | } else { |
780 | 0 | filter = "(uid=" + params.username + ")"; |
781 | 0 | } |
782 | | |
783 | 27 | LDAPMessage *search_message; |
784 | 27 | r = ldap_search_ext_s(ldap.get(), FLAGS_ycql_ldap_base_dn.c_str(), LDAP_SCOPE_SUBTREE, |
785 | 27 | filter.c_str(), attributes, 0, NULL, NULL, NULL, 0, &search_message); |
786 | 27 | LDAPMessageHolder search_message_holder{search_message}; |
787 | | |
788 | 27 | if (r != LDAP_SUCCESS) { |
789 | 2 | return STATUS_FORMAT( |
790 | 2 | InternalError, "could not search LDAP for filter '$0' on server '$1': $2", filter, |
791 | 2 | FLAGS_ycql_ldap_server, LDAPError(r, ldap)); |
792 | 2 | } |
793 | | |
794 | 25 | auto count = ldap_count_entries(ldap.get(), search_message); |
795 | 25 | switch(count) { |
796 | 3 | case 0: |
797 | 3 | return STATUS_FORMAT( |
798 | 0 | NotFound, |
799 | 0 | "LDAP user '$0' does not exist. "\ |
800 | 0 | "LDAP search for filter '$1' on server '$2' returned no entries.", |
801 | 0 | params.username, filter, FLAGS_ycql_ldap_server); |
802 | 20 | case 1: |
803 | 20 | break; |
804 | 2 | default: |
805 | 2 | return STATUS_FORMAT( |
806 | 20 | NotFound, "LDAP user '$0' is not unique, $1 entries exist.", params.username, count); |
807 | 20 | } |
808 | | |
809 | | // No need to free entry pointer since it is a pointer to data in |
810 | | // search_message. Freeing search_message takes cares of it. |
811 | 20 | auto *entry = ldap_first_entry(ldap.get(), search_message); |
812 | 20 | char *dn = ldap_get_dn(ldap.get(), entry); |
813 | 20 | if (dn == NULL) { |
814 | 0 | int error; |
815 | 0 | ldap_get_option(ldap.get(), LDAP_OPT_ERROR_NUMBER, &error); |
816 | 0 | return STATUS_FORMAT( |
817 | 0 | NotFound, "could not get dn for the first entry matching '$0' on server '$1': $2", |
818 | 0 | filter, FLAGS_ycql_ldap_server, LDAPError(error, ldap)); |
819 | 0 | } |
820 | 20 | LDAPMemoryHolder<char> dn_holder{dn}; |
821 | 20 | fulluser = dn; |
822 | | |
823 | | /* |
824 | | * Need to re-initialize the LDAP connection, so that we can bind to |
825 | | * it with a different username. |
826 | | */ |
827 | 20 | ldap = VERIFY_RESULT(InitializeLDAPConnection(uris.c_str())); |
828 | 8 | } else { |
829 | 8 | fulluser = FLAGS_ycql_ldap_user_prefix + params.username + FLAGS_ycql_ldap_user_suffix; |
830 | 8 | } |
831 | | |
832 | 28 | VLOG(4) << "Checking authentication using LDAP for user DN=" << fulluser; |
833 | | |
834 | 28 | struct berval cred; |
835 | 28 | ber_str2bv(params.password.c_str(), 0 /* len */, 0 /* duplicate */, &cred); |
836 | 28 | r = ldap_sasl_bind_s(ldap.get(), fulluser.c_str(), |
837 | 28 | LDAP_SASL_SIMPLE, &cred, |
838 | 28 | NULL /* serverctrls */, NULL /* clientctrls */, |
839 | 28 | NULL /* servercredp */); |
840 | 28 | VLOG(4) << "ldap_sasl_bind_s return value =" << r; |
841 | | |
842 | 28 | if (r != LDAP_SUCCESS) { |
843 | 8 | std::ostringstream str; |
844 | 8 | str << "LDAP login failed for user '" << fulluser << "' on server '" |
845 | 8 | << FLAGS_ycql_ldap_server << "': " << LDAPError(r, ldap); |
846 | 8 | auto error_msg = str.str(); |
847 | 8 | if (r == LDAP_INVALID_CREDENTIALS) { |
848 | 7 | LOG(ERROR) << error_msg; |
849 | 7 | return false; |
850 | 7 | } |
851 | | |
852 | 1 | return STATUS(InternalError, error_msg); |
853 | 1 | } |
854 | | |
855 | 20 | return true; |
856 | 20 | } |
857 | | |
858 | 140 | static bool UserIn(const std::string& username, const std::string& users_to_skip) { |
859 | 140 | size_t comma_index = 0; |
860 | 140 | size_t prev_comma_index = -1; |
861 | | |
862 | | // TODO(Piyush): Store a static list of usernames from csv instead of traversing each time. |
863 | 140 | while ((comma_index = users_to_skip.find(",", prev_comma_index + 1)) != std::string::npos) { |
864 | 0 | if (users_to_skip.substr(prev_comma_index + 1, |
865 | 0 | comma_index - (prev_comma_index + 1)) == username) |
866 | 0 | return true; |
867 | 0 | VLOG(2) << "Check " << username << " with " |
868 | 0 | << users_to_skip.substr(prev_comma_index + 1, comma_index - (prev_comma_index + 1)); |
869 | 0 | prev_comma_index = comma_index; |
870 | 0 | } |
871 | 140 | VLOG(2) << "Check " << username << " with " |
872 | 140 | << users_to_skip.substr( |
873 | 140 | prev_comma_index + 1, users_to_skip.size() - (prev_comma_index + 1)); |
874 | 140 | return users_to_skip.substr(prev_comma_index + 1, users_to_skip.size() - (prev_comma_index + 1)) |
875 | 140 | == username; |
876 | 140 | } |
877 | | |
878 | | } // namespace |
879 | | |
880 | 6.23k | unique_ptr<CQLResponse> CQLProcessor::ProcessAuthResult(const string& saved_hash, bool can_login) { |
881 | 6.23k | const auto& req = down_cast<const AuthResponseRequest&>(*request_); |
882 | 6.23k | const auto& params = req.params(); |
883 | 6.23k | unique_ptr<CQLResponse> response = nullptr; |
884 | 6.23k | bool authenticated = false; |
885 | | |
886 | 6.23k | if (FLAGS_ycql_use_ldap && !UserIn(params.username, FLAGS_ycql_ldap_users_to_skip_csv)) { |
887 | 60 | Result<bool> ldap_auth_result = CheckLDAPAuth(req.params()); |
888 | 60 | if (!ldap_auth_result.ok()) { |
889 | 33 | return make_unique<ErrorResponse>( |
890 | 33 | *request_, ErrorResponse::Code::SERVER_ERROR, |
891 | 33 | "Failed to authenticate using LDAP: " + yb::ToString(ldap_auth_result)); |
892 | 27 | } else if (!*ldap_auth_result) { |
893 | 7 | response = make_unique<ErrorResponse>( |
894 | 7 | *request_, ErrorResponse::Code::BAD_CREDENTIALS, |
895 | 7 | "Failed to authenticate using LDAP: Provided username '" + params.username + |
896 | 7 | "' and/or password are incorrect"); |
897 | 20 | } else { |
898 | 20 | authenticated = true; |
899 | 20 | call_->ql_session()->set_current_role_name(params.username); |
900 | 20 | response = make_unique<AuthSuccessResponse>(*request_, |
901 | 20 | "" /* this does not matter */); |
902 | 20 | } |
903 | 6.17k | } else if (saved_hash.empty()) { |
904 | | // Username doesn't have a password, but one is required for authentication. Return an error. |
905 | 6 | response = make_unique<ErrorResponse>( |
906 | 6 | *request_, ErrorResponse::Code::BAD_CREDENTIALS, |
907 | 6 | "Provided username '" + params.username + "' and/or password are incorrect"); |
908 | 6.16k | } else { |
909 | 6.16k | if (!service_impl_->CheckPassword(params.password, saved_hash)) { |
910 | 7 | response = make_unique<ErrorResponse>( |
911 | 7 | *request_, ErrorResponse::Code::BAD_CREDENTIALS, |
912 | 7 | "Provided username '" + params.username + "' and/or password are incorrect"); |
913 | 6.16k | } else if (!can_login) { |
914 | 18 | response = make_unique<ErrorResponse>( |
915 | 18 | *request_, ErrorResponse::Code::BAD_CREDENTIALS, |
916 | 18 | params.username + " is not permitted to log in"); |
917 | 6.14k | } else { |
918 | 6.14k | call_->ql_session()->set_current_role_name(params.username); |
919 | 6.14k | response = make_unique<AuthSuccessResponse>(*request_, "" /* this does not matter */); |
920 | 6.14k | authenticated = true; |
921 | 6.14k | } |
922 | 6.16k | } |
923 | 6.20k | call_->ql_session()->set_user_authenticated(authenticated); |
924 | 6.20k | return response; |
925 | 6.23k | } |
926 | | |
927 | 4.52M | unique_ptr<CQLResponse> CQLProcessor::ProcessResult(const ExecutedResult::SharedPtr& result) { |
928 | 4.52M | if (result == nullptr) { |
929 | 796k | return make_unique<VoidResultResponse>(*request_); |
930 | 796k | } |
931 | 3.72M | switch (result->type()) { |
932 | 4.18k | case ExecutedResult::Type::SET_KEYSPACE: { |
933 | 4.18k | const auto& set_keyspace_result = static_cast<const SetKeyspaceResult&>(*result); |
934 | 4.18k | return make_unique<SetKeyspaceResultResponse>(*request_, set_keyspace_result); |
935 | 0 | } |
936 | 3.70M | case ExecutedResult::Type::ROWS: { |
937 | 3.70M | const RowsResult::SharedPtr& rows_result = std::static_pointer_cast<RowsResult>(result); |
938 | 3.70M | if (request_->opcode() != CQLMessage::Opcode::AUTH_RESPONSE) { |
939 | 3.69M | cql_metrics_->ql_response_size_bytes_->Increment(rows_result->rows_data().size()); |
940 | 3.69M | } |
941 | 3.70M | switch (request_->opcode()) { |
942 | 3.62M | case CQLMessage::Opcode::EXECUTE: |
943 | 3.62M | return make_unique<RowsResultResponse>(down_cast<const ExecuteRequest&>(*request_), |
944 | 3.62M | rows_result); |
945 | 92.8k | case CQLMessage::Opcode::QUERY: |
946 | 92.8k | return make_unique<RowsResultResponse>(down_cast<const QueryRequest&>(*request_), |
947 | 92.8k | rows_result); |
948 | 52 | case CQLMessage::Opcode::BATCH: |
949 | 52 | return make_unique<RowsResultResponse>(down_cast<const BatchRequest&>(*request_), |
950 | 52 | rows_result); |
951 | | |
952 | 5.88k | case CQLMessage::Opcode::AUTH_RESPONSE: { |
953 | 5.88k | const auto& req = down_cast<const AuthResponseRequest&>(*request_); |
954 | 5.88k | const auto& params = req.params(); |
955 | 5.88k | const auto row_block = rows_result->GetRowBlock(); |
956 | 5.88k | unique_ptr<CQLResponse> response = nullptr; |
957 | 5.88k | if (row_block->row_count() != 1) { |
958 | 52 | response = make_unique<ErrorResponse>( |
959 | 52 | *request_, ErrorResponse::Code::BAD_CREDENTIALS, |
960 | 52 | "Provided username '" + params.username + "' and/or password are incorrect"); |
961 | 5.83k | } else { |
962 | 5.83k | const auto& row = row_block->row(0); |
963 | 5.83k | const auto& schema = row_block->schema(); |
964 | | |
965 | 5.83k | const QLValue& salted_hash_value = |
966 | 5.83k | row.column(schema.find_column(kRoleColumnNameSaltedHash)); |
967 | 5.83k | const auto& can_login = |
968 | 5.83k | row.column(schema.find_column(kRoleColumnNameCanLogin)).bool_value(); |
969 | | // Returning empty string is fine since it would error out as expected, |
970 | | // if the hash is empty |
971 | 5.83k | const string saved_hash = salted_hash_value.IsNull() ? |
972 | 5.77k | "" : salted_hash_value.string_value(); |
973 | 5.83k | response = ProcessAuthResult(saved_hash, can_login); |
974 | 5.83k | } |
975 | 5.88k | Status s = audit_logger_.LogAuthResponse(*response); |
976 | 5.88k | if (!s.ok()) { |
977 | 0 | return make_unique<ErrorResponse>(*request_, ErrorResponse::Code::SERVER_ERROR, |
978 | 0 | "Failed to write an audit log record"); |
979 | 0 | } |
980 | | |
981 | 5.88k | return response; |
982 | 5.88k | } |
983 | 0 | case CQLMessage::Opcode::ERROR: FALLTHROUGH_INTENDED; |
984 | 0 | case CQLMessage::Opcode::STARTUP: FALLTHROUGH_INTENDED; |
985 | 0 | case CQLMessage::Opcode::READY: FALLTHROUGH_INTENDED; |
986 | 0 | case CQLMessage::Opcode::AUTHENTICATE: FALLTHROUGH_INTENDED; |
987 | 0 | case CQLMessage::Opcode::OPTIONS: FALLTHROUGH_INTENDED; |
988 | 0 | case CQLMessage::Opcode::SUPPORTED: FALLTHROUGH_INTENDED; |
989 | 0 | case CQLMessage::Opcode::RESULT: FALLTHROUGH_INTENDED; |
990 | 0 | case CQLMessage::Opcode::PREPARE: FALLTHROUGH_INTENDED; |
991 | 0 | case CQLMessage::Opcode::REGISTER: FALLTHROUGH_INTENDED; |
992 | 0 | case CQLMessage::Opcode::EVENT: FALLTHROUGH_INTENDED; |
993 | 0 | case CQLMessage::Opcode::AUTH_CHALLENGE: FALLTHROUGH_INTENDED; |
994 | 0 | case CQLMessage::Opcode::AUTH_SUCCESS: |
995 | 0 | break; |
996 | | // default: fall through. |
997 | 0 | } |
998 | 0 | LOG(FATAL) << "Internal error: not a request that returns result " |
999 | 0 | << static_cast<int>(request_->opcode()); |
1000 | 0 | break; |
1001 | 0 | } |
1002 | 6.05k | case ExecutedResult::Type::SCHEMA_CHANGE: { |
1003 | 6.05k | const auto& schema_change_result = static_cast<const SchemaChangeResult&>(*result); |
1004 | 6.05k | return make_unique<SchemaChangeResultResponse>(*request_, schema_change_result); |
1005 | 0 | } |
1006 | | |
1007 | | // default: fall through. |
1008 | 0 | } |
1009 | 0 | LOG(ERROR) << "Internal error: unknown result type " << static_cast<int>(result->type()); |
1010 | 0 | return make_unique<ErrorResponse>( |
1011 | 0 | *request_, ErrorResponse::Code::SERVER_ERROR, "Internal error: unknown result type"); |
1012 | 0 | } |
1013 | | |
1014 | 4.96M | bool CQLProcessor::NeedReschedule() { |
1015 | 4.96M | auto messenger = service_impl_->messenger(); |
1016 | 4.96M | if (!messenger) { |
1017 | 0 | return false; |
1018 | 0 | } |
1019 | 4.96M | return !messenger->ThreadPool(rpc::ServicePriority::kNormal).OwnsThisThread(); |
1020 | 4.96M | } |
1021 | | |
1022 | 4.94M | void CQLProcessor::Reschedule(rpc::ThreadPoolTask* task) { |
1023 | 4.94M | is_rescheduled_.store(IsRescheduled::kTrue, std::memory_order_release); |
1024 | 4.94M | auto messenger = service_impl_->messenger(); |
1025 | 12.5k | DCHECK(messenger != nullptr) << "No messenger to reschedule CQL call"; |
1026 | 4.94M | messenger->ThreadPool(rpc::ServicePriority::kNormal).Enqueue(task); |
1027 | 4.94M | } |
1028 | | |
1029 | 5.11M | CoarseTimePoint CQLProcessor::GetDeadline() const { |
1030 | 4.92M | return call_ ? call_->GetClientDeadline() |
1031 | 185k | : CoarseMonoClock::now() + FLAGS_client_read_write_timeout_ms * 1ms; |
1032 | 5.11M | } |
1033 | | |
1034 | | } // namespace cqlserver |
1035 | | } // namespace yb |