/Users/deen/code/yugabyte-db/src/yb/yql/cql/ql/exec/executor.h
Line | Count | Source (jump to first uncovered line) |
1 | | //-------------------------------------------------------------------------------------------------- |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | // Entry point for the execution process. |
16 | | //-------------------------------------------------------------------------------------------------- |
17 | | |
18 | | #ifndef YB_YQL_CQL_QL_EXEC_EXECUTOR_H_ |
19 | | #define YB_YQL_CQL_QL_EXEC_EXECUTOR_H_ |
20 | | |
21 | | #include <mutex> |
22 | | #include <vector> |
23 | | |
24 | | #include <rapidjson/document.h> |
25 | | |
26 | | #include "yb/client/yb_op.h" |
27 | | |
28 | | #include "yb/common/pgsql_protocol.pb.h" |
29 | | #include "yb/common/ql_expr.h" |
30 | | #include "yb/common/ql_type.h" |
31 | | |
32 | | #include "yb/gutil/callback.h" |
33 | | |
34 | | #include "yb/rpc/thread_pool.h" |
35 | | |
36 | | #include "yb/util/memory/mc_types.h" |
37 | | |
38 | | #include "yb/yql/cql/ql/exec/exec_fwd.h" |
39 | | #include "yb/yql/cql/ql/ptree/ptree_fwd.h" |
40 | | #include "yb/yql/cql/ql/ptree/pt_expr_types.h" |
41 | | #include "yb/yql/cql/ql/util/util_fwd.h" |
42 | | #include "yb/yql/cql/ql/util/statement_result.h" |
43 | | |
44 | | namespace yb { |
45 | | |
46 | | namespace client { |
47 | | class YBColumnSpec; |
48 | | } // namespace client |
49 | | |
50 | | namespace ql { |
51 | | |
52 | | namespace audit { |
53 | | |
54 | | class AuditLogger; |
55 | | |
56 | | } |
57 | | |
58 | | class QLMetrics; |
59 | | |
60 | | class Executor : public QLExprExecutor { |
61 | | public: |
62 | | //------------------------------------------------------------------------------------------------ |
63 | | // Public types. |
64 | | typedef std::unique_ptr<Executor> UniPtr; |
65 | | typedef std::unique_ptr<const Executor> UniPtrConst; |
66 | | |
67 | | //------------------------------------------------------------------------------------------------ |
68 | | // Constructor & destructor. |
69 | | Executor(QLEnv *ql_env, audit::AuditLogger* audit_logger, Rescheduler* rescheduler, |
70 | | const QLMetrics* ql_metrics); |
71 | | virtual ~Executor(); |
72 | | |
73 | | // Execute the given statement (parse tree) or batch. The parse trees and the parameters must not |
74 | | // be destroyed until the statements have been executed. |
75 | | void ExecuteAsync(const ParseTree& parse_tree, const StatementParameters& params, |
76 | | StatementExecutedCallback cb); |
77 | | void ExecuteAsync(const StatementBatch& batch, StatementExecutedCallback cb); |
78 | | |
79 | | void Shutdown(); |
80 | | |
81 | | static constexpr int64_t kAsyncCallsIdle = -1; |
82 | | |
83 | | private: |
84 | | class ResetAsyncCalls { |
85 | | public: |
86 | | explicit ResetAsyncCalls(std::atomic<int64_t>* num_async_calls); |
87 | | |
88 | | ResetAsyncCalls(const ResetAsyncCalls&) = delete; |
89 | | void operator=(const ResetAsyncCalls&) = delete; |
90 | | |
91 | | ResetAsyncCalls(ResetAsyncCalls&& rhs); |
92 | | void operator=(ResetAsyncCalls&& rhs); |
93 | | |
94 | 0 | bool empty() const { |
95 | 0 | return num_async_calls_ == nullptr; |
96 | 0 | } |
97 | | |
98 | | void Cancel(); |
99 | | void Perform(); |
100 | | |
101 | | ~ResetAsyncCalls(); |
102 | | |
103 | | private: |
104 | | std::atomic<int64_t>* num_async_calls_; |
105 | | }; |
106 | | |
107 | | ResetAsyncCalls PrepareExecuteAsync(); |
108 | | |
109 | | bool HasAsyncCalls(); |
110 | | |
111 | | //------------------------------------------------------------------------------------------------ |
112 | | // Currently, we don't yet have a code generator into byte code, so the following ExecTreeNode() |
113 | | // and ExecPTNode() functions operate directly on the parse tree. |
114 | | // Execute a parse tree. |
115 | | CHECKED_STATUS Execute(const ParseTree& parse_tree, const StatementParameters& params); |
116 | | |
117 | | // Run runtime analysis and prepare for execution within the execution context. |
118 | | // Serves for processing things unavailable for initial semantic analysis. |
119 | | CHECKED_STATUS PreExecTreeNode(TreeNode *tnode); |
120 | | |
121 | | CHECKED_STATUS PreExecTreeNode(PTInsertStmt *tnode); |
122 | | |
123 | | CHECKED_STATUS PreExecTreeNode(PTInsertJsonClause *tnode); |
124 | | |
125 | | // Convert a JSON value to an expression according to its given expected type. |
126 | | Result<PTExprPtr> ConvertJsonToExpr(const rapidjson::Value& json_value, |
127 | | const QLType::SharedPtr& type, |
128 | | const YBLocationPtr& loc); |
129 | | |
130 | | Result<PTExprPtr> ConvertJsonToExprInner(const rapidjson::Value& json_value, |
131 | | const QLType::SharedPtr& type, |
132 | | const YBLocationPtr& loc); |
133 | | |
134 | | // Execute any TreeNode. This function determines how to execute a node. |
135 | | CHECKED_STATUS ExecTreeNode(const TreeNode *tnode); |
136 | | |
137 | | // Execute a list of statements. |
138 | | CHECKED_STATUS ExecPTNode(const PTListNode *tnode); |
139 | | |
140 | | CHECKED_STATUS GetOffsetOrLimit( |
141 | | const PTSelectStmt* tnode, |
142 | | const std::function<PTExprPtr(const PTSelectStmt* tnode)>& get_val, |
143 | | const string& clause_type, |
144 | | int32_t* value); |
145 | | |
146 | | // Create a table (including index table for CREATE INDEX). |
147 | | CHECKED_STATUS ExecPTNode(const PTCreateTable *tnode); |
148 | | CHECKED_STATUS AddColumnToIndexInfo(IndexInfoPB *index_info, const PTColumnDefinition *column); |
149 | | |
150 | | // Alter a table. |
151 | | CHECKED_STATUS ExecPTNode(const PTAlterTable *tnode); |
152 | | |
153 | | // Drop a table. |
154 | | CHECKED_STATUS ExecPTNode(const PTDropStmt *tnode); |
155 | | |
156 | | // Create a user-defined type. |
157 | | CHECKED_STATUS ExecPTNode(const PTCreateType *tnode); |
158 | | |
159 | | // Creates a role. |
160 | | CHECKED_STATUS ExecPTNode(const PTCreateRole *tnode); |
161 | | |
162 | | // Alter an existing role. |
163 | | CHECKED_STATUS ExecPTNode(const PTAlterRole *tnode); |
164 | | |
165 | | // Grants or revokes a role to another role. |
166 | | CHECKED_STATUS ExecPTNode(const PTGrantRevokeRole* tnode); |
167 | | |
168 | | // Grants or revokes permissions to resources (roles/tables/keyspaces). |
169 | | CHECKED_STATUS ExecPTNode(const PTGrantRevokePermission* tnode); |
170 | | |
171 | | // Select statement. |
172 | | CHECKED_STATUS ExecPTNode(const PTSelectStmt *tnode, TnodeContext* tnode_context); |
173 | | |
174 | | // Insert statement. |
175 | | CHECKED_STATUS ExecPTNode(const PTInsertStmt *tnode, TnodeContext* tnode_context); |
176 | | |
177 | | // Delete statement. |
178 | | CHECKED_STATUS ExecPTNode(const PTDeleteStmt *tnode, TnodeContext* tnode_context); |
179 | | |
180 | | // Update statement. |
181 | | CHECKED_STATUS ExecPTNode(const PTUpdateStmt *tnode, TnodeContext* tnode_context); |
182 | | |
183 | | // Explain statement. |
184 | | CHECKED_STATUS ExecPTNode(const PTExplainStmt *tnode); |
185 | | |
186 | | // Truncate statement. |
187 | | CHECKED_STATUS ExecPTNode(const PTTruncateStmt *tnode); |
188 | | |
189 | | // Start a transaction. |
190 | | CHECKED_STATUS ExecPTNode(const PTStartTransaction *tnode); |
191 | | |
192 | | // Commit a transaction. |
193 | | CHECKED_STATUS ExecPTNode(const PTCommit *tnode); |
194 | | |
195 | | // Create a keyspace. |
196 | | CHECKED_STATUS ExecPTNode(const PTCreateKeyspace *tnode); |
197 | | |
198 | | // Use a keyspace. |
199 | | CHECKED_STATUS ExecPTNode(const PTUseKeyspace *tnode); |
200 | | |
201 | | // Alter a keyspace. |
202 | | CHECKED_STATUS ExecPTNode(const PTAlterKeyspace *tnode); |
203 | | |
204 | | //------------------------------------------------------------------------------------------------ |
205 | | // Result processing. |
206 | | |
207 | | // Returns the YBSession for the statement in execution. |
208 | | client::YBSessionPtr GetSession(ExecContext* exec_context); |
209 | | |
210 | | // Flush operations that have been applied and commit. If there is none, finish the statement |
211 | | // execution. |
212 | | void FlushAsync(ResetAsyncCalls* reset_async_calls); |
213 | | |
214 | | // Callback for FlushAsync. |
215 | | void FlushAsyncDone(client::FlushStatus* s, ExecContext* exec_context = nullptr); |
216 | | |
217 | | // Callback for Commit. |
218 | | void CommitDone(Status s, ExecContext* exec_context); |
219 | | |
220 | | // Process async results from FlushAsync and Commit. |
221 | | void ProcessAsyncResults(bool rescheduled, ResetAsyncCalls* reset_async_calls); |
222 | | |
223 | | // Process async results from FlushAsync and Commit for a tnode. Returns true if there are new ops |
224 | | // being buffered to be flushed. |
225 | | Result<bool> ProcessTnodeResults(TnodeContext* tnode_context); |
226 | | |
227 | | // Process the status of executing a statement. |
228 | | CHECKED_STATUS ProcessStatementStatus(const ParseTree& parse_tree, const Status& s); |
229 | | |
230 | | // Process the read/write op status. |
231 | | CHECKED_STATUS ProcessOpStatus(const PTDmlStmt* stmt, |
232 | | const client::YBqlOpPtr& op, |
233 | | ExecContext* exec_context); |
234 | | |
235 | | std::shared_ptr<client::YBTable> GetTableFromStatement(const TreeNode *tnode) const; |
236 | | |
237 | | // Process status of FlushAsyncDone. |
238 | | using OpErrors = std::unordered_map<const client::YBqlOp*, Status>; |
239 | | CHECKED_STATUS ProcessAsyncStatus(const OpErrors& op_errors, ExecContext* exec_context); |
240 | | |
241 | | // Append rows result. |
242 | | CHECKED_STATUS AppendRowsResult(RowsResult::SharedPtr&& rows_result); |
243 | | |
244 | | // Read paging state from user's StatementParams. |
245 | | Result<QueryPagingState*> LoadPagingStateFromUser(const PTSelectStmt* tnode, |
246 | | TnodeContext* tnode_context); |
247 | | |
248 | | // When request does not need to be executed, create and return empty result (0 row) to users. |
249 | | CHECKED_STATUS GenerateEmptyResult(const PTSelectStmt* tnode); |
250 | | |
251 | | // Continue a multi-partition select (e.g. table scan or query with 'IN' condition on hash cols). |
252 | | Result<bool> FetchMoreRows(const PTSelectStmt* tnode, |
253 | | const client::YBqlReadOpPtr& op, |
254 | | TnodeContext* tnode_context, |
255 | | ExecContext* exec_context); |
256 | | |
257 | | // Fetch rows for a select statement using primary keys selected from an uncovered index. |
258 | | Result<bool> FetchRowsByKeys(const PTSelectStmt* tnode, |
259 | | const client::YBqlReadOpPtr& select_op, |
260 | | const QLRowBlock& keys, |
261 | | TnodeContext* tnode_context); |
262 | | |
263 | | // Aggregate all result sets from all tablet servers to form the requested resultset. |
264 | | CHECKED_STATUS AggregateResultSets(const PTSelectStmt* pt_select, TnodeContext* tnode_context); |
265 | | CHECKED_STATUS EvalCount(const std::shared_ptr<QLRowBlock>& row_block, |
266 | | int column_index, |
267 | | QLValue *ql_value); |
268 | | CHECKED_STATUS EvalMax(const std::shared_ptr<QLRowBlock>& row_block, |
269 | | int column_index, |
270 | | QLValue *ql_value); |
271 | | CHECKED_STATUS EvalMin(const std::shared_ptr<QLRowBlock>& row_block, |
272 | | int column_index, |
273 | | QLValue *ql_value); |
274 | | CHECKED_STATUS EvalSum(const std::shared_ptr<QLRowBlock>& row_block, |
275 | | int column_index, |
276 | | DataType data_type, |
277 | | QLValue *ql_value); |
278 | | CHECKED_STATUS EvalAvg(const std::shared_ptr<QLRowBlock>& row_block, |
279 | | int column_index, |
280 | | DataType data_type, |
281 | | QLValue *ql_value); |
282 | | |
283 | | // Invoke statement executed callback. |
284 | | void StatementExecuted(const Status& s, ResetAsyncCalls* reset_async_calls); |
285 | | |
286 | | // Reset execution state. |
287 | | void Reset(ResetAsyncCalls* reset_async_calls); |
288 | | |
289 | | //------------------------------------------------------------------------------------------------ |
290 | | // Expression evaluation. |
291 | | |
292 | | // CHECKED_STATUS EvalTimeUuidExpr(const PTExpr::SharedPtr& expr, EvalTimeUuidValue *result); |
293 | | // CHECKED_STATUS ConvertFromTimeUuid(EvalValue *result, const EvalTimeUuidValue& uuid_value); |
294 | | CHECKED_STATUS PTExprToPB(const PTExprPtr& expr, QLExpressionPB *expr_pb); |
295 | | |
296 | | // Constant expressions. |
297 | | CHECKED_STATUS PTConstToPB(const PTExprPtr& const_pt, QLValuePB *const_pb, |
298 | | bool negate = false); |
299 | | CHECKED_STATUS PTExprToPB(const PTConstVarInt *const_pt, QLValuePB *const_pb, bool negate); |
300 | | CHECKED_STATUS PTExprToPB(const PTConstDecimal *const_pt, QLValuePB *const_pb, bool negate); |
301 | | CHECKED_STATUS PTExprToPB(const PTConstInt *const_pt, QLValuePB *const_pb, bool negate); |
302 | | CHECKED_STATUS PTExprToPB(const PTConstDouble *const_pt, QLValuePB *const_pb, bool negate); |
303 | | CHECKED_STATUS PTExprToPB(const PTConstText *const_pt, QLValuePB *const_pb); |
304 | | CHECKED_STATUS PTExprToPB(const PTConstBool *const_pt, QLValuePB *const_pb); |
305 | | CHECKED_STATUS PTExprToPB(const PTConstUuid *const_pt, QLValuePB *const_pb); |
306 | | CHECKED_STATUS PTExprToPB(const PTConstBinary *const_pt, QLValuePB *const_pb); |
307 | | |
308 | | // Bind variable. |
309 | | CHECKED_STATUS PTExprToPB(const PTBindVar *bind_pt, QLExpressionPB *bind_pb); |
310 | | |
311 | | // Column types. |
312 | | CHECKED_STATUS PTExprToPB(const PTRef *ref_pt, QLExpressionPB *ref_pb); |
313 | | CHECKED_STATUS PTExprToPB(const PTSubscriptedColumn *ref_pt, QLExpressionPB *ref_pb); |
314 | | CHECKED_STATUS PTExprToPB(const PTJsonColumnWithOperators *ref_pt, QLExpressionPB *ref_pb); |
315 | | CHECKED_STATUS PTExprToPB(const PTAllColumns *ref_all, QLReadRequestPB *req); |
316 | | |
317 | | // Operators. |
318 | | // There's only one, so call it PTUMinus for now. |
319 | | CHECKED_STATUS PTUMinusToPB(const PTOperator1 *op_pt, QLExpressionPB *op_pb); |
320 | | CHECKED_STATUS PTUMinusToPB(const PTOperator1 *op_pt, QLValuePB *const_pb); |
321 | | CHECKED_STATUS PTJsonOperatorToPB(const PTJsonOperatorPtr& json_pt, |
322 | | QLJsonOperationPB *op_pb); |
323 | | |
324 | | // Builtin calls. |
325 | | // Even though BFCall and TSCall are processed similarly in executor at this point because they |
326 | | // have similar protobuf, it is best not to merge the two functions "BFCallToPB" and "TSCallToPB" |
327 | | // into one. That way, coding changes to one case don't affect the other in the future. |
328 | | CHECKED_STATUS PTExprToPB(const PTBcall *bcall_pt, QLExpressionPB *bcall_pb); |
329 | | CHECKED_STATUS BFCallToPB(const PTBcall *bcall_pt, QLExpressionPB *expr_pb); |
330 | | CHECKED_STATUS TSCallToPB(const PTBcall *bcall_pt, QLExpressionPB *expr_pb); |
331 | | |
332 | | // Constructors for collection and UDT. |
333 | | CHECKED_STATUS PTExprToPB(const PTCollectionExpr *const_pt, QLValuePB *const_pb); |
334 | | CHECKED_STATUS PTExprToPB(const PTCollectionExpr *expr_pt, QLExpressionPB *expr_pb); |
335 | | |
336 | | // Logic expressions. |
337 | | CHECKED_STATUS PTExprToPB(const PTLogic1 *logic_pt, QLExpressionPB *logic_pb); |
338 | | CHECKED_STATUS PTExprToPB(const PTLogic2 *logic_pt, QLExpressionPB *logic_pb); |
339 | | |
340 | | // Relation expressions. |
341 | | CHECKED_STATUS PTExprToPB(const PTRelation0 *relation_pt, QLExpressionPB *relation_pb); |
342 | | CHECKED_STATUS PTExprToPB(const PTRelation1 *relation_pt, QLExpressionPB *relation_pb); |
343 | | CHECKED_STATUS PTExprToPB(const PTRelation2 *relation_pt, QLExpressionPB *relation_pb); |
344 | | CHECKED_STATUS PTExprToPB(const PTRelation3 *relation_pt, QLExpressionPB *relation_pb); |
345 | | |
346 | | //------------------------------------------------------------------------------------------------ |
347 | | |
348 | | // Set the time to live for the values affected by the current write request. |
349 | | CHECKED_STATUS TtlToPB(const PTDmlStmt *tnode, QLWriteRequestPB *req); |
350 | | |
351 | | // Set the timestamp for the values affected by the current write request. |
352 | | CHECKED_STATUS TimestampToPB(const PTDmlStmt *tnode, QLWriteRequestPB *req); |
353 | | |
354 | | // Convert PTExpr to appropriate QLExpressionPB with appropriate validation. |
355 | | CHECKED_STATUS PTExprToPBValidated(const PTExprPtr& expr, QLExpressionPB *expr_pb); |
356 | | |
357 | | //------------------------------------------------------------------------------------------------ |
358 | | // Column evaluation. |
359 | | |
360 | | // Convert column references to protobuf. |
361 | | CHECKED_STATUS ColumnRefsToPB(const PTDmlStmt *tnode, QLReferencedColumnsPB *columns_pb); |
362 | | |
363 | | // Convert column arguments to protobuf. |
364 | | CHECKED_STATUS ColumnArgsToPB(const PTDmlStmt *tnode, QLWriteRequestPB *req); |
365 | | |
366 | | // Convert INSERT JSON clause to protobuf. |
367 | | CHECKED_STATUS InsertJsonClauseToPB(const PTInsertStmt *insert_stmt, |
368 | | const PTInsertJsonClause *json_clause, |
369 | | QLWriteRequestPB *req); |
370 | | |
371 | | //------------------------------------------------------------------------------------------------ |
372 | | // Where clause evaluation. |
373 | | |
374 | | // Convert where clause to protobuf for read request. |
375 | | Result<uint64_t> WhereClauseToPB(QLReadRequestPB *req, |
376 | | const MCVector<ColumnOp>& key_where_ops, |
377 | | const MCList<ColumnOp>& where_ops, |
378 | | const MCList<SubscriptedColumnOp>& subcol_where_ops, |
379 | | const MCList<JsonColumnOp>& jsoncol_where_ops, |
380 | | const MCList<PartitionKeyOp>& partition_key_ops, |
381 | | const MCList<FuncOp>& func_ops, |
382 | | TnodeContext* tnode_context); |
383 | | |
384 | | // Convert where clause to protobuf for write request. |
385 | | CHECKED_STATUS WhereClauseToPB(QLWriteRequestPB *req, |
386 | | const MCVector<ColumnOp>& key_where_ops, |
387 | | const MCList<ColumnOp>& where_ops, |
388 | | const MCList<SubscriptedColumnOp>& subcol_where_ops); |
389 | | |
390 | | // Set a primary key in a read request. |
391 | | CHECKED_STATUS WhereKeyToPB(QLReadRequestPB *req, const Schema& schema, const QLRow& key); |
392 | | |
393 | | // Convert an expression op in where clause to protobuf. |
394 | | CHECKED_STATUS WhereOpToPB(QLConditionPB *condition, const ColumnOp& col_op); |
395 | | CHECKED_STATUS WhereSubColOpToPB(QLConditionPB *condition, const SubscriptedColumnOp& subcol_op); |
396 | | CHECKED_STATUS WhereJsonColOpToPB(QLConditionPB *condition, const JsonColumnOp& jsoncol_op); |
397 | | CHECKED_STATUS FuncOpToPB(QLConditionPB *condition, const FuncOp& func_op); |
398 | | |
399 | | //------------------------------------------------------------------------------------------------ |
400 | | // Add a read/write operation for the current statement and apply it. For write operation, check |
401 | | // for inter-dependency before applying. If it is a write operation to a table with secondary |
402 | | // indexes, update them as needed. |
403 | | void AddOperation(const client::YBqlReadOpPtr& op, TnodeContext *tnode_context); |
404 | | CHECKED_STATUS AddOperation(const client::YBqlWriteOpPtr& op, TnodeContext *tnode_context); |
405 | | |
406 | | // Is this a batch returning status? |
407 | 28.1M | bool IsReturnsStatusBatch() const { |
408 | 28.1M | return returns_status_batch_opt_ && *returns_status_batch_opt_; |
 | | ^738k |
409 | 28.1M | } |
410 | | |
411 | | //------------------------------------------------------------------------------------------------ |
412 | | CHECKED_STATUS UpdateIndexes(const PTDmlStmt *tnode, |
413 | | QLWriteRequestPB *req, |
414 | | TnodeContext* tnode_context); |
415 | | CHECKED_STATUS AddIndexWriteOps(const PTDmlStmt *tnode, |
416 | | const QLWriteRequestPB& req, |
417 | | TnodeContext* tnode_context); |
418 | | |
419 | 45.9M | int64_t num_async_calls() const { |
420 | 45.9M | return num_async_calls_.load(std::memory_order_acquire); |
421 | 45.9M | } |
422 | | |
423 | | //------------------------------------------------------------------------------------------------ |
424 | | // Helper class to separate inter-dependent write operations. |
425 | | class WriteBatch { |
426 | | public: |
427 | | // Add a write operation. Returns true if it does not depend on another operation in the batch. |
428 | | // Returns false if it does and is not added. In that case, the operation needs to be deferred |
429 | | // until the dependent operation has been applied. |
430 | | bool Add(const client::YBqlWriteOpPtr& op, |
431 | | const TnodeContext* tnode_context, |
432 | | ExecContext* exec_context); |
433 | | |
434 | | // Clear the batch. |
435 | | void Clear(); |
436 | | |
437 | | // Check if the batch is empty. |
438 | | bool Empty() const; |
439 | | |
440 | | private: |
441 | | // Sets of write operations separated by their primary and hash keys. |
442 | | std::unordered_set<client::YBqlWriteOpPtr, |
443 | | client::YBqlWritePrimaryKeyComparator, |
444 | | client::YBqlWritePrimaryKeyComparator> ops_by_primary_key_; |
445 | | std::unordered_set<client::YBqlWriteOpPtr, |
446 | | client::YBqlWriteHashKeyComparator, |
447 | | client::YBqlWriteHashKeyComparator> ops_by_hash_key_; |
448 | | }; |
449 | | |
450 | | //------------------------------------------------------------------------------------------------ |
451 | | // Environment (YBClient) for executing statements. |
452 | | QLEnv *ql_env_; |
453 | | |
454 | | // Used for logging audit records. |
455 | | audit::AuditLogger& audit_logger_; |
456 | | |
457 | | // A rescheduler to reschedule the current call. |
458 | | Rescheduler* const rescheduler_; |
459 | | |
460 | | // Execution context of the statement currently being executed, and the contexts for all |
461 | | // statements in execution. The contexts are created and destroyed for each execution. |
462 | | ExecContext* exec_context_ = nullptr; |
463 | | std::list<ExecContext> exec_contexts_; |
464 | | |
465 | | // Batch of outstanding write operations that are being applied. |
466 | | WriteBatch write_batch_; |
467 | | |
468 | | // Session to apply non-transactional read/write operations. Transactional read/write operations |
469 | | // are applied using the corresponding transactional session in ExecContext. |
470 | | const client::YBSessionPtr session_; |
471 | | |
472 | | // The number of outstanding async calls pending. 0 means that we are processing result of all |
473 | | // calls, -1 (kAsyncCallsIdle) means that this executor is idle. |
474 | | std::atomic<int64_t> num_async_calls_ = {kAsyncCallsIdle}; |
475 | | |
476 | | // The async error status and the mutex to protect its update. |
477 | | std::mutex status_mutex_; |
478 | | Status async_status_; |
479 | | |
480 | | // The number of FlushAsync called to execute the statements. |
481 | | int64_t num_flushes_ = 0; |
482 | | |
483 | | // Execution result. |
484 | | ExecutedResult::SharedPtr result_; |
485 | | |
486 | | // Statement executed callback. |
487 | | StatementExecutedCallback cb_; |
488 | | |
489 | | // QLMetrics to keep track of node parsing etc. |
490 | | const QLMetrics* ql_metrics_; |
491 | | |
492 | | // Whether this is a batch with statements that returns status. |
493 | | boost::optional<bool> returns_status_batch_opt_; |
494 | | |
495 | | class ExecutorTask : public rpc::ThreadPoolTask { |
496 | | public: |
497 | | ExecutorTask& Bind(Executor* executor, ResetAsyncCalls* reset_async_calls); |
498 | | |
499 | 2 | virtual ~ExecutorTask() = default; |
500 | | |
501 | | private: |
502 | | void Run() override; |
503 | | void Done(const Status& status) override; |
504 | | virtual void DoRun(Executor* executor, ResetAsyncCalls* reset_async_calls) = 0; |
505 | | |
506 | | Executor* executor_ = nullptr; |
507 | | ResetAsyncCalls reset_async_calls_{nullptr}; |
508 | | }; |
509 | | |
510 | | class ProcessAsyncResultsTask : public ExecutorTask { |
511 | | public: |
512 | 9.28M | void DoRun(Executor* executor, ResetAsyncCalls* reset_async_calls) override { |
513 | 9.28M | executor->ProcessAsyncResults(true /* rescheduled */, reset_async_calls); |
514 | 9.28M | } |
515 | | }; |
516 | | |
517 | | friend class ProcessAsyncResultsTask; |
518 | | |
519 | | ProcessAsyncResultsTask process_async_results_task_; |
520 | | |
521 | | class FlushAsyncTask : public ExecutorTask { |
522 | | private: |
523 | 0 | void DoRun(Executor* executor, ResetAsyncCalls* reset_async_calls) override { |
524 | 0 | executor->FlushAsync(reset_async_calls); |
525 | 0 | } |
526 | | }; |
527 | | |
528 | | friend class FlushAsyncTask; |
529 | | |
530 | | FlushAsyncTask flush_async_task_; |
531 | | }; |
532 | | |
533 | | // Normalize the JSON object key according to CQL rules: |
534 | | // Key is made lowercase unless it's double-quoted - in which case double quotes are removed |
535 | | std::string NormalizeJsonKey(const std::string& key); |
536 | | |
537 | | // Create an appropriate QLExpressionPB depending on a column description |
538 | | QLExpressionPB* CreateQLExpression(QLWriteRequestPB *req, const ColumnDesc& col_desc); |
539 | | |
540 | | } // namespace ql |
541 | | } // namespace yb |
542 | | |
543 | | #endif // YB_YQL_CQL_QL_EXEC_EXECUTOR_H_ |