/Users/deen/code/yugabyte-db/src/yb/yql/cql/cqlserver/cql_processor.h
Line | Count | Source (jump to first uncovered line) |
1 | | //-------------------------------------------------------------------------------------------------- |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | // This module defines the CQL processor. Each processor handles one and only one request |
16 | | // at a time. As a result, all processing code in this module and the modules that it calls |
17 | | // does not need to be thread-safe. |
18 | | // Notably, this does NOT apply to Reschedule implementation methods, which are called from |
19 | | // different ExecContexts, so non-thread-safe fields should not be referenced there. |
20 | | //-------------------------------------------------------------------------------------------------- |
21 | | |
22 | | #ifndef YB_YQL_CQL_CQLSERVER_CQL_PROCESSOR_H_ |
23 | | #define YB_YQL_CQL_CQLSERVER_CQL_PROCESSOR_H_ |
24 | | |
25 | | #include "yb/rpc/service_if.h" |
26 | | |
27 | | #include "yb/yql/cql/cqlserver/cqlserver_fwd.h" |
28 | | #include "yb/yql/cql/cqlserver/cql_rpc.h" |
29 | | #include "yb/yql/cql/cqlserver/cql_statement.h" |
30 | | |
31 | | #include "yb/yql/cql/ql/ql_processor.h" |
32 | | #include "yb/yql/cql/ql/statement.h" |
33 | | #include "yb/yql/cql/ql/util/cql_message.h" |
34 | | |
35 | | namespace yb { |
36 | | namespace cqlserver { |
37 | | |
38 | | class CQLServiceImpl; |
39 | | |
40 | | class CQLMetrics : public ql::QLMetrics { |
41 | | public: |
42 | | explicit CQLMetrics(const scoped_refptr<yb::MetricEntity>& metric_entity); |
43 | | // Histograms named after request-processing stages (presumably latencies; confirm in the .cc). |
44 | | scoped_refptr<yb::Histogram> time_to_process_request_; |
45 | | scoped_refptr<yb::Histogram> time_to_get_cql_processor_; |
46 | | scoped_refptr<yb::Histogram> time_to_parse_cql_wrapper_; |
47 | | scoped_refptr<yb::Histogram> time_to_execute_cql_request_; |
48 | | 
49 | | scoped_refptr<yb::Histogram> time_to_queue_cql_response_; |
50 | | scoped_refptr<yb::Counter> num_errors_parsing_cql_; |
51 | | // RPC-level metrics. |
52 | | yb::rpc::RpcMethodMetrics rpc_method_metrics_; |
53 | | // Processor and parser instance accounting: an "alive" gauge and a "created" counter for each. |
54 | | scoped_refptr<AtomicGauge<int64_t>> cql_processors_alive_; |
55 | | scoped_refptr<Counter> cql_processors_created_; |
56 | | 
57 | | scoped_refptr<AtomicGauge<int64_t>> parsers_alive_; |
58 | | scoped_refptr<Counter> parsers_created_; |
59 | | }; |
60 | | |
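61 | | // A list of CQL processors and a position in that list (CQLProcessorListPos) are not declared in this header; presumably they come from cqlserver_fwd.h. |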
62 | | |
63 | | class CQLProcessor : public ql::QLProcessor { |
64 | | public: |
65 | | // Constructor (takes the containing service and this processor's list position) and destructor. |
66 | | explicit CQLProcessor(CQLServiceImpl* service_impl, const CQLProcessorListPos& pos); |
67 | | ~CQLProcessor(); |
68 | | 
69 | | // Processes an inbound call. |
70 | | void ProcessCall(rpc::InboundCallPtr call); |
71 | | 
72 | | // Releases the processor back to the CQLServiceImpl. |
73 | | void Release(); |
74 | | 
75 | | void Shutdown(); |
76 | | // NOTE: Reschedule implementation methods are called from different ExecContexts; avoid non-thread-safe fields there. |
77 | | protected: |
78 | | bool NeedReschedule() override; |
79 | | void Reschedule(rpc::ThreadPoolTask* task) override; |
80 | | CoarseTimePoint GetDeadline() const override; |
81 | | 
82 | | private: |
83 | | bool CheckAuthentication(const ql::CQLRequest& req) const; |
84 | | 
85 | | // Processes a CQL request. ProcessRequestTask::Run() sends the returned response only when it is non-null. |
86 | | std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::CQLRequest& req); |
87 | | 
88 | | // Overloads that process each specific CQL request type. |
89 | | std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::OptionsRequest& req); |
90 | | std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::StartupRequest& req); |
91 | | std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::PrepareRequest& req); |
92 | | std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::ExecuteRequest& req); |
93 | | std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::QueryRequest& req); |
94 | | std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::BatchRequest& req); |
95 | | std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::AuthResponseRequest& req); |
96 | | std::unique_ptr<ql::CQLResponse> ProcessRequest(const ql::RegisterRequest& req); |
97 | | 
98 | | // Gets a prepared statement and adds it to the set of statements currently being executed. |
99 | | std::shared_ptr<const CQLStatement> GetPreparedStatement(const ql::CQLMessage::QueryId& id); |
100 | | 
101 | | // Statement-executed callback; `result` defaults to nullptr. |
102 | | void StatementExecuted(const Status& s, const ql::ExecutedResult::SharedPtr& result = nullptr); |
103 | | 
104 | | // Turn a statement execution result, an authentication result, or an error into a CQL response. |
105 | | std::unique_ptr<ql::CQLResponse> ProcessResult(const ql::ExecutedResult::SharedPtr& result); |
106 | | std::unique_ptr<ql::CQLResponse> ProcessAuthResult(const string& saved_hash, bool can_login); |
107 | | std::unique_ptr<ql::CQLResponse> ProcessError( |
108 | | const Status& s, |
109 | | boost::optional<ql::CQLMessage::QueryId> query_id = boost::none); |
110 | | 
111 | | // Send a response back to the client. |
112 | | void PrepareAndSendResponse(const std::unique_ptr<ql::CQLResponse>& response); |
113 | | void SendResponse(const ql::CQLResponse& response); |
114 | | // Supported option name -> accepted values. NOTE(review): non-static, so every processor holds its own copy. |
115 | | const std::unordered_map<std::string, std::vector<std::string>> kSupportedOptions = { |
116 | | {ql::CQLMessage::kCQLVersionOption, |
117 | | {"3.0.0" /* minimum */, "3.4.2" /* current */}}, |
118 | | {ql::CQLMessage::kCompressionOption, |
119 | | {ql::CQLMessage::kLZ4Compression, ql::CQLMessage::kSnappyCompression}} |
120 | | }; |
121 | | 
122 | | // Pointer to the containing CQL service implementation. |
123 | | CQLServiceImpl* const service_impl_; |
124 | | 
125 | | // CQL metrics (held through a shared_ptr). |
126 | | std::shared_ptr<CQLMetrics> cql_metrics_; |
127 | | 
128 | | // Position in the CQL processor list. |
129 | | const CQLProcessorListPos pos_; |
130 | | 
131 | | //----------------------------- StatementExecuted callback and state --------------------------- |
132 | | 
133 | | // Current call, request, prepared statements and parse trees being processed. |
134 | | CQLInboundCallPtr call_; |
135 | | std::shared_ptr<const ql::CQLRequest> request_; |
136 | | std::unordered_set<std::shared_ptr<const CQLStatement>> stmts_; |
137 | | std::unordered_set<ql::ParseTree::UniPtr> parse_trees_; |
138 | | 
139 | | // Current retry count. |
140 | | int retry_count_ = 0; |
141 | | 
142 | | // Parse and execute begin times. |
143 | | MonoTime parse_begin_; |
144 | | MonoTime execute_begin_; |
145 | | 
146 | | // Statement-executed callback object. |
147 | | ql::StatementExecutedCallback statement_executed_cb_; |
148 | | 
149 | | ScopedTrackedConsumption consumption_; |
150 | | 
151 | | //---------------------------------------------------------------------------------------------- |
152 | | // Thread pool task that runs ProcessRequest() on the bound processor and sends any non-null response. |
153 | | class ProcessRequestTask : public rpc::ThreadPoolTask { |
154 | | public: |
155 | 7 | ProcessRequestTask& Bind(CQLProcessor* processor) { |
156 | 7 | processor_ = processor; |
157 | 7 | return *this; |
158 | 7 | } |
159 | | 
160 | 0 | virtual ~ProcessRequestTask() {} |
161 | | 
162 | | private: |
163 | 7 | void Run() override { |
164 | 7 | auto processor = processor_; |
165 | 7 | processor_ = nullptr; |
166 | 7 | std::unique_ptr<ql::CQLResponse> response(processor->ProcessRequest(*processor->request_)); |
167 | 7 | if (response != nullptr) { |
168 | 1 | processor->SendResponse(*response); |
169 | 1 | } |
170 | 7 | } |
171 | | // Completion hook: the body is empty, so nothing is done with `status`. |
172 | 7 | void Done(const Status& status) override {} |
173 | | // Processor to run against; set by Bind() and reset to nullptr at the start of Run(). |
174 | | CQLProcessor* processor_ = nullptr; |
175 | | }; |
176 | | 
177 | | friend class ProcessRequestTask; |
178 | | // Task instance held by value in this processor. |
179 | | ProcessRequestTask process_request_task_; |
180 | | }; |
181 | | |
182 | | } // namespace cqlserver |
183 | | } // namespace yb |
184 | | |
185 | | #endif // YB_YQL_CQL_CQLSERVER_CQL_PROCESSOR_H_ |