YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/cql/cqlserver/cql_rpc.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/yql/cql/cqlserver/cql_rpc.h"
17
18
#include "yb/gutil/casts.h"
19
#include "yb/gutil/strings/escaping.h"
20
21
#include "yb/rpc/connection.h"
22
#include "yb/rpc/messenger.h"
23
#include "yb/rpc/reactor.h"
24
#include "yb/rpc/rpc_introspection.pb.h"
25
26
#include "yb/util/debug/trace_event.h"
27
#include "yb/util/result.h"
28
#include "yb/util/size_literals.h"
29
#include "yb/util/status_format.h"
30
31
#include "yb/yql/cql/cqlserver/cql_service.h"
32
33
using namespace std::literals;
34
using namespace std::placeholders;
35
using yb::ql::CQLMessage;
36
using yb::ql::CQLRequest;
37
using yb::ql::ErrorResponse;
38
using yb::operator"" _KB;
39
using yb::operator"" _MB;
40
41
DECLARE_bool(rpc_dump_all_traces);
42
DECLARE_int32(rpc_slow_query_threshold_ms);
43
DEFINE_int32(rpcz_max_cql_query_dump_size, 4_KB,
44
             "The maximum size of the CQL query string in the RPCZ dump.");
45
DEFINE_int32(rpcz_max_cql_batch_dump_count, 4_KB,
46
             "The maximum number of CQL batch elements in the RPCZ dump.");
47
DEFINE_bool(throttle_cql_calls_on_soft_memory_limit, true,
48
            "Whether to reject CQL calls when soft memory limit is reached.");
49
DEFINE_bool(display_bind_params_in_cql_details, true,
50
            "Whether to show bind params for CQL calls details in the RPCZ dump.");
51
52
constexpr int kDropPolicy = 1;
53
constexpr int kRejectPolicy = 0;
54
55
DEFINE_int32(throttle_cql_calls_policy, kRejectPolicy,
56
              "Policy for throttling CQL calls. 1 - drop throttled calls. "
57
              "0 - respond with OVERLOADED error.");
58
59
DECLARE_uint64(rpc_max_message_size);
60
61
// Max msg length for CQL.
62
// Since yb_rpc limit is 255MB, we limit consensus size to 254MB,
63
// and hence max cql message length to 253MB
64
// This length corresponds to 3 strings with size of 64MB along with any additional fields
65
// and overheads
66
DEFINE_int32(max_message_length, 254_MB,
67
             "The maximum message length of the cql message.");
68
69
// By default the CQL server sends CQL EVENTs (opcode=0x0c) only if the connection was
70
// subscribed (via REGISTER request) for particular events. This flag makes the server always
71
// send all available events - even if the connection was not subscribed for events.
72
DEFINE_bool(cql_server_always_send_events, false,
73
            "All CQL connections automatically subscribed for all CQL events.");
74
75
DECLARE_int32(client_read_write_timeout_ms);
76
77
namespace yb {
78
namespace cqlserver {
79
80
// Sets up the per-connection CQL state: a new QL session, a message parser configured with the
// CQL header layout and FLAGS_max_message_length, the read buffer, and the call memory tracker.
// When FLAGS_cql_server_always_send_events is set, the connection starts out registered for all
// CQL events.
CQLConnectionContext::CQLConnectionContext(
    size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker,
    const MemTrackerPtr& call_tracker)
    : ql_session_(new ql::QLSession()),
      parser_(buffer_tracker,
              CQLMessage::kMessageHeaderLength,
              CQLMessage::kHeaderPosLength,
              FLAGS_max_message_length,
              rpc::IncludeHeader::kTrue,
              rpc::SkipEmptyMessages::kFalse,
              this),
      read_buffer_(receive_buffer_size, buffer_tracker),
      call_tracker_(call_tracker) {
  const bool always_send_events = FLAGS_cql_server_always_send_events;
  VLOG(1) << "CQL Connection Context: FLAGS_cql_server_always_send_events = "
          << always_send_events;

  if (!always_send_events) {
    return;
  }
  registered_events_ = CQLMessage::kAllEvents;
}
96
97
// Hands the received bytes to the CQL message parser. No throttle tracker is passed to the
// parser.
Result<rpc::ProcessCallsResult> CQLConnectionContext::ProcessCalls(
    const rpc::ConnectionPtr& connection, const IoVecs& data,
    rpc::ReadBufferFull read_buffer_full) {
  auto parse_result = parser_.Parse(
      connection, data, read_buffer_full, nullptr /* tracker_for_throttle */);
  return parse_result;
}
102
103
// Turns one received CQL message into a CQLInboundCall and hands it to the messenger.
//
// connection - the connection the message arrived on; must be handled on its reactor thread.
// call_data  - the raw message bytes; ownership of the payload moves into the created call.
//
// Returns a NetworkError if the message cannot be parsed, the error from Store() if the call
// cannot be registered, and OK otherwise. OK is also returned when the call is throttled because
// of memory pressure: depending on FLAGS_throttle_cql_calls_policy the call is either answered
// with an OVERLOADED error or silently dropped.
Status CQLConnectionContext::HandleCall(
    const rpc::ConnectionPtr& connection, rpc::CallData* call_data) {
  auto reactor = connection->reactor();
  DCHECK(reactor->IsCurrentThread());

  auto call = rpc::InboundCall::Create<CQLInboundCall>(
      connection, call_processed_listener(), ql_session_);

  Status s = call->ParseFrom(call_tracker_, call_data);
  if (!s.ok()) {
    LOG(WARNING) << connection->ToString() << ": received bad data: " << s.ToString();
    return STATUS_SUBSTITUTE(NetworkError, "Bad data: $0", s.ToUserMessage());
  }

  if (FLAGS_throttle_cql_calls_on_soft_memory_limit) {
    if (!CheckMemoryPressureWithLogging(call_tracker_, /* score= */ 0.0, "Rejecting CQL call: ")) {
      if (FLAGS_throttle_cql_calls_policy != kDropPolicy) {
        static const Status status =
            STATUS(ServiceUnavailable, "Server is under memory pressure");
        // We did not store call yet, so should not notify that it was processed.
        call->ResetCallProcessedListener();
        // Pass the real status so that the OVERLOADED error sent to the client carries the
        // explanatory message instead of an empty string.
        call->RespondFailure(rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY, status);
      } // Otherwise silently drop the call without queueing it. Clients will get a timeout.
      return Status::OK();
    }
  }

  s = Store(call.get());
  if (!s.ok()) {
    return s;
  }

  reactor->messenger()->Handle(call, rpc::Queue::kTrue);

  return Status::OK();
}
138
139
17.8M
// Uses the CQL stream id of the call as its call id. The call must be a CQLInboundCall.
uint64_t CQLConnectionContext::ExtractCallId(rpc::InboundCall* call) {
  auto* const cql_call = down_cast<CQLInboundCall*>(call);
  return cql_call->stream_id();
}
142
143
// Fills the connection dump: records the session's current keyspace (when one is set) in the
// CQL connection details and then delegates to the base class to dump the rest.
void CQLConnectionContext::DumpPB(const rpc::DumpRunningRpcsRequestPB& req,
                                  rpc::RpcConnectionPB* resp) {
  const std::string current_keyspace = ql_session_->current_keyspace();
  if (!current_keyspace.empty()) {
    auto* const cql_details =
        resp->mutable_connection_details()->mutable_cql_connection_details();
    cql_details->set_keyspace(current_keyspace);
  }

  ConnectionContextWithCallId::DumpPB(req, resp);
}
151
152
// Creates an inbound CQL call bound to the given connection and QL session. The client deadline
// is fixed at construction time: now plus FLAGS_client_read_write_timeout_ms milliseconds.
CQLInboundCall::CQLInboundCall(rpc::ConnectionPtr conn,
                               CallProcessedListener call_processed_listener,
                               ql::QLSession::SharedPtr ql_session)
    : InboundCall(std::move(conn),
                  nullptr /* rpc_metrics */,
                  std::move(call_processed_listener)),
      ql_session_(std::move(ql_session)),
      deadline_(CoarseMonoClock::now() +
                std::chrono::milliseconds(FLAGS_client_read_write_timeout_ms)) {
}
159
160
8.93M
// Takes ownership of the raw request bytes and extracts the CQL stream id from them.
//
// The payload size is charged to call_tracker and recorded as the request memory usage. Full
// parsing of the CQL message is deferred to CQLServiceImpl::Handle; only the serialized data is
// saved here. Always returns OK.
Status CQLInboundCall::ParseFrom(const MemTrackerPtr& call_tracker, rpc::CallData* call_data) {
  TRACE_EVENT_FLOW_BEGIN0("rpc", "CQLInboundCall", this);
  TRACE_EVENT0("rpc", "CQLInboundCall::ParseFrom");

  // Capture the size before the payload is moved out of call_data.
  const auto payload_size = call_data->size();
  consumption_ = ScopedTrackedConsumption(call_tracker, payload_size);
  request_data_memory_usage_.store(payload_size, std::memory_order_release);

  request_data_ = std::move(*call_data);
  serialized_request_ = Slice(request_data_.data(), request_data_.size());

  // Only the stream id is needed at this point. Inside CQLServiceImpl::Handle, the opcode is used
  // to dispatch the execution.
  stream_id_ = CQLRequest::ParseStreamId(serialized_request_);

  return Status::OK();
}
177
178
namespace {
179
180
const rpc::RemoteMethod remote_method("yb.cqlserver.CQLServerService", "ExecuteRequest");
181
182
}
183
184
8.94M
// Returns the serialized body of the file-local remote method descriptor.
Slice CQLInboundCall::serialized_remote_method() const {
  // Same value as the static accessor; there is no per-call state involved.
  return static_serialized_remote_method();
}
187
188
6.11k
// Returns the serialized body of the file-local remote method descriptor without needing a call
// instance.
Slice CQLInboundCall::static_serialized_remote_method() {
  const Slice serialized = remote_method.serialized_body();
  return serialized;
}
191
192
38.8k
// Returns the method name of the file-local remote method descriptor.
Slice CQLInboundCall::method_name() const {
  const Slice name = remote_method.method_name();
  return name;
}
195
196
8.94M
// Appends the prepared response buffer to output. The buffer is moved out of the call, and the
// call aborts via CHECK if no response has been prepared.
void CQLInboundCall::DoSerialize(boost::container::small_vector_base<RefCntBuffer>* output) {
  TRACE_EVENT0("rpc", "CQLInboundCall::Serialize");

  CHECK_GT(response_msg_buf_.size(), 0);
  output->emplace_back(std::move(response_msg_buf_));
}
202
203
void CQLInboundCall::RespondFailure(rpc::ErrorStatusPB::RpcErrorCodePB error_code,
204
1
                                    const Status& status) {
205
1
  const auto& context = static_cast<const CQLConnectionContext&>(connection()->context());
206
1
  const auto compression_scheme = context.compression_scheme();
207
1
  faststring msg;
208
1
  switch (error_code) {
209
1
    case rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY: {
210
      // Return OVERLOADED error to redirect CQL client to the next host.
211
1
      ErrorResponse(stream_id_, ErrorResponse::Code::OVERLOADED, status.message().ToBuffer())
212
1
          .Serialize(compression_scheme, &msg);
213
1
      break;
214
0
    }
215
0
    case rpc::ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN: {
216
      // Return OVERLOADED error to redirect CQL client to the next host.
217
0
      ErrorResponse(stream_id_, ErrorResponse::Code::OVERLOADED, "CQL shutting down")
218
0
          .Serialize(compression_scheme, &msg);
219
0
      break;
220
0
    }
221
0
    case rpc::ErrorStatusPB::ERROR_APPLICATION: FALLTHROUGH_INTENDED;
222
0
    case rpc::ErrorStatusPB::ERROR_NO_SUCH_METHOD: FALLTHROUGH_INTENDED;
223
0
    case rpc::ErrorStatusPB::ERROR_NO_SUCH_SERVICE: FALLTHROUGH_INTENDED;
224
0
    case rpc::ErrorStatusPB::ERROR_INVALID_REQUEST: FALLTHROUGH_INTENDED;
225
0
    case rpc::ErrorStatusPB::FATAL_DESERIALIZING_REQUEST: FALLTHROUGH_INTENDED;
226
0
    case rpc::ErrorStatusPB::FATAL_VERSION_MISMATCH: FALLTHROUGH_INTENDED;
227
0
    case rpc::ErrorStatusPB::FATAL_UNAUTHORIZED: FALLTHROUGH_INTENDED;
228
0
    case rpc::ErrorStatusPB::FATAL_UNKNOWN: {
229
0
      LOG(ERROR) << "Unexpected error status: "
230
0
                 << rpc::ErrorStatusPB::RpcErrorCodePB_Name(error_code);
231
0
      ErrorResponse(stream_id_, ErrorResponse::Code::SERVER_ERROR, "Server error")
232
0
          .Serialize(compression_scheme, &msg);
233
0
      break;
234
0
    }
235
1
  }
236
1
  response_msg_buf_ = RefCntBuffer(msg);
237
238
1
  QueueResponse(/* is_success */ false);
239
1
}
240
241
8.92M
void CQLInboundCall::RespondSuccess(const RefCntBuffer& buffer) {
242
8.92M
  response_msg_buf_ = buffer;
243
8.92M
  RecordHandlingCompleted();
244
245
8.92M
  QueueResponse(/* is_success */ true);
246
8.92M
}
247
248
97
// Describes the CQL request of this call in call_in_progress_pb for RPCZ dumps and trace logs.
//
// Nothing is written when the parsed request is not available yet. For PREPARE, EXECUTE, QUERY
// and BATCH requests the request type and per-statement details are filled in; other opcodes only
// get an empty cql_details message. Query text and bind params are each truncated to
// FLAGS_rpcz_max_cql_query_dump_size characters, and the batch loop stops once
// FLAGS_rpcz_max_cql_batch_dump_count statements have been emitted. Bind params are included
// only when FLAGS_display_bind_params_in_cql_details is set.
void CQLInboundCall::GetCallDetails(rpc::RpcCallInProgressPB *call_in_progress_pb) const {
  std::shared_ptr<const CQLRequest> request =
#ifdef THREAD_SANITIZER
      request_;
#else
      std::atomic_load_explicit(&request_, std::memory_order_acquire);
#endif
  if (!request) {
    return;
  }

  rpc::CQLCallDetailsPB* const cql_details = call_in_progress_pb->mutable_cql_details();

  switch (request->opcode()) {
    case CQLMessage::Opcode::PREPARE: {
      const auto& prepare_request = static_cast<const ql::PrepareRequest&>(*request);
      cql_details->set_type("PREPARE");
      rpc::CQLStatementsDetailsPB* const entry = cql_details->add_call_details();
      entry->set_sql_string(
          prepare_request.query().substr(0, FLAGS_rpcz_max_cql_query_dump_size));
      return;
    }

    case CQLMessage::Opcode::EXECUTE: {
      cql_details->set_type("EXECUTE");
      rpc::CQLStatementsDetailsPB* const entry = cql_details->add_call_details();
      const auto& execute_request = static_cast<const ql::ExecuteRequest&>(*request);

      const std::string query_id = execute_request.query_id();
      entry->set_sql_id(b2a_hex(query_id));

      // The statement text is only known if the prepared statement can still be looked up.
      const std::shared_ptr<const CQLStatement> statement =
          service_impl_->GetPreparedStatement(query_id);
      if (statement != nullptr) {
        entry->set_sql_string(
            statement->text().substr(0, FLAGS_rpcz_max_cql_query_dump_size));
      }
      if (FLAGS_display_bind_params_in_cql_details) {
        entry->set_params(
            yb::ToString(execute_request.params().values)
                .substr(0, FLAGS_rpcz_max_cql_query_dump_size));
      }
      return;
    }

    case CQLMessage::Opcode::QUERY: {
      const auto& query_request = static_cast<const ql::QueryRequest&>(*request);
      cql_details->set_type("QUERY");
      rpc::CQLStatementsDetailsPB* const entry = cql_details->add_call_details();
      entry->set_sql_string(
          query_request.query().substr(0, FLAGS_rpcz_max_cql_query_dump_size));
      return;
    }

    case CQLMessage::Opcode::BATCH: {
      cql_details->set_type("BATCH");
      const auto& batch_request = static_cast<const ql::BatchRequest&>(*request);

      int dumped_count = 0;
      for (const ql::BatchRequest::Query& batch_query : batch_request.queries()) {
        rpc::CQLStatementsDetailsPB* const entry = cql_details->add_call_details();

        if (!batch_query.is_prepared) {
          entry->set_sql_string(
              batch_query.query.substr(0, FLAGS_rpcz_max_cql_query_dump_size));
        } else {
          entry->set_sql_id(b2a_hex(batch_query.query_id));
          const std::shared_ptr<const CQLStatement> statement =
              service_impl_->GetPreparedStatement(batch_query.query_id);
          if (statement != nullptr) {
            entry->set_sql_string(
                statement->text().substr(0, FLAGS_rpcz_max_cql_query_dump_size));
          }
          if (FLAGS_display_bind_params_in_cql_details) {
            entry->set_params(
                yb::ToString(batch_query.params.values)
                    .substr(0, FLAGS_rpcz_max_cql_query_dump_size));
          }
        }

        // Showing only rpcz_max_cql_batch_dump_count queries
        ++dumped_count;
        if (dumped_count >= FLAGS_rpcz_max_cql_batch_dump_count) {
          break;
        }
      }
      return;
    }

    default:
      return;
  }
}
323
324
325
8.92M
void CQLInboundCall::LogTrace() const {
326
8.92M
  MonoTime now = MonoTime::Now();
327
8.92M
  auto total_time = now.GetDeltaSince(timing_.time_received).ToMilliseconds();
328
8.92M
  if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces
329
8.92M
          || (trace_ && trace_->must_print())
330
8.92M
          || total_time > FLAGS_rpc_slow_query_threshold_ms)) {
331
97
      LOG(WARNING) << ToString() << " took " << total_time << "ms. Details:";
332
97
      rpc::RpcCallInProgressPB call_in_progress_pb;
333
97
      GetCallDetails(&call_in_progress_pb);
334
97
      LOG(WARNING) << call_in_progress_pb.DebugString() << "Trace: ";
335
97
      if (trace_) {
336
97
        trace_->Dump(&LOG(WARNING), /* include_time_deltas */ true);
337
97
      }
338
97
  }
339
8.92M
}
340
341
99
std::string CQLInboundCall::ToString() const {
342
99
  return Format("CQL Call from $0, stream id: $1", connection()->remote(), stream_id_);
343
99
}
344
345
// Fills resp with the state of this in-flight call: the trace buffer (only when the request asks
// for traces and a trace exists), the time elapsed since the call was received, and the CQL call
// details. Always returns true.
bool CQLInboundCall::DumpPB(const rpc::DumpRunningRpcsRequestPB& req,
                            rpc::RpcCallInProgressPB* resp) {
  const bool include_trace = req.include_traces() && trace_;
  if (include_trace) {
    resp->set_trace_buffer(trace_->DumpToString(true));
  }

  const auto elapsed_ms =
      MonoTime::Now().GetDeltaSince(timing_.time_received).ToMilliseconds();
  resp->set_elapsed_millis(elapsed_ms);

  GetCallDetails(resp);
  return true;
}
357
358
27.2M
// Returns the deadline that was computed when the call object was constructed.
CoarseTimePoint CQLInboundCall::GetClientDeadline() const {
  return this->deadline_;
}
361
362
} // namespace cqlserver
363
} // namespace yb