YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/cql/cqlserver/cql_processor.h
Line
Count
Source (jump to first uncovered line)
1
//--------------------------------------------------------------------------------------------------
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
// This module defines the CQL processor. Each processor handles one and only one request
16
// at a time. As a result, the processing code in this module and in the modules that it calls
17
// does not need to be thread safe.
18
// Notably, this does NOT apply to the Reschedule implementation methods, which are called from
19
// different ExecContexts, so non-thread-safe fields should not be referenced there.
20
//--------------------------------------------------------------------------------------------------
21
22
#ifndef YB_YQL_CQL_CQLSERVER_CQL_PROCESSOR_H_
23
#define YB_YQL_CQL_CQLSERVER_CQL_PROCESSOR_H_
24
25
#include "yb/rpc/service_if.h"
26
27
#include "yb/yql/cql/cqlserver/cqlserver_fwd.h"
28
#include "yb/yql/cql/cqlserver/cql_rpc.h"
29
#include "yb/yql/cql/cqlserver/cql_statement.h"
30
31
#include "yb/yql/cql/ql/ql_processor.h"
32
#include "yb/yql/cql/ql/statement.h"
33
#include "yb/yql/cql/ql/util/cql_message.h"
34
35
namespace yb {
36
namespace cqlserver {
37
38
class CQLServiceImpl;
39
40
// Metric handles used by the CQL server, layered on top of ql::QLMetrics. Every member is a
// public handle (histogram, counter, gauge or RPC-method metrics). Only the constructor's
// declaration is visible here; presumably it creates these on metric_entity -- TODO confirm.
class CQLMetrics : public ql::QLMetrics {
41
 public:
42
  explicit CQLMetrics(const scoped_refptr<yb::MetricEntity>& metric_entity);
43
44
  // Histograms that, judging by their names, time the successive stages of handling a CQL
  // request. What exactly each one records is decided at the call sites (not visible here).
  scoped_refptr<yb::Histogram> time_to_process_request_;
45
  scoped_refptr<yb::Histogram> time_to_get_cql_processor_;
46
  scoped_refptr<yb::Histogram> time_to_parse_cql_wrapper_;
47
  scoped_refptr<yb::Histogram> time_to_execute_cql_request_;
48
49
  scoped_refptr<yb::Histogram> time_to_queue_cql_response_;
50
  // Counter; by its name, the number of CQL parsing errors -- confirm at the increment site.
  scoped_refptr<yb::Counter> num_errors_parsing_cql_;
51
  // RPC-level metrics.
52
  yb::rpc::RpcMethodMetrics rpc_method_metrics_;
53
54
  // Gauge/counter pair for CQL processors: a current "alive" value and a running "created" total
  // (semantics inferred from the names and metric types -- verify against the updaters).
  scoped_refptr<AtomicGauge<int64_t>> cql_processors_alive_;
55
  scoped_refptr<Counter> cql_processors_created_;
56
57
  // Same gauge/counter pairing as above, for parsers.
  scoped_refptr<AtomicGauge<int64_t>> parsers_alive_;
58
  scoped_refptr<Counter> parsers_created_;
59
};
60
61
// CQLProcessorListPos, a position in the list of CQL processors, is declared outside this header.
62
63
// Processor for CQL inbound calls, owned by / returned to a CQLServiceImpl. It derives from
// ql::QLProcessor and adds CQL-protocol request dispatch (one ProcessRequest overload per request
// type), response sending, and the per-call state listed in the private section.
class CQLProcessor : public ql::QLProcessor {
64
 public:
65
  // Constructor and destructor.
  // service_impl and pos are stored in service_impl_ and pos_ respectively (both const members).
66
  explicit CQLProcessor(CQLServiceImpl* service_impl, const CQLProcessorListPos& pos);
67
  ~CQLProcessor();
68
69
  // Processes an inbound call.
70
  void ProcessCall(rpc::InboundCallPtr call);
71
72
  // Releases the processor back to the CQLServiceImpl.
73
  void Release();
74
75
  void Shutdown();
76
77
 protected:
  // Overrides of virtuals inherited through ql::QLProcessor. Per the comment at the top of this
  // file, the Reschedule implementation methods are called from different ExecContexts, so they
  // must not reference non-thread-safe fields.
78
  bool NeedReschedule() override;
79
  void Reschedule(rpc::ThreadPoolTask* task) override;
80
  CoarseTimePoint GetDeadline() const override;
81
82
 private:
83
  bool CheckAuthentication(const ql::CQLRequest& req) const;
84
85
  // Processes a CQL request. The returned pointer may be null; the only caller visible in this
  // header (ProcessRequestTask::Run) sends a response only when it is non-null.
86
  std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::CQLRequest& req);
87
88
  // Processes specific CQL requests, one overload per concrete request type.
89
  std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::OptionsRequest& req);
90
  std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::StartupRequest& req);
91
  std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::PrepareRequest& req);
92
  std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::ExecuteRequest& req);
93
  std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::QueryRequest& req);
94
  std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::BatchRequest& req);
95
  std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::AuthResponseRequest& req);
96
  std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::RegisterRequest& req);
97
98
  // Gets a prepared statement and adds it to the set of statements currently being executed.
99
  std::shared_ptr<const CQLStatement> GetPreparedStatement(const ql::CQLMessage::QueryId& id);
100
101
  // Statement executed callback. result defaults to nullptr.
102
  void StatementExecuted(const Status& s, const ql::ExecutedResult::SharedPtr& result = nullptr);
103
104
  // Process statement execution result and error.
  // NOTE(review): ProcessAuthResult spells its parameter type as unqualified `string`; in a header
  // this only resolves if a using-declaration for std::string is visible through the includes.
  // Consider std::string.
  // ProcessError's query_id is optional and defaults to boost::none.
105
  std::unique_ptr<ql::CQLResponse> ProcessResult(const ql::ExecutedResult::SharedPtr& result);
106
  std::unique_ptr<ql::CQLResponse> ProcessAuthResult(const string& saved_hash, bool can_login);
107
  std::unique_ptr<ql::CQLResponse> ProcessError(
108
      const Status& s,
109
      boost::optional<ql::CQLMessage::QueryId> query_id = boost::none);
110
111
  // Send response back to client.
112
  void PrepareAndSendResponse(const std::unique_ptr<ql::CQLResponse>& response);
113
  void SendResponse(const ql::CQLResponse& response);
114
115
  // Maps an option name to the list of values supported for it: the CQL version option and the
  // compression option (LZ4 and Snappy).
  // NOTE(review): this is a non-static const data member, so every CQLProcessor instance builds
  // and holds its own copy of the map. Consider making it static -- confirm nothing depends on
  // per-instance storage.
  const std::unordered_map<std::string, std::vector<std::string>> kSupportedOptions = {
116
      {ql::CQLMessage::kCQLVersionOption,
117
          {"3.0.0" /* minimum */, "3.4.2" /* current */}},
118
      {ql::CQLMessage::kCompressionOption,
119
          {ql::CQLMessage::kLZ4Compression, ql::CQLMessage::kSnappyCompression}}
120
  };
121
122
  // Pointer to the containing CQL service implementation.
123
  CQLServiceImpl* const service_impl_;
124
125
  // CQL metrics.
126
  std::shared_ptr<CQLMetrics> cql_metrics_;
127
128
  // Position in the CQL processor list.
129
  const CQLProcessorListPos pos_;
130
131
  //----------------------------- StatementExecuted callback and state ---------------------------
132
133
  // Current call, request, prepared statements and parse trees being processed.
134
  CQLInboundCallPtr call_;
135
  std::shared_ptr<const ql::CQLRequest> request_;
136
  std::unordered_set<std::shared_ptr<const CQLStatement>> stmts_;
137
  std::unordered_set<ql::ParseTree::UniPtr> parse_trees_;
138
139
  // Current retry count.
140
  int retry_count_ = 0;
141
142
  // Parse and execute begin times.
143
  MonoTime parse_begin_;
144
  MonoTime execute_begin_;
145
146
  // Statement executed callback.
147
  ql::StatementExecutedCallback statement_executed_cb_;
148
149
  // NOTE(review): undocumented in the original; presumably tracks memory consumption attributed
  // to the in-flight call -- confirm against the .cc.
  ScopedTrackedConsumption consumption_;
150
151
  //----------------------------------------------------------------------------------------------
152
153
  // Thread-pool task that runs ProcessRequest() on the bound processor's current request_ and
  // sends the response when one is returned. The task does not own the processor.
  class ProcessRequestTask : public rpc::ThreadPoolTask {
154
   public:
155
7
    // Remembers the processor to run against and returns *this so the call can be chained.
    ProcessRequestTask& Bind(CQLProcessor* processor) {
156
7
      processor_ = processor;
157
7
      return *this;
158
7
    }
159
160
0
    virtual ~ProcessRequestTask() {}
161
162
   private:
163
7
    // Processes the bound processor's request_. The member pointer is copied to a local and
    // cleared before any work is done (presumably so the task can be bound again while the
    // request is being handled -- TODO confirm).
    // NOTE(review): there is no null check, so Bind() must have been called before each Run().
    void Run() override {
164
7
      auto processor = processor_;
165
7
      processor_ = nullptr;
166
7
      std::unique_ptr<ql::CQLResponse> response(processor->ProcessRequest(*processor->request_));
167
7
      // A null response means there is nothing to send from here.
      if (response != nullptr) {
168
1
        processor->SendResponse(*response);
169
1
      }
170
7
    }
171
172
7
    // Intentionally a no-op: the completion status is ignored.
    void Done(const Status& status) override {}
173
174
    // Processor set by Bind() and reset to nullptr at the start of Run().
    CQLProcessor* processor_ = nullptr;
175
  };
176
177
  // ProcessRequestTask calls the private ProcessRequest()/SendResponse() and reads request_.
  friend class ProcessRequestTask;
178
179
  // Task instance embedded in the processor.
  ProcessRequestTask process_request_task_;
180
};
181
182
}  // namespace cqlserver
183
}  // namespace yb
184
185
#endif  // YB_YQL_CQL_CQLSERVER_CQL_PROCESSOR_H_