YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/postgres/src/backend/executor/ybc_fdw.c
Line
Count
Source (jump to first uncovered line)
1
/*--------------------------------------------------------------------------------------------------
2
 *
3
 * ybc_fdw.c
4
 *      Foreign-data wrapper for YugabyteDB.
5
 *
6
 * Copyright (c) YugaByte, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
9
 * in compliance with the License.  You may obtain a copy of the License at
10
 *
11
 * http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 * Unless required by applicable law or agreed to in writing, software distributed under the License
14
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
15
 * or implied.  See the License for the specific language governing permissions and limitations
16
 * under the License.
17
 *
18
 * IDENTIFICATION
19
 *      src/backend/executor/ybc_fdw.c
20
 *
21
 *--------------------------------------------------------------------------------------------------
22
 */
23
24
#include "postgres.h"
25
26
#include <sys/stat.h>
27
#include <unistd.h>
28
#include <string.h>
29
30
/*  TODO see which includes of this block are still needed. */
31
#include "access/htup_details.h"
32
#include "access/reloptions.h"
33
#include "access/sysattr.h"
34
#include "access/xact.h"
35
#include "catalog/catalog.h"
36
#include "catalog/pg_foreign_table.h"
37
#include "commands/copy.h"
38
#include "commands/defrem.h"
39
#include "commands/explain.h"
40
#include "commands/vacuum.h"
41
#include "foreign/fdwapi.h"
42
#include "foreign/foreign.h"
43
#include "miscadmin.h"
44
#include "nodes/makefuncs.h"
45
#include "optimizer/cost.h"
46
#include "optimizer/pathnode.h"
47
#include "optimizer/paths.h"
48
#include "optimizer/planmain.h"
49
#include "optimizer/restrictinfo.h"
50
#include "optimizer/var.h"
51
#include "utils/memutils.h"
52
#include "utils/rel.h"
53
#include "utils/sampling.h"
54
55
/*  YB includes. */
56
#include "commands/dbcommands.h"
57
#include "catalog/pg_operator.h"
58
#include "catalog/yb_type.h"
59
#include "utils/lsyscache.h"
60
#include "utils/syscache.h"
61
62
#include "yb/yql/pggate/ybc_pggate.h"
63
#include "pg_yb_utils.h"
64
#include "access/yb_scan.h"
65
#include "executor/ybcExpr.h"
66
#include "executor/ybc_fdw.h"
67
68
#include "utils/resowner_private.h"
69
70
/* -------------------------------------------------------------------------- */
71
/*  Planner/Optimizer functions */
72
73
typedef struct YbFdwPlanState
74
{
75
  /* Bitmap of attribute (column) numbers that we need to fetch from YB. */
76
  Bitmapset *target_attrs;
77
78
} YbFdwPlanState;
79
80
/*
81
 * ybcGetForeignRelSize
82
 *    Obtain relation size estimates for a foreign table
83
 */
84
static void
85
ybcGetForeignRelSize(PlannerInfo *root,
86
           RelOptInfo *baserel,
87
           Oid foreigntableid)
88
61.6k
{
89
61.6k
  YbFdwPlanState    *ybc_plan = NULL;
90
91
61.6k
  ybc_plan = (YbFdwPlanState *) palloc0(sizeof(YbFdwPlanState));
92
93
  /* Set the estimate for the total number of rows (tuples) in this table. */
94
61.6k
  if (baserel->tuples == 0)
95
58.5k
    baserel->tuples = YBC_DEFAULT_NUM_ROWS;
96
97
  /*
98
   * Initialize the estimate for the number of rows returned by this query.
99
   * This does not yet take into account the restriction clauses, but it will
100
   * be updated later by ybcIndexCostEstimate once it inspects the clauses.
101
   */
102
61.6k
  baserel->rows = baserel->tuples;
103
104
61.6k
  baserel->fdw_private = ybc_plan;
105
106
  /*
107
   * Test any indexes of rel for applicability also.
108
   */
109
61.6k
  check_index_predicates(root, baserel);
110
61.6k
}
111
112
/*
113
 * ybcGetForeignPaths
114
 *    Create possible access paths for a scan on the foreign table, which is
115
 *      the full table scan plus available index paths (including the  primary key
116
 *      scan path if any).
117
 */
118
static void
119
ybcGetForeignPaths(PlannerInfo *root,
120
           RelOptInfo *baserel,
121
           Oid foreigntableid)
122
61.6k
{
123
61.6k
  Cost startup_cost;
124
61.6k
  Cost total_cost;
125
126
  /* Estimate costs */
127
61.6k
  ybcCostEstimate(baserel, YBC_FULL_SCAN_SELECTIVITY,
128
61.6k
          false /* is_backwards scan */,
129
61.6k
          true /* is_seq_scan */,
130
61.6k
          false /* is_uncovered_idx_scan */,
131
61.6k
          &startup_cost,
132
61.6k
          &total_cost,
133
61.6k
          baserel->reltablespace /* index_tablespace_oid */);
134
135
  /* Create a ForeignPath node and it as the scan path */
136
61.6k
  add_path(baserel,
137
61.6k
       (Path *) create_foreignscan_path(root,
138
61.6k
                        baserel,
139
61.6k
                        NULL, /* default pathtarget */
140
61.6k
                        baserel->rows,
141
61.6k
                        startup_cost,
142
61.6k
                        total_cost,
143
61.6k
                        NIL,  /* no pathkeys */
144
61.6k
                        NULL, /* no outer rel either */
145
61.6k
                        NULL, /* no extra plan */
146
61.6k
                        NULL  /* no options yet */ ));
147
148
  /* Add primary key and secondary index paths also */
149
61.6k
  create_index_paths(root, baserel);
150
61.6k
}
151
152
/*
153
 * ybcGetForeignPlan
154
 *    Create a ForeignScan plan node for scanning the foreign table
155
 */
156
static ForeignScan *
157
ybcGetForeignPlan(PlannerInfo *root,
158
          RelOptInfo *baserel,
159
          Oid foreigntableid,
160
          ForeignPath *best_path,
161
          List *tlist,
162
          List *scan_clauses,
163
          Plan *outer_plan)
164
19.1k
{
165
19.1k
  YbFdwPlanState *yb_plan_state = (YbFdwPlanState *) baserel->fdw_private;
166
19.1k
  Index     scan_relid = baserel->relid;
167
19.1k
  List       *local_clauses = NIL;
168
19.1k
  List       *remote_clauses = NIL;
169
19.1k
  List       *remote_params = NIL;
170
19.1k
  ListCell     *lc;
171
172
19.1k
  scan_clauses = extract_actual_clauses(scan_clauses, false);
173
174
  /*
175
   * Split the expressions in the scan_clauses onto two lists:
176
   * - remote_clauses gets supported expressions to push down to DocDB, and
177
   * - local_clauses gets remaining to evaluate upon returned rows.
178
   * The remote_params list contains data type details of the columns
179
   * referenced by the expressions in the remote_clauses list. DocDB needs it
180
   * to convert row values to Datum/isnull pairs consumable by Postgres
181
   * functions.
182
   * The remote_clauses and remote_params lists are sent with the protobuf
183
   * read request.
184
   */
185
19.1k
  foreach(lc, scan_clauses)
186
20.5k
  {
187
20.5k
    List *params = NIL;
188
20.5k
    Expr *expr = (Expr *) lfirst(lc);
189
20.5k
    if (YbCanPushdownExpr(expr, &params))
190
22
    {
191
22
      remote_clauses = lappend(remote_clauses, expr);
192
22
      remote_params = list_concat(remote_params, params);
193
22
    }
194
20.5k
    else
195
20.5k
    {
196
20.5k
      local_clauses = lappend(local_clauses, expr);
197
20.5k
      list_free_deep(params);
198
20.5k
    }
199
20.5k
  }
200
201
  /* Get the target columns that need to be retrieved from DocDB */
202
19.1k
  foreach(lc, baserel->reltarget->exprs)
203
44.0k
  {
204
44.0k
    Expr *expr = (Expr *) lfirst(lc);
205
44.0k
    pull_varattnos_min_attr((Node *) expr,
206
44.0k
                baserel->relid,
207
44.0k
                &yb_plan_state->target_attrs,
208
44.0k
                baserel->min_attr);
209
44.0k
  }
210
211
  /* Get the target columns that are needed to evaluate local clauses */
212
19.1k
  foreach(lc, local_clauses)
213
20.5k
  {
214
20.5k
    Expr *expr = (Expr *) lfirst(lc);
215
20.5k
    pull_varattnos_min_attr((Node *) expr,
216
20.5k
                baserel->relid,
217
20.5k
                &yb_plan_state->target_attrs,
218
20.5k
                baserel->min_attr);
219
20.5k
  }
220
221
  /* Set scan targets. */
222
19.1k
  List *target_attrs = NULL;
223
19.1k
  bool wholerow = false;
224
257k
  for (AttrNumber attnum = baserel->min_attr; attnum <= baserel->max_attr; attnum++)
225
238k
  {
226
238k
    int bms_idx = attnum - baserel->min_attr + 1;
227
238k
    if (wholerow || bms_is_member(bms_idx, yb_plan_state->target_attrs))
228
45.2k
    {
229
45.2k
      switch (attnum)
230
45.2k
      {
231
202
        case InvalidAttrNumber:
232
          /*
233
           * Postgres repurposes InvalidAttrNumber to represent the "wholerow"
234
           * junk attribute.
235
           */
236
202
          wholerow = true;
237
202
          break;
238
7
        case SelfItemPointerAttributeNumber:
239
7
        case MinTransactionIdAttributeNumber:
240
7
        case MinCommandIdAttributeNumber:
241
7
        case MaxTransactionIdAttributeNumber:
242
7
        case MaxCommandIdAttributeNumber:
243
7
          ereport(ERROR,
244
7
                  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg(
245
7
                      "System column with id %d is not supported yet",
246
7
                      attnum)));
247
7
          break;
248
20
        case TableOidAttributeNumber:
249
          /* Nothing to do in YugaByte: Postgres will handle this. */
250
20
          break;
251
109
        case ObjectIdAttributeNumber:
252
8.95k
        case YBTupleIdAttributeNumber:
253
44.9k
        default: /* Regular column: attrNum > 0*/
254
44.9k
        {
255
44.9k
          TargetEntry *target = makeNode(TargetEntry);
256
44.9k
          target->resno = attnum;
257
44.9k
          target_attrs = lappend(target_attrs, target);
258
44.9k
        }
259
45.2k
      }
260
45.2k
    }
261
238k
  }
262
263
  /* Create the ForeignScan node */
264
19.1k
  return make_foreignscan(tlist,           /* local target list */
265
19.1k
              local_clauses,   /* local qual */
266
19.1k
              scan_relid,
267
19.1k
              target_attrs,    /* referenced attributes */
268
19.1k
              remote_params,   /* fdw_private data (attribute types) */
269
19.1k
              NIL,             /* remote target list (none for now) */
270
19.1k
              remote_clauses,  /* remote qual */
271
19.1k
              outer_plan);
272
19.1k
}
273
274
/* ------------------------------------------------------------------------- */
275
/*  Scanning functions */
276
277
/*
278
 * FDW-specific information for ForeignScanState.fdw_state.
279
 */
280
typedef struct YbFdwExecState
281
{
282
  /* The handle for the internal YB Select statement. */
283
  YBCPgStatement  handle;
284
  YBCPgExecParameters *exec_params; /* execution control parameters for YugaByte */
285
  bool is_exec_done; /* Each statement should be executed exactly one time */
286
} YbFdwExecState;
287
288
/*
289
 * ybcBeginForeignScan
290
 *    Initiate access to the Yugabyte by allocating a Select handle.
291
 */
292
static void
293
ybcBeginForeignScan(ForeignScanState *node, int eflags)
294
19.8k
{
295
19.8k
  EState      *estate      = node->ss.ps.state;
296
19.8k
  Relation    relation     = node->ss.ss_currentRelation;
297
298
19.8k
  YbFdwExecState *ybc_state = NULL;
299
300
  /* Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL. */
301
19.8k
  if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
302
573
    return;
303
304
  /* Allocate and initialize YB scan state. */
305
19.2k
  ybc_state = (YbFdwExecState *) palloc0(sizeof(YbFdwExecState));
306
307
19.2k
  node->fdw_state = (void *) ybc_state;
308
19.2k
  HandleYBStatus(YBCPgNewSelect(YBCGetDatabaseOid(relation),
309
19.2k
           YbGetStorageRelid(relation),
310
19.2k
           NULL /* prepare_params */,
311
19.2k
           &ybc_state->handle));
312
19.2k
  ybc_state->exec_params = &estate->yb_exec_params;
313
314
19.2k
  ybc_state->exec_params->rowmark = -1;
315
19.2k
  if (YBReadFromFollowersEnabled()) {
316
0
    ereport(DEBUG2, (errmsg("Doing read from followers")));
317
0
  }
318
19.2k
  if (XactIsoLevel == XACT_SERIALIZABLE)
319
316
  {
320
    /*
321
     * In case of SERIALIZABLE isolation level we have to take predicate locks to disallow
322
     * INSERTion of new rows that satisfy the query predicate. So, we set the rowmark on all
323
     * read requests sent to tserver instead of locking each tuple one by one in LockRows node.
324
     */
325
316
    ListCell   *l;
326
316
    foreach(l, estate->es_rowMarks) {
327
32
      ExecRowMark *erm = (ExecRowMark *) lfirst(l);
328
      // Do not propagate non-row-locking row marks.
329
32
      if (erm->markType != ROW_MARK_REFERENCE && erm->markType != ROW_MARK_COPY)
330
32
      {
331
32
        ybc_state->exec_params->rowmark = erm->markType;
332
        /*
333
         * TODO(Piyush): We don't honour SKIP LOCKED yet in serializable isolation level.
334
         */
335
32
        ybc_state->exec_params->wait_policy = LockWaitError;
336
32
      }
337
32
      break;
338
32
    }
339
316
  }
340
341
19.2k
  ybc_state->is_exec_done = false;
342
343
  /* Set the current syscatalog version (will check that we are up to date) */
344
19.2k
  HandleYBStatus(YBCPgSetCatalogCacheVersion(ybc_state->handle, yb_catalog_cache_version));
345
19.2k
}
346
347
/*
348
 * ybSetupScanTargets
349
 *    Add the target expressions to the DocDB statement.
350
 *    Currently target are either all column references or all aggregates.
351
 *    We do not push down target expressions yet.
352
 */
353
static void
354
ybcSetupScanTargets(ForeignScanState *node)
355
18.5k
{
356
18.5k
  EState *estate = node->ss.ps.state;
357
18.5k
  ForeignScan *foreignScan = (ForeignScan *) node->ss.ps.plan;
358
18.5k
  Relation relation = node->ss.ss_currentRelation;
359
18.5k
  YbFdwExecState *ybc_state = (YbFdwExecState *) node->fdw_state;
360
18.5k
  TupleDesc tupdesc = RelationGetDescr(relation);
361
18.5k
  ListCell *lc;
362
363
  /* Planning function above should ensure target list is set */
364
18.5k
  List *target_attrs = foreignScan->fdw_exprs;
365
366
18.5k
  MemoryContext oldcontext =
367
18.5k
    MemoryContextSwitchTo(node->ss.ps.ps_ExprContext->ecxt_per_query_memory);
368
369
  /* Set scan targets. */
370
18.5k
  if (node->yb_fdw_aggs == NIL)
371
18.3k
  {
372
    /* Set non-aggregate column targets. */
373
18.3k
    bool has_targets = false;
374
18.3k
    foreach(lc, target_attrs)
375
43.4k
    {
376
43.4k
      TargetEntry *target = (TargetEntry *) lfirst(lc);
377
378
      /* For regular (non-system) attribute check if they were deleted */
379
43.4k
      Oid   attr_typid  = InvalidOid;
380
43.4k
      Oid   attr_collation = InvalidOid;
381
43.4k
      int32 attr_typmod = 0;
382
43.4k
      if (target->resno > 0)
383
34.5k
      {
384
34.5k
        Form_pg_attribute attr;
385
34.5k
        attr = TupleDescAttr(tupdesc, target->resno - 1);
386
        /* Ignore dropped attributes */
387
34.5k
        if (attr->attisdropped)
388
3
        {
389
3
          continue;
390
3
        }
391
34.5k
        attr_typid  = attr->atttypid;
392
34.5k
        attr_typmod = attr->atttypmod;
393
34.5k
        attr_collation = attr->attcollation;
394
34.5k
      }
395
396
43.4k
      YBCPgTypeAttrs type_attrs = {attr_typmod};
397
43.4k
      YBCPgExpr      expr       = YBCNewColumnRef(ybc_state->handle,
398
43.4k
                            target->resno,
399
43.4k
                            attr_typid,
400
43.4k
                            attr_collation,
401
43.4k
                            &type_attrs);
402
43.4k
      HandleYBStatus(YBCPgDmlAppendTarget(ybc_state->handle, expr));
403
43.4k
      has_targets = true;
404
43.4k
    }
405
406
    /*
407
     * We can have no target columns at this point for e.g. a count(*). For now
408
     * we request the first non-dropped column in that case.
409
     * TODO look into handling this on YugaByte side.
410
     */
411
18.3k
    if (!has_targets)
412
14
    {
413
14
      for (int16_t i = 0; i < tupdesc->natts; i++)
414
14
      {
415
        /* Ignore dropped attributes */
416
14
        if (TupleDescAttr(tupdesc, i)->attisdropped)
417
0
        {
418
0
          continue;
419
0
        }
420
421
14
        YBCPgTypeAttrs type_attrs = { TupleDescAttr(tupdesc, i)->atttypmod };
422
14
        YBCPgExpr      expr       = YBCNewColumnRef(ybc_state->handle,
423
14
                              i + 1,
424
14
                              TupleDescAttr(tupdesc, i)->atttypid,
425
14
                              TupleDescAttr(tupdesc, i)->attcollation,
426
14
                              &type_attrs);
427
14
        HandleYBStatus(YBCPgDmlAppendTarget(ybc_state->handle, expr));
428
14
        break;
429
14
      }
430
14
    }
431
18.3k
  }
432
185
  else
433
185
  {
434
    /* Set aggregate scan targets. */
435
185
    foreach(lc, node->yb_fdw_aggs)
436
221
    {
437
221
      Aggref *aggref = lfirst_node(Aggref, lc);
438
221
      char *func_name = get_func_name(aggref->aggfnoid);
439
221
      ListCell *lc_arg;
440
221
      YBCPgExpr op_handle;
441
221
      const YBCPgTypeEntity *type_entity;
442
443
      /* Get type entity for the operator from the aggref. */
444
221
      type_entity = YbDataTypeFromOidMod(InvalidAttrNumber, aggref->aggtranstype);
445
446
      /* Create operator. */
447
221
      HandleYBStatus(YBCPgNewOperator(ybc_state->handle, func_name, type_entity, aggref->aggcollid, &op_handle));
448
449
      /* Handle arguments. */
450
221
      if (aggref->aggstar) {
451
        /*
452
         * Add dummy argument for COUNT(*) case, turning it into COUNT(0).
453
         * We don't use a column reference as we want to count rows
454
         * even if all column values are NULL.
455
         */
456
126
        YBCPgExpr const_handle;
457
126
        HandleYBStatus(YBCPgNewConstant(ybc_state->handle,
458
126
                 type_entity,
459
126
                 false /* collate_is_valid_non_c */,
460
126
                 NULL /* collation_sortkey */,
461
126
                 0 /* datum */,
462
126
                 false /* is_null */,
463
126
                 &const_handle));
464
126
        HandleYBStatus(YBCPgOperatorAppendArg(op_handle, const_handle));
465
95
      } else {
466
        /* Add aggregate arguments to operator. */
467
95
        foreach(lc_arg, aggref->args)
468
95
        {
469
95
          TargetEntry *tle = lfirst_node(TargetEntry, lc_arg);
470
95
          if (IsA(tle->expr, Const))
471
13
          {
472
13
            Const* const_node = castNode(Const, tle->expr);
473
            /* Already checked by yb_agg_pushdown_supported */
474
13
            Assert(const_node->constisnull || const_node->constbyval);
475
476
13
            YBCPgExpr const_handle;
477
13
            HandleYBStatus(YBCPgNewConstant(ybc_state->handle,
478
13
                     type_entity,
479
13
                     false /* collate_is_valid_non_c */,
480
13
                     NULL /* collation_sortkey */,
481
13
                     const_node->constvalue,
482
13
                     const_node->constisnull,
483
13
                     &const_handle));
484
13
            HandleYBStatus(YBCPgOperatorAppendArg(op_handle, const_handle));
485
13
          }
486
82
          else if (IsA(tle->expr, Var))
487
82
          {
488
            /*
489
             * Use original attribute number (varoattno) instead of projected one (varattno)
490
             * as projection is disabled for tuples produced by pushed down operators.
491
             */
492
82
            int attno = castNode(Var, tle->expr)->varoattno;
493
82
            Form_pg_attribute attr = TupleDescAttr(tupdesc, attno - 1);
494
82
            YBCPgTypeAttrs type_attrs = {attr->atttypmod};
495
496
82
            YBCPgExpr arg = YBCNewColumnRef(ybc_state->handle,
497
82
                            attno,
498
82
                            attr->atttypid,
499
82
                            attr->attcollation,
500
82
                            &type_attrs);
501
82
            HandleYBStatus(YBCPgOperatorAppendArg(op_handle, arg));
502
82
          }
503
0
          else
504
0
          {
505
            /* Should never happen. */
506
0
            ereport(ERROR,
507
0
                (errcode(ERRCODE_INTERNAL_ERROR),
508
0
                 errmsg("unsupported aggregate function argument type")));
509
0
          }
510
95
        }
511
95
      }
512
513
      /* Add aggregate operator as scan target. */
514
221
      HandleYBStatus(YBCPgDmlAppendTarget(ybc_state->handle, op_handle));
515
221
    }
516
517
    /*
518
     * Setup the scan slot based on new tuple descriptor for the given targets. This is a dummy
519
     * tupledesc that only includes the number of attributes. Switch to per-query memory from
520
     * per-tuple memory so the slot persists across iterations.
521
     */
522
185
    TupleDesc target_tupdesc = CreateTemplateTupleDesc(list_length(node->yb_fdw_aggs),
523
185
                               false /* hasoid */);
524
185
    ExecInitScanTupleSlot(estate, &node->ss, target_tupdesc);
525
185
  }
526
18.5k
  MemoryContextSwitchTo(oldcontext);
527
18.5k
}
528
529
/*
530
 * ybSetupScanQual
531
 *    Add the pushable qual expressions to the DocDB statement.
532
 */
533
static void
534
ybSetupScanQual(ForeignScanState *node)
535
18.5k
{
536
18.5k
  EState     *estate = node->ss.ps.state;
537
18.5k
  ForeignScan *foreignScan = (ForeignScan *) node->ss.ps.plan;
538
18.5k
  YbFdwExecState *yb_state = (YbFdwExecState *) node->fdw_state;
539
18.5k
  List     *qual = foreignScan->fdw_recheck_quals;
540
18.5k
  ListCell   *lc;
541
542
18.5k
  MemoryContext oldcontext =
543
18.5k
    MemoryContextSwitchTo(node->ss.ps.ps_ExprContext->ecxt_per_query_memory);
544
545
18.5k
  foreach(lc, qual)
546
22
  {
547
22
    Expr *expr = (Expr *) lfirst(lc);
548
    /*
549
     * Some expressions may be parametrized, obviously remote end can not
550
     * acccess the estate to get parameter values, so param references
551
     * are replaced with constant expressions.
552
     */
553
22
    expr = YbExprInstantiateParams(expr, estate->es_param_list_info);
554
    /* Create new PgExpr wrapper for the expression */
555
22
    YBCPgExpr yb_expr = YBCNewEvalExprCall(yb_state->handle, expr);
556
    /* Add the PgExpr to the statement */
557
22
    HandleYBStatus(YbPgDmlAppendQual(yb_state->handle, yb_expr));
558
22
  }
559
560
18.5k
  MemoryContextSwitchTo(oldcontext);
561
18.5k
}
562
563
/*
564
 * ybSetupScanColumnRefs
565
 *    Add the column references to the DocDB statement.
566
 */
567
static void
568
ybSetupScanColumnRefs(ForeignScanState *node)
569
18.5k
{
570
18.5k
  ForeignScan *foreignScan = (ForeignScan *) node->ss.ps.plan;
571
18.5k
  YbFdwExecState *yb_state = (YbFdwExecState *) node->fdw_state;
572
18.5k
  List     *params = foreignScan->fdw_private;
573
18.5k
  ListCell   *lc;
574
575
18.5k
  MemoryContext oldcontext =
576
18.5k
    MemoryContextSwitchTo(node->ss.ps.ps_ExprContext->ecxt_per_query_memory);
577
578
18.5k
  foreach(lc, params)
579
22
  {
580
22
    YbExprParamDesc *param = (YbExprParamDesc *) lfirst(lc);
581
22
    YBCPgTypeAttrs type_attrs = { param->typmod };
582
    /* Create new PgExpr wrapper for the column reference */
583
22
    YBCPgExpr yb_expr = YBCNewColumnRef(yb_state->handle,
584
22
                      param->attno,
585
22
                      param->typid,
586
22
                      param->collid,
587
22
                      &type_attrs);
588
    /* Add the PgExpr to the statement */
589
22
    HandleYBStatus(YbPgDmlAppendColumnRef(yb_state->handle, yb_expr));
590
22
  }
591
592
18.5k
  MemoryContextSwitchTo(oldcontext);
593
18.5k
}
594
595
/*
596
 * ybcIterateForeignScan
597
 *    Read next record from the data file and store it into the
598
 *    ScanTupleSlot as a virtual tuple
599
 */
600
static TupleTableSlot *
601
ybcIterateForeignScan(ForeignScanState *node)
602
5.45M
{
603
5.45M
  TupleTableSlot *slot;
604
5.45M
  YbFdwExecState *ybc_state = (YbFdwExecState *) node->fdw_state;
605
5.45M
  bool           has_data   = false;
606
607
  /* Execute the select statement one time.
608
   * TODO(neil) Check whether YugaByte PgGate should combine Exec() and Fetch() into one function.
609
   * - The first fetch from YugaByte PgGate requires a number of operations including allocating
610
   *   operators and protobufs. These operations are done by YBCPgExecSelect() function.
611
   * - The subsequent fetches don't need to setup the query with these operations again.
612
   */
613
5.45M
  if (!ybc_state->is_exec_done) {
614
18.5k
    ybcSetupScanTargets(node);
615
18.5k
    ybSetupScanQual(node);
616
18.5k
    ybSetupScanColumnRefs(node);
617
18.5k
    HandleYBStatus(YBCPgExecSelect(ybc_state->handle, ybc_state->exec_params));
618
18.5k
    ybc_state->is_exec_done = true;
619
18.5k
  }
620
621
  /* Clear tuple slot before starting */
622
5.45M
  slot = node->ss.ss_ScanTupleSlot;
623
5.45M
  ExecClearTuple(slot);
624
625
5.45M
  TupleDesc       tupdesc = slot->tts_tupleDescriptor;
626
5.45M
  Datum           *values = slot->tts_values;
627
5.45M
  bool            *isnull = slot->tts_isnull;
628
5.45M
  YBCPgSysColumns syscols;
629
630
  /* Fetch one row. */
631
5.45M
  HandleYBStatus(YBCPgDmlFetch(ybc_state->handle,
632
5.45M
                               tupdesc->natts,
633
5.45M
                               (uint64_t *) values,
634
5.45M
                               isnull,
635
5.45M
                               &syscols,
636
5.45M
                               &has_data));
637
638
  /* If we have result(s) update the tuple slot. */
639
5.45M
  if (has_data)
640
5.44M
  {
641
5.44M
    if (node->yb_fdw_aggs == NIL)
642
5.44M
    {
643
5.44M
      HeapTuple tuple = heap_form_tuple(tupdesc, values, isnull);
644
5.44M
      if (syscols.oid != InvalidOid)
645
20.8k
      {
646
20.8k
        HeapTupleSetOid(tuple, syscols.oid);
647
20.8k
      }
648
649
5.44M
      slot = ExecStoreHeapTuple(tuple, slot, false);
650
651
      /* Setup special columns in the slot */
652
5.44M
      slot->tts_ybctid = PointerGetDatum(syscols.ybctid);
653
5.44M
    }
654
345
    else
655
345
    {
656
      /*
657
       * Aggregate results stored in virtual slot (no tuple). Set the
658
       * number of valid values and mark as non-empty.
659
       */
660
345
      slot->tts_nvalid = tupdesc->natts;
661
345
      slot->tts_isempty = false;
662
345
    }
663
5.44M
  }
664
665
5.45M
  return slot;
666
5.45M
}
667
668
static void
669
ybcFreeStatementObject(YbFdwExecState* yb_fdw_exec_state)
670
9.11k
{
671
  /* If yb_fdw_exec_state is NULL, we are in EXPLAIN; nothing to do */
672
9.11k
  if (yb_fdw_exec_state != NULL && yb_fdw_exec_state->handle != NULL)
673
8.54k
  {
674
8.54k
    YBCPgDeleteStatement(yb_fdw_exec_state->handle);
675
8.54k
    yb_fdw_exec_state->handle = NULL;
676
8.54k
    yb_fdw_exec_state->exec_params = NULL;
677
8.54k
    yb_fdw_exec_state->is_exec_done = false;
678
8.54k
  }
679
9.11k
}
680
681
/*
682
 * fileReScanForeignScan
683
 *    Rescan table, possibly with new parameters
684
 */
685
static void
686
ybcReScanForeignScan(ForeignScanState *node)
687
800
{
688
800
  YbFdwExecState *ybc_state = (YbFdwExecState *) node->fdw_state;
689
690
  /* Clear (delete) the previous select */
691
800
  ybcFreeStatementObject(ybc_state);
692
693
  /* Re-allocate and execute the select. */
694
800
  ybcBeginForeignScan(node, 0 /* eflags */);
695
800
}
696
697
/*
698
 * ybcEndForeignScan
699
 *    Finish scanning foreign table and dispose objects used for this scan
700
 */
701
static void
702
ybcEndForeignScan(ForeignScanState *node)
703
8.31k
{
704
8.31k
  YbFdwExecState *ybc_state = (YbFdwExecState *) node->fdw_state;
705
8.31k
  ybcFreeStatementObject(ybc_state);
706
8.31k
}
707
708
/* ------------------------------------------------------------------------- */
709
/*  FDW declaration */
710
711
/*
712
 * Foreign-data wrapper handler function: return a struct with pointers
713
 * to YugaByte callback routines.
714
 */
715
Datum
716
ybc_fdw_handler()
717
65.2k
{
718
65.2k
  FdwRoutine *fdwroutine = makeNode(FdwRoutine);
719
720
65.2k
  fdwroutine->GetForeignRelSize  = ybcGetForeignRelSize;
721
65.2k
  fdwroutine->GetForeignPaths    = ybcGetForeignPaths;
722
65.2k
  fdwroutine->GetForeignPlan     = ybcGetForeignPlan;
723
65.2k
  fdwroutine->BeginForeignScan   = ybcBeginForeignScan;
724
65.2k
  fdwroutine->IterateForeignScan = ybcIterateForeignScan;
725
65.2k
  fdwroutine->ReScanForeignScan  = ybcReScanForeignScan;
726
65.2k
  fdwroutine->EndForeignScan     = ybcEndForeignScan;
727
728
  /* TODO: These are optional but we should support them eventually. */
729
  /* fdwroutine->ExplainForeignScan = ybcExplainForeignScan; */
730
  /* fdwroutine->AnalyzeForeignTable = ybcAnalyzeForeignTable; */
731
  /* fdwroutine->IsForeignScanParallelSafe = ybcIsForeignScanParallelSafe; */
732
733
65.2k
  PG_RETURN_POINTER(fdwroutine);
734
65.2k
}