YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_dml.cc
Line
Count
Source (jump to first uncovered line)
1
//--------------------------------------------------------------------------------------------------
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//--------------------------------------------------------------------------------------------------
15
16
#include "yb/yql/pggate/pg_dml.h"
17
18
#include "yb/client/yb_op.h"
19
20
#include "yb/common/pg_system_attr.h"
21
22
#include "yb/util/atomic.h"
23
#include "yb/util/status_format.h"
24
25
#include "yb/yql/pggate/pg_select_index.h"
26
#include "yb/yql/pggate/pggate_flags.h"
27
#include "yb/yql/pggate/util/pg_doc_data.h"
28
#include "yb/yql/pggate/ybc_pggate.h"
29
30
namespace yb {
31
namespace pggate {
32
33
using namespace std::literals;  // NOLINT
34
using std::list;
35
36
// TODO(neil) This should be derived from a GFLAGS.
37
// Session timeout of 60 seconds, held in a file-local, non-const MonoDelta.
// NOTE(review): no function visible in this file reads it - confirm whether it is still needed.
static MonoDelta kSessionTimeout = 60s;
38
39
//--------------------------------------------------------------------------------------------------
40
// PgDml
41
//--------------------------------------------------------------------------------------------------
42
43
// Builds a DML statement for the table identified by 'table_id'.
// The session reference is handed over to the PgStatement base; the table id is copied.
PgDml::PgDml(PgSession::ScopedRefPtr pg_session, const PgObjectId& table_id)
    : PgStatement(std::move(pg_session)),
      table_id_(table_id) {}
46
47
// Builds a DML statement that may carry prepare parameters and a secondary index id.
//
// - pg_session:     session reference; moved into the delegated two-argument constructor so that
//                   the by-value parameter is not copied a second time.
// - table_id:       id of the table this statement operates on.
// - index_id:       stored in index_id_ only when the prepare parameters request a secondary index.
// - prepare_params: optional; when null, prepare_params_ and index_id_ keep their defaults.
PgDml::PgDml(PgSession::ScopedRefPtr pg_session,
             const PgObjectId& table_id,
             const PgObjectId& index_id,
             const PgPrepareParameters *prepare_params)
    : PgDml(std::move(pg_session), table_id) {
  if (prepare_params == nullptr) {
    return;
  }

  prepare_params_ = *prepare_params;
  // Primary index does not have its own data table.
  if (prepare_params_.use_secondary_index) {
    index_id_ = index_id;
  }
}
61
62
9.25M
// Nothing to release explicitly; members clean up after themselves.
PgDml::~PgDml() = default;
64
65
//--------------------------------------------------------------------------------------------------
66
67
22.3M
// Routes a target expression either to this statement or to the secondary index query.
//
// The target is appended to this DML when target_ is set and either the statement is an
// index-only scan or the target is not ybbasetid. In every other case it is appended to
// secondary_index_query_. Returns the status of whichever append was performed.
Status PgDml::AppendTarget(PgExpr *target) {
  const bool goes_to_index_query =
      !target_ || (!prepare_params_.index_only_scan && target->is_ybbasetid());

  if (goes_to_index_query) {
    // Append base_ctid to the index_query.
    return secondary_index_query_->AppendTargetPB(target);
  }

  // Except for base_ctid, all targets should be appended to this DML.
  return AppendTargetPB(target);
}
78
79
22.3M
// Registers 'target' with this statement and prepares its protobuf.
//
// The target is recorded in targets_, a protobuf is allocated for it, the expression is prepared
// for read against that protobuf, and finally the pair is remembered in expr_binds_.
// Returns the error from PrepareForRead() if preparation fails.
Status PgDml::AppendTargetPB(PgExpr *target) {
  // Record the target first, then allocate the protobuf that belongs to it.
  targets_.push_back(target);
  PgsqlExpressionPB *const target_pb = AllocTargetPB();

  // Prepare expression. Except for constants and place_holders, all other expressions can be
  // evaluated just one time during prepare.
  RETURN_NOT_OK(target->PrepareForRead(this, target_pb));

  // Link the expression with its protobuf. Note that except for constants and place_holders, all
  // other expressions can be set up just one time during prepare.
  // Example:
  // - Bind values for a target of SELECT
  //   SELECT AVG(col + ?) FROM a_table;
  expr_binds_[target_pb] = target;
  return Status::OK();
}
98
99
22
// Registers a qual expression with this statement and prepares its protobuf.
// Returns the status of PrepareForRead() on the qual.
Status PgDml::AppendQual(PgExpr *qual) {
  // Record the qual, then allocate the protobuf that belongs to it.
  quals_.push_back(qual);
  PgsqlExpressionPB *const qual_pb = AllocQualPB();

  // Populate the protobuf with data from the qual expression.
  // Side effect of PrepareForRead is to call PrepareColumnForRead on "this" being passed in
  // for any column reference found in the expression. However, the serialized Postgres expressions,
  // the only kind of Postgres expressions supported as quals, can not be searched.
  // Their column references should be explicitly appended with AppendColumnRef().
  const Status prepare_status = qual->PrepareForRead(this, qual_pb);
  return prepare_status;
}
113
114
11.9k
// Marks the column referenced by 'colref' as read-requested on the target table.
//
// 'colref' is expected to be a column reference (checked with DCHECK). Virtual columns are left
// untouched. For other columns, Postgres type information is taken from 'colref' unless the
// column already has it. Returns an error if the attribute cannot be resolved in target_.
Status PgDml::AppendColumnRef(PgExpr *colref) {
  DCHECK(colref->is_colref()) << "Colref is expected";

  // Postgres attribute number, this is column id to refer the column from Postgres code.
  const int attr_num = static_cast<PgColumnRef *>(colref)->attr_num();

  // Retrieve column metadata from the target relation metadata.
  PgColumn& column = VERIFY_RESULT(target_.ColumnForAttr(attr_num));
  if (column.is_virtual_column()) {
    return Status::OK();
  }

  // Do not overwrite Postgres type information that is already present.
  if (!column.has_pg_type_info()) {
    // Postgres type information is required to get column value to evaluate serialized Postgres
    // expressions. For other purposes it is OK to use InvalidOids (zeroes). That would not make
    // the column to appear like it has Postgres type information.
    // Note, that for expression kinds other than serialized Postgres expressions column
    // references are set automatically: when the expressions are being appended they call either
    // PrepareColumnForRead or PrepareColumnForWrite for each column reference expression they
    // contain.
    column.set_pg_type_info(
        colref->get_pg_typid(), colref->get_pg_typmod(), colref->get_pg_collid());
  }

  // Flag column as used, so it is added to the request.
  column.set_read_requested(true);
  return Status::OK();
}
139
140
22.3M
// Resolves 'attr_num' in the target table and prepares the column for reading.
//
// When 'target_pb' is provided, the column id is written into it. Non-virtual columns are flagged
// as read-requested. Returns a const reference to the column, or the lookup error.
Result<const PgColumn&> PgDml::PrepareColumnForRead(int attr_num, PgsqlExpressionPB *target_pb) {
  // Find column from targeted table.
  PgColumn& column = VERIFY_RESULT(target_.ColumnForAttr(attr_num));

  // Prepare protobuf to send to DocDB.
  if (target_pb != nullptr) {
    target_pb->set_column_id(column.id());
  }

  // Mark non-virtual column reference for DocDB.
  const bool is_virtual = column.is_virtual_column();
  if (!is_virtual) {
    column.set_read_requested(true);
  }

  return const_cast<const PgColumn&>(column);
}
156
157
983k
// Writes the id of 'pg_col' into 'assign_pb' and flags non-virtual columns as write-requested.
// Always returns OK.
Status PgDml::PrepareColumnForWrite(PgColumn *pg_col, PgsqlExpressionPB *assign_pb) {
  // Prepare protobuf to send to DocDB.
  assign_pb->set_column_id(pg_col->id());

  // Mark non-virtual column reference for DocDB.
  const bool is_virtual = pg_col->is_virtual_column();
  if (!is_virtual) {
    pg_col->set_write_requested(true);
  }

  return Status::OK();
}
168
169
9.25M
// Rebuilds 'column_refs' with the ids of all target columns that are read- or write-requested.
// Any previous content of 'column_refs' is cleared first.
void PgDml::ColumnRefsToPB(PgsqlColumnRefsPB *column_refs) {
  column_refs->Clear();
  for (const PgColumn& column : target_.columns()) {
    // Skip columns that were never requested by the statement.
    if (!column.read_requested() && !column.write_requested()) {
      continue;
    }
    column_refs->add_ids(column.id());
  }
}
177
178
9.25M
void PgDml::ColRefsToPB() {
179
  // Remove previously set column references in case if the statement is being reexecuted
180
9.25M
  ClearColRefPBs();
181
64.2M
  for (const PgColumn& col : target_.columns()) {
182
    // Only used columns are added to the request
183
64.2M
    if (col.read_requested() || 
col.write_requested()43.3M
) {
184
      // Allocate a protobuf entry
185
21.9M
      PgsqlColRefPB *col_ref = AllocColRefPB();
186
      // Add DocDB identifier
187
21.9M
      col_ref->set_column_id(col.id());
188
      // Add Postgres identifier
189
21.9M
      col_ref->set_attno(col.attr_num());
190
      // Add Postgres type information, if defined
191
21.9M
      if (col.has_pg_type_info()) {
192
11.9k
        col_ref->set_typid(col.pg_typid());
193
11.9k
        col_ref->set_typmod(col.pg_typmod());
194
11.9k
        col_ref->set_collid(col.pg_collid());
195
11.9k
      }
196
21.9M
    }
197
64.2M
  }
198
9.25M
}
199
200
//--------------------------------------------------------------------------------------------------
201
202
32.1M
// Binds 'attr_value' to the column identified by 'attr_num'.
//
// When a secondary index query exists, the bind is forwarded to it. Otherwise the column is
// looked up in bind_, its internal type is checked against the value (skipped for
// kGinNullValue), and the value is linked to the column's bind protobuf in expr_binds_.
// Binding ybctid additionally sets ybctid_bind_ and requires a constant value (CHECK).
// Returns a Corruption status on a type mismatch, or the column lookup error.
Status PgDml::BindColumn(int attr_num, PgExpr *attr_value) {
  if (secondary_index_query_) {
    // Bind by secondary key.
    return secondary_index_query_->BindColumn(attr_num, attr_value);
  }

  // Find column to bind.
  PgColumn& column = VERIFY_RESULT(bind_.ColumnForAttr(attr_num));

  // Check datatype.
  if (attr_value->internal_type() != InternalType::kGinNullValue) {
    SCHECK_EQ(column.internal_type(), attr_value->internal_type(), Corruption,
              "Attribute value type does not match column type");
  }

  // Reuse the column's bind protobuf when it has one; otherwise allocate it.
  PgsqlExpressionPB *pb = column.bind_pb();
  if (pb == nullptr) {
    pb = AllocColumnBindPB(&column);
  } else if (expr_binds_.find(pb) != expr_binds_.end()) {
    LOG(WARNING) << strings::Substitute("Column $0 is already bound to another value.", attr_num);
  }

  // Link the given expression "attr_value" with the protobuf. Note that except for constants and
  // place_holders, all other expressions can be set up just one time during prepare.
  // Examples:
  // - Bind values for primary columns in where clause.
  //     WHERE hash = ?
  // - Bind values for a column in INSERT statement.
  //     INSERT INTO a_table(hash, key, col) VALUES(?, ?, ?)
  expr_binds_[pb] = attr_value;

  const bool binds_ybctid = attr_num == static_cast<int>(PgSystemAttrNum::kYBTupleId);
  if (binds_ybctid) {
    CHECK(attr_value->is_constant()) << "Column ybctid must be bound to constant";
    ybctid_bind_ = true;
  }
  return Status::OK();
}
241
242
9.25M
// Evaluates every expression registered in expr_binds_ into its linked protobuf.
// Stops at, and returns, the first evaluation error.
Status PgDml::UpdateBindPBs() {
  for (const auto &bind : expr_binds_) {
    // bind.first is the protobuf, bind.second the expression that fills it.
    RETURN_NOT_OK(bind.second->Eval(bind.first));
  }

  return Status::OK();
}
251
252
//--------------------------------------------------------------------------------------------------
253
254
90
// Sets the bind_table_ flag on this statement. Always returns OK.
Status PgDml::BindTable() {
  bind_table_ = true;

  return Status::OK();
}
258
259
//--------------------------------------------------------------------------------------------------
260
261
983k
// Assigns 'attr_value' to the target column identified by 'attr_num'.
//
// The column is looked up in target_ and its internal type must match the value's type
// (Corruption otherwise). A column whose assign protobuf is already linked in expr_assigns_ is
// rejected with InvalidArgument. On success the column is prepared for write, the value is
// prepared for read, and the pair is recorded in expr_assigns_.
Status PgDml::AssignColumn(int attr_num, PgExpr *attr_value) {
  // Find column from targeted table.
  PgColumn& column = VERIFY_RESULT(target_.ColumnForAttr(attr_num));

  // Check datatype.
  SCHECK_EQ(column.internal_type(), attr_value->internal_type(), Corruption,
            "Attribute value type does not match column type");

  // Reuse the column's assign protobuf when it has one; otherwise allocate it.
  PgsqlExpressionPB *pb = column.assign_pb();
  if (pb == nullptr) {
    pb = AllocColumnAssignPB(&column);
  } else if (expr_assigns_.find(pb) != expr_assigns_.end()) {
    return STATUS_SUBSTITUTE(InvalidArgument,
                             "Column $0 is already assigned to another value", attr_num);
  }

  // Link the expression and protobuf. During execution, expr will write result to the pb.
  // - Prepare the left hand side for write.
  // - Prepare the right hand side for read. Currently, the right hand side is always constant.
  RETURN_NOT_OK(PrepareColumnForWrite(&column, pb));
  RETURN_NOT_OK(attr_value->PrepareForRead(this, pb));

  // Link the given expression "attr_value" with the protobuf. Note that except for constants and
  // place_holders, all other expressions can be set up just one time during prepare.
  // Examples:
  // - Setup rhs values for SET column = assign_pb in UPDATE statement.
  //     UPDATE a_table SET col = assign_expr;
  expr_assigns_[pb] = attr_value;

  return Status::OK();
}
295
296
7.20M
// Evaluates every expression registered in expr_assigns_ into its linked protobuf.
// Stops at, and returns, the first evaluation error.
Status PgDml::UpdateAssignPBs() {
  // Process the column binds for two cases.
  // For performance reasons, we might evaluate these expressions together with bind values in YB.
  for (const auto &assignment : expr_assigns_) {
    // assignment.first is the protobuf, assignment.second the expression that fills it.
    RETURN_NOT_OK(assignment.second->Eval(assignment.first));
  }

  return Status::OK();
}
307
308
//--------------------------------------------------------------------------------------------------
309
310
2.84M
// Drives the secondary index query, if any, and feeds its next batch of ybctids to doc_op_.
//
// - exec_params: forwarded to the index query's Exec() the first time it is run.
//
// Returns false when there is no secondary index query, when the index query has no doc operator
// of its own, or when it has no more ybctid batches. Returns true after a new batch has been
// populated into doc_op_. Any error from the index query or from doc_op_ is propagated.
Result<bool> PgDml::ProcessSecondaryIndexRequest(const PgExecParameters *exec_params) {
  if (!secondary_index_query_) {
    // Secondary INDEX is not used in this request.
    return false;
  }

  // Execute query in PgGate.
  // If index query is not yet executed, run it.
  if (!secondary_index_query_->is_executed()) {
    secondary_index_query_->set_is_executed(true);
    RETURN_NOT_OK(secondary_index_query_->Exec(exec_params));
  }

  // Not processing index request if it does not require its own doc operator.
  //
  // When INDEX is used for system catalog (colocated table), the index subquery does not have its
  // own operator. The request is combined with 'this' outer SELECT using 'index_request' attribute.
  //   (PgDocOp)doc_op_->(YBPgsqlReadOp)read_op_->(PgsqlReadRequestPB)read_request_::index_request
  if (!secondary_index_query_->has_doc_op()) {
    return false;
  }

  // When INDEX has its own doc_op, execute it to fetch next batch of ybctids which is then used
  // to read data from the main table.
  // The out-pointer starts as nullptr so it never holds an indeterminate value before
  // FetchYbctidBatch() writes to it.
  const vector<Slice> *ybctids = nullptr;
  if (!VERIFY_RESULT(secondary_index_query_->FetchYbctidBatch(&ybctids))) {
    // No more rows of ybctids.
    return false;
  }

  // Update request with the new batch of ybctids to fetch the next batch of rows.
  RETURN_NOT_OK(doc_op_->PopulateDmlByYbctidOps(*ybctids));
  AtomicFlagSleepMs(&FLAGS_TEST_inject_delay_between_prepare_ybctid_execute_batch_ybctid_ms);
  return true;
}
345
346
// Fetches the next row into the caller-provided buffers.
//
// - natts:    number of entries of 'isnulls' to reset before reading.
// - values, isnulls, syscols: output buffers wrapped in the PgTuple that GetNextRow() fills.
//             'isnulls' and 'syscols' are reset only when non-null.
// - has_data: set to true up front and changed to false only when the server reports that no
//             more rows are available.
//
// Returns a non-OK status if reading a row or fetching more data from the server fails.
Status PgDml::Fetch(int32_t natts,
                    uint64_t *values,
                    bool *isnulls,
                    PgSysColumns *syscols,
                    bool *has_data) {
  // Each isnulls and values correspond (in order) to columns from the table schema.
  // Initialize to nulls for any columns not present in result. A non-positive natts is skipped:
  // converted to size_t, a negative count would make memset write far past the buffer.
  if (isnulls && natts > 0) {
    memset(isnulls, true, static_cast<size_t>(natts) * sizeof(bool));
  }
  if (syscols) {
    memset(syscols, 0, sizeof(PgSysColumns));
  }

  // Keep reading until we either reach the end or get some rows.
  *has_data = true;
  PgTuple pg_tuple(values, isnulls, syscols);
  while (!VERIFY_RESULT(GetNextRow(&pg_tuple))) {
    if (!VERIFY_RESULT(FetchDataFromServer())) {
      // Stop processing as server returns no more rows.
      *has_data = false;
      return Status::OK();
    }
  }

  return Status::OK();
}
373
374
2.49M
// Pulls the next rowsets from doc_op_ into rowsets_.
//
// When doc_op_ yields no rowsets, the secondary index request is processed to obtain a new set of
// conditions; if that yields nothing, false is returned. Otherwise doc_op_ is executed again and
// its result is read. Returns true when fetching went through, or the first error encountered.
Result<bool> PgDml::FetchDataFromServer() {
  // Get the rowsets from doc-operator.
  RETURN_NOT_OK(doc_op_->GetResult(&rowsets_));

  // Check if EOF is reached.
  const bool reached_eof = rowsets_.empty();
  if (reached_eof) {
    // Process the secondary index to find the next WHERE condition.
    //   DML(Table) WHERE ybctid IN (SELECT base_ybctid FROM IndexTable),
    //   The nested query would return many rows each of which yields different result-set.
    if (!VERIFY_RESULT(ProcessSecondaryIndexRequest(nullptr))) {
      // Return EOF as the nested subquery does not have any more data.
      return false;
    }

    // Execute doc_op_ again for the new set of WHERE condition from the nested query.
    SCHECK_EQ(VERIFY_RESULT(doc_op_->Execute()), RequestSent::kTrue, IllegalState,
              "YSQL read operation was not sent");

    // Get the rowsets from doc-operator.
    RETURN_NOT_OK(doc_op_->GetResult(&rowsets_));
  }

  // Return the output parameter back to Postgres if server wants.
  if (doc_op_->has_out_param_backfill_spec() && pg_exec_params_) {
    PgExecOutParamValue out_value;
    out_value.bfoutput = doc_op_->out_param_backfill_spec();
    YBCGetPgCallbacks()->WriteExecOutParam(pg_exec_params_->out_param, &out_value);
  }

  return true;
}
405
406
54.8M
// Writes the next row, in row order, from the buffered rowsets into 'pg_tuple'.
//
// Exhausted rowsets are erased from rowsets_ along the way. Returns true when a row was written
// and current_row_order_ was advanced, false when no buffered rowset can supply the wanted row,
// and an error if writing the tuple fails or the row order check is violated.
Result<bool> PgDml::GetNextRow(PgTuple *pg_tuple) {
  while (true) {
    auto it = rowsets_.begin();
    while (it != rowsets_.end()) {
      // Drop rowsets that have no data left.
      auto& rowset = *it;
      if (rowset.is_eof()) {
        it = rowsets_.erase(it);
        continue;
      }

      // If this rowset has the next row of the index order, load it. Otherwise, continue looking
      // for the next row in the order.
      //
      // NOTE:
      //   DML <Table> WHERE ybctid IN (SELECT base_ybctid FROM <Index> ORDER BY <Index Range>)
      // The nested subquery should return rows in indexing order, but the ybctids are then grouped
      // by hash-code for BATCH-DML-REQUEST, so the response here are out-of-order.
      if (rowset.NextRowOrder() > current_row_order_) {
        ++it;
        continue;
      }

      // Write row to postgres tuple.
      int64_t row_order = -1;
      RETURN_NOT_OK(rowset.WritePgTuple(targets_, pg_tuple, &row_order));
      SCHECK(row_order == -1 || row_order == current_row_order_, InternalError,
             "The resulting row are not arranged in indexing order");

      // Found the current row. Move cursor to next row.
      current_row_order_++;
      return true;
    }

    // Give up unless rowsets remain and the doc op has no more rows to return.
    if (rowsets_.empty() || !doc_op_->end_of_data()) {
      return false;
    }

    // If the current desired row is missing, skip it and continue to look for the next
    // desired row in order. A row is deemed missing if it is not found and the doc op
    // has no more rows to return.
    current_row_order_++;
  }
}
450
451
2.50M
bool PgDml::has_aggregate_targets() {
452
2.50M
  size_t num_aggregate_targets = 0;
453
28.5M
  for (const auto& target : targets_) {
454
28.5M
    if (target->is_aggregate()) {
455
6.05k
      num_aggregate_targets++;
456
6.05k
    }
457
28.5M
  }
458
459
2.50M
  CHECK(num_aggregate_targets == 0 || num_aggregate_targets == targets_.size())
460
508
    << "Some, but not all, targets are aggregate expressions.";
461
462
2.50M
  return num_aggregate_targets > 0;
463
2.50M
}
464
465
6.20M
// Returns column information for 'attr_num'.
// The lookup is delegated to the secondary index query when one exists, otherwise to bind_.
Result<YBCPgColumnInfo> PgDml::GetColumnInfo(int attr_num) const {
  if (!secondary_index_query_) {
    return bind_->GetColumnInfo(attr_num);
  }
  return secondary_index_query_->GetColumnInfo(attr_num);
}
471
472
}  // namespace pggate
473
}  // namespace yb