YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_doc_op.cc

//--------------------------------------------------------------------------------------------------
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//--------------------------------------------------------------------------------------------------

#include "yb/yql/pggate/pg_doc_op.h"

#include <algorithm>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "yb/common/row_mark.h"

#include "yb/gutil/casts.h"
#include "yb/gutil/strings/escaping.h"

#include "yb/util/status_format.h"
#include "yb/util/status_log.h"

#include "yb/yql/pggate/pg_expr.h"
#include "yb/yql/pggate/pg_table.h"
#include "yb/yql/pggate/pg_tools.h"
#include "yb/yql/pggate/pggate_flags.h"
#include "yb/yql/pggate/util/pg_doc_data.h"

using std::lower_bound;
using std::list;
using std::vector;
using std::shared_ptr;
using std::make_shared;
using std::unique_ptr;
using std::move;

using yb::client::YBPgsqlOp;
using yb::client::YBPgsqlReadOp;
using yb::client::YBPgsqlWriteOp;

namespace yb {
namespace pggate {

PgDocResult::PgDocResult(rpc::SidecarPtr&& data) : data_(std::move(data)) {
  PgDocData::LoadCache(data_, &row_count_, &row_iterator_);
}

PgDocResult::PgDocResult(rpc::SidecarPtr&& data, std::list<int64_t>&& row_orders)
    : data_(std::move(data)), row_orders_(move(row_orders)) {
  PgDocData::LoadCache(data_, &row_count_, &row_iterator_);
}

PgDocResult::~PgDocResult() {
}

int64_t PgDocResult::NextRowOrder() {
  return row_orders_.size() > 0 ? row_orders_.front() : -1;
}

Status PgDocResult::WritePgTuple(const std::vector<PgExpr*>& targets, PgTuple *pg_tuple,
                                 int64_t *row_order) {
  int attr_num = 0;
  for (const PgExpr *target : targets) {
    if (!target->is_colref() && !target->is_aggregate()) {
      return STATUS(InternalError,
                    "Unexpected expression, only column refs or aggregates supported here");
    }
    if (target->opcode() == PgColumnRef::Opcode::PG_EXPR_COLREF) {
      attr_num = static_cast<const PgColumnRef *>(target)->attr_num();
    } else {
      attr_num++;
    }

    PgWireDataHeader header = PgDocData::ReadDataHeader(&row_iterator_);
    target->TranslateData(&row_iterator_, header, attr_num - 1, pg_tuple);
  }

  if (row_orders_.size()) {
    *row_order = row_orders_.front();
    row_orders_.pop_front();
  } else {
    *row_order = -1;
  }
  return Status::OK();
}
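
// Illustrative sketch (not part of the original file): one way a caller could use
// NextRowOrder() to merge rowsets that arrive out of order. The helper below is
// hypothetical; it scans the rowsets for the one whose next row carries the expected
// order number (NextRowOrder() returns -1 when a rowset has no ordering information).
inline PgDocResult* FindRowsetWithOrder(std::list<PgDocResult>* rowsets, int64_t expected_order) {
  for (auto& rowset : *rowsets) {
    if (rowset.NextRowOrder() == expected_order) {
      return &rowset;  // The next row of this rowset is the one to emit.
    }
  }
  return nullptr;  // The expected row has not arrived yet.
}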

Status PgDocResult::ProcessSystemColumns() {
  if (syscol_processed_) {
    return Status::OK();
  }
  syscol_processed_ = true;

  for (int i = 0; i < row_count_; i++) {
    PgWireDataHeader header = PgDocData::ReadDataHeader(&row_iterator_);
    SCHECK(!header.is_null(), InternalError, "System column ybctid cannot be NULL");

    int64_t data_size;
    size_t read_size = PgDocData::ReadNumber(&row_iterator_, &data_size);
    row_iterator_.remove_prefix(read_size);

    ybctids_.emplace_back(row_iterator_.data(), data_size);
    row_iterator_.remove_prefix(data_size);
  }
  return Status::OK();
}

Status PgDocResult::ProcessSparseSystemColumns(std::string *reservoir) {
  // Process block sampling results returned from DocDB.
  // Results come as (index, ybctid) tuples where index is the position in the reservoir of
  // predetermined size. DocDB returns ybctids with sequential indexes first, starting from 0,
  // until the reservoir is full. Then it returns ybctids with random indexes, so they replace
  // previously stored ybctids.
  for (int i = 0; i < row_count_; i++) {
    // Read index column.
    PgWireDataHeader header = PgDocData::ReadDataHeader(&row_iterator_);
    SCHECK(!header.is_null(), InternalError, "Reservoir index cannot be NULL");
    int32_t index;
    size_t read_size = PgDocData::ReadNumber(&row_iterator_, &index);
    row_iterator_.remove_prefix(read_size);
    // Read ybctid column.
    header = PgDocData::ReadDataHeader(&row_iterator_);
    SCHECK(!header.is_null(), InternalError, "System column ybctid cannot be NULL");
    int64_t data_size;
    read_size = PgDocData::ReadNumber(&row_iterator_, &data_size);
    row_iterator_.remove_prefix(read_size);

    // Copy ybctid data to the reservoir.
    reservoir[index].assign(reinterpret_cast<const char *>(row_iterator_.data()), data_size);
    row_iterator_.remove_prefix(data_size);
  }
  return Status::OK();
}
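
// Illustrative sketch (not part of the original file): how a caller might drive
// ProcessSparseSystemColumns, assuming a reservoir of `targrows` slots. A plain array of
// strings suffices because each (index, ybctid) tuple simply overwrites slot `index`:
// sequential indexes fill the reservoir, later random indexes replace earlier entries.
inline Status CollectSampleYbctids(std::list<PgDocResult>* rowsets, int targrows,
                                   std::vector<std::string>* reservoir) {
  reservoir->assign(targrows, std::string());
  for (auto& rowset : *rowsets) {
    RETURN_NOT_OK(rowset.ProcessSparseSystemColumns(reservoir->data()));
  }
  return Status::OK();
}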

//--------------------------------------------------------------------------------------------------

PgDocOp::PgDocOp(const PgSession::ScopedRefPtr& pg_session, PgTable* table)
    : pg_session_(pg_session), table_(*table) {
}

PgDocOp::~PgDocOp() {
  // Wait for the result in case a request was sent.
  // The operation can be part of a transaction, so it is necessary to complete it before the
  // transaction commits.
  if (response_.Valid()) {
    auto result = response_.Get();
    WARN_NOT_OK(result, "Operation completion failed");
  }
}

Status PgDocOp::ExecuteInit(const PgExecParameters *exec_params) {
  end_of_data_ = false;
  if (exec_params) {
    exec_params_ = *exec_params;
  }
  return Status::OK();
}

const PgExecParameters& PgDocOp::ExecParameters() const {
  return exec_params_;
}

Result<RequestSent> PgDocOp::Execute(bool force_non_bufferable) {
  // As of 09/25/2018, DocDB doesn't cache or keep any execution state for a statement, so we
  // have to call query execution every time.
  // - Normal SQL convention: Exec, Fetch, Fetch, ...
  // - Our SQL convention: Exec & Fetch, Exec & Fetch, ...
  // This refers to the sequence of operations between this layer and the underlying tablet
  // server / DocDB layer, not to the sequence of operations between the PostgreSQL layer and this
  // layer.
  exec_status_ = SendRequest(force_non_bufferable);
  RETURN_NOT_OK(exec_status_);
  return RequestSent(response_.Valid());
}
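
// Illustrative sketch (not part of the original file): the "Exec & Fetch" convention from
// the comment above, written as a hypothetical caller loop. Execute() both sends the first
// request and reports whether one is in flight; GetResult() drains responses and prefetches
// the next portion until the operator reaches end of data and returns an empty rowset list.
inline Status FetchAll(PgDocOp* op, const PgExecParameters* params) {
  RETURN_NOT_OK(op->ExecuteInit(params));
  RETURN_NOT_OK(op->Execute(false /* force_non_bufferable */));
  std::list<PgDocResult> rowsets;
  do {
    rowsets.clear();
    RETURN_NOT_OK(op->GetResult(&rowsets));
    // ... translate rows via PgDocResult::WritePgTuple ...
  } while (!rowsets.empty());
  return Status::OK();
}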

Status PgDocOp::GetResult(list<PgDocResult> *rowsets) {
  // If the execution has an error, return without reading any rows.
  RETURN_NOT_OK(exec_status_);

  if (!end_of_data_) {
    // Send the request now in case prefetching was suppressed.
    if (suppress_next_result_prefetching_ && !response_.Valid()) {
      exec_status_ = SendRequest(true /* force_non_bufferable */);
      RETURN_NOT_OK(exec_status_);
    }

    DCHECK(response_.Valid());
    auto result = response_.Get();
    auto rows = VERIFY_RESULT(ProcessResponse(result));
    // In case ProcessResponse doesn't fail with an error,
    // it should return non-empty rows and/or set end_of_data_.
    DCHECK(!rows.empty() || end_of_data_);
    rowsets->splice(rowsets->end(), rows);
    // Prefetch the next portion of data if needed.
    if (!(end_of_data_ || suppress_next_result_prefetching_)) {
      exec_status_ = SendRequest(true /* force_non_bufferable */);
      RETURN_NOT_OK(exec_status_);
    }
  }

  return Status::OK();
}

Result<int32_t> PgDocOp::GetRowsAffectedCount() const {
  RETURN_NOT_OK(exec_status_);
  DCHECK(end_of_data_);
  return rows_affected_count_;
}

Status PgDocOp::ClonePgsqlOps(size_t op_count) {
  // Allocate batch operators, one per partition.
  SCHECK(op_count > 0, InternalError, "Table must have at least one partition");
  if (pgsql_ops_.size() < op_count) {
    pgsql_ops_.resize(op_count);
    for (auto& op : pgsql_ops_) {
      op = CloneFromTemplate();

      // Initialize as inactive. Turn it on when setting up arguments for a specific partition.
      op->set_active(false);
    }

    // Set parallelism_level_ to the maximum number of operators to be executed at one time.
    parallelism_level_ = pgsql_ops_.size();
  }

  return Status::OK();
}

void PgDocOp::MoveInactiveOpsOutside() {
  // Move inactive ops to the end.
  const ssize_t total_op_count = pgsql_ops_.size();
  bool has_sorting_order = !batch_row_orders_.empty();
  ssize_t left_iter = 0;
  ssize_t right_iter = total_op_count - 1;
  while (true) {
    // Advance the left iterator.
    while (left_iter < total_op_count && pgsql_ops_[left_iter]->is_active()) left_iter++;

    // Advance the right iterator.
    while (right_iter >= 0 && !pgsql_ops_[right_iter]->is_active()) right_iter--;

    // Move the inactive operator to the end by swapping the pointers.
    if (left_iter < right_iter) {
      std::swap(pgsql_ops_[left_iter], pgsql_ops_[right_iter]);
      if (has_sorting_order) {
        std::swap(batch_row_orders_[left_iter], batch_row_orders_[right_iter]);
      }
    } else {
      break;
    }
  }

  // Set the active op count.
  active_op_count_ = left_iter;
}
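
// Illustrative sketch (not part of the original file): the loop above is a two-pointer
// partition that keeps active ops in front. A standalone model of the same invariant over
// a hypothetical vector of active flags (std::partition would achieve the same split, but
// the hand-rolled swaps let batch_row_orders_ be permuted in lockstep):
inline size_t PartitionActiveFirst(std::vector<int>* active /* nonzero = active */) {
  size_t left = 0;
  size_t right = active->size();
  while (true) {
    while (left < active->size() && (*active)[left]) ++left;  // First inactive from the left.
    while (right > 0 && !(*active)[right - 1]) --right;       // One past the last active.
    if (left + 1 >= right) break;
    std::swap((*active)[left], (*active)[right - 1]);         // Move the inactive entry right.
  }
  return left;  // Number of active entries, mirroring active_op_count_.
}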

Status PgDocOp::SendRequest(bool force_non_bufferable) {
  DCHECK(exec_status_.ok());
  DCHECK(!response_.Valid());
  exec_status_ = SendRequestImpl(force_non_bufferable);
  return exec_status_;
}

Status PgDocOp::SendRequestImpl(bool force_non_bufferable) {
  // Populate the collected information into protobuf requests before sending them to DocDB.
  RETURN_NOT_OK(CreateRequests());

  // Currently, sending and receiving individual requests of a batch is not yet supported.
  // - Among statements, only queries by BASE-YBCTIDs need to be sent and received in batches
  //   to honor the order of how the BASE-YBCTIDs are kept in the database.
  // - For other types of statements, it could be more efficient to send them individually.
  SCHECK(wait_for_batch_completion_, InternalError,
         "Only send and receive the whole batch is supported");

  // Send at most "parallelism_level_" requests at one time.
  size_t send_count = std::min(parallelism_level_, active_op_count_);
  response_ = VERIFY_RESULT(pg_session_->RunAsync(
      pgsql_ops_.data(), send_count, *table_, &GetReadTime(), force_non_bufferable));

  return Status::OK();
}

Result<std::list<PgDocResult>> PgDocOp::ProcessResponse(const Status& status) {
  // Check the operation status.
  DCHECK(exec_status_.ok());
  exec_status_ = status;
  if (exec_status_.ok()) {
    auto result = ProcessResponseImpl();
    if (result.ok()) {
      return result;
    }
    exec_status_ = result.status();
  }
  return exec_status_;
}

Result<std::list<PgDocResult>> PgDocOp::ProcessResponseResult() {
  VLOG(1) << __PRETTY_FUNCTION__ << ": Received response for request " << this;

  // Process data coming from the tablet server.
  std::list<PgDocResult> result;
  bool no_sorting_order = batch_row_orders_.empty();

  rows_affected_count_ = 0;
  // Check for errors reported by the tablet server.
  for (size_t op_index = 0; op_index < active_op_count_; op_index++) {
    auto& pgsql_op = pgsql_ops_[op_index];
    auto& response = pgsql_op->response();
    // Get the total number of rows that are operated on.
    rows_affected_count_ += response.rows_affected_count();

    // A single batch of requests is almost always directed to fetch data from a single tablet.
    // However, when tablets split, data can be sharded/distributed across multiple tablets.
    // Due to automatic tablet splitting, a pgsql_op may prepare a request for a single tablet
    // before the split occurs, after which that tablet is split into multiple tablets. Hence, a
    // single pgsql_op can end up fetching data from multiple tablets.
    //
    // For example, consider a query: select * from table where i=1 order by j;
    // where there exists a secondary index table_i_j_idx (i HASH, j ASC). Since there is an
    // ordering constraint, the ybctids are ordered on j;
    //     ybctid[partition 1] contains (ybctid_1, ybctid_3, ybctid_4)
    //     ybctid[partition 2] contains (ybctid_2, ybctid_6)
    //     ybctid[partition 3] contains (ybctid_5, ybctid_7)
    //
    // Say partition 1 splits into partition 1.1 and partition 1.2:
    //     ybctid[partition 1.1] contains (ybctid_1, ybctid_4) -> pgsql_op for partition 1
    //     ybctid[partition 1.2] contains (ybctid_3) -> pgsql_op for partition 1 with paging state
    //
    // In order to match the ordering constraints between the requests and the responses, we
    // obtain the orders of requests executed in each partition and send them along with the
    // responses so that pg_gate can send responses to the postgres layer in the correct order.

    // Get contents.
    auto& rows_data = pgsql_ops_[op_index]->rows_data();
    if (rows_data) {
      if (no_sorting_order) {
        result.emplace_back(std::move(rows_data));
      } else {
        const auto& batch_orders = pgsql_op->response().batch_orders();
        if (!batch_orders.empty()) {
          result.emplace_back(std::move(pgsql_op->rows_data()),
                              std::list<int64_t>(batch_orders.begin(), batch_orders.end()));
        } else {
          result.emplace_back(std::move(rows_data), std::move(batch_row_orders_[op_index]));
        }
      }
    }
  }

  return result;
}

uint64_t& PgDocOp::GetReadTime() {
  return (read_time_ || !exec_params_.statement_read_time)
      ? read_time_ : *exec_params_.statement_read_time;
}

Status PgDocOp::CreateRequests() {
  if (!request_population_completed_) {
    if (VERIFY_RESULT(DoCreateRequests())) {
      request_population_completed_ = true;
    }
  }
  return CompleteRequests();
}

Status PgDocOp::PopulateDmlByYbctidOps(const std::vector<Slice>& ybctids) {
  RETURN_NOT_OK(DoPopulateDmlByYbctidOps(ybctids));
  request_population_completed_ = true;
  return CompleteRequests();
}

Status PgDocOp::CompleteRequests() {
  for (size_t i = 0; i != active_op_count_; ++i) {
    RETURN_NOT_OK(pgsql_ops_[i]->InitPartitionKey(*table_));
  }
  return Status::OK();
}

//-------------------------------------------------------------------------------------------------

PgDocReadOp::PgDocReadOp(const PgSession::ScopedRefPtr& pg_session,
                         PgTable* table,
                         PgsqlReadOpPtr read_op)
    : PgDocOp(pg_session, table), read_op_(std::move(read_op)) {
}

Status PgDocReadOp::ExecuteInit(const PgExecParameters *exec_params) {
  SCHECK(pgsql_ops_.empty(),
         IllegalState,
         "Exec params can't be changed for already created operations");
  RETURN_NOT_OK(PgDocOp::ExecuteInit(exec_params));

  read_op_->read_request().set_return_paging_state(true);
  // TODO(10696): This is probably the only place in pg_doc_op where pg_session is being
  // used as a source of truth. All other uses treat it as stateless. Refactor to move this
  // state elsewhere.
  if (pg_session_->ShouldUseFollowerReads()) {
    read_op_->set_read_from_followers();
  }
  SetRequestPrefetchLimit();
  SetBackfillSpec();
  SetRowMark();
  SetReadTime();
  return Status::OK();
}

Result<std::list<PgDocResult>> PgDocReadOp::ProcessResponseImpl() {
  // Process the result from the tablet server and check the result status.
  auto result = VERIFY_RESULT(ProcessResponseResult());

  // Process the paging state and check status.
  RETURN_NOT_OK(ProcessResponseReadStates());
  return result;
}

Result<bool> PgDocReadOp::DoCreateRequests() {
  // All information from the SQL request has been collected and set up. This code populates
  // protobuf requests before sending them to DocDB. For performance reasons, requests are
  // constructed differently for different statements.
  if (read_op_->read_request().has_sampling_state()) {
    VLOG(1) << __PRETTY_FUNCTION__ << ": Preparing sampling requests ";
    return PopulateSamplingOps();

  // Requests pushing down aggregates and/or filter expressions tend to do more work on the DocDB
  // side, so they take longer to return responses, and the Postgres side generally has less work
  // to do. Hence we optimize by sending multiple parallel requests to the nodes, allowing their
  // simultaneous processing.
  // The effect may be less than expected if the nodes are already heavily loaded and CPU
  // consumption is high, or the selectivity of the filter is low.
  } else if (read_op_->read_request().is_aggregate() ||
             read_op_->read_request().where_clauses_size() > 0) {
    return PopulateParallelSelectOps();

  } else if (read_op_->read_request().partition_column_values_size() > 0) {
    // Optimization for multiple hash keys.
    // - SELECT * FROM sql_table WHERE hash_c1 IN (1, 2, 3) AND hash_c2 IN (4, 5, 6);
    // - Multiple requests for different hash permutations / keys.
    return PopulateNextHashPermutationOps();

  } else {
    // No optimization.
    if (exec_params_.partition_key != nullptr) {
      RETURN_NOT_OK(SetScanPartitionBoundary());
    }
    pgsql_ops_.emplace_back(read_op_);
    pgsql_ops_.back()->set_active(true);
    active_op_count_ = 1;
    return true;
  }
}

Status PgDocReadOp::DoPopulateDmlByYbctidOps(const std::vector<Slice>& ybctids) {
  // This function is called only when ybctids were returned from an INDEX.
  //
  // NOTE on the typical process.
  // 1- Statement:
  //    SELECT xxx FROM <table> WHERE ybctid IN (SELECT ybctid FROM INDEX);
  //
  // 2- Select 1024 ybctids (prefetch limit) from INDEX.
  //
  // 3- ONLY ONE TIME: Create a batch of operators, one per partition.
  //    * Each operator has requests cloned from template_op_.
  //    * We will reuse the created operators & requests for future batches of 1024 ybctids.
  //    * Certain fields in the protobuf requests MUST BE RESET for each batch.
  //
  // 4- Assign the selected 1024 ybctids to the batch of operators.
  //
  // 5- Send requests to tablet servers to read data from <tab> associated with the ybctid values.
  //
  // 6- Repeat steps 2 through 5 for the next batch of 1024 ybctids until done.
  RETURN_NOT_OK(InitializeYbctidOperators());
  const auto& partition_keys = table_->GetPartitions();

  // Begin a batch of ybctids.
  end_of_data_ = false;
  // Assign ybctid values.
  for (const Slice& ybctid : ybctids) {
    // Find the partition. The partition index is the boundary order minus 1.
    // - For hash partitioning, we use the hash code to find the right index.
    // - For range partitioning, we pass the partition key to seek the index.
    SCHECK(ybctid.size() > 0, InternalError, "Invalid ybctid value");
    // TODO(tsplit): what if the table partitioning is changed during the PgDocReadOp lifecycle
    // before or after the following lines?
    const size_t partition = VERIFY_RESULT(table_->FindPartitionIndex(ybctid));
    SCHECK(partition < table_->GetPartitionCount(), InternalError,
           "Ybctid value is not within partition boundary");

    // Assign ybctids to operators.
    auto& read_req = GetReadReq(partition);
    if (!read_req.has_ybctid_column_value()) {
      // We must set "ybctid_column_value" in the request for two reasons.
      // - "client::yb_op" uses it to set the hash_code.
      // - Rolling upgrade: an older server will read only "ybctid_column_value" as it doesn't
      //   know of the ybctid-batching operation.
      pgsql_ops_[partition]->set_active(true);
      read_req.set_is_forward_scan(true);
      read_req.mutable_ybctid_column_value()->mutable_value()
        ->set_binary_value(ybctid.data(), ybctid.size());

      // For every read operation set the partition boundary. In case a tablet is split between
      // preparing requests and executing them, DocDB will return a paging state for pggate to
      // continue until the end of the current tablet is reached.
      std::string upper_bound;
      if (partition < partition_keys.size() - 1) {
        upper_bound = partition_keys[partition + 1];
      }
      RETURN_NOT_OK(table_->SetScanBoundary(&read_req,
                                            partition_keys[partition],
                                            /* lower_bound_is_inclusive */ true,
                                            upper_bound,
                                            /* upper_bound_is_inclusive */ false));
    }

    // Append the ybctid and its order to batch_arguments.
    // The "ybctid" values are returned in the same order as the rows in the IndexTable. To keep
    // track of this order, each argument is assigned an order-number.
    auto batch_arg = read_req.add_batch_arguments();
    batch_arg->set_order(batch_row_ordering_counter_);
    batch_arg->mutable_ybctid()->mutable_value()->set_binary_value(ybctid.data(), ybctid.size());

    // Remember the order number for each request.
    batch_row_orders_[partition].push_back(batch_row_ordering_counter_);

    // Increment the counter for the next row.
    batch_row_ordering_counter_++;
  }

  // Done creating requests, but not every partition/operator has arguments (those stay inactive).
  MoveInactiveOpsOutside();

  return Status::OK();
}
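
// Illustrative sketch (not part of the original file): the batching flow from the NOTE
// above, as a hypothetical driver. `fetch_ybctid_batch` stands in for step 2 (reading a
// prefetch-limit-sized batch of ybctids from the index) and is not a real API here; the
// sketch assumes <functional> is available.
inline Status ScanByIndex(
    PgDocReadOp* target_op,
    const std::function<Result<std::vector<Slice>>()>& fetch_ybctid_batch) {
  for (;;) {
    auto ybctids = VERIFY_RESULT(fetch_ybctid_batch());       // Step 2.
    if (ybctids.empty()) break;                               // Step 6: done.
    // Steps 3-4: (re)initialize the operators and bind this batch of ybctids.
    RETURN_NOT_OK(target_op->PopulateDmlByYbctidOps(ybctids));
    // Step 5: send the batch and consume the results.
    RETURN_NOT_OK(target_op->Execute(false /* force_non_bufferable */));
    std::list<PgDocResult> rowsets;
    RETURN_NOT_OK(target_op->GetResult(&rowsets));
  }
  return Status::OK();
}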

Status PgDocReadOp::InitializeYbctidOperators() {
  auto op_count = table_->GetPartitionCount();

  if (batch_row_orders_.size() == 0) {
    // First batch:
    // - Create operators.
    // - Allocate row orders for each tablet server.
    // - Protobuf fields in requests are not yet set, so they don't need to be cleared.
    RETURN_NOT_OK(ClonePgsqlOps(op_count));
    batch_row_orders_.resize(op_count);

    // To honor the indexing order of ybctid values, for each batch of ybctid-binds, select all
    // rows in the batch and then order them before returning the result to the Postgres layer.
    wait_for_batch_completion_ = true;

  } else {
    // Second and later batches: Reuse all state variables.
    // - Clear row orders for this batch to be set later.
    // - Clear protobuf fields (ybctids and others) before reusing them in this batch.
    RETURN_NOT_OK(ResetInactivePgsqlOps());
  }
  return Status::OK();
}

Result<bool> PgDocReadOp::PopulateNextHashPermutationOps() {
  RETURN_NOT_OK(InitializeHashPermutationStates());

  // Set the index at the start of the inactive operators.
  auto op_count = pgsql_ops_.size();
  auto op_index = active_op_count_;

  // Fill inactive operators with new hash permutations.
  const size_t hash_column_count = table_->num_hash_key_columns();
  for (; op_index < op_count && next_permutation_idx_ < total_permutation_count_; ++op_index) {
    auto& read_req = GetReadReq(op_index);
    pgsql_ops_[op_index]->set_active(true);

    int pos = next_permutation_idx_++;
    for (int c_idx = narrow_cast<int>(hash_column_count); c_idx-- > 0;) {
      int sel_idx = pos % partition_exprs_[c_idx].size();
      *read_req.mutable_partition_column_values(c_idx) = *partition_exprs_[c_idx][sel_idx];
      pos /= partition_exprs_[c_idx].size();
    }
  }
  active_op_count_ = op_index;

  // Stop adding requests if we reach the total number of permutations.
  return next_permutation_idx_ >= total_permutation_count_;
}
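
// Illustrative sketch (not part of the original file): the permutation index decoding used
// above is mixed-radix. With the per-column IN-list sizes as the radices, permutation `pos`
// maps to one value index per hash column (the last column is the least significant radix).
// A minimal standalone model:
inline std::vector<size_t> DecodePermutation(size_t pos, const std::vector<size_t>& radices) {
  std::vector<size_t> selection(radices.size());
  for (size_t c = radices.size(); c-- > 0;) {
    selection[c] = pos % radices[c];  // Pick the value index for column c.
    pos /= radices[c];
  }
  return selection;
}
// E.g., for hash_c1 IN (1,2,3) and hash_c2 IN (4,5,6), radices = {3, 3} and pos = 5 decodes
// to {1, 2}, i.e. hash_c1 = 2 and hash_c2 = 6.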

// Collect hash expressions to prepare for generating permutations.
Status PgDocReadOp::InitializeHashPermutationStates() {
  // Return if the state variables were already initialized.
  if (!partition_exprs_.empty()) {
    // Reset the protobuf requests before reusing the operators.
    return ResetInactivePgsqlOps();
  }

  // Initialize partition_exprs_.
  // Reorganize the input arguments from Postgres to prepare for permutation generation.
  const size_t hash_column_count = table_->num_hash_key_columns();
  partition_exprs_.resize(hash_column_count);
  for (size_t c_idx = 0; c_idx < hash_column_count; ++c_idx) {
    const auto& col_expr = read_op_->read_request().partition_column_values(
        narrow_cast<int>(c_idx));
    if (col_expr.has_condition()) {
      for (const auto& expr : col_expr.condition().operands(1).condition().operands()) {
        partition_exprs_[c_idx].push_back(&expr);
      }
    } else {
      partition_exprs_[c_idx].push_back(&col_expr);
    }
  }

  // Calculate the total number of permutations to be generated.
  total_permutation_count_ = 1;
  for (auto& exprs : partition_exprs_) {
    total_permutation_count_ *= exprs.size();
  }

  // Create operators, one operation per partition, up to FLAGS_ysql_request_limit.
  //
  // TODO(neil) The control variable "ysql_request_limit" should be applied to ALL statements, but
  // at the moment, the number of operators never exceeds the number of tablets except for the
  // hash permutation operation, so the work on this GFLAG can be done when it is necessary.
  int max_op_count = std::min(total_permutation_count_, FLAGS_ysql_request_limit);
  RETURN_NOT_OK(ClonePgsqlOps(max_op_count));

  // Clear the original partition expressions as they will be replaced with hash permutations.
  for (int op_index = 0; op_index < max_op_count; op_index++) {
    auto& read_request = GetReadReq(op_index);
    read_request.clear_partition_column_values();
    for (size_t i = 0; i < hash_column_count; ++i) {
      read_request.add_partition_column_values();
    }
    pgsql_ops_[op_index]->set_active(false);
  }

  // Initialize the counters.
  next_permutation_idx_ = 0;
  active_op_count_ = 0;

  return Status::OK();
}

Result<bool> PgDocReadOp::PopulateParallelSelectOps() {
  // Create batch operators, one per partition, to execute in parallel.
  // TODO(tsplit): what if the table partitioning is changed during the PgDocReadOp lifecycle
  // before or after the following line?
  RETURN_NOT_OK(ClonePgsqlOps(table_->GetPartitionCount()));
  // Set "parallelism_level_" to control how many operators can be sent at one time.
  //
  // TODO(neil) The calculation for this control variable should be applied to ALL operators, but
  // the following calculation needs to be refined before it can be used for all statements.
  auto parallelism_level = FLAGS_ysql_select_parallelism;
  if (parallelism_level < 0) {
    int tserver_count = VERIFY_RESULT(pg_session_->TabletServerCount(true /* primary_only */));

    // Establish lower and upper bounds on parallelism.
    int kMinParSelParallelism = 1;
    int kMaxParSelParallelism = 16;
    parallelism_level_ =
      std::min(std::max(tserver_count * 2, kMinParSelParallelism), kMaxParSelParallelism);
  } else {
    parallelism_level_ = parallelism_level;
  }

  // Assign partitions to operators.
  const auto& partition_keys = table_->GetPartitions();
  SCHECK_EQ(partition_keys.size(), pgsql_ops_.size(), IllegalState,
            "Number of partitions and number of partition keys are not the same");

  for (size_t partition = 0; partition < partition_keys.size(); partition++) {
    // Construct a new YBPgsqlReadOp.
    pgsql_ops_[partition]->set_active(true);

    // Use the partition index to set up the protobuf to identify the partition that this request
    // is for. The batcher will use this information to send the request to the correct tablet
    // server, and the server uses it to operate on the correct tablet.
    // - Range partitioning uses the range partition key to identify the partition.
    // - Hash partitioning uses "next_partition_key" and "max_hash_code" to identify the partition.
    string upper_bound;
    if (partition < partition_keys.size() - 1) {
      upper_bound = partition_keys[partition + 1];
    }
    RETURN_NOT_OK(table_->SetScanBoundary(&GetReadReq(partition),
                                          partition_keys[partition],
                                          true /* lower_bound_is_inclusive */,
                                          upper_bound,
                                          false /* upper_bound_is_inclusive */));
  }
  active_op_count_ = partition_keys.size();

  return true;
}
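
// Illustrative sketch (not part of the original file): the default parallelism clamp above,
// isolated. With FLAGS_ysql_select_parallelism < 0, a 3-tserver cluster yields
// min(max(3 * 2, 1), 16) = 6 parallel requests; eight or more tservers saturate at 16.
inline int DefaultSelectParallelism(int tserver_count) {
  constexpr int kMin = 1;   // At least one request in flight.
  constexpr int kMax = 16;  // Cap to avoid overloading the cluster.
  return std::min(std::max(tserver_count * 2, kMin), kMax);
}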

Result<bool> PgDocReadOp::PopulateSamplingOps() {
  // Create one PgsqlOp per partition.
  RETURN_NOT_OK(ClonePgsqlOps(table_->GetPartitionCount()));
  // Partitions are sampled sequentially, one at a time.
  parallelism_level_ = 1;
  // Assign partitions to operators.
  const auto& partition_keys = table_->GetPartitions();
  SCHECK_EQ(partition_keys.size(), pgsql_ops_.size(), IllegalState,
            "Number of partitions and number of partition keys are not the same");

  // Bind requests to partitions.
  for (size_t partition = 0; partition < partition_keys.size(); partition++) {
    // Construct a new YBPgsqlReadOp.
    pgsql_ops_[partition]->set_active(true);

    // Use the partition index to set up the protobuf to identify the partition that this request
    // is for. The batcher will use this information to send the request to the correct tablet
    // server, and the server uses it to operate on the correct tablet.
    // - Range partitioning uses the range partition key to identify the partition.
    // - Hash partitioning uses "next_partition_key" and "max_hash_code" to identify the partition.
    string upper_bound;
    if (partition < partition_keys.size() - 1) {
      upper_bound = partition_keys[partition + 1];
    }
    RETURN_NOT_OK(table_->SetScanBoundary(&GetReadReq(partition),
                                          partition_keys[partition],
                                          true /* lower_bound_is_inclusive */,
                                          upper_bound,
                                          false /* upper_bound_is_inclusive */));
  }
  active_op_count_ = partition_keys.size();
  VLOG(1) << "Number of partitions to sample: " << active_op_count_;
  // If we have a big enough sample after processing some partitions, we skip the rest.
  // By shuffling the partitions we randomly select the partition(s) to sample.
  std::random_shuffle(pgsql_ops_.begin(), pgsql_ops_.end());

  return true;
}
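
// Illustrative note (not part of the original file): std::random_shuffle, used above, was
// deprecated in C++14 and removed in C++17. An equivalent sketch with std::shuffle:
//
//   std::mt19937 rng{std::random_device{}()};  // needs <random>
//   std::shuffle(pgsql_ops_.begin(), pgsql_ops_.end(), rng);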

Status PgDocReadOp::GetEstimatedRowCount(double *liverows, double *deadrows) {
  if (liverows != nullptr) {
    // Return the estimated number of live tuples.
    VLOG(1) << "Returning liverows " << sample_rows_;
    *liverows = sample_rows_;
  }
  if (deadrows != nullptr) {
    // TODO: count dead tuples while sampling.
    *deadrows = 0;
  }
  return Status::OK();
}

// When postgres requests to scan a specific partition, set the partition parameter accordingly.
Status PgDocReadOp::SetScanPartitionBoundary() {
  // Boundary to scan from a given key to the end of its associated tablet.
  // - Lower: The given partition key (inclusive).
  // - Upper: Beginning of the next tablet (not inclusive).
  SCHECK(exec_params_.partition_key != nullptr, Uninitialized, "expected non-null partition_key");

  // Seek the tablet of the given key.
  // TODO(tsplit): what if the table partitioning is changed during the PgDocReadOp lifecycle
  // before or after the following line?
  const std::vector<std::string>& partition_keys = table_->GetPartitions();
  const auto& partition_key = std::find(
      partition_keys.begin(),
      partition_keys.end(),
      a2b_hex(exec_params_.partition_key));
  RSTATUS_DCHECK(
      partition_key != partition_keys.end(), InvalidArgument, "invalid partition key given");

  // Seek the upper bound (beginning of the next tablet).
  string upper_bound;
  const auto& next_partition_key = std::next(partition_key, 1);
  if (next_partition_key != partition_keys.end()) {
    upper_bound = *next_partition_key;
  }
  RETURN_NOT_OK(table_->SetScanBoundary(
      &read_op_->read_request(), *partition_key, true /* lower_bound_is_inclusive */, upper_bound,
      false /* upper_bound_is_inclusive */));
  return Status::OK();
}

Status PgDocReadOp::ProcessResponseReadStates() {
  // For each read_op, set up its request for the next batch of data, or make it inactive.
  bool has_more_data = false;
  auto send_count = std::min(parallelism_level_, active_op_count_);

  for (size_t op_index = 0; op_index < send_count; op_index++) {
    auto& read_op = down_cast<PgsqlReadOp&>(*pgsql_ops_[op_index]);
    RETURN_NOT_OK(ReviewResponsePagingState(*table_, &read_op));

    // Check for completion.
    bool has_more_arg = false;
    auto& res = read_op.response();
    auto& req = read_op.read_request();

    // Save the backfill_spec if the tablet server wants to return it.
    if (res.is_backfill_batch_done()) {
      out_param_backfill_spec_ = res.backfill_spec();
    } else if (res.has_paging_state()) {
      has_more_arg = true;

      // Set up the paging state for the next request.
      // A query request can be nested, and the paging state belongs to the innermost query,
      // which is the read operator that is executed first and feeds data to the other queries.
      // Recursive Proto Message:
      //     PgsqlReadRequestPB { PgsqlReadRequestPB index_request; }
      PgsqlReadRequestPB *innermost_req = &req;
      while (innermost_req->has_index_request()) {
        innermost_req = innermost_req->mutable_index_request();
      }
      *innermost_req->mutable_paging_state() = std::move(*res.mutable_paging_state());
      if (innermost_req->paging_state().has_read_time()) {
        read_op.set_read_time(ReadHybridTime::FromPB(innermost_req->paging_state().read_time()));
      }

      // Set up the backfill_spec for the next request.
      if (res.has_backfill_spec()) {
        *innermost_req->mutable_backfill_spec() = std::move(*res.mutable_backfill_spec());
      }

      // The Parse/Analysis/Rewrite catalog version has already been checked on the first request.
      // The docdb layer will check that the target table's schema version is compatible.
      // This allows long-running queries to continue in the presence of other DDL statements
      // as long as they do not affect the table(s) being queried.
      req.clear_ysql_catalog_version();
    }

    // Check for batch execution.
    if (res.batch_arg_count() < req.batch_arguments_size()) {
      has_more_arg = true;

      // Delete the executed arguments from the batch and keep those that haven't been executed.
      req.mutable_batch_arguments()->DeleteSubrange(
          0, narrow_cast<int32_t>(res.batch_arg_count()));

      // For rolling upgrade reasons, we must copy the first batch_arg to the scalar arg.
      FormulateRequestForRollingUpgrade(&req);
    }

    if (res.has_sampling_state()) {
      VLOG(1) << "Received sampling state:"
              << " samplerows: " << res.mutable_sampling_state()->samplerows()
              << " rowstoskip: " << res.mutable_sampling_state()->rowstoskip()
              << " rstate_w: " << res.mutable_sampling_state()->rstate_w()
              << " rand_state: " << res.mutable_sampling_state()->rand_state();
      if (has_more_arg) {
        // Copy the sampling state from the response to the request, to properly continue
        // sampling the next block.
        *req.mutable_sampling_state() = std::move(*res.mutable_sampling_state());
      } else {
        // Partition sampling is completed.
        // If samplerows is greater than or equal to targrows, the sampling is complete. There are
        // enough rows selected to calculate stats, and we can estimate the total number of rows
        // in the table by extrapolating samplerows to the partitions that have not been scanned.
        // If samplerows is less than targrows, the next partition needs to be sampled. The next
        // pgdoc_op already has the sampling state copied from the template_op_; only a couple of
        // fields need to be updated: numrows and samplerows. The targrows never changes, and in
        // the reservoir population phase (before samplerows reaches targrows) 1. numrows and
        // samplerows are always equal; and 2. random numbers are never generated, so the random
        // state remains the same.
        // That essentially means the only thing we need to collect from the partition's final
        // sampling state is samplerows. We use that number to either estimate liverows, or to
        // update numrows and samplerows in the next partition's sampling state.
        sample_rows_ = res.mutable_sampling_state()->samplerows();
      }
    }

    if (has_more_arg) {
      has_more_data = true;
    } else {
      read_op.set_active(false);
    }
  }

  if (has_more_data || send_count < active_op_count_) {
    // Move inactive ops to the end of pgsql_ops_ to make room for the new set of arguments.
    MoveInactiveOpsOutside();
    end_of_data_ = false;
  } else {
    // There should be no active op left in the queue.
    active_op_count_ = 0;
    end_of_data_ = request_population_completed_;
  }

  if (active_op_count_ > 0 && read_op_->read_request().has_sampling_state()) {
    auto& read_op = down_cast<PgsqlReadOp&>(*pgsql_ops_[0]);
    PgsqlReadRequestPB *req = &read_op.read_request();
    if (!req->has_paging_state()) {
      // A current sampling op without paging state means that the previous one was completed and
      // moved outside.
      auto sampling_state = req->mutable_sampling_state();
      if (sample_rows_ < sampling_state->targrows()) {
        // More sample rows are needed; update the sampling state and let the next partition be
        // scanned.
        VLOG(1) << "Continue sampling next partition from " << sample_rows_;
        sampling_state->set_numrows(static_cast<int32>(sample_rows_));
        sampling_state->set_samplerows(sample_rows_);
      } else {
        // Have enough sample rows; estimate the total table rows assuming they are evenly
        // distributed between partitions.
        auto completed_ops = pgsql_ops_.size() - active_op_count_;
        sample_rows_ = floor((sample_rows_ / completed_ops) * pgsql_ops_.size() + 0.5);
        VLOG(1) << "Done sampling, prorated rowcount is " << sample_rows_;
        end_of_data_ = true;
      }
    }
  }

  return Status::OK();
}
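
// Illustrative sketch (not part of the original file): the row-count extrapolation above,
// isolated with example numbers. Having sampled, say, 30000 rows across 3 of 12 partitions,
// the prorated estimate is floor((30000 / 3) * 12 + 0.5) = 120000 total rows.
inline double ProrateSampleRows(double sample_rows, size_t completed_ops, size_t total_ops) {
  return floor((sample_rows / completed_ops) * total_ops + 0.5);  // Round to nearest.
}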

void PgDocReadOp::SetRequestPrefetchLimit() {
  // Predict the maximum prefetch limit using the associated gflags.
  PgsqlReadRequestPB& req = read_op_->read_request();
  auto predicted_limit = FLAGS_ysql_prefetch_limit;
  if (!req.is_forward_scan()) {
    // Backward scan is slower than forward scan, so the predicted limit is a smaller number.
    predicted_limit = predicted_limit * FLAGS_ysql_backward_prefetch_scale_factor;
  }

  // The system setting has to be at least 1, while the user setting (LIMIT clause) can be
  // anything that is allowed by SQL semantics.
  if (predicted_limit < 1) {
    predicted_limit = 1;
  }

  // Use the statement LIMIT (count + offset) if it is smaller than the predicted limit.
  auto limit = exec_params_.limit_count + exec_params_.limit_offset;
  suppress_next_result_prefetching_ = true;
  if (exec_params_.limit_use_default || limit > predicted_limit) {
    limit = predicted_limit;
    suppress_next_result_prefetching_ = false;
  }
  VLOG(3) << __func__
          << " exec_params_.limit_count=" << exec_params_.limit_count
          << " exec_params_.limit_offset=" << exec_params_.limit_offset
          << " exec_params_.limit_use_default=" << exec_params_.limit_use_default
          << " predicted_limit=" << predicted_limit
          << " limit=" << limit;
  req.set_limit(limit);
}
void PgDocReadOp::SetRowMark() {
940
1.58M
  auto& req = read_op_->read_request();
941
1.58M
  const auto row_mark_type = GetRowMarkType(&exec_params_);
942
1.58M
  if (IsValidRowMarkType(row_mark_type)) {
943
8.49k
    req.set_row_mark_type(row_mark_type);
944
8.49k
    req.set_wait_policy(static_cast<yb::WaitPolicy>(exec_params_.wait_policy));
945
1.58M
  } else {
946
1.58M
    req.clear_row_mark_type();
947
1.58M
  }
948
1.58M
}
949
950

void PgDocReadOp::SetBackfillSpec() {
  PgsqlReadRequestPB& req = read_op_->read_request();

  if (exec_params_.bfinstr) {
    req.set_backfill_spec(exec_params_.bfinstr, strlen(exec_params_.bfinstr));
  } else {
    req.clear_backfill_spec();
  }
}

void PgDocReadOp::SetReadTime() {
  if (exec_params_.is_index_backfill) {
    read_op_->read_request().set_is_for_backfill(true);
    read_op_->set_read_time(ReadHybridTime::FromUint64(GetReadTime()));
  }
}

Status PgDocReadOp::ResetInactivePgsqlOps() {
  // Clear the existing ybctids.
  for (auto op_index = active_op_count_; op_index < pgsql_ops_.size(); op_index++) {
    PgsqlReadRequestPB& read_req = GetReadReq(op_index);
    read_req.clear_ybctid_column_value();
    read_req.clear_batch_arguments();
    read_req.clear_hash_code();
    read_req.clear_max_hash_code();
    read_req.clear_paging_state();
    read_req.clear_lower_bound();
    read_req.clear_upper_bound();
  }

  // Clear row orders.
  if (batch_row_orders_.size() > 0) {
    for (auto op_index = active_op_count_; op_index < pgsql_ops_.size(); op_index++) {
      batch_row_orders_[op_index].clear();
    }
  }

  return Status::OK();
}

void PgDocReadOp::FormulateRequestForRollingUpgrade(PgsqlReadRequestPB *read_req) {
  // Copy the first batch-argument to the scalar arg because an older server does not support
  // batch arguments.
  const auto& batch_arg = read_req->batch_arguments(0);

  if (batch_arg.has_ybctid()) {
    *read_req->mutable_ybctid_column_value() = std::move(batch_arg.ybctid());
  }

  if (batch_arg.partition_column_values_size() > 0) {
    read_req->set_hash_code(batch_arg.hash_code());
    read_req->set_max_hash_code(batch_arg.max_hash_code());
    *read_req->mutable_partition_column_values() =
      std::move(read_req->batch_arguments(0).partition_column_values());
  }
}
PgsqlReadRequestPB& PgDocReadOp::GetReadReq(size_t op_index) {
1007
7.41M
  return down_cast<PgsqlReadOp&>(*pgsql_ops_[op_index]).read_request();
1008
7.41M
}
1009
1010
//--------------------------------------------------------------------------------------------------
1011
1012
PgDocWriteOp::PgDocWriteOp(const PgSession::ScopedRefPtr& pg_session,
1013
                           PgTable* table,
1014
                           PgsqlWriteOpPtr write_op)
1015
7.20M
    : PgDocOp(pg_session, table), write_op_(std::move(write_op)) {
1016
7.20M
}
1017
1018
53.4k
Result<std::list<PgDocResult>> PgDocWriteOp::ProcessResponseImpl() {
1019
  // Process result from tablet server and check result status.
1020
53.4k
  auto result = VERIFY_RESULT(ProcessResponseResult());
1021
1022
  // End execution and return result.
1023
0
  end_of_data_ = true;
1024
53.4k
  VLOG
(1) << __PRETTY_FUNCTION__ << ": Received response for request " << this0
;
1025
53.4k
  return result;
1026
53.4k
}
1027
1028
7.20M
Result<bool> PgDocWriteOp::DoCreateRequests() {
1029
  // Setup a singular operator.
1030
7.20M
  pgsql_ops_.push_back(write_op_);
1031
7.20M
  pgsql_ops_.back()->set_active(true);
1032
7.20M
  active_op_count_ = 1;
1033
1034
  // Log non buffered request.
1035
7.20M
  VLOG_IF
(1, response_.Valid()) << __PRETTY_FUNCTION__ << ": Sending request for " << this705
;
1036
7.20M
  return true;
1037
7.20M
}
1038
1039
371k
void PgDocWriteOp::SetWriteTime(const HybridTime& write_time) {
1040
371k
  write_op_->SetWriteTime(write_time);
1041
371k
}
1042
1043
0
PgsqlWriteRequestPB& PgDocWriteOp::GetWriteOp(int op_index) {
1044
0
  return down_cast<PgsqlWriteOp&>(*pgsql_ops_[op_index]).write_request();
1045
0
}
1046
1047
}  // namespace pggate
1048
}  // namespace yb