YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/cql/ql/exec/executor.cc
Line
Count
Source (jump to first uncovered line)
1
//--------------------------------------------------------------------------------------------------
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//--------------------------------------------------------------------------------------------------
15
16
#include "yb/yql/cql/ql/exec/executor.h"
17
18
#include "yb/client/callbacks.h"
19
#include "yb/client/client.h"
20
#include "yb/client/error.h"
21
#include "yb/client/rejection_score_source.h"
22
#include "yb/client/table.h"
23
#include "yb/client/table_alterer.h"
24
#include "yb/client/table_creator.h"
25
#include "yb/client/yb_op.h"
26
27
#include "yb/common/common.pb.h"
28
#include "yb/common/consistent_read_point.h"
29
#include "yb/common/index.h"
30
#include "yb/common/index_column.h"
31
#include "yb/common/ql_protocol_util.h"
32
#include "yb/common/ql_rowblock.h"
33
#include "yb/common/ql_value.h"
34
#include "yb/common/schema.h"
35
#include "yb/common/wire_protocol.h"
36
37
#include "yb/gutil/casts.h"
38
39
#include "yb/rpc/thread_pool.h"
40
41
#include "yb/util/decimal.h"
42
#include "yb/util/metrics.h"
43
#include "yb/util/random_util.h"
44
#include "yb/util/result.h"
45
#include "yb/util/status_format.h"
46
#include "yb/util/trace.h"
47
48
#include "yb/yql/cql/ql/exec/exec_context.h"
49
#include "yb/yql/cql/ql/ptree/column_desc.h"
50
#include "yb/yql/cql/ql/ptree/parse_tree.h"
51
#include "yb/yql/cql/ql/ptree/pt_alter_keyspace.h"
52
#include "yb/yql/cql/ql/ptree/pt_alter_role.h"
53
#include "yb/yql/cql/ql/ptree/pt_alter_table.h"
54
#include "yb/yql/cql/ql/ptree/pt_column_definition.h"
55
#include "yb/yql/cql/ql/ptree/pt_create_index.h"
56
#include "yb/yql/cql/ql/ptree/pt_create_keyspace.h"
57
#include "yb/yql/cql/ql/ptree/pt_create_role.h"
58
#include "yb/yql/cql/ql/ptree/pt_create_table.h"
59
#include "yb/yql/cql/ql/ptree/pt_create_type.h"
60
#include "yb/yql/cql/ql/ptree/pt_delete.h"
61
#include "yb/yql/cql/ql/ptree/pt_drop.h"
62
#include "yb/yql/cql/ql/ptree/pt_explain.h"
63
#include "yb/yql/cql/ql/ptree/pt_expr.h"
64
#include "yb/yql/cql/ql/ptree/pt_grant_revoke.h"
65
#include "yb/yql/cql/ql/ptree/pt_insert.h"
66
#include "yb/yql/cql/ql/ptree/pt_insert_json_clause.h"
67
#include "yb/yql/cql/ql/ptree/pt_transaction.h"
68
#include "yb/yql/cql/ql/ptree/pt_truncate.h"
69
#include "yb/yql/cql/ql/ptree/pt_update.h"
70
#include "yb/yql/cql/ql/ptree/pt_use_keyspace.h"
71
#include "yb/yql/cql/ql/ql_processor.h"
72
#include "yb/yql/cql/ql/util/errcodes.h"
73
74
using namespace std::literals;
75
using namespace std::placeholders;
76
77
namespace yb {
78
namespace ql {
79
80
using std::string;
81
using std::shared_ptr;
82
83
using audit::AuditLogger;
84
using audit::IsPrepare;
85
using audit::ErrorIsFormatted;
86
using client::YBColumnSpec;
87
using client::YBOperation;
88
using client::YBqlOpPtr;
89
using client::YBqlReadOp;
90
using client::YBqlReadOpPtr;
91
using client::YBqlWriteOp;
92
using client::YBqlWriteOpPtr;
93
using client::YBSchema;
94
using client::YBSchemaBuilder;
95
using client::YBSessionPtr;
96
using client::YBTableAlterer;
97
using client::YBTableCreator;
98
using client::YBTableName;
99
using client::YBTableType;
100
using strings::Substitute;
101
102
34.8M
// Evaluates `s` exactly once. If the resulting status is not OK, the status and
// `reset_async_calls` are handed to StatementExecuted() and the enclosing function returns with
// whatever StatementExecuted() yields. Otherwise execution falls through to the next statement.
#define RETURN_STMT_NOT_OK(s, reset_async_calls) \
  do { \
    auto&& _s = (s); \
    if (PREDICT_FALSE(!_s.ok())) { \
      return StatementExecuted(MoveStatus(_s), (reset_async_calls)); \
    } \
  } while (false)
107
108
//--------------------------------------------------------------------------------------------------
109
DEFINE_bool(ycql_serial_operation_in_transaction_block, true,
110
            "If true, operations within a transaction block must be executed in order, "
111
            "at least semantically speaking.");
112
113
extern ErrorCode QLStatusToErrorCode(QLResponsePB::QLStatus status);
114
115
// Constructs an executor that stores the given environment, rescheduler and metrics pointers,
// keeps a reference to the audit logger and opens a new session from the environment.
//
// `audit_logger` is dereferenced unconditionally and `ql_env` is used to create the session, so
// both must be non-null.
Executor::Executor(QLEnv* ql_env, AuditLogger* audit_logger, Rescheduler* rescheduler,
                   const QLMetrics* ql_metrics)
    : ql_env_(ql_env),
      audit_logger_(*audit_logger),
      rescheduler_(rescheduler),
      // Create the session from the constructor argument instead of the ql_env_ member: members
      // are initialized in declaration order, so reading ql_env_ here would only be valid if it
      // is declared before session_ in the class.
      session_(ql_env->NewSession()),
      ql_metrics_(ql_metrics) {
}
123
124
1
// Destroying an executor while asynchronous calls are still outstanding is reported via a
// DFATAL log entry that includes the number of calls.
Executor::~Executor() {
  const bool calls_outstanding = HasAsyncCalls();
  LOG_IF(DFATAL, calls_outstanding) << "Async calls still running: " << num_async_calls();
}
128
129
0
void Executor::Shutdown() {
130
0
  int counter = 0;
131
0
  while (HasAsyncCalls()) {
132
0
    if (++counter == 1000) {
133
0
      LOG(DFATAL) << "Too long Executor shutdown: " << num_async_calls();
134
0
    }
135
0
    std::this_thread::sleep_for(10ms);
136
0
  }
137
0
}
138
139
//--------------------------------------------------------------------------------------------------
140
141
4.71M
bool Executor::HasAsyncCalls() {
142
4.71M
  return num_async_calls() != kAsyncCallsIdle;
143
4.71M
}
144
145
4.71M
// Marks the start of an asynchronous execution: sets the async-call counter to zero and returns
// a ResetAsyncCalls object built from that counter. A DFATAL entry is logged when a callback is
// still installed or when async calls are still outstanding.
Executor::ResetAsyncCalls Executor::PrepareExecuteAsync() {
  const bool callback_installed = !cb_.is_null();
  LOG_IF(DFATAL, callback_installed)
      << __func__ << " while another execution is in progress.";
  LOG_IF(DFATAL, HasAsyncCalls())
      << __func__ << " while have " << num_async_calls() << " async calls running";

  num_async_calls_.store(0, std::memory_order_release);
  return ResetAsyncCalls(&num_async_calls_);
}
152
153
void Executor::ExecuteAsync(const ParseTree& parse_tree, const StatementParameters& params,
154
4.71M
                            StatementExecutedCallback cb) {
155
4.71M
  auto reset_async_calls = PrepareExecuteAsync();
156
4.71M
  cb_ = std::move(cb);
157
4.71M
  session_->SetDeadline(rescheduler_->GetDeadline());
158
4.71M
  session_->SetForceConsistentRead(client::ForceConsistentRead::kFalse);
159
4.71M
  auto read_time = params.read_time();
160
4.71M
  if (read_time) {
161
276
    session_->SetReadPoint(read_time);
162
4.71M
  } else {
163
4.71M
    session_->SetReadPoint(client::Restart::kFalse);
164
4.71M
  }
165
4.71M
  RETURN_STMT_NOT_OK(Execute(parse_tree, params), &reset_async_calls);
166
167
4.71M
  FlushAsync(&reset_async_calls);
168
4.71M
}
169
170
1.65k
void Executor::ExecuteAsync(const StatementBatch& batch, StatementExecutedCallback cb) {
171
1.65k
  auto reset_async_calls = PrepareExecuteAsync();
172
173
1.65k
  cb_ = std::move(cb);
174
1.65k
  session_->SetDeadline(rescheduler_->GetDeadline());
175
1.65k
  session_->SetForceConsistentRead(client::ForceConsistentRead::kFalse);
176
1.65k
  session_->SetReadPoint(client::Restart::kFalse);
177
178
  // Table for DML batches, where all statements must modify the same table.
179
1.65k
  client::YBTablePtr dml_batch_table;
180
181
  // Verify the statements in the batch.
182
389k
  for (const auto& pair : batch) {
183
389k
    const ParseTree& parse_tree = pair.first;
184
389k
    const TreeNode* tnode = parse_tree.root().get();
185
389k
    if (tnode != nullptr) {
186
389k
      switch (tnode->opcode()) {
187
389k
        case TreeNodeOpcode::kPTInsertStmt: FALLTHROUGH_INTENDED;
188
389k
        case TreeNodeOpcode::kPTUpdateStmt: FALLTHROUGH_INTENDED;
189
389k
        case TreeNodeOpcode::kPTDeleteStmt: {
190
389k
          const auto *stmt = static_cast<const PTDmlStmt *>(tnode);
191
389k
          if (stmt->if_clause() != nullptr && !stmt->returns_status()) {
192
1
            return StatementExecuted(
193
1
                ErrorStatus(ErrorCode::CQL_STATEMENT_INVALID,
194
1
                            "batch execution of conditional DML statement without RETURNS STATUS "
195
1
                            "AS ROW clause is not supported yet"),
196
1
                &reset_async_calls);
197
1
          }
198
199
389k
          if (stmt->ModifiesMultipleRows()) {
200
1
            return StatementExecuted(
201
1
                ErrorStatus(ErrorCode::CQL_STATEMENT_INVALID,
202
1
                            "batch execution with DML statements modifying multiple rows is not "
203
1
                            "supported yet"),
204
1
                &reset_async_calls);
205
1
          }
206
207
389k
          if (!returns_status_batch_opt_) {
208
1.65k
            returns_status_batch_opt_ = stmt->returns_status();
209
387k
          } else if (stmt->returns_status() != *returns_status_batch_opt_) {
210
1
            return StatementExecuted(
211
1
                ErrorStatus(ErrorCode::CQL_STATEMENT_INVALID,
212
1
                            "batch execution mixing statements with and without RETURNS STATUS "
213
1
                            "AS ROW is not supported"),
214
1
                &reset_async_calls);
215
1
          }
216
217
389k
          if (*returns_status_batch_opt_) {
218
162
            if (dml_batch_table == nullptr) {
219
55
              dml_batch_table = stmt->table();
220
107
            } else if (dml_batch_table->id() != stmt->table()->id()) {
221
1
              return StatementExecuted(
222
1
                  ErrorStatus(ErrorCode::CQL_STATEMENT_INVALID,
223
1
                              "batch execution with RETURNS STATUS statements cannot span multiple "
224
1
                              "tables"),
225
1
                  &reset_async_calls);
226
1
            }
227
389k
          }
228
229
389k
          break;
230
389k
        }
231
0
        default:
232
0
          return StatementExecuted(
233
0
              ErrorStatus(ErrorCode::CQL_STATEMENT_INVALID,
234
0
                          "batch execution supports INSERT, UPDATE and DELETE statements only "
235
0
                          "currently"),
236
0
              &reset_async_calls);
237
0
          break;
238
389k
      }
239
389k
    }
240
389k
  }
241
242
389k
  for (const auto& pair : batch) {
243
389k
    const ParseTree& parse_tree = pair.first;
244
389k
    const StatementParameters& params = pair.second;
245
389k
    RETURN_STMT_NOT_OK(Execute(parse_tree, params), &reset_async_calls);
246
389k
  }
247
248
1.65k
  RETURN_STMT_NOT_OK(audit_logger_.EndBatchRequest(), &reset_async_calls);
249
250
1.65k
  FlushAsync(&reset_async_calls);
251
1.65k
}
252
253
//--------------------------------------------------------------------------------------------------
254
255
5.10M
// Runs one statement: appends a fresh execution context for it, pre-executes and audit-logs the
// root node, executes it, and finally passes the execution status to ProcessStatementStatus().
//
// A failure of the pre-execution step or of either audit-log call is returned directly, without
// going through ProcessStatementStatus().
Status Executor::Execute(const ParseTree& parse_tree, const StatementParameters& params) {
  // Make the newly added context the current one.
  exec_contexts_.emplace_back(parse_tree, params);
  exec_context_ = &exec_contexts_.back();

  auto* const root = parse_tree.root().get();
  RETURN_NOT_OK(PreExecTreeNode(root));
  RETURN_NOT_OK(
      audit_logger_.LogStatement(root, exec_context_->stmt(), IsPrepare::kFalse));

  Status exec_status = ExecTreeNode(root);
  if (!exec_status.ok()) {
    RETURN_NOT_OK(audit_logger_.LogStatementError(
        root, exec_context_->stmt(), exec_status, ErrorIsFormatted::kFalse));
  }
  return ProcessStatementStatus(parse_tree, exec_status);
}
270
271
//--------------------------------------------------------------------------------------------------
272
273
5.08M
// Pre-execution hook for an arbitrary tree node. Only INSERT statements have pre-execution work;
// a null node or any other statement kind yields OK immediately.
Status Executor::PreExecTreeNode(TreeNode *tnode) {
  const bool is_insert =
      tnode != nullptr && tnode->opcode() == TreeNodeOpcode::kPTInsertStmt;
  if (!is_insert) {
    return Status::OK();
  }
  return PreExecTreeNode(static_cast<PTInsertStmt*>(tnode));
}
282
283
1.16M
// Pre-execution hook for INSERT. Only an INSERT whose inserted value is a JSON clause needs
// work here, because the JSON clause bind variable could not be resolved earlier.
Status Executor::PreExecTreeNode(PTInsertStmt *tnode) {
  const auto& inserting_value = tnode->InsertingValue();
  if (inserting_value->opcode() != TreeNodeOpcode::kPTInsertJsonClause) {
    return Status::OK();
  }
  return PreExecTreeNode(static_cast<PTInsertJsonClause*>(inserting_value.get()));
}
291
292
110
// Returns the table referenced by an ALTER TABLE, SELECT, INSERT, DELETE or UPDATE statement.
// For EXPLAIN the lookup recurses into the explained statement. A null node or any other
// statement kind yields nullptr.
shared_ptr<client::YBTable> Executor::GetTableFromStatement(const TreeNode *tnode) const {
  if (tnode == nullptr) {
    return nullptr;
  }

  switch (tnode->opcode()) {
    case TreeNodeOpcode::kPTAlterTable:
      return static_cast<const PTAlterTable*>(tnode)->table();
    case TreeNodeOpcode::kPTSelectStmt:
      return static_cast<const PTSelectStmt*>(tnode)->table();
    case TreeNodeOpcode::kPTInsertStmt:
      return static_cast<const PTInsertStmt*>(tnode)->table();
    case TreeNodeOpcode::kPTDeleteStmt:
      return static_cast<const PTDeleteStmt*>(tnode)->table();
    case TreeNodeOpcode::kPTUpdateStmt:
      return static_cast<const PTUpdateStmt*>(tnode)->table();
    case TreeNodeOpcode::kPTExplainStmt: {
      const auto* const explain = static_cast<const PTExplainStmt*>(tnode);
      return GetTableFromStatement(explain->stmt().get());
    }
    default:
      return nullptr;
  }
}
319
320
//--------------------------------------------------------------------------------------------------
321
322
5.37M
// Dispatches a tree node to the ExecPTNode() overload matching its opcode.
//
// A null node is a no-op. List nodes are forwarded directly. Every other node is first registered
// with the current execution context, and a SNAPSHOT_ISOLATION transaction is started when the
// node is a DML statement that requires one. Opcodes without a handler produce
// FEATURE_NOT_SUPPORTED.
Status Executor::ExecTreeNode(const TreeNode *tnode) {
  if (tnode == nullptr) {
    return Status::OK();
  }

  const auto opcode = tnode->opcode();
  if (opcode == TreeNodeOpcode::kPTListNode) {
    // List nodes are not registered with the execution context.
    return ExecPTNode(static_cast<const PTListNode*>(tnode));
  }

  TnodeContext* const tnode_context = exec_context_->AddTnode(tnode);
  if (tnode->IsDml() && static_cast<const PTDmlStmt*>(tnode)->RequiresTransaction()) {
    RETURN_NOT_OK(exec_context_->StartTransaction(SNAPSHOT_ISOLATION, ql_env_, rescheduler_));
  }

  switch (opcode) {
    // CREATE INDEX shares the CREATE TABLE handler.
    case TreeNodeOpcode::kPTCreateTable:
    case TreeNodeOpcode::kPTCreateIndex:
      return ExecPTNode(static_cast<const PTCreateTable*>(tnode));
    case TreeNodeOpcode::kPTAlterTable:
      return ExecPTNode(static_cast<const PTAlterTable*>(tnode));
    case TreeNodeOpcode::kPTCreateType:
      return ExecPTNode(static_cast<const PTCreateType*>(tnode));
    case TreeNodeOpcode::kPTCreateRole:
      return ExecPTNode(static_cast<const PTCreateRole*>(tnode));
    case TreeNodeOpcode::kPTAlterRole:
      return ExecPTNode(static_cast<const PTAlterRole*>(tnode));
    case TreeNodeOpcode::kPTGrantRevokeRole:
      return ExecPTNode(static_cast<const PTGrantRevokeRole*>(tnode));
    case TreeNodeOpcode::kPTDropStmt:
      return ExecPTNode(static_cast<const PTDropStmt*>(tnode));
    case TreeNodeOpcode::kPTGrantRevokePermission:
      return ExecPTNode(static_cast<const PTGrantRevokePermission*>(tnode));

    // DML statements also receive the per-node context created above.
    case TreeNodeOpcode::kPTSelectStmt:
      return ExecPTNode(static_cast<const PTSelectStmt*>(tnode), tnode_context);
    case TreeNodeOpcode::kPTInsertStmt:
      return ExecPTNode(static_cast<const PTInsertStmt*>(tnode), tnode_context);
    case TreeNodeOpcode::kPTDeleteStmt:
      return ExecPTNode(static_cast<const PTDeleteStmt*>(tnode), tnode_context);
    case TreeNodeOpcode::kPTUpdateStmt:
      return ExecPTNode(static_cast<const PTUpdateStmt*>(tnode), tnode_context);

    case TreeNodeOpcode::kPTStartTransaction:
      return ExecPTNode(static_cast<const PTStartTransaction*>(tnode));
    case TreeNodeOpcode::kPTCommit:
      return ExecPTNode(static_cast<const PTCommit*>(tnode));
    case TreeNodeOpcode::kPTTruncateStmt:
      return ExecPTNode(static_cast<const PTTruncateStmt*>(tnode));
    case TreeNodeOpcode::kPTCreateKeyspace:
      return ExecPTNode(static_cast<const PTCreateKeyspace*>(tnode));
    case TreeNodeOpcode::kPTUseKeyspace:
      return ExecPTNode(static_cast<const PTUseKeyspace*>(tnode));
    case TreeNodeOpcode::kPTAlterKeyspace:
      return ExecPTNode(static_cast<const PTAlterKeyspace*>(tnode));
    case TreeNodeOpcode::kPTExplainStmt:
      return ExecPTNode(static_cast<const PTExplainStmt*>(tnode));

    default:
      return exec_context_->Error(tnode, ErrorCode::FEATURE_NOT_SUPPORTED);
  }
}
399
400
757
// Executes CREATE ROLE.
//
// Failure mapping: AlreadyPresent -> DUPLICATE_ROLE (treated as success when the statement was
// written with IF NOT EXISTS), NotAuthorized -> UNAUTHORIZED, anything else -> SERVER_ERROR.
Status Executor::ExecPTNode(const PTCreateRole *tnode) {
  const Status s = ql_env_->CreateRole(
      tnode->role_name(), tnode->salted_hash(), tnode->login(), tnode->superuser());
  if (s.ok()) {
    return Status::OK();
  }

  const bool role_exists = s.IsAlreadyPresent();
  if (role_exists && tnode->create_if_not_exists()) {
    return Status::OK();
  }

  const ErrorCode error_code =
      role_exists ? ErrorCode::DUPLICATE_ROLE
                  : (s.IsNotAuthorized() ? ErrorCode::UNAUTHORIZED : ErrorCode::SERVER_ERROR);

  // TODO (Bristy) : Set result_ properly.
  return exec_context_->Error(tnode, s, error_code);
}
421
422
//--------------------------------------------------------------------------------------------------
423
424
58
// Executes ALTER ROLE. A NotAuthorized failure is reported as UNAUTHORIZED; every other failure
// is reported as ROLE_NOT_FOUND.
Status Executor::ExecPTNode(const PTAlterRole *tnode) {
  const Status s = ql_env_->AlterRole(
      tnode->role_name(), tnode->salted_hash(), tnode->login(), tnode->superuser());
  if (s.ok()) {
    return Status::OK();
  }

  const ErrorCode error_code =
      s.IsNotAuthorized() ? ErrorCode::UNAUTHORIZED : ErrorCode::ROLE_NOT_FOUND;
  return exec_context_->Error(tnode, s, error_code);
}
437
438
//--------------------------------------------------------------------------------------------------
439
440
52
// Executes GRANT ROLE / REVOKE ROLE.
//
// Failure mapping: InvalidArgument -> INVALID_REQUEST, NotFound -> ROLE_NOT_FOUND, anything
// else -> SERVER_ERROR.
Status Executor::ExecPTNode(const PTGrantRevokeRole* tnode) {
  const Status s = ql_env_->GrantRevokeRole(
      tnode->statement_type(), tnode->granted_role_name(), tnode->recipient_role_name());
  if (s.ok()) {
    return Status::OK();
  }

  ErrorCode error_code;
  if (s.IsInvalidArgument()) {
    error_code = ErrorCode::INVALID_REQUEST;
  } else if (s.IsNotFound()) {
    error_code = ErrorCode::ROLE_NOT_FOUND;
  } else {
    error_code = ErrorCode::SERVER_ERROR;
  }

  // TODO (Bristy) : Set result_ properly.
  return exec_context_->Error(tnode, s, error_code);
}
455
456
//--------------------------------------------------------------------------------------------------
457
458
55.6k
// Executes every node of a list node in order and stops at the first failure, returning that
// failure. Returns OK when all nodes succeed (or the list is empty).
Status Executor::ExecPTNode(const PTListNode *tnode) {
  // Bind by const reference: copying the shared pointer for every element would cost an atomic
  // reference-count increment and decrement per node, and the list owned by `tnode` already
  // keeps the nodes alive for the duration of the loop.
  for (const auto& dml : tnode->node_list()) {
    RETURN_NOT_OK(ExecTreeNode(dml.get()));
  }
  return Status::OK();
}
464
465
//--------------------------------------------------------------------------------------------------
466
467
46
// Executes CREATE TYPE: collects the field names and types of the user-defined type and asks the
// environment to create it. On success result_ is set to a "CREATED TYPE" schema change.
//
// Failure mapping: AlreadyPresent -> DUPLICATE_TYPE (treated as success when the statement was
// written with IF NOT EXISTS), NotFound -> KEYSPACE_NOT_FOUND, InvalidArgument ->
// INVALID_TYPE_DEFINITION, anything else -> SERVER_ERROR.
Status Executor::ExecPTNode(const PTCreateType *tnode) {
  const YBTableName yb_name = tnode->yb_type_name();
  const std::string& type_name = yb_name.table_name();
  const std::string& keyspace_name = yb_name.namespace_name();

  // Gather the field definitions in declaration order.
  std::vector<std::string> field_names;
  std::vector<std::shared_ptr<QLType>> field_types;
  for (const PTTypeField::SharedPtr& field : tnode->fields()->node_list()) {
    field_names.emplace_back(field->yb_name());
    field_types.push_back(field->ql_type());
  }

  Status s = ql_env_->CreateUDType(keyspace_name, type_name, field_names, field_types);
  if (s.ok()) {
    result_ = std::make_shared<SchemaChangeResult>("CREATED", "TYPE", keyspace_name, type_name);
    return Status::OK();
  }

  ErrorCode error_code = ErrorCode::SERVER_ERROR;
  if (s.IsAlreadyPresent()) {
    if (tnode->create_if_not_exists()) {
      return Status::OK();
    }
    error_code = ErrorCode::DUPLICATE_TYPE;
  } else if (s.IsNotFound()) {
    error_code = ErrorCode::KEYSPACE_NOT_FOUND;
  } else if (s.IsInvalidArgument()) {
    error_code = ErrorCode::INVALID_TYPE_DEFINITION;
  }
  return exec_context_->Error(tnode->type_name(), s, error_code);
}
502
503
//--------------------------------------------------------------------------------------------------
504
505
1.59k
// Executes CREATE TABLE and CREATE INDEX (an index is created as a table that additionally
// carries IndexInfo describing the indexed table).
//
// Steps: refuse writes to a read-only system namespace, build the schema from the hash, primary
// and regular columns, apply table properties, drop cached table descriptors, create the table
// and finally publish the schema change in result_.
//
// Failure mapping of the create call: AlreadyPresent -> DUPLICATE_OBJECT (treated as success
// when the statement was written with IF NOT EXISTS), NotFound -> OBJECT_NOT_FOUND for an index
// or KEYSPACE_NOT_FOUND for a table, InvalidArgument -> INVALID_TABLE_DEFINITION, anything
// else -> SERVER_ERROR.
Status Executor::ExecPTNode(const PTCreateTable *tnode) {
  YBTableName table_name = tnode->yb_table_name();

  if (table_name.is_system() && client::FLAGS_yb_system_namespace_readonly) {
    return exec_context_->Error(tnode->table_name(), ErrorCode::SYSTEM_NAMESPACE_READONLY);
  }

  // Non-null only when this statement is CREATE INDEX.
  const PTCreateIndex* const index_node =
      tnode->opcode() == TreeNodeOpcode::kPTCreateIndex
          ? static_cast<const PTCreateIndex*>(tnode)
          : nullptr;

  // Setting up columns.
  Status s;
  YBSchema schema;
  YBSchemaBuilder b;
  shared_ptr<YBTableCreator> table_creator(ql_env_->NewTableCreator());
  // Table properties is kept in the metadata of the IndexTable.
  TableProperties table_properties;
  // IndexInfo is kept in the metadata of the Table that is being indexed.
  IndexInfoPB *index_info = nullptr;

  // When creating an index, we construct IndexInfo and associate it with the data-table. Later,
  // when operating on the data-table, we can decide if updating the index-tables is needed.
  if (index_node != nullptr) {
    index_info = table_creator->mutable_index_info();
    index_info->set_indexed_table_id(index_node->indexed_table_id());
    index_info->set_is_local(index_node->is_local());
    index_info->set_is_unique(index_node->is_unique());
    index_info->set_is_backfill_deferred(index_node->is_backfill_deferred());
    index_info->set_hash_column_count(narrow_cast<uint32_t>(tnode->hash_columns().size()));
    index_info->set_range_column_count(narrow_cast<uint32_t>(tnode->primary_columns().size()));
    index_info->set_use_mangled_column_name(true);

    // List key columns of data-table being indexed.
    for (const auto& col_desc : index_node->column_descs()) {
      if (col_desc.is_hash()) {
        index_info->add_indexed_hash_column_ids(col_desc.id());
      } else if (col_desc.is_primary()) {
        index_info->add_indexed_range_column_ids(col_desc.id());
      }
    }

    if (index_node->where_clause()) {
      // TODO (Piyush): Add a ToString method for PTExpr and log the where clause.
      IndexInfoPB::WherePredicateSpecPB *where_predicate_spec =
          index_info->mutable_where_predicate_spec();

      RETURN_NOT_OK(PTExprToPB(index_node->where_clause(),
                               where_predicate_spec->mutable_where_expr()));

      for (auto column_id : *(index_node->where_clause_column_refs())) {
        where_predicate_spec->add_column_ids(column_id);
      }
    }
  }

  // Hash columns must not carry a sorting type.
  for (const auto& column : tnode->hash_columns()) {
    if (column->sorting_type() != SortingType::kNotSpecified) {
      // Report the offending column itself. The regular-column list may be empty, so its front()
      // must not be used as the error location, and `s` is still OK here so it carries no
      // message worth attaching.
      return exec_context_->Error(column, ErrorCode::INVALID_TABLE_DEFINITION);
    }
    b.AddColumn(column->coldef_name().c_str())
      ->Type(column->ql_type())
      ->HashPrimaryKey()
      ->Order(column->order());
    RETURN_NOT_OK(AddColumnToIndexInfo(index_info, column));
  }

  for (const auto& column : tnode->primary_columns()) {
    b.AddColumn(column->coldef_name().c_str())
      ->Type(column->ql_type())
      ->PrimaryKey()
      ->Order(column->order())
      ->SetSortingType(column->sorting_type());
    RETURN_NOT_OK(AddColumnToIndexInfo(index_info, column));
  }

  // Regular (non-key) columns must not carry a sorting type either.
  for (const auto& column : tnode->columns()) {
    if (column->sorting_type() != SortingType::kNotSpecified) {
      // Point the error at the column that actually has the sorting type.
      return exec_context_->Error(column, ErrorCode::INVALID_TABLE_DEFINITION);
    }
    YBColumnSpec *column_spec = b.AddColumn(column->coldef_name().c_str())
                                  ->Type(column->ql_type())
                                  ->Nullable()
                                  ->Order(column->order());
    if (column->is_static()) {
      column_spec->StaticColumn();
    }
    if (column->is_counter()) {
      column_spec->Counter();
    }
    RETURN_NOT_OK(AddColumnToIndexInfo(index_info, column));
  }

  // Table-level failures are reported at the table name: a table consisting only of key columns
  // has no regular column to point at.
  s = tnode->ToTableProperties(&table_properties);
  if (!s.ok()) {
    return exec_context_->Error(tnode->table_name(), s, ErrorCode::INVALID_TABLE_DEFINITION);
  }
  b.SetTableProperties(table_properties);

  s = b.Build(&schema);
  if (PREDICT_FALSE(!s.ok())) {
    return exec_context_->Error(tnode->table_name(), s, ErrorCode::INVALID_TABLE_DEFINITION);
  }

  // Create table.
  table_creator->table_name(table_name)
      .table_type(YBTableType::YQL_TABLE_TYPE)
      .creator_role_name(ql_env_->CurrentRoleName())
      .schema(&schema);

  if (index_node != nullptr) {
    table_creator->indexed_table_id(index_node->indexed_table_id());
    table_creator->is_local_index(index_node->is_local());
    table_creator->is_unique_index(index_node->is_unique());
    table_creator->is_backfill_deferred(index_node->is_backfill_deferred());
  }

  // Clean-up table cache BEFORE op (the cache is used by other processor threads).
  ql_env_->RemoveCachedTableDesc(table_name);
  if (index_node != nullptr) {
    const YBTableName indexed_table_name = index_node->indexed_table_name();
    ql_env_->RemoveCachedTableDesc(indexed_table_name);
  }

  s = table_creator->Create();
  if (PREDICT_FALSE(!s.ok())) {
    ErrorCode error_code = ErrorCode::SERVER_ERROR;
    if (s.IsAlreadyPresent()) {
      error_code = ErrorCode::DUPLICATE_OBJECT;
    } else if (s.IsNotFound()) {
      error_code = index_node != nullptr
                   ? ErrorCode::OBJECT_NOT_FOUND
                   : ErrorCode::KEYSPACE_NOT_FOUND;
    } else if (s.IsInvalidArgument()) {
      error_code = ErrorCode::INVALID_TABLE_DEFINITION;
    }

    if (tnode->create_if_not_exists() && error_code == ErrorCode::DUPLICATE_OBJECT) {
      return Status::OK();
    }

    return exec_context_->Error(tnode->table_name(), s, error_code);
  }

  // Clean-up table cache AFTER op (the cache is used by other processor threads).
  ql_env_->RemoveCachedTableDesc(table_name);

  if (index_node != nullptr) {
    const YBTableName indexed_table_name = index_node->indexed_table_name();
    // Clean-up table cache AFTER op (the cache is used by other processor threads).
    ql_env_->RemoveCachedTableDesc(indexed_table_name);

    // Creating an index is reported as an update of the indexed table.
    result_ = std::make_shared<SchemaChangeResult>(
        "UPDATED", "TABLE", indexed_table_name.namespace_name(), indexed_table_name.table_name());
  } else {
    result_ = std::make_shared<SchemaChangeResult>(
        "CREATED", "TABLE", table_name.namespace_name(), table_name.table_name());
  }
  return Status::OK();
}
666
667
5.96k
// Appends `column` to `index_info`, linking the index column to the data-table column it
// references. Does nothing (and returns OK) when `index_info` is null.
Status Executor::AddColumnToIndexInfo(IndexInfoPB *index_info, const PTColumnDefinition *column) {
  if (index_info == nullptr) {
    return Status::OK();
  }

  // Note that column_id is assigned by master server, so we don't have it yet. When processing
  // create index request, server will update IndexInfo with proper column_id.
  auto* const index_column = index_info->add_columns();
  index_column->set_column_name(column->coldef_name().c_str());
  index_column->set_indexed_column_id(column->indexed_ref());
  RETURN_NOT_OK(PTExprToPB(column->colexpr(), index_column->mutable_colexpr()));
  return Status::OK();
}
679
680
//--------------------------------------------------------------------------------------------------
681
682
55
// Executes ALTER TABLE: queues the column additions, drops and renames, applies any modified
// table properties and then performs the alteration. ALTER ... TYPE is rejected with
// FEATURE_NOT_YET_IMPLEMENTED. On success result_ is set to an "UPDATED TABLE" schema change.
Status Executor::ExecPTNode(const PTAlterTable *tnode) {
  YBTableName table_name = tnode->yb_table_name();
  shared_ptr<YBTableAlterer> table_alterer(ql_env_->NewTableAlterer(table_name));

  // Translate each column modification into the matching alterer call.
  for (const auto& mod_column : tnode->mod_columns()) {
    switch (mod_column->mod_type()) {
      case ALTER_ADD: {
        table_alterer->AddColumn(mod_column->new_name()->data())->Type(mod_column->ql_type());
        break;
      }
      case ALTER_DROP: {
        table_alterer->DropColumn(mod_column->old_name()->last_name().data());
        break;
      }
      case ALTER_RENAME: {
        table_alterer->AlterColumn(mod_column->old_name()->last_name().data())
            ->RenameTo(mod_column->new_name()->c_str());
        break;
      }
      case ALTER_TYPE: {
        // Not yet supported by AlterTableRequestPB.
        return exec_context_->Error(tnode, ErrorCode::FEATURE_NOT_YET_IMPLEMENTED);
      }
    }
  }

  if (!tnode->mod_props().empty()) {
    TableProperties table_properties;
    const Status props_status = tnode->ToTableProperties(&table_properties);
    if (PREDICT_FALSE(!props_status.ok())) {
      return exec_context_->Error(tnode, props_status, ErrorCode::INVALID_ARGUMENTS);
    }
    table_alterer->SetTableProperties(table_properties);
  }

  // Clean-up table cache BEFORE op (the cache is used by other processor threads).
  ql_env_->RemoveCachedTableDesc(table_name);

  const Status alter_status = table_alterer->Alter();
  if (PREDICT_FALSE(!alter_status.ok())) {
    return exec_context_->Error(tnode, alter_status, ErrorCode::EXEC_ERROR);
  }

  result_ = std::make_shared<SchemaChangeResult>(
      "UPDATED", "TABLE", table_name.namespace_name(), table_name.table_name());

  // Clean-up table cache AFTER op (the cache is used by other processor threads).
  ql_env_->RemoveCachedTableDesc(table_name);
  return Status::OK();
}
731
732
//--------------------------------------------------------------------------------------------------
733
734
3.48k
// Executes a DROP statement for a table, index, keyspace, user-defined type or role.
//
// The matching delete call is issued through ql_env_, and for every object type except roles
// result_ is set to a schema-change result. A not-found failure is ignored for DROP ... IF EXISTS;
// any other failure is mapped to an error code and reported against the statement's object name.
// Other object types are rejected with FEATURE_NOT_SUPPORTED.
Status Executor::ExecPTNode(const PTDropStmt *tnode) {
  Status drop_status;
  // Error code to report when the object to drop does not exist; set per object type below.
  ErrorCode missing_object_code = ErrorCode::SERVER_ERROR;

  switch (tnode->drop_type()) {
    case ObjectType::TABLE: {
      missing_object_code = ErrorCode::OBJECT_NOT_FOUND;
      const YBTableName dropped_table = tnode->yb_table_name();

      // The table cache is used by other processor threads: clear the entry BEFORE the drop ...
      ql_env_->RemoveCachedTableDesc(dropped_table);

      drop_status = ql_env_->DeleteTable(dropped_table);
      result_ = std::make_shared<SchemaChangeResult>(
          "DROPPED", "TABLE", dropped_table.namespace_name(), dropped_table.table_name());

      // ... and again AFTER it.
      ql_env_->RemoveCachedTableDesc(dropped_table);
      break;
    }

    case ObjectType::INDEX: {
      missing_object_code = ErrorCode::OBJECT_NOT_FOUND;
      const YBTableName dropped_index = tnode->yb_table_name();

      // Clear the cached index descriptor BEFORE the drop (the cache is used by other processor
      // threads).
      ql_env_->RemoveCachedTableDesc(dropped_index);

      // DeleteIndexTable() fills in the name of the table the index belonged to.
      YBTableName base_table;
      drop_status = ql_env_->DeleteIndexTable(dropped_index, &base_table);
      result_ = std::make_shared<SchemaChangeResult>(
          "UPDATED", "TABLE", base_table.namespace_name(), base_table.table_name());

      // AFTER the drop, clear the cache entries of both the index and its indexed table.
      ql_env_->RemoveCachedTableDesc(dropped_index);
      ql_env_->RemoveCachedTableDesc(base_table);
      break;
    }

    case ObjectType::SCHEMA: {
      // Keyspace.
      const string dropped_keyspace(tnode->name()->last_name().c_str());
      drop_status = ql_env_->DeleteKeyspace(dropped_keyspace);
      missing_object_code = ErrorCode::KEYSPACE_NOT_FOUND;
      result_ = std::make_shared<SchemaChangeResult>("DROPPED", "KEYSPACE", dropped_keyspace);
      break;
    }

    case ObjectType::TYPE: {
      // User-defined type.
      const string dropped_type(tnode->name()->last_name().c_str());
      const string type_keyspace(tnode->name()->first_name().c_str());
      drop_status = ql_env_->DeleteUDType(type_keyspace, dropped_type);
      missing_object_code = ErrorCode::TYPE_NOT_FOUND;
      result_ = std::make_shared<SchemaChangeResult>(
          "DROPPED", "TYPE", type_keyspace, dropped_type);
      ql_env_->RemoveCachedUDType(type_keyspace, dropped_type);
      break;
    }

    case ObjectType::ROLE: {
      const string dropped_role(tnode->name()->QLName());
      drop_status = ql_env_->DeleteRole(dropped_role);
      missing_object_code = ErrorCode::ROLE_NOT_FOUND;
      // TODO (Bristy) : Set result_ properly.
      break;
    }

    default:
      return exec_context_->Error(tnode->name(), ErrorCode::FEATURE_NOT_SUPPORTED);
  }

  if (PREDICT_FALSE(!drop_status.ok())) {
    // A missing object is not an error for a DROP IF EXISTS statement.
    if (drop_status.IsNotFound() && tnode->drop_if_exists()) {
      return Status::OK();
    }

    const ErrorCode reported_code =
        drop_status.IsNotFound() ? missing_object_code
        : drop_status.IsNotAuthorized() ? ErrorCode::UNAUTHORIZED
        : drop_status.IsQLError() ? ErrorCode::INVALID_REQUEST
        : ErrorCode::SERVER_ERROR;
    return exec_context_->Error(tnode->name(), drop_status, reported_code);
  }

  return Status::OK();
}
827
828
721
// Executes a GRANT or REVOKE permission statement by forwarding the statement's role, resource and
// permission to ql_env_->GrantRevokePermission(). On failure the status is mapped to
// RESOURCE_NOT_FOUND (not found), INVALID_ARGUMENTS (invalid argument) or SERVER_ERROR (anything
// else). No result_ is produced.
Status Executor::ExecPTNode(const PTGrantRevokePermission* tnode) {
  const string grantee = tnode->role_name()->QLName();
  const string canonical_resource_name = tnode->canonical_resource();

  const Status grant_status = ql_env_->GrantRevokePermission(
      tnode->statement_type(), tnode->permission(), tnode->resource_type(),
      canonical_resource_name, tnode->resource_name(), tnode->namespace_name(), grantee);

  if (PREDICT_FALSE(!grant_status.ok())) {
    // Not-found takes precedence over invalid-argument; everything else is a server error.
    const ErrorCode reported_code =
        grant_status.IsNotFound() ? ErrorCode::RESOURCE_NOT_FOUND
        : grant_status.IsInvalidArgument() ? ErrorCode::INVALID_ARGUMENTS
        : ErrorCode::SERVER_ERROR;
    return exec_context_->Error(tnode, grant_status, reported_code);
  }

  // TODO (Bristy) : Return proper result.
  return Status::OK();
}
854
855
// Evaluates the LIMIT or OFFSET expression of a SELECT statement into an int32.
//
// tnode:       the SELECT statement.
// get_val:     returns the clause expression (limit or offset) of the given statement.
// clause_type: clause name used in error messages.
// value:       receives the evaluated clause value on success.
//
// Returns an INVALID_ARGUMENTS error when the expression cannot be converted, is null, or is
// negative.
Status Executor::GetOffsetOrLimit(
    const PTSelectStmt* tnode,
    const std::function<PTExprPtr(const PTSelectStmt* tnode)>& get_val,
    const string& clause_type,
    int32_t* value) {
  QLExpressionPB clause_pb;
  const Status conversion_status = PTExprToPB(get_val(tnode), &clause_pb);
  if (PREDICT_FALSE(!conversion_status.ok())) {
    return exec_context_->Error(get_val(tnode), conversion_status, ErrorCode::INVALID_ARGUMENTS);
  }

  const bool is_null_value = clause_pb.has_value() && IsNull(clause_pb.value());
  if (is_null_value) {
    const string message = Substitute("$0 value cannot be null.", clause_type);
    return exec_context_->Error(get_val(tnode), message.c_str(), ErrorCode::INVALID_ARGUMENTS);
  }

  // Earlier checks are expected to guarantee an integer constant here.
  DCHECK(clause_pb.has_value() && clause_pb.value().has_int32_value())
      << "Integer constant expected for " + clause_type + " clause";

  const int32_t clause_value = clause_pb.value().int32_value();
  if (clause_value < 0) {
    const string message = Substitute("$0 value cannot be negative.", clause_type);
    return exec_context_->Error(get_val(tnode), message.c_str(), ErrorCode::INVALID_ARGUMENTS);
  }

  *value = clause_value;
  return Status::OK();
}
884
885
//--------------------------------------------------------------------------------------------------
886
887
// Executes a SELECT statement node: builds a QLReadRequestPB for the statement's table (WHERE
// clause, selected expressions, column refs, IF clause, DISTINCT, LIMIT / OFFSET and paging state)
// and queues the resulting read op(s) through AddOperation().
//
// If the statement has a child select (a SELECT from an index), the child is executed first. When
// the index covers the query fully nothing more is done here; otherwise the read op built here is
// not queued but saved as a template via SetUncoveredSelectOp().
//
// NOTE: This function calls itself recursively (for the child select).
888
// - The paging state is loaded into the context only on the first call (the call from the user).
889
// - Similarly, all code in this function must work for both cases - calls by users and recursive
890
//   calls within the same process. These two different cases can be cleaned up later to avoid
891
//   confusion.
892
3.90M
Status Executor::ExecPTNode(const PTSelectStmt *tnode, TnodeContext* tnode_context) {
893
3.90M
  const shared_ptr<client::YBTable>& table = tnode->table();
894
3.90M
  if (table == nullptr) {
895
    // If this is a request for 'system.peers_v2' table make sure that we send the appropriate error
896
    // so that the client driver can query the proper peers table i.e. 'system.peers' based on the
897
    // error.
898
2.09k
    if (tnode->is_system() &&
899
2.09k
        tnode->table_name().table_name() == "peers_v2" &&
900
2.09k
        tnode->table_name().namespace_name() == "system") {
901
2.09k
      string error_msg = "Unknown keyspace/cf pair (system.peers_v2)";
902
2.09k
      return exec_context_->Error(tnode, error_msg, ErrorCode::SERVER_ERROR);
903
2.09k
    }
904
905
    // If this is a system table but the table does not exist, it is okay. Just return OK with void
906
    // result.
907
2
    return tnode->is_system() ? Status::OK()
908
0
                              : exec_context_->Error(tnode, ErrorCode::OBJECT_NOT_FOUND);
909
2
  }
910
911
  // If there is a table id in the statement parameter's paging state, this is a continuation of a
912
  // prior SELECT statement. Verify that the same table/index still exists and matches the table id
913
  // for query without index, or the index id in the leaf node (where child_select is null also).
914
3.90M
  const StatementParameters& params = exec_context_->params();
915
3.90M
  const bool continue_user_request = !tnode->child_select() && !params.table_id().empty();
916
3.90M
  if (continue_user_request && params.table_id() != table->id()) {
917
2
    return exec_context_->Error(tnode, "Object no longer exists.", ErrorCode::OBJECT_NOT_FOUND);
918
2
  }
919
920
  // Read the paging state from user input "params".
921
3.90M
  QueryPagingState *query_state = VERIFY_RESULT(LoadPagingStateFromUser(tnode, tnode_context));
922
3.90M
  if (query_state->reached_select_limit()) {
923
    // Return the result without executing the node.
924
2
    return result_ != nullptr ? Status::OK() : GenerateEmptyResult(tnode);
925
2
  }
926
927
  // If there is an index to select from, execute it.
928
3.90M
  if (tnode->child_select()) {
    // result_ is expected to be unset before the child select runs.
929
0
    LOG_IF(DFATAL, result_) << "Expecting result is not yet initialized";
930
1.95k
    const PTSelectStmt* child_select = tnode->child_select().get();
931
1.95k
    TnodeContext* child_context = tnode_context->AddChildTnode(child_select);
932
1.95k
    RETURN_NOT_OK(ExecPTNode(child_select, child_context));
933
    // If the index covers the SELECT query fully, we are done. Otherwise, continue to prepare
934
    // the SELECT from the table using the primary key to be returned from the index select.
935
1.95k
    if (child_select->covers_fully()) {
936
1.55k
      return Status::OK();
937
1.55k
    }
938
    // If the child uncovered index select has set result_ already it must have been able
939
    // to guarantee an empty result (i.e. if WHERE clause guarantees no rows could match)
940
    // so we can just return.
941
404
    if (result_) {
942
0
      LOG_IF(DFATAL, result_->type() != ExecutedResult::Type::ROWS)
943
0
          << "Expecting result type is ROWS=" << static_cast<int>(ExecutedResult::Type::ROWS)
944
0
          << ", got result type=" << static_cast<int>(result_->type());
945
0
      auto rows_result = std::static_pointer_cast<RowsResult>(result_);
946
0
      RSTATUS_DCHECK(rows_result->paging_state().empty(),
947
0
                     Corruption, "Expecting result_ to be empty with empty paging state");
948
0
      RSTATUS_DCHECK(rows_result->rows_data() == string(4, '\0'), // Encoded row_count == 0.
949
0
                     Corruption, "Expecting result_ to be empty with result row_count equals 0");
950
0
      return Status::OK();
951
0
    }
952
3.90M
  }
953
954
  // Create the read request.
955
3.90M
  YBqlReadOpPtr select_op(table->NewQLSelect());
956
3.90M
  QLReadRequestPB *req = select_op->mutable_request();
957
958
  // Where clause - Hash, range, and regular columns.
  // On success WhereClauseToPB() yields an estimate of the maximum number of rows the WHERE clause
  // can match; it is used below for the empty-result shortcut and the parallel-read decision.
959
3.90M
  req->set_is_aggregate(tnode->is_aggregate());
960
3.90M
  Result<uint64_t> max_rows_estimate = WhereClauseToPB(req, tnode->key_where_ops(),
961
3.90M
                                                       tnode->where_ops(),
962
3.90M
                                                       tnode->subscripted_col_where_ops(),
963
3.90M
                                                       tnode->json_col_where_ops(),
964
3.90M
                                                       tnode->partition_key_ops(),
965
3.90M
                                                       tnode->func_ops(),
966
3.90M
                                                       tnode_context);
967
3.90M
  if (PREDICT_FALSE(!max_rows_estimate)) {
968
12
    return exec_context_->Error(tnode, max_rows_estimate.status(), ErrorCode::INVALID_ARGUMENTS);
969
12
  }
970
971
  // If where clause restrictions guarantee no rows could match, return empty result immediately.
  // Aggregate queries are excluded from this shortcut.
972
3.90M
  if (*max_rows_estimate == 0 && !tnode->is_aggregate()) {
973
0
    return GenerateEmptyResult(tnode);
974
0
  }
975
976
3.90M
  req->set_is_forward_scan(tnode->is_forward_scan());
977
978
  // Specify selected list by adding the expressions to selected_exprs in read request.
979
3.90M
  QLRSRowDescPB *rsrow_desc_pb = req->mutable_rsrow_desc();
980
4.14M
  for (const auto& expr : tnode->selected_exprs()) {
981
4.14M
    if (expr->opcode() == TreeNodeOpcode::kPTAllColumns) {
      // '*' is handled by the PTAllColumns overload of PTExprToPB(), which takes the whole request.
982
3.73M
      const Status s = PTExprToPB(static_cast<const PTAllColumns*>(expr.get()), req);
983
3.73M
      if (PREDICT_FALSE(!s.ok())) {
984
0
        return exec_context_->Error(expr, s, ErrorCode::INVALID_ARGUMENTS);
985
0
      }
986
412k
    } else {
987
412k
      const Status s = PTExprToPB(expr, req->add_selected_exprs());
988
412k
      if (PREDICT_FALSE(!s.ok())) {
989
7
        return exec_context_->Error(expr, s, ErrorCode::INVALID_ARGUMENTS);
990
7
      }
991
992
      // Add the expression metadata (rsrow descriptor).
993
412k
      QLRSColDescPB *rscol_desc_pb = rsrow_desc_pb->add_rscol_descs();
994
412k
      rscol_desc_pb->set_name(expr->QLName());
995
412k
      expr->rscol_type_PB(rscol_desc_pb->mutable_ql_type());
996
412k
    }
997
4.14M
  }
998
999
  // Setup the column values that need to be read.
1000
3.90M
  Status s = ColumnRefsToPB(tnode, req->mutable_column_refs());
1001
3.90M
  if (PREDICT_FALSE(!s.ok())) {
1002
0
    return exec_context_->Error(tnode, s, ErrorCode::INVALID_ARGUMENTS);
1003
0
  }
1004
1005
  // Set the IF clause.
1006
3.90M
  if (tnode->if_clause() != nullptr) {
1007
112
    s = PTExprToPB(tnode->if_clause(), select_op->mutable_request()->mutable_if_expr());
1008
112
    if (PREDICT_FALSE(!s.ok())) {
1009
0
      return exec_context_->Error(tnode->if_clause(), s, ErrorCode::INVALID_ARGUMENTS);
1010
0
    }
1011
3.90M
  }
1012
1013
  // Specify whether DISTINCT was requested.
1014
3.90M
  req->set_distinct(tnode->distinct());
1015
1016
  // Default row count limit is the page size.
1017
  // We should return paging state when page size limit is hit.
1018
  // For system tables, we do not support page size so do nothing.
1019
3.90M
  if (!tnode->is_system()) {
1020
3.63M
    req->set_limit(params.page_size());
1021
3.63M
    req->set_return_paging_state(true);
1022
3.63M
  }
1023
1024
3.90M
  if (!tnode->child_select()) {
1025
    // DocDB will do LIMIT and OFFSET computation for this query.
1026
3.88M
    if (tnode->limit()) {
1027
      // Setup request to DocDB according to the given LIMIT.
      // Only the part of the LIMIT not yet consumed by previously read rows is requested.
1028
3.60M
      size_t user_limit = query_state->select_limit() - query_state->read_count();
1029
3.61M
      if (!req->has_limit() || user_limit <= req->limit()) {
1030
        // Set limit and instruct DocDB to clear paging state if limit is reached.
1031
3.61M
        req->set_limit(user_limit);
1032
3.61M
        req->set_return_paging_state(false);
1033
3.61M
      }
1034
3.60M
    }
1035
1036
3.88M
    if (tnode->offset()) {
1037
      // Setup request to DocDB according to the given OFFSET.
      // Only the part of the OFFSET not yet skipped is requested. Note that this re-enables
      // return_paging_state even if the LIMIT branch above disabled it.
1038
335
      auto user_offset = query_state->select_offset() - query_state->skip_count();
1039
335
      req->set_offset(user_offset);
1040
335
      req->set_return_paging_state(true);
1041
335
    }
1042
3.88M
  }
1043
1044
  // If this is a continuation of a prior user's request, set the next partition key, row key,
1045
  // and total number of rows read in the request's paging state.
1046
3.90M
  if (continue_user_request) {
1047
279
    QLPagingStatePB *paging_state = req->mutable_paging_state();
1048
1049
279
    paging_state->set_next_partition_key(query_state->next_partition_key());
1050
279
    paging_state->set_next_row_key(query_state->next_row_key());
1051
279
    paging_state->set_total_num_rows_read(query_state->total_num_rows_read());
1052
279
    paging_state->set_total_rows_skipped(query_state->total_rows_skipped());
1053
279
  }
1054
1055
  // Set the consistency level for the operation. Always use strong consistency for system tables.
1056
259k
  select_op->set_yb_consistency_level(tnode->is_system() ? YBConsistencyLevel::STRONG
1057
3.64M
                                                         : params.yb_consistency_level());
1058
1059
  // Save the hash_code and max_hash_code limits computed from the request's partition_key_ops in
1060
  // WhereClauseToPB(). These will be used later to be set in the request protobuf in
1061
  // AdvanceToNextPartition() for multi-partition selects. The limits need to be saved now because
1062
  // before AdvanceToNextPartition(), `Batcher::DoAdd()` of the current request might reuse and
1063
  // mutate these limits to set tighter limits specific to the current partition. Then when
1064
  // continuing to the next partition we cannot use the partition-specific limits, nor clear them
1065
  // completely and lose the top-level limits.
1066
1067
  // Example: For `SELECT ... WHERE token(h) > 10 and token(h) < 100 and h IN (1,7,12)`, the `token`
1068
  // conditions set the top-level hash code limits, and `h` sets the individual partitions to read.
1069
  // The top-level hash code limits are needed because if the hash code for any of the partitions
1070
  // falls outside the `token`-set range we need to return no results for that partition.
1071
1072
3.90M
  if (req->has_hash_code())
1073
33
    tnode_context->set_hash_code_from_partition_key_ops(req->hash_code());
1074
3.90M
  if (req->has_max_hash_code())
1075
23
    tnode_context->set_max_hash_code_from_partition_key_ops(req->max_hash_code());
1076
1077
  // If we have several hash partitions (i.e. IN condition on hash columns) we initialize the
1078
  // start partition here, and then iteratively scan the rest in FetchMoreRows.
1079
  // Otherwise, the request will already have the right hashed column values set.
1080
3.90M
  if (tnode_context->UnreadPartitionsRemaining() > 0) {
1081
1.07k
    tnode_context->InitializePartition(select_op->mutable_request(), continue_user_request);
1082
1083
    // We can optimize to run the ops in parallel (rather than serially) if:
1084
    // - the estimated max number of rows is less than req limit (min of page size and CQL limit).
1085
    // - there is no offset (which requires passing skipped rows from one request to the next).
1086
1.07k
    if (*max_rows_estimate <= req->limit() && !req->has_offset()) {
1087
988
      AddOperation(select_op, tnode_context);
      // Queue one additional op per remaining partition, each cloned from the previous op.
      // NOTE(review): the loop stops at 1, presumably because the partition of the op queued
      // last still counts as unread - confirm against TnodeContext.
1088
6.29k
      while (tnode_context->UnreadPartitionsRemaining() > 1) {
1089
5.30k
        YBqlReadOpPtr op(table->NewQLSelect());
1090
5.30k
        op->mutable_request()->CopyFrom(select_op->request());
1091
5.30k
        op->set_yb_consistency_level(select_op->yb_consistency_level());
1092
5.30k
        tnode_context->AdvanceToNextPartition(op->mutable_request());
1093
5.30k
        AddOperation(op, tnode_context);
1094
5.30k
        select_op = op; // Use new op as base for the next one, if any.
1095
5.30k
      }
1096
988
      return Status::OK();
1097
988
    }
1098
3.89M
  }
1099
1100
  // If this select statement uses an uncovered index underneath, save this op as a template to
1101
  // read from the table once the primary keys are returned from the uncovered index. The paging
1102
  // state should be used by the underlying select from the index only which decides where to
1103
  // continue the select from the index.
1104
3.89M
  if (tnode->child_select() && !tnode->child_select()->covers_fully()) {
1105
404
    req->clear_return_paging_state();
1106
404
    tnode_context->SetUncoveredSelectOp(select_op);
1107
404
    result_ = std::make_shared<RowsResult>(select_op.get());
1108
404
    return Status::OK();
1109
404
  }
1110
1111
  // Add the operation.
1112
3.89M
  AddOperation(select_op, tnode_context);
1113
3.89M
  return Status::OK();
1114
3.89M
}
1115
1116
// Returns the paging state to use for executing the given SELECT node.
//
// An existing state on tnode_context is reused. Otherwise a new one is created from the statement
// parameters, and the statement's LIMIT and OFFSET values are evaluated and stored in it unless
// the state already carries them. Whenever the statement has a LIMIT clause, the maximum fetch
// size is adjusted to it.
Result<QueryPagingState*> Executor::LoadPagingStateFromUser(const PTSelectStmt* tnode,
                                                            TnodeContext* tnode_context) {
  // Reuse the state when the context already has one.
  if (QueryPagingState* existing_state = tnode_context->query_state()) {
    if (tnode->limit()) {
      // Need to compute the maximum number of rows to fetch for this user's request.
      existing_state->AdjustMaxFetchSizeToSelectLimit();
    }
    return existing_state;
  }

  // Create the state for this execution.
  // - Only the top-level select node should have the row counter.
  // - Users do not care about the row counter of an inner or nested query.
  const StatementParameters& statement_params = exec_context_->params();
  QueryPagingState* new_state =
      tnode_context->CreateQueryState(statement_params, tnode->IsTopLevelReadNode());

  if (tnode->limit()) {
    RSTATUS_DCHECK(tnode->IsTopLevelReadNode(), Corruption,
                   "LIMIT clause cannot be applied to nested SELECT");
    if (!new_state->has_select_limit()) {
      int32_t limit_value;
      RETURN_NOT_OK(GetOffsetOrLimit(
          tnode,
          [](const PTSelectStmt* select) -> PTExpr::SharedPtr { return select->limit(); },
          "LIMIT", &limit_value));
      new_state->set_select_limit(limit_value);
    }

    new_state->AdjustMaxFetchSizeToSelectLimit();
  }

  if (tnode->offset()) {
    RSTATUS_DCHECK(tnode->IsTopLevelReadNode(), Corruption,
                   "OFFSET clause cannot be applied to nested SELECT");
    if (!new_state->has_select_offset()) {
      int32_t offset_value;
      RETURN_NOT_OK(GetOffsetOrLimit(
          tnode,
          [](const PTSelectStmt* select) -> PTExpr::SharedPtr { return select->offset(); },
          "OFFSET", &offset_value));
      new_state->set_select_offset(offset_value);
    }
  }

  return new_state;
}
1163
1164
2
// Sets result_ to a rows result with no rows for the given SELECT statement. The result is built
// from a fresh read op on the statement's table whose rows data is a serialized empty row block.
Status Executor::GenerateEmptyResult(const PTSelectStmt* tnode) {
  YBqlReadOpPtr empty_read_op(tnode->table()->NewQLSelect());

  // Serialize a row block without rows, using the client type of the op's request.
  QLRowBlock no_rows(tnode->table()->InternalSchema(), {});
  faststring serialized;
  no_rows.Serialize(empty_read_op->request().client(), &serialized);
  *empty_read_op->mutable_rows_data() = serialized.ToString();

  result_ = std::make_shared<RowsResult>(empty_read_op.get());
  return Status::OK();
}
1174
1175
// Decides whether another read must be issued for the given SELECT and, if so, prepares the
// request of `op` for it.
//
// Returns false when no further read is needed: the LIMIT clause has been reached, the last
// partition has been fully read, or the current batch already holds the maximum fetch size.
// Returns true after updating the limit, offset and paging state of op's request for the next
// read. Fails with InternalError if the node has no rows result yet.
//
// NOTE(review): the `exec_context` parameter is not used; the request id is read from the member
// exec_context_ instead - confirm that this is intended.
Result<bool> Executor::FetchMoreRows(const PTSelectStmt* tnode,
                                     const YBqlReadOpPtr& op,
                                     TnodeContext* tnode_context,
                                     ExecContext* exec_context) {
  if (!tnode_context->rows_result()) {
    return STATUS(InternalError, "Missing result for SELECT operation");
  }

  QueryPagingState* paging = tnode_context->query_state();

  // Once the LIMIT clause has been reached, we are done.
  if (tnode->limit() && paging->reached_select_limit()) {
    RETURN_NOT_OK(tnode_context->ClearQueryState());
    return false;
  }

  //------------------------------------------------------------------------------------------------
  // Check if we should fetch more rows.

  // Without a paging state the current scan has exhausted its results. The paging state may also
  // be non-empty but contain only num_rows_skipped; then 'next_partition_key' and 'next_row_key'
  // are empty, which indicates that the current partition has been read completely.
  if (tnode_context->FinishedReadingPartition()) {
    // If there are no other partitions to query, we are done.
    if (tnode_context->UnreadPartitionsRemaining() <= 1) {
      // No data is left in the table, so clear the paging state.
      RETURN_NOT_OK(tnode_context->ClearQueryState());
      return false;
    }

    // Sanity check: after finishing a partition the next partition/row keys must be empty,
    // otherwise the scan of the next partition could start from the wrong place.
    DCHECK(paging->next_partition_key().empty());
    DCHECK(paging->next_row_key().empty());

    // Move on to the next partition.
    tnode_context->AdvanceToNextPartition(op->mutable_request());
  }

  // Counters used to set up the read request to DocDB.
  const int64_t rows_in_batch = tnode_context->row_count();
  const int64_t rows_skipped = paging->skip_count();
  const int64_t rows_read = paging->read_count();

  // The batch is complete once the fetch limit (min of paging_size and limit clause) is reached.
  const int64_t batch_limit = paging->max_fetch_size();
  const bool batch_is_full = batch_limit >= 0 && rows_in_batch >= batch_limit;
  if (batch_is_full) {
    // If a paging state must be returned to the user, create it here so that the query can resume
    // from the exact place where it left off: partition index and primary key in that partition.
    if (op->request().return_paging_state()) {
      paging->set_original_request_id(exec_context_->params().request_id());
      paging->set_table_id(tnode->table()->id());
      paging->set_total_num_rows_read(rows_read);
      paging->set_total_rows_skipped(rows_skipped);

      // Partition to resume from. Relevant for multi-partition selects, i.e. with an IN
      // condition on the partition columns.
      paging->set_next_partition_index(tnode_context->current_partition_index());

      // Write the paging state to the node's rows_result to prepare for future batches.
      RETURN_NOT_OK(tnode_context->ComposeRowsResultForUser(nullptr, true /* for_new_batches */));
    }
    return false;
  }

  //------------------------------------------------------------------------------------------------
  // Fetch more results.
  // Update limit, offset and paging_state information for the next scan request.
  QLReadRequestPB* next_request = op->mutable_request();
  next_request->set_limit(batch_limit - rows_in_batch);
  if (tnode->offset()) {
    // The paging state keeps a running count of the number of rows skipped so far.
    const int64_t remaining_offset = std::max(static_cast<int64_t>(0),
                                              paging->select_offset() - rows_skipped);
    next_request->set_offset(remaining_offset);
  }

  QLPagingStatePB* request_paging = next_request->mutable_paging_state();
  request_paging->set_next_partition_key(paging->next_partition_key());
  request_paging->set_next_row_key(paging->next_row_key());
  request_paging->set_total_num_rows_read(rows_read);
  request_paging->set_total_rows_skipped(rows_skipped);

  return true;
}
1259
1260
// Queues one read op per row of `keys`. Each op is created on the statement's table, copies the
// request and consistency level of the template op `select_op`, and has the key row applied to its
// request through WhereKeyToPB(). Returns true if `keys` contained at least one row.
Result<bool> Executor::FetchRowsByKeys(const PTSelectStmt* tnode,
                                       const YBqlReadOpPtr& select_op,
                                       const QLRowBlock& keys,
                                       TnodeContext* tnode_context) {
  const Schema& table_schema = tnode->table()->InternalSchema();

  for (const QLRow& key_row : keys.rows()) {
    YBqlReadOpPtr keyed_op(tnode->table()->NewQLSelect());
    keyed_op->set_yb_consistency_level(select_op->yb_consistency_level());

    // Start from the template request, then restrict it to this key.
    QLReadRequestPB* keyed_request = keyed_op->mutable_request();
    keyed_request->CopyFrom(select_op->request());
    RETURN_NOT_OK(WhereKeyToPB(keyed_request, table_schema, key_row));

    AddOperation(keyed_op, tnode_context);
  }

  const bool queued_any = !keys.rows().empty();
  return queued_any;
}
1275
1276
//--------------------------------------------------------------------------------------------------
1277
1278
1.29M
// Executes an INSERT statement: builds a QL write request (TTL, timestamp, column values, column
// refs, optional IF clause and RETURNS STATUS flag) on a new insert op for the statement's table
// and queues it via AddOperation(), whose status is returned.
Status Executor::ExecPTNode(const PTInsertStmt *tnode, TnodeContext* tnode_context) {
  // Create the write request.
  const shared_ptr<client::YBTable>& target_table = tnode->table();
  YBqlWriteOpPtr write_op(target_table->NewQLInsert());
  QLWriteRequestPB* request = write_op->mutable_request();

  // TTL.
  const Status ttl_status = TtlToPB(tnode, request);
  if (PREDICT_FALSE(!ttl_status.ok())) {
    return exec_context_->Error(tnode, ttl_status, ErrorCode::INVALID_ARGUMENTS);
  }

  // Timestamp.
  const Status timestamp_status = TimestampToPB(tnode, request);
  if (PREDICT_FALSE(!timestamp_status.ok())) {
    return exec_context_->Error(tnode, timestamp_status, ErrorCode::INVALID_ARGUMENTS);
  }

  // Column values: either from an INSERT JSON clause or from the regular column arguments.
  const bool inserts_json =
      tnode->InsertingValue()->opcode() == TreeNodeOpcode::kPTInsertJsonClause;
  if (inserts_json) {
    // Error messages are already formatted and don't need additional wrap
    auto* json_clause = static_cast<PTInsertJsonClause*>(tnode->InsertingValue().get());
    RETURN_NOT_OK(InsertJsonClauseToPB(tnode, json_clause, request));
  } else {
    const Status args_status = ColumnArgsToPB(tnode, request);
    if (PREDICT_FALSE(!args_status.ok())) {
      // Note: INVALID_ARGUMENTS is retryable error code (due to mapping into STALE_METADATA),
      //       INVALID_REQUEST - non-retryable.
      const bool non_retryable = args_status.code() == Status::kNotSupported ||
                                 args_status.code() == Status::kRuntimeError;
      return exec_context_->Error(
          tnode, args_status,
          non_retryable ? ErrorCode::INVALID_REQUEST : ErrorCode::INVALID_ARGUMENTS);
    }
  }

  // Column values that need to be read.
  const Status refs_status = ColumnRefsToPB(tnode, request->mutable_column_refs());
  if (PREDICT_FALSE(!refs_status.ok())) {
    return exec_context_->Error(tnode, refs_status, ErrorCode::INVALID_ARGUMENTS);
  }

  // IF clause.
  if (tnode->if_clause() != nullptr) {
    const Status if_status =
        PTExprToPB(tnode->if_clause(), write_op->mutable_request()->mutable_if_expr());
    if (PREDICT_FALSE(!if_status.ok())) {
      return exec_context_->Error(tnode->if_clause(), if_status, ErrorCode::INVALID_ARGUMENTS);
    }
    request->set_else_error(tnode->else_error());
  }

  // RETURNS STATUS clause, if set.
  if (tnode->returns_status()) {
    request->set_returns_status(true);
  }

  // Record whether the write op writes to the static/primary row.
  write_op->set_writes_static_row(tnode->ModifiesStaticRow());
  write_op->set_writes_primary_row(tnode->ModifiesPrimaryRow());

  // Add the operation.
  return AddOperation(write_op, tnode_context);
}
1343
1344
//--------------------------------------------------------------------------------------------------
1345
1346
740
Status Executor::ExecPTNode(const PTDeleteStmt *tnode, TnodeContext* tnode_context) {
  // Build the QL delete operation for the statement's table and fill in its request protobuf.
  const shared_ptr<client::YBTable>& target_table = tnode->table();
  YBqlWriteOpPtr delete_op(target_table->NewQLDelete());
  QLWriteRequestPB* const req = delete_op->mutable_request();

  // Every statement-level failure below is reported against tnode as INVALID_ARGUMENTS.
  const auto invalid_arguments = [this, tnode](const Status& status) -> Status {
    return exec_context_->Error(tnode, status, ErrorCode::INVALID_ARGUMENTS);
  };

  // Timestamp.
  Status status = TimestampToPB(tnode, req);
  if (PREDICT_FALSE(!status.ok())) {
    return invalid_arguments(status);
  }

  // Where clause - hash, range, and regular columns.
  // NOTE: Currently, where clause for write op doesn't allow regular columns.
  status = WhereClauseToPB(
      req, tnode->key_where_ops(), tnode->where_ops(), tnode->subscripted_col_where_ops());
  if (PREDICT_FALSE(!status.ok())) {
    return invalid_arguments(status);
  }

  // Column values that need to be read.
  status = ColumnRefsToPB(tnode, req->mutable_column_refs());
  if (PREDICT_FALSE(!status.ok())) {
    return invalid_arguments(status);
  }

  // Column arguments of the statement.
  status = ColumnArgsToPB(tnode, req);
  if (PREDICT_FALSE(!status.ok())) {
    return invalid_arguments(status);
  }

  // IF clause: a failure here is reported against the clause itself rather than the statement.
  if (tnode->if_clause() != nullptr) {
    status = PTExprToPB(tnode->if_clause(), req->mutable_if_expr());
    if (PREDICT_FALSE(!status.ok())) {
      return exec_context_->Error(tnode->if_clause(), status, ErrorCode::INVALID_ARGUMENTS);
    }
    req->set_else_error(tnode->else_error());
  }

  // RETURNS STATUS clause, only recorded in the request when present.
  if (tnode->returns_status()) {
    req->set_returns_status(true);
  }

  // Record whether the write op touches the static and/or primary row.
  delete_op->set_writes_static_row(tnode->ModifiesStaticRow());
  delete_op->set_writes_primary_row(tnode->ModifiesPrimaryRow());

  // Hand the finished operation over for execution.
  return AddOperation(delete_op, tnode_context);
}
1397
1398
//--------------------------------------------------------------------------------------------------
1399
1400
3.24k
Status Executor::ExecPTNode(const PTUpdateStmt *tnode, TnodeContext* tnode_context) {
  // Build the QL update operation for the statement's table and fill in its request protobuf.
  const shared_ptr<client::YBTable>& table = tnode->table();
  YBqlWriteOpPtr update_op(table->NewQLUpdate());
  QLWriteRequestPB* const req = update_op->mutable_request();

  // Every statement-level failure below is reported against tnode as INVALID_ARGUMENTS.
  const auto invalid_arguments = [this, tnode](const Status& status) -> Status {
    return exec_context_->Error(tnode, status, ErrorCode::INVALID_ARGUMENTS);
  };

  // Where clause - hash, range, and regular columns.
  // NOTE: Currently, where clause for write op doesn't allow regular columns.
  Status status = WhereClauseToPB(
      req, tnode->key_where_ops(), tnode->where_ops(), tnode->subscripted_col_where_ops());
  if (PREDICT_FALSE(!status.ok())) {
    return invalid_arguments(status);
  }

  // TTL.
  status = TtlToPB(tnode, req);
  if (PREDICT_FALSE(!status.ok())) {
    return invalid_arguments(status);
  }

  // Timestamp.
  status = TimestampToPB(tnode, req);
  if (PREDICT_FALSE(!status.ok())) {
    return invalid_arguments(status);
  }

  // New values of the columns.
  status = ColumnArgsToPB(tnode, req);
  if (PREDICT_FALSE(!status.ok())) {
    return invalid_arguments(status);
  }

  if (req->column_values_size() == 0) {
    // We can reach here only in case of an UPDATE that consists of only setting
    // jsonb col's attributes to 'null' along with ignore_null_jsonb_attributes=true
    VLOG(1) << "Avoid updating indexes since 0 cols are written";

    // Serializes one row whose column 0 ([applied]) is false and, when a message is given, whose
    // column 1 holds that message; the row becomes the statement result. Other columns stay unset.
    const auto set_not_applied_result =
        [this, &table](const std::shared_ptr<std::vector<ColumnSchema>>& columns,
                       const char* message) {
          QLRowBlock result_row_block(Schema(*columns, 0));
          QLRow& row = result_row_block.Extend();
          row.mutable_column(0)->set_bool_value(false);
          if (message != nullptr) {
            row.mutable_column(1)->set_string_value(message);
          }

          faststring row_data;
          result_row_block.Serialize(YQL_CLIENT_CQL, &row_data);

          result_ = std::make_shared<RowsResult>(table->name(), columns, row_data.ToString());
        };

    if (tnode->returns_status()) {
      // RETURNS STATUS: [applied], [message], followed by every column of the table schema.
      auto columns = std::make_shared<std::vector<ColumnSchema>>();
      const auto& schema = table->schema();
      columns->reserve(schema.num_columns() + 2);
      columns->emplace_back("[applied]", DataType::BOOL);
      columns->emplace_back("[message]", DataType::STRING);
      columns->insert(columns->end(), schema.columns().begin(), schema.columns().end());
      set_not_applied_result(columns, "No update performed as all JSON cols are set to 'null'");
    } else if (tnode->if_clause() != nullptr) {
      // Conditional update: a single [applied] column.
      auto columns = std::make_shared<std::vector<ColumnSchema>>();
      columns->emplace_back("[applied]", DataType::BOOL);
      set_not_applied_result(columns, nullptr);
    }

    // Nothing is written, so no operation is added.
    return Status::OK();
  }

  // Column values that need to be read.
  status = ColumnRefsToPB(tnode, req->mutable_column_refs());
  if (PREDICT_FALSE(!status.ok())) {
    return invalid_arguments(status);
  }

  // IF clause: a failure here is reported against the clause itself rather than the statement.
  if (tnode->if_clause() != nullptr) {
    status = PTExprToPB(tnode->if_clause(), req->mutable_if_expr());
    if (PREDICT_FALSE(!status.ok())) {
      return exec_context_->Error(tnode->if_clause(), status, ErrorCode::INVALID_ARGUMENTS);
    }
    req->set_else_error(tnode->else_error());
  }

  // RETURNS STATUS clause, only recorded in the request when present.
  if (tnode->returns_status()) {
    req->set_returns_status(true);
  }

  // Record whether the write op touches the static and/or primary row.
  update_op->set_writes_static_row(tnode->ModifiesStaticRow());
  update_op->set_writes_primary_row(tnode->ModifiesPrimaryRow());

  // Hand the finished operation over for execution.
  return AddOperation(update_op, tnode_context);
}
1503
1504
//--------------------------------------------------------------------------------------------------
1505
1506
55.6k
Status Executor::ExecPTNode(const PTStartTransaction *tnode) {
  // Start a transaction in the current ExecContext at the isolation level the statement asks for.
  const auto isolation_level = tnode->isolation_level();
  return exec_context_->StartTransaction(isolation_level, ql_env_, rescheduler_);
}
1509
1510
//--------------------------------------------------------------------------------------------------
1511
1512
55.6k
Status Executor::ExecPTNode(const PTCommit *tnode) {
  // Nothing to do while executing the node itself: the commit is issued later, once the write
  // operations have been flushed and responded.
  return Status::OK();
}
1516
1517
//--------------------------------------------------------------------------------------------------
1518
1519
3.02k
Status Executor::ExecPTNode(const PTTruncateStmt *tnode) {
  // Truncation is delegated to the QL environment, keyed by the table id held in the node.
  const auto& table_id = tnode->table_id();
  return ql_env_->TruncateTable(table_id);
}
1522
1523
//--------------------------------------------------------------------------------------------------
1524
1525
1.61k
Status Executor::ExecPTNode(const PTCreateKeyspace *tnode) {
  const Status create_status = ql_env_->CreateKeyspace(tnode->name());

  if (PREDICT_FALSE(!create_status.ok())) {
    // Any failure other than "already present" is reported as a server error.
    if (!create_status.IsAlreadyPresent()) {
      return exec_context_->Error(tnode, create_status, ErrorCode::SERVER_ERROR);
    }

    // Case: CREATE KEYSPACE IF NOT EXISTS name; - an existing keyspace is not an error.
    if (tnode->create_if_not_exists()) {
      return Status::OK();
    }

    return exec_context_->Error(tnode, create_status, ErrorCode::KEYSPACE_ALREADY_EXISTS);
  }

  // The keyspace was created: publish the schema-change result.
  result_ = std::make_shared<SchemaChangeResult>("CREATED", "KEYSPACE", tnode->name());
  return Status::OK();
}
1546
1547
//--------------------------------------------------------------------------------------------------
1548
1549
4.18k
Status Executor::ExecPTNode(const PTUseKeyspace *tnode) {
  // Taken up front so the whole USE statement is timed for the metric below.
  const MonoTime start_time = MonoTime::Now();

  const Status use_status = ql_env_->UseKeyspace(tnode->name());
  if (PREDICT_FALSE(!use_status.ok())) {
    // A missing keyspace gets its own error code; anything else is a server error.
    ErrorCode error_code = ErrorCode::SERVER_ERROR;
    if (use_status.IsNotFound()) {
      error_code = ErrorCode::KEYSPACE_NOT_FOUND;
    }
    return exec_context_->Error(tnode, use_status, error_code);
  }

  result_ = std::make_shared<SetKeyspaceResult>(tnode->name());

  // Record the elapsed time in microseconds when metrics are available.
  if (ql_metrics_ != nullptr) {
    const MonoTime end_time = MonoTime::Now();
    ql_metrics_->ql_use_->Increment((end_time - start_time).ToMicroseconds());
  }
  return Status::OK();
}
1565
1566
//--------------------------------------------------------------------------------------------------
1567
1568
21
Status Executor::ExecPTNode(const PTAlterKeyspace *tnode) {
  // To get new keyspace properties use: tnode->keyspace_properties()
  // Current implementation only checks the existence of this keyspace.
  const Status alter_status = ql_env_->AlterKeyspace(tnode->name());

  if (PREDICT_FALSE(!alter_status.ok())) {
    // A missing keyspace gets its own error code; anything else is a server error.
    ErrorCode error_code = ErrorCode::SERVER_ERROR;
    if (alter_status.IsNotFound()) {
      error_code = ErrorCode::KEYSPACE_NOT_FOUND;
    }
    return exec_context_->Error(tnode, alter_status, error_code);
  }

  result_ = std::make_shared<SchemaChangeResult>("UPDATED", "KEYSPACE", tnode->name());
  return Status::OK();
}
1581
1582
//--------------------------------------------------------------------------------------------------
1583
1584
namespace {
1585
1586
340
void AddStringRow(const string& str, QLRowBlock* row_block) {
1587
340
  row_block->Extend().mutable_column(0)->set_string_value(str);
1588
340
}
1589
1590
340
// Pads *s on the right with spaces until it is `length` characters long.
//
// A string that is already at least `length` characters long (or a non-positive `length`) is
// left untouched. The previous form computed `length - s->length()` in unsigned arithmetic, so
// in that situation the pad count wrapped around to a huge value and append() threw
// std::length_error.
void RightPad(const int length, std::string *s) {
  if (length <= 0) {
    return;
  }
  const auto target_length = static_cast<std::string::size_type>(length);
  if (s->length() < target_length) {
    s->append(target_length - s->length(), ' ');
  }
}
1593
} // namespace
1594
1595
165
Status Executor::ExecPTNode(const PTExplainStmt *tnode) {
  // The explained statement is treated as a DML statement whose analysis result is the plan.
  TreeNode::SharedPtr explained_stmt = tnode->stmt();
  PTDmlStmt* const dml_stmt = down_cast<PTDmlStmt *>(explained_stmt.get());

  // The result is a one-column ("QUERY PLAN") row set attributed to a table named "Explain".
  const YBTableName explain_table(YQL_DATABASE_CQL, "Explain");
  ColumnSchema plan_column("QUERY PLAN", STRING);
  auto plan_columns = std::make_shared<std::vector<ColumnSchema>>(
      std::initializer_list<ColumnSchema>{plan_column});
  auto plan_schema = std::make_shared<Schema>(*plan_columns, 0);
  QLRowBlock row_block(*plan_schema);
  faststring buffer;

  // Pads the given plan line in place to the plan's output width, then appends it as a row.
  const auto add_padded_row = [&row_block](const int width, string* line) {
    RightPad(width, line);
    AddStringRow(*line, &row_block);
  };

  ExplainPlanPB explain_plan = dml_stmt->AnalysisResultToPB();
  switch (explain_plan.plan_case()) {
    case ExplainPlanPB::kSelectPlan: {
      SelectPlanPB* const plan = explain_plan.mutable_select_plan();
      const auto width = plan->output_width();
      if (plan->has_aggregate()) {
        add_padded_row(width, plan->mutable_aggregate());
      }
      add_padded_row(width, plan->mutable_select_type());
      if (plan->has_key_conditions()) {
        add_padded_row(width, plan->mutable_key_conditions());
      }
      if (plan->has_filter()) {
        add_padded_row(width, plan->mutable_filter());
      }
      break;
    }
    case ExplainPlanPB::kInsertPlan: {
      InsertPlanPB* const plan = explain_plan.mutable_insert_plan();
      add_padded_row(plan->output_width(), plan->mutable_insert_type());
      break;
    }
    case ExplainPlanPB::kUpdatePlan: {
      UpdatePlanPB* const plan = explain_plan.mutable_update_plan();
      const auto width = plan->output_width();
      add_padded_row(width, plan->mutable_update_type());
      add_padded_row(width, plan->mutable_scan_type());
      add_padded_row(width, plan->mutable_key_conditions());
      break;
    }
    case ExplainPlanPB::kDeletePlan: {
      DeletePlanPB* const plan = explain_plan.mutable_delete_plan();
      const auto width = plan->output_width();
      add_padded_row(width, plan->mutable_delete_type());
      add_padded_row(width, plan->mutable_scan_type());
      add_padded_row(width, plan->mutable_key_conditions());
      if (plan->has_filter()) {
        add_padded_row(width, plan->mutable_filter());
      }
      break;
    }
    case ExplainPlanPB::PLAN_NOT_SET: {
      // No plan was produced for the statement.
      return exec_context_->Error(tnode, ErrorCode::EXEC_ERROR);
    }
  }

  row_block.Serialize(YQL_CLIENT_CQL, &buffer);
  result_ = std::make_shared<RowsResult>(explain_table, plan_columns, buffer.ToString());
  return Status::OK();
}
1664
1665
//--------------------------------------------------------------------------------------------------
1666
1667
namespace {
1668
1669
// When executing a DML in a transaction or a SELECT statement on a transaction-enabled table, the
1670
// following transient errors may happen for which the YCQL service will restart the transaction.
1671
// - TryAgain: when the transaction has a write conflict with another transaction or read-
1672
//             restart is required.
1673
// - Expired:  when the transaction expires due to missed heartbeat
1674
5.37M
bool NeedsRestart(const Status& s) {
1675
5.37M
  return s.IsTryAgain() || s.IsExpired();
1676
5.37M
}
1677
1678
5.31M
bool ShouldRestart(const Status& s, Rescheduler* rescheduler) {
1679
5.31M
  return NeedsRestart(s) && (CoarseMonoClock::now() < rescheduler->GetDeadline());
1680
5.31M
}
1681
1682
// Process TnodeContexts and their children under an ExecContext.
1683
Status ProcessTnodeContexts(ExecContext* exec_context,
1684
5.21M
                            const std::function<Result<bool>(TnodeContext*)>& processor) {
1685
5.33M
  for (TnodeContext& tnode_context : exec_context->tnode_contexts()) {
1686
5.33M
    TnodeContext* p = &tnode_context;
1687
10.6M
    while (p != nullptr) {
1688
5.34M
      const Result<bool> done = processor(p);
1689
5.34M
      RETURN_NOT_OK(done);
1690
5.34M
      if (done.get()) {
1691
56.4k
        return Status::OK();
1692
56.4k
      }
1693
5.28M
      p = p->child_context();
1694
5.28M
    }
1695
5.33M
  }
1696
5.15M
  return Status::OK();
1697
5.21M
}
1698
1699
9.92M
bool NeedsFlush(const client::YBSessionPtr& session) {
  // A session has to be flushed whenever operations were added to it, because some errors are
  // only checked during session flush and passed into the flush callback.
  const bool has_unflushed_operations = session->HasNotFlushedOperations();
  return has_unflushed_operations;
}
1704
1705
} // namespace
1706
1707
1.39M
// Picks the session for exec_context: its transactional session when it has a transaction,
// otherwise the Executor's own session_.
client::YBSessionPtr Executor::GetSession(ExecContext* exec_context) {
  if (exec_context->HasTransaction()) {
    return exec_context->transactional_session();
  }
  return session_;
}
1710
1711
9.61M
void Executor::FlushAsync(ResetAsyncCalls* reset_async_calls) {
  // A new round must not start while async calls of a previous round are still outstanding.
  if (num_async_calls() != 0) {
    LOG(DFATAL) << __func__ << " while have " << num_async_calls() << " async calls running";
    return;
  }

  // Buffered read/write operations are flushed in rounds. In each round, FlushAsync() is called to
  // flush buffered operations in the non-transactional session in the Executor or the transactional
  // session in each ExecContext if any. Also, transactions in any ExecContext that are ready to
  // commit with no more pending operations are committed. If there is no session to flush nor
  // transaction to commit, the statement is executed.
  //
  // As flushes and commits happen, multiple FlushAsyncDone() and CommitDone() callbacks can be
  // invoked concurrently. To avoid race conditions among them, the async-call count is set before
  // calling FlushAsync() and CommitTransaction(). This is necessary so that only the last callback
  // will correctly detect that all async calls are done before processing the async results
  // exclusively.
  write_batch_.Clear();
  std::vector<std::pair<YBSessionPtr, ExecContext*>> sessions_to_flush;
  std::vector<ExecContext*> contexts_to_commit;

  // The non-transactional session is paired with a null ExecContext.
  if (NeedsFlush(session_)) {
    sessions_to_flush.push_back({session_, nullptr});
  }
  for (ExecContext& exec_context : exec_contexts_) {
    if (!exec_context.HasTransaction()) {
      continue;
    }
    auto txn_session = exec_context.transactional_session();
    if (NeedsFlush(txn_session)) {
      // In case of retry we should ignore values that could be written by previous attempts
      // of the retried operation.
      txn_session->SetInTxnLimit(txn_session->read_point()->Now());
      sessions_to_flush.push_back({txn_session, &exec_context});
    } else if (!exec_context.HasPendingOperations()) {
      contexts_to_commit.push_back(&exec_context);
    }
  }

  // Commit transactions first before flushing operations in case some operations are blocked by
  // prior operations in the uncommitted transactions. num_flushes_ is updated before FlushAsync()
  // and CommitTransaction() are called to avoid race condition of recursive FlushAsync() called
  // from FlushAsyncDone() and CommitDone().
  num_flushes_ += sessions_to_flush.size();
  async_status_ = Status::OK();

  if (sessions_to_flush.empty() && contexts_to_commit.empty()) {
    // If this is a batch returning status, append the rows in the user-given order before
    // returning result.
    if (IsReturnsStatusBatch()) {
      for (ExecContext& exec_context : exec_contexts_) {
        int64_t row_count = 0;
        RETURN_STMT_NOT_OK(ProcessTnodeContexts(
            &exec_context,
            [this, &exec_context, &row_count](TnodeContext* tnode_context) -> Result<bool> {
              for (client::YBqlOpPtr& op : tnode_context->ops()) {
                if (!op->rows_data().empty()) {
                  DCHECK_EQ(++row_count, 1) << exec_context.stmt()
                                            << " returned multiple status rows";
                  RETURN_NOT_OK(AppendRowsResult(std::make_shared<RowsResult>(op.get())));
                }
              }
              return false; // not done
            }), reset_async_calls);
      }
    }
    return StatementExecuted(Status::OK(), reset_async_calls);
  }

  // Async work is about to be issued: publish the number of expected callbacks before any of
  // them can run.
  reset_async_calls->Cancel();
  num_async_calls_.store(
      sessions_to_flush.size() + contexts_to_commit.size(), std::memory_order_release);

  for (auto* commit_context : contexts_to_commit) {
    commit_context->CommitTransaction(
        rescheduler_->GetDeadline(), [this, commit_context](const Status& s) {
      CommitDone(s, commit_context);
    });
  }

  // Use the same score on each tablet. So probability of rejecting write should be related
  // to used capacity.
  auto rejection_score_source = std::make_shared<client::RejectionScoreSource>();
  for (const auto& entry : sessions_to_flush) {
    auto flush_session = entry.first;
    auto flush_context = entry.second;
    flush_session->SetRejectionScoreSource(rejection_score_source);
    TRACE("Flush Async");
    flush_session->FlushAsync([this, flush_context](client::FlushStatus* flush_status) {
        FlushAsyncDone(flush_status, flush_context);
      });
  }
}
1799
1800
// As multiple FlushAsyncDone() and CommitDone() can be invoked concurrently for different
1801
// ExecContexts, care must be taken so that the callbacks only update the individual ExecContexts.
1802
// Any update on data structures shared in Executor should either be protected by a mutex or
1803
// deferred to ProcessAsyncResults() that will be invoked exclusively.
1804
4.81M
void Executor::FlushAsyncDone(client::FlushStatus* flush_status, ExecContext* exec_context) {
1805
4.81M
  TRACE("Flush Async Done");
1806
  // Process FlushAsync status for either transactional session in an ExecContext, or the
1807
  // non-transactional session in the Executor for other ExecContexts with no transactional session.
1808
1809
  // When any error occurs during the dispatching of YBOperation, YBSession saves the error and
1810
  // returns IOError. When it happens, retrieves the errors and discard the IOError.
1811
4.81M
  Status s = flush_status->status;
1812
4.81M
  OpErrors op_errors;
1813
4.81M
  if (s.IsIOError()) {
1814
56.9k
    for (const auto& error : flush_status->errors) {
1815
56.9k
      op_errors[static_cast<const client::YBqlOp*>(&error->failed_op())] = error->status();
1816
56.9k
    }
1817
56.7k
    s = Status::OK();
1818
56.7k
  }
1819
1820
4.81M
  if (s.ok()) {
1821
4.81M
    if (exec_context != nullptr) {
1822
150k
      s = ProcessAsyncStatus(op_errors, exec_context);
1823
150k
      if (!s.ok()) {
1824
1.41k
        std::lock_guard<std::mutex> lock(status_mutex_);
1825
1.41k
        async_status_ = s;
1826
1.41k
      }
1827
4.66M
    } else {
1828
5.04M
      for (auto& exec_context : exec_contexts_) {
1829
5.04M
        if (!exec_context.HasTransaction()) {
1830
5.04M
          s = ProcessAsyncStatus(op_errors, &exec_context);
1831
5.04M
          if (!s.ok()) {
1832
1.13k
            std::lock_guard<std::mutex> lock(status_mutex_);
1833
1.13k
            async_status_ = s;
1834
1.13k
          }
1835
5.04M
        }
1836
5.04M
      }
1837
4.66M
    }
1838
215
  } else {
1839
215
    std::lock_guard<std::mutex> lock(status_mutex_);
1840
215
    async_status_ = s;
1841
215
  }
1842
1843
  // Process async results exclusively if this is the last callback of the last FlushAsync() and
1844
  // there is no more outstanding async call.
1845
4.84M
  if (AddFetch(&num_async_calls_, -1, std::memory_order_acq_rel) == 0) {
1846
4.84M
    ResetAsyncCalls reset_async_calls(&num_async_calls_);
1847
4.84M
    ProcessAsyncResults(/* rescheduled */ false, &reset_async_calls);
1848
4.84M
  }
1849
4.81M
}
1850
1851
78.5k
// Callback invoked with the outcome of committing the transaction of exec_context.
void Executor::CommitDone(Status s, ExecContext* exec_context) {
  TRACE("Commit Transaction Done");

  if (!s.ok()) {
    if (ShouldRestart(s, rescheduler_)) {
      // Restartable failure with the deadline not yet reached: reset the context for a restart.
      exec_context->Reset(client::Restart::kTrue, rescheduler_);
    } else {
      // async_status_ is shared between callbacks, so it is only written under status_mutex_.
      std::lock_guard<std::mutex> lock(status_mutex_);
      async_status_ = s;
    }
  } else if (ql_metrics_ != nullptr) {
    // Successful commit: record the time elapsed since the transaction started, in microseconds.
    const MonoTime commit_time = MonoTime::Now();
    const auto elapsed_usec =
        (commit_time - exec_context->transaction_start_time()).ToMicroseconds();
    ql_metrics_->ql_transaction_->Increment(elapsed_usec);
  }

  // Process async results exclusively if this is the last callback of the last FlushAsync() and
  // there is no more outstanding async call.
  if (AddFetch(&num_async_calls_, -1, std::memory_order_acq_rel) == 0) {
    ResetAsyncCalls reset_async_calls(&num_async_calls_);
    ProcessAsyncResults(/* rescheduled */ false, &reset_async_calls);
  }
}
1876
1877
9.82M
void Executor::ProcessAsyncResults(const bool rescheduled, ResetAsyncCalls* reset_async_calls) {
1878
9.82M
  if (num_async_calls() != 0) {
1879
0
    LOG(DFATAL) << __func__ << " while have " << num_async_calls() << " async calls running";
1880
0
    return;
1881
0
  }
1882
1883
  // If the current thread is not the RPC worker thread, call the callback directly. Otherwise,
1884
  // reschedule the call to resume in the RPC worker thread.
1885
9.82M
  if (!rescheduled && rescheduler_->NeedReschedule()) {
1886
4.88M
    return rescheduler_->Reschedule(&process_async_results_task_.Bind(this, reset_async_calls));
1887
4.88M
  }
1888
1889
  // Return error immediately when async call failed.
1890
4.94M
  RETURN_STMT_NOT_OK(async_status_, reset_async_calls);
1891
1892
  // Go through each ExecContext and process async results.
1893
4.94M
  bool need_flush = false;
1894
4.94M
  bool has_restart = false;
1895
4.92M
  const MonoTime now = (ql_metrics_ != nullptr) ? MonoTime::Now() : MonoTime();
1896
10.2M
  for (auto exec_itr = exec_contexts_.begin(); exec_itr != exec_contexts_.end(); ) {
1897
1898
    // Set current ExecContext.
1899
5.31M
    exec_context_ = &*exec_itr;
1900
1901
    // Restart a statement if necessary
1902
5.31M
    if (exec_context_->restart()) {
1903
102k
      has_restart = true;
1904
102k
      const TreeNode *root = exec_context_->parse_tree().root().get();
1905
      // Clear partial rows accumulated from the SELECT statement.
1906
102k
      if (root->opcode() == TreeNodeOpcode::kPTSelectStmt) {
1907
33
        result_ = nullptr;
1908
33
      }
1909
1910
      // We should restart read, but read time was specified by caller.
1911
      // For instance it could happen in case of pagination.
1912
102k
      if (exec_context_->params().read_time()) {
1913
0
        return StatementExecuted(
1914
0
            STATUS(IllegalState, "Restart read required, but read time specified by caller"),
1915
0
            reset_async_calls);
1916
0
      }
1917
1918
102k
      YBSessionPtr session = GetSession(exec_context_);
1919
102k
      session->SetReadPoint(client::Restart::kTrue);
1920
102k
      RETURN_STMT_NOT_OK(ExecTreeNode(root), reset_async_calls);
1921
102k
      need_flush |= NeedsFlush(session);
1922
102k
      exec_itr++;
1923
102k
      continue;
1924
5.21M
    }
1925
1926
    // Go through each TnodeContext in an ExecContext and process async results.
1927
5.21M
    auto& tnode_contexts = exec_context_->tnode_contexts();
1928
10.5M
    for (auto tnode_itr = tnode_contexts.begin(); tnode_itr != tnode_contexts.end(); ) {
1929
5.32M
      TnodeContext& tnode_context = *tnode_itr;
1930
1931
5.32M
      const Result<bool> result = ProcessTnodeResults(&tnode_context);
1932
5.32M
      RETURN_STMT_NOT_OK(result, reset_async_calls);
1933
5.32M
      if (*result) {
1934
48.2k
        need_flush = true;
1935
48.2k
      }
1936
1937
      // If this statement is restarted, stop traversing the rest of the statement tnodes.
1938
5.32M
      if (exec_context_->restart()) {
1939
0
        break;
1940
0
      }
1941
1942
      // If there are pending ops, we are not done with this statement tnode yet.
1943
5.32M
      if (tnode_context.HasPendingOperations()) {
1944
69.0k
        tnode_itr++;
1945
69.0k
        continue;
1946
69.0k
      }
1947
1948
      // For SELECT statement, aggregate result sets if needed.
1949
5.25M
      const TreeNode *tnode = tnode_context.tnode();
1950
5.25M
      if (tnode->opcode() == TreeNodeOpcode::kPTSelectStmt) {
1951
3.89M
        RETURN_STMT_NOT_OK(
1952
3.89M
            AggregateResultSets(static_cast<const PTSelectStmt *>(tnode), &tnode_context),
1953
3.89M
            reset_async_calls);
1954
3.89M
      }
1955
1956
      // Update the metrics for SELECT/INSERT/UPDATE/DELETE here after the ops have been completed
1957
      // but exclude the time to commit the transaction if any. Report the metric only once.
1958
5.25M
      if (ql_metrics_ != nullptr && !tnode_context.end_time().Initialized()) {
1959
5.23M
        tnode_context.set_end_time(now);
1960
5.23M
        const auto delta_usec = tnode_context.execution_time().ToMicroseconds();
1961
5.23M
        switch (tnode->opcode()) {
1962
3.89M
          case TreeNodeOpcode::kPTSelectStmt:
1963
3.89M
            ql_metrics_->ql_select_->Increment(delta_usec);
1964
3.89M
            break;
1965
1.23M
          case TreeNodeOpcode::kPTInsertStmt:
1966
1.23M
            ql_metrics_->ql_insert_->Increment(delta_usec);
1967
1.23M
            break;
1968
3.00k
          case TreeNodeOpcode::kPTUpdateStmt:
1969
3.00k
            ql_metrics_->ql_update_->Increment(delta_usec);
1970
3.00k
            break;
1971
695
          case TreeNodeOpcode::kPTDeleteStmt:
1972
695
            ql_metrics_->ql_delete_->Increment(delta_usec);
1973
695
            break;
1974
55.6k
          case TreeNodeOpcode::kPTStartTransaction:
1975
111k
          case TreeNodeOpcode::kPTCommit:
1976
111k
            break;
1977
0
          default:
1978
0
            LOG(FATAL) << "unexpected operation " << tnode->opcode();
1979
5.23M
        }
1980
5.24M
        if (tnode->IsDml()) {
1981
5.13M
          ql_metrics_->time_to_execute_ql_query_->Increment(delta_usec);
1982
5.13M
        }
1983
5.24M
      }
1984
1985
      // If this is a batch returning status, keep the statement tnode with its ops so that we can
1986
      // return the row status when all statements in the batch finish.
1987
5.26M
      if (IsReturnsStatusBatch()) {
1988
469
        tnode_itr++;
1989
469
        continue;
1990
469
      }
1991
1992
      // Move rows results and remove the statement tnode that has completed.
1993
5.26M
      RETURN_STMT_NOT_OK(AppendRowsResult(std::move(tnode_context.rows_result())),
1994
5.26M
                         reset_async_calls);
1995
5.26M
      tnode_itr = tnode_contexts.erase(tnode_itr);
1996
5.26M
    }
1997
1998
    // If the current ExecContext is restarted, process it again. Otherwise, move to the next one.
1999
5.22M
    if (!exec_context_->restart()) {
2000
5.19M
      exec_itr++;
2001
5.19M
    }
2002
5.22M
  }
2003
2004
  // If there are buffered ops that need flushes, the flushes need to be rescheduled if this call
2005
  // hasn't been rescheduled. This is necessary because in an RF1 setup, the flush is a direct
2006
  // local call and the recursive FlushAsync and FlushAsyncDone calls for a full table scan can
2007
  // recurse too deeply hitting the stack limitiation. Rescheduling can also avoid occupying the
2008
  // RPC worker thread for too long starving other CQL calls waiting in the queue. If there is no
2009
  // buffered ops to flush, just call FlushAsync() to commit the transactions if any.
2010
  //
2011
  // When restart is required we will reexecute the whole operation with new transaction.
2012
  // Since restart could happen multiple times, it is possible that we will do it recursively,
2013
  // when local call is enabled.
2014
  // So to avoid stack overflow we use reschedule in this case.
2015
4.95M
  if ((need_flush || has_restart) && !rescheduled) {
2016
0
    rescheduler_->Reschedule(&flush_async_task_.Bind(this, reset_async_calls));
2017
4.95M
  } else {
2018
4.95M
    FlushAsync(reset_async_calls);
2019
4.95M
  }
2020
4.95M
}
2021
2022
5.32M
// Processes the async results of every op of `tnode_context` and, recursively, of its child
// context.
//
// Returns true when at least one op has been applied to a session during this call (so the
// caller knows there is something to flush), false otherwise, or an error status.
Result<bool> Executor::ProcessTnodeResults(TnodeContext* tnode_context) {
  bool buffered_any = false;

  const TreeNode* const tnode = tnode_context->tnode();
  auto& ops = tnode_context->ops();
  auto op_itr = ops.begin();
  while (op_itr != ops.end()) {
    YBqlOpPtr& op = *op_itr;

    // An op without a response status has not been applied and executed yet: apply it now if the
    // write batch accepts it.
    if (!op->response().has_status()) {
      DCHECK_EQ(op->type(), YBOperation::Type::QL_WRITE);
      const bool accepted = write_batch_.Add(
          std::static_pointer_cast<YBqlWriteOp>(op), tnode_context, exec_context_);
      if (accepted) {
        YBSessionPtr session = GetSession(exec_context_);
        TRACE("Apply");
        session->Apply(op);
        buffered_any = true;
      }
      ++op_itr;
      continue;
    }

    // Inside a transaction, an op that failed to apply (execution error or unsatisfied IF
    // condition) aborts the transaction. For a batch returning status, all other ops of the
    // statement are additionally marked as done and their rows data cleared, so that only one
    // status row is returned from this statement.
    //
    // Note: an error response only gets this far when 'RETURNS STATUS AS ROW' is used. Otherwise
    // ProcessAsyncResults() should have failed and returned already.
    if (exec_context_->HasTransaction()) {
      const QLResponsePB& resp = op->response();
      const bool not_applied = resp.has_applied() && !resp.applied();
      if (resp.status() != QLResponsePB_QLStatus_YQL_STATUS_OK || not_applied) {
        exec_context_->AbortTransaction();
        if (IsReturnsStatusBatch()) {
          RETURN_NOT_OK(ProcessTnodeContexts(
              exec_context_,
              [&op](TnodeContext* ctx) -> Result<bool> {
                for (auto& other : ctx->ops()) {
                  if (other == op) {
                    continue;
                  }
                  other->mutable_response()->set_status(QLResponsePB::YQL_STATUS_OK);
                  other->mutable_rows_data()->clear();
                }
                return false; // not done
              }));
        }
      }
    }

    // When the transaction is ready to commit, apply the child transaction result, if any.
    if (exec_context_->HasTransaction()) {
      if (tnode_context->HasPendingOperations()) {
        // Defer applying the child transaction result until the last op of the tnode finishes.
        // This prevents the incomplete operation deletion at the end of the loop.
        ++op_itr;
        continue;
      }
      const QLResponsePB& response = op->response();
      if (response.has_child_transaction_result()) {
        const auto& child_result = response.child_transaction_result();
        const Status s = exec_context_->ApplyChildTransactionResult(child_result);
        // When a restart is required, reset the current context and return immediately.
        if (ShouldRestart(s, rescheduler_)) {
          exec_context_->Reset(client::Restart::kTrue, rescheduler_);
          return false;
        }
        RETURN_NOT_OK(s);
      }
    }

    // In a batch returning status the rows are appended later, because the results must be
    // returned in the user-given order once all statements in the batch have finished.
    if (IsReturnsStatusBatch()) {
      ++op_itr;
      continue;
    }

    // Append the rows, if the op returned any.
    if (!op->rows_data().empty()) {
      SCHECK(!tnode->IsTopLevelReadNode() || tnode_context->query_state() != nullptr,
             Corruption, "Query state cannot be NULL for SELECT");
      // NOTE: The LIMIT counters are checked before a new set of data is appended rather than
      // while counting rows during appending, because it is safer this way:
      // - This function processes the RPC callbacks whenever data arrives from DocDB.
      // - Rows that exceed the LIMIT are still passed to this function and must be rejected here
      //   before they are appended to tnode_context.
      if (tnode->IsTopLevelReadNode() && tnode_context->query_state()->reached_select_limit()) {
        // End of scan reached: ignore the remaining operators and results.
        RETURN_NOT_OK(tnode_context->ClearQueryState());
        break;
      }
      RETURN_NOT_OK(tnode_context->AppendRowsResult(std::make_shared<RowsResult>(op.get())));
    }

    // A SELECT may have more rows to fetch, in which case the op is applied again.
    if (tnode->opcode() == TreeNodeOpcode::kPTSelectStmt) {
      const auto* select_stmt = static_cast<const PTSelectStmt *>(tnode);
      // This does not apply to the parent SELECT of an index query. With a covering index only
      // the index is selected from. With an uncovered index the parent SELECT fetches by the
      // primary keys returned by the child (handled further down).
      if (!select_stmt->child_select()) {
        DCHECK_EQ(op->type(), YBOperation::Type::QL_READ);
        const auto& read_op = std::static_pointer_cast<YBqlReadOp>(op);
        if (VERIFY_RESULT(FetchMoreRows(select_stmt, read_op, tnode_context, exec_context_))) {
          op->mutable_response()->Clear();
          TRACE("Apply");
          session_->Apply(op);
          buffered_any = true;
          ++op_itr;
          continue;
        }
      }
    }

    // The op has completed: remove it.
    op_itr = ops.erase(op_itr);
  }

  // Process the child context, if there is one.
  TnodeContext* const child_context = tnode_context->child_context();
  if (child_context != nullptr) {
    const TreeNode* const child_tnode = child_context->tnode();
    if (VERIFY_RESULT(ProcessTnodeResults(child_context))) {
      buffered_any = true;
    }

    // A child context is only expected for a SELECT nested in a SELECT that reads an index.
    RSTATUS_DCHECK_EQ(tnode->opcode(), TreeNodeOpcode::kPTSelectStmt,
                      Corruption, "Expecting SELECT opcode");
    RSTATUS_DCHECK_EQ(child_tnode->opcode(), TreeNodeOpcode::kPTSelectStmt,
                      Corruption, "Expecting nested SELECT opcode");
    RSTATUS_DCHECK(!static_cast<const PTSelectStmt *>(child_tnode)->index_id().empty(),
                   Corruption, "Expecting valid index id");

    const auto* select_stmt = static_cast<const PTSelectStmt *>(tnode);
    const auto* child_select = static_cast<const PTSelectStmt *>(child_tnode);

    // When the child selects from an uncovered index, the primary keys it returned are extracted
    // and used to select from the indexed table.
    auto& child_rows_data = child_context->rows_result()->rows_data();
    if (!child_select->covers_fully() && !child_rows_data.empty()) {
      QLRowBlock* const keys = tnode_context->keys();
      keys->rows().clear();
      Slice data(child_rows_data);
      RETURN_NOT_OK(keys->Deserialize(YQL_CLIENT_CQL,  &data));
      const YBqlReadOpPtr& select_op = tnode_context->uncovered_select_op();
      if (VERIFY_RESULT(FetchRowsByKeys(select_stmt, select_op, *keys, tnode_context))) {
        buffered_any = true;
      }
      child_rows_data.clear();
    }

    // Finalize the execution. The result is sent to the user, who sends subsequent requests as
    // long as the paging state is not empty.
    // 1. No child: the result is already in the node.
    // 2. Fully covered index: the result is in the child_select node.
    // 3. Partially covered index:
    //    - The result and row-counter are kept in the parent node.
    //    - The paging state is in the child node.
    if (!tnode_context->HasPendingOperations() && !child_context->HasPendingOperations()) {
      RETURN_NOT_OK(tnode_context->ComposeRowsResultForUser(child_select,
                                                            false /* for_new_batches */));
    }
  }

  return buffered_any;
}
2190
2191
//--------------------------------------------------------------------------------------------------
2192
2193
namespace {
2194
2195
// Check if index updates can be issued from CQL proxy directly when executing a DML. Only indexes
2196
// that index primary key columns only may be updated from CQL proxy.
2197
337
bool UpdateIndexesLocally(const PTDmlStmt *tnode, const QLWriteRequestPB& req) {
2198
337
  if (req.has_if_expr() || req.returns_status()) {
2199
2
    return false;
2200
2
  }
2201
2202
335
  switch (req.type()) {
2203
    // For insert, the pk-only indexes can be updated from CQL proxy directly.
2204
241
    case QLWriteRequestPB::QL_STMT_INSERT:
2205
241
      return true;
2206
2207
    // For update, the pk-only indexes can be updated from CQL proxy directly only when not all
2208
    // columns are set to null. Otherwise, the row may be removed by the DML if it was created via
2209
    // update (i.e. no liveness column) and the remaining columns are already null.
2210
66
    case QLWriteRequestPB::QL_STMT_UPDATE: {
2211
68
      for (const auto& column_value : req.column_values()) {
2212
68
        switch (column_value.expr().expr_case()) {
2213
68
          case QLExpressionPB::ExprCase::kValue:
2214
68
            if (!IsNull(column_value.expr().value())) {
2215
60
              return true;
2216
60
            }
2217
8
            break;
2218
0
          case QLExpressionPB::ExprCase::kColumnId: FALLTHROUGH_INTENDED;
2219
0
          case QLExpressionPB::ExprCase::kSubscriptedCol: FALLTHROUGH_INTENDED;
2220
0
          case QLExpressionPB::ExprCase::kJsonColumn: FALLTHROUGH_INTENDED;
2221
0
          case QLExpressionPB::ExprCase::kBfcall: FALLTHROUGH_INTENDED;
2222
0
          case QLExpressionPB::ExprCase::kTscall: FALLTHROUGH_INTENDED;
2223
0
          case QLExpressionPB::ExprCase::kCondition: FALLTHROUGH_INTENDED;
2224
0
          case QLExpressionPB::ExprCase::kBocall: FALLTHROUGH_INTENDED;
2225
0
          case QLExpressionPB::ExprCase::kBindId: FALLTHROUGH_INTENDED;
2226
0
          case QLExpressionPB::ExprCase::EXPR_NOT_SET:
2227
0
            return false;
2228
68
        }
2229
68
      }
2230
6
      return false;
2231
66
    }
2232
    // For delete, the pk-only indexes can be updated from CQL proxy directly only if the whole
2233
    // is deleted and it is not a range delete.
2234
28
    case QLWriteRequestPB::QL_STMT_DELETE: {
2235
28
      const Schema& schema = tnode->table()->InternalSchema();
2236
28
      return (req.column_values().empty() &&
2237
12
              static_cast<size_t>(req.range_column_values_size()) ==
2238
12
                  schema.num_range_key_columns());
2239
0
    }
2240
0
  }
2241
0
  return false; // Not feasible
2242
0
}
2243
2244
} // namespace
2245
2246
// Prepares the index updates that go along with the DML write request `req`.
//
// tnode         - the DML statement being executed.
// req           - the write request of the statement; the ids of the indexes to be updated by the
//                 tserver, extra column refs and child transaction data are added to it.
// tnode_context - receives the index write ops that are issued from the CQL proxy directly.
//
// Returns a FEATURE_NOT_SUPPORTED error when the request carries a TTL, or any error raised
// while preparing the index ops / the child transaction.
Status Executor::UpdateIndexes(const PTDmlStmt *tnode,
                               QLWriteRequestPB *req,
                               TnodeContext* tnode_context) {
  // A DML with TTL is not allowed when indexes are present.
  if (req->has_ttl()) {
    return exec_context_->Error(tnode, ErrorCode::FEATURE_NOT_SUPPORTED);
  }

  // The pk-only indexes are updated from the CQL proxy directly when possible. Otherwise they
  // join the list of indexes to be updated from the tserver.
  const auto& pk_only_indexes = tnode->pk_only_indexes();
  if (!pk_only_indexes.empty()) {
    if (!UpdateIndexesLocally(tnode, *req)) {
      for (const auto& index : pk_only_indexes) {
        req->add_update_index_ids(index->id());
      }
    } else {
      RETURN_NOT_OK(AddIndexWriteOps(tnode, *req, tnode_context));
    }
  }

  // The non-pk-only indexes are always updated from the tserver.
  for (const auto& index_id : tnode->non_pk_only_indexes()) {
    req->add_update_index_ids(index_id);
  }

  // An update/delete may just delete some columns. In that case the remaining columns are added
  // to the columns to be read, so that the tserver can check whether they are all null as well.
  // That information is needed for rows without a liveness column: such a row is as good as
  // deleted once all of its columns are null, and the tserver then has to remove the
  // corresponding entries from the indexes.
  const auto stmt_type = req->type();
  const bool is_update_or_delete = stmt_type == QLWriteRequestPB::QL_STMT_UPDATE ||
                                   stmt_type == QLWriteRequestPB::QL_STMT_DELETE;
  if (is_update_or_delete && !req->column_values().empty()) {
    const Schema& schema = tnode->table()->InternalSchema();
    std::set<int32> column_dels;
    bool all_null = true;
    for (const QLColumnValuePB& column_value : req->column_values()) {
      const ColumnSchema& col_desc = VERIFY_RESULT(
        schema.column_by_id(ColumnId(column_value.column_id())));

      // A non-null constant assigned to a non-static column keeps the row alive. Static column
      // values are not considered.
      const bool sets_live_value = column_value.has_expr() &&
                                   column_value.expr().has_value() &&
                                   !col_desc.is_static() &&
                                   !IsNull(column_value.expr().value());
      if (sets_live_value) {
        all_null = false;
        break;
      }
      column_dels.insert(column_value.column_id());
    }

    if (all_null) {
      // Make sure the docdb layer reads all columns of the row before performing the write.
      const MCSet<int32>& column_refs = tnode->column_refs();
      for (size_t idx = schema.num_key_columns(); idx < schema.num_columns(); ++idx) {
        if (schema.column(idx).is_static()) {
          continue;
        }
        const int32 column_id = schema.column_id(idx);
        // Skip the columns that are already in column_refs.
        if (column_refs.count(column_id) != 0) {
          continue;
        }
        // Skip the columns that are in the delete list as well. This is fine for the following
        // reason.
        //
        // We get here with either
        //   1. an UPDATE statement whose set clauses are all of the "= NULL" kind, or
        //   2. a DELETE statement on some columns, which is as good as setting them to NULL.
        //
        // For a column that is in column_dels but not in column_refs, the IsRowDeleted()
        // function in cql_operation.cc does not need to know whether the column was NULL in the
        // old/existing row: the new row has NULL for this column, so the loop in that function
        // "continue"s.
        //
        // One might also wonder why a column that is deleted / set to NULL is not added to
        // column_refs in case it is part of an index (where the index entry for the old value
        // has to be deleted). This is not an issue, because such a column has already been added
        // to column_refs by AnalyzeIndexesForWrites().
        if (column_dels.count(column_id) != 0) {
          continue;
        }
        req->mutable_column_refs()->add_ids(column_id);
      }
    }
  }

  // Index updates done by the tserver run in a child transaction when the statement requires one.
  if (!req->update_index_ids().empty() && tnode->RequiresTransaction()) {
    RETURN_NOT_OK(exec_context_->PrepareChildTransaction(
        rescheduler_->GetDeadline(), req->mutable_child_transaction_data()));
  }
  return Status::OK();
}
2331
2332
// Add the write operations to update the pk-only indexes.
2333
Status Executor::AddIndexWriteOps(const PTDmlStmt *tnode,
2334
                                  const QLWriteRequestPB& req,
2335
306
                                  TnodeContext* tnode_context) {
2336
306
  const Schema& schema = tnode->table()->InternalSchema();
2337
306
  const bool is_upsert = (req.type() == QLWriteRequestPB::QL_STMT_INSERT ||
2338
66
                          req.type() == QLWriteRequestPB::QL_STMT_UPDATE);
2339
  // Populate a column-id to value map.
2340
306
  std::unordered_map<ColumnId, const QLExpressionPB&> values;
2341
920
  for (size_t i = 0; i < schema.num_hash_key_columns(); i++) {
2342
614
    values.emplace(schema.column_id(i), req.hashed_column_values(narrow_cast<int>(i)));
2343
614
  }
2344
477
  for (size_t i = 0; i < schema.num_range_key_columns(); i++) {
2345
171
    values.emplace(schema.column_id(schema.num_hash_key_columns() + i),
2346
171
                   req.range_column_values(narrow_cast<int>(i)));
2347
171
  }
2348
306
  if (is_upsert) {
2349
425
    for (const auto& column_value : req.column_values()) {
2350
425
      values.emplace(ColumnId(column_value.column_id()), column_value.expr());
2351
425
    }
2352
302
  }
2353
2354
  // Create the write operation for each index and populate it using the original operation.
2355
  // CQL does not allow the primary key to be updated, so PK-only index rows will be either
2356
  // deleted when the row in the main table is deleted, or it will be inserted into the index
2357
  // when a row is inserted into the main table or updated (for a non-pk column).
2358
598
  for (const auto& index_table : tnode->pk_only_indexes()) {
2359
598
    const IndexInfo* index =
2360
598
        VERIFY_RESULT(tnode->table()->index_map().FindIndex(index_table->id()));
2361
567
    const bool index_ready_to_accept = (is_upsert ? index->HasWritePermission()
2362
31
                                                  : index->HasDeletePermission());
2363
598
    if (!index_ready_to_accept) {
2364
0
      VLOG(2) << "Index not ready to apply operaton " << index->ToString();
2365
      // We are in the process of backfilling the index. It should not be updated with a
2366
      // write/delete yet. The backfill stage will update the index for such entries.
2367
60
      continue;
2368
60
    }
2369
538
    YBqlWriteOpPtr index_op(is_upsert ? index_table->NewQLInsert() : index_table->NewQLDelete());
2370
538
    index_op->set_writes_primary_row(true);
2371
538
    QLWriteRequestPB *index_req = index_op->mutable_request();
2372
538
    index_req->set_request_id(req.request_id());
2373
538
    index_req->set_query_id(req.query_id());
2374
2.66k
    for (size_t i = 0; i < index->columns().size(); i++) {
2375
2.12k
      const ColumnId indexed_column_id = index->column(i).indexed_column_id;
2376
2.12k
      if (i < index->hash_column_count()) {
2377
655
        *index_req->add_hashed_column_values() = values.at(indexed_column_id);
2378
1.47k
      } else if (i < index->key_column_count()) {
2379
1.11k
        *index_req->add_range_column_values() = values.at(indexed_column_id);
2380
360
      } else if (is_upsert) {
2381
342
        const auto itr = values.find(indexed_column_id);
2382
342
        if (itr != values.end()) {
2383
262
          QLColumnValuePB* column_value = index_req->add_column_values();
2384
262
          column_value->set_column_id(index->column(i).column_id);
2385
262
          *column_value->mutable_expr() = itr->second;
2386
262
        }
2387
342
      }
2388
2.12k
    }
2389
538
    RETURN_NOT_OK(AddOperation(index_op, tnode_context));
2390
538
  }
2391
2392
306
  return Status::OK();
2393
306
}
2394
2395
//--------------------------------------------------------------------------------------------------
2396
2397
bool Executor::WriteBatch::Add(const YBqlWriteOpPtr& op,
2398
                               const TnodeContext* tnode_context,
2399
1.33M
                               ExecContext* exec_context) {
2400
1.33M
  if (FLAGS_ycql_serial_operation_in_transaction_block &&
2401
      // Inside BEGIN TRANSACTION; ... END TRANSACTION;
2402
1.33M
      exec_context && exec_context->HasTransaction()) {
2403
186k
    bool allow_parallel_exec = false;  // Always False for the main table.
2404
2405
    // Check if the index-update can be executed in parallel - only
2406
    // when the main table update is complete or started.
2407
    // The first op in TNode is treated as the main table operation.
2408
    // size = 1 means this (usually main table) 'op' is the first and one only op now.
2409
    // size > 1 means main op + a set of secondary index operations.
2410
186k
    if (tnode_context->ops().size() > 1) {
2411
901
      const client::YBqlOpPtr main_tbl_op = tnode_context->ops()[0];
2412
      // If the main table operation is complete or just started.
2413
901
      allow_parallel_exec = main_tbl_op->response().has_status() ||
2414
900
          exec_context->transactional_session()->IsInProgress(main_tbl_op);
2415
901
    }
2416
2417
186k
    if (!allow_parallel_exec) {
2418
186k
      if (Empty()) {
2419
149k
        ops_by_primary_key_.insert(op);
2420
149k
        return true; // Start first write op execution.
2421
149k
      }
2422
36.6k
      return false;
2423
36.6k
    }
2424
186k
  }
2425
2426
  // Checks if the write operation reads the primary/static row and if another operation that writes
2427
  // the primary/static row by the same primary/hash key already exists.
2428
1.14M
  if ((op->ReadsPrimaryRow() && ops_by_primary_key_.count(op) > 0) ||
2429
1.14M
      (op->ReadsStaticRow() && ops_by_hash_key_.count(op) > 0)) {
2430
10
    return false;
2431
10
  }
2432
2433
1.14M
  if (op->WritesPrimaryRow()) { ops_by_primary_key_.insert(op); }
2434
1.14M
  if (op->WritesStaticRow()) { ops_by_hash_key_.insert(op); }
2435
1.14M
  return true;
2436
1.14M
}
2437
2438
14.2M
void Executor::WriteBatch::Clear() {
2439
14.2M
  ops_by_primary_key_.clear();
2440
14.2M
  ops_by_hash_key_.clear();
2441
14.2M
}
2442
2443
4.09M
bool Executor::WriteBatch::Empty() const {
2444
4.09M
  return ops_by_primary_key_.empty() &&  ops_by_hash_key_.empty();
2445
4.09M
}
2446
2447
//--------------------------------------------------------------------------------------------------
2448
2449
3.88M
void Executor::AddOperation(const YBqlReadOpPtr& op, TnodeContext *tnode_context) {
2450
22.4k
  DCHECK(write_batch_.Empty()) << "Concurrent read and write operations not supported yet";
2451
2452
3.88M
  op->mutable_request()->set_request_id(exec_context_->params().request_id());
2453
3.88M
  tnode_context->AddOperation(op);
2454
2455
  // We need consistent read point if statement is executed in multiple RPC commands.
2456
3.88M
  if (tnode_context->UnreadPartitionsRemaining() > 0 ||
2457
3.89M
      op->request().hashed_column_values().empty()) {
2458
178k
    session_->SetForceConsistentRead(client::ForceConsistentRead::kTrue);
2459
178k
  }
2460
2461
3.88M
  TRACE("Apply");
2462
3.88M
  session_->Apply(op);
2463
3.88M
}
2464
2465
1.29M
// Adds the write operation `op` to `tnode_context`, applies it when the write batch allows it
// and prepares the updates of the secondary indexes of the written table.
//
// op            - the write operation.
// tnode_context - context of the DML statement tnode the op belongs to.
//
// Returns the status of the index update preparation, or OK when the table has no index.
Status Executor::AddOperation(const YBqlWriteOpPtr& op, TnodeContext* tnode_context) {
  tnode_context->AddOperation(op);

  // The write batch checks the op for inter-dependencies with the ops already in it. An accepted
  // op is applied in the transactional session of exec_context for the current statement if
  // there is one, otherwise in the non-transactional session of the executor.
  const bool can_apply_now = write_batch_.Add(op, tnode_context, exec_context_);
  if (can_apply_now) {
    YBSessionPtr session = GetSession(exec_context_);
    TRACE("Apply");
    session->Apply(op);
  }

  // Update the secondary indexes as well, if the table has any.
  if (!op->table()->index_map().empty()) {
    const auto* dml_stmt = static_cast<const PTDmlStmt*>(tnode_context->tnode());
    return UpdateIndexes(dml_stmt, op->mutable_request(), tnode_context);
  }
  return Status::OK();
}
2484
2485
//--------------------------------------------------------------------------------------------------
2486
2487
10.2M
// Converts an execution status into STALE_METADATA when the failure looks like the result of a
// statement analyzed with a stale metadata cache, so that the statement can be reparsed and
// re-analyzed. Any other status is returned unchanged.
//
// Only QL errors of statements that have not been reparsed yet are considered. The error codes
// treated as symptoms of stale metadata are:
// - TABLET_NOT_FOUND when the tserver fails to execute the YBQLOp because the tablet is not
//   found (ENG-945).
// - WRONG_METADATA_VERSION when the schema version the tablet holds is different from the one
//   used by the semantic analyzer.
// - INVALID_TABLE_DEFINITION when a referenced user-defined type is not found.
// - INVALID_TYPE_DEFINITION, OBJECT_NOT_FOUND and TYPE_NOT_FOUND.
// - INVALID_ARGUMENTS when the column datatype is inconsistent with the supplied value in an
//   INSERT or UPDATE statement (only if the table schema turns out not to be up-to-date).
// Expand the list in future as new cases arise.
Status Executor::ProcessStatementStatus(const ParseTree& parse_tree, const Status& s) {
  const bool may_be_stale = !s.ok() && s.IsQLError() && !parse_tree.reparsed();
  if (!PREDICT_FALSE(may_be_stale)) {
    return s;
  }

  const ErrorCode errcode = GetErrorCode(s);
  const bool is_stale_symptom =
      errcode == ErrorCode::TABLET_NOT_FOUND         ||
      errcode == ErrorCode::WRONG_METADATA_VERSION   ||
      errcode == ErrorCode::INVALID_TABLE_DEFINITION ||
      errcode == ErrorCode::INVALID_TYPE_DEFINITION  ||
      errcode == ErrorCode::INVALID_ARGUMENTS        ||
      errcode == ErrorCode::OBJECT_NOT_FOUND         ||
      errcode == ErrorCode::TYPE_NOT_FOUND;
  if (!is_stale_symptom) {
    return s;
  }

  if (errcode == ErrorCode::INVALID_ARGUMENTS) {
    // Check the table schema is up-to-date.
    const auto table = GetTableFromStatement(parse_tree.root().get());
    if (table) {
      const uint32_t cached_schema_ver = table->schema().version();
      uint32_t latest_schema_ver = 0;
      const Status s_get_schema = ql_env_->GetUpToDateTableSchemaVersion(
          table->name(), &latest_schema_ver);

      // Do not retry via STALE_METADATA code if the table schema is up-to-date.
      if (s_get_schema.ok() && latest_schema_ver == cached_schema_ver) {
        return s;
      }
    }
  }

  // Drop the cached analysis state and mark the parse tree stale before reporting.
  parse_tree.ClearAnalyzedTableCache(ql_env_);
  parse_tree.ClearAnalyzedUDTypeCache(ql_env_);
  parse_tree.set_stale();
  return ErrorStatus(ErrorCode::STALE_METADATA);
}
2530
2531
// Translates the response status of a single read/write operation into a Status.
//
// stmt         - the DML statement that issued the operation.
// op           - the operation whose response is inspected (and, for a failed write that returns
//                status, filled in with a manually produced result row).
// exec_context - the execution context used to build the error status.
//
// Returns OK when the op has no status yet or completed okay, TryAgain when a restart is
// required, OK with an "[applied] = false" row when the write statement returns status, and the
// error produced by exec_context->Error() otherwise. Errors are also reported to the audit logger.
//
// NOTE(review): the audit logger is given exec_context_->stmt() (the member), while the error is
// built from the exec_context parameter - confirm this is intended.
Status Executor::ProcessOpStatus(const PTDmlStmt* stmt,
                                 const YBqlOpPtr& op,
                                 ExecContext* exec_context) {
  const QLResponsePB& resp = op->response();

  // Returns if this op was deferred and has not been completed, or it has been completed okay.
  if (!resp.has_status()) {
    return Status::OK();
  }
  const auto resp_status = resp.status();
  if (resp_status == QLResponsePB::YQL_STATUS_OK) {
    return Status::OK();
  }

  if (resp_status == QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR) {
    auto restart_status = STATUS(TryAgain, resp.error_message());
    RETURN_NOT_OK(audit_logger_.LogStatementError(stmt, exec_context_->stmt(), restart_status,
                                                  ErrorIsFormatted::kFalse));
    return restart_status;
  }

  // If we got an error we need to manually produce a result in the op.
  if (stmt->IsWriteOp() && stmt->returns_status()) {
    // Result columns: "[applied]", "[message]", followed by every column of the table schema.
    const auto& schema = stmt->table()->schema();
    std::vector<ColumnSchema> columns;
    columns.reserve(schema.num_columns() + 2);
    columns.emplace_back("[applied]", DataType::BOOL);
    columns.emplace_back("[message]", DataType::STRING);
    columns.insert(columns.end(), schema.columns().begin(), schema.columns().end());

    // Replace the column schemas of the response with the ones built above.
    auto* column_schemas = op->mutable_response()->mutable_column_schemas();
    column_schemas->Clear();
    for (const auto& column : columns) {
      ColumnSchemaToPB(column, column_schemas->Add());
    }

    // Build a single row: not applied, with the error message. Leave the rest of the columns
    // null in this case.
    QLRowBlock result_row_block(Schema(columns, 0));
    QLRow& row = result_row_block.Extend();
    row.mutable_column(0)->set_bool_value(false);
    row.mutable_column(1)->set_string_value(resp.error_message());

    faststring row_data;
    result_row_block.Serialize(YQL_CLIENT_CQL, &row_data);
    *op->mutable_rows_data() = row_data.ToString();
    return Status::OK();
  }

  const ErrorCode errcode = QLStatusToErrorCode(resp_status);
  auto error_status = exec_context->Error(stmt, resp.error_message().c_str(), errcode);
  RETURN_NOT_OK(audit_logger_.LogStatementError(stmt, exec_context_->stmt(), error_status,
                                                ErrorIsFormatted::kTrue));
  return error_status;
}
2579
2580
5.20M
// Processes the outcome of every operation issued by the tnode contexts of exec_context.
//
// op_errors    - errors keyed by operation pointer; operations that are absent are treated as
//                having no error at this level.
// exec_context - the execution context whose operations are checked.
//
// For each tnode context the callback returns true ("done") as soon as an operation requires a
// restart, after resetting exec_context for restart; otherwise it returns false, or the first
// non-OK status produced by ProcessStatementStatus().
Status Executor::ProcessAsyncStatus(const OpErrors& op_errors, ExecContext* exec_context) {
  const auto process_tnode_context =
      [this, exec_context, &op_errors](TnodeContext* tnode_context) -> Result<bool> {
    const TreeNode* tnode = tnode_context->tnode();
    for (auto& op : tnode_context->ops()) {
      // Start from the error recorded for this op, if any.
      Status s;
      const auto error_it = op_errors.find(op.get());
      if (error_it != op_errors.end()) {
        s = error_it->second;
      }

      if (PREDICT_FALSE(!(s.ok() || NeedsRestart(s)))) {
        // YBOperation returns not-found error when the tablet is not found.
        ErrorCode errcode = ErrorCode::EXEC_ERROR;
        if (s.IsNotFound()) {
          errcode = ErrorCode::TABLET_NOT_FOUND;
        }
        s = exec_context->Error(tnode, s, errcode);
      }

      // No error so far: examine the response carried by the operation itself.
      if (s.ok()) {
        DCHECK(tnode->IsDml()) << "Only DML should issue a read/write operation";
        s = ProcessOpStatus(static_cast<const PTDmlStmt *>(tnode), op, exec_context);
      }

      if (ShouldRestart(s, rescheduler_)) {
        exec_context->Reset(client::Restart::kTrue, rescheduler_);
        return true; // done
      }

      RETURN_NOT_OK(ProcessStatementStatus(exec_context->parse_tree(), s));
    }
    return false; // not done
  };

  return ProcessTnodeContexts(exec_context, process_tnode_context);
}
2610
2611
5.25M
// Merges rows_result into the executor's result.
//
// A null rows_result is ignored. When the executor has no result yet, rows_result becomes the
// result. Otherwise the existing result must be of type ROWS and rows_result is appended to it;
// the status of that Append() is returned.
Status Executor::AppendRowsResult(RowsResult::SharedPtr&& rows_result) {
  if (!rows_result) {
    return Status::OK();
  }

  if (result_) {
    CHECK(result_->type() == ExecutedResult::Type::ROWS);
    auto existing_rows = std::static_pointer_cast<RowsResult>(result_);
    return existing_rows->Append(std::move(*rows_result));
  }

  result_ = std::move(rows_result);
  return Status::OK();
}
2622
2623
4.70M
void Executor::StatementExecuted(const Status& s, ResetAsyncCalls* reset_async_calls) {
2624
  // Update metrics for all statements executed.
2625
4.70M
  if (s.ok() && ql_metrics_ != nullptr) {
2626
5.08M
    for (auto& exec_context : exec_contexts_) {
2627
15.8k
      for (auto& tnode_context : exec_context.tnode_contexts()) {
2628
15.8k
        const TreeNode* tnode = tnode_context.tnode();
2629
15.8k
        if (tnode != nullptr) {
2630
15.8k
          switch (tnode->opcode()) {
2631
4
            case TreeNodeOpcode::kPTSelectStmt: FALLTHROUGH_INTENDED;
2632
162
            case TreeNodeOpcode::kPTInsertStmt: FALLTHROUGH_INTENDED;
2633
176
            case TreeNodeOpcode::kPTUpdateStmt: FALLTHROUGH_INTENDED;
2634
176
            case TreeNodeOpcode::kPTDeleteStmt: FALLTHROUGH_INTENDED;
2635
4.35k
            case TreeNodeOpcode::kPTUseKeyspace: FALLTHROUGH_INTENDED;
2636
4.35k
            case TreeNodeOpcode::kPTListNode:   FALLTHROUGH_INTENDED;
2637
4.35k
            case TreeNodeOpcode::kPTStartTransaction: FALLTHROUGH_INTENDED;
2638
4.35k
            case TreeNodeOpcode::kPTCommit:
2639
              // The metrics for SELECT/INSERT/UPDATE/DELETE have been updated when the ops have
2640
              // been completed in FlushAsyncDone(). Exclude PTListNode also as we are interested
2641
              // in the metrics of its constituent DMLs only. Transaction metrics have been
2642
              // updated in CommitDone().
2643
              // The metrics for USE have been updated in ExecPTNode().
2644
4.35k
              break;
2645
11.5k
            default: {
2646
11.5k
              const MonoTime now = MonoTime::Now();
2647
11.5k
              const auto delta_usec = (now - tnode_context.start_time()).ToMicroseconds();
2648
11.5k
              ql_metrics_->ql_others_->Increment(delta_usec);
2649
11.5k
              ql_metrics_->time_to_execute_ql_query_->Increment(delta_usec);
2650
11.5k
              break;
2651
4.35k
            }
2652
15.8k
          }
2653
15.8k
        }
2654
15.8k
      }
2655
5.08M
      ql_metrics_->num_retries_to_execute_ql_->Increment(exec_context.num_retries());
2656
5.08M
    }
2657
4.69M
    ql_metrics_->num_flushes_to_execute_ql_->Increment(num_flushes_);
2658
4.69M
  }
2659
2660
  // Clean up and invoke statement-executed callback.
2661
4.70M
  ExecutedResult::SharedPtr result = s.ok() ? std::move(result_) : nullptr;
2662
4.70M
  StatementExecutedCallback cb = std::move(cb_);
2663
4.70M
  Reset(reset_async_calls);
2664
4.70M
  cb.Run(s, result);
2665
4.70M
}
2666
2667
4.71M
// Returns the executor to its initial state: forgets the execution contexts, clears the write
// batch, aborts the session, drops the result and the callback, and finally performs the given
// async-calls reset.
void Executor::Reset(ResetAsyncCalls* reset_async_calls) {
  // Execution contexts of the statement(s) just executed.
  exec_context_ = nullptr;
  exec_contexts_.clear();

  // Pending writes and session state.
  write_batch_.Clear();
  session_->Abort();
  num_flushes_ = 0;

  // Result, callback and batch-level returns-status setting.
  result_.reset();
  cb_.Reset();
  returns_status_batch_opt_ = boost::none;

  // Performed last, after all of the state above has been cleared.
  reset_async_calls->Perform();
}
2678
2679
3.89M
// Adds a new expression slot for the given column to the write request and returns it.
//
// A hash column gets a new hashed column value, any other primary column gets a new range column
// value, and a regular column gets a new column value tagged with the column's id.
QLExpressionPB* CreateQLExpression(QLWriteRequestPB *req, const ColumnDesc& col_desc) {
  if (col_desc.is_hash()) {
    return req->add_hashed_column_values();
  }
  if (col_desc.is_primary()) {
    return req->add_range_column_values();
  }

  QLColumnValuePB* const column_value = req->add_column_values();
  column_value->set_column_id(col_desc.id());
  return column_value->mutable_expr();
}
2690
2691
// Attaches this task to an executor and takes over the given async-calls reset (moved from
// *reset_async_calls). Returns *this so that calls can be chained.
Executor::ExecutorTask& Executor::ExecutorTask::Bind(
    Executor* executor, Executor::ResetAsyncCalls* reset_async_calls) {
  executor_ = executor;
  // Move-assignment first performs whatever reset this task was still holding.
  reset_async_calls_ = std::move(*reset_async_calls);
  return *this;
}
2697
2698
4.92M
void Executor::ExecutorTask::Run() {
2699
4.92M
  auto executor = executor_;
2700
4.92M
  executor_ = nullptr;
2701
4.92M
  DoRun(executor, &reset_async_calls_);
2702
4.92M
}
2703
2704
4.92M
void Executor::ExecutorTask::Done(const Status& status) {
2705
4.92M
  if (!status.ok()) {
2706
0
    reset_async_calls_.Perform();
2707
0
  }
2708
4.92M
}
2709
2710
// Starts tracking the given async-calls counter (may be null). Logs a DFATAL message if the
// counter is non-null and its current value is not 0.
Executor::ResetAsyncCalls::ResetAsyncCalls(std::atomic<int64_t>* num_async_calls)
    : num_async_calls_(num_async_calls) {
  LOG_IF(DFATAL,
         num_async_calls_ != nullptr &&
             num_async_calls_->load(std::memory_order_acquire) != 0)
      << "Expected 0 async calls, but have: " << num_async_calls_->load(std::memory_order_acquire);
}
2715
2716
// Move constructor: takes over the counter tracked by rhs and leaves rhs tracking nothing. Logs
// a DFATAL message if the counter taken over is non-null and its current value is not 0.
Executor::ResetAsyncCalls::ResetAsyncCalls(ResetAsyncCalls&& rhs)
    : num_async_calls_(rhs.num_async_calls_) {
  rhs.num_async_calls_ = nullptr;
  LOG_IF(DFATAL,
         num_async_calls_ != nullptr &&
             num_async_calls_->load(std::memory_order_acquire) != 0)
      << "Expected 0 async calls, but have: " << num_async_calls_->load(std::memory_order_acquire);
}
2722
2723
4.89M
// Move assignment: performs the reset currently held by this object (if any), then takes over
// the counter tracked by rhs and leaves rhs tracking nothing. Logs a DFATAL message if the
// counter taken over is non-null and its current value is not 0.
void Executor::ResetAsyncCalls::operator=(ResetAsyncCalls&& rhs) {
  Perform();

  num_async_calls_ = rhs.num_async_calls_;
  rhs.num_async_calls_ = nullptr;

  LOG_IF(DFATAL,
         num_async_calls_ != nullptr &&
             num_async_calls_->load(std::memory_order_acquire) != 0)
      << "Expected 0 async calls, but have: " << num_async_calls_->load(std::memory_order_acquire);
}
2730
2731
4.90M
void Executor::ResetAsyncCalls::Cancel() {
2732
4.90M
  num_async_calls_ = nullptr;
2733
4.90M
}
2734
2735
9.62M
// Performs the reset on destruction unless it was already performed, cancelled or moved away
// (Perform() is a no-op when no counter is tracked).
Executor::ResetAsyncCalls::~ResetAsyncCalls() {
  Perform();
}
2738
2739
19.2M
void Executor::ResetAsyncCalls::Perform() {
2740
19.2M
  if (!num_async_calls_) {
2741
14.5M
    return;
2742
14.5M
  }
2743
2744
18.4E
  LOG_IF(DFATAL, num_async_calls_->load(std::memory_order_acquire))
2745
18.4E
      << "Expected 0 async calls, but have: " << num_async_calls_->load(std::memory_order_acquire);
2746
4.70M
  num_async_calls_->store(kAsyncCallsIdle, std::memory_order_release);
2747
4.70M
  num_async_calls_ = nullptr;
2748
4.70M
}
2749
2750
}  // namespace ql
2751
}  // namespace yb