/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_doc_op.cc
Line | Count | Source |
1 | | //-------------------------------------------------------------------------------------------------- |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | //-------------------------------------------------------------------------------------------------- |
14 | | |
15 | | #include "yb/yql/pggate/pg_doc_op.h" |
16 | | |
17 | | #include <algorithm> |
18 | | #include <memory> |
19 | | #include <string> |
20 | | #include <utility> |
21 | | #include <vector> |
22 | | |
23 | | #include "yb/common/row_mark.h" |
24 | | |
25 | | #include "yb/gutil/casts.h" |
26 | | #include "yb/gutil/strings/escaping.h" |
27 | | |
28 | | #include "yb/util/status_format.h" |
29 | | #include "yb/util/status_log.h" |
30 | | |
31 | | #include "yb/yql/pggate/pg_expr.h" |
32 | | #include "yb/yql/pggate/pg_table.h" |
33 | | #include "yb/yql/pggate/pg_tools.h" |
34 | | #include "yb/yql/pggate/pggate_flags.h" |
35 | | #include "yb/yql/pggate/util/pg_doc_data.h" |
36 | | |
37 | | using std::lower_bound; |
38 | | using std::list; |
39 | | using std::vector; |
40 | | using std::shared_ptr; |
41 | | using std::make_shared; |
42 | | using std::unique_ptr; |
43 | | using std::move; |
44 | | |
45 | | using yb::client::YBPgsqlOp; |
46 | | using yb::client::YBPgsqlReadOp; |
47 | | using yb::client::YBPgsqlWriteOp; |
48 | | |
49 | | namespace yb { |
50 | | namespace pggate { |
51 | | |
52 | 3.22M | PgDocResult::PgDocResult(rpc::SidecarPtr&& data) : data_(std::move(data)) { |
53 | 3.22M | PgDocData::LoadCache(data_, &row_count_, &row_iterator_); |
54 | 3.22M | } |
55 | | |
56 | | PgDocResult::PgDocResult(rpc::SidecarPtr&& data, std::list<int64_t>&& row_orders) |
57 | 6.34k | : data_(std::move(data)), row_orders_(move(row_orders)) { |
58 | 6.34k | PgDocData::LoadCache(data_, &row_count_, &row_iterator_); |
59 | 6.34k | } |
60 | | |
61 | 3.22M | PgDocResult::~PgDocResult() { |
62 | 3.22M | } |
63 | | |
64 | 52.4M | int64_t PgDocResult::NextRowOrder() { |
65 | 52.4M | return row_orders_.size() > 0 ? row_orders_.front() : -1;
66 | 52.4M | } |
67 | | |
68 | | Status PgDocResult::WritePgTuple(const std::vector<PgExpr*>& targets, PgTuple *pg_tuple, |
69 | 52.3M | int64_t *row_order) { |
70 | 52.3M | int attr_num = 0; |
71 | 649M | for (const PgExpr *target : targets) { |
72 | 649M | if (!target->is_colref() && !target->is_aggregate()) {
73 | 0 | return STATUS(InternalError, |
74 | 0 | "Unexpected expression, only column refs or aggregates supported here"); |
75 | 0 | } |
76 | 649M | if (target->opcode() == PgColumnRef::Opcode::PG_EXPR_COLREF) { |
77 | 648M | attr_num = static_cast<const PgColumnRef *>(target)->attr_num(); |
78 | 648M | } else { |
79 | 85.1k | attr_num++; |
80 | 85.1k | } |
81 | | |
82 | 649M | PgWireDataHeader header = PgDocData::ReadDataHeader(&row_iterator_); |
83 | 649M | target->TranslateData(&row_iterator_, header, attr_num - 1, pg_tuple); |
84 | 649M | } |
85 | | |
86 | 52.3M | if (row_orders_.size()) { |
87 | 1.44M | *row_order = row_orders_.front(); |
88 | 1.44M | row_orders_.pop_front(); |
89 | 50.8M | } else { |
90 | 50.8M | *row_order = -1; |
91 | 50.8M | } |
92 | 52.3M | return Status::OK(); |
93 | 52.3M | } |
94 | | |
95 | 5.63k | Status PgDocResult::ProcessSystemColumns() { |
96 | 5.63k | if (syscol_processed_) { |
97 | 0 | return Status::OK(); |
98 | 0 | } |
99 | 5.63k | syscol_processed_ = true; |
100 | | |
101 | 1.38M | for (int i = 0; i < row_count_; i++) {
102 | 1.38M | PgWireDataHeader header = PgDocData::ReadDataHeader(&row_iterator_); |
103 | 1.38M | SCHECK(!header.is_null(), InternalError, "System column ybctid cannot be NULL"); |
104 | | |
105 | 1.38M | int64_t data_size; |
106 | 1.38M | size_t read_size = PgDocData::ReadNumber(&row_iterator_, &data_size); |
107 | 1.38M | row_iterator_.remove_prefix(read_size); |
108 | | |
109 | 1.38M | ybctids_.emplace_back(row_iterator_.data(), data_size); |
110 | 1.38M | row_iterator_.remove_prefix(data_size); |
111 | 1.38M | } |
112 | 5.63k | return Status::OK(); |
113 | 5.63k | } |
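
The loop above reads each ybctid as a data header followed by a length-prefixed binary value. As a rough illustration of that parse step, here is a minimal standalone sketch; SliceView, the one-byte header, and the fixed-width size field are assumptions for this example, not the real PgDocData wire format:

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

struct SliceView {
  const char* data;
  size_t size;
  void remove_prefix(size_t n) { data += n; size -= n; }
};

// Hypothetical parse of one system column: header byte, 8-byte size, payload.
bool ParseSystemColumnSketch(SliceView* in, std::string* out) {
  if (in->size < 1) return false;
  const bool is_null = (in->data[0] != 0);  // assumed header encoding
  in->remove_prefix(1);
  if (is_null) return false;                // ybctid must not be NULL
  int64_t data_size = 0;
  if (in->size < sizeof(data_size)) return false;
  std::memcpy(&data_size, in->data, sizeof(data_size));
  in->remove_prefix(sizeof(data_size));
  if (in->size < static_cast<size_t>(data_size)) return false;
  out->assign(in->data, static_cast<size_t>(data_size));  // copy the payload
  in->remove_prefix(static_cast<size_t>(data_size));
  return true;
}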
114 | | |
115 | 355 | Status PgDocResult::ProcessSparseSystemColumns(std::string *reservoir) { |
116 | | // Process block sampling result returned from DocDB. |
117 | | // Results come as (index, ybctid) tuples where index is the position in the reservoir of |
118 | | // predetermined size. DocDB returns ybctids with sequential indexes first, starting from 0 and |
119 | | // until reservoir is full. Then it returns ybctids with random indexes, so they replace previous |
120 | | // ybctids. |
121 | 162k | for (int i = 0; i < row_count_; i++) {
122 | | // Read index column |
123 | 161k | PgWireDataHeader header = PgDocData::ReadDataHeader(&row_iterator_); |
124 | 161k | SCHECK(!header.is_null(), InternalError, "Reservoir index cannot be NULL"); |
125 | 161k | int32_t index; |
126 | 161k | size_t read_size = PgDocData::ReadNumber(&row_iterator_, &index); |
127 | 161k | row_iterator_.remove_prefix(read_size); |
128 | | // Read ybctid column |
129 | 161k | header = PgDocData::ReadDataHeader(&row_iterator_); |
130 | 161k | SCHECK(!header.is_null(), InternalError, "System column ybctid cannot be NULL"); |
131 | 161k | int64_t data_size; |
132 | 161k | read_size = PgDocData::ReadNumber(&row_iterator_, &data_size); |
133 | 161k | row_iterator_.remove_prefix(read_size); |
134 | | |
135 | | // Copy ybctid data to the reservoir |
136 | 161k | reservoir[index].assign(reinterpret_cast<const char *>(row_iterator_.data()), data_size); |
137 | 161k | row_iterator_.remove_prefix(data_size); |
138 | 161k | } |
139 | 355 | return Status::OK(); |
140 | 355 | } |
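
ProcessSparseSystemColumns() only consumes (index, ybctid) tuples; the sampling itself runs in DocDB. For intuition, here is a minimal standalone sketch of the producer-side reservoir behavior the comment above describes (classic reservoir sampling; not the DocDB implementation):

#include <cstdint>
#include <random>
#include <string>
#include <utility>
#include <vector>

// Emits (index, value) tuples: sequential indexes until the reservoir of size
// k fills, then random indexes that overwrite earlier entries, matching the
// consumer loop above (reservoir[index] = ybctid).
std::vector<std::pair<int32_t, std::string>> SampleReservoirSketch(
    const std::vector<std::string>& rows, int32_t k, std::mt19937* rng) {
  std::vector<std::pair<int32_t, std::string>> tuples;
  const int64_t n = static_cast<int64_t>(rows.size());
  for (int64_t i = 0; i < n; ++i) {
    if (i < k) {
      tuples.emplace_back(static_cast<int32_t>(i), rows[i]);  // fill phase
    } else {
      std::uniform_int_distribution<int64_t> dist(0, i);
      const int64_t j = dist(*rng);
      if (j < k) {
        tuples.emplace_back(static_cast<int32_t>(j), rows[i]);  // replacement
      }
    }
  }
  return tuples;
}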
141 | | |
142 | | //-------------------------------------------------------------------------------------------------- |
143 | | |
144 | | PgDocOp::PgDocOp(const PgSession::ScopedRefPtr& pg_session, PgTable* table) |
145 | 8.80M | : pg_session_(pg_session), table_(*table) { |
146 | 8.80M | } |
147 | | |
148 | 8.80M | PgDocOp::~PgDocOp() { |
149 | | // Wait for the result in case a request was sent.
150 | | // The operation can be part of a transaction; it must complete before the transaction commits.
151 | 8.80M | if (response_.Valid()) { |
152 | 7.39k | auto result = response_.Get(); |
153 | 7.39k | WARN_NOT_OK(result, "Operation completion failed"); |
154 | 7.39k | } |
155 | 8.80M | } |
156 | | |
157 | 8.79M | Status PgDocOp::ExecuteInit(const PgExecParameters *exec_params) { |
158 | 8.79M | end_of_data_ = false; |
159 | 8.79M | if (exec_params) { |
160 | 464k | exec_params_ = *exec_params; |
161 | 464k | } |
162 | 8.79M | return Status::OK(); |
163 | 8.79M | } |
164 | | |
165 | 1.99k | const PgExecParameters& PgDocOp::ExecParameters() const { |
166 | 1.99k | return exec_params_; |
167 | 1.99k | } |
168 | | |
169 | 8.79M | Result<RequestSent> PgDocOp::Execute(bool force_non_bufferable) { |
170 | | // As of 09/25/2018, DocDB doesn't cache or keep any execution state for a statement, so we |
171 | | // have to call query execution every time. |
172 | | // - Normal SQL convention: Exec, Fetch, Fetch, ... |
173 | | // - Our SQL convention: Exec & Fetch, Exec & Fetch, ... |
174 | | // This refers to the sequence of operations between this layer and the underlying tablet |
175 | | // server / DocDB layer, not to the sequence of operations between the PostgreSQL layer and this |
176 | | // layer. |
177 | 8.79M | exec_status_ = SendRequest(force_non_bufferable); |
178 | 8.79M | RETURN_NOT_OK(exec_status_); |
179 | 8.79M | return RequestSent(response_.Valid()); |
180 | 8.79M | } |
181 | | |
182 | 2.55M | Status PgDocOp::GetResult(list<PgDocResult> *rowsets) { |
183 | | // If the execution has an error, return without reading any rows.
184 | 2.55M | RETURN_NOT_OK(exec_status_); |
185 | | |
186 | 2.55M | if (!end_of_data_) { |
187 | | // Send request now in case prefetching was suppressed. |
188 | 1.76M | if (suppress_next_result_prefetching_ && !response_.Valid()) {
189 | 2.70k | exec_status_ = SendRequest(true /* force_non_bufferable */); |
190 | 2.70k | RETURN_NOT_OK(exec_status_); |
191 | 2.70k | } |
192 | | |
193 | 1.76M | DCHECK(response_.Valid()); |
194 | 1.76M | auto result = response_.Get(); |
195 | 1.76M | auto rows = VERIFY_RESULT(ProcessResponse(result));
196 | | // If ProcessResponse doesn't fail with an error,
197 | | // it should return non-empty rows and/or set end_of_data_.
198 | 0 | DCHECK(!rows.empty() || end_of_data_); |
199 | 1.71M | rowsets->splice(rowsets->end(), rows); |
200 | | // Prefetch next portion of data if needed. |
201 | 1.71M | if (!(end_of_data_ || suppress_next_result_prefetching_)) {
202 | 121k | exec_status_ = SendRequest(true /* force_non_bufferable */); |
203 | 121k | RETURN_NOT_OK(exec_status_); |
204 | 121k | } |
205 | 1.71M | } |
206 | | |
207 | 2.50M | return Status::OK(); |
208 | 2.55M | } |
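
Per the Exec & Fetch convention noted in Execute(), a caller alternates Execute() with repeated GetResult() calls until an empty rowset signals end of data. A hedged sketch of that driving loop (the helper itself is hypothetical; names otherwise match this file):

Status FetchAllRowsSketch(PgDocOp* op, const PgExecParameters* params) {
  RETURN_NOT_OK(op->ExecuteInit(params));
  VERIFY_RESULT(op->Execute(false /* force_non_bufferable */));
  std::list<PgDocResult> rowsets;
  do {
    rowsets.clear();
    RETURN_NOT_OK(op->GetResult(&rowsets));  // may prefetch the next portion
    // ... translate each PgDocResult into PgTuples here ...
  } while (!rowsets.empty());
  return Status::OK();
}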
209 | | |
210 | 53.4k | Result<int32_t> PgDocOp::GetRowsAffectedCount() const { |
211 | 53.4k | RETURN_NOT_OK(exec_status_); |
212 | 53.4k | DCHECK(end_of_data_); |
213 | 53.4k | return rows_affected_count_; |
214 | 53.4k | } |
215 | | |
216 | 300k | Status PgDocOp::ClonePgsqlOps(size_t op_count) { |
217 | | // Allocate batch operator, one per partition. |
218 | 300k | SCHECK(op_count > 0, InternalError, "Table must have at least one partition"); |
219 | 300k | if (pgsql_ops_.size() < op_count) {
220 | 300k | pgsql_ops_.resize(op_count); |
221 | 2.95M | for (auto& op : pgsql_ops_) { |
222 | 2.95M | op = CloneFromTemplate(); |
223 | | |
224 | | // Initialize as inactive. Turn it on when setting up arguments for a specific partition.
225 | 2.95M | op->set_active(false); |
226 | 2.95M | } |
227 | | |
228 | | // Set parallelism_level_ to the maximum possible number of operators to execute at one time.
229 | 300k | parallelism_level_ = pgsql_ops_.size(); |
230 | 300k | } |
231 | | |
232 | 300k | return Status::OK(); |
233 | 300k | } |
234 | | |
235 | 147k | void PgDocOp::MoveInactiveOpsOutside() { |
236 | | // Move inactive ops to the end.
237 | 147k | const ssize_t total_op_count = pgsql_ops_.size(); |
238 | 147k | bool has_sorting_order = !batch_row_orders_.empty(); |
239 | 147k | ssize_t left_iter = 0; |
240 | 147k | ssize_t right_iter = total_op_count - 1; |
241 | 150k | while (true) { |
242 | | // Advance left iterator. |
243 | 298k | while (left_iter < total_op_count && pgsql_ops_[left_iter]->is_active()) left_iter++;
244 | | |
245 | | // Advance right iterator. |
246 | 160k | while (right_iter >= 0 && !pgsql_ops_[right_iter]->is_active()) right_iter--;
247 | | |
248 | | // Move inactive operator to the end by swapping the pointers. |
249 | 150k | if (left_iter < right_iter) { |
250 | 2.47k | std::swap(pgsql_ops_[left_iter], pgsql_ops_[right_iter]); |
251 | 2.47k | if (has_sorting_order) { |
252 | 2.22k | std::swap(batch_row_orders_[left_iter], batch_row_orders_[right_iter]); |
253 | 2.22k | } |
254 | 147k | } else { |
255 | 147k | break; |
256 | 147k | } |
257 | 150k | } |
258 | | |
259 | | // Set active op count. |
260 | 147k | active_op_count_ = left_iter; |
261 | 147k | } |
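
MoveInactiveOpsOutside() is a two-pointer, swap-based partition: afterwards pgsql_ops_[0, active_op_count_) holds exactly the active operators, with batch_row_orders_ kept in lockstep. A generic standalone sketch of the same scheme (illustrative, not shared library code):

#include <cstddef>
#include <utility>
#include <vector>

// Moves elements satisfying pred to the front and returns the "active" count.
template <typename T, typename Pred>
size_t PartitionToFrontSketch(std::vector<T>* items, Pred pred) {
  std::ptrdiff_t left = 0;
  std::ptrdiff_t right = static_cast<std::ptrdiff_t>(items->size()) - 1;
  while (true) {
    while (left <= right && pred((*items)[left])) ++left;     // skip actives
    while (right >= left && !pred((*items)[right])) --right;  // skip inactives
    if (left < right) {
      std::swap((*items)[left], (*items)[right]);  // inactive goes to the end
    } else {
      break;
    }
  }
  return static_cast<size_t>(left);
}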
262 | | |
263 | 8.92M | Status PgDocOp::SendRequest(bool force_non_bufferable) { |
264 | 8.92M | DCHECK(exec_status_.ok()); |
265 | 8.92M | DCHECK(!response_.Valid()); |
266 | 8.92M | exec_status_ = SendRequestImpl(force_non_bufferable); |
267 | 8.92M | return exec_status_; |
268 | 8.92M | } |
269 | | |
270 | 8.92M | Status PgDocOp::SendRequestImpl(bool force_non_bufferable) { |
271 | | // Populate collected information into protobuf requests before sending to DocDB. |
272 | 8.92M | RETURN_NOT_OK(CreateRequests()); |
273 | | |
274 | | // Currently, sending and receiving individual requests of a batch is not yet supported.
275 | | // - Among statements, only queries by BASE-YBCTIDs need to be sent and received in batches
276 | | // to honor the order of how the BASE-YBCTIDs are kept in the database.
277 | | // - For other types of statements, it could be more efficient to send them individually.
278 | 8.92M | SCHECK(wait_for_batch_completion_, InternalError, |
279 | 8.92M | "Only send and receive the whole batch is supported"); |
280 | | |
281 | | // Send at most "parallelism_level_" number of requests at one time. |
282 | 8.92M | size_t send_count = std::min(parallelism_level_, active_op_count_); |
283 | 8.92M | response_ = VERIFY_RESULT(pg_session_->RunAsync(
284 | 0 | pgsql_ops_.data(), send_count, *table_, &GetReadTime(), force_non_bufferable)); |
285 | | |
286 | 0 | return Status::OK(); |
287 | 8.92M | } |
288 | | |
289 | 1.76M | Result<std::list<PgDocResult>> PgDocOp::ProcessResponse(const Status& status) { |
290 | | // Check operation status. |
291 | 1.76M | DCHECK(exec_status_.ok()); |
292 | 1.76M | exec_status_ = status; |
293 | 1.76M | if (exec_status_.ok()) { |
294 | 1.71M | auto result = ProcessResponseImpl(); |
295 | 1.71M | if (result.ok()) {
296 | 1.71M | return result; |
297 | 1.71M | } |
298 | 18.4E | exec_status_ = result.status(); |
299 | 18.4E | } |
300 | 50.7k | return exec_status_; |
301 | 1.76M | } |
302 | | |
303 | 1.71M | Result<std::list<PgDocResult>> PgDocOp::ProcessResponseResult() { |
304 | 1.71M | VLOG(1) << __PRETTY_FUNCTION__ << ": Received response for request " << this;
305 | | |
306 | | // Process data coming from tablet server. |
307 | 1.71M | std::list<PgDocResult> result; |
308 | 1.71M | bool no_sorting_order = batch_row_orders_.empty(); |
309 | | |
310 | 1.71M | rows_affected_count_ = 0; |
311 | | // Check for errors reported by tablet server. |
312 | 4.93M | for (size_t op_index = 0; op_index < active_op_count_; op_index++3.22M ) { |
313 | 3.22M | auto& pgsql_op = pgsql_ops_[op_index]; |
314 | 3.22M | auto& response = pgsql_op->response(); |
315 | | // Get total number of rows that are operated on. |
316 | 3.22M | rows_affected_count_ += response.rows_affected_count(); |
317 | | |
318 | | // A single batch of requests is almost always directed at a single tablet.
319 | | // However, when a tablet splits, its data is sharded/distributed across multiple tablets.
320 | | // Due to automatic tablet splitting, a pgsql_op may prepare a request for a single
321 | | // tablet before the split occurs, after which that tablet is split into multiple
322 | | // tablets. Hence, a single pgsql_op could potentially end up fetching data from
323 | | // multiple tablets.
324 | | // |
325 | | // For example, consider the query SELECT * FROM table WHERE i = 1 ORDER BY j,
326 | | // where there exists a secondary index table_i_j_idx (i HASH, j ASC). Since there is an
327 | | // ordering constraint, the ybctids are ordered on j:
328 | | // ybctid[partition 1] contains (ybctid_1, ybctid_3, ybctid_4) |
329 | | // ybctid[partition 2] contains (ybctid_2, ybctid_6) |
330 | | // ybctid[partition 3] contains (ybctid_5, ybctid_7) |
331 | | // |
332 | | // say partition 1 splits into partition 1.1 and partition 1.2 |
333 | | // ybctid[partition 1.1] contains (ybctid_1, ybctid_4) -> pgsql_op for partition 1 |
334 | | // ybctid[partition 1.2] contains (ybctid_3) -> pgsql_op partition 1 with paging state |
335 | | // |
336 | | // In order to match the ordering constraints between the request and the responses, we
337 | | // obtain the orders of requests executed in each partition and send them along with the
338 | | // responses so that pggate can return rows to the Postgres layer in the correct order.
339 | | |
340 | | // Get contents. |
341 | 3.22M | auto& rows_data = pgsql_ops_[op_index]->rows_data(); |
342 | 3.22M | if (rows_data) { |
343 | 3.22M | if (no_sorting_order) { |
344 | 3.22M | result.emplace_back(std::move(rows_data)); |
345 | 3.22M | } else { |
346 | 6.33k | const auto& batch_orders = pgsql_op->response().batch_orders(); |
347 | 6.33k | if (!batch_orders.empty()) { |
348 | 5.90k | result.emplace_back(std::move(pgsql_op->rows_data()), |
349 | 5.90k | std::list<int64_t>(batch_orders.begin(), batch_orders.end())); |
350 | 5.90k | } else { |
351 | 429 | result.emplace_back(std::move(rows_data), std::move(batch_row_orders_[op_index])); |
352 | 429 | } |
353 | 6.33k | } |
354 | 3.22M | } |
355 | 3.22M | } |
356 | | |
357 | 1.71M | return result; |
358 | 1.71M | } |
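
The tablet-split comment above explains why responses carry explicit row orders. For intuition, here is a minimal sketch of how a consumer can restore the global index order from several rowsets via a k-way merge on the order numbers (simplified stand-in types; the real merging lives in the layer that consumes PgDocResult::NextRowOrder()):

#include <cstdint>
#include <list>
#include <queue>
#include <string>
#include <utility>
#include <vector>

using OrderedRows = std::list<std::pair<int64_t, std::string>>;  // (order, row)

std::vector<std::string> MergeByRowOrderSketch(std::vector<OrderedRows>* rowsets) {
  auto cmp = [](const OrderedRows* a, const OrderedRows* b) {
    return a->front().first > b->front().first;  // min-heap on order number
  };
  std::priority_queue<OrderedRows*, std::vector<OrderedRows*>, decltype(cmp)>
      heap(cmp);
  for (auto& rs : *rowsets) {
    if (!rs.empty()) heap.push(&rs);
  }
  std::vector<std::string> merged;
  while (!heap.empty()) {
    OrderedRows* rs = heap.top();
    heap.pop();
    merged.push_back(std::move(rs->front().second));
    rs->pop_front();
    if (!rs->empty()) heap.push(rs);  // re-enter with its next order number
  }
  return merged;
}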
359 | | |
360 | 8.92M | uint64_t& PgDocOp::GetReadTime() { |
361 | 8.92M | return (read_time_ || !exec_params_.statement_read_time)
362 | 8.92M | ? read_time_ : *exec_params_.statement_read_time;
363 | 8.92M | } |
364 | | |
365 | 8.92M | Status PgDocOp::CreateRequests() { |
366 | 8.92M | if (!request_population_completed_) { |
367 | 8.79M | if (VERIFY_RESULT(DoCreateRequests())) { |
368 | 8.79M | request_population_completed_ = true; |
369 | 8.79M | } |
370 | 8.79M | } |
371 | 0 | return CompleteRequests(); |
372 | 8.92M | } |
373 | | |
374 | 5.58k | Status PgDocOp::PopulateDmlByYbctidOps(const std::vector<Slice>& ybctids) { |
375 | 5.58k | RETURN_NOT_OK(DoPopulateDmlByYbctidOps(ybctids)); |
376 | 5.58k | request_population_completed_ = true; |
377 | 5.58k | return CompleteRequests(); |
378 | 5.58k | } |
379 | | |
380 | 8.92M | Status PgDocOp::CompleteRequests() { |
381 | 20.5M | for (size_t i = 0; i != active_op_count_; ++i11.5M ) { |
382 | 11.5M | RETURN_NOT_OK(pgsql_ops_[i]->InitPartitionKey(*table_)); |
383 | 11.5M | } |
384 | 8.92M | return Status::OK(); |
385 | 8.92M | } |
386 | | |
387 | | //------------------------------------------------------------------------------------------------- |
388 | | |
389 | | PgDocReadOp::PgDocReadOp(const PgSession::ScopedRefPtr& pg_session, |
390 | | PgTable* table, |
391 | | PgsqlReadOpPtr read_op) |
392 | 1.59M | : PgDocOp(pg_session, table), read_op_(std::move(read_op)) { |
393 | 1.59M | } |
394 | | |
395 | 1.59M | Status PgDocReadOp::ExecuteInit(const PgExecParameters *exec_params) { |
396 | 1.59M | SCHECK(pgsql_ops_.empty(), |
397 | 1.59M | IllegalState, |
398 | 1.59M | "Exec params can't be changed for already created operations"); |
399 | 1.59M | RETURN_NOT_OK(PgDocOp::ExecuteInit(exec_params)); |
400 | | |
401 | 1.59M | read_op_->read_request().set_return_paging_state(true); |
402 | | // TODO(10696): This is probably the only place in pg_doc_op where pg_session is being |
403 | | // used as a source of truth. All other uses treat it as stateless. Refactor to move this |
404 | | // state elsewhere. |
405 | 1.59M | if (pg_session_->ShouldUseFollowerReads()) { |
406 | 27 | read_op_->set_read_from_followers(); |
407 | 27 | } |
408 | 1.59M | SetRequestPrefetchLimit(); |
409 | 1.59M | SetBackfillSpec(); |
410 | 1.59M | SetRowMark(); |
411 | 1.59M | SetReadTime(); |
412 | 1.59M | return Status::OK(); |
413 | 1.59M | } |
414 | | |
415 | 1.65M | Result<std::list<PgDocResult>> PgDocReadOp::ProcessResponseImpl() { |
416 | | // Process result from tablet server and check result status. |
417 | 1.65M | auto result = VERIFY_RESULT(ProcessResponseResult()); |
418 | | |
419 | | // Process paging state and check status. |
420 | 1.65M | RETURN_NOT_OK(ProcessResponseReadStates()); |
421 | 1.65M | return result; |
422 | 1.65M | } |
423 | | |
424 | 1.58M | Result<bool> PgDocReadOp::DoCreateRequests() { |
425 | | // All information from the SQL request has been collected and set up. This code populates
426 | | // protobuf requests before sending them to DocDB. For performance reasons, requests are
427 | | // constructed differently for different statements.
428 | 1.58M | if (read_op_->read_request().has_sampling_state()) { |
429 | 178 | VLOG(1) << __PRETTY_FUNCTION__ << ": Preparing sampling requests ";
430 | 178 | return PopulateSamplingOps(); |
431 | | |
432 | | // Requests pushing down aggregates and/or filter expression tend to do more work on DocDB side, |
433 | | // so it takes longer to return responses, and Postgres side has generally less work to do. |
434 | | // Hence we optimize by sending multiple parallel requests to the nodes, allowing their |
435 | | // simultaneous processing. |
436 | | // Effect may be less than expected if the nodes are already heavily loaded and CPU consumption |
437 | | // is high, or selectivity of the filter is low. |
438 | 1.58M | } else if (read_op_->read_request().is_aggregate() || |
439 | 1.58M | read_op_->read_request().where_clauses_size() > 0) {
440 | 6.05k | return PopulateParallelSelectOps(); |
441 | | |
442 | 1.57M | } else if (read_op_->read_request().partition_column_values_size() > 0) { |
443 | | // Optimization for multiple hash keys. |
444 | | // - SELECT * FROM sql_table WHERE hash_c1 IN (1, 2, 3) AND hash_c2 IN (4, 5, 6); |
445 | | // - Multiple requests for different hash permutations / keys.
446 | 289k | return PopulateNextHashPermutationOps(); |
447 | | |
448 | 1.28M | } else { |
449 | | // No optimization. |
450 | 1.28M | if (exec_params_.partition_key != nullptr) { |
451 | 2.25k | RETURN_NOT_OK(SetScanPartitionBoundary()); |
452 | 2.25k | } |
453 | 1.28M | pgsql_ops_.emplace_back(read_op_); |
454 | 1.28M | pgsql_ops_.back()->set_active(true); |
455 | 1.28M | active_op_count_ = 1; |
456 | 1.28M | return true; |
457 | 1.28M | } |
458 | 1.58M | } |
459 | | |
460 | 5.58k | Status PgDocReadOp::DoPopulateDmlByYbctidOps(const std::vector<Slice>& ybctids) { |
461 | | // This function is called only when ybctids were returned from INDEX. |
462 | | // |
463 | | // NOTE on a typical process. |
464 | | // 1- Statement: |
465 | | // SELECT xxx FROM <table> WHERE ybctid IN (SELECT ybctid FROM INDEX); |
466 | | // |
467 | | // 2- Select 1024 ybctids (prefetch limit) from INDEX. |
468 | | // |
469 | | // 3- ONLY ONE TIME: Create a batch of operators, one per partition. |
470 | | // * Each operator has a request cloned from template_op_.
471 | | // * We will reuse the created operators & requests for future batches of 1024 ybctids.
472 | | // * Certain fields in the protobuf requests MUST BE RESET for each batch.
473 | | // |
474 | | // 4- Assign the selected 1024 ybctids to the batch of operators. |
475 | | // |
476 | | // 5- Send requests to tablet servers to read data from <table> associated with ybctid values.
477 | | // |
478 | | // 6- Repeat steps 2 through 5 for the next batch of 1024 ybctids until done.
479 | 5.58k | RETURN_NOT_OK(InitializeYbctidOperators()); |
480 | 5.58k | const auto& partition_keys = table_->GetPartitions(); |
481 | | |
482 | | // Begin a batch of ybctids. |
483 | 5.58k | end_of_data_ = false; |
484 | | // Assign ybctid values. |
485 | 1.54M | for (const Slice& ybctid : ybctids) { |
486 | | // Find partition. The partition index is the boundary order minus 1. |
487 | | // - For hash partitioning, we use hashcode to find the right index. |
488 | | // - For range partitioning, we pass partition key to seek the index. |
489 | 1.54M | SCHECK(ybctid.size() > 0, InternalError, "Invalid ybctid value"); |
490 | | // TODO(tsplit): what if table partition is changed during PgDocReadOp lifecycle before or after |
491 | | // the following lines? |
492 | 1.54M | const size_t partition = VERIFY_RESULT(table_->FindPartitionIndex(ybctid)); |
493 | 1.54M | SCHECK(partition < table_->GetPartitionCount(), InternalError, |
494 | 1.54M | "Ybctid value is not within partition boundary"); |
495 | | |
496 | | // Assign ybctids to operators. |
497 | 1.54M | auto& read_req = GetReadReq(partition); |
498 | 1.54M | if (!read_req.has_ybctid_column_value()) { |
499 | | // We must set "ybctid_column_value" in the request for two reasons. |
500 | | // - "client::yb_op" uses it to set the hash_code. |
501 | | // - Rolling upgrade: An older server will read only "ybctid_column_value", as it doesn't
502 | | // know about the ybctid-batching operation.
503 | 6.37k | pgsql_ops_[partition]->set_active(true); |
504 | 6.37k | read_req.set_is_forward_scan(true); |
505 | 6.37k | read_req.mutable_ybctid_column_value()->mutable_value() |
506 | 6.37k | ->set_binary_value(ybctid.data(), ybctid.size()); |
507 | | |
508 | | // For every read operation, set the partition boundary. In case a tablet is split between
509 | | // preparing requests and executing them, DocDB will return a paging state for pggate to
510 | | // continue until the end of the current tablet is reached.
511 | 6.37k | std::string upper_bound; |
512 | 6.37k | if (partition < partition_keys.size() - 1) { |
513 | 4.63k | upper_bound = partition_keys[partition + 1]; |
514 | 4.63k | } |
515 | 6.37k | RETURN_NOT_OK(table_->SetScanBoundary(&read_req, |
516 | 6.37k | partition_keys[partition], |
517 | 6.37k | /* lower_bound_is_inclusive */ true, |
518 | 6.37k | upper_bound, |
519 | 6.37k | /* upper_bound_is_inclusive */ false)); |
520 | 6.37k | } |
521 | | |
522 | | // Append ybctid and its order to batch_arguments. |
523 | | // The "ybctid" values are returned in the same order as the rows in the IndexTable. To keep
524 | | // track of this order, each argument is assigned an order-number. |
525 | 1.54M | auto batch_arg = read_req.add_batch_arguments(); |
526 | 1.54M | batch_arg->set_order(batch_row_ordering_counter_); |
527 | 1.54M | batch_arg->mutable_ybctid()->mutable_value()->set_binary_value(ybctid.data(), ybctid.size()); |
528 | | |
529 | | // Remember the order number for each request. |
530 | 1.54M | batch_row_orders_[partition].push_back(batch_row_ordering_counter_); |
531 | | |
532 | | // Increment counter for the next row. |
533 | 1.54M | batch_row_ordering_counter_++; |
534 | 1.54M | } |
535 | | |
536 | | // Done creating requests, but not every partition/operator has arguments (those stay inactive).
537 | 5.58k | MoveInactiveOpsOutside(); |
538 | | |
539 | 5.58k | return Status::OK(); |
540 | 5.58k | } |
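
The FindPartitionIndex() call above maps a ybctid to its tablet. Here is a minimal sketch of the boundary-order-minus-one lookup the comment describes, assuming a sorted list of inclusive partition start keys (for hash partitioning the key compared would be the encoded hash code; that detail is elided here):

#include <algorithm>
#include <string>
#include <vector>

size_t FindPartitionIndexSketch(const std::vector<std::string>& partition_starts,
                                const std::string& row_partition_key) {
  // partition_starts is sorted ascending; the first entry is typically "".
  // The owning partition is the last one whose start key is <= the row key.
  auto it = std::upper_bound(partition_starts.begin(), partition_starts.end(),
                             row_partition_key);
  return static_cast<size_t>(it - partition_starts.begin()) - 1;
}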
541 | | |
542 | 5.59k | Status PgDocReadOp::InitializeYbctidOperators() { |
543 | 5.59k | auto op_count = table_->GetPartitionCount(); |
544 | | |
545 | 5.59k | if (batch_row_orders_.size() == 0) { |
546 | | // First batch: |
547 | | // - Create operators. |
548 | | // - Allocate row orders for each tablet server. |
549 | | // - Protobuf fields in requests are not yet set, so they do not need to be cleared.
550 | 4.14k | RETURN_NOT_OK(ClonePgsqlOps(op_count)); |
551 | 4.14k | batch_row_orders_.resize(op_count); |
552 | | |
553 | | // To honor the indexing order of ybctid values, for each batch of ybctid-binds, select all rows |
554 | | // in the batch and then order them before returning results to the Postgres layer.
555 | 4.14k | wait_for_batch_completion_ = true; |
556 | | |
557 | 4.14k | } else { |
558 | | // Second and later batches: Reuse all state variables. |
559 | | // - Clear row orders for this batch to be set later. |
560 | | // - Clear protobuf fields ybctids and others before reusing them in this batch. |
561 | 1.44k | RETURN_NOT_OK(ResetInactivePgsqlOps()); |
562 | 1.44k | } |
563 | 5.59k | return Status::OK(); |
564 | 5.59k | } |
565 | | |
566 | 289k | Result<bool> PgDocReadOp::PopulateNextHashPermutationOps() { |
567 | 289k | RETURN_NOT_OK(InitializeHashPermutationStates()); |
568 | | |
569 | | // Set the index at the start of inactive operators. |
570 | 289k | auto op_count = pgsql_ops_.size(); |
571 | 289k | auto op_index = active_op_count_; |
572 | | |
573 | | // Fill inactive operators with new hash permutations. |
574 | 289k | const size_t hash_column_count = table_->num_hash_key_columns(); |
575 | 3.20M | for (; op_index < op_count && next_permutation_idx_ < total_permutation_count_; ++op_index) {
576 | 2.91M | auto& read_req = GetReadReq(op_index); |
577 | 2.91M | pgsql_ops_[op_index]->set_active(true); |
578 | | |
579 | 2.91M | int pos = next_permutation_idx_++; |
580 | 5.83M | for (int c_idx = narrow_cast<int>(hash_column_count); c_idx-- > 0;) { |
581 | 2.92M | int sel_idx = pos % partition_exprs_[c_idx].size(); |
582 | 2.92M | *read_req.mutable_partition_column_values(c_idx) = *partition_exprs_[c_idx][sel_idx]; |
583 | 2.92M | pos /= partition_exprs_[c_idx].size(); |
584 | 2.92M | } |
585 | 2.91M | } |
586 | 289k | active_op_count_ = op_index; |
587 | | |
588 | | // Stop adding requests if we reach the total number of permutations. |
589 | 289k | return next_permutation_idx_ >= total_permutation_count_; |
590 | 289k | } |
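
The inner loop above decodes next_permutation_idx_ as a mixed-radix number whose digit bases are the per-column IN-list sizes. A standalone sketch of the same decode with a worked example (illustrative only):

#include <cstddef>
#include <vector>

// With IN-lists of sizes {3, 2} (hash_c1 IN (1,2,3), hash_c2 IN (4,5)),
// permutation index 4 selects c2 = 4 % 2 = 0 (value 4), then
// c1 = (4 / 2) % 3 = 2 (value 3), i.e. the pair (hash_c1 = 3, hash_c2 = 4).
std::vector<size_t> DecodePermutationSketch(int pos,
                                            const std::vector<size_t>& sizes) {
  std::vector<size_t> choice(sizes.size());
  for (size_t c_idx = sizes.size(); c_idx-- > 0;) {
    choice[c_idx] = static_cast<size_t>(pos) % sizes[c_idx];
    pos /= static_cast<int>(sizes[c_idx]);
  }
  return choice;
}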
591 | | |
592 | | // Collect hash expressions to prepare for generating permutations. |
593 | 290k | Status PgDocReadOp::InitializeHashPermutationStates() { |
594 | | // Return if state variables were initialized. |
595 | 290k | if (!partition_exprs_.empty()) { |
596 | | // Reset the protobuf request before reusing the operators. |
597 | 134 | return ResetInactivePgsqlOps(); |
598 | 134 | } |
599 | | |
600 | | // Initialize partition_exprs_. |
601 | | // Reorganize the input arguments from Postgres to prepare for permutation generation.
602 | 290k | const size_t hash_column_count = table_->num_hash_key_columns(); |
603 | 290k | partition_exprs_.resize(hash_column_count); |
604 | 580k | for (size_t c_idx = 0; c_idx < hash_column_count; ++c_idx) {
605 | 290k | const auto& col_expr = read_op_->read_request().partition_column_values( |
606 | 290k | narrow_cast<int>(c_idx)); |
607 | 290k | if (col_expr.has_condition()) { |
608 | 2.71M | for (const auto& expr : col_expr.condition().operands(1).condition().operands()) { |
609 | 2.71M | partition_exprs_[c_idx].push_back(&expr); |
610 | 2.71M | } |
611 | 198k | } else { |
612 | 198k | partition_exprs_[c_idx].push_back(&col_expr); |
613 | 198k | } |
614 | 290k | } |
615 | | |
616 | | // Calculate the total number of permutations to be generated. |
617 | 290k | total_permutation_count_ = 1; |
618 | 290k | for (auto& exprs : partition_exprs_) { |
619 | 290k | total_permutation_count_ *= exprs.size(); |
620 | 290k | } |
621 | | |
622 | | // Create operators, one operation per partition, up to FLAGS_ysql_request_limit. |
623 | | // |
624 | | // TODO(neil) The control variable "ysql_request_limit" should be applied to ALL statements, but |
625 | | // at the moment, the number of operators never exceeds the number of tablets except for hash |
626 | | // permutation operation, so the work on this GFLAG can be done when it is necessary. |
627 | 290k | int max_op_count = std::min(total_permutation_count_, FLAGS_ysql_request_limit); |
628 | 290k | RETURN_NOT_OK(ClonePgsqlOps(max_op_count)); |
629 | | |
630 | | // Clear the original partition expressions as they will be replaced with hash permutations.
631 | 3.21M | for (int op_index = 0; op_index < max_op_count; op_index++) {
632 | 2.92M | auto& read_request = GetReadReq(op_index); |
633 | 2.92M | read_request.clear_partition_column_values(); |
634 | 5.84M | for (size_t i = 0; i < hash_column_count; ++i2.92M ) { |
635 | 2.92M | read_request.add_partition_column_values(); |
636 | 2.92M | } |
637 | 2.92M | pgsql_ops_[op_index]->set_active(false); |
638 | 2.92M | } |
639 | | |
640 | | // Initialize counters. |
641 | 290k | next_permutation_idx_ = 0; |
642 | 290k | active_op_count_ = 0; |
643 | | |
644 | 290k | return Status::OK(); |
645 | 290k | } |
646 | | |
647 | 6.01k | Result<bool> PgDocReadOp::PopulateParallelSelectOps() { |
648 | | // Create batch operators, one per partition, to execute in parallel. |
649 | | // TODO(tsplit): what if table partition is changed during PgDocReadOp lifecycle before or after |
650 | | // the following line? |
651 | 6.01k | RETURN_NOT_OK(ClonePgsqlOps(table_->GetPartitionCount())); |
652 | | // Set "parallelism_level_" to control how many operators can be sent at one time.
653 | | // |
654 | | // TODO(neil) The calculation for this control variable should be applied to ALL operators, but |
655 | | // the following calculation needs to be refined before it can be used for all statements. |
656 | 6.01k | auto parallelism_level = FLAGS_ysql_select_parallelism; |
657 | 6.03k | if (parallelism_level < 0) {
658 | 6.03k | int tserver_count = VERIFY_RESULT(pg_session_->TabletServerCount(true /* primary_only */)); |
659 | | |
660 | | // Establish lower and upper bounds on parallelism. |
661 | 0 | int kMinParSelParallelism = 1; |
662 | 6.03k | int kMaxParSelParallelism = 16; |
663 | 6.03k | parallelism_level_ = |
664 | 6.03k | std::min(std::max(tserver_count * 2, kMinParSelParallelism), kMaxParSelParallelism); |
665 | 18.4E | } else { |
666 | 18.4E | parallelism_level_ = parallelism_level; |
667 | 18.4E | } |
668 | | |
669 | | // Assign partitions to operators. |
670 | 6.01k | const auto& partition_keys = table_->GetPartitions(); |
671 | 6.01k | SCHECK_EQ(partition_keys.size(), pgsql_ops_.size(), IllegalState, |
672 | 6.01k | "Number of partitions and number of partition keys are not the same"); |
673 | | |
674 | 24.6k | for (size_t partition = 0; 6.01k partition < partition_keys.size(); partition++18.6k ) { |
675 | | // Construct a new YBPgsqlReadOp. |
676 | 18.6k | pgsql_ops_[partition]->set_active(true); |
677 | | |
678 | | // Use partition index to setup the protobuf to identify the partition that this request |
679 | | // is for. Batcher will use this information to send the request to the correct tablet
680 | | // server, and the server uses this information to operate on the correct tablet.
681 | | // - Range partition uses range partition key to identify partition. |
682 | | // - Hash partition uses "next_partition_key" and "max_hash_code" to identify partition. |
683 | 18.6k | string upper_bound; |
684 | 18.6k | if (partition < partition_keys.size() - 1) { |
685 | 12.6k | upper_bound = partition_keys[partition + 1]; |
686 | 12.6k | } |
687 | 18.6k | RETURN_NOT_OK(table_->SetScanBoundary(&GetReadReq(partition), |
688 | 18.6k | partition_keys[partition], |
689 | 18.6k | true /* lower_bound_is_inclusive */, |
690 | 18.6k | upper_bound, |
691 | 18.6k | false /* upper_bound_is_inclusive */)); |
692 | 18.6k | } |
693 | 6.01k | active_op_count_ = partition_keys.size(); |
694 | | |
695 | 6.01k | return true; |
696 | 6.01k | } |
697 | | |
698 | 178 | Result<bool> PgDocReadOp::PopulateSamplingOps() { |
699 | | // Create one PgsqlOp per partition |
700 | 178 | RETURN_NOT_OK(ClonePgsqlOps(table_->GetPartitionCount())); |
701 | | // Partitions are sampled sequentially, one at a time |
702 | 178 | parallelism_level_ = 1; |
703 | | // Assign partitions to operators. |
704 | 178 | const auto& partition_keys = table_->GetPartitions(); |
705 | 178 | SCHECK_EQ(partition_keys.size(), pgsql_ops_.size(), IllegalState, |
706 | 178 | "Number of partitions and number of partition keys are not the same"); |
707 | | |
708 | | // Bind requests to partitions |
709 | 584 | for (size_t partition = 0; 178 partition < partition_keys.size(); partition++406 ) { |
710 | | // Construct a new YBPgsqlReadOp. |
711 | 406 | pgsql_ops_[partition]->set_active(true); |
712 | | |
713 | | // Use partition index to setup the protobuf to identify the partition that this request |
714 | | // is for. Batcher will use this information to send the request to the correct tablet
715 | | // server, and the server uses this information to operate on the correct tablet.
716 | | // - Range partition uses range partition key to identify partition. |
717 | | // - Hash partition uses "next_partition_key" and "max_hash_code" to identify partition. |
718 | 406 | string upper_bound; |
719 | 406 | if (partition < partition_keys.size() - 1) { |
720 | 228 | upper_bound = partition_keys[partition + 1]; |
721 | 228 | } |
722 | 406 | RETURN_NOT_OK(table_->SetScanBoundary(&GetReadReq(partition), |
723 | 406 | partition_keys[partition], |
724 | 406 | true /* lower_bound_is_inclusive */, |
725 | 406 | upper_bound, |
726 | 406 | false /* upper_bound_is_inclusive */)); |
727 | 406 | } |
728 | 178 | active_op_count_ = partition_keys.size(); |
729 | 178 | VLOG(1) << "Number of partitions to sample: " << active_op_count_;
730 | | // If we have a big enough sample after processing some partitions, we skip the rest.
731 | | // By shuffling partitions we randomly select the partition(s) to sample. |
732 | 178 | std::random_shuffle(pgsql_ops_.begin(), pgsql_ops_.end()); |
733 | | |
734 | 178 | return true; |
735 | 178 | } |
736 | | |
737 | 178 | Status PgDocReadOp::GetEstimatedRowCount(double *liverows, double *deadrows) { |
738 | 178 | if (liverows != nullptr) { |
739 | | // Return estimated number of live tuples |
740 | 178 | VLOG(1) << "Returning liverows " << sample_rows_;
741 | 178 | *liverows = sample_rows_; |
742 | 178 | } |
743 | 178 | if (deadrows != nullptr) { |
744 | | // TODO count dead tuples while sampling |
745 | 178 | *deadrows = 0; |
746 | 178 | } |
747 | 178 | return Status::OK(); |
748 | 178 | } |
749 | | |
750 | | // When Postgres requests a scan of a specific partition, set the partition parameter accordingly.
751 | 2.25k | Status PgDocReadOp::SetScanPartitionBoundary() { |
752 | | // Boundary to scan from a given key to the end of its associated tablet. |
753 | | // - Lower: The given partition key (inclusive). |
754 | | // - Upper: Beginning of next tablet (not inclusive). |
755 | 2.25k | SCHECK(exec_params_.partition_key != nullptr, Uninitialized, "expected non-null partition_key"); |
756 | | |
757 | | // Seek the tablet of the given key. |
758 | | // TODO(tsplit): what if table partition is changed during PgDocReadOp lifecycle before or after |
759 | | // the following line? |
760 | 2.25k | const std::vector<std::string>& partition_keys = table_->GetPartitions(); |
761 | 2.25k | const auto& partition_key = std::find( |
762 | 2.25k | partition_keys.begin(), |
763 | 2.25k | partition_keys.end(), |
764 | 2.25k | a2b_hex(exec_params_.partition_key)); |
765 | 2.25k | RSTATUS_DCHECK( |
766 | 2.25k | partition_key != partition_keys.end(), InvalidArgument, "invalid partition key given"); |
767 | | |
768 | | // Seek upper bound (Beginning of next tablet). |
769 | 2.25k | string upper_bound; |
770 | 2.25k | const auto& next_partition_key = std::next(partition_key, 1); |
771 | 2.25k | if (next_partition_key != partition_keys.end()) { |
772 | 1.62k | upper_bound = *next_partition_key; |
773 | 1.62k | } |
774 | 2.25k | RETURN_NOT_OK(table_->SetScanBoundary( |
775 | 2.25k | &read_op_->read_request(), *partition_key, true /* lower_bound_is_inclusive */, upper_bound, |
776 | 2.25k | false /* upper_bound_is_inclusive */)); |
777 | 2.25k | return Status::OK(); |
778 | 2.25k | } |
779 | | |
780 | 1.65M | Status PgDocReadOp::ProcessResponseReadStates() { |
781 | | // For each read_op, set up its request for the next batch of data or make it inactive.
782 | 1.65M | bool has_more_data = false; |
783 | 1.65M | auto send_count = std::min(parallelism_level_, active_op_count_); |
784 | | |
785 | 4.82M | for (size_t op_index = 0; op_index < send_count; op_index++3.17M ) { |
786 | 3.17M | auto& read_op = down_cast<PgsqlReadOp&>(*pgsql_ops_[op_index]); |
787 | 3.17M | RETURN_NOT_OK(ReviewResponsePagingState(*table_, &read_op)); |
788 | | |
789 | | // Check for completion. |
790 | 3.17M | bool has_more_arg = false; |
791 | 3.17M | auto& res = read_op.response(); |
792 | 3.17M | auto& req = read_op.read_request(); |
793 | | |
794 | | // Save the backfill_spec if the tablet server returns it.
795 | 3.17M | if (res.is_backfill_batch_done()) { |
796 | 2.25k | out_param_backfill_spec_ = res.backfill_spec(); |
797 | 3.16M | } else if (res.has_paging_state()) { |
798 | 141k | has_more_arg = true; |
799 | | |
800 | | // Set up paging state for next request. |
801 | | // A query request can be nested, and the paging state belongs to the innermost query,
802 | | // which is the read operator that runs first and feeds data to the other queries.
803 | | // Recursive Proto Message: |
804 | | // PgsqlReadRequestPB { PgsqlReadRequestPB index_request; } |
805 | 141k | PgsqlReadRequestPB *innermost_req = &req; |
806 | 141k | while (innermost_req->has_index_request()) { |
807 | 167 | innermost_req = innermost_req->mutable_index_request(); |
808 | 167 | } |
809 | 141k | *innermost_req->mutable_paging_state() = std::move(*res.mutable_paging_state()); |
810 | 141k | if (innermost_req->paging_state().has_read_time()) { |
811 | 62.4k | read_op.set_read_time(ReadHybridTime::FromPB(innermost_req->paging_state().read_time())); |
812 | 62.4k | } |
813 | | |
814 | | // Setup backfill_spec for the next request. |
815 | 141k | if (res.has_backfill_spec()) { |
816 | 79 | *innermost_req->mutable_backfill_spec() = std::move(*res.mutable_backfill_spec()); |
817 | 79 | } |
818 | | |
819 | | // Parse/Analysis/Rewrite catalog version has already been checked on the first request. |
820 | | // The docdb layer will check the target table's schema version is compatible. |
821 | | // This allows long-running queries to continue in the presence of other DDL statements |
822 | | // as long as they do not affect the table(s) being queried. |
823 | 141k | req.clear_ysql_catalog_version(); |
824 | 141k | } |
825 | | |
826 | | // Check for batch execution. |
827 | 3.17M | if (res.batch_arg_count() < req.batch_arguments_size()) { |
828 | 0 | has_more_arg = true; |
829 | | |
830 | | // Delete the executed arguments from batch and keep those that haven't been executed. |
831 | 0 | req.mutable_batch_arguments()->DeleteSubrange( |
832 | 0 | 0, narrow_cast<int32_t>(res.batch_arg_count())); |
833 | | |
834 | | // For rolling-upgrade compatibility, we must copy the first batch_arg to the scalar arg.
835 | 0 | FormulateRequestForRollingUpgrade(&req); |
836 | 0 | } |
837 | | |
838 | 3.17M | if (res.has_sampling_state()) { |
839 | 406 | VLOG(1) << "Received sampling state:" |
840 | 0 | << " samplerows: " << res.mutable_sampling_state()->samplerows() |
841 | 0 | << " rowstoskip: " << res.mutable_sampling_state()->rowstoskip() |
842 | 0 | << " rstate_w: " << res.mutable_sampling_state()->rstate_w() |
843 | 0 | << " rand_state: " << res.mutable_sampling_state()->rand_state(); |
844 | 406 | if (has_more_arg) { |
845 | | // Copy sampling state from the response to the request, to properly continue to sample |
846 | | // the next block. |
847 | 0 | *req.mutable_sampling_state() = std::move(*res.mutable_sampling_state()); |
848 | 406 | } else { |
849 | | // Partition sampling is completed. |
850 | | // If samplerows is greater than or equal to targrows, the sampling is complete. There are
851 | | // enough rows selected to calculate stats, and we can estimate the total number of rows in
852 | | // the table by extrapolating samplerows to the partitions that have not been scanned.
853 | | // If samplerows is less than targrows, the next partition needs to be sampled. The next
854 | | // pgdoc_op already has sampling state copied from template_op_; only a couple of fields need
855 | | // to be updated: numrows and samplerows. The targrows never changes, and in the reservoir
856 | | // population phase (before samplerows reaches targrows) 1. numrows and samplerows are always
857 | | // equal; and 2. random numbers are never generated, so the random state remains the same.
858 | | // That essentially means the only thing we need to collect from the partition's final |
859 | | // sampling state is the samplerows. We use that number to either estimate liverows, or to |
860 | | // update numrows and samplerows in the next partition's sampling state.
861 | 406 | sample_rows_ = res.mutable_sampling_state()->samplerows(); |
862 | 406 | } |
863 | 406 | } |
864 | | |
865 | 3.17M | if (has_more_arg) { |
866 | 141k | has_more_data = true; |
867 | 3.03M | } else { |
868 | 3.03M | read_op.set_active(false); |
869 | 3.03M | } |
870 | 3.17M | } |
871 | | |
872 | 1.65M | if (has_more_data || send_count < active_op_count_1.51M ) { |
873 | | // Move inactive ops to the end of pgsql_ops_ to make room for new set of arguments. |
874 | 142k | MoveInactiveOpsOutside(); |
875 | 142k | end_of_data_ = false; |
876 | 1.51M | } else { |
877 | | // There should be no active op left in queue. |
878 | 1.51M | active_op_count_ = 0; |
879 | 1.51M | end_of_data_ = request_population_completed_; |
880 | 1.51M | } |
881 | | |
882 | 1.65M | if (active_op_count_ > 0 && read_op_->read_request().has_sampling_state()142k ) { |
883 | 228 | auto& read_op = down_cast<PgsqlReadOp&>(*pgsql_ops_[0]); |
884 | 228 | PgsqlReadRequestPB *req = &read_op.read_request(); |
885 | 228 | if (!req->has_paging_state()) { |
886 | | // A current sampling op without a paging state means that the previous one completed and
887 | | // was moved outside.
888 | 228 | auto sampling_state = req->mutable_sampling_state(); |
889 | 228 | if (sample_rows_ < sampling_state->targrows()) { |
890 | | // More sample rows are needed; update the sampling state and let the next partition be scanned.
891 | 228 | VLOG(1) << "Continue sampling next partition from " << sample_rows_;
892 | 228 | sampling_state->set_numrows(static_cast<int32>(sample_rows_)); |
893 | 228 | sampling_state->set_samplerows(sample_rows_); |
894 | 228 | } else { |
895 | | // We have enough sample rows; estimate total table rows assuming they are evenly
896 | | // distributed between partitions.
897 | 0 | auto completed_ops = pgsql_ops_.size() - active_op_count_; |
898 | 0 | sample_rows_ = floor((sample_rows_ / completed_ops) * pgsql_ops_.size() + 0.5); |
899 | 0 | VLOG(1) << "Done sampling, prorated rowcount is " << sample_rows_; |
900 | 0 | end_of_data_ = true; |
901 | 0 | } |
902 | 228 | } |
903 | 228 | } |
904 | | |
905 | 1.65M | return Status::OK(); |
906 | 1.65M | } |
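
The prorating step near the end of ProcessResponseReadStates() extrapolates the sampled row count to the partitions that were skipped. The arithmetic, with an illustrative example (numbers assumed, not from the source):

#include <cmath>
#include <cstddef>

// If 3 of 4 partitions were sampled and yielded 30000 rows, the estimate is
// floor((30000 / 3) * 4 + 0.5) = 40000 rows for the whole table.
double ProratedRowCountSketch(double sample_rows, size_t completed_ops,
                              size_t total_ops) {
  return std::floor((sample_rows / completed_ops) * total_ops + 0.5);
}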
907 | | |
908 | 1.58M | void PgDocReadOp::SetRequestPrefetchLimit() { |
909 | | // Predict the maximum prefetch-limit using the associated gflags. |
910 | 1.58M | PgsqlReadRequestPB& req = read_op_->read_request(); |
911 | 1.58M | auto predicted_limit = FLAGS_ysql_prefetch_limit; |
912 | 1.58M | if (!req.is_forward_scan()) { |
913 | | // Backward scan is slower than forward scan, so predicted limit is a smaller number. |
914 | 63 | predicted_limit = predicted_limit * FLAGS_ysql_backward_prefetch_scale_factor; |
915 | 63 | } |
916 | | |
917 | | // The system setting has to be at least 1, while the user setting (LIMIT clause) can be
918 | | // anything that is allowed by SQL semantics.
919 | 1.58M | if (predicted_limit < 1) { |
920 | 0 | predicted_limit = 1; |
921 | 0 | } |
922 | | |
923 | | // Use statement LIMIT(count + offset) if it is smaller than the predicted limit. |
924 | 1.58M | auto limit = exec_params_.limit_count + exec_params_.limit_offset; |
925 | 1.58M | suppress_next_result_prefetching_ = true; |
926 | 1.58M | if (exec_params_.limit_use_default || limit > predicted_limit) {
927 | 1.56M | limit = predicted_limit; |
928 | 1.56M | suppress_next_result_prefetching_ = false; |
929 | 1.56M | } |
930 | 18.4E | VLOG(3) << __func__ |
931 | 18.4E | << " exec_params_.limit_count=" << exec_params_.limit_count |
932 | 18.4E | << " exec_params_.limit_offset=" << exec_params_.limit_offset |
933 | 18.4E | << " exec_params_.limit_use_default=" << exec_params_.limit_use_default |
934 | 18.4E | << " predicted_limit=" << predicted_limit |
935 | 18.4E | << " limit=" << limit; |
936 | 1.58M | req.set_limit(limit); |
937 | 1.58M | } |
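
SetRequestPrefetchLimit() reduces to: take the statement's LIMIT count + offset when it undercuts the gflag-predicted page size, and suppress prefetching in that case since one round trip suffices. A distilled sketch under assumed flag values (e.g. a predicted limit of 1024):

#include <cstdint>

struct PrefetchDecisionSketch {
  uint64_t limit;
  bool suppress_next_prefetch;
};

// With predicted_limit = 1024, "LIMIT 10 OFFSET 5" yields {15, true};
// a plain scan keeps {1024, false} so the next page is prefetched.
PrefetchDecisionSketch ChooseLimitSketch(uint64_t predicted_limit,
                                         bool limit_use_default,
                                         uint64_t limit_count,
                                         uint64_t limit_offset) {
  const uint64_t statement_limit = limit_count + limit_offset;
  if (limit_use_default || statement_limit > predicted_limit) {
    return {predicted_limit, false};  // full pages; keep prefetching
  }
  return {statement_limit, true};     // bounded scan; one round trip suffices
}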
938 | | |
939 | 1.58M | void PgDocReadOp::SetRowMark() { |
940 | 1.58M | auto& req = read_op_->read_request(); |
941 | 1.58M | const auto row_mark_type = GetRowMarkType(&exec_params_); |
942 | 1.58M | if (IsValidRowMarkType(row_mark_type)) { |
943 | 8.49k | req.set_row_mark_type(row_mark_type); |
944 | 8.49k | req.set_wait_policy(static_cast<yb::WaitPolicy>(exec_params_.wait_policy)); |
945 | 1.58M | } else { |
946 | 1.58M | req.clear_row_mark_type(); |
947 | 1.58M | } |
948 | 1.58M | } |
949 | | |
950 | 1.58M | void PgDocReadOp::SetBackfillSpec() { |
951 | 1.58M | PgsqlReadRequestPB& req = read_op_->read_request(); |
952 | | |
953 | 1.58M | if (exec_params_.bfinstr) { |
954 | 2.25k | req.set_backfill_spec(exec_params_.bfinstr, strlen(exec_params_.bfinstr)); |
955 | 1.58M | } else { |
956 | 1.58M | req.clear_backfill_spec(); |
957 | 1.58M | } |
958 | 1.58M | } |
959 | | |
960 | 1.58M | void PgDocReadOp::SetReadTime() { |
961 | 1.58M | if (exec_params_.is_index_backfill) { |
962 | 2.25k | read_op_->read_request().set_is_for_backfill(true); |
963 | 2.25k | read_op_->set_read_time(ReadHybridTime::FromUint64(GetReadTime())); |
964 | 2.25k | } |
965 | 1.58M | } |
966 | | |
967 | 1.53k | Status PgDocReadOp::ResetInactivePgsqlOps() { |
968 | | // Clear the existing ybctids. |
969 | 6.64k | for (auto op_index = active_op_count_; op_index < pgsql_ops_.size(); op_index++) {
970 | 5.10k | PgsqlReadRequestPB& read_req = GetReadReq(op_index); |
971 | 5.10k | read_req.clear_ybctid_column_value(); |
972 | 5.10k | read_req.clear_batch_arguments(); |
973 | 5.10k | read_req.clear_hash_code(); |
974 | 5.10k | read_req.clear_max_hash_code(); |
975 | 5.10k | read_req.clear_paging_state(); |
976 | 5.10k | read_req.clear_lower_bound(); |
977 | 5.10k | read_req.clear_upper_bound(); |
978 | 5.10k | } |
979 | | |
980 | | // Clear row orders. |
981 | 1.53k | if (batch_row_orders_.size() > 0) { |
982 | 5.57k | for (auto op_index = active_op_count_; op_index < pgsql_ops_.size(); op_index++) {
983 | 4.16k | batch_row_orders_[op_index].clear(); |
984 | 4.16k | } |
985 | 1.40k | } |
986 | | |
987 | 1.53k | return Status::OK(); |
988 | 1.53k | } |
989 | | |
990 | 0 | void PgDocReadOp::FormulateRequestForRollingUpgrade(PgsqlReadRequestPB *read_req) { |
991 | | // Copy first batch-argument to scalar arg because older server does not support batch arguments. |
992 | 0 | const auto& batch_arg = read_req->batch_arguments(0); |
993 | |
994 | 0 | if (batch_arg.has_ybctid()) { |
995 | 0 | *read_req->mutable_ybctid_column_value() = std::move(batch_arg.ybctid()); |
996 | 0 | } |
997 | |
998 | 0 | if (batch_arg.partition_column_values_size() > 0) { |
999 | 0 | read_req->set_hash_code(batch_arg.hash_code()); |
1000 | 0 | read_req->set_max_hash_code(batch_arg.max_hash_code()); |
1001 | 0 | *read_req->mutable_partition_column_values() = |
1002 | 0 | std::move(read_req->batch_arguments(0).partition_column_values()); |
1003 | 0 | } |
1004 | 0 | } |
1005 | | |
1006 | 7.41M | PgsqlReadRequestPB& PgDocReadOp::GetReadReq(size_t op_index) { |
1007 | 7.41M | return down_cast<PgsqlReadOp&>(*pgsql_ops_[op_index]).read_request(); |
1008 | 7.41M | } |
1009 | | |
1010 | | //-------------------------------------------------------------------------------------------------- |
1011 | | |
1012 | | PgDocWriteOp::PgDocWriteOp(const PgSession::ScopedRefPtr& pg_session, |
1013 | | PgTable* table, |
1014 | | PgsqlWriteOpPtr write_op) |
1015 | 7.20M | : PgDocOp(pg_session, table), write_op_(std::move(write_op)) { |
1016 | 7.20M | } |
1017 | | |
1018 | 53.4k | Result<std::list<PgDocResult>> PgDocWriteOp::ProcessResponseImpl() { |
1019 | | // Process result from tablet server and check result status. |
1020 | 53.4k | auto result = VERIFY_RESULT(ProcessResponseResult()); |
1021 | | |
1022 | | // End execution and return result. |
1023 | 0 | end_of_data_ = true; |
1024 | 53.4k | VLOG(1) << __PRETTY_FUNCTION__ << ": Received response for request " << this;
1025 | 53.4k | return result; |
1026 | 53.4k | } |
1027 | | |
1028 | 7.20M | Result<bool> PgDocWriteOp::DoCreateRequests() { |
1029 | | // Set up a single operator.
1030 | 7.20M | pgsql_ops_.push_back(write_op_); |
1031 | 7.20M | pgsql_ops_.back()->set_active(true); |
1032 | 7.20M | active_op_count_ = 1; |
1033 | | |
1034 | | // Log non-buffered request.
1035 | 7.20M | VLOG_IF(1, response_.Valid()) << __PRETTY_FUNCTION__ << ": Sending request for " << this;
1036 | 7.20M | return true; |
1037 | 7.20M | } |
1038 | | |
1039 | 371k | void PgDocWriteOp::SetWriteTime(const HybridTime& write_time) { |
1040 | 371k | write_op_->SetWriteTime(write_time); |
1041 | 371k | } |
1042 | | |
1043 | 0 | PgsqlWriteRequestPB& PgDocWriteOp::GetWriteOp(int op_index) { |
1044 | 0 | return down_cast<PgsqlWriteOp&>(*pgsql_ops_[op_index]).write_request(); |
1045 | 0 | } |
1046 | | |
1047 | | } // namespace pggate |
1048 | | } // namespace yb |