/Users/deen/code/yugabyte-db/src/yb/yql/cql/ql/exec/executor.cc
Line | Count | Source (jump to first uncovered line) |
1 | | //-------------------------------------------------------------------------------------------------- |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | //-------------------------------------------------------------------------------------------------- |
15 | | |
16 | | #include "yb/yql/cql/ql/exec/executor.h" |
17 | | |
18 | | #include "yb/client/callbacks.h" |
19 | | #include "yb/client/client.h" |
20 | | #include "yb/client/error.h" |
21 | | #include "yb/client/rejection_score_source.h" |
22 | | #include "yb/client/table.h" |
23 | | #include "yb/client/table_alterer.h" |
24 | | #include "yb/client/table_creator.h" |
25 | | #include "yb/client/yb_op.h" |
26 | | |
27 | | #include "yb/common/common.pb.h" |
28 | | #include "yb/common/consistent_read_point.h" |
29 | | #include "yb/common/index.h" |
30 | | #include "yb/common/index_column.h" |
31 | | #include "yb/common/ql_protocol_util.h" |
32 | | #include "yb/common/ql_rowblock.h" |
33 | | #include "yb/common/ql_value.h" |
34 | | #include "yb/common/schema.h" |
35 | | #include "yb/common/wire_protocol.h" |
36 | | |
37 | | #include "yb/gutil/casts.h" |
38 | | |
39 | | #include "yb/rpc/thread_pool.h" |
40 | | |
41 | | #include "yb/util/decimal.h" |
42 | | #include "yb/util/metrics.h" |
43 | | #include "yb/util/random_util.h" |
44 | | #include "yb/util/result.h" |
45 | | #include "yb/util/status_format.h" |
46 | | #include "yb/util/trace.h" |
47 | | |
48 | | #include "yb/yql/cql/ql/exec/exec_context.h" |
49 | | #include "yb/yql/cql/ql/ptree/column_desc.h" |
50 | | #include "yb/yql/cql/ql/ptree/parse_tree.h" |
51 | | #include "yb/yql/cql/ql/ptree/pt_alter_keyspace.h" |
52 | | #include "yb/yql/cql/ql/ptree/pt_alter_role.h" |
53 | | #include "yb/yql/cql/ql/ptree/pt_alter_table.h" |
54 | | #include "yb/yql/cql/ql/ptree/pt_column_definition.h" |
55 | | #include "yb/yql/cql/ql/ptree/pt_create_index.h" |
56 | | #include "yb/yql/cql/ql/ptree/pt_create_keyspace.h" |
57 | | #include "yb/yql/cql/ql/ptree/pt_create_role.h" |
58 | | #include "yb/yql/cql/ql/ptree/pt_create_table.h" |
59 | | #include "yb/yql/cql/ql/ptree/pt_create_type.h" |
60 | | #include "yb/yql/cql/ql/ptree/pt_delete.h" |
61 | | #include "yb/yql/cql/ql/ptree/pt_drop.h" |
62 | | #include "yb/yql/cql/ql/ptree/pt_explain.h" |
63 | | #include "yb/yql/cql/ql/ptree/pt_expr.h" |
64 | | #include "yb/yql/cql/ql/ptree/pt_grant_revoke.h" |
65 | | #include "yb/yql/cql/ql/ptree/pt_insert.h" |
66 | | #include "yb/yql/cql/ql/ptree/pt_insert_json_clause.h" |
67 | | #include "yb/yql/cql/ql/ptree/pt_transaction.h" |
68 | | #include "yb/yql/cql/ql/ptree/pt_truncate.h" |
69 | | #include "yb/yql/cql/ql/ptree/pt_update.h" |
70 | | #include "yb/yql/cql/ql/ptree/pt_use_keyspace.h" |
71 | | #include "yb/yql/cql/ql/ql_processor.h" |
72 | | #include "yb/yql/cql/ql/util/errcodes.h" |
73 | | |
74 | | using namespace std::literals; |
75 | | using namespace std::placeholders; |
76 | | |
77 | | namespace yb { |
78 | | namespace ql { |
79 | | |
80 | | using std::string; |
81 | | using std::shared_ptr; |
82 | | |
83 | | using audit::AuditLogger; |
84 | | using audit::IsPrepare; |
85 | | using audit::ErrorIsFormatted; |
86 | | using client::YBColumnSpec; |
87 | | using client::YBOperation; |
88 | | using client::YBqlOpPtr; |
89 | | using client::YBqlReadOp; |
90 | | using client::YBqlReadOpPtr; |
91 | | using client::YBqlWriteOp; |
92 | | using client::YBqlWriteOpPtr; |
93 | | using client::YBSchema; |
94 | | using client::YBSchemaBuilder; |
95 | | using client::YBSessionPtr; |
96 | | using client::YBTableAlterer; |
97 | | using client::YBTableCreator; |
98 | | using client::YBTableName; |
99 | | using client::YBTableType; |
100 | | using strings::Substitute; |
101 | | |
102 | 34.8M | #define RETURN_STMT_NOT_OK(s, reset_async_calls) do { \ |
103 | 1.42k | auto&& _s = (s); \ |
104 | 24.6M | if (PREDICT_FALSE(!_s.ok())) { \ |
105 | 4.42k | return StatementExecuted(MoveStatus(_s), (reset_async_calls)); } \ |
106 | 24.6M | } while (false) |
107 | | |
108 | | //-------------------------------------------------------------------------------------------------- |
109 | | DEFINE_bool(ycql_serial_operation_in_transaction_block, true, |
110 | | "If true, operations within a transaction block must be executed in order, " |
111 | | "at least semantically speaking."); |
112 | | |
113 | | extern ErrorCode QLStatusToErrorCode(QLResponsePB::QLStatus status); |
114 | | |
// Constructs an executor bound to the given QL environment. A fresh client
// session is created from the environment; the audit logger, rescheduler and
// metrics objects are borrowed (not owned) and must outlive this executor.
Executor::Executor(QLEnv* ql_env, AuditLogger* audit_logger, Rescheduler* rescheduler,
                   const QLMetrics* ql_metrics)
    : ql_env_(ql_env),
      audit_logger_(*audit_logger),
      rescheduler_(rescheduler),
      session_(ql_env_->NewSession()),
      ql_metrics_(ql_metrics) {
}
123 | | |
124 | 1 | Executor::~Executor() { |
125 | 0 | LOG_IF(DFATAL, HasAsyncCalls()) |
126 | 0 | << "Async calls still running: " << num_async_calls(); |
127 | 1 | } |
128 | | |
129 | 0 | void Executor::Shutdown() { |
130 | 0 | int counter = 0; |
131 | 0 | while (HasAsyncCalls()) { |
132 | 0 | if (++counter == 1000) { |
133 | 0 | LOG(DFATAL) << "Too long Executor shutdown: " << num_async_calls(); |
134 | 0 | } |
135 | 0 | std::this_thread::sleep_for(10ms); |
136 | 0 | } |
137 | 0 | } |
138 | | |
139 | | //-------------------------------------------------------------------------------------------------- |
140 | | |
// Returns true while at least one asynchronous call is in flight, i.e. the
// async-call counter has not been returned to the idle sentinel value.
bool Executor::HasAsyncCalls() {
  return num_async_calls() != kAsyncCallsIdle;
}
144 | | |
// Prepares the executor for a new round of asynchronous execution. Starting a
// new execution while a previous callback is still pending, or while async
// calls are outstanding, is a bug and is reported via DFATAL. The async-call
// counter is reset to 0 (release ordering so other threads observe the reset),
// and the returned guard restores it to the idle sentinel when the round ends.
Executor::ResetAsyncCalls Executor::PrepareExecuteAsync() {
  LOG_IF(DFATAL, !cb_.is_null()) << __func__ << " while another execution is in progress.";
  LOG_IF(DFATAL, HasAsyncCalls())
      << __func__ << " while have " << num_async_calls() << " async calls running";
  num_async_calls_.store(0, std::memory_order_release);
  return ResetAsyncCalls(&num_async_calls_);
}
152 | | |
153 | | void Executor::ExecuteAsync(const ParseTree& parse_tree, const StatementParameters& params, |
154 | 4.71M | StatementExecutedCallback cb) { |
155 | 4.71M | auto reset_async_calls = PrepareExecuteAsync(); |
156 | 4.71M | cb_ = std::move(cb); |
157 | 4.71M | session_->SetDeadline(rescheduler_->GetDeadline()); |
158 | 4.71M | session_->SetForceConsistentRead(client::ForceConsistentRead::kFalse); |
159 | 4.71M | auto read_time = params.read_time(); |
160 | 4.71M | if (read_time) { |
161 | 276 | session_->SetReadPoint(read_time); |
162 | 4.71M | } else { |
163 | 4.71M | session_->SetReadPoint(client::Restart::kFalse); |
164 | 4.71M | } |
165 | 4.71M | RETURN_STMT_NOT_OK(Execute(parse_tree, params), &reset_async_calls); |
166 | | |
167 | 4.71M | FlushAsync(&reset_async_calls); |
168 | 4.71M | } |
169 | | |
// Executes a batch of statements asynchronously. All statements are first
// validated together against the batch restrictions (DML only, single-row,
// consistent RETURNS STATUS AS ROW usage, single table when returning status),
// then executed, and finally flushed as one round of async operations. `cb`
// fires once for the whole batch.
void Executor::ExecuteAsync(const StatementBatch& batch, StatementExecutedCallback cb) {
  auto reset_async_calls = PrepareExecuteAsync();

  cb_ = std::move(cb);
  session_->SetDeadline(rescheduler_->GetDeadline());
  session_->SetForceConsistentRead(client::ForceConsistentRead::kFalse);
  session_->SetReadPoint(client::Restart::kFalse);

  // Table for DML batches, where all statements must modify the same table.
  client::YBTablePtr dml_batch_table;

  // Verify the statements in the batch.
  for (const auto& pair : batch) {
    const ParseTree& parse_tree = pair.first;
    const TreeNode* tnode = parse_tree.root().get();
    if (tnode != nullptr) {
      switch (tnode->opcode()) {
        case TreeNodeOpcode::kPTInsertStmt: FALLTHROUGH_INTENDED;
        case TreeNodeOpcode::kPTUpdateStmt: FALLTHROUGH_INTENDED;
        case TreeNodeOpcode::kPTDeleteStmt: {
          const auto *stmt = static_cast<const PTDmlStmt *>(tnode);
          // Conditional DML (IF clause) is only batchable when it returns
          // status as a row, so the per-statement outcome can be reported.
          if (stmt->if_clause() != nullptr && !stmt->returns_status()) {
            return StatementExecuted(
                ErrorStatus(ErrorCode::CQL_STATEMENT_INVALID,
                            "batch execution of conditional DML statement without RETURNS STATUS "
                            "AS ROW clause is not supported yet"),
                &reset_async_calls);
          }

          // Range deletes/updates touching multiple rows are not batchable.
          if (stmt->ModifiesMultipleRows()) {
            return StatementExecuted(
                ErrorStatus(ErrorCode::CQL_STATEMENT_INVALID,
                            "batch execution with DML statements modifying multiple rows is not "
                            "supported yet"),
                &reset_async_calls);
          }

          // The first statement fixes the RETURNS STATUS mode for the whole
          // batch; every later statement must agree with it.
          if (!returns_status_batch_opt_) {
            returns_status_batch_opt_ = stmt->returns_status();
          } else if (stmt->returns_status() != *returns_status_batch_opt_) {
            return StatementExecuted(
                ErrorStatus(ErrorCode::CQL_STATEMENT_INVALID,
                            "batch execution mixing statements with and without RETURNS STATUS "
                            "AS ROW is not supported"),
                &reset_async_calls);
          }

          // A RETURNS STATUS batch must target exactly one table (compared by
          // table id).
          if (*returns_status_batch_opt_) {
            if (dml_batch_table == nullptr) {
              dml_batch_table = stmt->table();
            } else if (dml_batch_table->id() != stmt->table()->id()) {
              return StatementExecuted(
                  ErrorStatus(ErrorCode::CQL_STATEMENT_INVALID,
                              "batch execution with RETURNS STATUS statements cannot span multiple "
                              "tables"),
                  &reset_async_calls);
            }
          }

          break;
        }
        default:
          // Anything other than INSERT/UPDATE/DELETE is rejected up front.
          return StatementExecuted(
              ErrorStatus(ErrorCode::CQL_STATEMENT_INVALID,
                          "batch execution supports INSERT, UPDATE and DELETE statements only "
                          "currently"),
              &reset_async_calls);
          break;
      }
    }
  }

  // All statements validated — now execute them in order.
  for (const auto& pair : batch) {
    const ParseTree& parse_tree = pair.first;
    const StatementParameters& params = pair.second;
    RETURN_STMT_NOT_OK(Execute(parse_tree, params), &reset_async_calls);
  }

  RETURN_STMT_NOT_OK(audit_logger_.EndBatchRequest(), &reset_async_calls);

  FlushAsync(&reset_async_calls);
}
252 | | |
253 | | //-------------------------------------------------------------------------------------------------- |
254 | | |
255 | 5.10M | Status Executor::Execute(const ParseTree& parse_tree, const StatementParameters& params) { |
256 | | // Prepare execution context and execute the parse tree's root node. |
257 | 5.10M | exec_contexts_.emplace_back(parse_tree, params); |
258 | 5.10M | exec_context_ = &exec_contexts_.back(); |
259 | 5.10M | auto root_node = parse_tree.root().get(); |
260 | 5.10M | RETURN_NOT_OK(PreExecTreeNode(root_node)); |
261 | 5.10M | RETURN_NOT_OK(audit_logger_.LogStatement(root_node, exec_context_->stmt(), |
262 | 5.10M | IsPrepare::kFalse)); |
263 | 5.10M | Status s = ExecTreeNode(root_node); |
264 | 5.10M | if (!s.ok()) { |
265 | 2.27k | RETURN_NOT_OK(audit_logger_.LogStatementError(root_node, exec_context_->stmt(), s, |
266 | 2.27k | ErrorIsFormatted::kFalse)); |
267 | 2.27k | } |
268 | 5.10M | return ProcessStatementStatus(parse_tree, s); |
269 | 5.10M | } |
270 | | |
271 | | //-------------------------------------------------------------------------------------------------- |
272 | | |
273 | 5.08M | Status Executor::PreExecTreeNode(TreeNode *tnode) { |
274 | 5.08M | if (!tnode) { |
275 | 3 | return Status::OK(); |
276 | 5.08M | } else if (tnode->opcode() == TreeNodeOpcode::kPTInsertStmt) { |
277 | 1.16M | return PreExecTreeNode(static_cast<PTInsertStmt*>(tnode)); |
278 | 3.92M | } else { |
279 | 3.92M | return Status::OK(); |
280 | 3.92M | } |
281 | 5.08M | } |
282 | | |
283 | 1.16M | Status Executor::PreExecTreeNode(PTInsertStmt *tnode) { |
284 | 1.16M | if (tnode->InsertingValue()->opcode() == TreeNodeOpcode::kPTInsertJsonClause) { |
285 | | // We couldn't resolve JSON clause bind variable until now |
286 | 80 | return PreExecTreeNode(static_cast<PTInsertJsonClause*>(tnode->InsertingValue().get())); |
287 | 1.16M | } else { |
288 | 1.16M | return Status::OK(); |
289 | 1.16M | } |
290 | 1.16M | } |
291 | | |
292 | 110 | shared_ptr<client::YBTable> Executor::GetTableFromStatement(const TreeNode *tnode) const { |
293 | 110 | if (tnode != nullptr) { |
294 | 110 | switch (tnode->opcode()) { |
295 | 0 | case TreeNodeOpcode::kPTAlterTable: |
296 | 0 | return static_cast<const PTAlterTable *>(tnode)->table(); |
297 | | |
298 | 22 | case TreeNodeOpcode::kPTSelectStmt: |
299 | 22 | return static_cast<const PTSelectStmt *>(tnode)->table(); |
300 | | |
301 | 71 | case TreeNodeOpcode::kPTInsertStmt: |
302 | 71 | return static_cast<const PTInsertStmt *>(tnode)->table(); |
303 | | |
304 | 0 | case TreeNodeOpcode::kPTDeleteStmt: |
305 | 0 | return static_cast<const PTDeleteStmt *>(tnode)->table(); |
306 | | |
307 | 17 | case TreeNodeOpcode::kPTUpdateStmt: |
308 | 17 | return static_cast<const PTUpdateStmt *>(tnode)->table(); |
309 | | |
310 | 0 | case TreeNodeOpcode::kPTExplainStmt: |
311 | 0 | return GetTableFromStatement(static_cast<const PTExplainStmt *>(tnode)->stmt().get()); |
312 | | |
313 | 0 | default: break; |
314 | 0 | } |
315 | 0 | } |
316 | | |
317 | 0 | return nullptr; |
318 | 0 | } |
319 | | |
320 | | //-------------------------------------------------------------------------------------------------- |
321 | | |
// Dispatches execution of a parse-tree node to the matching ExecPTNode
// overload. For every node except a statement list, a TnodeContext is
// registered in the current ExecContext first, and a snapshot-isolation
// transaction is started for DML statements that require one.
Status Executor::ExecTreeNode(const TreeNode *tnode) {
  if (tnode == nullptr) {
    return Status::OK();
  }
  TnodeContext* tnode_context = nullptr;
  if (tnode->opcode() != TreeNodeOpcode::kPTListNode) {
    tnode_context = exec_context_->AddTnode(tnode);
    if (tnode->IsDml() && static_cast<const PTDmlStmt *>(tnode)->RequiresTransaction()) {
      RETURN_NOT_OK(exec_context_->StartTransaction(SNAPSHOT_ISOLATION, ql_env_, rescheduler_));
    }
  }
  switch (tnode->opcode()) {
    case TreeNodeOpcode::kPTListNode:
      return ExecPTNode(static_cast<const PTListNode *>(tnode));

    // CREATE INDEX shares the CREATE TABLE execution path; the node types
    // are related and ExecPTNode distinguishes them by opcode.
    case TreeNodeOpcode::kPTCreateTable: FALLTHROUGH_INTENDED;
    case TreeNodeOpcode::kPTCreateIndex:
      return ExecPTNode(static_cast<const PTCreateTable *>(tnode));

    case TreeNodeOpcode::kPTAlterTable:
      return ExecPTNode(static_cast<const PTAlterTable *>(tnode));

    case TreeNodeOpcode::kPTCreateType:
      return ExecPTNode(static_cast<const PTCreateType *>(tnode));

    case TreeNodeOpcode::kPTCreateRole:
      return ExecPTNode(static_cast<const PTCreateRole *>(tnode));

    case TreeNodeOpcode::kPTAlterRole:
      return ExecPTNode(static_cast<const PTAlterRole *>(tnode));

    case TreeNodeOpcode::kPTGrantRevokeRole:
      return ExecPTNode(static_cast<const PTGrantRevokeRole *>(tnode));

    case TreeNodeOpcode::kPTDropStmt:
      return ExecPTNode(static_cast<const PTDropStmt *>(tnode));

    case TreeNodeOpcode::kPTGrantRevokePermission:
      return ExecPTNode(static_cast<const PTGrantRevokePermission *>(tnode));

    // DML overloads additionally receive the TnodeContext created above.
    case TreeNodeOpcode::kPTSelectStmt:
      return ExecPTNode(static_cast<const PTSelectStmt *>(tnode), tnode_context);

    case TreeNodeOpcode::kPTInsertStmt:
      return ExecPTNode(static_cast<const PTInsertStmt *>(tnode), tnode_context);

    case TreeNodeOpcode::kPTDeleteStmt:
      return ExecPTNode(static_cast<const PTDeleteStmt *>(tnode), tnode_context);

    case TreeNodeOpcode::kPTUpdateStmt:
      return ExecPTNode(static_cast<const PTUpdateStmt *>(tnode), tnode_context);

    case TreeNodeOpcode::kPTStartTransaction:
      return ExecPTNode(static_cast<const PTStartTransaction *>(tnode));

    case TreeNodeOpcode::kPTCommit:
      return ExecPTNode(static_cast<const PTCommit *>(tnode));

    case TreeNodeOpcode::kPTTruncateStmt:
      return ExecPTNode(static_cast<const PTTruncateStmt *>(tnode));

    case TreeNodeOpcode::kPTCreateKeyspace:
      return ExecPTNode(static_cast<const PTCreateKeyspace *>(tnode));

    case TreeNodeOpcode::kPTUseKeyspace:
      return ExecPTNode(static_cast<const PTUseKeyspace *>(tnode));

    case TreeNodeOpcode::kPTAlterKeyspace:
      return ExecPTNode(static_cast<const PTAlterKeyspace *>(tnode));

    case TreeNodeOpcode::kPTExplainStmt:
      return ExecPTNode(static_cast<const PTExplainStmt *>(tnode));

    default:
      return exec_context_->Error(tnode, ErrorCode::FEATURE_NOT_SUPPORTED);
  }
}
399 | | |
400 | 757 | Status Executor::ExecPTNode(const PTCreateRole *tnode) { |
401 | 757 | const Status s = ql_env_->CreateRole(tnode->role_name(), tnode->salted_hash(), tnode->login(), |
402 | 757 | tnode->superuser()); |
403 | 757 | if (PREDICT_FALSE(!s.ok())) { |
404 | 6 | ErrorCode error_code = ErrorCode::SERVER_ERROR; |
405 | 6 | if (s.IsAlreadyPresent()) { |
406 | 0 | error_code = ErrorCode::DUPLICATE_ROLE; |
407 | 6 | } else if (s.IsNotAuthorized()) { |
408 | 6 | error_code = ErrorCode::UNAUTHORIZED; |
409 | 6 | } |
410 | | |
411 | 6 | if (tnode->create_if_not_exists() && error_code == ErrorCode::DUPLICATE_ROLE) { |
412 | 0 | return Status::OK(); |
413 | 0 | } |
414 | | |
415 | | // TODO (Bristy) : Set result_ properly. |
416 | 6 | return exec_context_->Error(tnode, s, error_code); |
417 | 6 | } |
418 | | |
419 | 751 | return Status::OK(); |
420 | 751 | } |
421 | | |
422 | | //-------------------------------------------------------------------------------------------------- |
423 | | |
424 | 58 | Status Executor::ExecPTNode(const PTAlterRole *tnode) { |
425 | 58 | const Status s = ql_env_->AlterRole(tnode->role_name(), tnode->salted_hash(), tnode->login(), |
426 | 58 | tnode->superuser()); |
427 | 58 | if (PREDICT_FALSE(!s.ok())) { |
428 | 13 | ErrorCode error_code = ErrorCode::ROLE_NOT_FOUND; |
429 | 13 | if (s.IsNotAuthorized()) { |
430 | 10 | error_code = ErrorCode::UNAUTHORIZED; |
431 | 10 | } |
432 | 13 | return exec_context_->Error(tnode, s, error_code); |
433 | 13 | } |
434 | | |
435 | 45 | return Status::OK(); |
436 | 45 | } |
437 | | |
438 | | //-------------------------------------------------------------------------------------------------- |
439 | | |
440 | 52 | Status Executor::ExecPTNode(const PTGrantRevokeRole* tnode) { |
441 | 52 | const Status s = ql_env_->GrantRevokeRole(tnode->statement_type(), tnode->granted_role_name(), |
442 | 52 | tnode->recipient_role_name()); |
443 | 52 | if (PREDICT_FALSE(!s.ok())) { |
444 | 3 | ErrorCode error_code = ErrorCode::SERVER_ERROR; |
445 | 3 | if (s.IsInvalidArgument()) { |
446 | 2 | error_code = ErrorCode::INVALID_REQUEST; |
447 | 1 | } else if (s.IsNotFound()) { |
448 | 1 | error_code = ErrorCode::ROLE_NOT_FOUND; |
449 | 1 | } |
450 | | // TODO (Bristy) : Set result_ properly. |
451 | 3 | return exec_context_->Error(tnode, s, error_code); |
452 | 3 | } |
453 | 49 | return Status::OK(); |
454 | 49 | } |
455 | | |
456 | | //-------------------------------------------------------------------------------------------------- |
457 | | |
458 | 55.6k | Status Executor::ExecPTNode(const PTListNode *tnode) { |
459 | 181k | for (TreeNode::SharedPtr dml : tnode->node_list()) { |
460 | 181k | RETURN_NOT_OK(ExecTreeNode(dml.get())); |
461 | 181k | } |
462 | 55.6k | return Status::OK(); |
463 | 55.6k | } |
464 | | |
465 | | //-------------------------------------------------------------------------------------------------- |
466 | | |
467 | 46 | Status Executor::ExecPTNode(const PTCreateType *tnode) { |
468 | 46 | YBTableName yb_name = tnode->yb_type_name(); |
469 | | |
470 | 46 | const std::string& type_name = yb_name.table_name(); |
471 | 46 | std::string keyspace_name = yb_name.namespace_name(); |
472 | | |
473 | 46 | std::vector<std::string> field_names; |
474 | 46 | std::vector<std::shared_ptr<QLType>> field_types; |
475 | | |
476 | 86 | for (const PTTypeField::SharedPtr& field : tnode->fields()->node_list()) { |
477 | 86 | field_names.emplace_back(field->yb_name()); |
478 | 86 | field_types.push_back(field->ql_type()); |
479 | 86 | } |
480 | | |
481 | 46 | Status s = ql_env_->CreateUDType(keyspace_name, type_name, field_names, field_types); |
482 | 46 | if (PREDICT_FALSE(!s.ok())) { |
483 | 1 | ErrorCode error_code = ErrorCode::SERVER_ERROR; |
484 | 1 | if (s.IsAlreadyPresent()) { |
485 | 1 | error_code = ErrorCode::DUPLICATE_TYPE; |
486 | 0 | } else if (s.IsNotFound()) { |
487 | 0 | error_code = ErrorCode::KEYSPACE_NOT_FOUND; |
488 | 0 | } else if (s.IsInvalidArgument()) { |
489 | 0 | error_code = ErrorCode::INVALID_TYPE_DEFINITION; |
490 | 0 | } |
491 | | |
492 | 1 | if (tnode->create_if_not_exists() && error_code == ErrorCode::DUPLICATE_TYPE) { |
493 | 0 | return Status::OK(); |
494 | 0 | } |
495 | | |
496 | 1 | return exec_context_->Error(tnode->type_name(), s, error_code); |
497 | 1 | } |
498 | | |
499 | 45 | result_ = std::make_shared<SchemaChangeResult>("CREATED", "TYPE", keyspace_name, type_name); |
500 | 45 | return Status::OK(); |
501 | 45 | } |
502 | | |
503 | | //-------------------------------------------------------------------------------------------------- |
504 | | |
// Executes CREATE TABLE and CREATE INDEX (the index node is a subclass and is
// distinguished by opcode). Builds the YB schema from the parsed column
// definitions, wires up IndexInfo for indexes, invalidates the table cache
// around the operation, and publishes a schema-change result.
Status Executor::ExecPTNode(const PTCreateTable *tnode) {
  YBTableName table_name = tnode->yb_table_name();

  // System-namespace tables cannot be created when the namespace is read-only.
  if (table_name.is_system() && client::FLAGS_yb_system_namespace_readonly) {
    return exec_context_->Error(tnode->table_name(), ErrorCode::SYSTEM_NAMESPACE_READONLY);
  }

  // Setting up columns.
  Status s;
  YBSchema schema;
  YBSchemaBuilder b;
  shared_ptr<YBTableCreator> table_creator(ql_env_->NewTableCreator());
  // Table properties is kept in the metadata of the IndexTable.
  TableProperties table_properties;
  // IndexInfo is kept in the metadata of the Table that is being indexed.
  IndexInfoPB *index_info = nullptr;

  // When creating an index, we construct IndexInfo and associated it with the data-table. Later,
  // when operating on the data-table, we can decide if updating the index-tables are needed.
  if (tnode->opcode() == TreeNodeOpcode::kPTCreateIndex) {
    const PTCreateIndex *index_node = static_cast<const PTCreateIndex*>(tnode);

    index_info = table_creator->mutable_index_info();
    index_info->set_indexed_table_id(index_node->indexed_table_id());
    index_info->set_is_local(index_node->is_local());
    index_info->set_is_unique(index_node->is_unique());
    index_info->set_is_backfill_deferred(index_node->is_backfill_deferred());
    index_info->set_hash_column_count(narrow_cast<uint32_t>(tnode->hash_columns().size()));
    index_info->set_range_column_count(narrow_cast<uint32_t>(tnode->primary_columns().size()));
    index_info->set_use_mangled_column_name(true);

    // List key columns of data-table being indexed.
    for (const auto& col_desc : index_node->column_descs()) {
      if (col_desc.is_hash()) {
        index_info->add_indexed_hash_column_ids(col_desc.id());
      } else if (col_desc.is_primary()) {
        index_info->add_indexed_range_column_ids(col_desc.id());
      }
    }

    // For a partial index, serialize the WHERE predicate and record which
    // columns it references.
    if (index_node->where_clause()) {
      // TODO (Piyush): Add a ToString method for PTExpr and log the where clause.
      IndexInfoPB::WherePredicateSpecPB *where_predicate_spec =
          index_info->mutable_where_predicate_spec();

      RETURN_NOT_OK(PTExprToPB(index_node->where_clause(),
                               where_predicate_spec->mutable_where_expr()));

      for (auto column_id : *(index_node->where_clause_column_refs())) {
        where_predicate_spec->add_column_ids(column_id);
      }
    }
  }

  // Hash (partition) key columns: sorting order is not allowed on them.
  for (const auto& column : tnode->hash_columns()) {
    if (column->sorting_type() != SortingType::kNotSpecified) {
      return exec_context_->Error(tnode->columns().front(), s, ErrorCode::INVALID_TABLE_DEFINITION);
    }
    b.AddColumn(column->coldef_name().c_str())
        ->Type(column->ql_type())
        ->HashPrimaryKey()
        ->Order(column->order());
    RETURN_NOT_OK(AddColumnToIndexInfo(index_info, column));
  }

  // Range (clustering) key columns, which may carry a sorting type.
  for (const auto& column : tnode->primary_columns()) {
    b.AddColumn(column->coldef_name().c_str())
        ->Type(column->ql_type())
        ->PrimaryKey()
        ->Order(column->order())
        ->SetSortingType(column->sorting_type());
    RETURN_NOT_OK(AddColumnToIndexInfo(index_info, column));
  }

  // Regular (non-key) columns: nullable, possibly static or counter.
  for (const auto& column : tnode->columns()) {
    if (column->sorting_type() != SortingType::kNotSpecified) {
      return exec_context_->Error(tnode->columns().front(), s, ErrorCode::INVALID_TABLE_DEFINITION);
    }
    YBColumnSpec *column_spec = b.AddColumn(column->coldef_name().c_str())
                                    ->Type(column->ql_type())
                                    ->Nullable()
                                    ->Order(column->order());
    if (column->is_static()) {
      column_spec->StaticColumn();
    }
    if (column->is_counter()) {
      column_spec->Counter();
    }
    RETURN_NOT_OK(AddColumnToIndexInfo(index_info, column));
  }

  s = tnode->ToTableProperties(&table_properties);
  if (!s.ok()) {
    return exec_context_->Error(tnode->columns().front(), s, ErrorCode::INVALID_TABLE_DEFINITION);
  }
  b.SetTableProperties(table_properties);

  s = b.Build(&schema);
  if (PREDICT_FALSE(!s.ok())) {
    return exec_context_->Error(tnode->columns().front(), s, ErrorCode::INVALID_TABLE_DEFINITION);
  }

  // Create table.
  table_creator->table_name(table_name)
      .table_type(YBTableType::YQL_TABLE_TYPE)
      .creator_role_name(ql_env_->CurrentRoleName())
      .schema(&schema);

  if (tnode->opcode() == TreeNodeOpcode::kPTCreateIndex) {
    const PTCreateIndex *index_node = static_cast<const PTCreateIndex*>(tnode);
    table_creator->indexed_table_id(index_node->indexed_table_id());
    table_creator->is_local_index(index_node->is_local());
    table_creator->is_unique_index(index_node->is_unique());
    table_creator->is_backfill_deferred(index_node->is_backfill_deferred());
  }

  // Clean-up table cache BEFORE op (the cache is used by other processor threads).
  ql_env_->RemoveCachedTableDesc(table_name);
  if (tnode->opcode() == TreeNodeOpcode::kPTCreateIndex) {
    const YBTableName indexed_table_name =
        static_cast<const PTCreateIndex*>(tnode)->indexed_table_name();
    ql_env_->RemoveCachedTableDesc(indexed_table_name);
  }

  s = table_creator->Create();
  if (PREDICT_FALSE(!s.ok())) {
    // Map the failure to the most specific CQL error code; for indexes a
    // NotFound means the indexed table is missing rather than the keyspace.
    ErrorCode error_code = ErrorCode::SERVER_ERROR;
    if (s.IsAlreadyPresent()) {
      error_code = ErrorCode::DUPLICATE_OBJECT;
    } else if (s.IsNotFound()) {
      error_code = tnode->opcode() == TreeNodeOpcode::kPTCreateIndex
                       ? ErrorCode::OBJECT_NOT_FOUND
                       : ErrorCode::KEYSPACE_NOT_FOUND;
    } else if (s.IsInvalidArgument()) {
      error_code = ErrorCode::INVALID_TABLE_DEFINITION;
    }

    if (tnode->create_if_not_exists() && error_code == ErrorCode::DUPLICATE_OBJECT) {
      return Status::OK();
    }

    return exec_context_->Error(tnode->table_name(), s, error_code);
  }

  // Clean-up table cache AFTER op (the cache is used by other processor threads).
  ql_env_->RemoveCachedTableDesc(table_name);

  // CREATE INDEX reports a schema change on the indexed table; CREATE TABLE
  // reports one on the new table itself.
  if (tnode->opcode() == TreeNodeOpcode::kPTCreateIndex) {
    const YBTableName indexed_table_name =
        static_cast<const PTCreateIndex*>(tnode)->indexed_table_name();
    // Clean-up table cache AFTER op (the cache is used by other processor threads).
    ql_env_->RemoveCachedTableDesc(indexed_table_name);

    result_ = std::make_shared<SchemaChangeResult>(
        "UPDATED", "TABLE", indexed_table_name.namespace_name(), indexed_table_name.table_name());
  } else {
    result_ = std::make_shared<SchemaChangeResult>(
        "CREATED", "TABLE", table_name.namespace_name(), table_name.table_name());
  }
  return Status::OK();
}
666 | | |
667 | 5.96k | Status Executor::AddColumnToIndexInfo(IndexInfoPB *index_info, const PTColumnDefinition *column) { |
668 | | // Associate index-column with data-column. |
669 | 5.96k | if (index_info) { |
670 | | // Note that column_id is assigned by master server, so we don't have it yet. When processing |
671 | | // create index request, server will update IndexInfo with proper column_id. |
672 | 1.72k | auto *col = index_info->add_columns(); |
673 | 1.72k | col->set_column_name(column->coldef_name().c_str()); |
674 | 1.72k | col->set_indexed_column_id(column->indexed_ref()); |
675 | 1.72k | RETURN_NOT_OK(PTExprToPB(column->colexpr(), col->mutable_colexpr())); |
676 | 1.72k | } |
677 | 5.96k | return Status::OK(); |
678 | 5.96k | } |
679 | | |
680 | | //-------------------------------------------------------------------------------------------------- |
681 | | |
// Executes an ALTER TABLE statement.
//
// Translates each parsed column modification (ADD/DROP/RENAME) and any table
// property change into a YBTableAlterer, then issues the alter RPC. The cached
// table descriptor is removed both BEFORE and AFTER the operation because
// other processor threads read the cache concurrently: the first removal keeps
// them from using soon-to-be-stale metadata during the alter, the second
// removes any entry re-cached while the alter was in flight.
//
// Returns OK and sets result_ to a SchemaChangeResult on success; otherwise
// returns an error recorded in exec_context_.
Status Executor::ExecPTNode(const PTAlterTable *tnode) {
  YBTableName table_name = tnode->yb_table_name();

  shared_ptr<YBTableAlterer> table_alterer(ql_env_->NewTableAlterer(table_name));

  for (const auto& mod_column : tnode->mod_columns()) {
    switch (mod_column->mod_type()) {
      case ALTER_ADD:
        table_alterer->AddColumn(mod_column->new_name()->data())
                     ->Type(mod_column->ql_type());
        break;
      case ALTER_DROP:
        table_alterer->DropColumn(mod_column->old_name()->last_name().data());
        break;
      case ALTER_RENAME:
        table_alterer->AlterColumn(mod_column->old_name()->last_name().data())
                     ->RenameTo(mod_column->new_name()->c_str());
        break;
      case ALTER_TYPE:
        // Not yet supported by AlterTableRequestPB.
        return exec_context_->Error(tnode, ErrorCode::FEATURE_NOT_YET_IMPLEMENTED);
    }
  }

  if (!tnode->mod_props().empty()) {
    TableProperties table_properties;
    Status s = tnode->ToTableProperties(&table_properties);
    if(PREDICT_FALSE(!s.ok())) {
      return exec_context_->Error(tnode, s, ErrorCode::INVALID_ARGUMENTS);
    }

    table_alterer->SetTableProperties(table_properties);
  }

  // Clean-up table cache BEFORE op (the cache is used by other processor threads).
  ql_env_->RemoveCachedTableDesc(table_name);

  Status s = table_alterer->Alter();
  if (PREDICT_FALSE(!s.ok())) {
    return exec_context_->Error(tnode, s, ErrorCode::EXEC_ERROR);
  }

  result_ = std::make_shared<SchemaChangeResult>(
      "UPDATED", "TABLE", table_name.namespace_name(), table_name.table_name());

  // Clean-up table cache AFTER op (the cache is used by other processor threads).
  ql_env_->RemoveCachedTableDesc(table_name);
  return Status::OK();
}
731 | | |
732 | | //-------------------------------------------------------------------------------------------------- |
733 | | |
// Executes a DROP statement for any droppable object type: TABLE, INDEX,
// KEYSPACE (SCHEMA), TYPE, or ROLE.
//
// Each case performs the delete through ql_env_, records the object-specific
// "not found" error code (used below to map a NotFound status), and sets
// result_ to the appropriate SchemaChangeResult. For tables and indexes the
// cached table descriptor is removed BEFORE and AFTER the op because the cache
// is shared with other processor threads. A NotFound status is silently
// swallowed for DROP ... IF EXISTS.
Status Executor::ExecPTNode(const PTDropStmt *tnode) {
  Status s;
  ErrorCode error_not_found = ErrorCode::SERVER_ERROR;

  switch (tnode->drop_type()) {
    case ObjectType::TABLE: {
      // Drop the table.
      const YBTableName table_name = tnode->yb_table_name();
      // Clean-up table cache BEFORE op (the cache is used by other processor threads).
      ql_env_->RemoveCachedTableDesc(table_name);

      s = ql_env_->DeleteTable(table_name);
      error_not_found = ErrorCode::OBJECT_NOT_FOUND;
      result_ = std::make_shared<SchemaChangeResult>(
          "DROPPED", "TABLE", table_name.namespace_name(), table_name.table_name());

      // Clean-up table cache AFTER op (the cache is used by other processor threads).
      ql_env_->RemoveCachedTableDesc(table_name);
      break;
    }

    case ObjectType::INDEX: {
      // Drop the index.
      const YBTableName table_name = tnode->yb_table_name();
      // Clean-up table cache BEFORE op (the cache is used by other processor threads).
      ql_env_->RemoveCachedTableDesc(table_name);

      // DeleteIndexTable also reports which table the index belonged to, so the
      // schema-change result can reference the indexed table.
      YBTableName indexed_table_name;
      s = ql_env_->DeleteIndexTable(table_name, &indexed_table_name);
      error_not_found = ErrorCode::OBJECT_NOT_FOUND;
      result_ = std::make_shared<SchemaChangeResult>(
          "UPDATED", "TABLE", indexed_table_name.namespace_name(), indexed_table_name.table_name());

      // Clean-up table cache AFTER op (the cache is used by other processor threads).
      // Both the index and its indexed table may be cached.
      ql_env_->RemoveCachedTableDesc(table_name);
      ql_env_->RemoveCachedTableDesc(indexed_table_name);
      break;
    }

    case ObjectType::SCHEMA: {
      // Drop the keyspace.
      const string keyspace_name(tnode->name()->last_name().c_str());
      s = ql_env_->DeleteKeyspace(keyspace_name);
      error_not_found = ErrorCode::KEYSPACE_NOT_FOUND;
      result_ = std::make_shared<SchemaChangeResult>("DROPPED", "KEYSPACE", keyspace_name);
      break;
    }

    case ObjectType::TYPE: {
      // Drop the type.
      const string type_name(tnode->name()->last_name().c_str());
      const string namespace_name(tnode->name()->first_name().c_str());
      s = ql_env_->DeleteUDType(namespace_name, type_name);
      error_not_found = ErrorCode::TYPE_NOT_FOUND;
      result_ = std::make_shared<SchemaChangeResult>("DROPPED", "TYPE", namespace_name, type_name);
      ql_env_->RemoveCachedUDType(namespace_name, type_name);
      break;
    }

    case ObjectType::ROLE: {
      // Drop the role.
      const string role_name(tnode->name()->QLName());
      s = ql_env_->DeleteRole(role_name);
      error_not_found = ErrorCode::ROLE_NOT_FOUND;
      // TODO (Bristy) : Set result_ properly.
      break;
    }

    default:
      return exec_context_->Error(tnode->name(), ErrorCode::FEATURE_NOT_SUPPORTED);
  }

  // Map the raw status into the QL error code appropriate for this object type.
  if (PREDICT_FALSE(!s.ok())) {
    ErrorCode error_code = ErrorCode::SERVER_ERROR;

    if (s.IsNotFound()) {
      // Ignore not found error for a DROP IF EXISTS statement.
      if (tnode->drop_if_exists()) {
        return Status::OK();
      }

      error_code = error_not_found;
    } else if (s.IsNotAuthorized()) {
      error_code = ErrorCode::UNAUTHORIZED;
    } else if(s.IsQLError()) {
      error_code = ErrorCode::INVALID_REQUEST;
    }

    return exec_context_->Error(tnode->name(), s, error_code);
  }

  return Status::OK();
}
827 | | |
828 | 721 | Status Executor::ExecPTNode(const PTGrantRevokePermission* tnode) { |
829 | 721 | const string role_name = tnode->role_name()->QLName(); |
830 | 721 | const string canonical_resource = tnode->canonical_resource(); |
831 | 721 | const char* resource_name = tnode->resource_name(); |
832 | 721 | const char* namespace_name = tnode->namespace_name(); |
833 | 721 | ResourceType resource_type = tnode->resource_type(); |
834 | 721 | PermissionType permission = tnode->permission(); |
835 | 721 | const auto statement_type = tnode->statement_type(); |
836 | | |
837 | 721 | Status s = ql_env_->GrantRevokePermission(statement_type, permission, resource_type, |
838 | 721 | canonical_resource, resource_name, namespace_name, |
839 | 721 | role_name); |
840 | | |
841 | 721 | if (PREDICT_FALSE(!s.ok())) { |
842 | 0 | ErrorCode error_code = ErrorCode::SERVER_ERROR; |
843 | 0 | if (s.IsInvalidArgument()) { |
844 | 0 | error_code = ErrorCode::INVALID_ARGUMENTS; |
845 | 0 | } |
846 | 0 | if (s.IsNotFound()) { |
847 | 0 | error_code = ErrorCode::RESOURCE_NOT_FOUND; |
848 | 0 | } |
849 | 0 | return exec_context_->Error(tnode, s, error_code); |
850 | 0 | } |
851 | | // TODO (Bristy) : Return proper result. |
852 | 721 | return Status::OK(); |
853 | 721 | } |
854 | | |
855 | | Status Executor::GetOffsetOrLimit( |
856 | | const PTSelectStmt* tnode, |
857 | | const std::function<PTExprPtr(const PTSelectStmt* tnode)>& get_val, |
858 | | const string& clause_type, |
859 | 3.62M | int32_t* value) { |
860 | 3.62M | QLExpressionPB expr_pb; |
861 | 3.62M | Status s = (PTExprToPB(get_val(tnode), &expr_pb)); |
862 | 3.62M | if (PREDICT_FALSE(!s.ok())) { |
863 | 1 | return exec_context_->Error(get_val(tnode), s, ErrorCode::INVALID_ARGUMENTS); |
864 | 1 | } |
865 | | |
866 | 3.62M | if (expr_pb.has_value() && IsNull(expr_pb.value())) { |
867 | 2 | return exec_context_->Error(get_val(tnode), |
868 | 2 | Substitute("$0 value cannot be null.", clause_type).c_str(), |
869 | 2 | ErrorCode::INVALID_ARGUMENTS); |
870 | 2 | } |
871 | | |
872 | | // This should be ensured by checks before getting here. |
873 | 13.2k | DCHECK(expr_pb.has_value() && expr_pb.value().has_int32_value()) |
874 | 13.2k | << "Integer constant expected for " + clause_type + " clause"; |
875 | | |
876 | 3.62M | if (expr_pb.value().int32_value() < 0) { |
877 | 0 | return exec_context_->Error(get_val(tnode), |
878 | 0 | Substitute("$0 value cannot be negative.", clause_type).c_str(), |
879 | 0 | ErrorCode::INVALID_ARGUMENTS); |
880 | 0 | } |
881 | 3.62M | *value = expr_pb.value().int32_value(); |
882 | 3.62M | return Status::OK(); |
883 | 3.62M | } |
884 | | |
885 | | //-------------------------------------------------------------------------------------------------- |
886 | | |
887 | | // NOTE: This function is being called recursively. |
888 | | // - The paging-state is loaded to the context only on the first call (Call from user) |
889 | | // - Similarly, all code in this function must work for both cases - calls by users and recursive |
890 | | // calls within the same process. These two different cases can be cleaned up later to avoid |
891 | | // confusion. |
// Executes a SELECT statement (or the SELECT-from-index child of one).
//
// NOTE: This function is called recursively (once for the index child when the
// query goes through an index, then for the main table). It also serves both
// fresh user requests and continuations of a previous page, so all logic here
// must be valid for every combination of those cases.
//
// High-level flow:
//   1. Validate the table still exists (including paging continuation checks).
//   2. Load/create the paging state for this execution.
//   3. If an index child exists, execute it first; stop if it covers fully.
//   4. Build the QLReadRequestPB: WHERE clause, selected expressions, column
//      refs, IF clause, limit/offset, paging state, consistency level.
//   5. Queue one read op per hash partition (IN conditions on hash columns),
//      either serially or in parallel depending on the row estimate.
Status Executor::ExecPTNode(const PTSelectStmt *tnode, TnodeContext* tnode_context) {
  const shared_ptr<client::YBTable>& table = tnode->table();
  if (table == nullptr) {
    // If this is a request for 'system.peers_v2' table make sure that we send the appropriate error
    // so that the client driver can query the proper peers table i.e. 'system.peers' based on the
    // error.
    if (tnode->is_system() &&
        tnode->table_name().table_name() == "peers_v2" &&
        tnode->table_name().namespace_name() == "system") {
      string error_msg = "Unknown keyspace/cf pair (system.peers_v2)";
      return exec_context_->Error(tnode, error_msg, ErrorCode::SERVER_ERROR);
    }

    // If this is a system table but the table does not exist, it is okay. Just return OK with void
    // result.
    return tnode->is_system() ? Status::OK()
                              : exec_context_->Error(tnode, ErrorCode::OBJECT_NOT_FOUND);
  }

  // If there is a table id in the statement parameter's paging state, this is a continuation of a
  // prior SELECT statement. Verify that the same table/index still exists and matches the table id
  // for query without index, or the index id in the leaf node (where child_select is null also).
  const StatementParameters& params = exec_context_->params();
  const bool continue_user_request = !tnode->child_select() && !params.table_id().empty();
  if (continue_user_request && params.table_id() != table->id()) {
    return exec_context_->Error(tnode, "Object no longer exists.", ErrorCode::OBJECT_NOT_FOUND);
  }

  // Read the paging state from user input "params".
  QueryPagingState *query_state = VERIFY_RESULT(LoadPagingStateFromUser(tnode, tnode_context));
  if (query_state->reached_select_limit()) {
    // Return the result without executing the node.
    return result_ != nullptr ? Status::OK() : GenerateEmptyResult(tnode);
  }

  // If there is an index to select from, execute it.
  if (tnode->child_select()) {
    LOG_IF(DFATAL, result_) << "Expecting result is not yet initialized";
    const PTSelectStmt* child_select = tnode->child_select().get();
    TnodeContext* child_context = tnode_context->AddChildTnode(child_select);
    RETURN_NOT_OK(ExecPTNode(child_select, child_context));
    // If the index covers the SELECT query fully, we are done. Otherwise, continue to prepare
    // the SELECT from the table using the primary key to be returned from the index select.
    if (child_select->covers_fully()) {
      return Status::OK();
    }
    // If the child uncovered index select has set result_ already it must have been able
    // to guarantee an empty result (i.e. if WHERE clause guarantees no rows could match)
    // so we can just return.
    if (result_) {
      LOG_IF(DFATAL, result_->type() != ExecutedResult::Type::ROWS)
          << "Expecting result type is ROWS=" << static_cast<int>(ExecutedResult::Type::ROWS)
          << ", got result type=" << static_cast<int>(result_->type());
      auto rows_result = std::static_pointer_cast<RowsResult>(result_);
      RSTATUS_DCHECK(rows_result->paging_state().empty(),
                     Corruption, "Expecting result_ to be empty with empty paging state");
      RSTATUS_DCHECK(rows_result->rows_data() == string(4, '\0'),  // Encoded row_count == 0.
                     Corruption, "Expecting result_ to be empty with result row_count equals 0");
      return Status::OK();
    }
  }

  // Create the read request.
  YBqlReadOpPtr select_op(table->NewQLSelect());
  QLReadRequestPB *req = select_op->mutable_request();

  // Where clause - Hash, range, and regular columns.
  req->set_is_aggregate(tnode->is_aggregate());
  // WhereClauseToPB also computes an upper-bound estimate of matching rows,
  // used below to decide between serial and parallel partition reads.
  Result<uint64_t> max_rows_estimate = WhereClauseToPB(req, tnode->key_where_ops(),
                                                       tnode->where_ops(),
                                                       tnode->subscripted_col_where_ops(),
                                                       tnode->json_col_where_ops(),
                                                       tnode->partition_key_ops(),
                                                       tnode->func_ops(),
                                                       tnode_context);
  if (PREDICT_FALSE(!max_rows_estimate)) {
    return exec_context_->Error(tnode, max_rows_estimate.status(), ErrorCode::INVALID_ARGUMENTS);
  }

  // If where clause restrictions guarantee no rows could match, return empty result immediately.
  if (*max_rows_estimate == 0 && !tnode->is_aggregate()) {
    return GenerateEmptyResult(tnode);
  }

  req->set_is_forward_scan(tnode->is_forward_scan());

  // Specify selected list by adding the expressions to selected_exprs in read request.
  QLRSRowDescPB *rsrow_desc_pb = req->mutable_rsrow_desc();
  for (const auto& expr : tnode->selected_exprs()) {
    if (expr->opcode() == TreeNodeOpcode::kPTAllColumns) {
      // SELECT * - expand to all columns; no per-expression rsrow descriptor
      // entries are needed in this case.
      const Status s = PTExprToPB(static_cast<const PTAllColumns*>(expr.get()), req);
      if (PREDICT_FALSE(!s.ok())) {
        return exec_context_->Error(expr, s, ErrorCode::INVALID_ARGUMENTS);
      }
    } else {
      const Status s = PTExprToPB(expr, req->add_selected_exprs());
      if (PREDICT_FALSE(!s.ok())) {
        return exec_context_->Error(expr, s, ErrorCode::INVALID_ARGUMENTS);
      }

      // Add the expression metadata (rsrow descriptor).
      QLRSColDescPB *rscol_desc_pb = rsrow_desc_pb->add_rscol_descs();
      rscol_desc_pb->set_name(expr->QLName());
      expr->rscol_type_PB(rscol_desc_pb->mutable_ql_type());
    }
  }

  // Setup the column values that need to be read.
  Status s = ColumnRefsToPB(tnode, req->mutable_column_refs());
  if (PREDICT_FALSE(!s.ok())) {
    return exec_context_->Error(tnode, s, ErrorCode::INVALID_ARGUMENTS);
  }

  // Set the IF clause.
  if (tnode->if_clause() != nullptr) {
    s = PTExprToPB(tnode->if_clause(), select_op->mutable_request()->mutable_if_expr());
    if (PREDICT_FALSE(!s.ok())) {
      return exec_context_->Error(tnode->if_clause(), s, ErrorCode::INVALID_ARGUMENTS);
    }
  }

  // Specify distinct columns or non.
  req->set_distinct(tnode->distinct());

  // Default row count limit is the page size.
  // We should return paging state when page size limit is hit.
  // For system tables, we do not support page size so do nothing.
  if (!tnode->is_system()) {
    req->set_limit(params.page_size());
    req->set_return_paging_state(true);
  }

  if (!tnode->child_select()) {
    // DocDB will do LIMIT and OFFSET computation for this query.
    if (tnode->limit()) {
      // Setup request to DocDB according to the given LIMIT.
      size_t user_limit = query_state->select_limit() - query_state->read_count();
      if (!req->has_limit() || user_limit <= req->limit()) {
        // Set limit and instruct DocDB to clear paging state if limit is reached.
        req->set_limit(user_limit);
        req->set_return_paging_state(false);
      }
    }

    if (tnode->offset()) {
      // Setup request to DocDB according to the given OFFSET.
      auto user_offset = query_state->select_offset() - query_state->skip_count();
      req->set_offset(user_offset);
      req->set_return_paging_state(true);
    }
  }

  // If this is a continuation of a prior user's request, set the next partition key, row key,
  // and total number of rows read in the request's paging state.
  if (continue_user_request) {
    QLPagingStatePB *paging_state = req->mutable_paging_state();

    paging_state->set_next_partition_key(query_state->next_partition_key());
    paging_state->set_next_row_key(query_state->next_row_key());
    paging_state->set_total_num_rows_read(query_state->total_num_rows_read());
    paging_state->set_total_rows_skipped(query_state->total_rows_skipped());
  }

  // Set the consistency level for the operation. Always use strong consistency for system tables.
  select_op->set_yb_consistency_level(tnode->is_system() ? YBConsistencyLevel::STRONG
                                                         : params.yb_consistency_level());

  // Save the hash_code and max_hash_code limits computed from the request's partition_key_ops in
  // WhereClauseToPB(). These will be used later to be set in the request protobuf in
  // AdvanceToNextPartition() for multi-partition selects. The limits need to be saved now because
  // before AdvanceToNextPartition(), `Batcher::DoAdd()` of the current request might reuse and
  // mutate these limits to set tighter limits specific to the current partition. Then when
  // continuing to the next partition we cannot use the partition-specific limits, nor clear them
  // completely and lose the top-level limits.

  // Example: For `SELECT ... WHERE token(h) > 10 and token(h) < 100 and h IN (1,7,12)`, the `token`
  // conditions set the top-level hash code limits, and `h` sets the individual partitions to read.
  // The top-level hash code limits are needed because if the hash code for any of the partitions
  // falls outside the `token`-set range we need to return no results for that partition.

  if (req->has_hash_code())
    tnode_context->set_hash_code_from_partition_key_ops(req->hash_code());
  if (req->has_max_hash_code())
    tnode_context->set_max_hash_code_from_partition_key_ops(req->max_hash_code());

  // If we have several hash partitions (i.e. IN condition on hash columns) we initialize the
  // start partition here, and then iteratively scan the rest in FetchMoreRows.
  // Otherwise, the request will already have the right hashed column values set.
  if (tnode_context->UnreadPartitionsRemaining() > 0) {
    tnode_context->InitializePartition(select_op->mutable_request(), continue_user_request);

    // We can optimize to run the ops in parallel (rather than serially) if:
    // - the estimated max number of rows is less than req limit (min of page size and CQL limit).
    // - there is no offset (which requires passing skipped rows from one request to the next).
    if (*max_rows_estimate <= req->limit() && !req->has_offset()) {
      AddOperation(select_op, tnode_context);
      while (tnode_context->UnreadPartitionsRemaining() > 1) {
        YBqlReadOpPtr op(table->NewQLSelect());
        op->mutable_request()->CopyFrom(select_op->request());
        op->set_yb_consistency_level(select_op->yb_consistency_level());
        tnode_context->AdvanceToNextPartition(op->mutable_request());
        AddOperation(op, tnode_context);
        select_op = op;  // Use new op as base for the next one, if any.
      }
      return Status::OK();
    }
  }

  // If this select statement uses an uncovered index underneath, save this op as a template to
  // read from the table once the primary keys are returned from the uncovered index. The paging
  // state should be used by the underlying select from the index only which decides where to
  // continue the select from the index.
  if (tnode->child_select() && !tnode->child_select()->covers_fully()) {
    req->clear_return_paging_state();
    tnode_context->SetUncoveredSelectOp(select_op);
    result_ = std::make_shared<RowsResult>(select_op.get());
    return Status::OK();
  }

  // Add the operation.
  AddOperation(select_op, tnode_context);
  return Status::OK();
}
1115 | | |
// Loads (or lazily creates) the paging state for this SELECT execution.
//
// On the first call for a tnode the state is created from the user-supplied
// statement parameters; subsequent (recursive) calls reuse the existing state.
// LIMIT/OFFSET expressions are evaluated here exactly once per user request
// (guarded by has_select_limit()/has_select_offset()) and are only legal on
// the top-level read node - nested index reads never carry them.
//
// Returns the per-tnode QueryPagingState, or an error if the LIMIT/OFFSET
// expression is invalid.
Result<QueryPagingState*> Executor::LoadPagingStateFromUser(const PTSelectStmt* tnode,
                                                            TnodeContext* tnode_context) {
  QueryPagingState *query_state = tnode_context->query_state();
  if (query_state) {
    // If select_state is already set, use it.
    if (tnode->limit()) {
      // Need to compute the maximum number of rows to fetch for this user's request.
      query_state->AdjustMaxFetchSizeToSelectLimit();
    }
    return query_state;
  }

  // Create query_state for this execution.
  // - Only top-level-select node should have the row counter.
  // - User do not care about row counter of an inner or nested query.
  const StatementParameters& params = exec_context_->params();
  query_state = tnode_context->CreateQueryState(params, tnode->IsTopLevelReadNode());
  if (tnode->limit()) {
    RSTATUS_DCHECK(tnode->IsTopLevelReadNode(), Corruption,
                   "LIMIT clause cannot be applied to nested SELECT");
    if (!query_state->has_select_limit()) {
      int32_t limit;
      RETURN_NOT_OK(GetOffsetOrLimit(
          tnode,
          [](const PTSelectStmt* tnode) -> PTExpr::SharedPtr { return tnode->limit(); },
          "LIMIT", &limit));
      query_state->set_select_limit(limit);
    }

    query_state->AdjustMaxFetchSizeToSelectLimit();
  }

  if (tnode->offset()) {
    RSTATUS_DCHECK(tnode->IsTopLevelReadNode(), Corruption,
                   "OFFSET clause cannot be applied to nested SELECT");
    if (!query_state->has_select_offset()) {
      int32_t offset;
      RETURN_NOT_OK(GetOffsetOrLimit(
          tnode,
          [](const PTSelectStmt *tnode) -> PTExpr::SharedPtr { return tnode->offset(); },
          "OFFSET", &offset));
      query_state->set_select_offset(offset);
    }
  }

  return query_state;
}
1163 | | |
1164 | 2 | Status Executor::GenerateEmptyResult(const PTSelectStmt* tnode) { |
1165 | 2 | YBqlReadOpPtr select_op(tnode->table()->NewQLSelect()); |
1166 | 2 | QLRowBlock empty_row_block(tnode->table()->InternalSchema(), {}); |
1167 | 2 | faststring buffer; |
1168 | 2 | empty_row_block.Serialize(select_op->request().client(), &buffer); |
1169 | 2 | *select_op->mutable_rows_data() = buffer.ToString(); |
1170 | 2 | result_ = std::make_shared<RowsResult>(select_op.get()); |
1171 | | |
1172 | 2 | return Status::OK(); |
1173 | 2 | } |
1174 | | |
// Decides whether another read round-trip is needed to satisfy this SELECT,
// and if so prepares `op` for the next fetch.
//
// Returns true if `op` was updated and should be re-issued; false if the query
// is complete (LIMIT reached, all partitions exhausted, or the current page is
// full - in which case a paging state for the user may be composed).
//
// NOTE(review): the `exec_context` parameter appears unused - the body reads
// the `exec_context_` member instead (see set_original_request_id below).
// Presumably they refer to the same context here; verify against callers.
Result<bool> Executor::FetchMoreRows(const PTSelectStmt* tnode,
                                     const YBqlReadOpPtr& op,
                                     TnodeContext* tnode_context,
                                     ExecContext* exec_context) {
  if (!tnode_context->rows_result()) {
    return STATUS(InternalError, "Missing result for SELECT operation");
  }

  QueryPagingState *query_state = tnode_context->query_state();
  if (tnode->limit() && query_state->reached_select_limit()) {
    // If the LIMIT clause has been reached, we are done.
    RETURN_NOT_OK(tnode_context->ClearQueryState());
    return false;
  }

  //------------------------------------------------------------------------------------------------
  // Check if we should fetch more rows.

  // If there is no paging state the current scan has exhausted its results. The paging state
  // might be non-empty, but just contain num_rows_skipped, in this case the
  // 'next_partition_key' and 'next_row_key' would be empty indicating that we've finished
  // reading the current partition.
  if (tnode_context->FinishedReadingPartition()) {
    // If there or no other partitions to query, we are done.
    if (tnode_context->UnreadPartitionsRemaining() <= 1) {
      // Clear the paging state, since we don't have any more data left in the table.
      RETURN_NOT_OK(tnode_context->ClearQueryState());
      return false;
    }

    // Sanity check that if we finished a partition the next partition/row key are empty.
    // Otherwise we could start scanning the next partition from the wrong place.
    DCHECK(query_state->next_partition_key().empty());
    DCHECK(query_state->next_row_key().empty());

    // Otherwise, we continue to the next partition.
    tnode_context->AdvanceToNextPartition(op->mutable_request());
  }

  // Setup counters in read request to DocDB.
  const int64_t current_fetch_row_count = tnode_context->row_count();
  const int64_t total_rows_skipped = query_state->skip_count();
  const int64_t total_row_count = query_state->read_count();

  // If we reached the fetch limit (min of paging_size and limit clause), this batch is done.
  int64_t fetch_limit = query_state->max_fetch_size();
  if (fetch_limit >= 0 && current_fetch_row_count >= fetch_limit) {
    // If we need to return a paging state to the user, we create it here so that we can resume from
    // the exact place where we left off: partition index and primary key within that partition.
    if (op->request().return_paging_state()) {
      query_state->set_original_request_id(exec_context_->params().request_id());
      query_state->set_table_id(tnode->table()->id());
      query_state->set_total_num_rows_read(total_row_count);
      query_state->set_total_rows_skipped(total_rows_skipped);

      // Set the partition to resume from. Relevant for multi-partition selects, i.e. with IN
      // condition on the partition columns.
      query_state->set_next_partition_index(tnode_context->current_partition_index());

      // Write paging state to the node's rows_result to prepare for future batches.
      RETURN_NOT_OK(tnode_context->ComposeRowsResultForUser(nullptr, true /* for_new_batches */));
    }
    return false;
  }

  //------------------------------------------------------------------------------------------------
  // Fetch more results.
  // Update limit, offset and paging_state information for next scan request.
  op->mutable_request()->set_limit(fetch_limit - current_fetch_row_count);
  if (tnode->offset()) {
    // The paging state keeps a running count of the number of rows skipped so far.
    int64_t offset = std::max(static_cast<int64_t>(0),
                              query_state->select_offset() - total_rows_skipped);
    op->mutable_request()->set_offset(offset);
  }

  QLPagingStatePB *paging_state = op->mutable_request()->mutable_paging_state();
  paging_state->set_next_partition_key(query_state->next_partition_key());
  paging_state->set_next_row_key(query_state->next_row_key());
  paging_state->set_total_num_rows_read(total_row_count);
  paging_state->set_total_rows_skipped(total_rows_skipped);

  return true;
}
1259 | | |
1260 | | Result<bool> Executor::FetchRowsByKeys(const PTSelectStmt* tnode, |
1261 | | const YBqlReadOpPtr& select_op, |
1262 | | const QLRowBlock& keys, |
1263 | 1.52k | TnodeContext* tnode_context) { |
1264 | 1.52k | const Schema& schema = tnode->table()->InternalSchema(); |
1265 | 702 | for (const QLRow& key : keys.rows()) { |
1266 | 702 | YBqlReadOpPtr op(tnode->table()->NewQLSelect()); |
1267 | 702 | op->set_yb_consistency_level(select_op->yb_consistency_level()); |
1268 | 702 | QLReadRequestPB* req = op->mutable_request(); |
1269 | 702 | req->CopyFrom(select_op->request()); |
1270 | 702 | RETURN_NOT_OK(WhereKeyToPB(req, schema, key)); |
1271 | 702 | AddOperation(op, tnode_context); |
1272 | 702 | } |
1273 | 1.52k | return !keys.rows().empty(); |
1274 | 1.52k | } |
1275 | | |
1276 | | //-------------------------------------------------------------------------------------------------- |
1277 | | |
// Executes an INSERT statement by building a QLWriteRequestPB (TTL, USING
// TIMESTAMP, column values - either from a VALUES list or a JSON clause - IF
// clause and RETURNS STATUS) and queueing the write op for execution.
//
// Returns OK once the op is queued; any translation failure is recorded in
// exec_context_ with an error code chosen so that retryable and non-retryable
// failures are distinguished (see the ColumnArgsToPB branch).
Status Executor::ExecPTNode(const PTInsertStmt *tnode, TnodeContext* tnode_context) {
  // Create write request.
  const shared_ptr<client::YBTable>& table = tnode->table();
  YBqlWriteOpPtr insert_op(table->NewQLInsert());
  QLWriteRequestPB *req = insert_op->mutable_request();

  // Set the ttl.
  Status s = TtlToPB(tnode, req);
  if (PREDICT_FALSE(!s.ok())) {
    return exec_context_->Error(tnode, s, ErrorCode::INVALID_ARGUMENTS);
  }

  // Set the timestamp.
  s = TimestampToPB(tnode, req);
  if (PREDICT_FALSE(!s.ok())) {
    return exec_context_->Error(tnode, s, ErrorCode::INVALID_ARGUMENTS);
  }

  // Set the values for columns.
  if (tnode->InsertingValue()->opcode() == TreeNodeOpcode::kPTInsertJsonClause) {
    // INSERT INTO ... JSON '...': the JSON clause is translated directly.
    // Error messages are already formatted and don't need additional wrap
    RETURN_NOT_OK(
        InsertJsonClauseToPB(tnode,
                             static_cast<PTInsertJsonClause*>(tnode->InsertingValue().get()),
                             req));
  } else {
    s = ColumnArgsToPB(tnode, req);
    if (PREDICT_FALSE(!s.ok())) {
      // Note: INVALID_ARGUMENTS is retryable error code (due to mapping into STALE_METADATA),
      // INVALID_REQUEST - non-retryable.
      ErrorCode error_code =
          s.code() == Status::kNotSupported || s.code() == Status::kRuntimeError ?
              ErrorCode::INVALID_REQUEST : ErrorCode::INVALID_ARGUMENTS;

      return exec_context_->Error(tnode, s, error_code);
    }
  }

  // Setup the column values that need to be read.
  s = ColumnRefsToPB(tnode, req->mutable_column_refs());
  if (PREDICT_FALSE(!s.ok())) {
    return exec_context_->Error(tnode, s, ErrorCode::INVALID_ARGUMENTS);
  }

  // Set the IF clause.
  if (tnode->if_clause() != nullptr) {
    s = PTExprToPB(tnode->if_clause(), insert_op->mutable_request()->mutable_if_expr());
    if (PREDICT_FALSE(!s.ok())) {
      return exec_context_->Error(tnode->if_clause(), s, ErrorCode::INVALID_ARGUMENTS);
    }
    req->set_else_error(tnode->else_error());
  }

  // Set the RETURNS clause if set.
  if (tnode->returns_status()) {
    req->set_returns_status(true);
  }

  // Set whether write op writes to the static/primary row.
  insert_op->set_writes_static_row(tnode->ModifiesStaticRow());
  insert_op->set_writes_primary_row(tnode->ModifiesPrimaryRow());

  // Add the operation.
  return AddOperation(insert_op, tnode_context);
}
1343 | | |
1344 | | //-------------------------------------------------------------------------------------------------- |
1345 | | |
1346 | 740 | Status Executor::ExecPTNode(const PTDeleteStmt *tnode, TnodeContext* tnode_context) { |
1347 | | // Create write request. |
1348 | 740 | const shared_ptr<client::YBTable>& table = tnode->table(); |
1349 | 740 | YBqlWriteOpPtr delete_op(table->NewQLDelete()); |
1350 | 740 | QLWriteRequestPB *req = delete_op->mutable_request(); |
1351 | | |
1352 | | // Set the timestamp. |
1353 | 740 | Status s = TimestampToPB(tnode, req); |
1354 | 740 | if (PREDICT_FALSE(!s.ok())) { |
1355 | 0 | return exec_context_->Error(tnode, s, ErrorCode::INVALID_ARGUMENTS); |
1356 | 0 | } |
1357 | | |
1358 | | // Where clause - Hash, range, and regular columns. |
1359 | | // NOTE: Currently, where clause for write op doesn't allow regular columns. |
1360 | 740 | s = WhereClauseToPB(req, tnode->key_where_ops(), tnode->where_ops(), |
1361 | 740 | tnode->subscripted_col_where_ops()); |
1362 | 740 | if (PREDICT_FALSE(!s.ok())) { |
1363 | 0 | return exec_context_->Error(tnode, s, ErrorCode::INVALID_ARGUMENTS); |
1364 | 0 | } |
1365 | | |
1366 | | // Setup the column values that need to be read. |
1367 | 740 | s = ColumnRefsToPB(tnode, req->mutable_column_refs()); |
1368 | 740 | if (PREDICT_FALSE(!s.ok())) { |
1369 | 0 | return exec_context_->Error(tnode, s, ErrorCode::INVALID_ARGUMENTS); |
1370 | 0 | } |
1371 | 740 | s = ColumnArgsToPB(tnode, req); |
1372 | 740 | if (PREDICT_FALSE(!s.ok())) { |
1373 | 0 | return exec_context_->Error(tnode, s, ErrorCode::INVALID_ARGUMENTS); |
1374 | 0 | } |
1375 | | |
1376 | | // Set the IF clause. |
1377 | 740 | if (tnode->if_clause() != nullptr) { |
1378 | 22 | s = PTExprToPB(tnode->if_clause(), delete_op->mutable_request()->mutable_if_expr()); |
1379 | 22 | if (PREDICT_FALSE(!s.ok())) { |
1380 | 0 | return exec_context_->Error(tnode->if_clause(), s, ErrorCode::INVALID_ARGUMENTS); |
1381 | 0 | } |
1382 | 22 | req->set_else_error(tnode->else_error()); |
1383 | 22 | } |
1384 | | |
1385 | | // Set the RETURNS clause if set. |
1386 | 740 | if (tnode->returns_status()) { |
1387 | 4 | req->set_returns_status(true); |
1388 | 4 | } |
1389 | | |
1390 | | // Set whether write op writes to the static/primary row. |
1391 | 740 | delete_op->set_writes_static_row(tnode->ModifiesStaticRow()); |
1392 | 740 | delete_op->set_writes_primary_row(tnode->ModifiesPrimaryRow()); |
1393 | | |
1394 | | // Add the operation. |
1395 | 740 | return AddOperation(delete_op, tnode_context); |
1396 | 740 | } |
1397 | | |
1398 | | //-------------------------------------------------------------------------------------------------- |
1399 | | |
// Executes an UPDATE statement: builds a QLWriteRequestPB and buffers the
// write on |tnode_context|. When every column update was elided (JSON
// attributes set to 'null' with ignore_null_jsonb_attributes=true), no write
// is issued at all and a synthetic result row is returned instead.
Status Executor::ExecPTNode(const PTUpdateStmt *tnode, TnodeContext* tnode_context) {
  // Create write request.
  const shared_ptr<client::YBTable>& table = tnode->table();
  YBqlWriteOpPtr update_op(table->NewQLUpdate());
  QLWriteRequestPB *req = update_op->mutable_request();

  // Where clause - Hash, range, and regular columns.
  // NOTE: Currently, where clause for write op doesn't allow regular columns.
  Status s = WhereClauseToPB(req, tnode->key_where_ops(), tnode->where_ops(),
                             tnode->subscripted_col_where_ops());
  if (PREDICT_FALSE(!s.ok())) {
    return exec_context_->Error(tnode, s, ErrorCode::INVALID_ARGUMENTS);
  }

  // Set the ttl (USING TTL clause), if any.
  s = TtlToPB(tnode, req);
  if (PREDICT_FALSE(!s.ok())) {
    return exec_context_->Error(tnode, s, ErrorCode::INVALID_ARGUMENTS);
  }

  // Set the timestamp (USING TIMESTAMP clause), if any.
  s = TimestampToPB(tnode, req);
  if (PREDICT_FALSE(!s.ok())) {
    return exec_context_->Error(tnode, s, ErrorCode::INVALID_ARGUMENTS);
  }

  // Setup the columns' new values.
  s = ColumnArgsToPB(tnode, update_op->mutable_request());
  if (PREDICT_FALSE(!s.ok())) {
    return exec_context_->Error(tnode, s, ErrorCode::INVALID_ARGUMENTS);
  }

  if (req->column_values_size() == 0) {
    // We can reach here only in case of an UPDATE that consists of only setting
    // jsonb col's attributes to 'null' along with ignore_null_jsonb_attributes=true
    VLOG(1) << "Avoid updating indexes since 0 cols are written";
    if (tnode->returns_status()) {
      // Return row with [applied] = false with appropriate [message].
      std::shared_ptr<std::vector<ColumnSchema>> columns =
          std::make_shared<std::vector<ColumnSchema>>();
      const auto& schema = table->schema();
      // Two extra slots for the [applied] flag and the [message] text.
      columns->reserve(schema.num_columns() + 2);
      columns->emplace_back("[applied]", DataType::BOOL);
      columns->emplace_back("[message]", DataType::STRING);
      columns->insert(columns->end(), schema.columns().begin(), schema.columns().end());

      QLRowBlock result_row_block(Schema(*columns, 0));
      QLRow& row = result_row_block.Extend();
      row.mutable_column(0)->set_bool_value(false);
      row.mutable_column(1)->set_string_value(
          "No update performed as all JSON cols are set to 'null'");
      // Leave the rest of the columns null in this case.

      faststring row_data;
      result_row_block.Serialize(YQL_CLIENT_CQL, &row_data);

      result_ = std::make_shared<RowsResult>(table->name(), columns, row_data.ToString());
    } else if (tnode->if_clause() != nullptr) {
      // Return row with [applied] = false.
      std::shared_ptr<std::vector<ColumnSchema>> columns =
          std::make_shared<std::vector<ColumnSchema>>();
      columns->emplace_back("[applied]", DataType::BOOL);

      QLRowBlock result_row_block(Schema(*columns, 0));
      QLRow& row = result_row_block.Extend();
      row.mutable_column(0)->set_bool_value(false);

      faststring row_data;
      result_row_block.Serialize(YQL_CLIENT_CQL, &row_data);

      result_ = std::make_shared<RowsResult>(table->name(), columns, row_data.ToString());
    }

    // No write op is added in this case.
    return Status::OK();
  }

  // Setup the column values that need to be read.
  s = ColumnRefsToPB(tnode, req->mutable_column_refs());
  if (PREDICT_FALSE(!s.ok())) {
    return exec_context_->Error(tnode, s, ErrorCode::INVALID_ARGUMENTS);
  }

  // Set the IF clause (lightweight-transaction condition), if any.
  if (tnode->if_clause() != nullptr) {
    s = PTExprToPB(tnode->if_clause(), update_op->mutable_request()->mutable_if_expr());
    if (PREDICT_FALSE(!s.ok())) {
      return exec_context_->Error(tnode->if_clause(), s, ErrorCode::INVALID_ARGUMENTS);
    }
    req->set_else_error(tnode->else_error());
  }

  // Set the RETURNS STATUS AS ROW clause if set.
  if (tnode->returns_status()) {
    req->set_returns_status(true);
  }

  // Set whether write op writes to the static/primary row.
  update_op->set_writes_static_row(tnode->ModifiesStaticRow());
  update_op->set_writes_primary_row(tnode->ModifiesPrimaryRow());

  // Add the operation to the buffered batch.
  return AddOperation(update_op, tnode_context);
}
1503 | | |
1504 | | //-------------------------------------------------------------------------------------------------- |
1505 | | |
// Executes BEGIN TRANSACTION: starts a transaction with the requested
// isolation level, tracked by the current ExecContext.
Status Executor::ExecPTNode(const PTStartTransaction *tnode) {
  return exec_context_->StartTransaction(tnode->isolation_level(), ql_env_, rescheduler_);
}
1509 | | |
1510 | | //-------------------------------------------------------------------------------------------------- |
1511 | | |
// Executes COMMIT. Intentionally a no-op here: the actual commit is issued by
// FlushAsync() once all buffered write operations have been flushed and
// responded to.
Status Executor::ExecPTNode(const PTCommit *tnode) {
  // Commit happens after the write operations have been flushed and responded.
  return Status::OK();
}
1516 | | |
1517 | | //-------------------------------------------------------------------------------------------------- |
1518 | | |
// Executes TRUNCATE by delegating to the QL environment with the table id
// resolved during analysis.
Status Executor::ExecPTNode(const PTTruncateStmt *tnode) {
  return ql_env_->TruncateTable(tnode->table_id());
}
1522 | | |
1523 | | //-------------------------------------------------------------------------------------------------- |
1524 | | |
1525 | 1.61k | Status Executor::ExecPTNode(const PTCreateKeyspace *tnode) { |
1526 | 1.61k | Status s = ql_env_->CreateKeyspace(tnode->name()); |
1527 | | |
1528 | 1.61k | if (PREDICT_FALSE(!s.ok())) { |
1529 | 2 | ErrorCode error_code = ErrorCode::SERVER_ERROR; |
1530 | | |
1531 | 2 | if (s.IsAlreadyPresent()) { |
1532 | 2 | if (tnode->create_if_not_exists()) { |
1533 | | // Case: CREATE KEYSPACE IF NOT EXISTS name; |
1534 | 0 | return Status::OK(); |
1535 | 0 | } |
1536 | | |
1537 | 2 | error_code = ErrorCode::KEYSPACE_ALREADY_EXISTS; |
1538 | 2 | } |
1539 | | |
1540 | 2 | return exec_context_->Error(tnode, s, error_code); |
1541 | 1.60k | } |
1542 | | |
1543 | 1.60k | result_ = std::make_shared<SchemaChangeResult>("CREATED", "KEYSPACE", tnode->name()); |
1544 | 1.60k | return Status::OK(); |
1545 | 1.60k | } |
1546 | | |
1547 | | //-------------------------------------------------------------------------------------------------- |
1548 | | |
1549 | 4.18k | Status Executor::ExecPTNode(const PTUseKeyspace *tnode) { |
1550 | 4.18k | const MonoTime start_time = MonoTime::Now(); |
1551 | 4.18k | const Status s = ql_env_->UseKeyspace(tnode->name()); |
1552 | 4.18k | if (PREDICT_FALSE(!s.ok())) { |
1553 | 1 | ErrorCode error_code = s.IsNotFound() ? ErrorCode::KEYSPACE_NOT_FOUND : ErrorCode::SERVER_ERROR; |
1554 | 1 | return exec_context_->Error(tnode, s, error_code); |
1555 | 1 | } |
1556 | | |
1557 | 4.18k | result_ = std::make_shared<SetKeyspaceResult>(tnode->name()); |
1558 | | |
1559 | 4.18k | if (ql_metrics_ != nullptr) { |
1560 | 4.18k | const auto delta_usec = (MonoTime::Now() - start_time).ToMicroseconds(); |
1561 | 4.18k | ql_metrics_->ql_use_->Increment(delta_usec); |
1562 | 4.18k | } |
1563 | 4.18k | return Status::OK(); |
1564 | 4.18k | } |
1565 | | |
1566 | | //-------------------------------------------------------------------------------------------------- |
1567 | | |
1568 | 21 | Status Executor::ExecPTNode(const PTAlterKeyspace *tnode) { |
1569 | | // To get new keyspace properties use: tnode->keyspace_properties() |
1570 | | // Current implementation only check existence of this keyspace. |
1571 | 21 | const Status s = ql_env_->AlterKeyspace(tnode->name()); |
1572 | | |
1573 | 21 | if (PREDICT_FALSE(!s.ok())) { |
1574 | 1 | ErrorCode error_code = s.IsNotFound() ? ErrorCode::KEYSPACE_NOT_FOUND : ErrorCode::SERVER_ERROR; |
1575 | 1 | return exec_context_->Error(tnode, s, error_code); |
1576 | 1 | } |
1577 | | |
1578 | 20 | result_ = std::make_shared<SchemaChangeResult>("UPDATED", "KEYSPACE", tnode->name()); |
1579 | 20 | return Status::OK(); |
1580 | 20 | } |
1581 | | |
1582 | | //-------------------------------------------------------------------------------------------------- |
1583 | | |
1584 | | namespace { |
1585 | | |
1586 | 340 | void AddStringRow(const string& str, QLRowBlock* row_block) { |
1587 | 340 | row_block->Extend().mutable_column(0)->set_string_value(str); |
1588 | 340 | } |
1589 | | |
1590 | 340 | void RightPad(const int length, string *s) { |
1591 | 340 | s->append(length - s->length(), ' '); |
1592 | 340 | } |
1593 | | } // namespace |
1594 | | |
// Executes an EXPLAIN statement: renders the analysis result of the wrapped
// DML statement as a row block with a single "QUERY PLAN" string column and
// stores it in result_. Each plan line is right-padded to the plan's output
// width so the rows align when displayed.
Status Executor::ExecPTNode(const PTExplainStmt *tnode) {
  TreeNode::SharedPtr subStmt = tnode->stmt();
  PTDmlStmt *dmlStmt = down_cast<PTDmlStmt *>(subStmt.get());
  const YBTableName explainTable(YQL_DATABASE_CQL, "Explain");
  ColumnSchema explainColumn("QUERY PLAN", STRING);
  auto explainColumns = std::make_shared<std::vector<ColumnSchema>>(
      std::initializer_list<ColumnSchema>{explainColumn});
  auto explainSchema = std::make_shared<Schema>(*explainColumns, 0);
  QLRowBlock row_block(*explainSchema);
  faststring buffer;
  ExplainPlanPB explain_plan = dmlStmt->AnalysisResultToPB();
  // Emit one padded output row per populated field of the plan protobuf.
  switch (explain_plan.plan_case()) {
    case ExplainPlanPB::kSelectPlan: {
      SelectPlanPB *select_plan = explain_plan.mutable_select_plan();
      if (select_plan->has_aggregate()) {
        RightPad(select_plan->output_width(), select_plan->mutable_aggregate());
        AddStringRow(select_plan->aggregate(), &row_block);
      }
      RightPad(select_plan->output_width(), select_plan->mutable_select_type());
      AddStringRow(select_plan->select_type(), &row_block);
      if (select_plan->has_key_conditions()) {
        RightPad(select_plan->output_width(), select_plan->mutable_key_conditions());
        AddStringRow(select_plan->key_conditions(), &row_block);
      }
      if (select_plan->has_filter()) {
        RightPad(select_plan->output_width(), select_plan->mutable_filter());
        AddStringRow(select_plan->filter(), &row_block);
      }
      break;
    }
    case ExplainPlanPB::kInsertPlan: {
      InsertPlanPB *insert_plan = explain_plan.mutable_insert_plan();
      RightPad(insert_plan->output_width(), insert_plan->mutable_insert_type());
      AddStringRow(insert_plan->insert_type(), &row_block);
      break;
    }
    case ExplainPlanPB::kUpdatePlan: {
      UpdatePlanPB *update_plan = explain_plan.mutable_update_plan();
      RightPad(update_plan->output_width(), update_plan->mutable_update_type());
      AddStringRow(update_plan->update_type(), &row_block);
      RightPad(update_plan->output_width(), update_plan->mutable_scan_type());
      AddStringRow(update_plan->scan_type(), &row_block);
      RightPad(update_plan->output_width(), update_plan->mutable_key_conditions());
      AddStringRow(update_plan->key_conditions(), &row_block);
      break;
    }
    case ExplainPlanPB::kDeletePlan: {
      DeletePlanPB *delete_plan = explain_plan.mutable_delete_plan();
      RightPad(delete_plan->output_width(), delete_plan->mutable_delete_type());
      AddStringRow(delete_plan->delete_type(), &row_block);
      RightPad(delete_plan->output_width(), delete_plan->mutable_scan_type());
      AddStringRow(delete_plan->scan_type(), &row_block);
      RightPad(delete_plan->output_width(), delete_plan->mutable_key_conditions());
      AddStringRow(delete_plan->key_conditions(), &row_block);
      if (delete_plan->has_filter()) {
        RightPad(delete_plan->output_width(), delete_plan->mutable_filter());
        AddStringRow(delete_plan->filter(), &row_block);
      }
      break;
    }
    case ExplainPlanPB::PLAN_NOT_SET: {
      // Analysis produced no plan; treat as an execution error.
      return exec_context_->Error(tnode, ErrorCode::EXEC_ERROR);
      break;
    }
  }
  row_block.Serialize(YQL_CLIENT_CQL, &buffer);
  result_ = std::make_shared<RowsResult>(explainTable, explainColumns, buffer.ToString());
  return Status::OK();
}
1664 | | |
1665 | | //-------------------------------------------------------------------------------------------------- |
1666 | | |
1667 | | namespace { |
1668 | | |
1669 | | // When executing a DML in a transaction or a SELECT statement on a transaction-enabled table, the |
1670 | | // following transient errors may happen for which the YCQL service will restart the transaction. |
1671 | | // - TryAgain: when the transaction has a write conflict with another transaction or read- |
1672 | | // restart is required. |
1673 | | // - Expired: when the transaction expires due to missed heartbeat |
1674 | 5.37M | bool NeedsRestart(const Status& s) { |
1675 | 5.37M | return s.IsTryAgain() || s.IsExpired(); |
1676 | 5.37M | } |
1677 | | |
1678 | 5.31M | bool ShouldRestart(const Status& s, Rescheduler* rescheduler) { |
1679 | 5.31M | return NeedsRestart(s) && (CoarseMonoClock::now() < rescheduler->GetDeadline()); |
1680 | 5.31M | } |
1681 | | |
1682 | | // Process TnodeContexts and their children under an ExecContext. |
1683 | | Status ProcessTnodeContexts(ExecContext* exec_context, |
1684 | 5.21M | const std::function<Result<bool>(TnodeContext*)>& processor) { |
1685 | 5.33M | for (TnodeContext& tnode_context : exec_context->tnode_contexts()) { |
1686 | 5.33M | TnodeContext* p = &tnode_context; |
1687 | 10.6M | while (p != nullptr) { |
1688 | 5.34M | const Result<bool> done = processor(p); |
1689 | 5.34M | RETURN_NOT_OK(done); |
1690 | 5.34M | if (done.get()) { |
1691 | 56.4k | return Status::OK(); |
1692 | 56.4k | } |
1693 | 5.28M | p = p->child_context(); |
1694 | 5.28M | } |
1695 | 5.33M | } |
1696 | 5.15M | return Status::OK(); |
1697 | 5.21M | } |
1698 | | |
1699 | 9.92M | bool NeedsFlush(const client::YBSessionPtr& session) { |
1700 | | // We need to flush session if we have added operations because some errors are only checked |
1701 | | // during session flush and passed into flush callback. |
1702 | 9.92M | return session->HasNotFlushedOperations(); |
1703 | 9.92M | } |
1704 | | |
1705 | | } // namespace |
1706 | | |
1707 | 1.39M | client::YBSessionPtr Executor::GetSession(ExecContext* exec_context) { |
1708 | 1.24M | return exec_context->HasTransaction() ? exec_context->transactional_session() : session_; |
1709 | 1.39M | } |
1710 | | |
// Flushes one round of buffered read/write operations and commits transactions
// that are ready, or finishes the statement when there is nothing left to do.
// Must be called with no async calls in flight.
void Executor::FlushAsync(ResetAsyncCalls* reset_async_calls) {
  if (num_async_calls() != 0) {
    LOG(DFATAL) << __func__ << " while have " << num_async_calls() << " async calls running";
    return;
  }

  // Buffered read/write operations are flushed in rounds. In each round, FlushAsync() is called to
  // flush buffered operations in the non-transactional session in the Executor or the transactional
  // session in each ExecContext if any. Also, transactions in any ExecContext ready to commit with
  // no more pending operation are also committed. If there is no session to flush nor
  // transaction to commit, the statement is executed.
  //
  // As flushes and commits happen, multiple FlushAsyncDone() and CommitDone() callbacks can be
  // invoked concurrently. To avoid race conditions among them, the async-call count is set before
  // calling FlushAsync() and CommitTransaction(), so that only the last callback to complete
  // detects that all async calls are done and processes the async results exclusively.
  write_batch_.Clear();
  std::vector<std::pair<YBSessionPtr, ExecContext*>> flush_sessions;
  std::vector<ExecContext*> commit_contexts;
  if (NeedsFlush(session_)) {
    flush_sessions.push_back({session_, nullptr});
  }
  for (ExecContext& exec_context : exec_contexts_) {
    if (exec_context.HasTransaction()) {
      auto transactional_session = exec_context.transactional_session();
      if (NeedsFlush(transactional_session)) {
        // In case of retry we should ignore values that could be written by previous attempts
        // of the retried operation.
        transactional_session->SetInTxnLimit(transactional_session->read_point()->Now());
        flush_sessions.push_back({transactional_session, &exec_context});
      } else if (!exec_context.HasPendingOperations()) {
        // Transaction with no buffered and no pending ops: ready to commit.
        commit_contexts.push_back(&exec_context);
      }
    }
  }

  // Commit transactions first before flushing operations in case some operations are blocked by
  // prior operations in the uncommitted transactions. num_flushes_ is updated before FlushAsync()
  // and CommitTransaction() are called to avoid race condition of recursive FlushAsync() called
  // from FlushAsyncDone() and CommitDone().
  num_flushes_ += flush_sessions.size();
  async_status_ = Status::OK();

  if (flush_sessions.empty() && commit_contexts.empty()) {
    // If this is a batch returning status, append the rows in the user-given order before
    // returning result.
    if (IsReturnsStatusBatch()) {
      for (ExecContext& exec_context : exec_contexts_) {
        int64_t row_count = 0;
        RETURN_STMT_NOT_OK(ProcessTnodeContexts(
            &exec_context,
            [this, &exec_context, &row_count](TnodeContext* tnode_context) -> Result<bool> {
              for (client::YBqlOpPtr& op : tnode_context->ops()) {
                if (!op->rows_data().empty()) {
                  // Each statement in the batch is expected to contribute at most one status row.
                  DCHECK_EQ(++row_count, 1) << exec_context.stmt()
                                            << " returned multiple status rows";
                  RETURN_NOT_OK(AppendRowsResult(std::make_shared<RowsResult>(op.get())));
                }
              }
              return false; // not done
            }), reset_async_calls);
      }
    }
    return StatementExecuted(Status::OK(), reset_async_calls);
  }

  // Hand ownership of the async-call count to the callbacks below.
  reset_async_calls->Cancel();
  num_async_calls_.store(flush_sessions.size() + commit_contexts.size(), std::memory_order_release);
  for (auto* exec_context : commit_contexts) {
    exec_context->CommitTransaction(
        rescheduler_->GetDeadline(), [this, exec_context](const Status& s) {
          CommitDone(s, exec_context);
        });
  }
  // Use the same score on each tablet. So probability of rejecting write should be related
  // to used capacity.
  auto rejection_score_source = std::make_shared<client::RejectionScoreSource>();
  for (const auto& pair : flush_sessions) {
    auto session = pair.first;
    auto exec_context = pair.second;
    session->SetRejectionScoreSource(rejection_score_source);
    TRACE("Flush Async");
    session->FlushAsync([this, exec_context](client::FlushStatus* flush_status) {
      FlushAsyncDone(flush_status, exec_context);
    });
  }
}
1799 | | |
1800 | | // As multiple FlushAsyncDone() and CommitDone() can be invoked concurrently for different |
1801 | | // ExecContexts, care must be taken so that the callbacks only update the individual ExecContexts. |
1802 | | // Any update on data structures shared in Executor should either be protected by a mutex or |
1803 | | // deferred to ProcessAsyncResults() that will be invoked exclusively. |
// Callback for YBSession::FlushAsync(). |exec_context| is the owner of the
// flushed transactional session, or nullptr when the executor's shared
// non-transactional session was flushed. May run concurrently with other
// FlushAsyncDone()/CommitDone() callbacks; shared state updates are guarded
// by status_mutex_ and final processing is deferred to the last callback.
void Executor::FlushAsyncDone(client::FlushStatus* flush_status, ExecContext* exec_context) {
  TRACE("Flush Async Done");
  // Process FlushAsync status for either transactional session in an ExecContext, or the
  // non-transactional session in the Executor for other ExecContexts with no transactional session.

  // When any error occurs during the dispatching of YBOperation, YBSession saves the error and
  // returns IOError. When it happens, retrieve the per-operation errors and discard the IOError.
  Status s = flush_status->status;
  OpErrors op_errors;
  if (s.IsIOError()) {
    for (const auto& error : flush_status->errors) {
      op_errors[static_cast<const client::YBqlOp*>(&error->failed_op())] = error->status();
    }
    s = Status::OK();
  }

  if (s.ok()) {
    if (exec_context != nullptr) {
      // Transactional session: only this ExecContext's ops were flushed.
      s = ProcessAsyncStatus(op_errors, exec_context);
      if (!s.ok()) {
        std::lock_guard<std::mutex> lock(status_mutex_);
        async_status_ = s;
      }
    } else {
      // Shared session: its ops may belong to any non-transactional ExecContext.
      for (auto& exec_context : exec_contexts_) {
        if (!exec_context.HasTransaction()) {
          s = ProcessAsyncStatus(op_errors, &exec_context);
          if (!s.ok()) {
            std::lock_guard<std::mutex> lock(status_mutex_);
            async_status_ = s;
          }
        }
      }
    }
  } else {
    std::lock_guard<std::mutex> lock(status_mutex_);
    async_status_ = s;
  }

  // Process async results exclusively if this is the last callback of the last FlushAsync() and
  // there is no more outstanding async call.
  if (AddFetch(&num_async_calls_, -1, std::memory_order_acq_rel) == 0) {
    ResetAsyncCalls reset_async_calls(&num_async_calls_);
    ProcessAsyncResults(/* rescheduled */ false, &reset_async_calls);
  }
}
1850 | | |
// Callback for ExecContext::CommitTransaction(). On success records the
// transaction latency metric. On a transient failure within the deadline,
// resets the context so the whole statement restarts; otherwise records the
// error for ProcessAsyncResults() to report. May run concurrently with other
// CommitDone()/FlushAsyncDone() callbacks.
void Executor::CommitDone(Status s, ExecContext* exec_context) {
  TRACE("Commit Transaction Done");

  if (s.ok()) {
    if (ql_metrics_ != nullptr) {
      const MonoTime now = MonoTime::Now();
      const auto delta_usec = (now - exec_context->transaction_start_time()).ToMicroseconds();
      ql_metrics_->ql_transaction_->Increment(delta_usec);
    }
  } else {
    if (ShouldRestart(s, rescheduler_)) {
      // Transient failure and still within deadline: restart the transaction.
      exec_context->Reset(client::Restart::kTrue, rescheduler_);
    } else {
      std::lock_guard<std::mutex> lock(status_mutex_);
      async_status_ = s;
    }
  }

  // Process async results exclusively if this is the last callback of the last FlushAsync() and
  // there is no more outstanding async call.
  if (AddFetch(&num_async_calls_, -1, std::memory_order_acq_rel) == 0) {
    ResetAsyncCalls reset_async_calls(&num_async_calls_);
    ProcessAsyncResults(/* rescheduled */ false, &reset_async_calls);
  }
}
1876 | | |
1877 | 9.82M | void Executor::ProcessAsyncResults(const bool rescheduled, ResetAsyncCalls* reset_async_calls) { |
1878 | 9.82M | if (num_async_calls() != 0) { |
1879 | 0 | LOG(DFATAL) << __func__ << " while have " << num_async_calls() << " async calls running"; |
1880 | 0 | return; |
1881 | 0 | } |
1882 | | |
1883 | | // If the current thread is not the RPC worker thread, call the callback directly. Otherwise, |
1884 | | // reschedule the call to resume in the RPC worker thread. |
1885 | 9.82M | if (!rescheduled && rescheduler_->NeedReschedule()) { |
1886 | 4.88M | return rescheduler_->Reschedule(&process_async_results_task_.Bind(this, reset_async_calls)); |
1887 | 4.88M | } |
1888 | | |
1889 | | // Return error immediately when async call failed. |
1890 | 4.94M | RETURN_STMT_NOT_OK(async_status_, reset_async_calls); |
1891 | | |
1892 | | // Go through each ExecContext and process async results. |
1893 | 4.94M | bool need_flush = false; |
1894 | 4.94M | bool has_restart = false; |
1895 | 4.92M | const MonoTime now = (ql_metrics_ != nullptr) ? MonoTime::Now() : MonoTime(); |
1896 | 10.2M | for (auto exec_itr = exec_contexts_.begin(); exec_itr != exec_contexts_.end(); ) { |
1897 | | |
1898 | | // Set current ExecContext. |
1899 | 5.31M | exec_context_ = &*exec_itr; |
1900 | | |
1901 | | // Restart a statement if necessary |
1902 | 5.31M | if (exec_context_->restart()) { |
1903 | 102k | has_restart = true; |
1904 | 102k | const TreeNode *root = exec_context_->parse_tree().root().get(); |
1905 | | // Clear partial rows accumulated from the SELECT statement. |
1906 | 102k | if (root->opcode() == TreeNodeOpcode::kPTSelectStmt) { |
1907 | 33 | result_ = nullptr; |
1908 | 33 | } |
1909 | | |
1910 | | // We should restart read, but read time was specified by caller. |
1911 | | // For instance it could happen in case of pagination. |
1912 | 102k | if (exec_context_->params().read_time()) { |
1913 | 0 | return StatementExecuted( |
1914 | 0 | STATUS(IllegalState, "Restart read required, but read time specified by caller"), |
1915 | 0 | reset_async_calls); |
1916 | 0 | } |
1917 | | |
1918 | 102k | YBSessionPtr session = GetSession(exec_context_); |
1919 | 102k | session->SetReadPoint(client::Restart::kTrue); |
1920 | 102k | RETURN_STMT_NOT_OK(ExecTreeNode(root), reset_async_calls); |
1921 | 102k | need_flush |= NeedsFlush(session); |
1922 | 102k | exec_itr++; |
1923 | 102k | continue; |
1924 | 5.21M | } |
1925 | | |
1926 | | // Go through each TnodeContext in an ExecContext and process async results. |
1927 | 5.21M | auto& tnode_contexts = exec_context_->tnode_contexts(); |
1928 | 10.5M | for (auto tnode_itr = tnode_contexts.begin(); tnode_itr != tnode_contexts.end(); ) { |
1929 | 5.32M | TnodeContext& tnode_context = *tnode_itr; |
1930 | | |
1931 | 5.32M | const Result<bool> result = ProcessTnodeResults(&tnode_context); |
1932 | 5.32M | RETURN_STMT_NOT_OK(result, reset_async_calls); |
1933 | 5.32M | if (*result) { |
1934 | 48.2k | need_flush = true; |
1935 | 48.2k | } |
1936 | | |
1937 | | // If this statement is restarted, stop traversing the rest of the statement tnodes. |
1938 | 5.32M | if (exec_context_->restart()) { |
1939 | 0 | break; |
1940 | 0 | } |
1941 | | |
1942 | | // If there are pending ops, we are not done with this statement tnode yet. |
1943 | 5.32M | if (tnode_context.HasPendingOperations()) { |
1944 | 69.0k | tnode_itr++; |
1945 | 69.0k | continue; |
1946 | 69.0k | } |
1947 | | |
1948 | | // For SELECT statement, aggregate result sets if needed. |
1949 | 5.25M | const TreeNode *tnode = tnode_context.tnode(); |
1950 | 5.25M | if (tnode->opcode() == TreeNodeOpcode::kPTSelectStmt) { |
1951 | 3.89M | RETURN_STMT_NOT_OK( |
1952 | 3.89M | AggregateResultSets(static_cast<const PTSelectStmt *>(tnode), &tnode_context), |
1953 | 3.89M | reset_async_calls); |
1954 | 3.89M | } |
1955 | | |
1956 | | // Update the metrics for SELECT/INSERT/UPDATE/DELETE here after the ops have been completed |
1957 | | // but exclude the time to commit the transaction if any. Report the metric only once. |
1958 | 5.25M | if (ql_metrics_ != nullptr && !tnode_context.end_time().Initialized()) { |
1959 | 5.23M | tnode_context.set_end_time(now); |
1960 | 5.23M | const auto delta_usec = tnode_context.execution_time().ToMicroseconds(); |
1961 | 5.23M | switch (tnode->opcode()) { |
1962 | 3.89M | case TreeNodeOpcode::kPTSelectStmt: |
1963 | 3.89M | ql_metrics_->ql_select_->Increment(delta_usec); |
1964 | 3.89M | break; |
1965 | 1.23M | case TreeNodeOpcode::kPTInsertStmt: |
1966 | 1.23M | ql_metrics_->ql_insert_->Increment(delta_usec); |
1967 | 1.23M | break; |
1968 | 3.00k | case TreeNodeOpcode::kPTUpdateStmt: |
1969 | 3.00k | ql_metrics_->ql_update_->Increment(delta_usec); |
1970 | 3.00k | break; |
1971 | 695 | case TreeNodeOpcode::kPTDeleteStmt: |
1972 | 695 | ql_metrics_->ql_delete_->Increment(delta_usec); |
1973 | 695 | break; |
1974 | 55.6k | case TreeNodeOpcode::kPTStartTransaction: |
1975 | 111k | case TreeNodeOpcode::kPTCommit: |
1976 | 111k | break; |
1977 | 0 | default: |
1978 | 0 | LOG(FATAL) << "unexpected operation " << tnode->opcode(); |
1979 | 5.23M | } |
1980 | 5.24M | if (tnode->IsDml()) { |
1981 | 5.13M | ql_metrics_->time_to_execute_ql_query_->Increment(delta_usec); |
1982 | 5.13M | } |
1983 | 5.24M | } |
1984 | | |
1985 | | // If this is a batch returning status, keep the statement tnode with its ops so that we can |
1986 | | // return the row status when all statements in the batch finish. |
1987 | 5.26M | if (IsReturnsStatusBatch()) { |
1988 | 469 | tnode_itr++; |
1989 | 469 | continue; |
1990 | 469 | } |
1991 | | |
1992 | | // Move rows results and remove the statement tnode that has completed. |
1993 | 5.26M | RETURN_STMT_NOT_OK(AppendRowsResult(std::move(tnode_context.rows_result())), |
1994 | 5.26M | reset_async_calls); |
1995 | 5.26M | tnode_itr = tnode_contexts.erase(tnode_itr); |
1996 | 5.26M | } |
1997 | | |
1998 | | // If the current ExecContext is restarted, process it again. Otherwise, move to the next one. |
1999 | 5.22M | if (!exec_context_->restart()) { |
2000 | 5.19M | exec_itr++; |
2001 | 5.19M | } |
2002 | 5.22M | } |
2003 | | |
2004 | | // If there are buffered ops that need flushes, the flushes need to be rescheduled if this call |
2005 | | // hasn't been rescheduled. This is necessary because in an RF1 setup, the flush is a direct |
2006 | | // local call and the recursive FlushAsync and FlushAsyncDone calls for a full table scan can |
2007 | | // recurse too deeply hitting the stack limitiation. Rescheduling can also avoid occupying the |
2008 | | // RPC worker thread for too long starving other CQL calls waiting in the queue. If there is no |
2009 | | // buffered ops to flush, just call FlushAsync() to commit the transactions if any. |
2010 | | // |
2011 | | // When restart is required we will reexecute the whole operation with new transaction. |
2012 | | // Since restart could happen multiple times, it is possible that we will do it recursively, |
2013 | | // when local call is enabled. |
2014 | | // So to avoid stack overflow we use reschedule in this case. |
2015 | 4.95M | if ((need_flush || has_restart) && !rescheduled) { |
2016 | 0 | rescheduler_->Reschedule(&flush_async_task_.Bind(this, reset_async_calls)); |
2017 | 4.95M | } else { |
2018 | 4.95M | FlushAsync(reset_async_calls); |
2019 | 4.95M | } |
2020 | 4.95M | } |
2021 | | |
// Processes the asynchronous results of the ops buffered under 'tnode_context':
// re-applies deferred writes, aborts the statement's transaction when a transactional
// op failed or its IF condition was not satisfied, appends arrived rows, fetches more
// pages for SELECTs, and recursively processes the child context (nested index SELECT)
// when one exists.
//
// Returns true when at least one op was (re)buffered into a session and still needs a
// flush; false when every op of this tnode has completed. Completed ops are erased
// from the tnode's op list as a side effect.
Result<bool> Executor::ProcessTnodeResults(TnodeContext* tnode_context) {
  bool has_buffered_ops = false;

  // Go through each op in a TnodeContext and process async results.
  const TreeNode *tnode = tnode_context->tnode();
  auto& ops = tnode_context->ops();
  for (auto op_itr = ops.begin(); op_itr != ops.end(); ) {
    YBqlOpPtr& op = *op_itr;

    // Apply any op that has not been applied and executed. An op without a response
    // status was deferred by an earlier WriteBatch::Add() conflict check.
    if (!op->response().has_status()) {
      DCHECK_EQ(op->type(), YBOperation::Type::QL_WRITE);
      if (write_batch_.Add(
              std::static_pointer_cast<YBqlWriteOp>(op), tnode_context, exec_context_)) {
        YBSessionPtr session = GetSession(exec_context_);
        TRACE("Apply");
        session->Apply(op);
        has_buffered_ops = true;
      }
      op_itr++;
      continue;
    }

    // If the statement is in a transaction, check the status of the current operation. If it
    // failed to apply (either because of an execution error or unsatisfied IF condition), quit the
    // execution and abort the transaction. Also, if this is a batch returning status, mark all
    // other ops in this statement as done and clear the rows data to make sure only one status row
    // is returned from this statement.
    //
    // Note: For an error response, we only get to this point if using 'RETURNS STATUS AS ROW'.
    // Otherwise, ProcessAsyncResults() should have failed so we would have returned above already.
    if (exec_context_->HasTransaction() &&
        (op->response().status() != QLResponsePB_QLStatus_YQL_STATUS_OK ||
         (op->response().has_applied() && !op->response().applied()))) {
      exec_context_->AbortTransaction();
      if (IsReturnsStatusBatch()) {
        RETURN_NOT_OK(ProcessTnodeContexts(
            exec_context_,
            [&op](TnodeContext* tnode_context) -> Result<bool> {
              for (auto& other : tnode_context->ops()) {
                if (other != op) {
                  other->mutable_response()->set_status(QLResponsePB::YQL_STATUS_OK);
                  other->mutable_rows_data()->clear();
                }
              }
              return false; // not done
            }));
      }
    }

    // If the transaction is ready to commit, apply child transaction results if any.
    if (exec_context_->HasTransaction()) {
      if (tnode_context->HasPendingOperations()) {
        // Defer the child transaction result applying till the last TNode operation finish.
        // This prevents the incomplete operation deletion in the end of the loop.
        op_itr++;
        continue;
      } else {
        const QLResponsePB& response = op->response();
        if (response.has_child_transaction_result()) {
          const auto& result = response.child_transaction_result();
          const Status s = exec_context_->ApplyChildTransactionResult(result);
          // If restart is needed, reset the current context and return immediately.
          if (ShouldRestart(s, rescheduler_)) {
            exec_context_->Reset(client::Restart::kTrue, rescheduler_);
            return false;
          }
          RETURN_NOT_OK(s);
        }
      }
    }

    // If this is a batch returning status, defer appending the row because we need to return the
    // results in the user-given order when all statements in the batch finish.
    if (IsReturnsStatusBatch()) {
      op_itr++;
      continue;
    }

    // Append the rows if present.
    if (!op->rows_data().empty()) {
      SCHECK(!tnode->IsTopLevelReadNode() || tnode_context->query_state() != nullptr,
             Corruption, "Query state cannot be NULL for SELECT");
      // NOTE: Although it is odd to check for LIMIT counters before appending a new set of data
      // instead of when counting rows during appending, it is safer to do it this way.
      // - This function is processing callbacks from RPC whenever data is arrived from DocDB.
      // - If the arriving rows exceed the LIMIT, all of them will still be passed to this function
      //   and must be blocked and rejected here before they are appended to tnode_context.
      if (tnode->IsTopLevelReadNode() && tnode_context->query_state()->reached_select_limit()) {
        // We've reached the end of scan. Ignore the rest of the operators and results.
        RETURN_NOT_OK(tnode_context->ClearQueryState());
        break;
      }
      RETURN_NOT_OK(tnode_context->AppendRowsResult(std::make_shared<RowsResult>(op.get())));
    }

    // For SELECT statement, check if there are more rows to fetch and apply the op as needed.
    if (tnode->opcode() == TreeNodeOpcode::kPTSelectStmt) {
      const auto* select_stmt = static_cast<const PTSelectStmt *>(tnode);
      // Do this except for the parent SELECT with an index. For covered index, we will select
      // from the index only. For uncovered index, the parent SELECT will fetch using the primary
      // keys returned from below.
      if (!select_stmt->child_select()) {
        DCHECK_EQ(op->type(), YBOperation::Type::QL_READ);
        const auto& read_op = std::static_pointer_cast<YBqlReadOp>(op);
        if (VERIFY_RESULT(FetchMoreRows(select_stmt, read_op, tnode_context, exec_context_))) {
          op->mutable_response()->Clear();
          TRACE("Apply");
          session_->Apply(op);
          has_buffered_ops = true;
          op_itr++;
          continue;
        }
      }
    }

    // Remove the op that has completed.
    op_itr = ops.erase(op_itr);
  }

  // If there is a child context, process it.
  TnodeContext* child_context = tnode_context->child_context();
  if (child_context != nullptr) {
    const TreeNode *child_tnode = child_context->tnode();
    if (VERIFY_RESULT(ProcessTnodeResults(child_context))) {
      has_buffered_ops = true;
    }

    // If the child selects from an uncovered index, extract the primary keys returned and use them
    // to select from the indexed table.
    RSTATUS_DCHECK_EQ(tnode->opcode(), TreeNodeOpcode::kPTSelectStmt,
                      Corruption, "Expecting SELECT opcode");
    RSTATUS_DCHECK_EQ(child_tnode->opcode(), TreeNodeOpcode::kPTSelectStmt,
                      Corruption, "Expecting nested SELECT opcode");
    RSTATUS_DCHECK(!static_cast<const PTSelectStmt *>(child_tnode)->index_id().empty(),
                   Corruption, "Expecting valid index id");

    const auto* select_stmt = static_cast<const PTSelectStmt *>(tnode);
    const auto* child_select = static_cast<const PTSelectStmt *>(child_tnode);

    string& rows_data = child_context->rows_result()->rows_data();
    if (!child_select->covers_fully() && !rows_data.empty()) {
      QLRowBlock* keys = tnode_context->keys();
      keys->rows().clear();
      Slice data(rows_data);
      RETURN_NOT_OK(keys->Deserialize(YQL_CLIENT_CQL, &data));
      const YBqlReadOpPtr& select_op = tnode_context->uncovered_select_op();
      if (VERIFY_RESULT(FetchRowsByKeys(select_stmt, select_op, *keys, tnode_context))) {
        has_buffered_ops = true;
      }
      rows_data.clear();
    }

    // Finalize the execution. We will send this result to users, and they send us subsequent
    // requests if the paging state is not empty.
    // 1. Case no child: The result is already in the node.
    // 2. Case fully_covered index: The result is in child_select node.
    // 3. Case partially_covered index:
    //    - The result and row-counter are kept in parent node.
    //    - The paging state is in the child node.
    if (!tnode_context->HasPendingOperations() && !child_context->HasPendingOperations()) {
      RETURN_NOT_OK(tnode_context->ComposeRowsResultForUser(child_select,
                                                            false /* for_new_batches */));
    }
  }

  return has_buffered_ops;
}
2190 | | |
2191 | | //-------------------------------------------------------------------------------------------------- |
2192 | | |
2193 | | namespace { |
2194 | | |
2195 | | // Check if index updates can be issued from CQL proxy directly when executing a DML. Only indexes |
2196 | | // that index primary key columns only may be updated from CQL proxy. |
2197 | 337 | bool UpdateIndexesLocally(const PTDmlStmt *tnode, const QLWriteRequestPB& req) { |
2198 | 337 | if (req.has_if_expr() || req.returns_status()) { |
2199 | 2 | return false; |
2200 | 2 | } |
2201 | | |
2202 | 335 | switch (req.type()) { |
2203 | | // For insert, the pk-only indexes can be updated from CQL proxy directly. |
2204 | 241 | case QLWriteRequestPB::QL_STMT_INSERT: |
2205 | 241 | return true; |
2206 | | |
2207 | | // For update, the pk-only indexes can be updated from CQL proxy directly only when not all |
2208 | | // columns are set to null. Otherwise, the row may be removed by the DML if it was created via |
2209 | | // update (i.e. no liveness column) and the remaining columns are already null. |
2210 | 66 | case QLWriteRequestPB::QL_STMT_UPDATE: { |
2211 | 68 | for (const auto& column_value : req.column_values()) { |
2212 | 68 | switch (column_value.expr().expr_case()) { |
2213 | 68 | case QLExpressionPB::ExprCase::kValue: |
2214 | 68 | if (!IsNull(column_value.expr().value())) { |
2215 | 60 | return true; |
2216 | 60 | } |
2217 | 8 | break; |
2218 | 0 | case QLExpressionPB::ExprCase::kColumnId: FALLTHROUGH_INTENDED; |
2219 | 0 | case QLExpressionPB::ExprCase::kSubscriptedCol: FALLTHROUGH_INTENDED; |
2220 | 0 | case QLExpressionPB::ExprCase::kJsonColumn: FALLTHROUGH_INTENDED; |
2221 | 0 | case QLExpressionPB::ExprCase::kBfcall: FALLTHROUGH_INTENDED; |
2222 | 0 | case QLExpressionPB::ExprCase::kTscall: FALLTHROUGH_INTENDED; |
2223 | 0 | case QLExpressionPB::ExprCase::kCondition: FALLTHROUGH_INTENDED; |
2224 | 0 | case QLExpressionPB::ExprCase::kBocall: FALLTHROUGH_INTENDED; |
2225 | 0 | case QLExpressionPB::ExprCase::kBindId: FALLTHROUGH_INTENDED; |
2226 | 0 | case QLExpressionPB::ExprCase::EXPR_NOT_SET: |
2227 | 0 | return false; |
2228 | 68 | } |
2229 | 68 | } |
2230 | 6 | return false; |
2231 | 66 | } |
2232 | | // For delete, the pk-only indexes can be updated from CQL proxy directly only if the whole |
2233 | | // is deleted and it is not a range delete. |
2234 | 28 | case QLWriteRequestPB::QL_STMT_DELETE: { |
2235 | 28 | const Schema& schema = tnode->table()->InternalSchema(); |
2236 | 28 | return (req.column_values().empty() && |
2237 | 12 | static_cast<size_t>(req.range_column_values_size()) == |
2238 | 12 | schema.num_range_key_columns()); |
2239 | 0 | } |
2240 | 0 | } |
2241 | 0 | return false; // Not feasible |
2242 | 0 | } |
2243 | | |
2244 | | } // namespace |
2245 | | |
// Adds secondary-index maintenance for a DML write request. PK-only indexes that are safe
// to update from the CQL proxy get their own write ops added locally (AddIndexWriteOps);
// every other index is appended to req->update_index_ids so the tserver applies the update.
// For UPDATE/DELETE that may null out all columns, extra column refs are added so the
// tserver can detect an effectively-deleted row. Prepares a child transaction when a
// transactional index update is required.
Status Executor::UpdateIndexes(const PTDmlStmt *tnode,
                               QLWriteRequestPB *req,
                               TnodeContext* tnode_context) {
  // DML with TTL is not allowed if indexes are present.
  if (req->has_ttl()) {
    return exec_context_->Error(tnode, ErrorCode::FEATURE_NOT_SUPPORTED);
  }

  // If updates of pk-only indexes can be issued from CQL proxy directly, do it. Otherwise, add
  // them to the list of indexes to be updated from tserver.
  if (!tnode->pk_only_indexes().empty()) {
    if (UpdateIndexesLocally(tnode, *req)) {
      RETURN_NOT_OK(AddIndexWriteOps(tnode, *req, tnode_context));
    } else {
      for (const auto& index : tnode->pk_only_indexes()) {
        req->add_update_index_ids(index->id());
      }
    }
  }

  // Add non-pk-only indexes to the list of indexes to be updated from tserver also.
  for (const auto& index_id : tnode->non_pk_only_indexes()) {
    req->add_update_index_ids(index_id);
  }

  // For update/delete, check if it just deletes some columns. If so, add the rest columns to be
  // read so that tserver can check if they are all null also. We require this information (whether
  // all columns are null) for rows which don't have a liveness column - for such a row (i.e.,
  // without liveness column), if all columns are null, the row is as good as deleted. And in this
  // case, the tserver will have to remove the corresponding index entries from indexes.
  if ((req->type() == QLWriteRequestPB::QL_STMT_UPDATE ||
       req->type() == QLWriteRequestPB::QL_STMT_DELETE) &&
      !req->column_values().empty()) {
    bool all_null = true;
    std::set<int32> column_dels;
    const Schema& schema = tnode->table()->InternalSchema();
    for (const QLColumnValuePB& column_value : req->column_values()) {
      const ColumnSchema& col_desc = VERIFY_RESULT(
          schema.column_by_id(ColumnId(column_value.column_id())));

      if (column_value.has_expr() &&
          column_value.expr().has_value() &&
          !col_desc.is_static() && // Don't consider static column values.
          !IsNull(column_value.expr().value())) {
        all_null = false;
        break;
      }
      column_dels.insert(column_value.column_id());
    }
    if (all_null) {
      // Ensure all columns of row are read by docdb layer before performing the write operation.
      const MCSet<int32>& column_refs = tnode->column_refs();
      for (size_t idx = schema.num_key_columns(); idx < schema.num_columns(); idx++) {
        const int32 column_id = schema.column_id(idx);
        if (!schema.column(idx).is_static() &&
            column_refs.count(column_id) == 0 && // Add col only if not already in column_refs.
            column_dels.count(column_id) == 0) {
          // If col is already in delete list, don't add it. This is okay because of the following
          // reason.
          //
          // We reach here if we have -
          //   1. an UPDATE statement with all = NULL type of set clauses
          //   2. a DELETE statement on some cols. This is as good as setting those cols to NULL.
          //
          // If column is not in column_refs but already there in the column_dels list, we need
          // not add it in column_refs because the IsRowDeleted() function in cql_operation.cc
          // doesn't need to know if this column was NULL or not in the old/existing row.
          // Since the new row has NULL for this column, the loop in the function "continue"s.
          //
          // Also, if a column is deleted/set to NULL, you might wonder why we don't add it to
          // column_refs in case it is part of an index (in which case we need to delete the
          // index entry for the old value). But this isn't an issue, because the column would
          // already have been added to column_refs as part of AnalyzeIndexesForWrites().
          req->mutable_column_refs()->add_ids(column_id);
        }
      }
    }
  }

  // Transactional index updates ride on a child transaction of the statement's transaction.
  if (!req->update_index_ids().empty() && tnode->RequiresTransaction()) {
    RETURN_NOT_OK(exec_context_->PrepareChildTransaction(
        rescheduler_->GetDeadline(), req->mutable_child_transaction_data()));
  }
  return Status::OK();
}
2331 | | |
// Add the write operations to update the pk-only indexes.
//
// Builds one insert (for INSERT/UPDATE) or delete (for DELETE) op per pk-only index,
// populating the index key (and, for upserts, covered columns) from the main-table
// request's key and column values. Indexes still being backfilled (no write/delete
// permission yet) are skipped; backfill will bring them up to date.
Status Executor::AddIndexWriteOps(const PTDmlStmt *tnode,
                                  const QLWriteRequestPB& req,
                                  TnodeContext* tnode_context) {
  const Schema& schema = tnode->table()->InternalSchema();
  const bool is_upsert = (req.type() == QLWriteRequestPB::QL_STMT_INSERT ||
                          req.type() == QLWriteRequestPB::QL_STMT_UPDATE);
  // Populate a column-id to value map from the main table's hash keys, range keys and
  // (for upserts) the assigned column values.
  std::unordered_map<ColumnId, const QLExpressionPB&> values;
  for (size_t i = 0; i < schema.num_hash_key_columns(); i++) {
    values.emplace(schema.column_id(i), req.hashed_column_values(narrow_cast<int>(i)));
  }
  for (size_t i = 0; i < schema.num_range_key_columns(); i++) {
    values.emplace(schema.column_id(schema.num_hash_key_columns() + i),
                   req.range_column_values(narrow_cast<int>(i)));
  }
  if (is_upsert) {
    for (const auto& column_value : req.column_values()) {
      values.emplace(ColumnId(column_value.column_id()), column_value.expr());
    }
  }

  // Create the write operation for each index and populate it using the original operation.
  // CQL does not allow the primary key to be updated, so PK-only index rows will be either
  // deleted when the row in the main table is deleted, or it will be inserted into the index
  // when a row is inserted into the main table or updated (for a non-pk column).
  for (const auto& index_table : tnode->pk_only_indexes()) {
    const IndexInfo* index =
        VERIFY_RESULT(tnode->table()->index_map().FindIndex(index_table->id()));
    const bool index_ready_to_accept = (is_upsert ? index->HasWritePermission()
                                                  : index->HasDeletePermission());
    if (!index_ready_to_accept) {
      VLOG(2) << "Index not ready to apply operaton " << index->ToString();
      // We are in the process of backfilling the index. It should not be updated with a
      // write/delete yet. The backfill stage will update the index for such entries.
      continue;
    }
    YBqlWriteOpPtr index_op(is_upsert ? index_table->NewQLInsert() : index_table->NewQLDelete());
    index_op->set_writes_primary_row(true);
    QLWriteRequestPB *index_req = index_op->mutable_request();
    index_req->set_request_id(req.request_id());
    index_req->set_query_id(req.query_id());
    for (size_t i = 0; i < index->columns().size(); i++) {
      const ColumnId indexed_column_id = index->column(i).indexed_column_id;
      if (i < index->hash_column_count()) {
        *index_req->add_hashed_column_values() = values.at(indexed_column_id);
      } else if (i < index->key_column_count()) {
        *index_req->add_range_column_values() = values.at(indexed_column_id);
      } else if (is_upsert) {
        // Non-key (covered) columns are copied only when the DML actually assigned them.
        const auto itr = values.find(indexed_column_id);
        if (itr != values.end()) {
          QLColumnValuePB* column_value = index_req->add_column_values();
          column_value->set_column_id(index->column(i).column_id);
          *column_value->mutable_expr() = itr->second;
        }
      }
    }
    RETURN_NOT_OK(AddOperation(index_op, tnode_context));
  }

  return Status::OK();
}
2394 | | |
2395 | | //-------------------------------------------------------------------------------------------------- |
2396 | | |
// Decides whether 'op' may be applied to the session now (returns true) or must be
// deferred (returns false) because of inter-op dependencies within the current batch.
// Deferred ops are retried by ProcessTnodeResults() once earlier ops complete.
bool Executor::WriteBatch::Add(const YBqlWriteOpPtr& op,
                               const TnodeContext* tnode_context,
                               ExecContext* exec_context) {
  if (FLAGS_ycql_serial_operation_in_transaction_block &&
      // Inside BEGIN TRANSACTION; ... END TRANSACTION;
      exec_context && exec_context->HasTransaction()) {
    bool allow_parallel_exec = false; // Always False for the main table.

    // Check if the index-update can be executed in parallel - only
    // when the main table update is complete or started.
    // The first op in TNode is treated as the main table operation.
    // size = 1 means this (usually main table) 'op' is the first and one only op now.
    // size > 1 means main op + a set of secondary index operations.
    if (tnode_context->ops().size() > 1) {
      const client::YBqlOpPtr main_tbl_op = tnode_context->ops()[0];
      // If the main table operation is complete or just started.
      allow_parallel_exec = main_tbl_op->response().has_status() ||
          exec_context->transactional_session()->IsInProgress(main_tbl_op);
    }

    if (!allow_parallel_exec) {
      if (Empty()) {
        ops_by_primary_key_.insert(op);
        return true; // Start first write op execution.
      }
      // Another op is already outstanding in this serialized transaction block: defer.
      return false;
    }
  }

  // Checks if the write operation reads the primary/static row and if another operation that writes
  // the primary/static row by the same primary/hash key already exists.
  if ((op->ReadsPrimaryRow() && ops_by_primary_key_.count(op) > 0) ||
      (op->ReadsStaticRow() && ops_by_hash_key_.count(op) > 0)) {
    return false;
  }

  if (op->WritesPrimaryRow()) { ops_by_primary_key_.insert(op); }
  if (op->WritesStaticRow()) { ops_by_hash_key_.insert(op); }
  return true;
}
2437 | | |
2438 | 14.2M | void Executor::WriteBatch::Clear() { |
2439 | 14.2M | ops_by_primary_key_.clear(); |
2440 | 14.2M | ops_by_hash_key_.clear(); |
2441 | 14.2M | } |
2442 | | |
2443 | 4.09M | bool Executor::WriteBatch::Empty() const { |
2444 | 4.09M | return ops_by_primary_key_.empty() && ops_by_hash_key_.empty(); |
2445 | 4.09M | } |
2446 | | |
2447 | | //-------------------------------------------------------------------------------------------------- |
2448 | | |
// Buffers a read op into the statement's tnode context and applies it to the session.
// Reads are never deferred; concurrent reads and writes in one batch are unsupported
// (see the DCHECK below).
void Executor::AddOperation(const YBqlReadOpPtr& op, TnodeContext *tnode_context) {
  DCHECK(write_batch_.Empty()) << "Concurrent read and write operations not supported yet";

  op->mutable_request()->set_request_id(exec_context_->params().request_id());
  tnode_context->AddOperation(op);

  // We need consistent read point if statement is executed in multiple RPC commands.
  // That is the case when more partitions remain to be read, or when the read is not
  // keyed by a hash column (full scan / range scan).
  if (tnode_context->UnreadPartitionsRemaining() > 0 ||
      op->request().hashed_column_values().empty()) {
    session_->SetForceConsistentRead(client::ForceConsistentRead::kTrue);
  }

  TRACE("Apply");
  session_->Apply(op);
}
2464 | | |
// Buffers a write op into the statement's tnode context, applies it to the appropriate
// session unless the write batch defers it, and schedules any secondary-index updates
// the write requires.
Status Executor::AddOperation(const YBqlWriteOpPtr& op, TnodeContext* tnode_context) {
  tnode_context->AddOperation(op);

  // Check for inter-dependency in the current write batch before applying the write operation.
  // Apply it in the transactional session in exec_context for the current statement if there is
  // one. Otherwise, apply to the non-transactional session in the executor.
  if (write_batch_.Add(op, tnode_context, exec_context_)) {
    YBSessionPtr session = GetSession(exec_context_);
    TRACE("Apply");
    session->Apply(op);
  }

  // Also update secondary indexes if needed.
  if (op->table()->index_map().empty()) {
    return Status::OK();
  }
  const auto* dml_stmt = static_cast<const PTDmlStmt*>(tnode_context->tnode());
  return UpdateIndexes(dml_stmt, op->mutable_request(), tnode_context);
}
2484 | | |
2485 | | //-------------------------------------------------------------------------------------------------- |
2486 | | |
2487 | 10.2M | Status Executor::ProcessStatementStatus(const ParseTree& parse_tree, const Status& s) { |
2488 | 10.2M | if (PREDICT_FALSE(!s.ok() && s.IsQLError() && !parse_tree.reparsed())) { |
2489 | | // If execution fails because the statement was analyzed with stale metadata cache, the |
2490 | | // statement needs to be reparsed and re-analyzed. Symptoms of stale metadata are as listed |
2491 | | // below. Expand the list in future as new cases arise. |
2492 | | // - TABLET_NOT_FOUND when the tserver fails to execute the YBQLOp because the tablet is not |
2493 | | // found (ENG-945). |
2494 | | // - WRONG_METADATA_VERSION when the schema version the tablet holds is different from the one |
2495 | | // used by the semantic analyzer. |
2496 | | // - INVALID_TABLE_DEFINITION when a referenced user-defined type is not found. |
2497 | | // - INVALID_ARGUMENTS when the column datatype is inconsistent with the supplied value in an |
2498 | | // INSERT or UPDATE statement. |
2499 | 4.76k | const ErrorCode errcode = GetErrorCode(s); |
2500 | 4.76k | if (errcode == ErrorCode::TABLET_NOT_FOUND || |
2501 | 4.36k | errcode == ErrorCode::WRONG_METADATA_VERSION || |
2502 | 2.69k | errcode == ErrorCode::INVALID_TABLE_DEFINITION || |
2503 | 2.69k | errcode == ErrorCode::INVALID_TYPE_DEFINITION || |
2504 | 2.69k | errcode == ErrorCode::INVALID_ARGUMENTS || |
2505 | 2.58k | errcode == ErrorCode::OBJECT_NOT_FOUND || |
2506 | 2.58k | errcode == ErrorCode::TYPE_NOT_FOUND) { |
2507 | 2.19k | if (errcode == ErrorCode::INVALID_ARGUMENTS) { |
2508 | | // Check the table schema is up-to-date. |
2509 | 110 | const shared_ptr<client::YBTable> table = GetTableFromStatement(parse_tree.root().get()); |
2510 | 110 | if (table) { |
2511 | 110 | const uint32_t current_schema_ver = table->schema().version(); |
2512 | 110 | uint32_t updated_schema_ver = 0; |
2513 | 110 | const Status s_get_schema = ql_env_->GetUpToDateTableSchemaVersion( |
2514 | 110 | table->name(), &updated_schema_ver); |
2515 | | |
2516 | 110 | if (s_get_schema.ok() && updated_schema_ver == current_schema_ver) { |
2517 | 102 | return s; // Do not retry via STALE_METADATA code if the table schema is up-to-date. |
2518 | 102 | } |
2519 | 2.08k | } |
2520 | 110 | } |
2521 | | |
2522 | 2.08k | parse_tree.ClearAnalyzedTableCache(ql_env_); |
2523 | 2.08k | parse_tree.ClearAnalyzedUDTypeCache(ql_env_); |
2524 | 2.08k | parse_tree.set_stale(); |
2525 | 2.08k | return ErrorStatus(ErrorCode::STALE_METADATA); |
2526 | 2.08k | } |
2527 | 4.76k | } |
2528 | 10.2M | return s; |
2529 | 10.2M | } |
2530 | | |
2531 | | Status Executor::ProcessOpStatus(const PTDmlStmt* stmt, |
2532 | | const YBqlOpPtr& op, |
2533 | 5.19M | ExecContext* exec_context) { |
2534 | 5.19M | const QLResponsePB &resp = op->response(); |
2535 | | // Returns if this op was deferred and has not been completed, or it has been completed okay. |
2536 | 5.19M | if (!resp.has_status() || resp.status() == QLResponsePB::YQL_STATUS_OK) { |
2537 | 5.17M | return Status::OK(); |
2538 | 5.17M | } |
2539 | | |
2540 | 20.9k | if (resp.status() == QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR) { |
2541 | 33 | auto s = STATUS(TryAgain, resp.error_message()); |
2542 | 33 | RETURN_NOT_OK(audit_logger_.LogStatementError(stmt, exec_context_->stmt(), s, |
2543 | 33 | ErrorIsFormatted::kFalse)); |
2544 | 33 | return s; |
2545 | 20.8k | } |
2546 | | |
2547 | | // If we got an error we need to manually produce a result in the op. |
2548 | 20.8k | if (stmt->IsWriteOp() && stmt->returns_status()) { |
2549 | 109 | std::vector<ColumnSchema> columns; |
2550 | 109 | const auto& schema = stmt->table()->schema(); |
2551 | 109 | columns.reserve(stmt->table()->schema().num_columns() + 2); |
2552 | 109 | columns.emplace_back("[applied]", DataType::BOOL); |
2553 | 109 | columns.emplace_back("[message]", DataType::STRING); |
2554 | 109 | columns.insert(columns.end(), schema.columns().begin(), schema.columns().end()); |
2555 | 109 | auto* column_schemas = op->mutable_response()->mutable_column_schemas(); |
2556 | 109 | column_schemas->Clear(); |
2557 | 454 | for (const auto& column : columns) { |
2558 | 454 | ColumnSchemaToPB(column, column_schemas->Add()); |
2559 | 454 | } |
2560 | | |
2561 | 109 | QLRowBlock result_row_block(Schema(columns, 0)); |
2562 | 109 | QLRow& row = result_row_block.Extend(); |
2563 | 109 | row.mutable_column(0)->set_bool_value(false); |
2564 | 109 | row.mutable_column(1)->set_string_value(resp.error_message()); |
2565 | | // Leave the rest of the columns null in this case. |
2566 | | |
2567 | 109 | faststring row_data; |
2568 | 109 | result_row_block.Serialize(YQL_CLIENT_CQL, &row_data); |
2569 | 109 | *op->mutable_rows_data() = row_data.ToString(); |
2570 | 109 | return Status::OK(); |
2571 | 109 | } |
2572 | | |
2573 | 20.7k | const ErrorCode errcode = QLStatusToErrorCode(resp.status()); |
2574 | 20.7k | auto s = exec_context->Error(stmt, resp.error_message().c_str(), errcode); |
2575 | 20.7k | RETURN_NOT_OK(audit_logger_.LogStatementError(stmt, exec_context_->stmt(), s, |
2576 | 20.7k | ErrorIsFormatted::kTrue)); |
2577 | 20.7k | return s; |
2578 | 20.7k | } |
2579 | | |
2580 | 5.20M | Status Executor::ProcessAsyncStatus(const OpErrors& op_errors, ExecContext* exec_context) { |
2581 | 5.20M | return ProcessTnodeContexts( |
2582 | 5.20M | exec_context, |
2583 | 5.36M | [this, exec_context, &op_errors](TnodeContext* tnode_context) -> Result<bool> { |
2584 | 5.36M | const TreeNode* tnode = tnode_context->tnode(); |
2585 | 5.23M | for (auto& op : tnode_context->ops()) { |
2586 | 5.23M | Status s; |
2587 | 5.23M | const auto itr = op_errors.find(op.get()); |
2588 | 5.23M | if (itr != op_errors.end()) { |
2589 | 56.9k | s = itr->second; |
2590 | 56.9k | } |
2591 | 5.23M | if (PREDICT_FALSE(!s.ok() && !NeedsRestart(s))) { |
2592 | | // YBOperation returns not-found error when the tablet is not found. |
2593 | 401 | const auto errcode = s.IsNotFound() ? ErrorCode::TABLET_NOT_FOUND |
2594 | 65 | : ErrorCode::EXEC_ERROR; |
2595 | 466 | s = exec_context->Error(tnode, s, errcode); |
2596 | 466 | } |
2597 | 5.23M | if (s.ok()) { |
2598 | 15.4k | DCHECK(tnode->IsDml()) << "Only DML should issue a read/write operation"; |
2599 | 5.18M | s = ProcessOpStatus(static_cast<const PTDmlStmt *>(tnode), op, exec_context); |
2600 | 5.18M | } |
2601 | 5.23M | if (ShouldRestart(s, rescheduler_)) { |
2602 | 56.6k | exec_context->Reset(client::Restart::kTrue, rescheduler_); |
2603 | 56.6k | return true; // done |
2604 | 56.6k | } |
2605 | 5.18M | RETURN_NOT_OK(ProcessStatementStatus(exec_context->parse_tree(), s)); |
2606 | 5.18M | } |
2607 | 5.30M | return false; // not done |
2608 | 5.36M | }); |
2609 | 5.20M | } |
2610 | | |
2611 | 5.25M | Status Executor::AppendRowsResult(RowsResult::SharedPtr&& rows_result) { |
2612 | 5.25M | if (!rows_result) { |
2613 | 1.34M | return Status::OK(); |
2614 | 1.34M | } |
2615 | 3.90M | if (!result_) { |
2616 | 3.89M | result_ = std::move(rows_result); |
2617 | 3.89M | return Status::OK(); |
2618 | 3.89M | } |
2619 | 1.99k | CHECK(result_->type() == ExecutedResult::Type::ROWS); |
2620 | 1.99k | return std::static_pointer_cast<RowsResult>(result_)->Append(std::move(*rows_result)); |
2621 | 1.99k | } |
2622 | | |
2623 | 4.70M | void Executor::StatementExecuted(const Status& s, ResetAsyncCalls* reset_async_calls) { |
2624 | | // Update metrics for all statements executed. |
2625 | 4.70M | if (s.ok() && ql_metrics_ != nullptr) { |
2626 | 5.08M | for (auto& exec_context : exec_contexts_) { |
2627 | 15.8k | for (auto& tnode_context : exec_context.tnode_contexts()) { |
2628 | 15.8k | const TreeNode* tnode = tnode_context.tnode(); |
2629 | 15.8k | if (tnode != nullptr) { |
2630 | 15.8k | switch (tnode->opcode()) { |
2631 | 4 | case TreeNodeOpcode::kPTSelectStmt: FALLTHROUGH_INTENDED; |
2632 | 162 | case TreeNodeOpcode::kPTInsertStmt: FALLTHROUGH_INTENDED; |
2633 | 176 | case TreeNodeOpcode::kPTUpdateStmt: FALLTHROUGH_INTENDED; |
2634 | 176 | case TreeNodeOpcode::kPTDeleteStmt: FALLTHROUGH_INTENDED; |
2635 | 4.35k | case TreeNodeOpcode::kPTUseKeyspace: FALLTHROUGH_INTENDED; |
2636 | 4.35k | case TreeNodeOpcode::kPTListNode: FALLTHROUGH_INTENDED; |
2637 | 4.35k | case TreeNodeOpcode::kPTStartTransaction: FALLTHROUGH_INTENDED; |
2638 | 4.35k | case TreeNodeOpcode::kPTCommit: |
2639 | | // The metrics for SELECT/INSERT/UPDATE/DELETE have been updated when the ops have |
2640 | | // been completed in FlushAsyncDone(). Exclude PTListNode also as we are interested |
2641 | | // in the metrics of its constituent DMLs only. Transaction metrics have been |
2642 | | // updated in CommitDone(). |
2643 | | // The metrics for USE have been updated in ExecPTNode(). |
2644 | 4.35k | break; |
2645 | 11.5k | default: { |
2646 | 11.5k | const MonoTime now = MonoTime::Now(); |
2647 | 11.5k | const auto delta_usec = (now - tnode_context.start_time()).ToMicroseconds(); |
2648 | 11.5k | ql_metrics_->ql_others_->Increment(delta_usec); |
2649 | 11.5k | ql_metrics_->time_to_execute_ql_query_->Increment(delta_usec); |
2650 | 11.5k | break; |
2651 | 4.35k | } |
2652 | 15.8k | } |
2653 | 15.8k | } |
2654 | 15.8k | } |
2655 | 5.08M | ql_metrics_->num_retries_to_execute_ql_->Increment(exec_context.num_retries()); |
2656 | 5.08M | } |
2657 | 4.69M | ql_metrics_->num_flushes_to_execute_ql_->Increment(num_flushes_); |
2658 | 4.69M | } |
2659 | | |
2660 | | // Clean up and invoke statement-executed callback. |
2661 | 4.70M | ExecutedResult::SharedPtr result = s.ok() ? std::move(result_) : nullptr; |
2662 | 4.70M | StatementExecutedCallback cb = std::move(cb_); |
2663 | 4.70M | Reset(reset_async_calls); |
2664 | 4.70M | cb.Run(s, result); |
2665 | 4.70M | } |
2666 | | |
// Restores the executor to a pristine state so it can run the next statement:
// clears all execution contexts, the pending write batch, the accumulated
// result and callback, and releases the async-calls guard.
void Executor::Reset(ResetAsyncCalls* reset_async_calls) {
  exec_context_ = nullptr;
  exec_contexts_.clear();
  write_batch_.Clear();
  session_->Abort();  // Abort any buffered/in-flight session operations.
  num_flushes_ = 0;
  result_ = nullptr;
  cb_.Reset();
  returns_status_batch_opt_ = boost::none;
  reset_async_calls->Perform();  // Marks the async-call counter idle again.
}
2678 | | |
2679 | 3.89M | QLExpressionPB* CreateQLExpression(QLWriteRequestPB *req, const ColumnDesc& col_desc) { |
2680 | 3.89M | if (col_desc.is_hash()) { |
2681 | 1.35M | return req->add_hashed_column_values(); |
2682 | 2.53M | } else if (col_desc.is_primary()) { |
2683 | 1.17M | return req->add_range_column_values(); |
2684 | 1.35M | } else { |
2685 | 1.35M | QLColumnValuePB *col_pb = req->add_column_values(); |
2686 | 1.35M | col_pb->set_column_id(col_desc.id()); |
2687 | 1.35M | return col_pb->mutable_expr(); |
2688 | 1.35M | } |
2689 | 3.89M | } |
2690 | | |
// Associates this task with an executor and takes over its reset-async-calls
// guard. Returns *this so the bind can be chained into task submission.
Executor::ExecutorTask& Executor::ExecutorTask::Bind(
    Executor* executor, Executor::ResetAsyncCalls* reset_async_calls) {
  executor_ = executor;
  reset_async_calls_ = std::move(*reset_async_calls);
  return *this;
}
2697 | | |
// Thread-pool entry point: hands the bound executor (and the guard) to DoRun.
void Executor::ExecutorTask::Run() {
  // Clear executor_ first so the task no longer references the executor once
  // control has been handed over.
  auto executor = executor_;
  executor_ = nullptr;
  DoRun(executor, &reset_async_calls_);
}
2703 | | |
// Completion callback from the thread pool. On failure, release the
// async-calls guard here since DoRun never got the chance to hand it off.
// NOTE(review): presumably a non-OK status means the task was not run —
// confirm against the thread pool's Done() contract.
void Executor::ExecutorTask::Done(const Status& status) {
  if (!status.ok()) {
    reset_async_calls_.Perform();
  }
}
2709 | | |
// Guard over the executor's async-call counter. A guard may only be created
// while no async calls are outstanding; a non-zero counter is a logic error.
Executor::ResetAsyncCalls::ResetAsyncCalls(std::atomic<int64_t>* num_async_calls)
    : num_async_calls_(num_async_calls) {
  // The message operand is only evaluated when the condition holds, so the
  // dereference is safe (condition requires num_async_calls != nullptr).
  LOG_IF(DFATAL, num_async_calls && num_async_calls->load(std::memory_order_acquire))
      << "Expected 0 async calls, but have: " << num_async_calls->load(std::memory_order_acquire);
}
2715 | | |
// Move constructor: transfers ownership of the counter pointer, leaving the
// source guard disarmed (nullptr) so only one guard performs the reset.
Executor::ResetAsyncCalls::ResetAsyncCalls(ResetAsyncCalls&& rhs)
    : num_async_calls_(rhs.num_async_calls_) {
  rhs.num_async_calls_ = nullptr;
  LOG_IF(DFATAL, num_async_calls_ && num_async_calls_->load(std::memory_order_acquire))
      << "Expected 0 async calls, but have: " << num_async_calls_->load(std::memory_order_acquire);
}
2722 | | |
2723 | 4.89M | void Executor::ResetAsyncCalls::operator=(ResetAsyncCalls&& rhs) { |
2724 | 4.89M | Perform(); |
2725 | 4.89M | num_async_calls_ = rhs.num_async_calls_; |
2726 | 4.89M | rhs.num_async_calls_ = nullptr; |
2727 | 4.83k | LOG_IF(DFATAL, num_async_calls_ && num_async_calls_->load(std::memory_order_acquire)) |
2728 | 4.83k | << "Expected 0 async calls, but have: " << num_async_calls_->load(std::memory_order_acquire); |
2729 | 4.89M | } |
2730 | | |
// Disarms the guard: after Cancel(), Perform() and the destructor are no-ops
// and the counter is left untouched.
void Executor::ResetAsyncCalls::Cancel() {
  num_async_calls_ = nullptr;
}
2734 | | |
// RAII release: an armed guard resets the counter on destruction.
Executor::ResetAsyncCalls::~ResetAsyncCalls() {
  Perform();
}
2738 | | |
2739 | 19.2M | void Executor::ResetAsyncCalls::Perform() { |
2740 | 19.2M | if (!num_async_calls_) { |
2741 | 14.5M | return; |
2742 | 14.5M | } |
2743 | | |
2744 | 18.4E | LOG_IF(DFATAL, num_async_calls_->load(std::memory_order_acquire)) |
2745 | 18.4E | << "Expected 0 async calls, but have: " << num_async_calls_->load(std::memory_order_acquire); |
2746 | 4.70M | num_async_calls_->store(kAsyncCallsIdle, std::memory_order_release); |
2747 | 4.70M | num_async_calls_ = nullptr; |
2748 | 4.70M | } |
2749 | | |
2750 | | } // namespace ql |
2751 | | } // namespace yb |