YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/cql/ql/exec/exec_context.cc
Line|  Count|Source (jump to first uncovered line)
   1|       |//--------------------------------------------------------------------------------------------------
   2|       |// Copyright (c) YugaByte, Inc.
   3|       |//
   4|       |// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
   5|       |// in compliance with the License.  You may obtain a copy of the License at
   6|       |//
   7|       |// http://www.apache.org/licenses/LICENSE-2.0
   8|       |//
   9|       |// Unless required by applicable law or agreed to in writing, software distributed under the License
  10|       |// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  11|       |// or implied.  See the License for the specific language governing permissions and limitations
  12|       |// under the License.
  13|       |//
  14|       |//--------------------------------------------------------------------------------------------------
  15|       |
  16|       |#include "yb/yql/cql/ql/exec/exec_context.h"
  17|       |
  18|       |#include <boost/function.hpp>
  19|       |
  20|       |#include "yb/client/table.h"
  21|       |#include "yb/client/transaction.h"
  22|       |#include "yb/client/yb_op.h"
  23|       |
  24|       |#include "yb/common/ql_rowblock.h"
  25|       |#include "yb/common/schema.h"
  26|       |
  27|       |#include "yb/gutil/casts.h"
  28|       |
  29|       |#include "yb/rpc/thread_pool.h"
  30|       |
  31|       |#include "yb/util/result.h"
  32|       |#include "yb/util/status_format.h"
  33|       |#include "yb/util/trace.h"
  34|       |
  35|       |#include "yb/yql/cql/ql/exec/rescheduler.h"
  36|       |#include "yb/yql/cql/ql/ptree/parse_tree.h"
  37|       |#include "yb/yql/cql/ql/ptree/pt_select.h"
  38|       |#include "yb/yql/cql/ql/util/statement_params.h"
  39|       |
  40|       |DEFINE_int32(cql_prepare_child_threshold_ms, 2000,
  41|       |             "Timeout if preparing for child transaction takes longer "
  42|       |             "than the prescribed threshold.");
  43|       |
  44|       |namespace yb {
  45|       |namespace ql {
  46|       |
  47|       |using client::CommitCallback;
  48|       |using client::Restart;
  49|       |using client::YBqlReadOpPtr;
  50|       |using client::YBSessionPtr;
  51|       |using client::YBTransactionPtr;
  52|       |
  53|       |ExecContext::ExecContext(const ParseTree& parse_tree, const StatementParameters& params)
  54|  5.10M|    : parse_tree_(parse_tree), params_(params) {
  55|  5.10M|}
  56|       |
  57|  5.10M|ExecContext::~ExecContext() {
  58|       |  // Reset to abort transaction explicitly instead of letting it expire.
  59|       |  // Should be ok not to take a rescheduler here since the `ExecContext` clean up should happen
  60|       |  // only when we return a response to the CQL client, which is now guaranteed to happen in
  61|       |  // CQL proxy's handler thread.
  62|  5.10M|  Reset(client::Restart::kFalse, nullptr);
  63|  5.10M|}
  64|       |
  65|  5.32M|TnodeContext* ExecContext::AddTnode(const TreeNode *tnode) {
  66|  5.32M|  restart_ = client::Restart::kFalse;
  67|  5.32M|  tnode_contexts_.emplace_back(tnode);
  68|  5.32M|  return &tnode_contexts_.back();
  69|  5.32M|}
  70|       |
  71|       |//--------------------------------------------------------------------------------------------------
  72|       |Status ExecContext::StartTransaction(
  73|   137k|    const IsolationLevel isolation_level, QLEnv* ql_env, Rescheduler* rescheduler) {
  74|   137k|  TRACE("Start Transaction");
  75|   137k|  transaction_start_time_ = MonoTime::Now();
  76|   137k|  if (!transaction_) {
  77|   136k|    transaction_ = VERIFY_RESULT(ql_env->NewTransaction(transaction_, isolation_level));
  78|  1.01k|  } else if (transaction_->IsRestartRequired()) {
  79|      0|    transaction_ = VERIFY_RESULT(transaction_->CreateRestartedTransaction());
  80|  1.01k|  } else {
  81|       |    // If there is no need to start or restart transaction, just return. This can happen to DMLs on
  82|       |    // a table with secondary index inside a "BEGIN TRANSACTION ... END TRANSACTION" block. Each DML
  83|       |    // will try to start a transaction "on-demand" and we will use the shared transaction already
  84|       |    // started by "BEGIN TRANSACTION".
  85|  1.01k|    return Status::OK();
  86|  1.01k|  }
  87|       |
  88|   136k|  if (!transactional_session_) {
  89|  80.2k|    transactional_session_ = ql_env->NewSession();
  90|  80.2k|    transactional_session_->SetReadPoint(client::Restart::kFalse);
  91|  80.2k|  }
  92|   136k|  transactional_session_->SetDeadline(rescheduler->GetDeadline());
  93|   136k|  transactional_session_->SetTransaction(transaction_);
  94|       |
  95|   136k|  return Status::OK();
  96|   136k|}
  97|       |
  98|       |Status ExecContext::PrepareChildTransaction(
  99|  81.8k|    CoarseTimePoint deadline, ChildTransactionDataPB* data) {
 100|  81.8k|  auto future = DCHECK_NOTNULL(transaction_.get())->PrepareChildFuture(
 101|  81.8k|      client::ForceConsistentRead::kTrue, deadline);
 102|       |
 103|       |  // Set the deadline to be the earlier of the input deadline and the current timestamp
 104|       |  // plus the waiting time for the prepare child
 105|  81.8k|  auto future_deadline = std::min(
 106|  81.8k|      deadline,
 107|  81.8k|      CoarseMonoClock::Now() + MonoDelta::FromMilliseconds(FLAGS_cql_prepare_child_threshold_ms));
 108|       |
 109|  81.8k|  auto future_status = future.wait_until(future_deadline);
 110|       |
 111|  81.9k|  if (future_status == std::future_status::ready) {
 112|  81.9k|    *data = VERIFY_RESULT(std::move(future).get());
 113|  81.9k|    return Status::OK();
 114|  18.4E|  }
 115|       |
 116|  18.4E|  auto message = Format("Timed out waiting for prepare child status, left to deadline: $0",
 117|  18.4E|                        MonoDelta(deadline - CoarseMonoClock::now()));
 118|  18.4E|  LOG(INFO) << message;
 119|  18.4E|  return STATUS(TimedOut, message);
 120|  18.4E|}
 121|       |
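
The wait on the prepare-child future above is bounded by whichever comes first: the caller's deadline or "now" plus FLAGS_cql_prepare_child_threshold_ms. Below is a minimal standalone sketch of that same bounding, using plain std::chrono types rather than YugabyteDB's CoarseTimePoint/MonoDelta; the constant and function names are illustrative only.

#include <algorithm>
#include <chrono>

// Hypothetical stand-in for FLAGS_cql_prepare_child_threshold_ms (2000 ms by default).
constexpr int kPrepareChildThresholdMs = 2000;

using Clock = std::chrono::steady_clock;

// Effective deadline for waiting on the prepare-child future: the earlier of the
// caller's deadline and "now + threshold". A wait_until() on the future then either
// becomes ready within that window or the call is reported as timed out.
Clock::time_point EffectivePrepareDeadline(Clock::time_point caller_deadline) {
  const auto capped = Clock::now() + std::chrono::milliseconds(kPrepareChildThresholdMs);
  return std::min(caller_deadline, capped);
}
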
 122|  14.1k|Status ExecContext::ApplyChildTransactionResult(const ChildTransactionResultPB& result) {
 123|  14.1k|  return DCHECK_NOTNULL(transaction_.get())->ApplyChildResult(result);
 124|  14.1k|}
 125|       |
 126|  78.5k|void ExecContext::CommitTransaction(CoarseTimePoint deadline, CommitCallback callback) {
 127|  78.5k|  if (!transaction_) {
 128|      0|    LOG(DFATAL) << "No transaction to commit";
 129|      0|    return;
 130|      0|  }
 131|       |
 132|       |  // Clear the transaction from the session before committing. SetTransaction() must be called
 133|       |  // before Commit(), not after, because once the commit callback is invoked it finishes the
 134|       |  // current transaction, returns the response and makes the CQLProcessor available for the next
 135|       |  // statement; a SetTransaction() call issued after that point would abort that statement's operations.
 136|  78.5k|  transactional_session_->SetTransaction(nullptr);
 137|  78.5k|  transactional_session_ = nullptr;
 138|       |
 139|  78.5k|  YBTransactionPtr transaction = std::move(transaction_);
 140|  78.5k|  TRACE("Commit Transaction");
 141|  78.5k|  transaction->Commit(deadline, std::move(callback));
 142|  78.5k|}
 143|       |
 144|    109|void ExecContext::AbortTransaction() {
 145|    109|  if (!transaction_) {
 146|      0|    LOG(DFATAL) << "No transaction to abort";
 147|      0|    return;
 148|      0|  }
 149|       |
 150|       |  // Abort the session and clear the transaction from the session before aborting the transaction.
 151|    109|  transactional_session_->Abort();
 152|    109|  transactional_session_->SetTransaction(nullptr);
 153|    109|  transactional_session_ = nullptr;
 154|       |
 155|    109|  YBTransactionPtr transaction = std::move(transaction_);
 156|    109|  TRACE("Abort Transaction");
 157|    109|  transaction->Abort();
 158|    109|}
 159|       |
 160|  81.4k|bool ExecContext::HasPendingOperations() const {
 161|  3.01k|  for (const auto& tnode_context : tnode_contexts_) {
 162|  3.01k|    if (tnode_context.HasPendingOperations()) {
 163|  2.95k|      return true;
 164|  2.95k|    }
 165|  3.01k|  }
 166|  78.5k|  return false;
 167|  81.4k|}
 168|       |
 169|       |class AbortTransactionTask : public rpc::ThreadPoolTask {
 170|       | public:
 171|       |  explicit AbortTransactionTask(YBTransactionPtr transaction)
 172|  56.0k|      : transaction_(std::move(transaction)) {}
 173|       |
 174|  56.7k|  void Run() override {
 175|  56.7k|    transaction_->Abort();
 176|  56.7k|    transaction_ = nullptr;
 177|  56.7k|  }
 178|       |
 179|  56.7k|  void Done(const Status& status) override {
 180|  56.7k|    delete this;
 181|  56.7k|  }
 182|       |
 183|  56.7k|  virtual ~AbortTransactionTask() {
 184|  56.7k|  }
 185|       |
 186|       | private:
 187|       |  YBTransactionPtr transaction_;
 188|       |};
 189|       |
 190|       |//--------------------------------------------------------------------------------------------------
 191|  5.19M|void ExecContext::Reset(const Restart restart, Rescheduler* rescheduler) {
 192|  5.19M|  if (transactional_session_) {
 193|  57.8k|    transactional_session_->Abort();
 194|  57.8k|    transactional_session_->SetTransaction(nullptr);
 195|  57.8k|  }
 196|  5.19M|  if (transaction_ && !(transaction_->IsRestartRequired() && restart)) {
 197|  58.3k|    YBTransactionPtr transaction = std::move(transaction_);
 198|  58.3k|    TRACE("Abort Transaction");
 199|  58.3k|    if (rescheduler && rescheduler->NeedReschedule()) {
 200|  56.0k|      rescheduler->Reschedule(new AbortTransactionTask(std::move(transaction)));
 201|  2.23k|    } else {
 202|  2.23k|      transaction->Abort();
 203|  2.23k|    }
 204|  58.3k|  }
 205|  5.19M|  restart_ = restart;
 206|  5.19M|  tnode_contexts_.clear();
 207|  5.19M|  if (restart) {
 208|   102k|    num_retries_++;
 209|   102k|  }
 210|  5.19M|}
 211|       |
 212|       |//--------------------------------------------------------------------------------------------------
 213|  5.32M|TnodeContext::TnodeContext(const TreeNode* tnode) : tnode_(tnode), start_time_(MonoTime::Now()) {
 214|  5.32M|}
 215|       |
 216|  5.32M|TnodeContext::~TnodeContext() = default;
 217|       |
 218|  1.95k|TnodeContext* TnodeContext::AddChildTnode(const TreeNode* tnode) {
 219|  1.95k|  DCHECK(!child_context_);
 220|  1.95k|  child_context_ = std::make_unique<TnodeContext>(tnode);
 221|  1.95k|  return child_context_.get();
 222|  1.95k|}
 223|       |
 224|  3.93M|Status TnodeContext::AppendRowsResult(RowsResult::SharedPtr&& rows_result) {
 225|       |  // Append data arriving from DocDB.
 226|       |  // (1) SELECT without nested query.
 227|       |  //  - SELECT <select_list> FROM <table or index>
 228|       |  //      WHERE <filter_cond>
 229|       |  //      LIMIT <limit> OFFSET <offset>
 230|       |  //  - New rows are appended at the end.
 231|       |  //
 232|       |  // (2) SELECT with nested query.
 233|       |  //  - SELECT <select_list> FROM <table>
 234|       |  //      WHERE
 235|       |  //        primary_key IN (SELECT primary_key FROM <index> WHERE <index_cond>)
 236|       |  //        AND
 237|       |  //        <filter_cond>
 238|       |  //      LIMIT <limit> OFFSET <offset>
 239|       |  //  - When nested INDEX query fully-covers the SELECT command, data coming from the nested node
 240|       |  //    is appended without being filtered or rejected.
 241|       |  //  - When nested INDEX query does NOT fully-cover the SELECT command, data is rejected if OFFSET
 242|       |  //    is not yet reached. Otherwise, data is appended ONE row at a time.
 243|  3.93M|  if (!rows_result) {
 244|      0|    return Status::OK();
 245|      0|  }
 246|       |
 247|  3.93M|  int64_t number_of_new_rows =
 248|  3.93M|    VERIFY_RESULT(QLRowBlock::GetRowCount(YQL_CLIENT_CQL, rows_result->rows_data()));
 249|       |
 250|  3.93M|  if (query_state_) {
 251|  3.93M|    RSTATUS_DCHECK(tnode_->opcode() == TreeNodeOpcode::kPTSelectStmt,
 252|  3.93M|                   Corruption, "QueryPagingState is setup for non-select statement");
 253|  3.93M|    const auto* select_stmt = static_cast<const PTSelectStmt *>(tnode_);
 254|       |
 255|       |    // Save the last offset status before loading new status from DocDB.
 256|  3.93M|    const bool reached_offset = query_state_->reached_select_offset();
 257|  3.93M|    const bool has_nested_query = select_stmt->child_select() != nullptr;
 258|  3.93M|    RETURN_NOT_OK(
 259|  3.93M|      query_state_->LoadPagingStateFromDocdb(rows_result, number_of_new_rows, has_nested_query));
 260|       |
 261|  3.93M|    if (!reached_offset && has_nested_query) {
 262|       |      // Parent query needs to discard the new row when (row_count < SELECT::OFFSET).
 263|     87|      if (!rows_result_) {
 264|     50|        rows_result_ = std::make_shared<RowsResult>(select_stmt);
 265|     50|      }
 266|     87|      rows_result_->SetPagingState(std::move(*rows_result));
 267|     87|      return Status::OK();
 268|     87|    }
 269|  3.93M|  }
 270|       |
 271|       |  // Append the new rows to result.
 272|  3.93M|  row_count_ += number_of_new_rows;
 273|  3.93M|  if (rows_result_ == nullptr) {
 274|  3.88M|    rows_result_ = std::move(rows_result);
 275|  3.88M|    return Status::OK();
 276|  3.88M|  }
 277|  50.3k|  return rows_result_->Append(std::move(*rows_result));
 278|  50.3k|}
 279|       |
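
For the non-covering nested-index case described in the comment above, rows surface one at a time and the OFFSET clause is enforced in the CQL layer rather than in DocDB. The sketch below is a simplified model of that append-vs-discard decision, under the assumption stated in the comment; the struct and function are hypothetical helpers, not YugabyteDB types. While the outer SELECT has not yet reached its OFFSET, an arriving row only advances the skip counter and is not added to the user-visible result.

#include <cstdint>

// Simplified paging counters: rows skipped toward OFFSET and rows returned toward LIMIT.
struct PagingCounters {
  int64_t skip_count = 0;
  int64_t read_count = 0;
};

// Returns true when the newly arrived row should be appended to the user-visible
// result, false when it should be discarded as part of OFFSET processing.
bool ShouldAppendRow(bool reached_offset, bool has_nested_query, PagingCounters* counters) {
  if (!reached_offset && has_nested_query) {
    ++counters->skip_count;   // the row only advances the OFFSET bookkeeping
    return false;             // paging state is still kept so the scan can continue
  }
  ++counters->read_count;
  return true;
}
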
 280|  1.07k|void TnodeContext::InitializePartition(QLReadRequestPB *req, bool continue_user_request) {
 281|  1.05k|  uint64_t start_partition = continue_user_request ? query_state_->next_partition_index() : 0;
 282|       |
 283|  1.07k|  current_partition_index_ = start_partition;
 284|       |  // Hash values before the first 'IN' condition will be already set.
 285|       |  // hash_values_options_ vector starts from the first column with an 'IN' restriction.
 286|       |  // E.g. for a query "h1 = 1 and h2 in (2,3) and h3 in (4,5) and h4 = 6":
 287|       |  // hashed_column_values() will be [1] and hash_values_options_ will be [[2,3],[4,5],[6]].
 288|  1.07k|  int set_cols_size = req->hashed_column_values().size();
 289|  1.07k|  auto unset_cols_size = hash_values_options_->size();
 290|       |
 291|       |  // Initialize the missing columns with default values (e.g. h2, h3, h4 in example above).
 292|  1.07k|  req->mutable_hashed_column_values()->Reserve(narrow_cast<int>(set_cols_size + unset_cols_size));
 293|  2.16k|  for (size_t i = 0; i < unset_cols_size; i++) {
 294|  1.09k|    req->add_hashed_column_values();
 295|  1.09k|  }
 296|       |
 297|       |  // Set the right values for the missing/unset columns by converting partition index into positions
 298|       |  // for each hash column and using the corresponding values from the hash values options vector.
 299|       |  // E.g. In example above, with start_partition = 0:
 300|       |  //    h4 = 6 since pos is "0 % 1 = 0", (start_partition becomes 0 / 1 = 0).
 301|       |  //    h3 = 4 since pos is "0 % 2 = 0", (start_partition becomes 0 / 2 = 0).
 302|       |  //    h2 = 2 since pos is "0 % 2 = 0", (start_partition becomes 0 / 2 = 0).
 303|  2.16k|  for (auto i = unset_cols_size; i > 0;) {
 304|  1.09k|    --i;
 305|  1.09k|    const auto& options = (*hash_values_options_)[i];
 306|  1.09k|    auto pos = start_partition % options.size();
 307|  1.09k|    *req->mutable_hashed_column_values(narrow_cast<int>(i + set_cols_size)) = options[pos];
 308|  1.09k|    start_partition /= options.size();
 309|  1.09k|  }
 310|  1.07k|}
 311|       |
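
The last loop above decodes start_partition as a mixed-radix number: each IN-restricted hash column contributes one digit, with the right-most column as the least-significant digit. The following self-contained sketch performs the same decoding for the example in the comment (h2 IN (2,3), h3 IN (4,5), h4 = 6, i.e. option lists {2,3}, {4,5}, {6}); DecodePartition is an illustrative helper, not part of the codebase.

#include <cstdint>
#include <iostream>
#include <vector>

// Decode a partition index into one chosen value per IN-restricted hash column.
// options[i] holds the candidate values for the i-th such column; the last
// column is treated as the least-significant "digit", mirroring the loop above.
std::vector<int> DecodePartition(uint64_t partition_index,
                                 const std::vector<std::vector<int>>& options) {
  std::vector<int> chosen(options.size());
  for (size_t i = options.size(); i > 0;) {
    --i;
    const auto& opts = options[i];
    chosen[i] = opts[partition_index % opts.size()];
    partition_index /= opts.size();
  }
  return chosen;
}

int main() {
  // h2 IN (2,3), h3 IN (4,5), h4 = 6  ->  2 * 2 * 1 = 4 partitions in total.
  const std::vector<std::vector<int>> options = {{2, 3}, {4, 5}, {6}};
  for (uint64_t p = 0; p < 4; ++p) {
    const auto chosen = DecodePartition(p, options);
    std::cout << "partition " << p << ": h2=" << chosen[0]
              << " h3=" << chosen[1] << " h4=" << chosen[2] << "\n";
  }
  // Prints (2,4,6), (2,5,6), (3,4,6), (3,5,6) for partitions 0..3.
  return 0;
}
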
 312|   317k|bool TnodeContext::FinishedReadingPartition() {
 313|   317k|  return rows_result_->paging_state().empty() ||
 314|  33.5k|      (query_state_->next_partition_key().empty() && query_state_->next_row_key().empty());
 315|   317k|}
 316|       |
 317|  5.65k|void TnodeContext::AdvanceToNextPartition(QLReadRequestPB *req) {
 318|       |  // E.g. for a query "h1 = 1 and h2 in (2,3) and h3 in (4,5) and h4 = 6" at partition index 2:
 319|       |  // this advances the index 2 -> 3 and hashed_column_values() [1, 3, 4, 6] -> [1, 3, 5, 6].
 320|  5.65k|  current_partition_index_++;
 321|  5.65k|  uint64_t partition_counter = current_partition_index_;
 322|       |  // Hash_values_options_ vector starts from the first column with an 'IN' restriction.
 323|  5.65k|  const int hash_key_size = req->hashed_column_values().size();
 324|  5.65k|  const auto fixed_cols_size = hash_key_size - hash_values_options_->size();
 325|       |
 326|       |  // Set the right values for the missing/unset columns by converting partition index into positions
 327|       |  // for each hash column and using the corresponding values from the hash values options vector.
 328|       |  // E.g. In example above, with partition counter = 3:
 329|       |  //    h4 = 6 since pos is "3 % 1 = 0", new partition counter is "3 / 1 = 3".
 330|       |  //    h3 = 5 since pos is "3 % 2 = 1", pos is non-zero which guarantees previous cols don't need
 331|       |  //    to be changed (i.e. are the same as for previous partition index) so we break.
 332|  5.73k|  for (size_t i = hash_key_size; i > fixed_cols_size;) {
 333|  5.73k|    --i;
 334|  5.73k|    const auto& options = (*hash_values_options_)[i - fixed_cols_size];
 335|  5.73k|    auto pos = partition_counter % options.size();
 336|  5.73k|    *req->mutable_hashed_column_values(narrow_cast<int>(i)) = options[pos];
 337|  5.73k|    if (pos != 0) break; // The previous position hash values must be unchanged.
 338|     75|    partition_counter /= options.size();
 339|     75|  }
 340|       |
 341|  5.65k|  req->clear_hash_code();
 342|  5.65k|  req->clear_max_hash_code();
 343|       |
 344|  5.65k|  if (hash_code_from_partition_key_ops_.is_initialized())
 345|      3|    req->set_hash_code(*hash_code_from_partition_key_ops_);
 346|  5.65k|  if (max_hash_code_from_partition_key_ops_.is_initialized())
 347|      3|    req->set_max_hash_code(*max_hash_code_from_partition_key_ops_);
 348|  5.65k|}
 349|       |
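
Advancing to the next partition is, in effect, a mixed-radix increment over the positions already materialized in hashed_column_values(): the least-significant (right-most IN-restricted) column is recomputed first, and the loop stops at the first column whose position does not wrap back to zero, because all more-significant columns are then unchanged. A small sketch of that early-exit increment over plain position vectors follows; AdvanceDigits is a hypothetical helper, not the actual request-mutation code.

#include <cstdint>
#include <vector>

// digits[i] is the current position into the i-th option list (last = least significant);
// radixes[i] is that list's size. Returns false once every partition has been visited.
bool AdvanceDigits(std::vector<uint64_t>* digits, const std::vector<uint64_t>& radixes) {
  for (size_t i = digits->size(); i > 0;) {
    --i;
    (*digits)[i] = ((*digits)[i] + 1) % radixes[i];
    if ((*digits)[i] != 0) {
      return true;  // no carry: higher-order columns keep their current values
    }
    // position wrapped to 0: carry into the next more-significant column
  }
  return false;  // wrapped everywhere: the counter has cycled through all partitions
}
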
 350|  5.42M|bool TnodeContext::HasPendingOperations() const {
 351|   168k|  for (const auto& op : ops_) {
 352|   168k|    if (!op->response().has_status()) {
 353|  73.6k|      return true;
 354|  73.6k|    }
 355|   168k|  }
 356|  5.34M|  if (child_context_) {
 357|  6.54k|    return child_context_->HasPendingOperations();
 358|  6.54k|  }
 359|  5.34M|  return false;
 360|  5.34M|}
 361|       |
 362|    404|void TnodeContext::SetUncoveredSelectOp(const YBqlReadOpPtr& select_op) {
 363|    404|  uncovered_select_op_ = select_op;
 364|    404|  const Schema& schema = static_cast<const PTSelectStmt*>(tnode_)->table()->InternalSchema();
 365|    404|  std::vector<ColumnId> key_column_ids;
 366|    404|  key_column_ids.reserve(schema.num_key_columns());
 367|  1.49k|  for (size_t idx = 0; idx < schema.num_key_columns(); idx++) {
 368|  1.08k|    key_column_ids.emplace_back(schema.column_id(idx));
 369|  1.08k|  }
 370|    404|  keys_ = std::make_unique<QLRowBlock>(schema, key_column_ids);
 371|    404|}
 372|       |
 373|       |QueryPagingState *TnodeContext::CreateQueryState(const StatementParameters& user_params,
 374|  3.89M|                                                 bool is_top_level_select) {
 375|  3.89M|  query_state_ = std::make_unique<QueryPagingState>(user_params, is_top_level_select);
 376|  3.89M|  return query_state_.get();
 377|  3.89M|}
 378|       |
 379|  3.89M|Status TnodeContext::ClearQueryState() {
 380|  3.89M|  RSTATUS_DCHECK(query_state_, Corruption, "Query state should not be null for SELECT");
 381|  3.89M|  rows_result_->ClearPagingState();
 382|  3.89M|  query_state_->ClearPagingState();
 383|       |
 384|  3.89M|  return Status::OK();
 385|  3.89M|}
 386|       |
 387|       |Status TnodeContext::ComposeRowsResultForUser(const TreeNode* child_select_node,
 388|  2.19k|                                              bool for_new_batches) {
 389|  2.19k|  RSTATUS_DCHECK_EQ(tnode_->opcode(), TreeNodeOpcode::kPTSelectStmt,
 390|  2.19k|                    Corruption, "Only SELECT node can have nested query");
 391|  2.19k|  const auto* select_stmt = static_cast<const PTSelectStmt *>(tnode_);
 392|       |
 393|       |  // Case 1:
 394|       |  //   SELECT * FROM <table>;
 395|  2.19k|  if (!child_select_node) {
 396|    282|    if (rows_result_->has_paging_state() || for_new_batches) {
 397|       |      // Paging state must be provided for two cases. Otherwise, we've reached the end of the result set.
 398|       |      // - DocDB sent back rows_result with paging state.
 399|       |      // - Setting up paging_state for the user's next batches.
 400|    282|      RETURN_NOT_OK(query_state_->ComposePagingStateForUser());
 401|    282|      rows_result_->SetPagingState(query_state_->query_pb());
 402|    282|    }
 403|    282|    return Status::OK();
 404|  1.91k|  }
 405|       |
 406|       |  // Check for nested condition.
 407|  1.91k|  RSTATUS_DCHECK(child_context_ && child_select_node->opcode() == TreeNodeOpcode::kPTSelectStmt,
 408|  1.91k|                 Corruption, "Expecting nested context with a SELECT node");
 409|       |
 410|       |  // Case 2:
 411|       |  //   SELECT <fully_covered_columns> FROM <index>;
 412|       |  // Move result from index query (child) to the table query (this parent node).
 413|  1.91k|  const auto* child_select = static_cast<const PTSelectStmt *>(child_select_node);
 414|  1.91k|  if (child_select->covers_fully()) {
 415|  1.54k|    return AppendRowsResult(std::move(child_context_->rows_result()));
 416|  1.54k|  }
 417|       |
 418|       |  // Case 3:
 419|       |  //   SELECT <any columns> FROM <table> WHERE primary_keys IN (SELECT primary_keys FROM <index>);
 420|       |  // Compose result of the following fields.
 421|       |  // - The rows_result should be from this node (rows_result_).
 422|       |  // - The counter_state should be from this node (query_state_::counter_pb_).
 423|       |  // - The read paging_state should be from the CHILD node (query_state_::query_pb_).
 424|    366|  if (!rows_result_) {
 425|       |    // Allocate an empty rows_result that will be filled with paging state.
 426|     12|    rows_result_ = std::make_shared<RowsResult>(select_stmt);
 427|     12|  }
 428|       |
 429|    366|  if (child_context_->rows_result()->has_paging_state() &&
 430|     60|      !query_state_->reached_select_limit()) {
 431|       |    // If the child node has paging state and LIMIT is not yet reached, provide paging state to users
 432|       |    // to continue reading.
 433|     57|    RETURN_NOT_OK(
 434|     57|        query_state_->ComposePagingStateForUser(child_context_->query_state()->query_pb()));
 435|     57|    rows_result_->SetPagingState(query_state_->query_pb());
 436|    309|  } else {
 437|       |    // Clear paging state once all requested rows were retrieved.
 438|    309|    rows_result_->ClearPagingState();
 439|    309|  }
 440|       |
 441|    366|  return Status::OK();
 442|    366|}
 443|       |
 444|       |//--------------------------------------------------------------------------------------------------
 445|       |
 446|       |QueryPagingState::QueryPagingState(const StatementParameters& user_params,
 447|       |                                   bool is_top_level_read_node)
 448|  3.88M|    : max_fetch_size_(user_params.page_size()) {
 449|  3.88M|  LoadPagingStateFromUser(user_params, is_top_level_read_node);
 450|       |
 451|       |  // Just default it to max_int.
 452|  3.88M|  if (max_fetch_size_ <= 0) {
 453|      0|    max_fetch_size_ = INT_MAX;
 454|      0|  }
 455|  3.88M|}
 456|       |
 457|  3.60M|void QueryPagingState::AdjustMaxFetchSizeToSelectLimit() {
 458|  3.60M|  int64_t limit = select_limit();
 459|  3.60M|  if (limit < 0) {
 460|      0|    return;
 461|      0|  }
 462|       |
 463|  3.60M|  int64_t count = read_count();
 464|  3.61M|  if (count < limit) {
 465|  3.61M|    int64_t wanted = limit - count;
 466|  3.61M|    if (wanted < max_fetch_size_) {
 467|  3.61M|      max_fetch_size_ = wanted;
 468|  3.61M|    }
 469|  18.4E|  } else {
 470|  18.4E|    max_fetch_size_ = 0;
 471|  18.4E|  }
 472|  3.60M|}
 473|       |
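
AdjustMaxFetchSizeToSelectLimit clamps the page size so a single fetch never asks for more rows than the remaining LIMIT allows. The same rule expressed over plain values, with a couple of worked cases: with LIMIT 10, 7 rows already read, and a page size of 5, the next fetch is capped at 3; once 10 rows have been read it drops to 0. ClampFetchSize is an illustrative free function, not part of the class.

#include <algorithm>
#include <cassert>
#include <cstdint>

// Same clamping rule as above, expressed over plain values.
int64_t ClampFetchSize(int64_t max_fetch_size, int64_t select_limit, int64_t read_count) {
  if (select_limit < 0) {
    return max_fetch_size;              // no LIMIT clause: leave the page size alone
  }
  const int64_t remaining = std::max<int64_t>(0, select_limit - read_count);
  return std::min(max_fetch_size, remaining);
}

int main() {
  assert(ClampFetchSize(5, 10, 7) == 3);   // only 3 rows left under LIMIT 10
  assert(ClampFetchSize(5, 10, 10) == 0);  // LIMIT already satisfied
  assert(ClampFetchSize(5, -1, 7) == 5);   // no LIMIT: page size unchanged
  return 0;
}
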
 474|  3.89M|void QueryPagingState::ClearPagingState() {
 475|       |  // Clear only the paging state.
 476|       |  // Keep the counter so that we know how many rows have been processed.
 477|  3.89M|  query_pb_.Clear();
 478|  3.89M|}
 479|       |
 480|       |void QueryPagingState::LoadPagingStateFromUser(const StatementParameters& user_params,
 481|  3.89M|                                               bool is_top_level_read_node) {
 482|  3.89M|  user_params.WritePagingState(&query_pb_);
 483|       |
 484|       |  // Calculate "skip_count" and "read_count".
 485|       |  // (1) Top level read node.
 486|       |  //  - Either top-level SELECT or fully-covering INDEX query.
 487|       |  //  - The user's params should carry the valid "counter_pb_".
 488|       |  //
 489|       |  // (2) Nested read node.
 490|       |  //  - Zero out its counters. We don't use "counter_pb_" for this node because LIMIT and OFFSET
 491|       |  //    restrictions are not applied to nested nodes.
 492|       |  //  - Because the nested node might be running different READ operators (with different hash values)
 493|       |  //    for different calls from users, the counters from the user's message are discarded here.
 494|  3.89M|  if (is_top_level_read_node) {
 495|  3.88M|    counter_pb_.CopyFrom(query_pb_.row_counter());
 496|  11.1k|  } else {
 497|       |    // These values are not used, set them to zero.
 498|  11.1k|    set_skip_count(0);
 499|  11.1k|    set_read_count(0);
 500|  11.1k|  }
 501|  3.89M|}
 502|       |
 503|    282|Status QueryPagingState::ComposePagingStateForUser() {
 504|       |  // Write the counters into the paging_state.
 505|    282|  query_pb_.mutable_row_counter()->CopyFrom(counter_pb_);
 506|    282|  return Status::OK();
 507|    282|}
 508|       |
 509|     57|Status QueryPagingState::ComposePagingStateForUser(const QLPagingStatePB& child_state) {
 510|       |  // Write child_state.
 511|     57|  query_pb_.CopyFrom(child_state);
 512|       |
 513|       |  // Write the counters into the paging_state.
 514|     57|  query_pb_.mutable_row_counter()->CopyFrom(counter_pb_);
 515|       |
 516|     57|  return Status::OK();
 517|     57|}
 518|       |
 519|       |Status QueryPagingState::LoadPagingStateFromDocdb(const RowsResult::SharedPtr& rows_result,
 520|       |                                                  int64_t number_of_new_rows,
 521|  3.93M|                                                  bool has_nested_query) {
 522|       |  // Load "query_pb_" with the latest result from DocDB.
 523|  3.93M|  query_pb_.ParseFromString(rows_result->paging_state());
 524|       |
 525|       |  // If DocDB processed the skipping rows, record it here.
 526|  3.93M|  if (total_rows_skipped() > 0) {
 527|  1.37k|    set_skip_count(total_rows_skipped());
 528|  1.37k|  }
 529|       |
 530|  3.93M|  if (!has_nested_query) {
 531|       |    // SELECT <select_list> FROM <table or index>
 532|       |    //   WHERE <filter_cond>
 533|       |    //   LIMIT <limit> OFFSET <offset>
 534|       |    // - DocDB processed the <limit> and <offset> restrictions.
 535|       |    //   Either "reached_select_offset() == TRUE" OR number_of_new_rows == 0.
 536|       |    // - Two DocDB counters are used to compute here.
 537|       |    //   . QLPagingStatePB::total_rows_skipped - Skip count in DocDB.
 538|       |    //   . number_of_new_rows - Rows of data from DocDB after skipping.
 539|  3.92M|    set_read_count(read_count() + number_of_new_rows);
 540|       |
 541|  1.00k|  } else {
 542|       |    // SELECT <select_list> FROM <table>
 543|       |    //   WHERE
 544|       |    //     primary_key IN (SELECT primary_key FROM <index> WHERE <index_cond>)
 545|       |    //     AND
 546|       |    //     <filter_cond>
 547|       |    //   LIMIT <limit> OFFSET <offset>
 548|       |    //
 549|       |    // NOTE:
 550|       |    // 1. Case INDEX query fully-covers the SELECT command.
 551|       |    //    - DocDB counters are transferred from the nested node to this node.
 552|       |    //    - "reached_select_offset() == TRUE" OR number_of_new_rows == 0.
 553|       |    //
 554|       |    // 2. Case INDEX query does NOT fully-cover the SELECT command.
 555|       |    //    - Values of <limit> and <offset> are NOT sent together with the proto request to DocDB. They
 556|       |    //      are computed and processed here in the CQL layer.
 557|       |    //    - For this case, "number_of_new_rows" is either 1 or 0.
 558|       |    //      CQL assumes that the outer SELECT reads at most one row at a time as it uses values of the
 559|       |    //      PRIMARY KEY (always unique) to read the rest of the columns of the <table>.
 560|  1.00k|    if (!reached_select_offset()) {
 561|       |      // Since OFFSET is processed here, this must be case 2.
 562|     87|      RSTATUS_DCHECK_LE(number_of_new_rows, 1, Corruption, "Incorrect counter calculation");
 563|     87|      set_skip_count(skip_count() + number_of_new_rows);
 564|    921|    } else {
 565|    921|      set_read_count(read_count() + number_of_new_rows);
 566|    921|    }
 567|  1.00k|  }
 568|       |
 569|  3.93M|  return Status::OK();
 570|  3.93M|}
 571|       |
 572|  5.11M|const std::string& ExecContext::stmt() const {
 573|  5.11M|  return parse_tree_.stmt();
 574|  5.11M|}
 575|       |
 576|       |}  // namespace ql
 577|       |}  // namespace yb