/Users/deen/code/yugabyte-db/src/yb/yql/cql/cqlserver/cql_rpc.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/yql/cql/cqlserver/cql_rpc.h" |
17 | | |
18 | | #include "yb/gutil/casts.h" |
19 | | #include "yb/gutil/strings/escaping.h" |
20 | | |
21 | | #include "yb/rpc/connection.h" |
22 | | #include "yb/rpc/messenger.h" |
23 | | #include "yb/rpc/reactor.h" |
24 | | #include "yb/rpc/rpc_introspection.pb.h" |
25 | | |
26 | | #include "yb/util/debug/trace_event.h" |
27 | | #include "yb/util/result.h" |
28 | | #include "yb/util/size_literals.h" |
29 | | #include "yb/util/status_format.h" |
30 | | |
31 | | #include "yb/yql/cql/cqlserver/cql_service.h" |
32 | | |
33 | | using namespace std::literals; |
34 | | using namespace std::placeholders; |
35 | | using yb::ql::CQLMessage; |
36 | | using yb::ql::CQLRequest; |
37 | | using yb::ql::ErrorResponse; |
38 | | using yb::operator"" _KB; |
39 | | using yb::operator"" _MB; |
40 | | |
41 | | DECLARE_bool(rpc_dump_all_traces); |
42 | | DECLARE_int32(rpc_slow_query_threshold_ms); |
43 | | DEFINE_int32(rpcz_max_cql_query_dump_size, 4_KB, |
44 | | "The maximum size of the CQL query string in the RPCZ dump."); |
45 | | DEFINE_int32(rpcz_max_cql_batch_dump_count, 4_KB, |
46 | | "The maximum number of CQL batch elements in the RPCZ dump."); |
47 | | DEFINE_bool(throttle_cql_calls_on_soft_memory_limit, true, |
48 | | "Whether to reject CQL calls when soft memory limit is reached."); |
49 | | DEFINE_bool(display_bind_params_in_cql_details, true, |
50 | | "Whether to show bind params for CQL calls details in the RPCZ dump."); |
51 | | |
52 | | constexpr int kDropPolicy = 1; |
53 | | constexpr int kRejectPolicy = 0; |
54 | | |
55 | | DEFINE_int32(throttle_cql_calls_policy, kRejectPolicy, |
56 | | "Policy for throttling CQL calls. 1 - drop throttled calls. " |
57 | | "0 - respond with OVERLOADED error."); |
58 | | |
59 | | DECLARE_uint64(rpc_max_message_size); |
60 | | |
61 | | // Max msg length for CQL. |
62 | | // Since yb_rpc limit is 255MB, we limit consensensus size to 254MB, |
63 | | // and hence max cql message length to 253MB |
64 | | // This length corresponds to 3 strings with size of 64MB along with any additional fields |
65 | | // and overheads |
66 | | DEFINE_int32(max_message_length, 254_MB, |
67 | | "The maximum message length of the cql message."); |
68 | | |
69 | | // By default the CQL server sends CQL EVENTs (opcode=0x0c) only if the connection was |
70 | | // subscribed (via REGISTER request) for particular events. The flag allows to send all |
71 | | // available event always - even if the connection was not subscribed for events. |
72 | | DEFINE_bool(cql_server_always_send_events, false, |
73 | | "All CQL connections automatically subscribed for all CQL events."); |
74 | | |
75 | | DECLARE_int32(client_read_write_timeout_ms); |
76 | | |
77 | | namespace yb { |
78 | | namespace cqlserver { |
79 | | |
80 | | CQLConnectionContext::CQLConnectionContext( |
81 | | size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker, |
82 | | const MemTrackerPtr& call_tracker) |
83 | | : ql_session_(new ql::QLSession()), |
84 | | parser_(buffer_tracker, CQLMessage::kMessageHeaderLength, CQLMessage::kHeaderPosLength, |
85 | | FLAGS_max_message_length, rpc::IncludeHeader::kTrue, rpc::SkipEmptyMessages::kFalse, |
86 | | this), |
87 | | read_buffer_(receive_buffer_size, buffer_tracker), |
88 | 9.40k | call_tracker_(call_tracker) { |
89 | 0 | VLOG(1) << "CQL Connection Context: FLAGS_cql_server_always_send_events = " << |
90 | 0 | FLAGS_cql_server_always_send_events; |
91 | | |
92 | 9.40k | if (FLAGS_cql_server_always_send_events) { |
93 | 4 | registered_events_ = CQLMessage::kAllEvents; |
94 | 4 | } |
95 | 9.40k | } |
96 | | |
97 | | Result<rpc::ProcessCallsResult> CQLConnectionContext::ProcessCalls( |
98 | | const rpc::ConnectionPtr& connection, const IoVecs& data, |
99 | 3.85M | rpc::ReadBufferFull read_buffer_full) { |
100 | 3.85M | return parser_.Parse(connection, data, read_buffer_full, nullptr /* tracker_for_throttle */); |
101 | 3.85M | } |
102 | | |
103 | | Status CQLConnectionContext::HandleCall( |
104 | 4.56M | const rpc::ConnectionPtr& connection, rpc::CallData* call_data) { |
105 | 4.56M | auto reactor = connection->reactor(); |
106 | 4.56M | DCHECK(reactor->IsCurrentThread()); |
107 | | |
108 | 4.56M | auto call = rpc::InboundCall::Create<CQLInboundCall>( |
109 | 4.56M | connection, call_processed_listener(), ql_session_); |
110 | | |
111 | 4.56M | Status s = call->ParseFrom(call_tracker_, call_data); |
112 | 4.56M | if (!s.ok()) { |
113 | 0 | LOG(WARNING) << connection->ToString() << ": received bad data: " << s.ToString(); |
114 | 0 | return STATUS_SUBSTITUTE(NetworkError, "Bad data: $0", s.ToUserMessage()); |
115 | 0 | } |
116 | | |
117 | 4.56M | if (FLAGS_throttle_cql_calls_on_soft_memory_limit) { |
118 | 4.56M | if (!CheckMemoryPressureWithLogging(call_tracker_, /* score= */ 0.0, "Rejecting CQL call: ")) { |
119 | 1 | if (FLAGS_throttle_cql_calls_policy != kDropPolicy) { |
120 | 1 | static Status status = STATUS(ServiceUnavailable, "Server is under memory pressure"); |
121 | | // We did not store call yet, so should not notify that it was processed. |
122 | 1 | call->ResetCallProcessedListener(); |
123 | 1 | call->RespondFailure(rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY, Status::OK()); |
124 | 1 | } // Otherwise silently drop the call without queueing it. Clients will get a timeout. |
125 | 1 | return Status::OK(); |
126 | 1 | } |
127 | 4.56M | } |
128 | | |
129 | 4.56M | s = Store(call.get()); |
130 | 4.56M | if (!s.ok()) { |
131 | 0 | return s; |
132 | 0 | } |
133 | | |
134 | 4.56M | reactor->messenger()->Handle(call, rpc::Queue::kTrue); |
135 | | |
136 | 4.56M | return Status::OK(); |
137 | 4.56M | } |
138 | | |
139 | 9.11M | uint64_t CQLConnectionContext::ExtractCallId(rpc::InboundCall* call) { |
140 | 9.11M | return down_cast<CQLInboundCall*>(call)->stream_id(); |
141 | 9.11M | } |
142 | | |
143 | | void CQLConnectionContext::DumpPB(const rpc::DumpRunningRpcsRequestPB& req, |
144 | 0 | rpc::RpcConnectionPB* resp) { |
145 | 0 | const string keyspace = ql_session_->current_keyspace(); |
146 | 0 | if (!keyspace.empty()) { |
147 | 0 | resp->mutable_connection_details()->mutable_cql_connection_details()->set_keyspace(keyspace); |
148 | 0 | } |
149 | 0 | ConnectionContextWithCallId::DumpPB(req, resp); |
150 | 0 | } |
151 | | |
152 | | CQLInboundCall::CQLInboundCall(rpc::ConnectionPtr conn, |
153 | | CallProcessedListener call_processed_listener, |
154 | | ql::QLSession::SharedPtr ql_session) |
155 | | : InboundCall(std::move(conn), nullptr /* rpc_metrics */, std::move(call_processed_listener)), |
156 | | ql_session_(std::move(ql_session)), |
157 | 4.55M | deadline_(CoarseMonoClock::now() + FLAGS_client_read_write_timeout_ms * 1ms) { |
158 | 4.55M | } |
159 | | |
160 | 4.56M | Status CQLInboundCall::ParseFrom(const MemTrackerPtr& call_tracker, rpc::CallData* call_data) { |
161 | 4.56M | TRACE_EVENT_FLOW_BEGIN0("rpc", "CQLInboundCall", this); |
162 | 4.56M | TRACE_EVENT0("rpc", "CQLInboundCall::ParseFrom"); |
163 | | |
164 | 4.56M | consumption_ = ScopedTrackedConsumption(call_tracker, call_data->size()); |
165 | | |
166 | | // Parsing of CQL message is deferred to CQLServiceImpl::Handle. Just save the serialized data. |
167 | 4.56M | request_data_memory_usage_.store(call_data->size(), std::memory_order_release); |
168 | 4.56M | request_data_ = std::move(*call_data); |
169 | 4.56M | serialized_request_ = Slice(request_data_.data(), request_data_.size()); |
170 | | |
171 | | // Fill the service name method name to transfer the call to. The method name is for debug |
172 | | // tracing only. Inside CQLServiceImpl::Handle, we rely on the opcode to dispatch the execution. |
173 | 4.56M | stream_id_ = CQLRequest::ParseStreamId(serialized_request_); |
174 | | |
175 | 4.56M | return Status::OK(); |
176 | 4.56M | } |
177 | | |
178 | | namespace { |
179 | | |
180 | | const rpc::RemoteMethod remote_method("yb.cqlserver.CQLServerService", "ExecuteRequest"); |
181 | | |
182 | | } |
183 | | |
184 | 4.56M | Slice CQLInboundCall::serialized_remote_method() const { |
185 | 4.56M | return remote_method.serialized_body(); |
186 | 4.56M | } |
187 | | |
188 | 4.54k | Slice CQLInboundCall::static_serialized_remote_method() { |
189 | 4.54k | return remote_method.serialized_body(); |
190 | 4.54k | } |
191 | | |
192 | 34.1k | Slice CQLInboundCall::method_name() const { |
193 | 34.1k | return remote_method.method_name(); |
194 | 34.1k | } |
195 | | |
196 | 4.52M | void CQLInboundCall::DoSerialize(boost::container::small_vector_base<RefCntBuffer>* output) { |
197 | 4.52M | TRACE_EVENT0("rpc", "CQLInboundCall::Serialize"); |
198 | 4.52M | CHECK_GT(response_msg_buf_.size(), 0); |
199 | | |
200 | 4.52M | output->push_back(std::move(response_msg_buf_)); |
201 | 4.52M | } |
202 | | |
203 | | void CQLInboundCall::RespondFailure(rpc::ErrorStatusPB::RpcErrorCodePB error_code, |
204 | 1 | const Status& status) { |
205 | 1 | const auto& context = static_cast<const CQLConnectionContext&>(connection()->context()); |
206 | 1 | const auto compression_scheme = context.compression_scheme(); |
207 | 1 | faststring msg; |
208 | 1 | switch (error_code) { |
209 | 1 | case rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY: { |
210 | | // Return OVERLOADED error to redirect CQL client to the next host. |
211 | 1 | ErrorResponse(stream_id_, ErrorResponse::Code::OVERLOADED, status.message().ToBuffer()) |
212 | 1 | .Serialize(compression_scheme, &msg); |
213 | 1 | break; |
214 | 0 | } |
215 | 0 | case rpc::ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN: { |
216 | | // Return OVERLOADED error to redirect CQL client to the next host. |
217 | 0 | ErrorResponse(stream_id_, ErrorResponse::Code::OVERLOADED, "CQL shutting down") |
218 | 0 | .Serialize(compression_scheme, &msg); |
219 | 0 | break; |
220 | 0 | } |
221 | 0 | case rpc::ErrorStatusPB::ERROR_APPLICATION: FALLTHROUGH_INTENDED; |
222 | 0 | case rpc::ErrorStatusPB::ERROR_NO_SUCH_METHOD: FALLTHROUGH_INTENDED; |
223 | 0 | case rpc::ErrorStatusPB::ERROR_NO_SUCH_SERVICE: FALLTHROUGH_INTENDED; |
224 | 0 | case rpc::ErrorStatusPB::ERROR_INVALID_REQUEST: FALLTHROUGH_INTENDED; |
225 | 0 | case rpc::ErrorStatusPB::FATAL_DESERIALIZING_REQUEST: FALLTHROUGH_INTENDED; |
226 | 0 | case rpc::ErrorStatusPB::FATAL_VERSION_MISMATCH: FALLTHROUGH_INTENDED; |
227 | 0 | case rpc::ErrorStatusPB::FATAL_UNAUTHORIZED: FALLTHROUGH_INTENDED; |
228 | 0 | case rpc::ErrorStatusPB::FATAL_UNKNOWN: { |
229 | 0 | LOG(ERROR) << "Unexpected error status: " |
230 | 0 | << rpc::ErrorStatusPB::RpcErrorCodePB_Name(error_code); |
231 | 0 | ErrorResponse(stream_id_, ErrorResponse::Code::SERVER_ERROR, "Server error") |
232 | 0 | .Serialize(compression_scheme, &msg); |
233 | 0 | break; |
234 | 1 | } |
235 | 1 | } |
236 | 1 | response_msg_buf_ = RefCntBuffer(msg); |
237 | | |
238 | 1 | QueueResponse(/* is_success */ false); |
239 | 1 | } |
240 | | |
241 | 4.55M | void CQLInboundCall::RespondSuccess(const RefCntBuffer& buffer) { |
242 | 4.55M | response_msg_buf_ = buffer; |
243 | 4.55M | RecordHandlingCompleted(); |
244 | | |
245 | 4.55M | QueueResponse(/* is_success */ true); |
246 | 4.55M | } |
247 | | |
248 | 111 | void CQLInboundCall::GetCallDetails(rpc::RpcCallInProgressPB *call_in_progress_pb) const { |
249 | 111 | std::shared_ptr<const CQLRequest> request = |
250 | | #ifdef THREAD_SANITIZER |
251 | | request_; |
252 | | #else |
253 | 111 | std::atomic_load_explicit(&request_, std::memory_order_acquire); |
254 | 111 | #endif |
255 | 111 | if (request == nullptr) { |
256 | 0 | return; |
257 | 0 | } |
258 | 111 | rpc::CQLCallDetailsPB* call_in_progress = call_in_progress_pb->mutable_cql_details(); |
259 | 111 | rpc::CQLStatementsDetailsPB* details_pb; |
260 | 111 | std::shared_ptr<const CQLStatement> statement_ptr; |
261 | 111 | string query_id; |
262 | 111 | int j = 0; |
263 | 111 | switch (request->opcode()) { |
264 | 0 | case CQLMessage::Opcode::PREPARE: |
265 | 0 | call_in_progress->set_type("PREPARE"); |
266 | 0 | details_pb = call_in_progress->add_call_details(); |
267 | 0 | details_pb->set_sql_string((static_cast<const ql::PrepareRequest&>(*request)).query() |
268 | 0 | .substr(0, FLAGS_rpcz_max_cql_query_dump_size)); |
269 | 0 | return; |
270 | 110 | case CQLMessage::Opcode::EXECUTE: { |
271 | 110 | call_in_progress->set_type("EXECUTE"); |
272 | 110 | details_pb = call_in_progress->add_call_details(); |
273 | 110 | const auto& exec_request = static_cast<const ql::ExecuteRequest&>(*request); |
274 | 110 | query_id = exec_request.query_id(); |
275 | 110 | details_pb->set_sql_id(b2a_hex(query_id)); |
276 | 110 | statement_ptr = service_impl_->GetPreparedStatement(query_id); |
277 | 110 | if (statement_ptr != nullptr) { |
278 | 110 | details_pb->set_sql_string(statement_ptr->text() |
279 | 110 | .substr(0, FLAGS_rpcz_max_cql_query_dump_size)); |
280 | 110 | } |
281 | 110 | if (FLAGS_display_bind_params_in_cql_details) { |
282 | 110 | details_pb->set_params(yb::ToString(exec_request.params().values) |
283 | 110 | .substr(0, FLAGS_rpcz_max_cql_query_dump_size)); |
284 | 110 | } |
285 | 110 | return; |
286 | 0 | } |
287 | 1 | case CQLMessage::Opcode::QUERY: |
288 | 1 | call_in_progress->set_type("QUERY"); |
289 | 1 | details_pb = call_in_progress->add_call_details(); |
290 | 1 | details_pb->set_sql_string((static_cast<const ql::QueryRequest&>(*request)).query() |
291 | 1 | .substr(0, FLAGS_rpcz_max_cql_query_dump_size)); |
292 | 1 | return; |
293 | 0 | case CQLMessage::Opcode::BATCH: |
294 | 0 | call_in_progress->set_type("BATCH"); |
295 | 0 | for (const ql::BatchRequest::Query& batchQuery : |
296 | 0 | (static_cast<const ql::BatchRequest&>(*request)).queries()) { |
297 | 0 | details_pb = call_in_progress->add_call_details(); |
298 | 0 | if (batchQuery.is_prepared) { |
299 | 0 | details_pb->set_sql_id(b2a_hex(batchQuery.query_id)); |
300 | 0 | statement_ptr = service_impl_->GetPreparedStatement(batchQuery.query_id); |
301 | 0 | if (statement_ptr != nullptr) { |
302 | 0 | details_pb->set_sql_string( |
303 | 0 | statement_ptr->text().substr(0, FLAGS_rpcz_max_cql_query_dump_size)); |
304 | 0 | } |
305 | 0 | if (FLAGS_display_bind_params_in_cql_details) { |
306 | 0 | details_pb->set_params(yb::ToString(batchQuery.params.values) |
307 | 0 | .substr(0, FLAGS_rpcz_max_cql_query_dump_size)); |
308 | 0 | } |
309 | 0 | } else { |
310 | 0 | details_pb->set_sql_string(batchQuery.query |
311 | 0 | .substr(0, FLAGS_rpcz_max_cql_query_dump_size)); |
312 | 0 | } |
313 | 0 | if (++j >= FLAGS_rpcz_max_cql_batch_dump_count) { |
314 | | // Showing only rpcz_max_cql_batch_dump_count queries |
315 | 0 | break; |
316 | 0 | } |
317 | 0 | } |
318 | 0 | return; |
319 | 0 | default: |
320 | 0 | return; |
321 | 111 | } |
322 | 111 | } |
323 | | |
324 | | |
325 | 4.54M | void CQLInboundCall::LogTrace() const { |
326 | 4.54M | MonoTime now = MonoTime::Now(); |
327 | 4.54M | auto total_time = now.GetDeltaSince(timing_.time_received).ToMilliseconds(); |
328 | 4.54M | if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces |
329 | 4.54M | || (trace_ && trace_->must_print()) |
330 | 111 | || total_time > FLAGS_rpc_slow_query_threshold_ms)) { |
331 | 111 | LOG(WARNING) << ToString() << " took " << total_time << "ms. Details:"; |
332 | 111 | rpc::RpcCallInProgressPB call_in_progress_pb; |
333 | 111 | GetCallDetails(&call_in_progress_pb); |
334 | 111 | LOG(WARNING) << call_in_progress_pb.DebugString() << "Trace: "; |
335 | 111 | if (trace_) { |
336 | 111 | trace_->Dump(&LOG(WARNING), /* include_time_deltas */ true); |
337 | 111 | } |
338 | 111 | } |
339 | 4.54M | } |
340 | | |
341 | 113 | std::string CQLInboundCall::ToString() const { |
342 | 113 | return Format("CQL Call from $0, stream id: $1", connection()->remote(), stream_id_); |
343 | 113 | } |
344 | | |
345 | | bool CQLInboundCall::DumpPB(const rpc::DumpRunningRpcsRequestPB& req, |
346 | 0 | rpc::RpcCallInProgressPB* resp) { |
347 | |
|
348 | 0 | if (req.include_traces() && trace_) { |
349 | 0 | resp->set_trace_buffer(trace_->DumpToString(true)); |
350 | 0 | } |
351 | 0 | resp->set_elapsed_millis( |
352 | 0 | MonoTime::Now().GetDeltaSince(timing_.time_received).ToMilliseconds()); |
353 | 0 | GetCallDetails(resp); |
354 | |
|
355 | 0 | return true; |
356 | 0 | } |
357 | | |
358 | 14.0M | CoarseTimePoint CQLInboundCall::GetClientDeadline() const { |
359 | 14.0M | return deadline_; |
360 | 14.0M | } |
361 | | |
362 | | } // namespace cqlserver |
363 | | } // namespace yb |