YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_sample.cc
Line
Count
Source
1
//--------------------------------------------------------------------------------------------------
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//--------------------------------------------------------------------------------------------------
15
16
#include "yb/yql/pggate/pg_sample.h"
17
18
#include "yb/gutil/casts.h"
19
20
namespace yb {
21
namespace pggate {
22
23
using std::make_shared;
24
25
//--------------------------------------------------------------------------------------------------
26
// PgSample
27
//--------------------------------------------------------------------------------------------------
28
29
// Constructs a sampling statement over the table identified by `table_id`.
// `targrows` is stored in targrows_ and later passed to the sample picker by InitRandomState().
// The PgDmlRead base receives a default-constructed PgObjectId and a null final argument.
PgSample::PgSample(PgSession::ScopedRefPtr pg_session,
                   const int targrows,
                   const PgObjectId& table_id)
    : PgDmlRead(pg_session, table_id, PgObjectId(), nullptr),
      targrows_(targrows) {
}
33
34
79
// Nothing to release explicitly; members clean up after themselves.
PgSample::~PgSample() = default;
36
37
79
// Prepares the statement for execution:
//   1. loads the target table descriptor (no bind descriptor is used),
//   2. creates and prepares a PgSamplePicker, stored as the secondary index query,
//   3. creates the read operation and doc op used to fetch rows.
// Returns the first error encountered, or OK.
Status PgSample::Prepare() {
  // Target and bind descriptors.
  target_ = PgTable(VERIFY_RESULT(pg_session_->LoadTable(table_id_)));
  bind_ = PgTable(nullptr);

  // The sample picker plays the role of the secondary index query.
  secondary_index_query_ = std::make_unique<PgSamplePicker>(pg_session_, table_id_);
  RETURN_NOT_OK(secondary_index_query_->Prepare());

  // Read op that fetches the rows. read_req_ shares ownership with the op while pointing at the
  // request embedded in it.
  auto fetch_op = std::make_shared<PgsqlReadOp>(*target_);
  auto* const fetch_request = &fetch_op->read_request();
  read_req_ = std::shared_ptr<PgsqlReadRequestPB>(fetch_op, fetch_request);
  doc_op_ = std::make_shared<PgDocReadOp>(pg_session_, &target_, std::move(fetch_op));

  return Status::OK();
}
53
54
79
// Forwards targrows_ together with `rstate_w` and `rand_state` to the sample picker's
// PrepareSamplingState(). Fails with RuntimeError if Prepare() has not created the picker yet.
Status PgSample::InitRandomState(double rstate_w, uint64 rand_state) {
  SCHECK(secondary_index_query_ != nullptr, RuntimeError, "PgSample has not been prepared");
  auto* const sample_picker = down_cast<PgSamplePicker*>(secondary_index_query_.get());
  return sample_picker->PrepareSamplingState(targrows_, rstate_w, rand_state);
}
59
60
270
// Advances sampling by one block.
// On the first call the sample picker is marked as executed and then executed; every call then
// asks the picker to process its next block and stores the outcome in `*has_more`.
// Fails with RuntimeError if Prepare() has not created the picker yet.
Status PgSample::SampleNextBlock(bool *has_more) {
  SCHECK(secondary_index_query_ != nullptr, RuntimeError, "PgSample has not been prepared");

  const bool already_started = secondary_index_query_->is_executed();
  if (!already_started) {
    // The flag is raised before Exec(), matching the order the picker is driven in.
    secondary_index_query_->set_is_executed(true);
    RETURN_NOT_OK(secondary_index_query_->Exec(nullptr));
  }

  auto* const sample_picker = down_cast<PgSamplePicker*>(secondary_index_query_.get());
  const bool more_blocks = VERIFY_RESULT(sample_picker->ProcessNextBlock());
  *has_more = more_blocks;

  return Status::OK();
}
72
73
79
// Delegates to the sample picker's GetEstimatedRowCount(), which fills `liverows` and `deadrows`.
// Fails with RuntimeError if Prepare() has not created the picker yet.
Status PgSample::GetEstimatedRowCount(double *liverows, double *deadrows) {
  SCHECK(secondary_index_query_ != nullptr, RuntimeError, "PgSample has not been prepared");
  auto* const sample_picker = down_cast<PgSamplePicker*>(secondary_index_query_.get());
  return sample_picker->GetEstimatedRowCount(liverows, deadrows);
}
78
79
//--------------------------------------------------------------------------------------------------
80
// PgSamplePicker
81
//--------------------------------------------------------------------------------------------------
82
83
// Constructs the picker on top of PgSelectIndex for the table identified by `table_id`.
// The base receives a default-constructed PgObjectId and a null final argument.
PgSamplePicker::PgSamplePicker(PgSession::ScopedRefPtr pg_session,
                               const PgObjectId& table_id)
    : PgSelectIndex(pg_session, table_id, PgObjectId(), nullptr) {
}
86
87
79
// Nothing to release explicitly; members clean up after themselves.
PgSamplePicker::~PgSamplePicker() = default;
89
90
79
// Loads the target table descriptor (no bind descriptor is used) and creates the read operation
// and doc op for the picker. Returns the table-load error, if any, otherwise OK.
Status PgSamplePicker::Prepare() {
  target_ = PgTable(VERIFY_RESULT(pg_session_->LoadTable(table_id_)));
  bind_ = PgTable(nullptr);

  // read_req_ shares ownership with the op while pointing at the request embedded in it.
  auto picker_op = std::make_shared<PgsqlReadOp>(*target_);
  auto* const picker_request = &picker_op->read_request();
  read_req_ = std::shared_ptr<PgsqlReadRequestPB>(picker_op, picker_request);
  doc_op_ = std::make_shared<PgDocReadOp>(pg_session_, &target_, std::move(picker_op));

  return Status::OK();
}
98
99
79
// Initializes the sampling state carried by the read request and allocates the reservoir.
//
// targrows   - target sample size; also the number of reservoir slots. Must be positive.
// rstate_w   - initial value stored as the sampling state's W.
// rand_state - initial state stored for the random generator.
//
// Returns RuntimeError if Prepare() has not created the read request, InvalidArgument if
// `targrows` is not positive, otherwise OK.
Status PgSamplePicker::PrepareSamplingState(int targrows, double rstate_w, uint64 rand_state) {
  SCHECK(read_req_ != nullptr, RuntimeError, "PgSamplePicker has not been prepared");
  // A non-positive size cannot back a reservoir: a negative count makes the array allocation
  // fail, and an empty reservoir would be indexed out of bounds when its first slot is inspected.
  SCHECK(targrows > 0, InvalidArgument, "Target sample size must be positive");

  PgsqlSamplingStatePB* sampling_state = read_req_->mutable_sampling_state();
  sampling_state->set_targrows(targrows);      // target sample size
  sampling_state->set_numrows(0);              // current number of rows selected
  sampling_state->set_samplerows(0);           // rows scanned so far
  sampling_state->set_rowstoskip(-1);          // rows to skip before selecting another
  sampling_state->set_rstate_w(rstate_w);      // Vitter algorithm's W
  sampling_state->set_rand_state(rand_state);  // random generator's state

  // One (initially empty) string per reservoir slot.
  reservoir_ = std::make_unique<std::string[]>(targrows);
  return Status::OK();
}
110
111
270
// Processes the next block of sampling results.
//
// Exhausted rowsets at the front of rowsets_ are erased. The first rowset that is not at EOF
// updates the reservoir through ProcessSparseSystemColumns(), and the function returns true.
// When no rowset is left, more data is requested via FetchDataFromServer(): if it reports data,
// the function returns true; otherwise it records in reservoir_ready_ whether the reservoir holds
// at least one row and returns false.
//
// Returns RuntimeError if the sampling state (and thus the reservoir) has not been initialized
// by PrepareSamplingState(); other errors are propagated from the calls above.
Result<bool> PgSamplePicker::ProcessNextBlock() {
  // Without a reservoir both the rowset processing and the emptiness probe below would
  // dereference a null pointer.
  SCHECK(reservoir_ != nullptr, RuntimeError, "Sampling state has not been initialized");

  // Process previous responses
  for (auto rowset_iter = rowsets_.begin(); rowset_iter != rowsets_.end();) {
    if (rowset_iter->is_eof()) {
      rowset_iter = rowsets_.erase(rowset_iter);
      continue;
    }
    // Update reservoir with newly selected rows.
    RETURN_NOT_OK(rowset_iter->ProcessSparseSystemColumns(reservoir_.get()));
    return true;
  }

  // Request more data, if exhausted, mark reservoir as ready and let caller know
  if (VERIFY_RESULT(FetchDataFromServer())) {
    return true;
  }

  // Skip fetch if the table is empty. The first slot is only inspected when the reservoir has
  // at least one slot.
  const bool has_slots = read_req_->sampling_state().targrows() > 0;
  reservoir_ready_ = has_slots && !reservoir_[0].empty();
  return false;
}
131
132
158
// Hands the sampled ybctids to the caller.
//
// If the reservoir is not marked ready, `*ybctids` is set to nullptr and false is returned.
// Otherwise ybctids_ is rebuilt with one Slice per leading non-empty reservoir entry (at most
// targrows of them), reservoir_ready_ is cleared, `*ybctids` is pointed at ybctids_ and true is
// returned.
Result<bool> PgSamplePicker::FetchYbctidBatch(const vector<Slice> **ybctids) {
  // Nothing to return unless the reservoir has been marked ready.
  if (!reservoir_ready_) {
    *ybctids = nullptr;
    return false;
  }

  // Reservoir capacity. There may be fewer rows than that.
  const int capacity = read_req_->sampling_state().targrows();

  // Prepare target vector
  ybctids_.clear();
  ybctids_.reserve(capacity);

  // Create pointers to the items in the reservoir. The algorithm fills up the reservoir first,
  // so the first empty entry means there is no more data.
  for (int slot = 0; slot < capacity && !reservoir_[slot].empty(); slot++) {
    ybctids_.emplace_back(reservoir_[slot]);
  }

  reservoir_ready_ = false;
  *ybctids = &ybctids_;
  return true;
}
156
157
79
// Delegates to the underlying PgDocReadOp, which fills `liverows` and `deadrows`.
Status PgSamplePicker::GetEstimatedRowCount(double *liverows, double *deadrows) {
  auto* const read_doc_op = down_cast<PgDocReadOp*>(doc_op_.get());
  return read_doc_op->GetEstimatedRowCount(liverows, deadrows);
}
160
161
}  // namespace pggate
162
}  // namespace yb