/Users/deen/code/yugabyte-db/src/postgres/src/backend/executor/ybc_fdw.c
Line | Count | Source (jump to first uncovered line) |
1 | | /*-------------------------------------------------------------------------------------------------- |
2 | | * |
3 | | * ybc_fdw.c |
4 | | * Foreign-data wrapper for YugabyteDB. |
5 | | * |
6 | | * Copyright (c) YugaByte, Inc. |
7 | | * |
8 | | * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
9 | | * in compliance with the License. You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software distributed under the License |
14 | | * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
15 | | * or implied. See the License for the specific language governing permissions and limitations |
16 | | * under the License. |
17 | | * |
18 | | * IDENTIFICATION |
19 | | * src/backend/executor/ybc_fdw.c |
20 | | * |
21 | | *-------------------------------------------------------------------------------------------------- |
22 | | */ |
23 | | |
24 | | #include "postgres.h" |
25 | | |
26 | | #include <sys/stat.h> |
27 | | #include <unistd.h> |
28 | | #include <string.h> |
29 | | |
30 | | /* TODO see which includes of this block are still needed. */ |
31 | | #include "access/htup_details.h" |
32 | | #include "access/reloptions.h" |
33 | | #include "access/sysattr.h" |
34 | | #include "access/xact.h" |
35 | | #include "catalog/catalog.h" |
36 | | #include "catalog/pg_foreign_table.h" |
37 | | #include "commands/copy.h" |
38 | | #include "commands/defrem.h" |
39 | | #include "commands/explain.h" |
40 | | #include "commands/vacuum.h" |
41 | | #include "foreign/fdwapi.h" |
42 | | #include "foreign/foreign.h" |
43 | | #include "miscadmin.h" |
44 | | #include "nodes/makefuncs.h" |
45 | | #include "optimizer/cost.h" |
46 | | #include "optimizer/pathnode.h" |
47 | | #include "optimizer/paths.h" |
48 | | #include "optimizer/planmain.h" |
49 | | #include "optimizer/restrictinfo.h" |
50 | | #include "optimizer/var.h" |
51 | | #include "utils/memutils.h" |
52 | | #include "utils/rel.h" |
53 | | #include "utils/sampling.h" |
54 | | |
55 | | /* YB includes. */ |
56 | | #include "commands/dbcommands.h" |
57 | | #include "catalog/pg_operator.h" |
58 | | #include "catalog/yb_type.h" |
59 | | #include "utils/lsyscache.h" |
60 | | #include "utils/syscache.h" |
61 | | |
62 | | #include "yb/yql/pggate/ybc_pggate.h" |
63 | | #include "pg_yb_utils.h" |
64 | | #include "access/yb_scan.h" |
65 | | #include "executor/ybcExpr.h" |
66 | | #include "executor/ybc_fdw.h" |
67 | | |
68 | | #include "utils/resowner_private.h" |
69 | | |
70 | | /* -------------------------------------------------------------------------- */ |
71 | | /* Planner/Optimizer functions */ |
72 | | |
73 | | typedef struct YbFdwPlanState |
74 | | { |
75 | | /* |
 | | * Bitmap of attribute (column) numbers that we need to fetch from YB. |
 | | * The struct is allocated zeroed by ybcGetForeignRelSize and kept in |
 | | * baserel->fdw_private. ybcGetForeignPlan fills this set through |
 | | * pull_varattnos_min_attr and reads it back with the member index |
 | | * (attnum - baserel->min_attr + 1), i.e. members are offsets from |
 | | * baserel->min_attr rather than raw attribute numbers. |
 | | */ |
76 | | Bitmapset *target_attrs; |
77 | | |
78 | | } YbFdwPlanState; |
79 | | |
80 | | /* |
81 | | * ybcGetForeignRelSize |
82 | | * Obtain relation size estimates for a foreign table |
 | | * |
 | | * Allocates a zeroed YbFdwPlanState and stores it in baserel->fdw_private, |
 | | * substitutes YBC_DEFAULT_NUM_ROWS for baserel->tuples when it is 0, |
 | | * copies baserel->tuples into baserel->rows and finally calls |
 | | * check_index_predicates(). The foreigntableid argument is not used here. |
83 | | */ |
84 | | static void |
85 | | ybcGetForeignRelSize(PlannerInfo *root, |
86 | | RelOptInfo *baserel, |
87 | | Oid foreigntableid) |
88 | 61.6k | { |
89 | 61.6k | YbFdwPlanState *ybc_plan = NULL; |
90 | | |
91 | 61.6k | ybc_plan = (YbFdwPlanState *) palloc0(sizeof(YbFdwPlanState)); |
92 | | |
93 | | /* |
 | | * Set the estimate for the total number of rows (tuples) in this table. |
 | | * A value of 0 is replaced by the YBC_DEFAULT_NUM_ROWS default. |
 | | */ |
94 | 61.6k | if (baserel->tuples == 0) |
95 | 58.5k | baserel->tuples = YBC_DEFAULT_NUM_ROWS; |
96 | | |
97 | | /* |
98 | | * Initialize the estimate for the number of rows returned by this query. |
99 | | * This does not yet take into account the restriction clauses, but it will |
100 | | * be updated later by ybcIndexCostEstimate once it inspects the clauses. |
101 | | */ |
102 | 61.6k | baserel->rows = baserel->tuples; |
103 | | |
104 | 61.6k | baserel->fdw_private = ybc_plan; |
105 | | |
106 | | /* |
107 | | * Test any indexes of rel for applicability also. |
108 | | */ |
109 | 61.6k | check_index_predicates(root, baserel); |
110 | 61.6k | } |
111 | | |
112 | | /* |
113 | | * ybcGetForeignPaths |
114 | | * Create possible access paths for a scan on the foreign table, which is |
115 | | * the full table scan plus available index paths (including the primary key |
116 | | * scan path if any). |
 | | * |
 | | * The full-scan path is costed by ybcCostEstimate using |
 | | * YBC_FULL_SCAN_SELECTIVITY and is added with baserel->rows as its row |
 | | * estimate and no pathkeys; the index paths are added by |
 | | * create_index_paths(). The foreigntableid argument is not used here. |
117 | | */ |
118 | | static void |
119 | | ybcGetForeignPaths(PlannerInfo *root, |
120 | | RelOptInfo *baserel, |
121 | | Oid foreigntableid) |
122 | 61.6k | { |
123 | 61.6k | Cost startup_cost; |
124 | 61.6k | Cost total_cost; |
125 | | |
126 | | /* Estimate the startup and total cost of a full (sequential) scan */ |
127 | 61.6k | ybcCostEstimate(baserel, YBC_FULL_SCAN_SELECTIVITY, |
128 | 61.6k | false /* is_backwards scan */, |
129 | 61.6k | true /* is_seq_scan */, |
130 | 61.6k | false /* is_uncovered_idx_scan */, |
131 | 61.6k | &startup_cost, |
132 | 61.6k | &total_cost, |
133 | 61.6k | baserel->reltablespace /* index_tablespace_oid */); |
134 | | |
135 | | /* Create a ForeignPath node and add it as the scan path */ |
136 | 61.6k | add_path(baserel, |
137 | 61.6k | (Path *) create_foreignscan_path(root, |
138 | 61.6k | baserel, |
139 | 61.6k | NULL, /* default pathtarget */ |
140 | 61.6k | baserel->rows, |
141 | 61.6k | startup_cost, |
142 | 61.6k | total_cost, |
143 | 61.6k | NIL, /* no pathkeys */ |
144 | 61.6k | NULL, /* no outer rel either */ |
145 | 61.6k | NULL, /* no extra plan */ |
146 | 61.6k | NULL /* no options yet */ )); |
147 | | |
148 | | /* Add primary key and secondary index paths also */ |
149 | 61.6k | create_index_paths(root, baserel); |
150 | 61.6k | } |
151 | | |
152 | | /* |
153 | | * ybcGetForeignPlan |
154 | | * Create a ForeignScan plan node for scanning the foreign table |
 | | * |
 | | * The scan clauses are split into clauses pushed down to DocDB and clauses |
 | | * evaluated locally, the set of attributes to fetch is collected into the |
 | | * YbFdwPlanState kept in baserel->fdw_private, and the results are handed |
 | | * to make_foreignscan(): the list of referenced attributes is later read |
 | | * back from ForeignScan.fdw_exprs by ybcSetupScanTargets, remote_params |
 | | * from fdw_private by ybSetupScanColumnRefs, and remote_clauses from |
 | | * fdw_recheck_quals by ybSetupScanQual. |
 | | * |
 | | * Raises ERRCODE_FEATURE_NOT_SUPPORTED if one of the unsupported system |
 | | * columns (ctid, xmin, cmin, xmax, cmax attribute numbers) is referenced. |
 | | * The foreigntableid and best_path arguments are not used here. |
155 | | */ |
156 | | static ForeignScan * |
157 | | ybcGetForeignPlan(PlannerInfo *root, |
158 | | RelOptInfo *baserel, |
159 | | Oid foreigntableid, |
160 | | ForeignPath *best_path, |
161 | | List *tlist, |
162 | | List *scan_clauses, |
163 | | Plan *outer_plan) |
164 | 19.1k | { |
165 | 19.1k | YbFdwPlanState *yb_plan_state = (YbFdwPlanState *) baserel->fdw_private; |
166 | 19.1k | Index scan_relid = baserel->relid; |
167 | 19.1k | List *local_clauses = NIL; |
168 | 19.1k | List *remote_clauses = NIL; |
169 | 19.1k | List *remote_params = NIL; |
170 | 19.1k | ListCell *lc; |
171 | | |
172 | 19.1k | scan_clauses = extract_actual_clauses(scan_clauses, false); |
173 | | |
174 | | /* |
175 | | * Split the expressions in the scan_clauses onto two lists: |
176 | | * - remote_clauses gets supported expressions to push down to DocDB, and |
177 | | * - local_clauses gets remaining to evaluate upon returned rows. |
178 | | * The remote_params list contains data type details of the columns |
179 | | * referenced by the expressions in the remote_clauses list. DocDB needs it |
180 | | * to convert row values to Datum/isnull pairs consumable by Postgres |
181 | | * functions. |
182 | | * The remote_clauses and remote_params lists are sent with the protobuf |
183 | | * read request. |
 | | * YbCanPushdownExpr receives the address of the per-clause params list; |
 | | * that list is concatenated onto remote_params when the clause is pushed |
 | | * down and freed otherwise. |
184 | | */ |
185 | 19.1k | foreach(lc, scan_clauses) |
186 | 20.5k | { |
187 | 20.5k | List *params = NIL; |
188 | 20.5k | Expr *expr = (Expr *) lfirst(lc); |
189 | 20.5k | if (YbCanPushdownExpr(expr, &params)) |
190 | 22 | { |
191 | 22 | remote_clauses = lappend(remote_clauses, expr); |
192 | 22 | remote_params = list_concat(remote_params, params); |
193 | 22 | } |
194 | 20.5k | else |
195 | 20.5k | { |
196 | 20.5k | local_clauses = lappend(local_clauses, expr); |
197 | 20.5k | list_free_deep(params); |
198 | 20.5k | } |
199 | 20.5k | } |
200 | | |
201 | | /* Get the target columns that need to be retrieved from DocDB */ |
202 | 19.1k | foreach(lc, baserel->reltarget->exprs) |
203 | 44.0k | { |
204 | 44.0k | Expr *expr = (Expr *) lfirst(lc); |
205 | 44.0k | pull_varattnos_min_attr((Node *) expr, |
206 | 44.0k | baserel->relid, |
207 | 44.0k | &yb_plan_state->target_attrs, |
208 | 44.0k | baserel->min_attr); |
209 | 44.0k | } |
210 | | |
211 | | /* Get the target columns that are needed to evaluate local clauses */ |
212 | 19.1k | foreach(lc, local_clauses) |
213 | 20.5k | { |
214 | 20.5k | Expr *expr = (Expr *) lfirst(lc); |
215 | 20.5k | pull_varattnos_min_attr((Node *) expr, |
216 | 20.5k | baserel->relid, |
217 | 20.5k | &yb_plan_state->target_attrs, |
218 | 20.5k | baserel->min_attr); |
219 | 20.5k | } |
220 | | |
221 | | /* |
 | | * Set scan targets: walk every attribute number of the relation in |
 | | * ascending order and emit a TargetEntry (only resno is set) for each one |
 | | * present in the bitmap. Once the whole-row attribute is seen, every |
 | | * subsequent attribute is treated as requested. |
 | | */ |
222 | 19.1k | List *target_attrs = NULL; |
223 | 19.1k | bool wholerow = false; |
224 | 257k | for (AttrNumber attnum = baserel->min_attr; attnum <= baserel->max_attr; attnum++) |
225 | 238k | { |
226 | 238k | int bms_idx = attnum - baserel->min_attr + 1; |
227 | 238k | if (wholerow || bms_is_member(bms_idx, yb_plan_state->target_attrs)) |
228 | 45.2k | { |
229 | 45.2k | switch (attnum) |
230 | 45.2k | { |
231 | 202 | case InvalidAttrNumber: |
232 | | /* |
233 | | * Postgres repurposes InvalidAttrNumber to represent the "wholerow" |
234 | | * junk attribute. |
235 | | */ |
236 | 202 | wholerow = true; |
237 | 202 | break; |
238 | 7 | case SelfItemPointerAttributeNumber: |
239 | 7 | case MinTransactionIdAttributeNumber: |
240 | 7 | case MinCommandIdAttributeNumber: |
241 | 7 | case MaxTransactionIdAttributeNumber: |
242 | 7 | case MaxCommandIdAttributeNumber: |
243 | 7 | ereport(ERROR, |
244 | 7 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg( |
245 | 7 | "System column with id %d is not supported yet", |
246 | 7 | attnum))); |
247 | 7 | break; |
248 | 20 | case TableOidAttributeNumber: |
249 | | /* Nothing to do in YugaByte: Postgres will handle this. */ |
250 | 20 | break; |
251 | 109 | case ObjectIdAttributeNumber: |
252 | 8.95k | case YBTupleIdAttributeNumber: |
253 | 44.9k | default: /* Regular column: attrNum > 0 (the two cases above share this handling) */ |
254 | 44.9k | { |
255 | 44.9k | TargetEntry *target = makeNode(TargetEntry); |
256 | 44.9k | target->resno = attnum; |
257 | 44.9k | target_attrs = lappend(target_attrs, target); |
258 | 44.9k | } |
259 | 45.2k | } |
260 | 45.2k | } |
261 | 238k | } |
262 | | |
263 | | /* Create the ForeignScan node */ |
264 | 19.1k | return make_foreignscan(tlist, /* local target list */ |
265 | 19.1k | local_clauses, /* local qual */ |
266 | 19.1k | scan_relid, |
267 | 19.1k | target_attrs, /* referenced attributes */ |
268 | 19.1k | remote_params, /* fdw_private data (attribute types) */ |
269 | 19.1k | NIL, /* remote target list (none for now) */ |
270 | 19.1k | remote_clauses, /* remote qual */ |
271 | 19.1k | outer_plan); |
272 | 19.1k | } |
273 | | |
274 | | /* ------------------------------------------------------------------------- */ |
275 | | /* Scanning functions */ |
276 | | |
277 | | /* |
278 | | * FDW-specific information for ForeignScanState.fdw_state. |
 | | * |
 | | * Allocated (zeroed) by ybcBeginForeignScan; node->fdw_state is left NULL |
 | | * when only EXPLAIN (no ANALYZE) is requested. |
279 | | */ |
280 | | typedef struct YbFdwExecState |
281 | | { |
282 | | /* The handle for the internal YB Select statement (created by YBCPgNewSelect). */ |
283 | | YBCPgStatement handle; |
284 | | YBCPgExecParameters *exec_params; /* execution control parameters for YugaByte; points at estate->yb_exec_params */ |
285 | | bool is_exec_done; /* Each statement should be executed exactly one time; set after YBCPgExecSelect */ |
286 | | } YbFdwExecState; |
287 | | |
288 | | /* |
289 | | * ybcBeginForeignScan |
290 | | * Initiate access to Yugabyte by allocating a Select handle. |
 | | * |
 | | * Returns immediately, without touching node->fdw_state, when eflags has |
 | | * EXEC_FLAG_EXPLAIN_ONLY set. Otherwise it allocates a zeroed |
 | | * YbFdwExecState, creates the select statement with YBCPgNewSelect, points |
 | | * exec_params at estate->yb_exec_params and resets its rowmark to -1, |
 | | * optionally copies a row mark for SERIALIZABLE transactions, and sets the |
 | | * catalog cache version on the statement. The statement is not executed |
 | | * here; that happens on the first ybcIterateForeignScan call. |
291 | | */ |
292 | | static void |
293 | | ybcBeginForeignScan(ForeignScanState *node, int eflags) |
294 | 19.8k | { |
295 | 19.8k | EState *estate = node->ss.ps.state; |
296 | 19.8k | Relation relation = node->ss.ss_currentRelation; |
297 | | |
298 | 19.8k | YbFdwExecState *ybc_state = NULL; |
299 | | |
300 | | /* Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. */ |
301 | 19.8k | if (eflags & EXEC_FLAG_EXPLAIN_ONLY) |
302 | 573 | return; |
303 | | |
304 | | /* Allocate and initialize YB scan state. */ |
305 | 19.2k | ybc_state = (YbFdwExecState *) palloc0(sizeof(YbFdwExecState)); |
306 | | |
307 | 19.2k | node->fdw_state = (void *) ybc_state; |
308 | 19.2k | HandleYBStatus(YBCPgNewSelect(YBCGetDatabaseOid(relation), |
309 | 19.2k | YbGetStorageRelid(relation), |
310 | 19.2k | NULL /* prepare_params */, |
311 | 19.2k | &ybc_state->handle)); |
312 | 19.2k | ybc_state->exec_params = &estate->yb_exec_params; |
313 | | |
314 | 19.2k | ybc_state->exec_params->rowmark = -1; |
315 | 19.2k | if (YBReadFromFollowersEnabled()) { |
316 | 0 | ereport(DEBUG2, (errmsg("Doing read from followers"))); |
317 | 0 | } |
318 | 19.2k | if (XactIsoLevel == XACT_SERIALIZABLE) |
319 | 316 | { |
320 | | /* |
321 | | * In case of SERIALIZABLE isolation level we have to take predicate locks to disallow |
322 | | * INSERTion of new rows that satisfy the query predicate. So, we set the rowmark on all |
323 | | * read requests sent to tserver instead of locking each tuple one by one in LockRows node. |
 | | * |
 | | * NOTE(review): the loop below ends with an unconditional break, so only the first |
 | | * entry of estate->es_rowMarks is ever examined - confirm this is intended. |
324 | | */ |
325 | 316 | ListCell *l; |
326 | 316 | foreach(l, estate->es_rowMarks) { |
327 | 32 | ExecRowMark *erm = (ExecRowMark *) lfirst(l); |
328 | | // Do not propagate non-row-locking row marks (ROW_MARK_REFERENCE, ROW_MARK_COPY). |
329 | 32 | if (erm->markType != ROW_MARK_REFERENCE && erm->markType != ROW_MARK_COPY) |
330 | 32 | { |
331 | 32 | ybc_state->exec_params->rowmark = erm->markType; |
332 | | /* |
333 | | * TODO(Piyush): We don't honour SKIP LOCKED yet in serializable isolation level. |
334 | | */ |
335 | 32 | ybc_state->exec_params->wait_policy = LockWaitError; |
336 | 32 | } |
337 | 32 | break; |
338 | 32 | } |
339 | 316 | } |
340 | | |
341 | 19.2k | ybc_state->is_exec_done = false; |
342 | | |
343 | | /* Set the current syscatalog version (will check that we are up to date) */ |
344 | 19.2k | HandleYBStatus(YBCPgSetCatalogCacheVersion(ybc_state->handle, yb_catalog_cache_version)); |
345 | 19.2k | } |
346 | | |
347 | | /* |
348 | | * ybSetupScanTargets |
349 | | * Add the target expressions to the DocDB statement. |
350 | | * Currently target are either all column references or all aggregates. |
351 | | * We do not push down target expressions yet. |
352 | | */ |
353 | | static void |
354 | | ybcSetupScanTargets(ForeignScanState *node) |
355 | 18.5k | { |
356 | 18.5k | EState *estate = node->ss.ps.state; |
357 | 18.5k | ForeignScan *foreignScan = (ForeignScan *) node->ss.ps.plan; |
358 | 18.5k | Relation relation = node->ss.ss_currentRelation; |
359 | 18.5k | YbFdwExecState *ybc_state = (YbFdwExecState *) node->fdw_state; |
360 | 18.5k | TupleDesc tupdesc = RelationGetDescr(relation); |
361 | 18.5k | ListCell *lc; |
362 | | |
363 | | /* Planning function above should ensure target list is set */ |
364 | 18.5k | List *target_attrs = foreignScan->fdw_exprs; |
365 | | |
366 | 18.5k | MemoryContext oldcontext = |
367 | 18.5k | MemoryContextSwitchTo(node->ss.ps.ps_ExprContext->ecxt_per_query_memory); |
368 | | |
369 | | /* Set scan targets. */ |
370 | 18.5k | if (node->yb_fdw_aggs == NIL) |
371 | 18.3k | { |
372 | | /* Set non-aggregate column targets. */ |
373 | 18.3k | bool has_targets = false; |
374 | 18.3k | foreach(lc, target_attrs) |
375 | 43.4k | { |
376 | 43.4k | TargetEntry *target = (TargetEntry *) lfirst(lc); |
377 | | |
378 | | /* For regular (non-system) attribute check if they were deleted */ |
379 | 43.4k | Oid attr_typid = InvalidOid; |
380 | 43.4k | Oid attr_collation = InvalidOid; |
381 | 43.4k | int32 attr_typmod = 0; |
382 | 43.4k | if (target->resno > 0) |
383 | 34.5k | { |
384 | 34.5k | Form_pg_attribute attr; |
385 | 34.5k | attr = TupleDescAttr(tupdesc, target->resno - 1); |
386 | | /* Ignore dropped attributes */ |
387 | 34.5k | if (attr->attisdropped) |
388 | 3 | { |
389 | 3 | continue; |
390 | 3 | } |
391 | 34.5k | attr_typid = attr->atttypid; |
392 | 34.5k | attr_typmod = attr->atttypmod; |
393 | 34.5k | attr_collation = attr->attcollation; |
394 | 34.5k | } |
395 | | |
396 | 43.4k | YBCPgTypeAttrs type_attrs = {attr_typmod}; |
397 | 43.4k | YBCPgExpr expr = YBCNewColumnRef(ybc_state->handle, |
398 | 43.4k | target->resno, |
399 | 43.4k | attr_typid, |
400 | 43.4k | attr_collation, |
401 | 43.4k | &type_attrs); |
402 | 43.4k | HandleYBStatus(YBCPgDmlAppendTarget(ybc_state->handle, expr)); |
403 | 43.4k | has_targets = true; |
404 | 43.4k | } |
405 | | |
406 | | /* |
407 | | * We can have no target columns at this point for e.g. a count(*). For now |
408 | | * we request the first non-dropped column in that case. |
409 | | * TODO look into handling this on YugaByte side. |
410 | | */ |
411 | 18.3k | if (!has_targets) |
412 | 14 | { |
413 | 14 | for (int16_t i = 0; i < tupdesc->natts; i++) |
414 | 14 | { |
415 | | /* Ignore dropped attributes */ |
416 | 14 | if (TupleDescAttr(tupdesc, i)->attisdropped) |
417 | 0 | { |
418 | 0 | continue; |
419 | 0 | } |
420 | | |
421 | 14 | YBCPgTypeAttrs type_attrs = { TupleDescAttr(tupdesc, i)->atttypmod }; |
422 | 14 | YBCPgExpr expr = YBCNewColumnRef(ybc_state->handle, |
423 | 14 | i + 1, |
424 | 14 | TupleDescAttr(tupdesc, i)->atttypid, |
425 | 14 | TupleDescAttr(tupdesc, i)->attcollation, |
426 | 14 | &type_attrs); |
427 | 14 | HandleYBStatus(YBCPgDmlAppendTarget(ybc_state->handle, expr)); |
428 | 14 | break; |
429 | 14 | } |
430 | 14 | } |
431 | 18.3k | } |
432 | 185 | else |
433 | 185 | { |
434 | | /* Set aggregate scan targets. */ |
435 | 185 | foreach(lc, node->yb_fdw_aggs) |
436 | 221 | { |
437 | 221 | Aggref *aggref = lfirst_node(Aggref, lc); |
438 | 221 | char *func_name = get_func_name(aggref->aggfnoid); |
439 | 221 | ListCell *lc_arg; |
440 | 221 | YBCPgExpr op_handle; |
441 | 221 | const YBCPgTypeEntity *type_entity; |
442 | | |
443 | | /* Get type entity for the operator from the aggref. */ |
444 | 221 | type_entity = YbDataTypeFromOidMod(InvalidAttrNumber, aggref->aggtranstype); |
445 | | |
446 | | /* Create operator. */ |
447 | 221 | HandleYBStatus(YBCPgNewOperator(ybc_state->handle, func_name, type_entity, aggref->aggcollid, &op_handle)); |
448 | | |
449 | | /* Handle arguments. */ |
450 | 221 | if (aggref->aggstar) { |
451 | | /* |
452 | | * Add dummy argument for COUNT(*) case, turning it into COUNT(0). |
453 | | * We don't use a column reference as we want to count rows |
454 | | * even if all column values are NULL. |
455 | | */ |
456 | 126 | YBCPgExpr const_handle; |
457 | 126 | HandleYBStatus(YBCPgNewConstant(ybc_state->handle, |
458 | 126 | type_entity, |
459 | 126 | false /* collate_is_valid_non_c */, |
460 | 126 | NULL /* collation_sortkey */, |
461 | 126 | 0 /* datum */, |
462 | 126 | false /* is_null */, |
463 | 126 | &const_handle)); |
464 | 126 | HandleYBStatus(YBCPgOperatorAppendArg(op_handle, const_handle)); |
465 | 95 | } else { |
466 | | /* Add aggregate arguments to operator. */ |
467 | 95 | foreach(lc_arg, aggref->args) |
468 | 95 | { |
469 | 95 | TargetEntry *tle = lfirst_node(TargetEntry, lc_arg); |
470 | 95 | if (IsA(tle->expr, Const)) |
471 | 13 | { |
472 | 13 | Const* const_node = castNode(Const, tle->expr); |
473 | | /* Already checked by yb_agg_pushdown_supported */ |
474 | 13 | Assert(const_node->constisnull || const_node->constbyval); |
475 | | |
476 | 13 | YBCPgExpr const_handle; |
477 | 13 | HandleYBStatus(YBCPgNewConstant(ybc_state->handle, |
478 | 13 | type_entity, |
479 | 13 | false /* collate_is_valid_non_c */, |
480 | 13 | NULL /* collation_sortkey */, |
481 | 13 | const_node->constvalue, |
482 | 13 | const_node->constisnull, |
483 | 13 | &const_handle)); |
484 | 13 | HandleYBStatus(YBCPgOperatorAppendArg(op_handle, const_handle)); |
485 | 13 | } |
486 | 82 | else if (IsA(tle->expr, Var)) |
487 | 82 | { |
488 | | /* |
489 | | * Use original attribute number (varoattno) instead of projected one (varattno) |
490 | | * as projection is disabled for tuples produced by pushed down operators. |
491 | | */ |
492 | 82 | int attno = castNode(Var, tle->expr)->varoattno; |
493 | 82 | Form_pg_attribute attr = TupleDescAttr(tupdesc, attno - 1); |
494 | 82 | YBCPgTypeAttrs type_attrs = {attr->atttypmod}; |
495 | | |
496 | 82 | YBCPgExpr arg = YBCNewColumnRef(ybc_state->handle, |
497 | 82 | attno, |
498 | 82 | attr->atttypid, |
499 | 82 | attr->attcollation, |
500 | 82 | &type_attrs); |
501 | 82 | HandleYBStatus(YBCPgOperatorAppendArg(op_handle, arg)); |
502 | 82 | } |
503 | 0 | else |
504 | 0 | { |
505 | | /* Should never happen. */ |
506 | 0 | ereport(ERROR, |
507 | 0 | (errcode(ERRCODE_INTERNAL_ERROR), |
508 | 0 | errmsg("unsupported aggregate function argument type"))); |
509 | 0 | } |
510 | 95 | } |
511 | 95 | } |
512 | | |
513 | | /* Add aggregate operator as scan target. */ |
514 | 221 | HandleYBStatus(YBCPgDmlAppendTarget(ybc_state->handle, op_handle)); |
515 | 221 | } |
516 | | |
517 | | /* |
518 | | * Setup the scan slot based on new tuple descriptor for the given targets. This is a dummy |
519 | | * tupledesc that only includes the number of attributes. Switch to per-query memory from |
520 | | * per-tuple memory so the slot persists across iterations. |
521 | | */ |
522 | 185 | TupleDesc target_tupdesc = CreateTemplateTupleDesc(list_length(node->yb_fdw_aggs), |
523 | 185 | false /* hasoid */); |
524 | 185 | ExecInitScanTupleSlot(estate, &node->ss, target_tupdesc); |
525 | 185 | } |
526 | 18.5k | MemoryContextSwitchTo(oldcontext); |
527 | 18.5k | } |
528 | | |
529 | | /* |
530 | | * ybSetupScanQual |
531 | | * Add the pushable qual expressions to the DocDB statement. |
 | | * |
 | | * The expressions are read from ForeignScan.fdw_recheck_quals. Each one has |
 | | * its parameters instantiated from estate->es_param_list_info, is wrapped |
 | | * by YBCNewEvalExprCall and appended with YbPgDmlAppendQual. Runs with the |
 | | * current memory context switched to ecxt_per_query_memory. |
532 | | */ |
533 | | static void |
534 | | ybSetupScanQual(ForeignScanState *node) |
535 | 18.5k | { |
536 | 18.5k | EState *estate = node->ss.ps.state; |
537 | 18.5k | ForeignScan *foreignScan = (ForeignScan *) node->ss.ps.plan; |
538 | 18.5k | YbFdwExecState *yb_state = (YbFdwExecState *) node->fdw_state; |
539 | 18.5k | List *qual = foreignScan->fdw_recheck_quals; |
540 | 18.5k | ListCell *lc; |
541 | | |
542 | 18.5k | MemoryContext oldcontext = |
543 | 18.5k | MemoryContextSwitchTo(node->ss.ps.ps_ExprContext->ecxt_per_query_memory); |
544 | | |
545 | 18.5k | foreach(lc, qual) |
546 | 22 | { |
547 | 22 | Expr *expr = (Expr *) lfirst(lc); |
548 | | /* |
549 | | * Some expressions may be parametrized. The remote end cannot |
550 | | * access the estate to get parameter values, so param references |
551 | | * are replaced with constant expressions. |
552 | | */ |
553 | 22 | expr = YbExprInstantiateParams(expr, estate->es_param_list_info); |
554 | | /* Create new PgExpr wrapper for the expression */ |
555 | 22 | YBCPgExpr yb_expr = YBCNewEvalExprCall(yb_state->handle, expr); |
556 | | /* Add the PgExpr to the statement */ |
557 | 22 | HandleYBStatus(YbPgDmlAppendQual(yb_state->handle, yb_expr)); |
558 | 22 | } |
559 | | |
560 | 18.5k | MemoryContextSwitchTo(oldcontext); |
561 | 18.5k | } |
562 | | |
563 | | /* |
564 | | * ybSetupScanColumnRefs |
565 | | * Add the column references to the DocDB statement. |
 | | * |
 | | * The references are built from the YbExprParamDesc entries stored in |
 | | * ForeignScan.fdw_private (attno, typid, collid, typmod) and appended with |
 | | * YbPgDmlAppendColumnRef. Runs with the current memory context switched to |
 | | * ecxt_per_query_memory. |
566 | | */ |
567 | | static void |
568 | | ybSetupScanColumnRefs(ForeignScanState *node) |
569 | 18.5k | { |
570 | 18.5k | ForeignScan *foreignScan = (ForeignScan *) node->ss.ps.plan; |
571 | 18.5k | YbFdwExecState *yb_state = (YbFdwExecState *) node->fdw_state; |
572 | 18.5k | List *params = foreignScan->fdw_private; |
573 | 18.5k | ListCell *lc; |
574 | | |
575 | 18.5k | MemoryContext oldcontext = |
576 | 18.5k | MemoryContextSwitchTo(node->ss.ps.ps_ExprContext->ecxt_per_query_memory); |
577 | | |
578 | 18.5k | foreach(lc, params) |
579 | 22 | { |
580 | 22 | YbExprParamDesc *param = (YbExprParamDesc *) lfirst(lc); |
581 | 22 | YBCPgTypeAttrs type_attrs = { param->typmod }; |
582 | | /* Create new PgExpr wrapper for the column reference */ |
583 | 22 | YBCPgExpr yb_expr = YBCNewColumnRef(yb_state->handle, |
584 | 22 | param->attno, |
585 | 22 | param->typid, |
586 | 22 | param->collid, |
587 | 22 | &type_attrs); |
588 | | /* Add the PgExpr to the statement */ |
589 | 22 | HandleYBStatus(YbPgDmlAppendColumnRef(yb_state->handle, yb_expr)); |
590 | 22 | } |
591 | | |
592 | 18.5k | MemoryContextSwitchTo(oldcontext); |
593 | 18.5k | } |
594 | | |
595 | | /* |
596 | | * ybcIterateForeignScan |
597 | | * Fetch the next row from the YB select statement and return it in the |
598 | | * ScanTupleSlot. |
 | | * |
 | | * On the first call for a statement (is_exec_done is false) the targets, |
 | | * quals and column references are set up and the select is executed. |
 | | * For regular scans the fetched values are formed into a heap tuple that is |
 | | * stored in the slot; for pushed-down aggregates (node->yb_fdw_aggs is not |
 | | * NIL) the values are left in the slot's tts_values/tts_isnull arrays. |
 | | * When no row is fetched the cleared slot is returned. |
599 | | */ |
600 | | static TupleTableSlot * |
601 | | ybcIterateForeignScan(ForeignScanState *node) |
602 | 5.45M | { |
603 | 5.45M | TupleTableSlot *slot; |
604 | 5.45M | YbFdwExecState *ybc_state = (YbFdwExecState *) node->fdw_state; |
605 | 5.45M | bool has_data = false; |
606 | | |
607 | | /* Execute the select statement one time. |
608 | | * TODO(neil) Check whether YugaByte PgGate should combine Exec() and Fetch() into one function. |
609 | | * - The first fetch from YugaByte PgGate requires a number of operations including allocating |
610 | | * operators and protobufs. These operations are done by YBCPgExecSelect() function. |
611 | | * - The subsequent fetches don't need to setup the query with these operations again. |
612 | | */ |
613 | 5.45M | if (!ybc_state->is_exec_done) { |
614 | 18.5k | ybcSetupScanTargets(node); |
615 | 18.5k | ybSetupScanQual(node); |
616 | 18.5k | ybSetupScanColumnRefs(node); |
617 | 18.5k | HandleYBStatus(YBCPgExecSelect(ybc_state->handle, ybc_state->exec_params)); |
618 | 18.5k | ybc_state->is_exec_done = true; |
619 | 18.5k | } |
620 | | |
621 | | /* Clear tuple slot before starting */ |
622 | 5.45M | slot = node->ss.ss_ScanTupleSlot; |
623 | 5.45M | ExecClearTuple(slot); |
624 | | |
625 | 5.45M | TupleDesc tupdesc = slot->tts_tupleDescriptor; |
626 | 5.45M | Datum *values = slot->tts_values; |
627 | 5.45M | bool *isnull = slot->tts_isnull; |
628 | 5.45M | YBCPgSysColumns syscols; |
629 | | |
630 | | /* Fetch one row directly into the slot's values/isnull arrays. */ |
631 | 5.45M | HandleYBStatus(YBCPgDmlFetch(ybc_state->handle, |
632 | 5.45M | tupdesc->natts, |
633 | 5.45M | (uint64_t *) values, |
634 | 5.45M | isnull, |
635 | 5.45M | &syscols, |
636 | 5.45M | &has_data)); |
637 | | |
638 | | /* If we have result(s) update the tuple slot. */ |
639 | 5.45M | if (has_data) |
640 | 5.44M | { |
641 | 5.44M | if (node->yb_fdw_aggs == NIL) |
642 | 5.44M | { |
643 | 5.44M | HeapTuple tuple = heap_form_tuple(tupdesc, values, isnull); |
644 | 5.44M | if (syscols.oid != InvalidOid) |
645 | 20.8k | { |
646 | 20.8k | HeapTupleSetOid(tuple, syscols.oid); |
647 | 20.8k | } |
648 | | |
649 | 5.44M | slot = ExecStoreHeapTuple(tuple, slot, false); |
650 | | |
651 | | /* Setup special columns in the slot (the ybctid returned with the row) */ |
652 | 5.44M | slot->tts_ybctid = PointerGetDatum(syscols.ybctid); |
653 | 5.44M | } |
654 | 345 | else |
655 | 345 | { |
656 | | /* |
657 | | * Aggregate results stored in virtual slot (no tuple). Set the |
658 | | * number of valid values and mark as non-empty. |
659 | | */ |
660 | 345 | slot->tts_nvalid = tupdesc->natts; |
661 | 345 | slot->tts_isempty = false; |
662 | 345 | } |
663 | 5.44M | } |
664 | | |
665 | 5.45M | return slot; |
666 | 5.45M | } |
667 | | |
668 | | static void |
669 | | ybcFreeStatementObject(YbFdwExecState* yb_fdw_exec_state) |
670 | 9.11k | { |
671 | | /* |
 | | * Delete the YB statement held by the exec state, if any, and reset the |
 | | * state's fields (handle, exec_params, is_exec_done). The YbFdwExecState |
 | | * struct itself is not freed here. |
 | | * If yb_fdw_exec_state is NULL, we are in EXPLAIN; nothing to do. |
 | | */ |
672 | 9.11k | if (yb_fdw_exec_state != NULL && yb_fdw_exec_state->handle != NULL) |
673 | 8.54k | { |
674 | 8.54k | YBCPgDeleteStatement(yb_fdw_exec_state->handle); |
675 | 8.54k | yb_fdw_exec_state->handle = NULL; |
676 | 8.54k | yb_fdw_exec_state->exec_params = NULL; |
677 | 8.54k | yb_fdw_exec_state->is_exec_done = false; |
678 | 8.54k | } |
679 | 9.11k | } |
680 | | |
681 | | /* |
682 | | * ybcReScanForeignScan |
683 | | * Rescan table, possibly with new parameters |
 | | * |
 | | * Deletes the current statement and calls ybcBeginForeignScan again with |
 | | * eflags 0, which allocates a fresh YbFdwExecState and select handle; the |
 | | * new statement is executed on the next ybcIterateForeignScan call. |
 | | * NOTE(review): the previous YbFdwExecState is not pfree'd before |
 | | * node->fdw_state is overwritten - presumably it is reclaimed with its |
 | | * memory context; confirm. |
684 | | */ |
685 | | static void |
686 | | ybcReScanForeignScan(ForeignScanState *node) |
687 | 800 | { |
688 | 800 | YbFdwExecState *ybc_state = (YbFdwExecState *) node->fdw_state; |
689 | | |
690 | | /* Clear (delete) the previous select */ |
691 | 800 | ybcFreeStatementObject(ybc_state); |
692 | | |
693 | | /* Re-allocate the select; it is executed lazily by the next iterate call. */ |
694 | 800 | ybcBeginForeignScan(node, 0 /* eflags */); |
695 | 800 | } |
696 | | |
697 | | /* |
698 | | * ybcEndForeignScan |
699 | | * Finish scanning foreign table and dispose objects used for this scan |
 | | * |
 | | * Delegates to ybcFreeStatementObject, which tolerates a NULL fdw_state |
 | | * (the EXPLAIN-only case). |
700 | | */ |
701 | | static void |
702 | | ybcEndForeignScan(ForeignScanState *node) |
703 | 8.31k | { |
704 | 8.31k | YbFdwExecState *ybc_state = (YbFdwExecState *) node->fdw_state; |
705 | 8.31k | ybcFreeStatementObject(ybc_state); |
706 | 8.31k | } |
707 | | |
708 | | /* ------------------------------------------------------------------------- */ |
709 | | /* FDW declaration */ |
710 | | |
711 | | /* |
712 | | * Foreign-data wrapper handler function: return a struct with pointers |
713 | | * to YugaByte callback routines. |
 | | * |
 | | * Only the planning callbacks (GetForeignRelSize, GetForeignPaths, |
 | | * GetForeignPlan) and the scan callbacks (Begin/Iterate/ReScan/End |
 | | * ForeignScan) are set; every other FdwRoutine member keeps the value |
 | | * makeNode() gave it. |
714 | | */ |
715 | | Datum |
716 | | ybc_fdw_handler() |
717 | 65.2k | { |
718 | 65.2k | FdwRoutine *fdwroutine = makeNode(FdwRoutine); |
719 | | |
720 | 65.2k | fdwroutine->GetForeignRelSize = ybcGetForeignRelSize; |
721 | 65.2k | fdwroutine->GetForeignPaths = ybcGetForeignPaths; |
722 | 65.2k | fdwroutine->GetForeignPlan = ybcGetForeignPlan; |
723 | 65.2k | fdwroutine->BeginForeignScan = ybcBeginForeignScan; |
724 | 65.2k | fdwroutine->IterateForeignScan = ybcIterateForeignScan; |
725 | 65.2k | fdwroutine->ReScanForeignScan = ybcReScanForeignScan; |
726 | 65.2k | fdwroutine->EndForeignScan = ybcEndForeignScan; |
727 | | |
728 | | /* TODO: These are optional but we should support them eventually. */ |
729 | | /* fdwroutine->ExplainForeignScan = ybcExplainForeignScan; */ |
730 | | /* fdwroutine->AnalyzeForeignTable = ybcAnalyzeForeignTable; */ |
731 | | /* fdwroutine->IsForeignScanParallelSafe = ybcIsForeignScanParallelSafe; */ |
732 | | |
733 | 65.2k | PG_RETURN_POINTER(fdwroutine); |
734 | 65.2k | } |