/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_sample.cc
Line | Count | Source (jump to first uncovered line) |
1 | | //-------------------------------------------------------------------------------------------------- |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | //-------------------------------------------------------------------------------------------------- |
15 | | |
16 | | #include "yb/yql/pggate/pg_sample.h" |
17 | | |
18 | | #include "yb/gutil/casts.h" |
19 | | |
20 | | namespace yb { |
21 | | namespace pggate { |
22 | | |
23 | | using std::make_shared; |
24 | | |
25 | | //-------------------------------------------------------------------------------------------------- |
26 | | // PgSample |
27 | | //-------------------------------------------------------------------------------------------------- |
28 | | |
29 | | PgSample::PgSample(PgSession::ScopedRefPtr pg_session, |
30 | | const int targrows, |
31 | | const PgObjectId& table_id) |
32 | 178 | : PgDmlRead(pg_session, table_id, PgObjectId(), nullptr), targrows_(targrows) {} |
33 | | |
34 | 178 | PgSample::~PgSample() { |
35 | 178 | } |
36 | | |
37 | 178 | Status PgSample::Prepare() { |
38 | | // Setup target and bind descriptor. |
39 | 178 | target_ = PgTable(VERIFY_RESULT(pg_session_->LoadTable(table_id_))); |
40 | 0 | bind_ = PgTable(nullptr); |
41 | | |
42 | | // Setup sample picker as secondary index query |
43 | 178 | secondary_index_query_ = std::make_unique<PgSamplePicker>(pg_session_, table_id_); |
44 | 178 | RETURN_NOT_OK(secondary_index_query_->Prepare()); |
45 | | |
46 | | // Prepare read op to fetch rows |
47 | 178 | auto read_op = std::make_shared<PgsqlReadOp>(*target_); |
48 | 178 | read_req_ = std::shared_ptr<PgsqlReadRequestPB>(read_op, &read_op->read_request()); |
49 | 178 | doc_op_ = make_shared<PgDocReadOp>(pg_session_, &target_, std::move(read_op)); |
50 | | |
51 | 178 | return Status::OK(); |
52 | 178 | } |
53 | | |
54 | 178 | Status PgSample::InitRandomState(double rstate_w, uint64 rand_state) { |
55 | 178 | SCHECK(secondary_index_query_ != nullptr, RuntimeError, "PgSample has not been prepared"); |
56 | 178 | return down_cast<PgSamplePicker*>(secondary_index_query_.get()) |
57 | 178 | ->PrepareSamplingState(targrows_, rstate_w, rand_state); |
58 | 178 | } |
59 | | |
60 | 939 | Status PgSample::SampleNextBlock(bool *has_more) { |
61 | 939 | SCHECK(secondary_index_query_ != nullptr, RuntimeError, "PgSample has not been prepared"); |
62 | 939 | if (!secondary_index_query_->is_executed()) { |
63 | 178 | secondary_index_query_->set_is_executed(true); |
64 | 178 | RETURN_NOT_OK(secondary_index_query_->Exec(nullptr)); |
65 | 178 | } |
66 | | |
67 | 939 | auto picker = down_cast<PgSamplePicker*>(secondary_index_query_.get()); |
68 | 939 | *has_more = VERIFY_RESULT(picker->ProcessNextBlock()); |
69 | | |
70 | 0 | return Status::OK(); |
71 | 939 | } |
72 | | |
73 | 178 | Status PgSample::GetEstimatedRowCount(double *liverows, double *deadrows) { |
74 | 178 | SCHECK(secondary_index_query_ != nullptr, RuntimeError, "PgSample has not been prepared"); |
75 | 178 | return down_cast<PgSamplePicker*>(secondary_index_query_.get()) |
76 | 178 | ->GetEstimatedRowCount(liverows, deadrows); |
77 | 178 | } |
78 | | |
79 | | //-------------------------------------------------------------------------------------------------- |
80 | | // PgSamplePicker |
81 | | //-------------------------------------------------------------------------------------------------- |
82 | | |
83 | | PgSamplePicker::PgSamplePicker(PgSession::ScopedRefPtr pg_session, |
84 | | const PgObjectId& table_id) |
85 | 178 | : PgSelectIndex(pg_session, table_id, PgObjectId(), nullptr) {} |
86 | | |
87 | 178 | PgSamplePicker::~PgSamplePicker() { |
88 | 178 | } |
89 | | |
90 | 178 | Status PgSamplePicker::Prepare() { |
91 | 178 | target_ = PgTable(VERIFY_RESULT(pg_session_->LoadTable(table_id_))); |
92 | 0 | bind_ = PgTable(nullptr); |
93 | 178 | auto read_op = std::make_shared<PgsqlReadOp>(*target_); |
94 | 178 | read_req_ = std::shared_ptr<PgsqlReadRequestPB>(read_op, &read_op->read_request()); |
95 | 178 | doc_op_ = make_shared<PgDocReadOp>(pg_session_, &target_, std::move(read_op)); |
96 | 178 | return Status::OK(); |
97 | 178 | } |
98 | | |
99 | 178 | Status PgSamplePicker::PrepareSamplingState(int targrows, double rstate_w, uint64 rand_state) { |
100 | 178 | PgsqlSamplingStatePB* sampling_state = read_req_->mutable_sampling_state(); |
101 | 178 | sampling_state->set_targrows(targrows); // target sample size |
102 | 178 | sampling_state->set_numrows(0); // current number of rows selected |
103 | 178 | sampling_state->set_samplerows(0); // rows scanned so far |
104 | 178 | sampling_state->set_rowstoskip(-1); // rows to skip before selecting another |
105 | 178 | sampling_state->set_rstate_w(rstate_w); // Vitter algorithm's W |
106 | 178 | sampling_state->set_rand_state(rand_state); // random generator's state |
107 | 178 | reservoir_ = std::make_unique<std::string[]>(targrows); |
108 | 178 | return Status::OK(); |
109 | 178 | } |
110 | | |
111 | 939 | Result<bool> PgSamplePicker::ProcessNextBlock() { |
112 | | // Process previous responses |
113 | 1.34k | for (auto rowset_iter = rowsets_.begin(); rowset_iter != rowsets_.end();) { |
114 | 761 | if (rowset_iter->is_eof()) { |
115 | 406 | rowset_iter = rowsets_.erase(rowset_iter); |
116 | 406 | } else { |
117 | | // Update reservoir with newly selected rows. |
118 | 355 | RETURN_NOT_OK(rowset_iter->ProcessSparseSystemColumns(reservoir_.get())); |
119 | 355 | return true; |
120 | 355 | } |
121 | 761 | } |
122 | | // Request more data, if exhausted, mark reservoir as ready and let caller know |
123 | 584 | if (VERIFY_RESULT(FetchDataFromServer())) { |
124 | 406 | return true; |
125 | 406 | } else { |
126 | | // Skip fetch if the table is empty |
127 | 178 | reservoir_ready_ = !reservoir_[0].empty(); |
128 | 178 | return false; |
129 | 178 | } |
130 | 584 | } |
131 | | |
132 | 356 | Result<bool> PgSamplePicker::FetchYbctidBatch(const vector<Slice> **ybctids) { |
133 | | // Check if all ybctids are already returned |
134 | 356 | if (!reservoir_ready_) { |
135 | 212 | *ybctids = nullptr; |
136 | 212 | return false; |
137 | 212 | } |
138 | | // Get reservoir capacity. There may be less rows then that |
139 | 144 | int targrows = read_req_->sampling_state().targrows(); |
140 | | // Prepare target vector |
141 | 144 | ybctids_.clear(); |
142 | 144 | ybctids_.reserve(targrows); |
143 | | // Create pointers to the items in the reservoir |
144 | 161k | for (int idx = 0; idx < targrows; idx++161k ) { |
145 | 161k | if (reservoir_[idx].empty()) { |
146 | | // Algorithm fills up the reservoir first. Empty row means there are |
147 | | // no mmore data |
148 | 144 | break; |
149 | 144 | } |
150 | 161k | ybctids_.emplace_back(reservoir_[idx]); |
151 | 161k | } |
152 | 144 | reservoir_ready_ = false; |
153 | 144 | *ybctids = &ybctids_; |
154 | 144 | return true; |
155 | 356 | } |
156 | | |
157 | 178 | Status PgSamplePicker::GetEstimatedRowCount(double *liverows, double *deadrows) { |
158 | 178 | return down_cast<PgDocReadOp*>(doc_op_.get())->GetEstimatedRowCount(liverows, deadrows); |
159 | 178 | } |
160 | | |
161 | | } // namespace pggate |
162 | | } // namespace yb |