YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/cql/ql/exec/exec_context.cc
//--------------------------------------------------------------------------------------------------
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//
//--------------------------------------------------------------------------------------------------

#include "yb/yql/cql/ql/exec/exec_context.h"

#include <boost/function.hpp>

#include "yb/client/table.h"
#include "yb/client/transaction.h"
#include "yb/client/yb_op.h"

#include "yb/common/ql_rowblock.h"
#include "yb/common/schema.h"

#include "yb/gutil/casts.h"

#include "yb/rpc/thread_pool.h"

#include "yb/util/result.h"
#include "yb/util/status_format.h"
#include "yb/util/trace.h"
#include "yb/util/tsan_util.h"

#include "yb/yql/cql/ql/exec/rescheduler.h"
#include "yb/yql/cql/ql/ptree/parse_tree.h"
#include "yb/yql/cql/ql/ptree/pt_select.h"
#include "yb/yql/cql/ql/util/statement_params.h"

DEFINE_int32(cql_prepare_child_threshold_ms, 2000 * yb::kTimeMultiplier,
             "Timeout if preparing for child transaction takes longer "
             "than the prescribed threshold.");

namespace yb {
namespace ql {

using client::CommitCallback;
using client::Restart;
using client::YBqlReadOpPtr;
using client::YBSessionPtr;
using client::YBTransactionPtr;

ExecContext::ExecContext(const ParseTree& parse_tree, const StatementParameters& params)
    : parse_tree_(parse_tree), params_(params) {
}

ExecContext::~ExecContext() {
  // Reset to abort transaction explicitly instead of letting it expire.
  // Should be ok not to take a rescheduler here since the `ExecContext` clean up should happen
  // only when we return a response to the CQL client, which is now guaranteed to happen in
  // CQL proxy's handler thread.
  Reset(client::Restart::kFalse, nullptr);
}

TnodeContext* ExecContext::AddTnode(const TreeNode *tnode) {
  restart_ = client::Restart::kFalse;
  tnode_contexts_.emplace_back(tnode);
  return &tnode_contexts_.back();
}

//--------------------------------------------------------------------------------------------------
Status ExecContext::StartTransaction(
    const IsolationLevel isolation_level, QLEnv* ql_env, Rescheduler* rescheduler) {
  TRACE("Start Transaction");
  transaction_start_time_ = MonoTime::Now();
  if (!transaction_) {
    transaction_ = VERIFY_RESULT(ql_env->NewTransaction(
        transaction_, isolation_level, rescheduler->GetDeadline()));
  } else if (transaction_->IsRestartRequired()) {
    transaction_ = VERIFY_RESULT(transaction_->CreateRestartedTransaction());
  } else {
    // If there is no need to start or restart transaction, just return. This can happen to DMLs on
    // a table with secondary index inside a "BEGIN TRANSACTION ... END TRANSACTION" block. Each DML
    // will try to start a transaction "on-demand" and we will use the shared transaction already
    // started by "BEGIN TRANSACTION".
    return Status::OK();
  }

  if (!transactional_session_) {
    transactional_session_ = ql_env->NewSession();
    transactional_session_->SetReadPoint(client::Restart::kFalse);
  }
  transactional_session_->SetDeadline(rescheduler->GetDeadline());
  transactional_session_->SetTransaction(transaction_);

  return Status::OK();
}

Status ExecContext::PrepareChildTransaction(
    CoarseTimePoint deadline, ChildTransactionDataPB* data) {
  auto future = DCHECK_NOTNULL(transaction_.get())->PrepareChildFuture(
      client::ForceConsistentRead::kTrue, deadline);

  // Set the deadline to be the earlier of the input deadline and the current timestamp
  // plus the waiting time for the prepare child.
  auto future_deadline = std::min(
      deadline,
      CoarseMonoClock::Now() + MonoDelta::FromMilliseconds(FLAGS_cql_prepare_child_threshold_ms));

  auto future_status = future.wait_until(future_deadline);

  if (future_status == std::future_status::ready) {
    *data = VERIFY_RESULT(std::move(future).get());
    return Status::OK();
  }

  auto message = Format("Timed out waiting for prepare child status, left to deadline: $0",
                        MonoDelta(deadline - CoarseMonoClock::now()));
  LOG(INFO) << message;
  return STATUS(TimedOut, message);
}

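// Illustrative sketch (editor's addition, not part of exec_context.cc): the bounded-wait pattern
// used by PrepareChildTransaction above, reduced to the standard library. The caller's deadline is
// capped at "now + threshold" so a slow prepare step cannot consume the whole statement deadline.
// WaitBounded and its parameters are hypothetical names.
#include <algorithm>
#include <chrono>
#include <future>
#include <optional>

template <typename T>
std::optional<T> WaitBounded(std::future<T>& future,
                             std::chrono::steady_clock::time_point deadline,
                             std::chrono::milliseconds threshold) {
  // Wait until the earlier of the caller's deadline and "now + threshold".
  const auto bounded_deadline =
      std::min(deadline, std::chrono::steady_clock::now() + threshold);
  if (future.wait_until(bounded_deadline) == std::future_status::ready) {
    return future.get();
  }
  return std::nullopt;  // Timed out; the caller reports a TimedOut status.
}
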
Status ExecContext::ApplyChildTransactionResult(const ChildTransactionResultPB& result) {
  return DCHECK_NOTNULL(transaction_.get())->ApplyChildResult(result);
}

void ExecContext::CommitTransaction(CoarseTimePoint deadline, CommitCallback callback) {
  if (!transaction_) {
    LOG(DFATAL) << "No transaction to commit";
    return;
  }

  // Clear the transaction from the session before committing the transaction. SetTransaction()
  // must be called before the Commit() call instead of after: once the commit callback is invoked,
  // it will finish the current transaction, return the response and make the CQLProcessor
  // available for the next statement, whose operations would then be aborted by SetTransaction().
  transactional_session_->SetTransaction(nullptr);
  transactional_session_ = nullptr;

  YBTransactionPtr transaction = std::move(transaction_);
  TRACE("Commit Transaction");
  transaction->Commit(deadline, std::move(callback));
}

void ExecContext::AbortTransaction() {
  if (!transaction_) {
    LOG(DFATAL) << "No transaction to abort";
    return;
  }

  // Abort the session and clear the transaction from the session before aborting the transaction.
  transactional_session_->Abort();
  transactional_session_->SetTransaction(nullptr);
  transactional_session_ = nullptr;

  YBTransactionPtr transaction = std::move(transaction_);
  TRACE("Abort Transaction");
  transaction->Abort();
}

bool ExecContext::HasPendingOperations() const {
  for (const auto& tnode_context : tnode_contexts_) {
    if (tnode_context.HasPendingOperations()) {
      return true;
    }
  }
  return false;
}

class AbortTransactionTask : public rpc::ThreadPoolTask {
 public:
  explicit AbortTransactionTask(YBTransactionPtr transaction)
      : transaction_(std::move(transaction)) {}

  void Run() override {
    transaction_->Abort();
    transaction_ = nullptr;
  }

  void Done(const Status& status) override {
    delete this;
  }

  virtual ~AbortTransactionTask() {
  }

 private:
  YBTransactionPtr transaction_;
};

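// Illustrative sketch (editor's addition, not part of exec_context.cc): the self-owning task
// pattern used by AbortTransactionTask above. The task is allocated with new, handed to a pool,
// and deletes itself in its completion hook, so the submitter never joins or frees it.
// Task, Submit, and ExampleAbortTask are hypothetical stand-ins, not YugabyteDB's rpc::ThreadPool API.
#include <thread>

class Task {
 public:
  virtual ~Task() = default;
  virtual void Run() = 0;
  virtual void Done() = 0;  // Always invoked after Run(); the task may delete itself here.
};

// Stand-in for pool submission; a detached thread keeps the sketch self-contained.
inline void Submit(Task* task) {
  std::thread([task] {
    task->Run();
    task->Done();
  }).detach();
}

class ExampleAbortTask : public Task {
 public:
  void Run() override { /* abort the transaction here */ }
  void Done() override { delete this; }  // Self-owning: freed by the task itself.
};

// Usage: Submit(new ExampleAbortTask());  // Fire-and-forget, as in ExecContext::Reset below.
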
//--------------------------------------------------------------------------------------------------
void ExecContext::Reset(const Restart restart, Rescheduler* rescheduler) {
  if (transactional_session_) {
    transactional_session_->Abort();
    transactional_session_->SetTransaction(nullptr);
  }
  if (transaction_ && !(transaction_->IsRestartRequired() && restart)) {
    YBTransactionPtr transaction = std::move(transaction_);
    TRACE("Abort Transaction");
    if (rescheduler && rescheduler->NeedReschedule()) {
      rescheduler->Reschedule(new AbortTransactionTask(std::move(transaction)));
    } else {
      transaction->Abort();
    }
  }
  restart_ = restart;
  tnode_contexts_.clear();
  if (restart) {
    num_retries_++;
  }
}

//--------------------------------------------------------------------------------------------------
TnodeContext::TnodeContext(const TreeNode* tnode) : tnode_(tnode), start_time_(MonoTime::Now()) {
}

TnodeContext::~TnodeContext() = default;

TnodeContext* TnodeContext::AddChildTnode(const TreeNode* tnode) {
  DCHECK(!child_context_);
  child_context_ = std::make_unique<TnodeContext>(tnode);
  return child_context_.get();
}

Status TnodeContext::AppendRowsResult(RowsResult::SharedPtr&& rows_result) {
  // Append data arriving from DocDB.
  // (1) SELECT without nested query.
  //  - SELECT <select_list> FROM <table or index>
  //      WHERE <filter_cond>
  //      LIMIT <limit> OFFSET <offset>
  //  - New rows are appended at the end.
  //
  // (2) SELECT with nested query.
  //  - SELECT <select_list> FROM <table>
  //      WHERE
  //        primary_key IN (SELECT primary_key FROM <index> WHERE <index_cond>)
  //        AND
  //        <filter_cond>
  //      LIMIT <limit> OFFSET <offset>
  //  - When nested INDEX query fully-covers the SELECT command, data coming from the nested node
  //    is appended without being filtered or rejected.
  //  - When nested INDEX query does NOT fully-cover the SELECT command, data is rejected if OFFSET
  //    is not yet reached. Otherwise, data is appended ONE row at a time.
  if (!rows_result) {
    return Status::OK();
  }

  int64_t number_of_new_rows =
    VERIFY_RESULT(QLRowBlock::GetRowCount(YQL_CLIENT_CQL, rows_result->rows_data()));

  if (query_state_) {
    RSTATUS_DCHECK(tnode_->opcode() == TreeNodeOpcode::kPTSelectStmt,
                   Corruption, "QueryPagingState is setup for non-select statement");
    const auto* select_stmt = static_cast<const PTSelectStmt *>(tnode_);

    // Save the last offset status before loading new status from DocDB.
    const bool reached_offset = query_state_->reached_select_offset();
    const bool has_nested_query = select_stmt->child_select() != nullptr;
    RETURN_NOT_OK(
      query_state_->LoadPagingStateFromDocdb(rows_result, number_of_new_rows, has_nested_query));

    if (!reached_offset && has_nested_query) {
      // Parent query needs to discard the new row when (row_count < SELECT::OFFSET).
      if (!rows_result_) {
        rows_result_ = std::make_shared<RowsResult>(select_stmt);
      }
      rows_result_->SetPagingState(std::move(*rows_result));
      return Status::OK();
    }
  }

  // Append the new rows to result.
  row_count_ += number_of_new_rows;
  if (rows_result_ == nullptr) {
    rows_result_ = std::move(rows_result);
    return Status::OK();
  }
  return rows_result_->Append(std::move(*rows_result));
}

void TnodeContext::InitializePartition(QLReadRequestPB *req, bool continue_user_request) {
  uint64_t start_partition = continue_user_request ? query_state_->next_partition_index() : 0;

  current_partition_index_ = start_partition;
  // Hash values before the first 'IN' condition will be already set.
  // hash_values_options_ vector starts from the first column with an 'IN' restriction.
  // E.g. for a query "h1 = 1 and h2 in (2,3) and h3 in (4,5) and h4 = 6":
  // hashed_column_values() will be [1] and hash_values_options_ will be [[2,3],[4,5],[6]].
  int set_cols_size = req->hashed_column_values().size();
  auto unset_cols_size = hash_values_options_->size();

  // Initialize the missing columns with default values (e.g. h2, h3, h4 in example above).
  req->mutable_hashed_column_values()->Reserve(narrow_cast<int>(set_cols_size + unset_cols_size));
  for (size_t i = 0; i < unset_cols_size; i++) {
    req->add_hashed_column_values();
  }

  // Set the right values for the missing/unset columns by converting partition index into positions
  // for each hash column and using the corresponding values from the hash values options vector.
  // E.g. In example above, with start_partition = 0:
  //    h4 = 6 since pos is "0 % 1 = 0", (start_partition becomes 0 / 1 = 0).
  //    h3 = 4 since pos is "0 % 2 = 0", (start_partition becomes 0 / 2 = 0).
  //    h2 = 2 since pos is "0 % 2 = 0", (start_partition becomes 0 / 2 = 0).
  for (auto i = unset_cols_size; i > 0;) {
    --i;
    const auto& options = (*hash_values_options_)[i];
    auto pos = start_partition % options.size();
    *req->mutable_hashed_column_values(narrow_cast<int>(i + set_cols_size)) = options[pos];
    start_partition /= options.size();
  }
}

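// Illustrative sketch (editor's addition, not part of exec_context.cc): the mixed-radix decoding
// performed by InitializePartition above, reduced to plain vectors. For the options from
// "h2 in (2,3) and h3 in (4,5) and h4 = 6", partition indexes 0..3 decode to the hash-value
// combinations (2,4,6), (2,5,6), (3,4,6), (3,5,6). DecodePartition is a hypothetical name.
#include <cstdint>
#include <vector>

std::vector<int> DecodePartition(uint64_t partition_index,
                                 const std::vector<std::vector<int>>& options) {
  std::vector<int> values(options.size());
  // Walk from the last 'IN' column to the first, peeling off one mixed-radix digit per column.
  for (auto i = options.size(); i > 0;) {
    --i;
    values[i] = options[i][partition_index % options[i].size()];
    partition_index /= options[i].size();
  }
  return values;
}

// Example: DecodePartition(2, {{2, 3}, {4, 5}, {6}}) returns {3, 4, 6}, matching the worked
// example in the comments above.
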
bool TnodeContext::FinishedReadingPartition() {
  return rows_result_->paging_state().empty() ||
      (query_state_->next_partition_key().empty() && query_state_->next_row_key().empty());
}

void TnodeContext::AdvanceToNextPartition(QLReadRequestPB *req) {
  // E.g. for a query "h1 = 1 and h2 in (2,3) and h3 in (4,5) and h4 = 6" partition index 2:
  // this will do, index: 2 -> 3 and hashed_column_values(): [1, 3, 4, 6] -> [1, 3, 5, 6].
  current_partition_index_++;
  uint64_t partition_counter = current_partition_index_;
  // Hash_values_options_ vector starts from the first column with an 'IN' restriction.
  const int hash_key_size = req->hashed_column_values().size();
  const auto fixed_cols_size = hash_key_size - hash_values_options_->size();

  // Set the right values for the missing/unset columns by converting partition index into positions
  // for each hash column and using the corresponding values from the hash values options vector.
  // E.g. In example above, with start_partition = 3:
  //    h4 = 6 since pos is "3 % 1 = 0", new partition counter is "3 / 1 = 3".
  //    h3 = 5 since pos is "3 % 2 = 1", pos is non-zero which guarantees previous cols don't need
  //    to be changed (i.e. are the same as for previous partition index) so we break.
  for (size_t i = hash_key_size; i > fixed_cols_size;) {
    --i;
    const auto& options = (*hash_values_options_)[i - fixed_cols_size];
    auto pos = partition_counter % options.size();
    *req->mutable_hashed_column_values(narrow_cast<int>(i)) = options[pos];
    if (pos != 0) break;  // The previous position hash values must be unchanged.
    partition_counter /= options.size();
  }

  req->clear_hash_code();
  req->clear_max_hash_code();

  if (hash_code_from_partition_key_ops_.is_initialized())
    req->set_hash_code(*hash_code_from_partition_key_ops_);
  if (max_hash_code_from_partition_key_ops_.is_initialized())
    req->set_max_hash_code(*max_hash_code_from_partition_key_ops_);
}

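// Illustrative sketch (editor's addition, not part of exec_context.cc): the early-break increment
// used by AdvanceToNextPartition above. Only the columns whose mixed-radix digit rolls over are
// rewritten; once a digit lands on a non-zero position, the more significant columns keep their
// previous values. AdvanceOnce and the in/out "values" vector are hypothetical.
#include <cstdint>
#include <vector>

void AdvanceOnce(uint64_t* partition_index,
                 const std::vector<std::vector<int>>& options,
                 std::vector<int>* values) {
  ++*partition_index;
  uint64_t counter = *partition_index;
  for (auto i = options.size(); i > 0;) {
    --i;
    const auto& column_options = options[i];
    const auto pos = counter % column_options.size();
    (*values)[i] = column_options[pos];
    if (pos != 0) break;  // Digits to the left are unchanged.
    counter /= column_options.size();
  }
}

// Example with options {{2, 3}, {4, 5}, {6}} and values {3, 4, 6} at partition index 2:
// AdvanceOnce(...) moves to index 3 and rewrites only the last two columns, giving {3, 5, 6}.
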
bool TnodeContext::HasPendingOperations() const {
  for (const auto& op : ops_) {
    if (!op->response().has_status()) {
      return true;
    }
  }
  if (child_context_) {
    return child_context_->HasPendingOperations();
  }
  return false;
}

void TnodeContext::SetUncoveredSelectOp(const YBqlReadOpPtr& select_op) {
  uncovered_select_op_ = select_op;
  const Schema& schema = static_cast<const PTSelectStmt*>(tnode_)->table()->InternalSchema();
  std::vector<ColumnId> key_column_ids;
  key_column_ids.reserve(schema.num_key_columns());
  for (size_t idx = 0; idx < schema.num_key_columns(); idx++) {
    key_column_ids.emplace_back(schema.column_id(idx));
  }
  keys_ = std::make_unique<QLRowBlock>(schema, key_column_ids);
}

QueryPagingState *TnodeContext::CreateQueryState(const StatementParameters& user_params,
                                                 bool is_top_level_select) {
  query_state_ = std::make_unique<QueryPagingState>(user_params, is_top_level_select);
  return query_state_.get();
}

Status TnodeContext::ClearQueryState() {
  RSTATUS_DCHECK(query_state_, Corruption, "Query state should not be null for SELECT");
  rows_result_->ClearPagingState();
  query_state_->ClearPagingState();

  return Status::OK();
}

Status TnodeContext::ComposeRowsResultForUser(const TreeNode* child_select_node,
                                              bool for_new_batches) {
  RSTATUS_DCHECK_EQ(tnode_->opcode(), TreeNodeOpcode::kPTSelectStmt,
                    Corruption, "Only SELECT node can have nested query");
  const auto* select_stmt = static_cast<const PTSelectStmt *>(tnode_);

  // Case 1:
  //   SELECT * FROM <table>;
  if (!child_select_node) {
    if (rows_result_->has_paging_state() || for_new_batches) {
      // Paging state must be provided for two cases. Otherwise, we've reached end of result set.
      // - Docdb sent back rows_result with paging state.
      // - Setting up paging_state for user's next batches.
      RETURN_NOT_OK(query_state_->ComposePagingStateForUser());
      rows_result_->SetPagingState(query_state_->query_pb());
    }
    return Status::OK();
  }

  // Check for nested condition.
  RSTATUS_DCHECK(child_context_ && child_select_node->opcode() == TreeNodeOpcode::kPTSelectStmt,
                 Corruption, "Expecting nested context with a SELECT node");

  // Case 2:
  //   SELECT <fully_covered_columns> FROM <index>;
  // Move result from index query (child) to the table query (this parent node).
  const auto* child_select = static_cast<const PTSelectStmt *>(child_select_node);
  if (child_select->covers_fully()) {
    return AppendRowsResult(std::move(child_context_->rows_result()));
  }

  // Case 3:
  //   SELECT <any columns> FROM <table> WHERE primary_keys IN (SELECT primary_keys FROM <index>);
  // Compose result of the following fields.
  // - The rows_result should be from this node (rows_result_).
  // - The counter_state should be from this node (query_state_::counter_pb_).
  // - The read paging_state should be from the CHILD node (query_state_::query_pb_).
  if (!rows_result_) {
    // Allocate an empty rows_result that will be filled with paging state.
    rows_result_ = std::make_shared<RowsResult>(select_stmt);
  }

  if (child_context_->rows_result()->has_paging_state() &&
      !query_state_->reached_select_limit()) {
    // If child node has paging state and LIMIT is not yet reached, provide paging state to users
    // to continue reading.
    RETURN_NOT_OK(
        query_state_->ComposePagingStateForUser(child_context_->query_state()->query_pb()));
    rows_result_->SetPagingState(query_state_->query_pb());
  } else {
    // Clear paging state once all requested rows were retrieved.
    rows_result_->ClearPagingState();
  }

  return Status::OK();
}

//--------------------------------------------------------------------------------------------------

QueryPagingState::QueryPagingState(const StatementParameters& user_params,
                                   bool is_top_level_read_node)
    : max_fetch_size_(user_params.page_size()) {
  LoadPagingStateFromUser(user_params, is_top_level_read_node);

  // Just default it to max_int.
  if (max_fetch_size_ <= 0) {
    max_fetch_size_ = INT_MAX;
  }
}

void QueryPagingState::AdjustMaxFetchSizeToSelectLimit() {
  int64_t limit = select_limit();
  if (limit < 0) {
    return;
  }

  int64_t count = read_count();
  if (count < limit) {
    int64_t wanted = limit - count;
    if (wanted < max_fetch_size_) {
      max_fetch_size_ = wanted;
    }
  } else {
    max_fetch_size_ = 0;
  }
}

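// Illustrative sketch (editor's addition, not part of exec_context.cc): the clamp performed by
// AdjustMaxFetchSizeToSelectLimit above. The page size requested by the client is reduced to the
// rows still allowed by the SELECT's LIMIT, and drops to zero once the limit has been read.
// RemainingFetchSize is a hypothetical name.
#include <algorithm>
#include <cstdint>

int64_t RemainingFetchSize(int64_t max_fetch_size, int64_t select_limit, int64_t read_count) {
  if (select_limit < 0) {
    return max_fetch_size;  // No LIMIT clause; keep the client's page size.
  }
  const int64_t remaining = std::max<int64_t>(select_limit - read_count, 0);
  return std::min(max_fetch_size, remaining);
}

// Example: with LIMIT 100, 95 rows already read, and a page size of 1000, the next fetch asks for 5.
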
void QueryPagingState::ClearPagingState() {
  // Clear only the paging state.
  // Keep the counter so that we know how many rows have been processed.
  query_pb_.Clear();
}

void QueryPagingState::LoadPagingStateFromUser(const StatementParameters& user_params,
                                               bool is_top_level_read_node) {
  user_params.WritePagingState(&query_pb_);

  // Calculate "skip_count" and "read_count".
  // (1) Top level read node.
  //  - Either top-level SELECT or fully-covering INDEX query.
  //  - User "params" should have the valid "counter_pb_".
  //
  // (2) Nested read node.
  //  - Zero out its counters. We don't use "counter_pb_" for this node because LIMIT and OFFSET
  //    restrictions are not applied to nested nodes.
  //  - Because nested node might be running different READ operators (with different hash values)
  //    for different calls from users, the counters from users' message are discarded here.
  if (is_top_level_read_node) {
    counter_pb_.CopyFrom(query_pb_.row_counter());
  } else {
    // These values are not used, set them to zero.
    set_skip_count(0);
    set_read_count(0);
  }
}

Status QueryPagingState::ComposePagingStateForUser() {
  // Write the counters into the paging_state.
  query_pb_.mutable_row_counter()->CopyFrom(counter_pb_);
  return Status::OK();
}

Status QueryPagingState::ComposePagingStateForUser(const QLPagingStatePB& child_state) {
  // Write child_state.
  query_pb_.CopyFrom(child_state);

  // Write the counters into the paging_state.
  query_pb_.mutable_row_counter()->CopyFrom(counter_pb_);

  return Status::OK();
}

Status QueryPagingState::LoadPagingStateFromDocdb(const RowsResult::SharedPtr& rows_result,
                                                  int64_t number_of_new_rows,
                                                  bool has_nested_query) {
  // Load "query_pb_" with the latest result from DocDB.
  query_pb_.ParseFromString(rows_result->paging_state());

  // If DocDB processed the skipping rows, record it here.
  if (total_rows_skipped() > 0) {
    set_skip_count(total_rows_skipped());
  }

  if (!has_nested_query) {
    // SELECT <select_list> FROM <table or index>
    //   WHERE <filter_cond>
    //   LIMIT <limit> OFFSET <offset>
    // - DocDB processed the <limit> and <offset> restrictions.
    //   Either "reached_select_offset() == TRUE" OR number_of_new_rows == 0.
    // - Two DocDB counters are used in the computation here.
    //   . QLPagingStatePB::total_rows_skipped - Skip count in DocDB.
    //   . number_of_new_rows - Rows of data from DocDB after skipping.
    set_read_count(read_count() + number_of_new_rows);

  } else {
    // SELECT <select_list> FROM <table>
    //   WHERE
    //     primary_key IN (SELECT primary_key FROM <index> WHERE <index_cond>)
    //     AND
    //     <filter_cond>
    //   LIMIT <limit> OFFSET <offset>
    //
    // NOTE:
    // 1. Case INDEX query fully-covers the SELECT command.
    //    - DocDB counters are transferred from the nested node to this node.
    //    - "reached_select_offset() == TRUE" OR number_of_new_rows == 0.
    //
    // 2. Case INDEX query does NOT fully-cover the SELECT command.
    //    - Values of <limit> and <offset> are NOT sent together with proto request to DocDB. They
    //      are computed and processed here in the CQL layer.
    //    - For this case, "number_of_new_rows" is either 1 or 0.
    //      CQL assumes that outer SELECT reads at most one row at a time as it uses values of the
    //      PRIMARY KEY (always unique) to read the rest of the columns of the <table>.
    if (!reached_select_offset()) {
      // Since OFFSET is processed here, this must be case 2.
      RSTATUS_DCHECK_LE(number_of_new_rows, 1, Corruption, "Incorrect counter calculation");
      set_skip_count(skip_count() + number_of_new_rows);
    } else {
      set_read_count(read_count() + number_of_new_rows);
    }
  }

  return Status::OK();
}

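// Illustrative sketch (editor's addition, not part of exec_context.cc): the counter bookkeeping
// for case 2 above, where the outer SELECT receives at most one row per nested-index hit and
// OFFSET/LIMIT are enforced in the CQL layer. Rows arriving before OFFSET is reached only bump
// skip_count; later rows bump read_count. PagingCounters and OnRowFromDocdb are hypothetical
// names, and modeling "reached_select_offset" as "skip_count < offset" is an assumption.
#include <cstdint>

struct PagingCounters {
  int64_t skip_count = 0;
  int64_t read_count = 0;
};

void OnRowFromDocdb(PagingCounters* counters, int64_t select_offset, int64_t number_of_new_rows) {
  // number_of_new_rows is 0 or 1 in this case.
  if (counters->skip_count < select_offset) {
    counters->skip_count += number_of_new_rows;  // Still skipping toward OFFSET.
  } else {
    counters->read_count += number_of_new_rows;  // OFFSET reached; count toward LIMIT.
  }
}

// Example: with OFFSET 2, the first two single-row results only advance skip_count (0 -> 1 -> 2);
// the third row increments read_count to 1.
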
const std::string& ExecContext::stmt() const {
  return parse_tree_.stmt();
}

}  // namespace ql
}  // namespace yb