YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_sample.cc
Line
Count
Source (jump to first uncovered line)
1
//--------------------------------------------------------------------------------------------------
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//--------------------------------------------------------------------------------------------------
15
16
#include "yb/yql/pggate/pg_sample.h"
17
18
#include "yb/gutil/casts.h"
19
20
namespace yb {
21
namespace pggate {
22
23
using std::make_shared;
24
25
//--------------------------------------------------------------------------------------------------
26
// PgSample
27
//--------------------------------------------------------------------------------------------------
28
29
// Creates a sampling statement for the table identified by 'table_id'.
// 'targrows' is remembered as the requested sample size. The PgDmlRead base is initialized with
// the session, the table id, a default-constructed PgObjectId and nullptr.
PgSample::PgSample(PgSession::ScopedRefPtr pg_session,
                   const int targrows,
                   const PgObjectId& table_id)
    : PgDmlRead(pg_session, table_id, PgObjectId(), nullptr),
      targrows_(targrows) {
}
33
34
178
// Nothing to release explicitly; members clean up after themselves.
PgSample::~PgSample() = default;
36
37
178
// Prepares the statement for execution.
//
// Loads the target table, creates and prepares the sample picker (kept in the secondary index
// query slot), and builds the read operation used to fetch the rows.
// Returns the first error reported by table loading or by the picker's Prepare().
Status PgSample::Prepare() {
  // Target descriptor comes from the session; the bind descriptor is left empty.
  target_ = PgTable(VERIFY_RESULT(pg_session_->LoadTable(table_id_)));
  bind_ = PgTable(nullptr);

  // The sample picker is installed as the secondary index query.
  secondary_index_query_ = std::make_unique<PgSamplePicker>(pg_session_, table_id_);
  RETURN_NOT_OK(secondary_index_query_->Prepare());

  // Read operation that fetches the rows. read_req_ shares ownership with the operation while
  // pointing at the request embedded in it.
  auto fetch_op = std::make_shared<PgsqlReadOp>(*target_);
  read_req_ = std::shared_ptr<PgsqlReadRequestPB>(fetch_op, &fetch_op->read_request());
  doc_op_ = std::make_shared<PgDocReadOp>(pg_session_, &target_, std::move(fetch_op));

  return Status::OK();
}
53
54
178
// Forwards the target sample size and the caller-provided random state to the sample picker.
// Fails with RuntimeError if Prepare() has not created the picker yet.
Status PgSample::InitRandomState(double rstate_w, uint64 rand_state) {
  SCHECK(secondary_index_query_ != nullptr, RuntimeError, "PgSample has not been prepared");
  auto* const sample_picker = down_cast<PgSamplePicker*>(secondary_index_query_.get());
  return sample_picker->PrepareSamplingState(targrows_, rstate_w, rand_state);
}
59
60
939
// Advances sampling by one block.
//
// On the first call the picker query is marked as executed and then executed; every call then
// asks the picker to process the next block and stores its answer in '*has_more'.
// Fails with RuntimeError if Prepare() has not created the picker yet.
Status PgSample::SampleNextBlock(bool *has_more) {
  SCHECK(secondary_index_query_ != nullptr, RuntimeError, "PgSample has not been prepared");

  auto& picker_query = *secondary_index_query_;
  if (!picker_query.is_executed()) {
    // The flag is raised before Exec(), so it stays set even if Exec() fails.
    picker_query.set_is_executed(true);
    RETURN_NOT_OK(picker_query.Exec(nullptr));
  }

  auto* const sample_picker = down_cast<PgSamplePicker*>(secondary_index_query_.get());
  *has_more = VERIFY_RESULT(sample_picker->ProcessNextBlock());
  return Status::OK();
}
72
73
178
// Delegates the live/dead row count estimate to the sample picker.
// Fails with RuntimeError if Prepare() has not created the picker yet.
Status PgSample::GetEstimatedRowCount(double *liverows, double *deadrows) {
  SCHECK(secondary_index_query_ != nullptr, RuntimeError, "PgSample has not been prepared");
  auto* const sample_picker = down_cast<PgSamplePicker*>(secondary_index_query_.get());
  return sample_picker->GetEstimatedRowCount(liverows, deadrows);
}
78
79
//--------------------------------------------------------------------------------------------------
80
// PgSamplePicker
81
//--------------------------------------------------------------------------------------------------
82
83
// Creates a sample picker for the table identified by 'table_id'.
// The PgSelectIndex base is initialized with the session, the table id, a default-constructed
// PgObjectId and nullptr.
PgSamplePicker::PgSamplePicker(PgSession::ScopedRefPtr pg_session,
                               const PgObjectId& table_id)
    : PgSelectIndex(pg_session, table_id, PgObjectId(), nullptr) {
}
86
87
178
// Nothing to release explicitly; members clean up after themselves.
PgSamplePicker::~PgSamplePicker() = default;
89
90
178
// Prepares the picker: loads the target table and builds the read operation that will carry
// the sampling request.
// Returns the error from table loading, if any.
Status PgSamplePicker::Prepare() {
  // Target descriptor comes from the session; the bind descriptor is left empty.
  target_ = PgTable(VERIFY_RESULT(pg_session_->LoadTable(table_id_)));
  bind_ = PgTable(nullptr);

  // read_req_ shares ownership with the operation while pointing at the request embedded in it.
  auto sampling_op = std::make_shared<PgsqlReadOp>(*target_);
  read_req_ = std::shared_ptr<PgsqlReadRequestPB>(sampling_op, &sampling_op->read_request());
  doc_op_ = std::make_shared<PgDocReadOp>(pg_session_, &target_, std::move(sampling_op));

  return Status::OK();
}
98
99
178
// Initializes the sampling state of the read request and allocates the reservoir.
//
// targrows   - target sample size; also the number of reservoir slots allocated.
// rstate_w   - Vitter algorithm's W.
// rand_state - random generator's state.
Status PgSamplePicker::PrepareSamplingState(int targrows, double rstate_w, uint64 rand_state) {
  auto& state = *read_req_->mutable_sampling_state();

  // Requested size and the caller-provided random state.
  state.set_targrows(targrows);
  state.set_rstate_w(rstate_w);
  state.set_rand_state(rand_state);

  // Counters start from scratch.
  state.set_numrows(0);      // current number of rows selected
  state.set_samplerows(0);   // rows scanned so far
  state.set_rowstoskip(-1);  // rows to skip before selecting another

  // One (initially empty) string per reservoir slot.
  reservoir_ = std::make_unique<std::string[]>(targrows);
  return Status::OK();
}
110
111
939
// Processes the next block of sampling results.
//
// Rowsets that reached EOF are dropped. The first rowset that still has data is used to update
// the reservoir and the method returns true. When no buffered rowset remains, more data is
// requested from the server: true is returned if something was fetched; otherwise the reservoir
// is marked ready (unless it is empty) and false is returned.
//
// Fails with RuntimeError if PrepareSamplingState() has not allocated the reservoir yet, instead
// of dereferencing a null reservoir.
Result<bool> PgSamplePicker::ProcessNextBlock() {
  SCHECK(reservoir_ != nullptr, RuntimeError,
         "PgSamplePicker sampling state has not been prepared");

  // Process previous responses.
  for (auto rowset_iter = rowsets_.begin(); rowset_iter != rowsets_.end();) {
    if (rowset_iter->is_eof()) {
      rowset_iter = rowsets_.erase(rowset_iter);
      continue;
    }
    // Update reservoir with newly selected rows.
    RETURN_NOT_OK(rowset_iter->ProcessSparseSystemColumns(reservoir_.get()));
    return true;
  }

  // Request more data.
  if (VERIFY_RESULT(FetchDataFromServer())) {
    return true;
  }

  // Data is exhausted. The reservoir is ready only if it holds at least one row, so the fetch is
  // skipped if the table is empty. A zero-capacity reservoir has no slot 0 to inspect, so it is
  // never reported as ready.
  const bool has_slots = read_req_->sampling_state().targrows() > 0;
  reservoir_ready_ = has_slots && !reservoir_[0].empty();
  return false;
}
131
132
356
// Hands the sampled ybctids over to the caller.
//
// Returns false with '*ybctids' set to nullptr when the reservoir is not ready (including when
// it has already been returned). Otherwise fills the internal vector with slices over the
// non-empty reservoir entries, points '*ybctids' at it, clears the ready flag and returns true.
Result<bool> PgSamplePicker::FetchYbctidBatch(const vector<Slice> **ybctids) {
  // Check if all ybctids are already returned.
  if (!reservoir_ready_) {
    *ybctids = nullptr;
    return false;
  }

  // Reservoir capacity. There may be fewer rows than that.
  const int capacity = read_req_->sampling_state().targrows();

  // Prepare target vector.
  ybctids_.clear();
  ybctids_.reserve(capacity);

  // Create pointers to the items in the reservoir. The algorithm fills up the reservoir first,
  // so the first empty row means there is no more data.
  for (int slot = 0; slot < capacity && !reservoir_[slot].empty(); ++slot) {
    ybctids_.emplace_back(reservoir_[slot]);
  }

  reservoir_ready_ = false;
  *ybctids = &ybctids_;
  return true;
}
156
157
178
// Delegates the live/dead row count estimate to the underlying document read operation.
Status PgSamplePicker::GetEstimatedRowCount(double *liverows, double *deadrows) {
  auto* const read_doc_op = down_cast<PgDocReadOp*>(doc_op_.get());
  return read_doc_op->GetEstimatedRowCount(liverows, deadrows);
}
160
161
}  // namespace pggate
162
}  // namespace yb