YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/postgres/src/backend/executor/nodeAgg.c
Line
Count
Source
1
/*-------------------------------------------------------------------------
2
 *
3
 * nodeAgg.c
4
 *    Routines to handle aggregate nodes.
5
 *
6
 *    ExecAgg normally evaluates each aggregate in the following steps:
7
 *
8
 *     transvalue = initcond
9
 *     foreach input_tuple do
10
 *      transvalue = transfunc(transvalue, input_value(s))
11
 *     result = finalfunc(transvalue, direct_argument(s))
12
 *
13
 *    If a finalfunc is not supplied then the result is just the ending
14
 *    value of transvalue.
15
 *
16
 *    Other behaviors can be selected by the "aggsplit" mode, which exists
17
 *    to support partial aggregation.  It is possible to:
18
 *    * Skip running the finalfunc, so that the output is always the
19
 *    final transvalue state.
20
 *    * Substitute the combinefunc for the transfunc, so that transvalue
21
 *    states (propagated up from a child partial-aggregation step) are merged
22
 *    rather than processing raw input rows.  (The statements below about
23
 *    the transfunc apply equally to the combinefunc, when it's selected.)
24
 *    * Apply the serializefunc to the output values (this only makes sense
25
 *    when skipping the finalfunc, since the serializefunc works on the
26
 *    transvalue data type).
27
 *    * Apply the deserializefunc to the input values (this only makes sense
28
 *    when using the combinefunc, for similar reasons).
29
 *    It is the planner's responsibility to connect up Agg nodes using these
30
 *    alternate behaviors in a way that makes sense, with partial aggregation
31
 *    results being fed to nodes that expect them.
32
 *
33
 *    If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
34
 *    input tuples and eliminate duplicates (if required) before performing
35
 *    the above-depicted process.  (However, we don't do that for ordered-set
36
 *    aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
37
 *    so far as this module is concerned.)  Note that partial aggregation
38
 *    is not supported in these cases, since we couldn't ensure global
39
 *    ordering or distinctness of the inputs.
40
 *
41
 *    If transfunc is marked "strict" in pg_proc and initcond is NULL,
42
 *    then the first non-NULL input_value is assigned directly to transvalue,
43
 *    and transfunc isn't applied until the second non-NULL input_value.
44
 *    The agg's first input type and transtype must be the same in this case!
45
 *
46
 *    If transfunc is marked "strict" then NULL input_values are skipped,
47
 *    keeping the previous transvalue.  If transfunc is not strict then it
48
 *    is called for every input tuple and must deal with NULL initcond
49
 *    or NULL input_values for itself.
50
 *
51
 *    If finalfunc is marked "strict" then it is not called when the
52
 *    ending transvalue is NULL; instead a NULL result is created
53
 *    automatically (this is just the usual handling of strict functions,
54
 *    of course).  A non-strict finalfunc can make its own choice of
55
 *    what to return for a NULL ending transvalue.
56
 *
57
 *    Ordered-set aggregates are treated specially in one other way: we
58
 *    evaluate any "direct" arguments and pass them to the finalfunc along
59
 *    with the transition value.
60
 *
61
 *    A finalfunc can have additional arguments beyond the transvalue and
62
 *    any "direct" arguments, corresponding to the input arguments of the
63
 *    aggregate.  These are always just passed as NULL.  Such arguments may be
64
 *    needed to allow resolution of a polymorphic aggregate's result type.
65
 *
66
 *    We compute aggregate input expressions and run the transition functions
67
 *    in a temporary econtext (aggstate->tmpcontext).  This is reset at least
68
 *    once per input tuple, so when the transvalue datatype is
69
 *    pass-by-reference, we have to be careful to copy it into a longer-lived
70
 *    memory context, and free the prior value to avoid memory leakage.  We
71
 *    store transvalues in another set of econtexts, aggstate->aggcontexts
72
 *    (one per grouping set, see below), which are also used for the hashtable
73
 *    structures in AGG_HASHED mode.  These econtexts are rescanned, not just
74
 *    reset, at group boundaries so that aggregate transition functions can
75
 *    register shutdown callbacks via AggRegisterCallback.
76
 *
77
 *    The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
78
 *    run finalize functions and compute the output tuple; this context can be
79
 *    reset once per output tuple.
80
 *
81
 *    The executor's AggState node is passed as the fmgr "context" value in
82
 *    all transfunc and finalfunc calls.  It is not recommended that the
83
 *    transition functions look at the AggState node directly, but they can
84
 *    use AggCheckCallContext() to verify that they are being called by
85
 *    nodeAgg.c (and not as ordinary SQL functions).  The main reason a
86
 *    transition function might want to know this is so that it can avoid
87
 *    palloc'ing a fixed-size pass-by-ref transition value on every call:
88
 *    it can instead just scribble on and return its left input.  Ordinarily
89
 *    it is completely forbidden for functions to modify pass-by-ref inputs,
90
 *    but in the aggregate case we know the left input is either the initial
91
 *    transition value or a previous function result, and in either case its
92
 *    value need not be preserved.  See int8inc() for an example.  Notice that
93
 *    the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
94
 *    the previous transition value pointer is returned.  It is also possible
95
 *    to avoid repeated data copying when the transition value is an expanded
96
 *    object: to do that, the transition function must take care to return
97
 *    an expanded object that is in a child context of the memory context
98
 *    returned by AggCheckCallContext().  Also, some transition functions want
99
 *    to store working state in addition to the nominal transition value; they
100
 *    can use the memory context returned by AggCheckCallContext() to do that.
101
 *
102
 *    Note: AggCheckCallContext() is available as of PostgreSQL 9.0.  The
103
 *    AggState is available as context in earlier releases (back to 8.1),
104
 *    but direct examination of the node is needed to use it before 9.0.
105
 *
106
 *    As of 9.4, aggregate transition functions can also use AggGetAggref()
107
 *    to get hold of the Aggref expression node for their aggregate call.
108
 *    This is mainly intended for ordered-set aggregates, which are not
109
 *    supported as window functions.  (A regular aggregate function would
110
 *    need some fallback logic to use this, since there's no Aggref node
111
 *    for a window function.)
112
 *
113
 *    Grouping sets:
114
 *
115
 *    A list of grouping sets which is structurally equivalent to a ROLLUP
116
 *    clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
117
 *    ordered data.  We do this by keeping a separate set of transition values
118
 *    for each grouping set being concurrently processed; for each input tuple
119
 *    we update them all, and on group boundaries we reset those states
120
 *    (starting at the front of the list) whose grouping values have changed
121
 *    (the list of grouping sets is ordered from most specific to least
122
 *    specific).
123
 *
124
 *    Where more complex grouping sets are used, we break them down into
125
 *    "phases", where each phase has a different sort order (except phase 0
126
 *    which is reserved for hashing).  During each phase but the last, the
127
 *    input tuples are additionally stored in a tuplesort which is keyed to the
128
 *    next phase's sort order; during each phase but the first, the input
129
 *    tuples are drawn from the previously sorted data.  (The sorting of the
130
 *    data for the first phase is handled by the planner, as it might be
131
 *    satisfied by underlying nodes.)
132
 *
133
 *    Hashing can be mixed with sorted grouping.  To do this, we have an
134
 *    AGG_MIXED strategy that populates the hashtables during the first sorted
135
 *    phase, and switches to reading them out after completing all sort phases.
136
 *    We can also support AGG_HASHED with multiple hash tables and no sorting
137
 *    at all.
138
 *
139
 *    From the perspective of aggregate transition and final functions, the
140
 *    only issue regarding grouping sets is this: a single call site (flinfo)
141
 *    of an aggregate function may be used for updating several different
142
 *    transition values in turn. So the function must not cache in the flinfo
143
 *    anything which logically belongs as part of the transition value (most
144
 *    importantly, the memory context in which the transition value exists).
145
 *    The support API functions (AggCheckCallContext, AggRegisterCallback) are
146
 *    sensitive to the grouping set for which the aggregate function is
147
 *    currently being called.
148
 *
149
 *    Plan structure:
150
 *
151
 *    What we get from the planner is actually one "real" Agg node which is
152
 *    part of the plan tree proper, but which optionally has an additional list
153
 *    of Agg nodes hung off the side via the "chain" field.  This is because an
154
 *    Agg node happens to be a convenient representation of all the data we
155
 *    need for grouping sets.
156
 *
157
 *    For many purposes, we treat the "real" node as if it were just the first
158
 *    node in the chain.  The chain must be ordered such that hashed entries
159
 *    come before sorted/plain entries; the real node is marked AGG_MIXED if
160
 *    there are both types present (in which case the real node describes one
161
 *    of the hashed groupings, other AGG_HASHED nodes may optionally follow in
162
 *    the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node).  If
163
 *    the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
164
 *    nodes must be of the same type; if it is AGG_PLAIN, there can be no
165
 *    chained nodes.
166
 *
167
 *    We collect all hashed nodes into a single "phase", numbered 0, and create
168
 *    a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
169
 *    Phase 0 is allocated even if there are no hashes, but remains unused in
170
 *    that case.
171
 *
172
 *    AGG_HASHED nodes actually refer to only a single grouping set each,
173
 *    because for each hashed grouping we need a separate grpColIdx and
174
 *    numGroups estimate.  AGG_SORTED nodes represent a "rollup", a list of
175
 *    grouping sets that share a sort order.  Each AGG_SORTED node other than
176
 *    the first one has an associated Sort node which describes the sort order
177
 *    to be used; the first sorted node takes its input from the outer subtree,
178
 *    which the planner has already arranged to provide ordered data.
179
 *
180
 *    Memory and ExprContext usage:
181
 *
182
 *    Because we're accumulating aggregate values across input rows, we need to
183
 *    use more memory contexts than just simple input/output tuple contexts.
184
 *    In fact, for a rollup, we need a separate context for each grouping set
185
 *    so that we can reset the inner (finer-grained) aggregates on their group
186
 *    boundaries while continuing to accumulate values for outer
187
 *    (coarser-grained) groupings.  On top of this, we might be simultaneously
188
 *    populating hashtables; however, we only need one context for all the
189
 *    hashtables.
190
 *
191
 *    So we create an array, aggcontexts, with an ExprContext for each grouping
192
 *    set in the largest rollup that we're going to process, and use the
193
 *    per-tuple memory context of those ExprContexts to store the aggregate
194
 *    transition values.  hashcontext is the single context created to support
195
 *    all hash tables.
196
 *
197
 *    Transition / Combine function invocation:
198
 *
199
 *    For performance reasons transition functions, including combine
200
 *    functions, aren't invoked one-by-one from nodeAgg.c after computing
201
 *    arguments using the expression evaluation engine. Instead
202
 *    ExecBuildAggTrans() builds one large expression that does both argument
203
 *    evaluation and transition function invocation. That avoids performance
204
 *    issues due to repeated uses of expression evaluation, complications due
205
 *    to filter expressions having to be evaluated early, and makes it possible to JIT
206
 *    the entire expression into one native function.
207
 *
208
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
209
 * Portions Copyright (c) 1994, Regents of the University of California
210
 *
211
 * IDENTIFICATION
212
 *    src/backend/executor/nodeAgg.c
213
 *
214
 *-------------------------------------------------------------------------
215
 */
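The strict-transfunc rules in the header comment above can be illustrated with a small standalone sketch. It is not code from nodeAgg.c: the NullableInt type and the demo_larger and demo_strict_agg functions are hypothetical names introduced only for illustration, and the sketch assumes a max()-like aggregate with a NULL initcond and no finalfunc.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a nullable integer datum; not a PostgreSQL type. */
typedef struct NullableInt
{
	bool		isnull;
	long		value;
} NullableInt;

/* Hypothetical strict transfunc: keeps the larger of state and input. */
static long
demo_larger(long state, long input)
{
	return (input > state) ? input : state;
}

/*
 * Aggregate the inputs with a strict transfunc and a NULL initcond.
 * NULL inputs are skipped, the first non-NULL input becomes the transvalue
 * directly, and the transfunc runs only from the second non-NULL input on.
 * With no finalfunc, the result is the ending transvalue, which stays NULL
 * if no non-NULL input was seen.  *ncalls reports how often the transfunc
 * was actually invoked.
 */
static NullableInt
demo_strict_agg(const NullableInt *inputs, size_t ninputs, int *ncalls)
{
	NullableInt trans = {true, 0};	/* transvalue = initcond (NULL) */
	bool		noTransValue = true;
	size_t		i;

	assert(ncalls != NULL);
	*ncalls = 0;

	for (i = 0; i < ninputs; i++)
	{
		if (inputs[i].isnull)
			continue;			/* strict: keep the prior transvalue */

		if (noTransValue)
		{
			trans = inputs[i];	/* first non-NULL input is assigned directly */
			noTransValue = false;
			continue;
		}

		trans.value = demo_larger(trans.value, inputs[i].value);
		(*ncalls)++;
	}

	return trans;
}
```

For the inputs NULL, 3, NULL, 7, 5 this sketch calls demo_larger twice and ends with a transvalue of 7. The direct assignment is why the aggregate's first input type and transtype must match in this case.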
216
217
#include "postgres.h"
218
219
#include "access/htup_details.h"
220
#include "catalog/objectaccess.h"
221
#include "catalog/pg_aggregate.h"
222
#include "catalog/pg_proc.h"
223
#include "catalog/pg_type.h"
224
#include "catalog/yb_type.h"
225
#include "executor/executor.h"
226
#include "executor/nodeAgg.h"
227
#include "miscadmin.h"
228
#include "nodes/makefuncs.h"
229
#include "nodes/nodeFuncs.h"
230
#include "optimizer/clauses.h"
231
#include "optimizer/tlist.h"
232
#include "parser/parse_agg.h"
233
#include "parser/parse_coerce.h"
234
#include "utils/acl.h"
235
#include "utils/builtins.h"
236
#include "utils/fmgroids.h"
237
#include "utils/lsyscache.h"
238
#include "utils/memutils.h"
239
#include "utils/rel.h"
240
#include "utils/syscache.h"
241
#include "utils/tuplesort.h"
242
#include "utils/datum.h"
243
244
245
static void select_current_set(AggState *aggstate, int setno, bool is_hash);
246
static void initialize_phase(AggState *aggstate, int newphase);
247
static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
248
static void initialize_aggregates(AggState *aggstate,
249
            AggStatePerGroup *pergroups,
250
            int numReset);
251
static void advance_transition_function(AggState *aggstate,
252
              AggStatePerTrans pertrans,
253
              AggStatePerGroup pergroupstate);
254
static void advance_aggregates(AggState *aggstate);
255
static void process_ordered_aggregate_single(AggState *aggstate,
256
                 AggStatePerTrans pertrans,
257
                 AggStatePerGroup pergroupstate);
258
static void process_ordered_aggregate_multi(AggState *aggstate,
259
                AggStatePerTrans pertrans,
260
                AggStatePerGroup pergroupstate);
261
static void finalize_aggregate(AggState *aggstate,
262
           AggStatePerAgg peragg,
263
           AggStatePerGroup pergroupstate,
264
           Datum *resultVal, bool *resultIsNull);
265
static void finalize_partialaggregate(AggState *aggstate,
266
              AggStatePerAgg peragg,
267
              AggStatePerGroup pergroupstate,
268
              Datum *resultVal, bool *resultIsNull);
269
static void prepare_projection_slot(AggState *aggstate,
270
            TupleTableSlot *slot,
271
            int currentSet);
272
static void finalize_aggregates(AggState *aggstate,
273
          AggStatePerAgg peragg,
274
          AggStatePerGroup pergroup);
275
static TupleTableSlot *project_aggregates(AggState *aggstate);
276
static Bitmapset *find_unaggregated_cols(AggState *aggstate);
277
static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
278
static void build_hash_table(AggState *aggstate);
279
static TupleHashEntryData *lookup_hash_entry(AggState *aggstate);
280
static void lookup_hash_entries(AggState *aggstate);
281
static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
282
static void agg_fill_hash_table(AggState *aggstate);
283
static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
284
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
285
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
286
              AggState *aggstate, EState *estate,
287
              Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
288
              Oid aggserialfn, Oid aggdeserialfn,
289
              Datum initValue, bool initValueIsNull,
290
              Oid *inputTypes, int numArguments);
291
static int find_compatible_peragg(Aggref *newagg, AggState *aggstate,
292
             int lastaggno, List **same_input_transnos);
293
static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
294
             bool shareable,
295
             Oid aggtransfn, Oid aggtranstype,
296
             Oid aggserialfn, Oid aggdeserialfn,
297
             Datum initValue, bool initValueIsNull,
298
             List *transnos);
299
static void yb_agg_pushdown_supported(AggState *aggstate);
300
static void yb_agg_pushdown(AggState *aggstate);
301
302
303
/*
304
 * Select the current grouping set; affects current_set and
305
 * curaggcontext.
306
 */
307
static void
308
select_current_set(AggState *aggstate, int setno, bool is_hash)
309
303k
{
310
  /* when changing this, also adapt ExecInterpExpr() and friends */
311
303k
  if (is_hash)
312
297k
    aggstate->curaggcontext = aggstate->hashcontext;
313
5.77k
  else
314
5.77k
    aggstate->curaggcontext = aggstate->aggcontexts[setno];
315
316
303k
  aggstate->current_set = setno;
317
303k
}
318
319
/*
320
 * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
321
 * current_phase + 1. Juggle the tuplesorts accordingly.
322
 *
323
 * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
324
 * case, so when entering phase 0, all we need to do is drop open sorts.
325
 */
326
static void
327
initialize_phase(AggState *aggstate, int newphase)
328
2.06k
{
329
2.06k
  Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);
330
331
  /*
332
   * Whatever the previous state, we're now done with whatever input
333
   * tuplesort was in use.
334
   */
335
2.06k
  if (aggstate->sort_in)
336
0
  {
337
0
    tuplesort_end(aggstate->sort_in);
338
0
    aggstate->sort_in = NULL;
339
0
  }
340
341
2.06k
  if (newphase <= 1)
342
2.06k
  {
343
    /*
344
     * Discard any existing output tuplesort.
345
     */
346
2.06k
    if (aggstate->sort_out)
347
0
    {
348
0
      tuplesort_end(aggstate->sort_out);
349
0
      aggstate->sort_out = NULL;
350
0
    }
351
2.06k
  }
352
0
  else
353
0
  {
354
    /*
355
     * The old output tuplesort becomes the new input one, and this is the
356
     * right time to actually sort it.
357
     */
358
0
    aggstate->sort_in = aggstate->sort_out;
359
0
    aggstate->sort_out = NULL;
360
0
    Assert(aggstate->sort_in);
361
0
    tuplesort_performsort(aggstate->sort_in);
362
0
  }
363
364
  /*
365
   * If this isn't the last phase, we need to sort appropriately for the
366
   * next phase in sequence.
367
   */
368
2.06k
  if (newphase > 0 && newphase < aggstate->numphases - 1)
369
0
  {
370
0
    Sort     *sortnode = aggstate->phases[newphase + 1].sortnode;
371
0
    PlanState  *outerNode = outerPlanState(aggstate);
372
0
    TupleDesc tupDesc = ExecGetResultType(outerNode);
373
374
0
    aggstate->sort_out = tuplesort_begin_heap(tupDesc,
375
0
                          sortnode->numCols,
376
0
                          sortnode->sortColIdx,
377
0
                          sortnode->sortOperators,
378
0
                          sortnode->collations,
379
0
                          sortnode->nullsFirst,
380
0
                          work_mem,
381
0
                          NULL, false);
382
0
  }
383
384
2.06k
  aggstate->current_phase = newphase;
385
2.06k
  aggstate->phase = &aggstate->phases[newphase];
386
2.06k
}
387
388
/*
389
 * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
390
 * populated by the previous phase.  Copy it to the sorter for the next phase
391
 * if any.
392
 *
393
 * Callers cannot rely on the tuple memory in the returned slot remaining valid
394
 * past any subsequently fetched tuple.
395
 */
396
static TupleTableSlot *
397
fetch_input_tuple(AggState *aggstate)
398
813k
{
399
813k
  TupleTableSlot *slot;
400
401
813k
  if (aggstate->sort_in)
402
0
  {
403
    /* make sure we check for interrupts in either path through here */
404
0
    CHECK_FOR_INTERRUPTS();
405
0
    if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
406
0
                  aggstate->sort_slot, NULL))
407
0
      return NULL;
408
0
    slot = aggstate->sort_slot;
409
0
  }
410
813k
  else
411
813k
    slot = ExecProcNode(outerPlanState(aggstate));
412
413
813k
  if (!TupIsNull(slot) && aggstate->sort_out)
414
0
    tuplesort_puttupleslot(aggstate->sort_out, slot);
415
416
813k
  return slot;
417
813k
}
418
419
/*
420
 * (Re)Initialize an individual aggregate.
421
 *
422
 * This function handles only one grouping set, already set in
423
 * aggstate->current_set.
424
 *
425
 * When called, GetCurrentMemoryContext() should be the per-query context.
426
 */
427
static void
428
initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
429
           AggStatePerGroup pergroupstate)
430
34.9k
{
431
  /*
432
   * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
433
   */
434
34.9k
  if (pertrans->numSortCols > 0)
435
136
  {
436
    /*
437
     * In case of rescan, maybe there could be an uncompleted sort
438
     * operation?  Clean it up if so.
439
     */
440
136
    if (pertrans->sortstates[aggstate->current_set])
441
0
      tuplesort_end(pertrans->sortstates[aggstate->current_set]);
442
443
444
    /*
445
     * We use a plain Datum sorter when there's a single input column;
446
     * otherwise sort the full tuple.  (See comments for
447
     * process_ordered_aggregate_single.)
448
     */
449
136
    if (pertrans->numInputs == 1)
450
96
    {
451
96
      Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);
452
453
96
      pertrans->sortstates[aggstate->current_set] =
454
96
        tuplesort_begin_datum(attr->atttypid,
455
96
                    pertrans->sortOperators[0],
456
96
                    pertrans->sortCollations[0],
457
96
                    pertrans->sortNullsFirst[0],
458
96
                    work_mem, NULL, false);
459
96
    }
460
40
    else
461
40
      pertrans->sortstates[aggstate->current_set] =
462
40
        tuplesort_begin_heap(pertrans->sortdesc,
463
40
                   pertrans->numSortCols,
464
40
                   pertrans->sortColIdx,
465
40
                   pertrans->sortOperators,
466
40
                   pertrans->sortCollations,
467
40
                   pertrans->sortNullsFirst,
468
40
                   work_mem, NULL, false);
469
136
  }
470
471
  /*
472
   * (Re)set transValue to the initial value.
473
   *
474
   * Note that when the initial value is pass-by-ref, we must copy it (into
475
   * the aggcontext) since we will pfree the transValue later.
476
   */
477
34.9k
  if (pertrans->initValueIsNull)
478
31.7k
    pergroupstate->transValue = pertrans->initValue;
479
3.21k
  else
480
3.21k
  {
481
3.21k
    MemoryContext oldContext;
482
483
3.21k
    oldContext = MemoryContextSwitchTo(
484
3.21k
                       aggstate->curaggcontext->ecxt_per_tuple_memory);
485
3.21k
    pergroupstate->transValue = datumCopy(pertrans->initValue,
486
3.21k
                        pertrans->transtypeByVal,
487
3.21k
                        pertrans->transtypeLen);
488
3.21k
    MemoryContextSwitchTo(oldContext);
489
3.21k
  }
490
34.9k
  pergroupstate->transValueIsNull = pertrans->initValueIsNull;
491
492
  /*
493
   * If the initial value for the transition state doesn't exist in the
494
   * pg_aggregate table then we will let the first non-NULL value returned
495
   * from the outer procNode become the initial value. (This is useful for
496
   * aggregates like max() and min().) The noTransValue flag signals that we
497
   * still need to do this.
498
   */
499
34.9k
  pergroupstate->noTransValue = pertrans->initValueIsNull;
500
34.9k
}
501
502
/*
503
 * Initialize all aggregate transition states for a new group of input values.
504
 *
505
 * If there are multiple grouping sets, we initialize only the first numReset
506
 * of them (the grouping sets are ordered so that the most specific one, which
507
 * is reset most often, is first). As a convenience, if numReset is 0, we
508
 * reinitialize all sets.
509
 *
510
 * NB: This cannot be used for hash aggregates, as for those the grouping set
511
 * number has to be specified from further up.
512
 *
513
 * When called, GetCurrentMemoryContext() should be the per-query context.
514
 */
515
static void
516
initialize_aggregates(AggState *aggstate,
517
            AggStatePerGroup *pergroups,
518
            int numReset)
519
1.84k
{
520
1.84k
  int     transno;
521
1.84k
  int     numGroupingSets = Max(aggstate->phase->numsets, 1);
522
1.84k
  int     setno = 0;
523
1.84k
  int     numTrans = aggstate->numtrans;
524
1.84k
  AggStatePerTrans transstates = aggstate->pertrans;
525
526
1.84k
  if (numReset == 0)
527
0
    numReset = numGroupingSets;
528
529
3.69k
  for (setno = 0; setno < numReset; setno++)
530
1.84k
  {
531
1.84k
    AggStatePerGroup pergroup = pergroups[setno];
532
533
1.84k
    select_current_set(aggstate, setno, false);
534
535
3.85k
    for (transno = 0; transno < numTrans; transno++)
536
2.00k
    {
537
2.00k
      AggStatePerTrans pertrans = &transstates[transno];
538
2.00k
      AggStatePerGroup pergroupstate = &pergroup[transno];
539
540
2.00k
      initialize_aggregate(aggstate, pertrans, pergroupstate);
541
2.00k
    }
542
1.84k
  }
543
1.84k
}
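The "reset only the first numReset sets" idea can be sketched outside the executor as follows. This is a simplified illustration, not the nodeAgg.c logic: DemoRow, demo_num_reset and demo_rollup_count are hypothetical names, the rollup is fixed to (a,b), (a), (), and the aggregate is a plain row count. Unlike initialize_aggregates, where numReset of 0 means "all sets", the sketch uses 0 to mean that no group boundary was crossed. It also does not model the empty-input case.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_NUM_SETS 3

/* Hypothetical input row with two grouping columns, sorted by (a, b). */
typedef struct DemoRow
{
	int			a;
	int			b;
} DemoRow;

/* Grouping columns per set, most specific first: (a,b), (a), (). */
static const int demo_set_ncols[DEMO_NUM_SETS] = {2, 1, 0};

/* How many leading sets see a group boundary between prev and next? */
static int
demo_num_reset(const DemoRow *prev, const DemoRow *next)
{
	int			ncommon = 0;	/* length of the unchanged key prefix */
	int			numReset = 0;

	if (prev->a == next->a)
		ncommon = (prev->b == next->b) ? 2 : 1;

	while (numReset < DEMO_NUM_SETS && demo_set_ncols[numReset] > ncommon)
		numReset++;
	return numReset;
}

/* Close the current group of one set and restart its running count. */
static void
demo_close_group(int setno, long *counts, int *ngroups, long *largest)
{
	ngroups[setno]++;
	if (counts[setno] > largest[setno])
		largest[setno] = counts[setno];
	counts[setno] = 0;
}

/*
 * One pass over sorted rows, keeping a separate running count per set.
 * Reports the number of groups and the largest group size for each set.
 */
static void
demo_rollup_count(const DemoRow *rows, size_t nrows,
				  int ngroups[DEMO_NUM_SETS], long largest[DEMO_NUM_SETS])
{
	long		counts[DEMO_NUM_SETS] = {0, 0, 0};
	size_t		i;
	int			setno;

	assert(rows != NULL || nrows == 0);
	for (setno = 0; setno < DEMO_NUM_SETS; setno++)
	{
		ngroups[setno] = 0;
		largest[setno] = 0;
	}

	for (i = 0; i < nrows; i++)
	{
		if (i > 0)
		{
			int			numReset = demo_num_reset(&rows[i - 1], &rows[i]);

			/* only the finer-grained sets at the front are reset */
			for (setno = 0; setno < numReset; setno++)
				demo_close_group(setno, counts, ngroups, largest);
		}
		for (setno = 0; setno < DEMO_NUM_SETS; setno++)
			counts[setno]++;	/* every set sees every input row */
	}

	/* end of input closes the open group in every set */
	if (nrows > 0)
		for (setno = 0; setno < DEMO_NUM_SETS; setno++)
			demo_close_group(setno, counts, ngroups, largest);
}
```

Because the sets are ordered from most to least specific, a change in a key column always affects a prefix of the list, so a single integer is enough to describe which states must be reinitialized.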
544
545
/*
546
 * Given new input value(s), advance the transition function of one aggregate
547
 * state within one grouping set only (already set in aggstate->current_set).
548
 *
549
 * The new values (and null flags) have been preloaded into argument positions
550
 * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
551
 * pass to the transition function.  We also expect that the static fields of
552
 * the fcinfo are already initialized; that was done by ExecInitAgg().
553
 *
554
 * It doesn't matter which memory context this is called in.
555
 */
556
static void
557
advance_transition_function(AggState *aggstate,
558
              AggStatePerTrans pertrans,
559
              AggStatePerGroup pergroupstate)
560
455
{
561
455
  FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
562
455
  MemoryContext oldContext;
563
455
  Datum   newVal;
564
565
455
  if (pertrans->transfn.fn_strict)
566
156
  {
567
    /*
568
     * For a strict transfn, nothing happens when there's a NULL input; we
569
     * just keep the prior transValue.
570
     */
571
156
    int     numTransInputs = pertrans->numTransInputs;
572
156
    int     i;
573
574
310
    for (i = 1; i <= numTransInputs; i++)
575
168
    {
576
168
      if (fcinfo->argnull[i])
577
14
        return;
578
168
    }
579
142
    if (pergroupstate->noTransValue)
580
55
    {
581
      /*
582
       * transValue has not been initialized. This is the first non-NULL
583
       * input value. We use it as the initial value for transValue. (We
584
       * already checked that the agg's input type is binary-compatible
585
       * with its transtype, so straight copy here is OK.)
586
       *
587
       * We must copy the datum into aggcontext if it is pass-by-ref. We
588
       * do not need to pfree the old transValue, since it's NULL.
589
       */
590
55
      oldContext = MemoryContextSwitchTo(
591
55
                         aggstate->curaggcontext->ecxt_per_tuple_memory);
592
55
      pergroupstate->transValue = datumCopy(fcinfo->arg[1],
593
55
                          pertrans->transtypeByVal,
594
55
                          pertrans->transtypeLen);
595
55
      pergroupstate->transValueIsNull = false;
596
55
      pergroupstate->noTransValue = false;
597
55
      MemoryContextSwitchTo(oldContext);
598
55
      return;
599
55
    }
600
87
    if (pergroupstate->transValueIsNull)
601
0
    {
602
      /*
603
       * Don't call a strict function with NULL inputs.  Note it is
604
       * possible to get here despite the above tests, if the transfn is
605
       * strict *and* returned a NULL on a prior cycle. If that happens
606
       * we will propagate the NULL all the way to the end.
607
       */
608
0
      return;
609
0
    }
610
386
  }
611
612
  /* We run the transition functions in per-input-tuple memory context */
613
386
  oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
614
615
  /* set up aggstate->curpertrans for AggGetAggref() */
616
386
  aggstate->curpertrans = pertrans;
617
618
  /*
619
   * OK to call the transition function
620
   */
621
386
  fcinfo->arg[0] = pergroupstate->transValue;
622
386
  fcinfo->argnull[0] = pergroupstate->transValueIsNull;
623
386
  fcinfo->isnull = false;   /* just in case transfn doesn't set it */
624
625
386
  newVal = FunctionCallInvoke(fcinfo);
626
627
386
  aggstate->curpertrans = NULL;
628
629
  /*
630
   * If pass-by-ref datatype, must copy the new value into aggcontext and
631
   * free the prior transValue.  But if transfn returned a pointer to its
632
   * first input, we don't need to do anything.  Also, if transfn returned a
633
   * pointer to a R/W expanded object that is already a child of the
634
   * aggcontext, assume we can adopt that value without copying it.
635
   */
636
386
  if (!pertrans->transtypeByVal &&
637
62
    DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
638
62
  {
639
62
    if (!fcinfo->isnull)
640
62
    {
641
62
      MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
642
62
      if (DatumIsReadWriteExpandedObject(newVal,
643
62
                         false,
644
62
                         pertrans->transtypeLen) &&
645
4
        MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == GetCurrentMemoryContext())
646
0
         /* do nothing */ ;
647
62
      else
648
62
        newVal = datumCopy(newVal,
649
62
                   pertrans->transtypeByVal,
650
62
                   pertrans->transtypeLen);
651
62
    }
652
62
    if (!pergroupstate->transValueIsNull)
653
60
    {
654
60
      if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
655
60
                         false,
656
60
                         pertrans->transtypeLen))
657
0
        DeleteExpandedObject(pergroupstate->transValue);
658
60
      else
659
60
        pfree(DatumGetPointer(pergroupstate->transValue));
660
60
    }
661
62
  }
662
663
386
  pergroupstate->transValue = newVal;
664
386
  pergroupstate->transValueIsNull = fcinfo->isnull;
665
666
386
  MemoryContextSwitchTo(oldContext);
667
386
}
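The pointer-comparison rule used above for pass-by-reference transition values can be sketched with ordinary heap allocation. This is an illustration under stated assumptions, not PostgreSQL code: DemoState, demo_trans_inplace, demo_trans_alloc and demo_advance are hypothetical names, and malloc/free stand in for the aggcontext and per-tuple memory contexts. Expanded objects are not modeled.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical fixed-size pass-by-reference transition state. */
typedef struct DemoState
{
	long		count;
	long		sum;
} DemoState;

typedef DemoState *(*DemoTransFn) (DemoState *state, long input);

/* Scribbles on its left input and returns the same pointer. */
static DemoState *
demo_trans_inplace(DemoState *state, long input)
{
	state->count++;
	state->sum += input;
	return state;
}

/* Leaves its input untouched and returns a freshly allocated result. */
static DemoState *
demo_trans_alloc(DemoState *state, long input)
{
	DemoState  *result = malloc(sizeof(*result));

	if (result == NULL)
		abort();
	result->count = state->count + 1;
	result->sum = state->sum + input;
	return result;
}

/*
 * Advance the transition state by one input.  If the transition function
 * returned its first input, nothing needs to be copied or freed.  Otherwise
 * the new value is copied into long-lived storage and the prior value is
 * released.  *ncopies counts how often a copy was needed.
 */
static DemoState *
demo_advance(DemoState *trans, long input, DemoTransFn fn, int *ncopies)
{
	DemoState  *newVal;

	assert(trans != NULL && ncopies != NULL);
	newVal = fn(trans, input);

	if (newVal != trans)
	{
		DemoState  *copy = malloc(sizeof(*copy));

		if (copy == NULL)
			abort();
		memcpy(copy, newVal, sizeof(*copy));
		free(newVal);			/* stands in for the per-tuple context reset */
		free(trans);			/* free the prior transition value */
		(*ncopies)++;
		newVal = copy;
	}

	return newVal;
}
```

In this sketch the in-place variant never triggers a copy, which mirrors why a transition function might check its call context and modify its left input rather than allocating on every call.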
668
669
/*
670
 * Advance each aggregate transition state for one input tuple.  The input
671
 * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
672
 * accessible to ExecEvalExpr.
673
 *
674
 * We have two sets of transition states to handle: one for sorted aggregation
675
 * and one for hashed; we do them both here, to avoid multiple evaluation of
676
 * the inputs.
677
 *
678
 * When called, GetCurrentMemoryContext() should be the per-query context.
679
 */
680
static void
681
advance_aggregates(AggState *aggstate)
682
810k
{
683
810k
  bool    dummynull;
684
685
810k
  ExecEvalExprSwitchContext(aggstate->phase->evaltrans,
686
810k
                aggstate->tmpcontext,
687
810k
                &dummynull);
688
810k
}
689
690
/*
691
 * Run the transition function for a DISTINCT or ORDER BY aggregate
692
 * with only one input.  This is called after we have completed
693
 * entering all the input values into the sort object.  We complete the
694
 * sort, read out the values in sorted order, and run the transition
695
 * function on each value (applying DISTINCT if appropriate).
696
 *
697
 * Note that the strictness of the transition function was checked when
698
 * entering the values into the sort, so we don't check it again here;
699
 * we just apply standard SQL DISTINCT logic.
700
 *
701
 * The one-input case is handled separately from the multi-input case
702
 * for performance reasons: for single by-value inputs, such as the
703
 * common case of count(distinct id), the tuplesort_getdatum code path
704
 * is around 300% faster.  (The speedup for by-reference types is less
705
 * but still noticeable.)
706
 *
707
 * This function handles only one grouping set (already set in
708
 * aggstate->current_set).
709
 *
710
 * When called, GetCurrentMemoryContext() should be the per-query context.
711
 */
712
static void
713
process_ordered_aggregate_single(AggState *aggstate,
714
                 AggStatePerTrans pertrans,
715
                 AggStatePerGroup pergroupstate)
716
96
{
717
96
  Datum   oldVal = (Datum) 0;
718
96
  bool    oldIsNull = true;
719
96
  bool    haveOldVal = false;
720
96
  MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
721
96
  MemoryContext oldContext;
722
96
  bool    isDistinct = (pertrans->numDistinctCols > 0);
723
96
  Datum   newAbbrevVal = (Datum) 0;
724
96
  Datum   oldAbbrevVal = (Datum) 0;
725
96
  FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
726
96
  Datum    *newVal;
727
96
  bool     *isNull;
728
729
96
  Assert(pertrans->numDistinctCols < 2);
730
731
96
  tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
732
733
  /* Load the column into argument 1 (arg 0 will be transition value) */
734
96
  newVal = fcinfo->arg + 1;
735
96
  isNull = fcinfo->argnull + 1;
736
737
  /*
738
   * Note: if input type is pass-by-ref, the datums returned by the sort are
739
   * freshly palloc'd in the per-query context, so we must be careful to
740
   * pfree them when they are no longer needed.
741
   */
742
743
4.34k
  while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
744
4.34k
                true, newVal, isNull, &newAbbrevVal))
745
4.25k
  {
746
    /*
747
     * Clear and select the working context for evaluation of the equality
748
     * function and transition function.
749
     */
750
4.25k
    MemoryContextReset(workcontext);
751
4.25k
    oldContext = MemoryContextSwitchTo(workcontext);
752
753
    /*
754
     * If DISTINCT mode, and not distinct from prior, skip it.
755
     *
756
     * Note: we assume equality functions don't care about collation.
757
     */
758
4.25k
    if (isDistinct &&
759
4.13k
      haveOldVal &&
760
4.09k
      ((oldIsNull && *isNull) ||
761
4.09k
       (!oldIsNull && !*isNull &&
762
4.08k
        oldAbbrevVal == newAbbrevVal &&
763
4.08k
        DatumGetBool(FunctionCall2(&pertrans->equalfnOne,
764
4.09k
                     oldVal, *newVal)))))
765
4.01k
    {
766
      /* equal to prior, so forget this one */
767
4.01k
      if (!pertrans->inputtypeByVal && !*isNull)
768
4
        pfree(DatumGetPointer(*newVal));
769
4.01k
    }
770
236
    else
771
236
    {
772
236
      advance_transition_function(aggstate, pertrans, pergroupstate);
773
      /* forget the old value, if any */
774
236
      if (!oldIsNull && !pertrans->inputtypeByVal)
775
5
        pfree(DatumGetPointer(oldVal));
776
      /* and remember the new one for subsequent equality checks */
777
236
      oldVal = *newVal;
778
236
      oldAbbrevVal = newAbbrevVal;
779
236
      oldIsNull = *isNull;
780
236
      haveOldVal = true;
781
236
    }
782
783
4.25k
    MemoryContextSwitchTo(oldContext);
784
4.25k
  }
785
786
96
  if (!oldIsNull && !pertrans->inputtypeByVal)
787
1
    pfree(DatumGetPointer(oldVal));
788
789
96
  tuplesort_end(pertrans->sortstates[aggstate->current_set]);
790
96
  pertrans->sortstates[aggstate->current_set] = NULL;
791
96
}
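/*
 * Illustrative sketch (not part of nodeAgg.c): the sorted DISTINCT loop in
 * process_ordered_aggregate_single can be pictured with plain C types.
 * NullableInt, nullable_int_cmp and count_distinct_sorted are hypothetical
 * names; qsort stands in for the tuplesort, and a simple counter stands in
 * for the transition function.  Unlike the real code, NULLs are kept in the
 * input and counted as one distinct value, which assumes a non-strict
 * transition function.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical nullable integer standing in for a (Datum, isnull) pair. */
typedef struct
{
  int   value;
  bool  isnull;
} NullableInt;

/* Order non-NULL values ascending and place NULLs last. */
static int
nullable_int_cmp(const void *a, const void *b)
{
  const NullableInt *x = a;
  const NullableInt *y = b;

  if (x->isnull || y->isnull)
    return (int) x->isnull - (int) y->isnull;
  return (x->value > y->value) - (x->value < y->value);
}

/*
 * Sort the inputs, then "advance" (here: count) only for values that are
 * distinct from the previously accepted one.  Two NULLs are not distinct.
 */
size_t
count_distinct_sorted(NullableInt *vals, size_t n)
{
  size_t      count = 0;
  bool        haveOldVal = false;
  NullableInt oldVal = {0, true};
  size_t      i;

  if (n == 0)
    return 0;
  assert(vals != NULL);

  qsort(vals, n, sizeof(NullableInt), nullable_int_cmp);

  for (i = 0; i < n; i++)
  {
    bool  same = haveOldVal &&
      ((oldVal.isnull && vals[i].isnull) ||
       (!oldVal.isnull && !vals[i].isnull &&
        oldVal.value == vals[i].value));

    if (same)
      continue;       /* equal to prior, so forget this one */

    count++;          /* stand-in for the transition function */
    oldVal = vals[i]; /* remember for subsequent equality checks */
    haveOldVal = true;
  }
  return count;
}
```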
792
793
/*
794
 * Run the transition function for a DISTINCT or ORDER BY aggregate
795
 * with more than one input.  This is called after we have completed
796
 * entering all the input values into the sort object.  We complete the
797
 * sort, read out the values in sorted order, and run the transition
798
 * function on each value (applying DISTINCT if appropriate).
799
 *
800
 * This function handles only one grouping set (already set in
801
 * aggstate->current_set).
802
 *
803
 * When called, GetCurrentMemoryContext() should be the per-query context.
804
 */
805
static void
806
process_ordered_aggregate_multi(AggState *aggstate,
807
                AggStatePerTrans pertrans,
808
                AggStatePerGroup pergroupstate)
809
40
{
810
40
  ExprContext *tmpcontext = aggstate->tmpcontext;
811
40
  FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
812
40
  TupleTableSlot *slot1 = pertrans->sortslot;
813
40
  TupleTableSlot *slot2 = pertrans->uniqslot;
814
40
  int     numTransInputs = pertrans->numTransInputs;
815
40
  int     numDistinctCols = pertrans->numDistinctCols;
816
40
  Datum   newAbbrevVal = (Datum) 0;
817
40
  Datum   oldAbbrevVal = (Datum) 0;
818
40
  bool    haveOldValue = false;
819
40
  TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
820
40
  int     i;
821
822
40
  tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
823
824
40
  ExecClearTuple(slot1);
825
40
  if (slot2)
826
12
    ExecClearTuple(slot2);
827
828
181
  while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
829
181
                  true, true, slot1, &newAbbrevVal))
830
141
  {
831
141
    CHECK_FOR_INTERRUPTS();
832
833
141
    tmpcontext->ecxt_outertuple = slot1;
834
141
    tmpcontext->ecxt_innertuple = slot2;
835
836
141
    if (numDistinctCols == 0 ||
837
110
      !haveOldValue ||
838
98
      newAbbrevVal != oldAbbrevVal ||
839
92
      !ExecQual(pertrans->equalfnMulti, tmpcontext))
840
75
    {
841
      /*
842
       * Extract the first numTransInputs columns as datums to pass to
843
       * the transfn.
844
       */
845
75
      slot_getsomeattrs(slot1, numTransInputs);
846
847
      /* Load values into fcinfo */
848
      /* Start from 1, since the 0th arg will be the transition value */
849
262
      for (i = 0; i < numTransInputs; i++)
850
187
      {
851
187
        fcinfo->arg[i + 1] = slot1->tts_values[i];
852
187
        fcinfo->argnull[i + 1] = slot1->tts_isnull[i];
853
187
      }
854
855
75
      advance_transition_function(aggstate, pertrans, pergroupstate);
856
857
75
      if (numDistinctCols > 0)
858
44
      {
859
        /* swap the slot pointers to retain the current tuple */
860
44
        TupleTableSlot *tmpslot = slot2;
861
862
44
        slot2 = slot1;
863
44
        slot1 = tmpslot;
864
        /* avoid ExecQual() calls by reusing abbreviated keys */
865
44
        oldAbbrevVal = newAbbrevVal;
866
44
        haveOldValue = true;
867
44
      }
868
75
    }
869
870
    /* Reset context each time */
871
141
    ResetExprContext(tmpcontext);
872
873
141
    ExecClearTuple(slot1);
874
141
  }
875
876
40
  if (slot2)
877
12
    ExecClearTuple(slot2);
878
879
40
  tuplesort_end(pertrans->sortstates[aggstate->current_set]);
880
40
  pertrans->sortstates[aggstate->current_set] = NULL;
881
882
  /* restore previous slot, potentially in use for grouping sets */
883
40
  tmpcontext->ecxt_outertuple = save;
884
40
}
885
886
/*
887
 * Compute the final value of one aggregate.
888
 *
889
 * This function handles only one grouping set (already set in
890
 * aggstate->current_set).
891
 *
892
 * The final function will be run, and the result delivered, in the
893
 * output-tuple context; caller's GetCurrentMemoryContext() does not matter.
894
 *
895
 * The finalfn uses the transition state identified by transno.  That state
896
 * might also be in use by another aggregate function, so it's important that
897
 * we do nothing destructive here.
898
 */
899
static void
900
finalize_aggregate(AggState *aggstate,
901
           AggStatePerAgg peragg,
902
           AggStatePerGroup pergroupstate,
903
           Datum *resultVal, bool *resultIsNull)
904
34.9k
{
905
34.9k
  FunctionCallInfoData fcinfo;
906
34.9k
  bool    anynull = false;
907
34.9k
  MemoryContext oldContext;
908
34.9k
  int     i;
909
34.9k
  ListCell   *lc;
910
34.9k
  AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
911
912
34.9k
  oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
913
914
  /*
915
   * Evaluate any direct arguments.  We do this even if there's no finalfn
916
   * (which is unlikely anyway), so that side-effects happen as expected.
917
   * The direct arguments go into arg positions 1 and up, leaving position 0
918
   * for the transition state value.
919
   */
920
34.9k
  i = 1;
921
34.9k
  foreach(lc, peragg->aggdirectargs)
922
107
  {
923
107
    ExprState  *expr = (ExprState *) lfirst(lc);
924
925
107
    fcinfo.arg[i] = ExecEvalExpr(expr,
926
107
                   aggstate->ss.ps.ps_ExprContext,
927
107
                   &fcinfo.argnull[i]);
928
107
    anynull |= fcinfo.argnull[i];
929
107
    i++;
930
107
  }
931
932
  /*
933
   * Apply the agg's finalfn if one is provided, else return transValue.
934
   */
935
34.9k
  if (OidIsValid(peragg->finalfn_oid))
936
1.35k
  {
937
1.35k
    int     numFinalArgs = peragg->numFinalArgs;
938
939
    /* set up aggstate->curperagg for AggGetAggref() */
940
1.35k
    aggstate->curperagg = peragg;
941
942
1.35k
    InitFunctionCallInfoData(fcinfo, &peragg->finalfn,
943
1.35k
                 numFinalArgs,
944
1.35k
                 pertrans->aggCollation,
945
1.35k
                 (void *) aggstate, NULL);
946
947
    /* Fill in the transition state value */
948
1.35k
    fcinfo.arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
949
1.35k
                           pergroupstate->transValueIsNull,
950
1.35k
                           pertrans->transtypeLen);
951
1.35k
    fcinfo.argnull[0] = pergroupstate->transValueIsNull;
952
1.35k
    anynull |= pergroupstate->transValueIsNull;
953
954
    /* Fill any remaining argument positions with nulls */
955
1.52k
    for (; i < numFinalArgs; i++)
956
165
    {
957
165
      fcinfo.arg[i] = (Datum) 0;
958
165
      fcinfo.argnull[i] = true;
959
165
      anynull = true;
960
165
    }
961
962
1.35k
    if (fcinfo.flinfo->fn_strict && anynull)
963
0
    {
964
      /* don't call a strict function with NULL inputs */
965
0
      *resultVal = (Datum) 0;
966
0
      *resultIsNull = true;
967
0
    }
968
1.35k
    else
969
1.35k
    {
970
1.35k
      *resultVal = FunctionCallInvoke(&fcinfo);
971
1.35k
      *resultIsNull = fcinfo.isnull;
972
1.35k
    }
973
1.35k
    aggstate->curperagg = NULL;
974
1.35k
  }
975
33.5k
  else
976
33.5k
  {
977
    /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
978
33.5k
    *resultVal = pergroupstate->transValue;
979
33.5k
    *resultIsNull = pergroupstate->transValueIsNull;
980
33.5k
  }
981
982
  /*
983
   * If result is pass-by-ref, make sure it is in the right context.
984
   */
985
34.9k
  if (!peragg->resulttypeByVal && !*resultIsNull &&
986
1.23k
    !MemoryContextContains(GetCurrentMemoryContext(),
987
1.23k
                 DatumGetPointer(*resultVal)))
988
52
    *resultVal = datumCopy(*resultVal,
989
52
                 peragg->resulttypeByVal,
990
52
                 peragg->resulttypeLen);
991
992
34.9k
  MemoryContextSwitchTo(oldContext);
993
34.9k
}
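/*
 * Illustrative sketch (not part of nodeAgg.c): the strictness check that
 * finalize_aggregate applies before invoking the finalfn, reduced to plain C.
 * NullableDouble, FinalFnInfo, call_final_fn and avg_final are hypothetical
 * names chosen for this example; the real code works on Datums through
 * FunctionCallInfoData.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical nullable value standing in for a (Datum, isnull) pair. */
typedef struct
{
  double  value;
  bool    isnull;
} NullableDouble;

typedef NullableDouble (*FinalFn) (const NullableDouble *args, int nargs);

/* Hypothetical counterpart of the function lookup info (fn pointer + strictness). */
typedef struct
{
  FinalFn fn;
  bool    strict;
} FinalFnInfo;

/* Return NULL without calling a strict function if any argument is NULL. */
NullableDouble
call_final_fn(const FinalFnInfo *info, const NullableDouble *args, int nargs)
{
  NullableDouble result = {0.0, true};
  bool    anynull = false;
  int     i;

  assert(info != NULL && info->fn != NULL && nargs >= 0);

  for (i = 0; i < nargs; i++)
    anynull |= args[i].isnull;

  if (info->strict && anynull)
    return result;    /* don't call a strict function with NULL inputs */

  return info->fn(args, nargs);
}

/* Example final function: args[0] is a running sum, args[1] a row count. */
NullableDouble
avg_final(const NullableDouble *args, int nargs)
{
  NullableDouble result = {0.0, true};

  if (nargs != 2 || args[0].isnull || args[1].isnull || args[1].value == 0.0)
    return result;

  result.value = args[0].value / args[1].value;
  result.isnull = false;
  return result;
}
```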
994
995
/*
996
 * Compute the output value of one partial aggregate.
997
 *
998
 * The serialization function will be run, and the result delivered, in the
999
 * output-tuple context; caller's GetCurrentMemoryContext() does not matter.
1000
 */
1001
static void
1002
finalize_partialaggregate(AggState *aggstate,
1003
              AggStatePerAgg peragg,
1004
              AggStatePerGroup pergroupstate,
1005
              Datum *resultVal, bool *resultIsNull)
1006
0
{
1007
0
  AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1008
0
  MemoryContext oldContext;
1009
1010
0
  oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1011
1012
  /*
1013
   * serialfn_oid will be set if we must serialize the transvalue before
1014
   * returning it
1015
   */
1016
0
  if (OidIsValid(pertrans->serialfn_oid))
1017
0
  {
1018
    /* Don't call a strict serialization function with NULL input. */
1019
0
    if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
1020
0
    {
1021
0
      *resultVal = (Datum) 0;
1022
0
      *resultIsNull = true;
1023
0
    }
1024
0
    else
1025
0
    {
1026
0
      FunctionCallInfo fcinfo = &pertrans->serialfn_fcinfo;
1027
1028
0
      fcinfo->arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
1029
0
                            pergroupstate->transValueIsNull,
1030
0
                            pertrans->transtypeLen);
1031
0
      fcinfo->argnull[0] = pergroupstate->transValueIsNull;
1032
1033
0
      *resultVal = FunctionCallInvoke(fcinfo);
1034
0
      *resultIsNull = fcinfo->isnull;
1035
0
    }
1036
0
  }
1037
0
  else
1038
0
  {
1039
    /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1040
0
    *resultVal = pergroupstate->transValue;
1041
0
    *resultIsNull = pergroupstate->transValueIsNull;
1042
0
  }
1043
1044
  /* If result is pass-by-ref, make sure it is in the right context. */
1045
0
  if (!peragg->resulttypeByVal && !*resultIsNull &&
1046
0
    !MemoryContextContains(GetCurrentMemoryContext(),
1047
0
                 DatumGetPointer(*resultVal)))
1048
0
    *resultVal = datumCopy(*resultVal,
1049
0
                 peragg->resulttypeByVal,
1050
0
                 peragg->resulttypeLen);
1051
1052
0
  MemoryContextSwitchTo(oldContext);
1053
0
}
1054
1055
/*
1056
 * Prepare to finalize and project based on the specified representative tuple
1057
 * slot and grouping set.
1058
 *
1059
 * In the specified tuple slot, force to null all attributes that should be
1060
 * read as null in the context of the current grouping set.  Also stash the
1061
 * current group bitmap where GroupingExpr can get at it.
1062
 *
1063
 * This relies on three conditions:
1064
 *
1065
 * 1) Nothing is ever going to try and extract the whole tuple from this slot,
1066
 * only reference it in evaluations, which will only access individual
1067
 * attributes.
1068
 *
1069
 * 2) No system columns are going to need to be nulled. (If a system column is
1070
 * referenced in a group clause, it is actually projected in the outer plan
1071
 * tlist.)
1072
 *
1073
 * 3) Within a given phase, we never need to recover the value of an attribute
1074
 * once it has been set to null.
1075
 *
1076
 * Poking into the slot this way is a bit ugly, but the consensus is that the
1077
 * alternative was worse.
1078
 */
1079
static void
1080
prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
1081
138k
{
1082
138k
  if (aggstate->phase->grouped_cols)
1083
136k
  {
1084
136k
    Bitmapset  *grouped_cols = aggstate->phase->grouped_cols[currentSet];
1085
1086
136k
    aggstate->grouped_cols = grouped_cols;
1087
1088
136k
    if (slot->tts_isempty)
1089
0
    {
1090
      /*
1091
       * Force all values to be NULL if working on an empty input tuple
1092
       * (i.e. an empty grouping set for which no input rows were
1093
       * supplied).
1094
       */
1095
0
      ExecStoreAllNullTuple(slot);
1096
0
    }
1097
136k
    else if (aggstate->all_grouped_cols)
1098
136k
    {
1099
136k
      ListCell   *lc;
1100
1101
      /* all_grouped_cols is arranged in desc order */
1102
136k
      slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
1103
1104
136k
      foreach(lc, aggstate->all_grouped_cols)
1105
238k
      {
1106
238k
        int     attnum = lfirst_int(lc);
1107
1108
238k
        if (!bms_is_member(attnum, grouped_cols))
1109
8
          slot->tts_isnull[attnum - 1] = true;
1110
238k
      }
1111
136k
    }
1112
136k
  }
1113
138k
}
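/*
 * Illustrative sketch (not part of nodeAgg.c): how prepare_projection_slot
 * forces columns outside the current grouping set to read as NULL.  A 32-bit
 * mask stands in for the Bitmapset and a plain bool array for tts_isnull;
 * null_ungrouped_cols and its parameters are hypothetical names.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * all_grouped_cols: 1-based attribute numbers used by any grouping set.
 * grouped_mask:     bit "attnum" is set when that column belongs to the
 *                   current grouping set.
 * Columns that are not grouping columns at all are left untouched.
 */
void
null_ungrouped_cols(bool *isnull, int natts,
                    const int *all_grouped_cols, int ncols,
                    uint32_t grouped_mask)
{
  int   i;

  for (i = 0; i < ncols; i++)
  {
    int   attnum = all_grouped_cols[i];

    assert(attnum >= 1 && attnum <= natts && attnum < 32);

    if ((grouped_mask & (UINT32_C(1) << attnum)) == 0)
      isnull[attnum - 1] = true;
  }
}
```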
1114
1115
/*
1116
 * Compute the final value of all aggregates for one group.
1117
 *
1118
 * This function handles only one grouping set at a time, which the caller must
1119
 * have selected.  It's also the caller's responsibility to adjust the supplied
1120
 * pergroup parameter to point to the current set's transvalues.
1121
 *
1122
 * Results are stored in the output econtext aggvalues/aggnulls.
1123
 */
1124
static void
1125
finalize_aggregates(AggState *aggstate,
1126
          AggStatePerAgg peraggs,
1127
          AggStatePerGroup pergroup)
1128
138k
{
1129
138k
  ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1130
138k
  Datum    *aggvalues = econtext->ecxt_aggvalues;
1131
138k
  bool     *aggnulls = econtext->ecxt_aggnulls;
1132
138k
  int     aggno;
1133
138k
  int     transno;
1134
1135
  /*
1136
   * If there were any DISTINCT and/or ORDER BY aggregates, sort their
1137
   * inputs and run the transition functions.
1138
   */
1139
173k
  for (transno = 0; transno < aggstate->numtrans; transno++)
1140
34.9k
  {
1141
34.9k
    AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1142
34.9k
    AggStatePerGroup pergroupstate;
1143
1144
34.9k
    pergroupstate = &pergroup[transno];
1145
1146
34.9k
    if (pertrans->numSortCols > 0)
1147
136
    {
1148
136
      Assert(aggstate->aggstrategy != AGG_HASHED &&
1149
136
           aggstate->aggstrategy != AGG_MIXED);
1150
1151
136
      if (pertrans->numInputs == 1)
1152
96
        process_ordered_aggregate_single(aggstate,
1153
96
                         pertrans,
1154
96
                         pergroupstate);
1155
40
      else
1156
40
        process_ordered_aggregate_multi(aggstate,
1157
40
                        pertrans,
1158
40
                        pergroupstate);
1159
136
    }
1160
34.9k
  }
1161
1162
  /*
1163
   * Run the final functions.
1164
   */
1165
173k
  for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1166
34.9k
  {
1167
34.9k
    AggStatePerAgg peragg = &peraggs[aggno];
1168
34.9k
    int     transno = peragg->transno;
1169
34.9k
    AggStatePerGroup pergroupstate;
1170
1171
34.9k
    pergroupstate = &pergroup[transno];
1172
1173
34.9k
    if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
1174
0
      finalize_partialaggregate(aggstate, peragg, pergroupstate,
1175
0
                    &aggvalues[aggno], &aggnulls[aggno]);
1176
34.9k
    else
1177
34.9k
      finalize_aggregate(aggstate, peragg, pergroupstate,
1178
34.9k
                 &aggvalues[aggno], &aggnulls[aggno]);
1179
34.9k
  }
1180
138k
}
1181
1182
/*
1183
 * Project the result of a group (whose aggs have already been calculated by
1184
 * finalize_aggregates). Returns the result slot, or NULL if no row is
1185
 * projected (suppressed by qual).
1186
 */
1187
static TupleTableSlot *
1188
project_aggregates(AggState *aggstate)
1189
138k
{
1190
138k
  ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1191
1192
  /*
1193
   * Check the qual (HAVING clause); if the group does not match, ignore it.
1194
   */
1195
138k
  if (ExecQual(aggstate->ss.ps.qual, econtext))
1196
138k
  {
1197
    /*
1198
     * Form and return projection tuple using the aggregate results and
1199
     * the representative input tuple.
1200
     */
1201
138k
    return ExecProject(aggstate->ss.ps.ps_ProjInfo);
1202
138k
  }
1203
138k
  else
1204
138k
    InstrCountFiltered1(aggstate, 1);
1205
1206
102
  return NULL;
1207
138k
}
1208
1209
/*
1210
 * find_unaggregated_cols
1211
 *    Construct a bitmapset of the column numbers of un-aggregated Vars
1212
 *    appearing in our targetlist and qual (HAVING clause)
1213
 */
1214
static Bitmapset *
1215
find_unaggregated_cols(AggState *aggstate)
1216
130
{
1217
130
  Agg      *node = (Agg *) aggstate->ss.ps.plan;
1218
130
  Bitmapset  *colnos;
1219
1220
130
  colnos = NULL;
1221
130
  (void) find_unaggregated_cols_walker((Node *) node->plan.targetlist,
1222
130
                     &colnos);
1223
130
  (void) find_unaggregated_cols_walker((Node *) node->plan.qual,
1224
130
                     &colnos);
1225
130
  return colnos;
1226
130
}
1227
1228
static bool
1229
find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
1230
936
{
1231
936
  if (node == NULL)
1232
114
    return false;
1233
822
  if (IsA(node, Var))
1234
167
  {
1235
167
    Var      *var = (Var *) node;
1236
1237
    /* setrefs.c should have set the varno to OUTER_VAR */
1238
167
    Assert(var->varno == OUTER_VAR);
1239
167
    Assert(var->varlevelsup == 0);
1240
167
    *colnos = bms_add_member(*colnos, var->varattno);
1241
167
    return false;
1242
655
  }
1243
655
  if (IsA(node, Aggref) || IsA(node, GroupingFunc))
1244
154
  {
1245
    /* do not descend into aggregate exprs */
1246
154
    return false;
1247
154
  }
1248
501
  return expression_tree_walker(node, find_unaggregated_cols_walker,
1249
501
                  (void *) colnos);
1250
501
}
1251
1252
/*
1253
 * (Re-)initialize the hash table(s) to empty.
1254
 *
1255
 * To implement hashed aggregation, we need a hashtable that stores a
1256
 * representative tuple and an array of AggStatePerGroup structs for each
1257
 * distinct set of GROUP BY column values.  We compute the hash key from the
1258
 * GROUP BY columns.  The per-group data is allocated in lookup_hash_entry(),
1259
 * for each entry.
1260
 *
1261
 * We have a separate hashtable and associated perhash data structure for each
1262
 * grouping set for which we're doing hashing.
1263
 *
1264
 * The contents of the hash tables always live in the hashcontext's per-tuple
1265
 * memory context (there is only one of these for all tables together, since
1266
 * they are all reset at the same time).
1267
 */
1268
static void
1269
build_hash_table(AggState *aggstate)
1270
1.10k
{
1271
1.10k
  MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory;
1272
1.10k
  Size    additionalsize;
1273
1.10k
  int     i;
1274
1275
1.10k
  Assert(aggstate->aggstrategy == AGG_HASHED || aggstate->aggstrategy == AGG_MIXED);
1276
1277
1.10k
  additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
1278
1279
2.20k
  for (i = 0; i < aggstate->num_hashes; ++i)
1280
1.10k
  {
1281
1.10k
    AggStatePerHash perhash = &aggstate->perhash[i];
1282
1283
1.10k
    Assert(perhash->aggnode->numGroups > 0);
1284
1285
1.10k
    if (perhash->hashtable)
1286
1.00k
      ResetTupleHashTable(perhash->hashtable);
1287
102
    else
1288
102
      perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps,
1289
102
                            perhash->hashslot->tts_tupleDescriptor,
1290
102
                            perhash->numCols,
1291
102
                            perhash->hashGrpColIdxHash,
1292
102
                            perhash->eqfuncoids,
1293
102
                            perhash->hashfunctions,
1294
102
                            perhash->aggnode->numGroups,
1295
102
                            additionalsize,
1296
102
                            aggstate->ss.ps.state->es_query_cxt,
1297
102
                            aggstate->hashcontext->ecxt_per_tuple_memory,
1298
102
                            tmpmem,
1299
102
                            DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
1300
1.10k
  }
1301
1.10k
}
1302
1303
/*
1304
 * Compute columns that actually need to be stored in hashtable entries.  The
1305
 * incoming tuples from the child plan node will contain grouping columns,
1306
 * other columns referenced in our targetlist and qual, columns used to
1307
 * compute the aggregate functions, and perhaps just junk columns we don't use
1308
 * at all.  Only columns of the first two types need to be stored in the
1309
 * hashtable, and getting rid of the others can make the table entries
1310
 * significantly smaller.  The hashtable only contains the relevant columns,
1311
 * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table()
1312
 * into the format of the normal input descriptor.
1313
 *
1314
 * Columns beyond those grouped by come from two sources: first,
1315
 * functionally dependent columns that we don't need to group by themselves;
1316
 * and second, ctids for row-marks.
1317
 *
1318
 * To eliminate duplicates, we build a bitmapset of the needed columns, and
1319
 * then build an array of the columns included in the hashtable.  Note that
1320
 * the array is preserved over ExecReScanAgg, so we allocate it in the
1321
 * per-query context (unlike the hash table itself).
1322
 */
1323
static void
1324
find_hash_columns(AggState *aggstate)
1325
130
{
1326
130
  Bitmapset  *base_colnos;
1327
130
  List     *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1328
130
  int     numHashes = aggstate->num_hashes;
1329
130
  EState     *estate = aggstate->ss.ps.state;
1330
130
  int     j;
1331
1332
  /* Find Vars that will be needed in tlist and qual */
1333
130
  base_colnos = find_unaggregated_cols(aggstate);
1334
1335
262
  for (j = 0; j < numHashes; ++j)
1336
132
  {
1337
132
    AggStatePerHash perhash = &aggstate->perhash[j];
1338
132
    Bitmapset  *colnos = bms_copy(base_colnos);
1339
132
    AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1340
132
    List     *hashTlist = NIL;
1341
132
    TupleDesc hashDesc;
1342
132
    int     i;
1343
1344
132
    perhash->largestGrpColIdx = 0;
1345
1346
    /*
1347
     * If we're doing grouping sets, then some Vars might be referenced in
1348
     * tlist/qual for the benefit of other grouping sets, but not needed
1349
     * when hashing; i.e. prepare_projection_slot will null them out, so
1350
     * there'd be no point storing them.  Use prepare_projection_slot's
1351
     * logic to determine which.
1352
     */
1353
132
    if (aggstate->phases[0].grouped_cols)
1354
132
    {
1355
132
      Bitmapset  *grouped_cols = aggstate->phases[0].grouped_cols[j];
1356
132
      ListCell   *lc;
1357
1358
132
      foreach(lc, aggstate->all_grouped_cols)
1359
151
      {
1360
151
        int     attnum = lfirst_int(lc);
1361
1362
151
        if (!bms_is_member(attnum, grouped_cols))
1363
4
          colnos = bms_del_member(colnos, attnum);
1364
151
      }
1365
132
    }
1366
    /* Add in all the grouping columns */
1367
279
    for (i = 0; i < perhash->numCols; i++)
1368
147
      colnos = bms_add_member(colnos, grpColIdx[i]);
1369
1370
132
    perhash->hashGrpColIdxInput =
1371
132
      palloc(bms_num_members(colnos) * sizeof(AttrNumber));
1372
132
    perhash->hashGrpColIdxHash =
1373
132
      palloc(perhash->numCols * sizeof(AttrNumber));
1374
1375
    /*
1376
     * First build mapping for columns directly hashed. These are the
1377
     * first, because they'll be accessed when computing hash values and
1378
     * comparing tuples for exact matches. We also build simple mapping
1379
     * for execGrouping, so it knows where to find the to-be-hashed /
1380
     * compared columns in the input.
1381
     */
1382
279
    for (i = 0; i < perhash->numCols; i++)
1383
147
    {
1384
147
      perhash->hashGrpColIdxInput[i] = grpColIdx[i];
1385
147
      perhash->hashGrpColIdxHash[i] = i + 1;
1386
147
      perhash->numhashGrpCols++;
1387
      /* delete already mapped columns */
1388
147
      colnos = bms_del_member(colnos, grpColIdx[i]);
1389
147
    }
1390
1391
    /* and add the remaining columns */
1392
140
    while ((i = bms_first_member(colnos)) >= 0)
1393
8
    {
1394
8
      perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
1395
8
      perhash->numhashGrpCols++;
1396
8
    }
1397
1398
    /* and build a tuple descriptor for the hashtable */
1399
287
    for (i = 0; i < perhash->numhashGrpCols; i++)
1400
155
    {
1401
155
      int     varNumber = perhash->hashGrpColIdxInput[i] - 1;
1402
1403
155
      hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
1404
155
      perhash->largestGrpColIdx =
1405
155
        Max(varNumber + 1, perhash->largestGrpColIdx);
1406
155
    }
1407
1408
132
    hashDesc = ExecTypeFromTL(hashTlist, false);
1409
1410
132
    execTuplesHashPrepare(perhash->numCols,
1411
132
                perhash->aggnode->grpOperators,
1412
132
                &perhash->eqfuncoids,
1413
132
                &perhash->hashfunctions);
1414
132
    perhash->hashslot =
1415
132
      ExecAllocTableSlot(&estate->es_tupleTable, hashDesc);
1416
1417
132
    list_free(hashTlist);
1418
132
    bms_free(colnos);
1419
132
  }
1420
1421
130
  bms_free(base_colnos);
1422
130
}
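/*
 * Illustrative sketch (not part of nodeAgg.c): the column mapping that
 * find_hash_columns builds, reduced to plain arrays.  Grouping columns are
 * placed first, then the remaining needed columns in ascending order with
 * duplicates removed.  build_hash_column_map and MAX_ATTS are hypothetical;
 * a bool array stands in for the Bitmapset, and the grouping columns are
 * assumed to be free of duplicates.
 */

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_ATTS 32   /* assumption: small fixed upper bound for the sketch */

/*
 * grp_cols:    1-based input column numbers of the GROUP BY columns.
 * needed_cols: 1-based input column numbers referenced by tlist/qual.
 * out:         receives the input column number for each hash-slot column;
 *              must have room for ngrp + nneeded entries.
 * Returns the number of entries written to out.
 */
int
build_hash_column_map(const int *grp_cols, int ngrp,
                      const int *needed_cols, int nneeded,
                      int *out)
{
  bool  pending[MAX_ATTS + 1] = {false};
  int   nout = 0;
  int   i;

  for (i = 0; i < nneeded; i++)
  {
    assert(needed_cols[i] >= 1 && needed_cols[i] <= MAX_ATTS);
    pending[needed_cols[i]] = true;
  }

  /* Directly hashed columns come first. */
  for (i = 0; i < ngrp; i++)
  {
    assert(grp_cols[i] >= 1 && grp_cols[i] <= MAX_ATTS);
    out[nout++] = grp_cols[i];
    pending[grp_cols[i]] = false;   /* delete already mapped columns */
  }

  /* Then add the remaining columns, smallest column number first. */
  for (i = 1; i <= MAX_ATTS; i++)
  {
    if (pending[i])
      out[nout++] = i;
  }

  return nout;
}
```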
1423
1424
/*
1425
 * Estimate per-hash-table-entry overhead for the planner.
1426
 *
1427
 * Note that the estimate does not include space for pass-by-reference
1428
 * transition data values, nor for the representative tuple of each group.
1429
 * Nor does this account for the target fill-factor and growth policy of the
1430
 * hash table.
1431
 */
1432
Size
1433
hash_agg_entry_size(int numAggs)
1434
184
{
1435
184
  Size    entrysize;
1436
1437
  /* This must match build_hash_table */
1438
184
  entrysize = sizeof(TupleHashEntryData) +
1439
184
    numAggs * sizeof(AggStatePerGroupData);
1440
184
  entrysize = MAXALIGN(entrysize);
1441
1442
184
  return entrysize;
1443
184
}
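/*
 * Illustrative sketch (not part of nodeAgg.c): the arithmetic behind
 * hash_agg_entry_size, with the struct sizes passed in as parameters.
 * align_up and sketch_entry_size are hypothetical names, and the 8-byte
 * alignment is an assumption standing in for the platform's maximum
 * alignment.
 */

```c
#include <assert.h>
#include <stddef.h>

#define SKETCH_MAX_ALIGN 8    /* assumption: maximum alignment in bytes */

/* Round len up to the next multiple of a power-of-two alignment. */
size_t
align_up(size_t len, size_t alignment)
{
  assert(alignment != 0 && (alignment & (alignment - 1)) == 0);
  return (len + alignment - 1) & ~(alignment - 1);
}

/* Fixed entry header plus one per-group state per aggregate, then aligned. */
size_t
sketch_entry_size(size_t header_size, size_t per_agg_size, int num_aggs)
{
  assert(num_aggs >= 0);
  return align_up(header_size + (size_t) num_aggs * per_agg_size,
                  SKETCH_MAX_ALIGN);
}
```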
1444
1445
/*
1446
 * Find or create a hashtable entry for the tuple group containing the current
1447
 * tuple (already set in tmpcontext's outertuple slot), in the current grouping
1448
 * set (which the caller must have selected - note that initialize_aggregate
1449
 * depends on this).
1450
 *
1451
 * When called, GetCurrentMemoryContext() should be the per-query context.
1452
 */
1453
static TupleHashEntryData *
1454
lookup_hash_entry(AggState *aggstate)
1455
295k
{
1456
295k
  TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;
1457
295k
  AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
1458
295k
  TupleTableSlot *hashslot = perhash->hashslot;
1459
295k
  TupleHashEntryData *entry;
1460
295k
  bool    isnew;
1461
295k
  int     i;
1462
1463
  /* transfer just the needed columns into hashslot */
1464
295k
  slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
1465
295k
  ExecClearTuple(hashslot);
1466
1467
697k
  for (i = 0; i < perhash->numhashGrpCols; i++)
1468
401k
  {
1469
401k
    int     varNumber = perhash->hashGrpColIdxInput[i] - 1;
1470
1471
401k
    hashslot->tts_values[i] = inputslot->tts_values[varNumber];
1472
401k
    hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
1473
401k
  }
1474
295k
  ExecStoreVirtualTuple(hashslot);
1475
1476
  /* find or create the hashtable entry using the filtered tuple */
1477
295k
  entry = LookupTupleHashEntry(perhash->hashtable, hashslot, &isnew);
1478
1479
295k
  if (isnew)
1480
136k
  {
1481
136k
    AggStatePerGroup pergroup;
1482
136k
    int     transno;
1483
1484
136k
    pergroup = (AggStatePerGroup)
1485
136k
      MemoryContextAlloc(perhash->hashtable->tablecxt,
1486
136k
                 sizeof(AggStatePerGroupData) * aggstate->numtrans);
1487
136k
    entry->additional = pergroup;
1488
1489
    /*
1490
     * Initialize aggregates for the new tuple group; lookup_hash_entries()
1491
     * has already selected the relevant grouping set.
1492
     */
1493
169k
    for (transno = 0; transno < aggstate->numtrans; transno++)
1494
32.9k
    {
1495
32.9k
      AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1496
32.9k
      AggStatePerGroup pergroupstate = &pergroup[transno];
1497
1498
32.9k
      initialize_aggregate(aggstate, pertrans, pergroupstate);
1499
32.9k
    }
1500
136k
  }
1501
1502
295k
  return entry;
1503
295k
}
1504
1505
/*
1506
 * Look up hash entries for the current tuple in all hashed grouping sets,
1507
 * returning an array of pergroup pointers suitable for advance_aggregates.
1508
 *
1509
 * Be aware that lookup_hash_entry can reset the tmpcontext.
1510
 */
1511
static void
1512
lookup_hash_entries(AggState *aggstate)
1513
295k
{
1514
295k
  int     numHashes = aggstate->num_hashes;
1515
295k
  AggStatePerGroup *pergroup = aggstate->hash_pergroup;
1516
295k
  int     setno;
1517
1518
591k
  for (setno = 0; setno < numHashes; setno++)
1519
295k
  {
1520
295k
    select_current_set(aggstate, setno, true);
1521
295k
    pergroup[setno] = lookup_hash_entry(aggstate)->additional;
1522
295k
  }
1523
295k
}
1524
1525
/*
1526
 * Evaluates whether the plan supports pushing aggregates down to DocDB, and
1527
 * sets yb_pushdown_supported in AggState accordingly.
1528
 */
1529
static void
1530
yb_agg_pushdown_supported(AggState *aggstate)
1531
2.02k
{
1532
2.02k
  ForeignScanState *scan_state;
1533
2.02k
  ListCell *lc_agg;
1534
2.02k
  ListCell *lc_arg;
1535
2.02k
  bool check_outer_plan;
1536
1537
  /* Initially set pushdown supported to false. */
1538
2.02k
  aggstate->yb_pushdown_supported = false;
1539
1540
  /* Phase 0 is a dummy phase, so a single real phase means two phases in total. */
1541
2.02k
  if (aggstate->numphases != 2)
1542
126
    return;
1543
1544
  /* Plain agg strategy. */
1545
1.89k
  if (aggstate->phase->aggstrategy != AGG_PLAIN)
1546
25
    return;
1547
1548
  /* No GROUP BY. */
1549
1.87k
  if (aggstate->phase->numsets != 0)
1550
4
    return;
1551
1552
  /* Foreign scan outer plan. */
1553
1.86k
  if (!IsA(outerPlanState(aggstate), ForeignScanState))
1554
1.46k
    return;
1555
1556
400
  scan_state = castNode(ForeignScanState, outerPlanState(aggstate));
1557
1558
  /* Foreign relation we are scanning is a YB table. */
1559
400
  if (!IsYBRelationById(scan_state->ss.ss_currentRelation->rd_id))
1560
8
    return;
1561
1562
  /* No WHERE quals. */
1563
392
  if (scan_state->ss.ps.qual)
1564
146
    return;
1565
1566
246
  check_outer_plan = false;
1567
1568
246
  foreach(lc_agg, aggstate->aggs)
1569
291
  {
1570
291
    AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(lc_agg);
1571
291
    Aggref *aggref = aggrefstate->aggref;
1572
291
    char *func_name = get_func_name(aggref->aggfnoid);
1573
1574
    /* Only support COUNT/MIN/MAX/SUM. */
1575
291
    if (strcmp(func_name, "count") != 0 &&
1576
137
      strcmp(func_name, "min") != 0 &&
1577
112
      strcmp(func_name, "max") != 0 &&
1578
68
      strcmp(func_name, "sum") != 0)
1579
39
      return;
1580
1581
    /* No ORDER BY. */
1582
252
    if (list_length(aggref->aggorder) != 0)
1583
0
      return;
1584
1585
    /* No DISTINCT. */
1586
252
    if (list_length(aggref->aggdistinct) != 0)
1587
1
      return;
1588
1589
    /* No FILTER. */
1590
251
    if (aggref->aggfilter)
1591
4
      return;
1592
1593
    /* No VARIADIC array arguments. */
1594
247
    if (aggref->aggvariadic)
1595
0
      return;
1596
1597
    /* Normal aggregate kind. */
1598
247
    if (aggref->aggkind != AGGKIND_NORMAL)
1599
0
      return;
1600
1601
    /* Does not belong to an outer query level. */
1602
247
    if (aggref->agglevelsup != 0)
1603
0
      return;
1604
1605
    /* Simple split. */
1606
247
    if (aggref->aggsplit != AGGSPLIT_SIMPLE)
1607
0
      return;
1608
1609
    /* Aggtranstype is a supported YB key type and is not INTERNAL or NUMERIC. */
1610
247
    if (!YbDataTypeIsValidForKey(aggref->aggtranstype) ||
1611
247
      aggref->aggtranstype == INTERNALOID ||
1612
244
      aggref->aggtranstype == NUMERICOID)
1613
3
      return;
1614
1615
    /*
1616
     * The builtin functions max and min imply comparison. Character type
1617
     * comparison requires postgres collation info which is not accessible
1618
     * by DocDB. Because DocDB only does byte-wise comparison, it will not
1619
     * be correct for any non-C collations. In order to allow min/max
1620
     * pushdown for a non-C collation, we need to ensure that the argument
1621
     * is a key-column with a deterministic non-C collation. In such a
1622
     * case we store a collation-encoded string by concatenating the
1623
     * collation sort key with the original text value so that the byte-wise
1624
     * comparison result is correct.
1625
     */
1626
244
    if ((strcmp(func_name, "min") == 0 || strcmp(func_name, "max") == 0) &&
1627
67
      (YBIsCollationValidNonC(aggref->aggcollid) ||
1628
67
       YBIsCollationValidNonC(aggref->inputcollid)))
1629
0
      return;
1630
1631
244
    foreach(lc_arg, aggref->args)
1632
118
    {
1633
118
      TargetEntry *tle = lfirst_node(TargetEntry, lc_arg);
1634
1635
      /* Only support simple column expressions until DocDB can eval PG exprs. */
1636
118
      Oid type = InvalidOid;
1637
118
      if (IsA(tle->expr, Var))
1638
105
      {
1639
105
        check_outer_plan = true;
1640
105
        type = castNode(Var, tle->expr)->vartype;
1641
105
      }
1642
13
      else if (IsA(tle->expr, Const))
1643
13
      {
1644
13
        Const* const_node = castNode(Const, tle->expr);
1645
13
        if (const_node->constisnull)
1646
          /* A NULL constant has type UNKNOWNOID, which isn't very helpful. */
1647
6
          type = aggref->aggtranstype;
1648
7
        else if (!const_node->constbyval)
1649
          /* Do not support pointer-based constants yet. */
1650
0
          return;
1651
7
        else
1652
7
          type = const_node->consttype;
1653
13
      }
1654
0
      else
1655
0
        return;
1656
1657
      /*
1658
       * Only support types that are allowed to be YB keys; otherwise we cannot
1659
       * guarantee that DocDB evaluates the aggregate with Postgres-compatible
1660
       * semantics.
1661
       */
1662
118
      if (!YbDataTypeIsValidForKey(type))
1663
0
        return;
1664
118
    }
1665
244
  }
1666
1667
199
  if (check_outer_plan)
1668
65
  {
1669
    /*
1670
     * Check the outer plan to reject cases such as:
1671
     *   create table foo(c0 decimal);
1672
     *   select sum(r) from (select random() as r from foo) as res;
1673
     *   select sum(r) from (select (null=random())::int as r from foo) as res;
1674
     * However, check_outer_plan will be false for cases such as:
1675
     *   select sum(1) from (select random() as r from foo) as res;
1676
     *   select sum(1) from (select (null=random())::int as r from foo) as res;
1677
     * and pushdown will still be supported.
1678
     * For simplicity, we do not try to match Vars between aggref->args and the
1679
     * outer plan targetlist; we simply reject as soon as we see any item that is
1680
     * not a simple column reference.
1681
     */
1682
65
    ListCell   *t;
1683
65
    foreach(t, outerPlanState(aggstate)->plan->targetlist)
1684
387
    {
1685
387
      TargetEntry *tle = lfirst_node(TargetEntry, t);
1686
1687
387
      if (!IsA(tle->expr, Var) || IS_SPECIAL_VARNO(castNode(Var, tle->expr)->varno))
1688
4
        return;
1689
387
    }
1690
65
  }
1691
1692
  /* If this is reached, YB pushdown is supported. */
1693
195
  aggstate->yb_pushdown_supported = true;
1694
195
}
1695
1696
/*
1697
 * Populates aggregate pushdown information in the YB foreign scan state.
1698
 */
1699
static void
1700
yb_agg_pushdown(AggState *aggstate)
1701
185
{
1702
185
  ForeignScanState *scan_state = castNode(ForeignScanState, outerPlanState(aggstate));
1703
185
  List *pushdown_aggs = NIL;
1704
185
  int aggno;
1705
1706
406
  for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1707
221
  {
1708
221
    Aggref *aggref = aggstate->peragg[aggno].aggref;
1709
1710
221
    pushdown_aggs = lappend(pushdown_aggs, aggref);
1711
221
  }
1712
185
  scan_state->yb_fdw_aggs = pushdown_aggs;
1713
  /* Disable projection for tuples produced by pushed down aggregate operators. */
1714
185
  scan_state->ss.ps.ps_ProjInfo = NULL;
1715
185
}
1716
1717
/*
1718
 * ExecAgg -
1719
 *
1720
 *    ExecAgg receives tuples from its outer subplan and aggregates over
1721
 *    the appropriate attribute for each aggregate function use (Aggref
1722
 *    node) appearing in the targetlist or qual of the node.  The number
1723
 *    of tuples to aggregate over depends on whether grouped or plain
1724
 *    aggregation is selected.  In grouped aggregation, we produce a result
1725
 *    row for each group; in plain aggregation there's a single result row
1726
 *    for the whole query.  In either case, the value of each aggregate is
1727
 *    stored in the expression context to be used when ExecProject evaluates
1728
 *    the result tuple.
1729
 */
1730
static TupleTableSlot *
1731
ExecAgg(PlanState *pstate)
1732
140k
{
1733
140k
  AggState   *node = castNode(AggState, pstate);
1734
140k
  TupleTableSlot *result = NULL;
1735
1736
140k
  CHECK_FOR_INTERRUPTS();
1737
1738
140k
  if (!node->agg_done)
1739
139k
  {
1740
    /*
1741
     * Use the default prefetch limit when an aggregate is present.
1742
     * Aggregate functions combine multiple rows into one, so the final LIMIT can differ from
1743
     * the number of rows to be read. As a result, we have to use the default prefetch limit.
1744
     *
1745
     * Push down aggregates to DocDB if the plan state meets the required conditions.
1746
     */
1747
139k
    if (IsYugaByteEnabled())
1748
139k
    {
1749
139k
      pstate->state->yb_exec_params.limit_use_default = true;
1750
139k
      if (node->yb_pushdown_supported)
1751
185
        yb_agg_pushdown(node);
1752
139k
    }
1753
1754
    /* Dispatch based on strategy */
1755
139k
    switch (node->phase->aggstrategy)
1756
139k
    {
1757
137k
      case AGG_HASHED:
1758
137k
        if (!node->table_filled)
1759
1.10k
          agg_fill_hash_table(node);
1760
137k
        switch_fallthrough();
1761
137k
      case AGG_MIXED:
1762
137k
        result = agg_retrieve_hash_table(node);
1763
137k
        break;
1764
1.76k
      case AGG_PLAIN:
1765
1.89k
      case AGG_SORTED:
1766
1.89k
        result = agg_retrieve_direct(node);
1767
1.89k
        break;
1768
139k
    }
1769
1770
139k
    if (!TupIsNull(result))
1771
138k
      return result;
1772
1.81k
  }
1773
1774
1.81k
  return NULL;
1775
1.81k
}
1776
1777
/*
1778
 * ExecAgg for non-hashed case
1779
 */
1780
static TupleTableSlot *
1781
agg_retrieve_direct(AggState *aggstate)
1782
1.89k
{
1783
1.89k
  Agg      *node = aggstate->phase->aggnode;
1784
1.89k
  ExprContext *econtext;
1785
1.89k
  ExprContext *tmpcontext;
1786
1.89k
  AggStatePerAgg peragg;
1787
1.89k
  AggStatePerGroup *pergroups;
1788
1.89k
  TupleTableSlot *outerslot;
1789
1.89k
  TupleTableSlot *firstSlot;
1790
1.89k
  TupleTableSlot *result;
1791
1.89k
  bool    hasGroupingSets = aggstate->phase->numsets > 0;
1792
1.89k
  int     numGroupingSets = Max(aggstate->phase->numsets, 1);
1793
1.89k
  int     currentSet;
1794
1.89k
  int     nextSetSize;
1795
1.89k
  int     numReset;
1796
1.89k
  int     i;
1797
1.89k
  int     aggno;
1798
1799
  /*
1800
   * get state info from node
1801
   *
1802
   * econtext is the per-output-tuple expression context
1803
   *
1804
   * tmpcontext is the per-input-tuple expression context
1805
   */
1806
1.89k
  econtext = aggstate->ss.ps.ps_ExprContext;
1807
1.89k
  tmpcontext = aggstate->tmpcontext;
1808
1809
1.89k
  peragg = aggstate->peragg;
1810
1.89k
  pergroups = aggstate->pergroups;
1811
1.89k
  firstSlot = aggstate->ss.ss_ScanTupleSlot;
1812
1813
  /*
1814
   * We loop retrieving groups until we find one matching
1815
   * aggstate->ss.ps.qual
1816
   *
1817
   * For grouping sets, we have the invariant that aggstate->projected_set
1818
   * is either -1 (initial call) or the index (starting from 0) in
1819
   * gset_lengths for the group we just completed (either by projecting a
1820
   * row or by discarding it in the qual).
1821
   */
1822
1.96k
  while (!aggstate->agg_done)
1823
1.90k
  {
1824
    /*
1825
     * Clear the per-output-tuple context for each group, as well as
1826
     * aggcontext (which contains any pass-by-ref transvalues of the old
1827
     * group).  Some aggregate functions store working state in child
1828
     * contexts; those now get reset automatically without us needing to
1829
     * do anything special.
1830
     *
1831
     * We use ReScanExprContext not just ResetExprContext because we want
1832
     * any registered shutdown callbacks to be called.  That allows
1833
     * aggregate functions to ensure they've cleaned up any non-memory
1834
     * resources.
1835
     */
1836
1.90k
    ReScanExprContext(econtext);
1837
1838
    /*
1839
     * Determine how many grouping sets need to be reset at this boundary.
1840
     */
1841
1.90k
    if (aggstate->projected_set >= 0 &&
1842
119
      aggstate->projected_set < numGroupingSets)
1843
119
      numReset = aggstate->projected_set + 1;
1844
1.78k
    else
1845
1.78k
      numReset = numGroupingSets;
1846
1847
    /*
1848
     * numReset can change on a phase boundary, but that's OK; we want to
1849
     * reset the contexts used in _this_ phase, and later, after possibly
1850
     * changing phase, initialize the right number of aggregates for the
1851
     * _new_ phase.
1852
     */
1853
1854
3.80k
    for (i = 0; i < numReset; i++)
1855
1.90k
    {
1856
1.90k
      ReScanExprContext(aggstate->aggcontexts[i]);
1857
1.90k
    }
1858
1859
    /*
1860
     * Check if input is complete and there are no more groups to project
1861
     * in this phase; move to next phase or mark as done.
1862
     */
1863
1.90k
    if (aggstate->input_done == true &&
1864
2
      aggstate->projected_set >= (numGroupingSets - 1))
1865
2
    {
1866
2
      if (aggstate->current_phase < aggstate->numphases - 1)
1867
0
      {
1868
0
        initialize_phase(aggstate, aggstate->current_phase + 1);
1869
0
        aggstate->input_done = false;
1870
0
        aggstate->projected_set = -1;
1871
0
        numGroupingSets = Max(aggstate->phase->numsets, 1);
1872
0
        node = aggstate->phase->aggnode;
1873
0
        numReset = numGroupingSets;
1874
0
      }
1875
2
      else if (aggstate->aggstrategy == AGG_MIXED)
1876
2
      {
1877
        /*
1878
         * Mixed mode; we've output all the grouped stuff and have
1879
         * full hashtables, so switch to outputting those.
1880
         */
1881
2
        initialize_phase(aggstate, 0);
1882
2
        aggstate->table_filled = true;
1883
2
        ResetTupleHashIterator(aggstate->perhash[0].hashtable,
1884
2
                     &aggstate->perhash[0].hashiter);
1885
2
        select_current_set(aggstate, 0, true);
1886
2
        return agg_retrieve_hash_table(aggstate);
1887
2
      }
1888
0
      else
1889
0
      {
1890
0
        aggstate->agg_done = true;
1891
0
        break;
1892
0
      }
1893
1.90k
    }
1894
1895
    /*
1896
     * Get the number of columns in the next grouping set after the last
1897
     * projected one (if any). This is the number of columns to compare to
1898
     * see if we reached the boundary of that set too.
1899
     */
1900
1.90k
    if (aggstate->projected_set >= 0 &&
1901
117
      aggstate->projected_set < (numGroupingSets - 1))
1902
0
      nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
1903
1.90k
    else
1904
1.90k
      nextSetSize = 0;
1905
1906
    /*----------
1907
     * If a subgroup for the current grouping set is present, project it.
1908
     *
1909
     * We have a new group if:
1910
     *  - we're out of input but haven't projected all grouping sets
1911
     *    (checked above)
1912
     * OR
1913
     *    - we already projected a row that wasn't from the last grouping
1914
     *    set
1915
     *    AND
1916
     *    - the next grouping set has at least one grouping column (since
1917
     *    empty grouping sets project only once input is exhausted)
1918
     *    AND
1919
     *    - the previous and pending rows differ on the grouping columns
1920
     *    of the next grouping set
1921
     *----------
1922
     */
1923
1.90k
    tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
1924
1.90k
    if (aggstate->input_done ||
1925
1.90k
      (node->aggstrategy != AGG_PLAIN &&
1926
137
       aggstate->projected_set != -1 &&
1927
117
       aggstate->projected_set < (numGroupingSets - 1) &&
1928
0
       nextSetSize > 0 &&
1929
0
       !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
1930
0
                 tmpcontext)))
1931
0
    {
1932
0
      aggstate->projected_set += 1;
1933
1934
0
      Assert(aggstate->projected_set < numGroupingSets);
1935
0
      Assert(nextSetSize > 0 || aggstate->input_done);
1936
0
    }
1937
1.90k
    else if (aggstate->yb_pushdown_supported)
1938
185
    {
1939
185
      aggstate->projected_set = 0;
1940
185
      currentSet = aggstate->projected_set;
1941
185
      select_current_set(aggstate, currentSet, false);
1942
1943
      /* Initialize aggregates. */
1944
185
      initialize_aggregates(aggstate, pergroups, numReset);
1945
1946
      /*
1947
       * Aggs were pushed down to YB, so handle returned aggregate results. The slot
1948
       * contains one value for each aggno, and there is one result per RPC response.
1949
       * We need to aggregate the results from all responses.
1950
       *
1951
       * We special-case COUNT by summing the partial counts, so that it returns
1952
       * the proper count aggregated across all responses.
1953
       */
1954
185
      for (;;)
1955
530
      {
1956
530
        outerslot = fetch_input_tuple(aggstate);
1957
530
        if (TupIsNull(outerslot))
1958
185
        {
1959
185
          aggstate->agg_done = true;
1960
185
          break;
1961
185
        }
1962
1963
345
        Assert(aggstate->numaggs == outerslot->tts_nvalid);
1964
1965
755
        for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1966
410
        {
1967
410
          MemoryContext oldContext;
1968
410
          int transno = peragg[aggno].transno;
1969
410
          Aggref *aggref = aggstate->peragg[aggno].aggref;
1970
410
          char *func_name = get_func_name(aggref->aggfnoid);
1971
410
          AggStatePerGroup pergroup = pergroups[currentSet];
1972
410
          AggStatePerGroup pergroupstate = &pergroup[transno];
1973
410
          AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1974
410
          FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
1975
410
          Datum value = outerslot->tts_values[aggno];
1976
410
          bool isnull = outerslot->tts_isnull[aggno];
1977
1978
410
          if (strcmp(func_name, "count") == 0)
1979
266
          {
1980
            /*
1981
             * Sum results from each response for COUNT. It is safe to do this
1982
             * directly on the datum as it is guaranteed to be an int64.
1983
             */
1984
266
            oldContext = MemoryContextSwitchTo(
1985
266
              aggstate->curaggcontext->ecxt_per_tuple_memory);
1986
266
            pergroupstate->transValue += value;
1987
266
            MemoryContextSwitchTo(oldContext);
1988
266
          }
1989
144
          else
1990
144
          {
1991
            /* Set slot result as argument, then advance the transition function. */
1992
144
            fcinfo->arg[1] = value;
1993
144
            fcinfo->argnull[1] = isnull;
1994
144
            advance_transition_function(aggstate, pertrans, pergroupstate);
1995
144
          }
1996
410
        }
1997
1998
        /* Reset per-input-tuple context after each tuple */
1999
345
        ResetExprContext(tmpcontext);
2000
345
      }
2001
185
    }
2002
1.71k
    else
2003
1.71k
    {
2004
      /*
2005
       * We no longer care what group we just projected, the next
2006
       * projection will always be the first (or only) grouping set
2007
       * (unless the input proves to be empty).
2008
       */
2009
1.71k
      aggstate->projected_set = 0;
2010
2011
      /*
2012
       * If we don't already have the first tuple of the new group,
2013
       * fetch it from the outer plan.
2014
       */
2015
1.71k
      if (aggstate->grp_firstTuple == NULL)
2016
1.59k
      {
2017
1.59k
        outerslot = fetch_input_tuple(aggstate);
2018
1.59k
        if (!TupIsNull(outerslot))
2019
1.49k
        {
2020
          /*
2021
           * Make a copy of the first input tuple; we will use this
2022
           * for comparisons (in group mode) and for projection.
2023
           */
2024
1.49k
          aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
2025
1.49k
        }
2026
101
        else
2027
101
        {
2028
          /* outer plan produced no tuples at all */
2029
101
          if (hasGroupingSets)
2030
0
          {
2031
            /*
2032
             * If there was no input at all, we need to project
2033
             * rows only if there are grouping sets of size 0.
2034
             * Note that this implies that there can't be any
2035
             * references to ungrouped Vars, which would otherwise
2036
             * cause issues with the empty output slot.
2037
             *
2038
             * XXX: This is no longer true, we currently deal with
2039
             * this in finalize_aggregates().
2040
             */
2041
0
            aggstate->input_done = true;
2042
2043
0
            while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
2044
0
            {
2045
0
              aggstate->projected_set += 1;
2046
0
              if (aggstate->projected_set >= numGroupingSets)
2047
0
              {
2048
                /*
2049
                 * We can't set agg_done here because we might
2050
                 * have more phases to do, even though the
2051
                 * input is empty. So we need to restart the
2052
                 * whole outer loop.
2053
                 */
2054
0
                break;
2055
0
              }
2056
0
            }
2057
2058
0
            if (aggstate->projected_set >= numGroupingSets)
2059
0
              continue;
2060
101
          }
2061
101
          else
2062
101
          {
2063
101
            aggstate->agg_done = true;
2064
            /* If we are grouping, we should produce no tuples too */
2065
101
            if (node->aggstrategy != AGG_PLAIN)
2066
0
              return NULL;
2067
1.71k
          }
2068
101
        }
2069
1.59k
      }
2070
2071
      /*
2072
       * Initialize working state for a new input tuple group.
2073
       */
2074
1.71k
      initialize_aggregates(aggstate, pergroups, numReset);
2075
2076
1.71k
      if (aggstate->grp_firstTuple != NULL)
2077
1.61k
      {
2078
        /*
2079
         * Store the copied first input tuple in the tuple table slot
2080
         * reserved for it.  The tuple will be deleted when it is
2081
         * cleared from the slot.
2082
         */
2083
1.61k
        ExecStoreHeapTuple(aggstate->grp_firstTuple,
2084
1.61k
                   firstSlot,
2085
1.61k
                   true);
2086
1.61k
        aggstate->grp_firstTuple = NULL;  /* don't keep two pointers */
2087
2088
        /* set up for first advance_aggregates call */
2089
1.61k
        tmpcontext->ecxt_outertuple = firstSlot;
2090
2091
        /*
2092
         * Process each outer-plan tuple, and then fetch the next one,
2093
         * until we exhaust the outer plan or cross a group boundary.
2094
         */
2095
1.61k
        for (;;)
2096
515k
        {
2097
          /*
2098
           * During phase 1 only of a mixed agg, we need to update
2099
           * hashtables as well in advance_aggregates.
2100
           */
2101
515k
          if (aggstate->aggstrategy == AGG_MIXED &&
2102
600
            aggstate->current_phase == 1)
2103
600
          {
2104
600
            lookup_hash_entries(aggstate);
2105
600
          }
2106
2107
          /* Advance the aggregates (or combine functions) */
2108
515k
          advance_aggregates(aggstate);
2109
2110
          /* Reset per-input-tuple context after each tuple */
2111
515k
          ResetExprContext(tmpcontext);
2112
2113
515k
          outerslot = fetch_input_tuple(aggstate);
2114
515k
          if (TupIsNull(outerslot))
2115
1.49k
          {
2116
            /* no more outer-plan tuples available */
2117
1.49k
            if (hasGroupingSets)
2118
2
            {
2119
2
              aggstate->input_done = true;
2120
2
              break;
2121
2
            }
2122
1.49k
            else
2123
1.49k
            {
2124
1.49k
              aggstate->agg_done = true;
2125
1.49k
              break;
2126
1.49k
            }
2127
513k
          }
2128
          /* set up for next advance_aggregates call */
2129
513k
          tmpcontext->ecxt_outertuple = outerslot;
2130
2131
          /*
2132
           * If we are grouping, check whether we've crossed a group
2133
           * boundary.
2134
           */
2135
513k
          if (node->aggstrategy != AGG_PLAIN)
2136
31.0k
          {
2137
31.0k
            tmpcontext->ecxt_innertuple = firstSlot;
2138
31.0k
            if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
2139
31.0k
                    tmpcontext))
2140
117
            {
2141
117
              aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
2142
117
              break;
2143
117
            }
2144
31.0k
          }
2145
513k
        }
2146
1.61k
      }
2147
2148
      /*
2149
       * Use the representative input tuple for any references to
2150
       * non-aggregated input columns in aggregate direct args, the node
2151
       * qual, and the tlist.  (If we are not grouping, and there are no
2152
       * input rows at all, we will come here with an empty firstSlot
2153
       * ... but if not grouping, there can't be any references to
2154
       * non-aggregated input columns, so no problem.)
2155
       */
2156
1.71k
      econtext->ecxt_outertuple = firstSlot;
2157
1.71k
    }
2158
2159
1.90k
    Assert(aggstate->projected_set >= 0);
2160
2161
1.90k
    currentSet = aggstate->projected_set;
2162
2163
1.90k
    prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
2164
2165
1.90k
    select_current_set(aggstate, currentSet, false);
2166
2167
1.90k
    finalize_aggregates(aggstate,
2168
1.90k
              peragg,
2169
1.90k
              pergroups[currentSet]);
2170
2171
    /*
2172
     * If there's no row to project right now, we must continue rather
2173
     * than returning a null since there might be more groups.
2174
     */
2175
1.90k
    result = project_aggregates(aggstate);
2176
1.90k
    if (result)
2177
1.83k
      return result;
2178
1.90k
  }
2179
2180
  /* No more groups */
2181
58
  return NULL;
2182
1.89k
}
2183
2184
/*
2185
 * ExecAgg for hashed case: read input and build hash table
2186
 */
2187
static void
2188
agg_fill_hash_table(AggState *aggstate)
2189
1.10k
{
2190
1.10k
  TupleTableSlot *outerslot;
2191
1.10k
  ExprContext *tmpcontext = aggstate->tmpcontext;
2192
2193
  /*
2194
   * Process each outer-plan tuple, and then fetch the next one, until we
2195
   * exhaust the outer plan.
2196
   */
2197
1.10k
  for (;;)
2198
296k
  {
2199
296k
    outerslot = fetch_input_tuple(aggstate);
2200
296k
    if (TupIsNull(outerslot))
2201
1.10k
      break;
2202
2203
    /* set up for lookup_hash_entries and advance_aggregates */
2204
295k
    tmpcontext->ecxt_outertuple = outerslot;
2205
2206
    /* Find or build hashtable entries */
2207
295k
    lookup_hash_entries(aggstate);
2208
2209
    /* Advance the aggregates (or combine functions) */
2210
295k
    advance_aggregates(aggstate);
2211
2212
    /*
2213
     * Reset per-input-tuple context after each tuple, but note that the
2214
     * hash lookups do this too
2215
     */
2216
295k
    ResetExprContext(aggstate->tmpcontext);
2217
295k
  }
2218
2219
1.10k
  aggstate->table_filled = true;
2220
  /* Initialize to walk the first hash table */
2221
1.10k
  select_current_set(aggstate, 0, true);
2222
1.10k
  ResetTupleHashIterator(aggstate->perhash[0].hashtable,
2223
1.10k
               &aggstate->perhash[0].hashiter);
2224
1.10k
}
2225
2226
/*
2227
 * ExecAgg for hashed case: retrieving groups from hash table
2228
 */
2229
static TupleTableSlot *
2230
agg_retrieve_hash_table(AggState *aggstate)
2231
137k
{
2232
137k
  ExprContext *econtext;
2233
137k
  AggStatePerAgg peragg;
2234
137k
  AggStatePerGroup pergroup;
2235
137k
  TupleHashEntryData *entry;
2236
137k
  TupleTableSlot *firstSlot;
2237
137k
  TupleTableSlot *result;
2238
137k
  AggStatePerHash perhash;
2239
2240
  /*
2241
   * get state info from node.
2242
   *
2243
   * econtext is the per-output-tuple expression context.
2244
   */
2245
137k
  econtext = aggstate->ss.ps.ps_ExprContext;
2246
137k
  peragg = aggstate->peragg;
2247
137k
  firstSlot = aggstate->ss.ss_ScanTupleSlot;
2248
2249
  /*
2250
   * Note that perhash (and therefore anything accessed through it) can
2251
   * change inside the loop, as we change between grouping sets.
2252
   */
2253
137k
  perhash = &aggstate->perhash[aggstate->current_set];
2254
2255
  /*
2256
   * We loop retrieving groups until we find one satisfying
2257
   * aggstate->ss.ps.qual
2258
   */
2259
137k
  while (!aggstate->agg_done)
2260
137k
  {
2261
137k
    TupleTableSlot *hashslot = perhash->hashslot;
2262
137k
    int     i;
2263
2264
137k
    CHECK_FOR_INTERRUPTS();
2265
2266
    /*
2267
     * Find the next entry in the hash table
2268
     */
2269
137k
    entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
2270
137k
    if (entry == NULL)
2271
1.09k
    {
2272
1.09k
      int     nextset = aggstate->current_set + 1;
2273
2274
1.09k
      if (nextset < aggstate->num_hashes)
2275
1
      {
2276
        /*
2277
         * Switch to next grouping set, reinitialize, and restart the
2278
         * loop.
2279
         */
2280
1
        select_current_set(aggstate, nextset, true);
2281
2282
1
        perhash = &aggstate->perhash[aggstate->current_set];
2283
2284
1
        ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
2285
2286
1
        continue;
2287
1
      }
2288
1.09k
      else
2289
1.09k
      {
2290
        /* No more hashtables, so done */
2291
1.09k
        aggstate->agg_done = true;
2292
1.09k
        return NULL;
2293
1.09k
      }
2294
136k
    }
2295
2296
    /*
2297
     * Clear the per-output-tuple context for each group
2298
     *
2299
     * We intentionally don't use ReScanExprContext here; if any aggs have
2300
     * registered shutdown callbacks, they mustn't be called yet, since we
2301
     * might not be done with that agg.
2302
     */
2303
136k
    ResetExprContext(econtext);
2304
2305
    /*
2306
     * Transform representative tuple back into one with the right
2307
     * columns.
2308
     */
2309
136k
    ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
2310
136k
    slot_getallattrs(hashslot);
2311
2312
136k
    ExecClearTuple(firstSlot);
2313
136k
    memset(firstSlot->tts_isnull, true,
2314
136k
         firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
2315
2316
375k
    for (i = 0; i < perhash->numhashGrpCols; i++)
2317
238k
    {
2318
238k
      int     varNumber = perhash->hashGrpColIdxInput[i] - 1;
2319
2320
238k
      firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
2321
238k
      firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
2322
238k
    }
2323
136k
    ExecStoreVirtualTuple(firstSlot);
2324
2325
136k
    pergroup = (AggStatePerGroup) entry->additional;
2326
2327
    /*
2328
     * Use the representative input tuple for any references to
2329
     * non-aggregated input columns in the qual and tlist.
2330
     */
2331
136k
    econtext->ecxt_outertuple = firstSlot;
2332
2333
136k
    prepare_projection_slot(aggstate,
2334
136k
                econtext->ecxt_outertuple,
2335
136k
                aggstate->current_set);
2336
2337
136k
    finalize_aggregates(aggstate, peragg, pergroup);
2338
2339
136k
    result = project_aggregates(aggstate);
2340
136k
    if (result)
2341
136k
      return result;
2342
136k
  }
2343
2344
  /* No more groups */
2345
0
  return NULL;
2346
137k
}
2347
2348
/* -----------------
2349
 * ExecInitAgg
2350
 *
2351
 *  Creates the run-time information for the agg node produced by the
2352
 *  planner and initializes its outer subtree.
2353
 *
2354
 * -----------------
2355
 */
2356
AggState *
2357
ExecInitAgg(Agg *node, EState *estate, int eflags)
2358
2.02k
{
2359
2.02k
  AggState   *aggstate;
2360
2.02k
  AggStatePerAgg peraggs;
2361
2.02k
  AggStatePerTrans pertransstates;
2362
2.02k
  AggStatePerGroup *pergroups;
2363
2.02k
  Plan     *outerPlan;
2364
2.02k
  ExprContext *econtext;
2365
2.02k
  TupleDesc scanDesc;
2366
2.02k
  int     numaggs,
2367
2.02k
        transno,
2368
2.02k
        aggno;
2369
2.02k
  int     phase;
2370
2.02k
  int     phaseidx;
2371
2.02k
  ListCell   *l;
2372
2.02k
  Bitmapset  *all_grouped_cols = NULL;
2373
2.02k
  int     numGroupingSets = 1;
2374
2.02k
  int     numPhases;
2375
2.02k
  int     numHashes;
2376
2.02k
  int     i = 0;
2377
2.02k
  int     j = 0;
2378
2.02k
  bool    use_hashing = (node->aggstrategy == AGG_HASHED ||
2379
1.89k
                 node->aggstrategy == AGG_MIXED);
2380
2381
  /* check for unsupported flags */
2382
2.02k
  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
2383
2384
  /*
2385
   * create state structure
2386
   */
2387
2.02k
  aggstate = makeNode(AggState);
2388
2.02k
  aggstate->ss.ps.plan = (Plan *) node;
2389
2.02k
  aggstate->ss.ps.state = estate;
2390
2.02k
  aggstate->ss.ps.ExecProcNode = ExecAgg;
2391
2392
2.02k
  aggstate->aggs = NIL;
2393
2.02k
  aggstate->numaggs = 0;
2394
2.02k
  aggstate->numtrans = 0;
2395
2.02k
  aggstate->aggstrategy = node->aggstrategy;
2396
2.02k
  aggstate->aggsplit = node->aggsplit;
2397
2.02k
  aggstate->maxsets = 0;
2398
2.02k
  aggstate->projected_set = -1;
2399
2.02k
  aggstate->current_set = 0;
2400
2.02k
  aggstate->peragg = NULL;
2401
2.02k
  aggstate->pertrans = NULL;
2402
2.02k
  aggstate->curperagg = NULL;
2403
2.02k
  aggstate->curpertrans = NULL;
2404
2.02k
  aggstate->input_done = false;
2405
2.02k
  aggstate->agg_done = false;
2406
2.02k
  aggstate->pergroups = NULL;
2407
2.02k
  aggstate->grp_firstTuple = NULL;
2408
2.02k
  aggstate->sort_in = NULL;
2409
2.02k
  aggstate->sort_out = NULL;
2410
2411
  /*
2412
   * phases[0] always exists, but is dummy in sorted/plain mode
2413
   */
2414
1.89k
  numPhases = (use_hashing ? 1 : 2);
2415
1.89k
  numHashes = (use_hashing ? 1 : 0);
2416
2417
  /*
2418
   * Calculate the maximum number of grouping sets in any phase; this
2419
   * determines the size of some allocations.  Also calculate the number of
2420
   * phases, since all hashed/mixed nodes contribute to only a single phase.
2421
   */
2422
2.02k
  if (node->groupingSets)
2423
6
  {
2424
6
    numGroupingSets = list_length(node->groupingSets);
2425
2426
6
    foreach(l, node->chain)
2427
6
    {
2428
6
      Agg      *agg = lfirst(l);
2429
2430
6
      numGroupingSets = Max(numGroupingSets,
2431
6
                  list_length(agg->groupingSets));
2432
2433
      /*
2434
       * additional AGG_HASHED aggs become part of phase 0, but all
2435
       * others add an extra phase.
2436
       */
2437
6
      if (agg->aggstrategy != AGG_HASHED)
2438
4
        ++numPhases;
2439
2
      else
2440
2
        ++numHashes;
2441
6
    }
2442
6
  }
2443
2444
2.02k
  aggstate->maxsets = numGroupingSets;
2445
2.02k
  aggstate->numphases = numPhases;
2446
2447
2.02k
  aggstate->aggcontexts = (ExprContext **)
2448
2.02k
    palloc0(sizeof(ExprContext *) * numGroupingSets);
2449
2450
  /*
2451
   * Create expression contexts.  We need three or more, one for
2452
   * per-input-tuple processing, one for per-output-tuple processing, one
2453
   * for all the hashtables, and one for each grouping set.  The per-tuple
2454
   * memory context of the per-grouping-set ExprContexts (aggcontexts)
2455
   * replaces the standalone memory context formerly used to hold transition
2456
   * values.  We cheat a little by using ExecAssignExprContext() to build
2457
   * all of them.
2458
   *
2459
   * NOTE: the details of what is stored in aggcontexts and what is stored
2460
   * in the regular per-query memory context are driven by a simple
2461
   * decision: we want to reset the aggcontext at group boundaries (if not
2462
   * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
2463
   */
2464
2.02k
  ExecAssignExprContext(estate, &aggstate->ss.ps);
2465
2.02k
  aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
2466
2467
4.04k
  for (i = 0; i < numGroupingSets; ++i)
2468
2.02k
  {
2469
2.02k
    ExecAssignExprContext(estate, &aggstate->ss.ps);
2470
2.02k
    aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
2471
2.02k
  }
2472
2473
2.02k
  if (use_hashing)
2474
130
  {
2475
130
    ExecAssignExprContext(estate, &aggstate->ss.ps);
2476
130
    aggstate->hashcontext = aggstate->ss.ps.ps_ExprContext;
2477
130
  }
2478
2479
2.02k
  ExecAssignExprContext(estate, &aggstate->ss.ps);
2480
2481
  /*
2482
   * Initialize child nodes.
2483
   *
2484
   * If we are doing a hashed aggregation then the child plan does not need
2485
   * to handle REWIND efficiently; see ExecReScanAgg.
2486
   */
2487
2.02k
  if (node->aggstrategy == AGG_HASHED)
2488
126
    eflags &= ~EXEC_FLAG_REWIND;
2489
2.02k
  outerPlan = outerPlan(node);
2490
2.02k
  outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
2491
2492
  /*
2493
   * initialize source tuple type.
2494
   */
2495
2.02k
  ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss);
2496
2.02k
  scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2497
2.02k
  if (node->chain)
2498
6
    aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc);
2499
2500
  /*
2501
   * Initialize result type, slot and projection.
2502
   */
2503
2.02k
  ExecInitResultTupleSlotTL(&aggstate->ss.ps);
2504
2.02k
  ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
2505
2506
  /*
2507
   * initialize child expressions
2508
   *
2509
   * We expect the parser to have checked that no aggs contain other agg
2510
   * calls in their arguments (and just to be sure, we verify it again while
2511
   * initializing the plan node).  This would make no sense under SQL
2512
   * semantics, and it's forbidden by the spec.  Because it is true, we
2513
   * don't need to worry about evaluating the aggs in any particular order.
2514
   *
2515
   * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState
2516
   * nodes to aggstate->aggs.  Aggrefs in the qual are found here; Aggrefs
2517
   * in the targetlist are found during ExecAssignProjectionInfo, above.
2518
   */
2519
2.02k
  aggstate->ss.ps.qual =
2520
2.02k
    ExecInitQual(node->plan.qual, (PlanState *) aggstate);
2521
2522
  /*
2523
   * We should now have found all Aggrefs in the targetlist and quals.
2524
   */
2525
2.02k
  numaggs = aggstate->numaggs;
2526
2.02k
  Assert(numaggs == list_length(aggstate->aggs));
2527
2528
  /*
2529
   * For each phase, prepare grouping set data and fmgr lookup data for
2530
   * compare functions.  Accumulate all_grouped_cols in passing.
2531
   */
2532
2.02k
  aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
2533
2534
2.02k
  aggstate->num_hashes = numHashes;
2535
2.02k
  if (numHashes)
2536
130
  {
2537
130
    aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
2538
130
    aggstate->phases[0].numsets = 0;
2539
130
    aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
2540
130
    aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
2541
130
  }
2542
2543
2.02k
  phase = 0;
2544
4.05k
  for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
2545
2.02k
  {
2546
2.02k
    Agg      *aggnode;
2547
2.02k
    Sort     *sortnode;
2548
2549
2.02k
    if (phaseidx > 0)
2550
6
    {
2551
6
      aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
2552
6
      sortnode = castNode(Sort, aggnode->plan.lefttree);
2553
6
    }
2554
2.02k
    else
2555
2.02k
    {
2556
2.02k
      aggnode = node;
2557
2.02k
      sortnode = NULL;
2558
2.02k
    }
2559
2560
2.02k
    Assert(phase <= 1 || sortnode);
2561
2562
2.02k
    if (aggnode->aggstrategy == AGG_HASHED
2563
1.90k
      || aggnode->aggstrategy == AGG_MIXED)
2564
132
    {
2565
132
      AggStatePerPhase phasedata = &aggstate->phases[0];
2566
132
      AggStatePerHash perhash;
2567
132
      Bitmapset  *cols = NULL;
2568
2569
132
      Assert(phase == 0);
2570
132
      i = phasedata->numsets++;
2571
132
      perhash = &aggstate->perhash[i];
2572
2573
      /* phase 0 always points to the "real" Agg in the hash case */
2574
132
      phasedata->aggnode = node;
2575
132
      phasedata->aggstrategy = node->aggstrategy;
2576
2577
      /* but the actual Agg node representing this hash is saved here */
2578
132
      perhash->aggnode = aggnode;
2579
2580
132
      phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
2581
2582
279
      for (j = 0; j < aggnode->numCols; ++j)
2583
147
        cols = bms_add_member(cols, aggnode->grpColIdx[j]);
2584
2585
132
      phasedata->grouped_cols[i] = cols;
2586
2587
132
      all_grouped_cols = bms_add_members(all_grouped_cols, cols);
2588
132
      continue;
2589
1.89k
    }
2590
1.89k
    else
2591
1.89k
    {
2592
1.89k
      AggStatePerPhase phasedata = &aggstate->phases[++phase];
2593
1.89k
      int     num_sets;
2594
2595
1.89k
      phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
2596
2597
1.89k
      if (num_sets)
2598
4
      {
2599
4
        phasedata->gset_lengths = palloc(num_sets * sizeof(int));
2600
4
        phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
2601
2602
4
        i = 0;
2603
4
        foreach(l, aggnode->groupingSets)
2604
4
        {
2605
4
          int     current_length = list_length(lfirst(l));
2606
4
          Bitmapset  *cols = NULL;
2607
2608
          /* planner forces this to be correct */
2609
4
          for (j = 0; j < current_length; ++j)
2610
0
            cols = bms_add_member(cols, aggnode->grpColIdx[j]);
2611
2612
4
          phasedata->grouped_cols[i] = cols;
2613
4
          phasedata->gset_lengths[i] = current_length;
2614
2615
4
          ++i;
2616
4
        }
2617
2618
4
        all_grouped_cols = bms_add_members(all_grouped_cols,
2619
4
                           phasedata->grouped_cols[0]);
2620
4
      }
2621
1.89k
      else
2622
1.89k
      {
2623
1.89k
        Assert(phaseidx == 0);
2624
2625
1.89k
        phasedata->gset_lengths = NULL;
2626
1.89k
        phasedata->grouped_cols = NULL;
2627
1.89k
      }
2628
2629
      /*
2630
       * If we are grouping, precompute fmgr lookup data for inner loop.
2631
       */
2632
1.89k
      if (aggnode->aggstrategy == AGG_SORTED)
2633
25
      {
2634
25
        int     i = 0;
2635
2636
25
        Assert(aggnode->numCols > 0);
2637
2638
        /*
2639
         * Build a separate function for each subset of columns that
2640
         * need to be compared.
2641
         */
2642
25
        phasedata->eqfunctions =
2643
25
          (ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));
2644
2645
        /* for each grouping set */
2646
25
        for (i = 0; i < phasedata->numsets; i++)
2647
0
        {
2648
0
          int     length = phasedata->gset_lengths[i];
2649
2650
0
          if (phasedata->eqfunctions[length - 1] != NULL)
2651
0
            continue;
2652
2653
0
          phasedata->eqfunctions[length - 1] =
2654
0
            execTuplesMatchPrepare(scanDesc,
2655
0
                         length,
2656
0
                         aggnode->grpColIdx,
2657
0
                         aggnode->grpOperators,
2658
0
                         (PlanState *) aggstate);
2659
0
        }
2660
2661
        /* and for all grouped columns, unless already computed */
2662
25
        if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
2663
25
        {
2664
25
          phasedata->eqfunctions[aggnode->numCols - 1] =
2665
25
            execTuplesMatchPrepare(scanDesc,
2666
25
                         aggnode->numCols,
2667
25
                         aggnode->grpColIdx,
2668
25
                         aggnode->grpOperators,
2669
25
                         (PlanState *) aggstate);
2670
25
        }
2671
25
      }
2672
2673
1.89k
      phasedata->aggnode = aggnode;
2674
1.89k
      phasedata->aggstrategy = aggnode->aggstrategy;
2675
1.89k
      phasedata->sortnode = sortnode;
2676
1.89k
    }
2677
2.02k
  }
2678
2679
  /*
2680
   * Convert all_grouped_cols to a descending-order list.
2681
   */
2682
2.02k
  i = -1;
2683
2.17k
  while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
2684
147
    aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
2685
2686
  /*
2687
   * Set up aggregate-result storage in the output expr context, and also
2688
   * allocate my private per-agg working storage
2689
   */
2690
2.02k
  econtext = aggstate->ss.ps.ps_ExprContext;
2691
2.02k
  econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
2692
2.02k
  econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
2693
2694
2.02k
  peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
2695
2.02k
  pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs);
2696
2697
2.02k
  aggstate->peragg = peraggs;
2698
2.02k
  aggstate->pertrans = pertransstates;
2699
2700
2701
2.02k
  aggstate->all_pergroups =
2702
2.02k
    (AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup)
2703
2.02k
                   * (numGroupingSets + numHashes));
2704
2.02k
  pergroups = aggstate->all_pergroups;
2705
2706
2.02k
  if (node->aggstrategy != AGG_HASHED)
2707
1.89k
  {
2708
3.79k
    for (i = 0; i < numGroupingSets; i++)
2709
1.89k
    {
2710
1.89k
      pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
2711
1.89k
                            * numaggs);
2712
1.89k
    }
2713
2714
1.89k
    aggstate->pergroups = pergroups;
2715
1.89k
    pergroups += numGroupingSets;
2716
1.89k
  }
2717
2718
  /*
2719
   * Hashing can only appear in the initial phase.
2720
   */
2721
2.02k
  if (use_hashing)
2722
130
  {
2723
    /* this is an array of pointers, not structures */
2724
130
    aggstate->hash_pergroup = pergroups;
2725
2726
130
    find_hash_columns(aggstate);
2727
2728
    /* Skip massive memory allocation if we are just doing EXPLAIN */
2729
130
    if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
2730
101
      build_hash_table(aggstate);
2731
2732
130
    aggstate->table_filled = false;
2733
130
  }
2734
2735
  /*
2736
   * Initialize current phase-dependent values to initial phase. The initial
2737
   * phase is 1 (first sort pass) for all strategies that use sorting (if
2738
   * hashing is being done too, then phase 0 is processed last); but if only
2739
   * hashing is being done, then phase 0 is all there is.
2740
   */
2741
2.02k
  if (node->aggstrategy == AGG_HASHED)
2742
126
  {
2743
126
    aggstate->current_phase = 0;
2744
126
    initialize_phase(aggstate, 0);
2745
126
    select_current_set(aggstate, 0, true);
2746
126
  }
2747
1.89k
  else
2748
1.89k
  {
2749
1.89k
    aggstate->current_phase = 1;
2750
1.89k
    initialize_phase(aggstate, 1);
2751
1.89k
    select_current_set(aggstate, 0, false);
2752
1.89k
  }
2753
2754
  /* Internally set whether plan supports YB agg pushdown. */
2755
2.02k
  yb_agg_pushdown_supported(aggstate);
2756
2757
  /* -----------------
2758
   * Perform lookups of aggregate function info, and initialize the
2759
   * unchanging fields of the per-agg and per-trans data.
2760
   *
2761
   * We try to optimize by detecting duplicate aggregate functions so that
2762
   * their state and final values are re-used, rather than needlessly being
2763
   * re-calculated independently. We also detect aggregates that are not
2764
   * the same, but which can share the same transition state.
2765
   *
2766
   * Scenarios:
2767
   *
2768
   * 1. Identical aggregate function calls appear in the query:
2769
   *
2770
   *    SELECT SUM(x) FROM ... HAVING SUM(x) > 0
2771
   *
2772
   *    Since these aggregates are identical, we only need to calculate
2773
   *    the value once. Both aggregates will share the same 'aggno' value.
2774
   *
2775
   * 2. Two different aggregate functions appear in the query, but the
2776
   *    aggregates have the same arguments, transition functions and
2777
   *    initial values (and, presumably, different final functions):
2778
   *
2779
   *    SELECT AVG(x), STDDEV(x) FROM ...
2780
   *
2781
   *    In this case we must create a new peragg for the varying aggregate,
2782
   *    and we need to call the final functions separately, but we need
2783
   *    only run the transition function once.  (This requires that the
2784
   *    final functions be nondestructive of the transition state, but
2785
   *    that's required anyway for other reasons.)
2786
   *
2787
   * For either of these optimizations to be valid, all aggregate properties
2788
   * used in the transition phase must be the same, including any modifiers
2789
   * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't
2790
   * contain any volatile functions.
2791
   * -----------------
2792
   */
2793
2.02k
  aggno = -1;
2794
2.02k
  transno = -1;
2795
2.02k
  foreach(l, aggstate->aggs)
2796
2.18k
  {
2797
2.18k
    AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l);
2798
2.18k
    Aggref     *aggref = aggrefstate->aggref;
2799
2.18k
    AggStatePerAgg peragg;
2800
2.18k
    AggStatePerTrans pertrans;
2801
2.18k
    int     existing_aggno;
2802
2.18k
    int     existing_transno;
2803
2.18k
    List     *same_input_transnos;
2804
2.18k
    Oid     inputTypes[FUNC_MAX_ARGS];
2805
2.18k
    int     numArguments;
2806
2.18k
    int     numDirectArgs;
2807
2.18k
    HeapTuple aggTuple;
2808
2.18k
    Form_pg_aggregate aggform;
2809
2.18k
    AclResult aclresult;
2810
2.18k
    Oid     transfn_oid,
2811
2.18k
          finalfn_oid;
2812
2.18k
    bool    shareable;
2813
2.18k
    Oid     serialfn_oid,
2814
2.18k
          deserialfn_oid;
2815
2.18k
    Expr     *finalfnexpr;
2816
2.18k
    Oid     aggtranstype;
2817
2.18k
    Datum   textInitVal;
2818
2.18k
    Datum   initValue;
2819
2.18k
    bool    initValueIsNull;
2820
2821
    /* Planner should have assigned aggregate to correct level */
2822
2.18k
    Assert(aggref->agglevelsup == 0);
2823
    /* ... and the split mode should match */
2824
2.18k
    Assert(aggref->aggsplit == aggstate->aggsplit);
2825
2826
    /* 1. Check for already processed aggs which can be re-used */
2827
2.18k
    existing_aggno = find_compatible_peragg(aggref, aggstate, aggno,
2828
2.18k
                        &same_input_transnos);
2829
2.18k
    if (existing_aggno != -1)
2830
16
    {
2831
      /*
2832
       * Existing compatible agg found, so just point the Aggref to the
2833
       * same per-agg struct.
2834
       */
2835
16
      aggrefstate->aggno = existing_aggno;
2836
16
      continue;
2837
16
    }
2838
2839
    /* Mark Aggref state node with assigned index in the result array */
2840
2.16k
    peragg = &peraggs[++aggno];
2841
2.16k
    peragg->aggref = aggref;
2842
2.16k
    aggrefstate->aggno = aggno;
2843
2844
    /* Fetch the pg_aggregate row */
2845
2.16k
    aggTuple = SearchSysCache1(AGGFNOID,
2846
2.16k
                   ObjectIdGetDatum(aggref->aggfnoid));
2847
2.16k
    if (!HeapTupleIsValid(aggTuple))
2848
0
      elog(ERROR, "cache lookup failed for aggregate %u",
2849
2.16k
         aggref->aggfnoid);
2850
2.16k
    aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2851
2852
    /* Check permission to call aggregate function */
2853
2.16k
    aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
2854
2.16k
                   ACL_EXECUTE);
2855
2.16k
    if (aclresult != ACLCHECK_OK)
2856
1
      aclcheck_error(aclresult, OBJECT_AGGREGATE,
2857
1
               get_func_name(aggref->aggfnoid));
2858
2.16k
    InvokeFunctionExecuteHook(aggref->aggfnoid);
2859
2860
    /* planner recorded transition state type in the Aggref itself */
2861
2.16k
    aggtranstype = aggref->aggtranstype;
2862
2.16k
    Assert(OidIsValid(aggtranstype));
2863
2864
    /*
2865
     * If this aggregation is performing state combines, then instead of
2866
     * using the transition function, we'll use the combine function
2867
     */
2868
2.16k
    if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
2869
4
    {
2870
4
      transfn_oid = aggform->aggcombinefn;
2871
2872
      /* If not set then the planner messed up */
2873
4
      if (!OidIsValid(transfn_oid))
2874
0
        elog(ERROR, "combinefn not set for aggregate function");
2875
4
    }
2876
2.16k
    else
2877
2.16k
      transfn_oid = aggform->aggtransfn;
2878
2879
    /* Final function only required if we're finalizing the aggregates */
2880
2.16k
    if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
2881
12
      peragg->finalfn_oid = finalfn_oid = InvalidOid;
2882
2.16k
    else
2883
2.15k
      peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2884
2885
    /*
2886
     * If finalfn is marked read-write, we can't share transition states;
2887
     * but it is okay to share states for AGGMODIFY_SHAREABLE aggs.  Also,
2888
     * if we're not executing the finalfn here, we can share regardless.
2889
     */
2890
2.16k
    shareable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) ||
2891
13
      (finalfn_oid == InvalidOid);
2892
2.16k
    peragg->shareable = shareable;
2893
2894
2.16k
    serialfn_oid = InvalidOid;
2895
2.16k
    deserialfn_oid = InvalidOid;
2896
2897
    /*
2898
     * Check if serialization/deserialization is required.  We only do it
2899
     * for aggregates that have transtype INTERNAL.
2900
     */
2901
2.16k
    if (aggtranstype == INTERNALOID)
2902
205
    {
2903
      /*
2904
       * The planner should only have generated a serialize agg node if
2905
       * every aggregate with an INTERNAL state has a serialization
2906
       * function.  Verify that.
2907
       */
2908
205
      if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
2909
0
      {
2910
        /* serialization only valid when not running finalfn */
2911
0
        Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
2912
2913
0
        if (!OidIsValid(aggform->aggserialfn))
2914
0
          elog(ERROR, "serialfunc not provided for serialization aggregation");
2915
0
        serialfn_oid = aggform->aggserialfn;
2916
0
      }
2917
2918
      /* Likewise for deserialization functions */
2919
205
      if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
2920
0
      {
2921
        /* deserialization only valid when combining states */
2922
0
        Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
2923
2924
0
        if (!OidIsValid(aggform->aggdeserialfn))
2925
0
          elog(ERROR, "deserialfunc not provided for deserialization aggregation");
2926
0
        deserialfn_oid = aggform->aggdeserialfn;
2927
0
      }
2928
205
    }
2929
2930
    /* Check that aggregate owner has permission to call component fns */
2931
2.16k
    {
2932
2.16k
      HeapTuple procTuple;
2933
2.16k
      Oid     aggOwner;
2934
2935
2.16k
      procTuple = SearchSysCache1(PROCOID,
2936
2.16k
                    ObjectIdGetDatum(aggref->aggfnoid));
2937
2.16k
      if (!HeapTupleIsValid(procTuple))
2938
0
        elog(ERROR, "cache lookup failed for function %u",
2939
2.16k
           aggref->aggfnoid);
2940
2.16k
      aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2941
2.16k
      ReleaseSysCache(procTuple);
2942
2943
2.16k
      aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
2944
2.16k
                     ACL_EXECUTE);
2945
2.16k
      if (aclresult != ACLCHECK_OK)
2946
0
        aclcheck_error(aclresult, OBJECT_FUNCTION,
2947
0
                 get_func_name(transfn_oid));
2948
2.16k
      InvokeFunctionExecuteHook(transfn_oid);
2949
2.16k
      if (OidIsValid(finalfn_oid))
2950
266
      {
2951
266
        aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
2952
266
                       ACL_EXECUTE);
2953
266
        if (aclresult != ACLCHECK_OK)
2954
0
          aclcheck_error(aclresult, OBJECT_FUNCTION,
2955
0
                   get_func_name(finalfn_oid));
2956
266
        InvokeFunctionExecuteHook(finalfn_oid);
2957
266
      }
2958
2.16k
      if (OidIsValid(serialfn_oid))
2959
0
      {
2960
0
        aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
2961
0
                       ACL_EXECUTE);
2962
0
        if (aclresult != ACLCHECK_OK)
2963
0
          aclcheck_error(aclresult, OBJECT_FUNCTION,
2964
0
                   get_func_name(serialfn_oid));
2965
0
        InvokeFunctionExecuteHook(serialfn_oid);
2966
0
      }
2967
2.16k
      if (OidIsValid(deserialfn_oid))
2968
0
      {
2969
0
        aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
2970
0
                       ACL_EXECUTE);
2971
0
        if (aclresult != ACLCHECK_OK)
2972
0
          aclcheck_error(aclresult, OBJECT_FUNCTION,
2973
0
                   get_func_name(deserialfn_oid));
2974
0
        InvokeFunctionExecuteHook(deserialfn_oid);
2975
0
      }
2976
2.16k
    }
2977
2978
    /*
2979
     * Get actual datatypes of the (nominal) aggregate inputs.  These
2980
     * could be different from the agg's declared input types, when the
2981
     * agg accepts ANY or a polymorphic type.
2982
     */
2983
2.16k
    numArguments = get_aggregate_argtypes(aggref, inputTypes);
2984
2985
    /*
2986
     * If we support YB agg pushdown, we set the transition function's input
2987
     * type to the aggregate's transition type, because that is the type of the
2988
     * partial result DocDB returns, which we then combine using the appropriate
2989
     * transition function. Aggstar aggregates (e.g. COUNT(*)) have no arguments, so we skip them.
2990
     */
2991
2.16k
    if (aggstate->yb_pushdown_supported && !aggref->aggstar)
2992
107
    {
2993
      /* We currently only support single-argument aggregates for YB pushdown. */
2994
107
      numArguments = 1;
2995
107
      Assert(list_length(aggref->aggargtypes) == numArguments);
2996
107
      inputTypes[0] = aggref->aggtranstype;
2997
2998
      /*
2999
       * Convert SUM function to 8-byte SUM for appropriate types to match values
3000
       * returned from DocDB aggregates.
3001
       *
3002
       * Note that we don't need to perform this for floats as they use accumulators
3003
       * of the same precision as the input. Also, we don't support pushdown of 8-byte
3004
       * integer SUM as PG uses a numeric type to avoid overflow which we don't yet fully
3005
       * support in DocDB, so we don't need to handle that here either.
3006
       */
3007
107
      if (strcmp(get_func_name(aggref->aggfnoid), "sum") == 0)
3008
16
      {
3009
16
        switch (linitial_oid(aggref->aggargtypes))
3010
16
        {
3011
2
          case INT2OID:
3012
12
          case INT4OID:
3013
12
            transfn_oid = F_INT8PL;
3014
12
            break;
3015
2.16k
        }
3016
2.16k
      }
3017
107
    }
3018
3019
    /* Count the "direct" arguments, if any */
3020
2.16k
    numDirectArgs = list_length(aggref->aggdirectargs);
3021
3022
    /* Detect how many arguments to pass to the finalfn */
3023
2.16k
    if (aggform->aggfinalextra)
3024
140
      peragg->numFinalArgs = numArguments + 1;
3025
2.02k
    else
3026
2.02k
      peragg->numFinalArgs = numDirectArgs + 1;
3027
3028
    /* Initialize any direct-argument expressions */
3029
2.16k
    peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
3030
2.16k
                         (PlanState *) aggstate);
3031
3032
    /*
3033
     * build expression trees using actual argument & result types for the
3034
     * finalfn, if it exists and is required.
3035
     */
3036
2.16k
    if (OidIsValid(finalfn_oid))
3037
266
    {
3038
266
      build_aggregate_finalfn_expr(inputTypes,
3039
266
                     peragg->numFinalArgs,
3040
266
                     aggtranstype,
3041
266
                     aggref->aggtype,
3042
266
                     aggref->inputcollid,
3043
266
                     finalfn_oid,
3044
266
                     &finalfnexpr);
3045
266
      fmgr_info(finalfn_oid, &peragg->finalfn);
3046
266
      fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
3047
266
    }
3048
3049
    /* get info about the output value's datatype */
3050
2.16k
    get_typlenbyval(aggref->aggtype,
3051
2.16k
            &peragg->resulttypeLen,
3052
2.16k
            &peragg->resulttypeByVal);
3053
3054
    /*
3055
     * initval is potentially null, so don't try to access it as a struct
3056
     * field. Must do it the hard way with SysCacheGetAttr.
3057
     */
3058
2.16k
    textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
3059
2.16k
                    Anum_pg_aggregate_agginitval,
3060
2.16k
                    &initValueIsNull);
3061
2.16k
    if (initValueIsNull)
3062
1.51k
      initValue = (Datum) 0;
3063
650
    else
3064
650
      initValue = GetAggInitVal(textInitVal, aggtranstype);
3065
3066
    /*
3067
     * 2. Build working state for invoking the transition function, or
3068
     * look up previously initialized working state, if we can share it.
3069
     *
3070
     * find_compatible_peragg() already collected a list of shareable
3071
     * per-Trans's with the same inputs. Check if any of them have the
3072
     * same transition function and initial value.
3073
     */
3074
2.16k
    existing_transno = find_compatible_pertrans(aggstate, aggref,
3075
2.16k
                          shareable,
3076
2.16k
                          transfn_oid, aggtranstype,
3077
2.16k
                          serialfn_oid, deserialfn_oid,
3078
2.16k
                          initValue, initValueIsNull,
3079
2.16k
                          same_input_transnos);
3080
2.16k
    if (existing_transno != -1)
3081
9
    {
3082
      /*
3083
       * Existing compatible trans found, so just point the 'peragg' to
3084
       * the same per-trans struct, and mark the trans state as shared.
3085
       */
3086
9
      pertrans = &pertransstates[existing_transno];
3087
9
      pertrans->aggshared = true;
3088
9
      peragg->transno = existing_transno;
3089
9
    }
3090
2.15k
    else
3091
2.15k
    {
3092
2.15k
      pertrans = &pertransstates[++transno];
3093
2.15k
      build_pertrans_for_aggref(pertrans, aggstate, estate,
3094
2.15k
                    aggref, transfn_oid, aggtranstype,
3095
2.15k
                    serialfn_oid, deserialfn_oid,
3096
2.15k
                    initValue, initValueIsNull,
3097
2.15k
                    inputTypes, numArguments);
3098
2.15k
      peragg->transno = transno;
3099
2.15k
    }
3100
2.16k
    ReleaseSysCache(aggTuple);
3101
2.16k
  }
3102
3103
  /*
3104
   * Update aggstate->numaggs to be the number of unique aggregates found.
3105
   * Also set aggstate->numtrans to the number of unique transition states found.
3106
   */
3107
2.02k
  aggstate->numaggs = aggno + 1;
3108
2.02k
  aggstate->numtrans = transno + 1;
3109
3110
  /*
3111
   * Last, check whether any more aggregates got added onto the node while
3112
   * we processed the expressions for the aggregate arguments (including not
3113
   * only the regular arguments and FILTER expressions handled immediately
3114
   * above, but any direct arguments we might've handled earlier).  If so,
3115
   * we have nested aggregate functions, which is semantically nonsensical,
3116
   * so complain.  (This should have been caught by the parser, so we don't
3117
   * need to work hard on a helpful error message; but we defend against it
3118
   * here anyway, just to be sure.)
3119
   */
3120
2.02k
  if (numaggs != list_length(aggstate->aggs))
3121
2.02k
    ereport(ERROR,
3122
2.02k
        (errcode(ERRCODE_GROUPING_ERROR),
3123
2.02k
         errmsg("aggregate function calls cannot be nested")));
3124
3125
  /*
3126
   * Build expressions doing all the transition work at once. We build a
3127
   * different one for each phase, as the number of transition function
3128
   * invocations can differ between phases. Note this'll work both for
3129
   * transition and combination functions (although there'll only be one
3130
   * phase in the latter case).
3131
   */
3132
5.94k
  for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
3133
3.91k
  {
3134
3.91k
    AggStatePerPhase phase = &aggstate->phases[phaseidx];
3135
3.91k
    bool    dohash = false;
3136
3.91k
    bool    dosort = false;
3137
3138
    /* phase 0 doesn't necessarily exist */
3139
3.91k
    if (!phase->aggnode)
3140
1.89k
      continue;
3141
3142
2.02k
    if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
3143
4
    {
3144
      /*
3145
       * Phase one, and only phase one, in a mixed agg performs both
3146
       * sorted and hashed aggregation.
3147
       */
3148
4
      dohash = true;
3149
4
      dosort = true;
3150
4
    }
3151
2.02k
    else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
3152
4
    {
3153
      /*
3154
       * No need to compute a transition function for an AGG_MIXED phase
3155
       * 0 - the contents of the hashtables will have been computed
3156
       * during phase 1.
3157
       */
3158
4
      continue;
3159
4
    }
3160
2.01k
    else if (phase->aggstrategy == AGG_PLAIN ||
3161
151
         phase->aggstrategy == AGG_SORTED)
3162
1.89k
    {
3163
1.89k
      dohash = false;
3164
1.89k
      dosort = true;
3165
1.89k
    }
3166
126
    else if (phase->aggstrategy == AGG_HASHED)
3167
126
    {
3168
126
      dohash = true;
3169
126
      dosort = false;
3170
126
    }
3171
126
    else
3172
0
      Assert(false);
3173
3174
2.02k
    phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash);
3175
3176
2.02k
  }
3177
3178
2.02k
  return aggstate;
3179
2.02k
}
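The per-phase branch at the end of ExecInitAgg can be read as a small decision table. The following is a minimal standalone sketch of that table, not the executor's code: `Strategy`, `TransFlags`, and `choose_trans_flags` are hypothetical names introduced here for illustration, and the sketch omits the earlier `!phase->aggnode` check that skips a nonexistent phase 0.

```c
#include <stdbool.h>

/* Hypothetical stand-ins for the AGG_PLAIN/SORTED/HASHED/MIXED strategies. */
typedef enum { S_PLAIN, S_SORTED, S_HASHED, S_MIXED } Strategy;

typedef struct
{
    bool build;   /* false: no transition expression is built for the phase */
    bool dosort;  /* advance the sorted/plain grouping sets */
    bool dohash;  /* advance the hashed grouping sets */
} TransFlags;

/*
 * node_strategy  - strategy of the whole Agg node
 * phase_strategy - strategy recorded on the individual phase
 * phaseidx       - index of the phase being set up
 */
static TransFlags
choose_trans_flags(Strategy node_strategy, Strategy phase_strategy, int phaseidx)
{
    TransFlags f = { true, false, false };

    if (node_strategy == S_MIXED && phaseidx == 1)
    {
        /* phase 1 of a mixed agg feeds both the sorted and the hashed sets */
        f.dosort = true;
        f.dohash = true;
    }
    else if (node_strategy == S_MIXED && phaseidx == 0)
    {
        /* hash tables were already filled during phase 1 */
        f.build = false;
    }
    else if (phase_strategy == S_PLAIN || phase_strategy == S_SORTED)
        f.dosort = true;
    else if (phase_strategy == S_HASHED)
        f.dohash = true;
    else
        f.build = false;  /* the original treats this as Assert(false) */

    return f;
}
```

For example, under these assumptions `choose_trans_flags(S_MIXED, S_SORTED, 1)` sets both flags, while `choose_trans_flags(S_MIXED, S_HASHED, 0)` reports that nothing needs to be built.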
3180
3181
/*
3182
 * Build the state needed to calculate a state value for an aggregate.
3183
 *
3184
 * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
3185
 * to initialize the state for. 'aggtransfn', 'aggtranstype', and the rest
3186
 * of the arguments could be calculated from 'aggref', but the caller has
3187
 * calculated them already, so might as well pass them.
3188
 */
3189
static void
3190
build_pertrans_for_aggref(AggStatePerTrans pertrans,
3191
              AggState *aggstate, EState *estate,
3192
              Aggref *aggref,
3193
              Oid aggtransfn, Oid aggtranstype,
3194
              Oid aggserialfn, Oid aggdeserialfn,
3195
              Datum initValue, bool initValueIsNull,
3196
              Oid *inputTypes, int numArguments)
3197
2.15k
{
3198
2.15k
  int     numGroupingSets = Max(aggstate->maxsets, 1);
3199
2.15k
  Expr     *serialfnexpr = NULL;
3200
2.15k
  Expr     *deserialfnexpr = NULL;
3201
2.15k
  ListCell   *lc;
3202
2.15k
  int     numInputs;
3203
2.15k
  int     numDirectArgs;
3204
2.15k
  List     *sortlist;
3205
2.15k
  int     numSortCols;
3206
2.15k
  int     numDistinctCols;
3207
2.15k
  int     i;
3208
3209
  /* Begin filling in the pertrans data */
3210
2.15k
  pertrans->aggref = aggref;
3211
2.15k
  pertrans->aggshared = false;
3212
2.15k
  pertrans->aggCollation = aggref->inputcollid;
3213
2.15k
  pertrans->transfn_oid = aggtransfn;
3214
2.15k
  pertrans->serialfn_oid = aggserialfn;
3215
2.15k
  pertrans->deserialfn_oid = aggdeserialfn;
3216
2.15k
  pertrans->initValue = initValue;
3217
2.15k
  pertrans->initValueIsNull = initValueIsNull;
3218
3219
  /* Count the "direct" arguments, if any */
3220
2.15k
  numDirectArgs = list_length(aggref->aggdirectargs);
3221
3222
  /* Count the number of aggregated input columns */
3223
2.15k
  pertrans->numInputs = numInputs = list_length(aggref->args);
3224
3225
2.15k
  pertrans->aggtranstype = aggtranstype;
3226
3227
  /* Detect how many arguments to pass to the transfn */
3228
2.15k
  if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
3229
30
    pertrans->numTransInputs = numInputs;
3230
2.12k
  else
3231
2.12k
    pertrans->numTransInputs = numArguments;
3232
3233
  /*
3234
   * When combining states, we have no use at all for the aggregate
3235
   * function's transfn. Instead we use the combinefn.  In this case, the
3236
   * transfn and transfn_oid fields of pertrans refer to the combine
3237
   * function rather than the transition function.
3238
   */
3239
2.15k
  if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3240
4
  {
3241
4
    Expr     *combinefnexpr;
3242
3243
4
    build_aggregate_combinefn_expr(aggtranstype,
3244
4
                     aggref->inputcollid,
3245
4
                     aggtransfn,
3246
4
                     &combinefnexpr);
3247
4
    fmgr_info(aggtransfn, &pertrans->transfn);
3248
4
    fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);
3249
3250
4
    InitFunctionCallInfoData(pertrans->transfn_fcinfo,
3251
4
                 &pertrans->transfn,
3252
4
                 2,
3253
4
                 pertrans->aggCollation,
3254
4
                 (void *) aggstate, NULL);
3255
3256
    /*
3257
     * Ensure that a combine function to combine INTERNAL states is not
3258
     * strict. This should have been checked during CREATE AGGREGATE, but
3259
     * the strict property could have been changed since then.
3260
     */
3261
4
    if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
3262
4
      ereport(ERROR,
3263
4
          (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3264
4
           errmsg("combine function with transition type %s must not be declared STRICT",
3265
4
              format_type_be(aggtranstype))));
3266
4
  }
3267
2.15k
  else
3268
2.15k
  {
3269
2.15k
    Expr     *transfnexpr;
3270
3271
    /*
3272
     * Set up infrastructure for calling the transfn.  Note that
3273
     * invtransfn is not needed here.
3274
     */
3275
2.15k
    build_aggregate_transfn_expr(inputTypes,
3276
2.15k
                   numArguments,
3277
2.15k
                   numDirectArgs,
3278
2.15k
                   aggref->aggvariadic,
3279
2.15k
                   aggtranstype,
3280
2.15k
                   aggref->inputcollid,
3281
2.15k
                   aggtransfn,
3282
2.15k
                   InvalidOid,
3283
2.15k
                   &transfnexpr,
3284
2.15k
                   NULL);
3285
2.15k
    fmgr_info(aggtransfn, &pertrans->transfn);
3286
2.15k
    fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
3287
3288
2.15k
    InitFunctionCallInfoData(pertrans->transfn_fcinfo,
3289
2.15k
                 &pertrans->transfn,
3290
2.15k
                 pertrans->numTransInputs + 1,
3291
2.15k
                 pertrans->aggCollation,
3292
2.15k
                 (void *) aggstate, NULL);
3293
3294
    /*
3295
     * If the transfn is strict and the initval is NULL, make sure input
3296
     * type and transtype are the same (or at least binary-compatible), so
3297
     * that it's OK to use the first aggregated input value as the initial
3298
     * transValue.  This should have been checked at agg definition time,
3299
     * but we must check again in case the transfn's strictness property
3300
     * has been changed.
3301
     */
3302
2.15k
    if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
3303
1.19k
    {
3304
1.19k
      if (numArguments <= numDirectArgs ||
3305
1.19k
        !IsBinaryCoercible(inputTypes[numDirectArgs],
3306
1.19k
                   aggtranstype))
3307
1.19k
        ereport(ERROR,
3308
1.19k
            (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3309
1.19k
             errmsg("aggregate %u needs to have compatible input type and transition type",
3310
1.19k
                aggref->aggfnoid)));
3311
1.19k
    }
3312
2.15k
  }
3313
3314
  /* get info about the state value's datatype */
3315
2.15k
  get_typlenbyval(aggtranstype,
3316
2.15k
          &pertrans->transtypeLen,
3317
2.15k
          &pertrans->transtypeByVal);
3318
3319
2.15k
  if (OidIsValid(aggserialfn))
3320
0
  {
3321
0
    build_aggregate_serialfn_expr(aggserialfn,
3322
0
                    &serialfnexpr);
3323
0
    fmgr_info(aggserialfn, &pertrans->serialfn);
3324
0
    fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
3325
3326
0
    InitFunctionCallInfoData(pertrans->serialfn_fcinfo,
3327
0
                 &pertrans->serialfn,
3328
0
                 1,
3329
0
                 InvalidOid,
3330
0
                 (void *) aggstate, NULL);
3331
0
  }
3332
3333
2.15k
  if (OidIsValid(aggdeserialfn))
3334
0
  {
3335
0
    build_aggregate_deserialfn_expr(aggdeserialfn,
3336
0
                    &deserialfnexpr);
3337
0
    fmgr_info(aggdeserialfn, &pertrans->deserialfn);
3338
0
    fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
3339
3340
0
    InitFunctionCallInfoData(pertrans->deserialfn_fcinfo,
3341
0
                 &pertrans->deserialfn,
3342
0
                 2,
3343
0
                 InvalidOid,
3344
0
                 (void *) aggstate, NULL);
3345
3346
0
  }
3347
3348
  /*
3349
   * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
3350
   * have a list of SortGroupClause nodes; fish out the data in them and
3351
   * stick them into arrays.  We ignore ORDER BY for an ordered-set agg,
3352
   * however; the agg's transfn and finalfn are responsible for that.
3353
   *
3354
   * Note that by construction, if there is a DISTINCT clause then the ORDER
3355
   * BY clause is a prefix of it (see transformDistinctClause).
3356
   */
3357
2.15k
  if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
3358
30
  {
3359
30
    sortlist = NIL;
3360
30
    numSortCols = numDistinctCols = 0;
3361
30
  }
3362
2.12k
  else if (aggref->aggdistinct)
3363
29
  {
3364
29
    sortlist = aggref->aggdistinct;
3365
29
    numSortCols = numDistinctCols = list_length(sortlist);
3366
29
    Assert(numSortCols >= list_length(aggref->aggorder));
3367
29
  }
3368
2.09k
  else
3369
2.09k
  {
3370
2.09k
    sortlist = aggref->aggorder;
3371
2.09k
    numSortCols = list_length(sortlist);
3372
2.09k
    numDistinctCols = 0;
3373
2.09k
  }
3374
3375
2.15k
  pertrans->numSortCols = numSortCols;
3376
2.15k
  pertrans->numDistinctCols = numDistinctCols;
3377
3378
  /*
3379
   * If we have either sorting or filtering to do, create a tupledesc and
3380
   * slot corresponding to the aggregated inputs (including sort
3381
   * expressions) of the agg.
3382
   */
3383
2.15k
  if (numSortCols > 0 || aggref->aggfilter)
3384
349
  {
3385
349
    pertrans->sortdesc = ExecTypeFromTL(aggref->args, false);
3386
349
    pertrans->sortslot =
3387
349
      ExecInitExtraTupleSlot(estate, pertrans->sortdesc);
3388
349
  }
3389
3390
2.15k
  if (numSortCols > 0)
3391
131
  {
3392
    /*
3393
     * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
3394
     * (yet)
3395
     */
3396
131
    Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
3397
3398
    /* If we have only one input, we need its len/byval info. */
3399
131
    if (numInputs == 1)
3400
66
    {
3401
66
      get_typlenbyval(inputTypes[numDirectArgs],
3402
66
              &pertrans->inputtypeLen,
3403
66
              &pertrans->inputtypeByVal);
3404
66
    }
3405
65
    else if (numDistinctCols > 0)
3406
12
    {
3407
      /* we will need an extra slot to store prior values */
3408
12
      pertrans->uniqslot =
3409
12
        ExecInitExtraTupleSlot(estate, pertrans->sortdesc);
3410
12
    }
3411
3412
    /* Extract the sort information for use later */
3413
131
    pertrans->sortColIdx =
3414
131
      (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
3415
131
    pertrans->sortOperators =
3416
131
      (Oid *) palloc(numSortCols * sizeof(Oid));
3417
131
    pertrans->sortCollations =
3418
131
      (Oid *) palloc(numSortCols * sizeof(Oid));
3419
131
    pertrans->sortNullsFirst =
3420
131
      (bool *) palloc(numSortCols * sizeof(bool));
3421
3422
131
    i = 0;
3423
131
    foreach(lc, sortlist)
3424
155
    {
3425
155
      SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
3426
155
      TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
3427
3428
      /* the parser should have made sure of this */
3429
155
      Assert(OidIsValid(sortcl->sortop));
3430
3431
155
      pertrans->sortColIdx[i] = tle->resno;
3432
155
      pertrans->sortOperators[i] = sortcl->sortop;
3433
155
      pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
3434
155
      pertrans->sortNullsFirst[i] = sortcl->nulls_first;
3435
155
      i++;
3436
155
    }
3437
131
    Assert(i == numSortCols);
3438
131
  }
3439
3440
2.15k
  if (aggref->aggdistinct)
3441
29
  {
3442
29
    Oid      *ops;
3443
3444
29
    Assert(numArguments > 0);
3445
29
    Assert(list_length(aggref->aggdistinct) == numDistinctCols);
3446
3447
29
    ops = palloc(numDistinctCols * sizeof(Oid));
3448
3449
29
    i = 0;
3450
29
    foreach(lc, aggref->aggdistinct)
3451
53
      ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;
3452
3453
    /* lookup / build the necessary comparators */
3454
29
    if (numDistinctCols == 1)
3455
17
      fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
3456
12
    else
3457
12
      pertrans->equalfnMulti =
3458
12
        execTuplesMatchPrepare(pertrans->sortdesc,
3459
12
                     numDistinctCols,
3460
12
                     pertrans->sortColIdx,
3461
12
                     ops,
3462
12
                     &aggstate->ss.ps);
3463
29
    pfree(ops);
3464
29
  }
3465
3466
2.15k
  pertrans->sortstates = (Tuplesortstate **)
3467
2.15k
    palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
3468
2.15k
}
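The compatibility check in build_pertrans_for_aggref (strict transfn plus NULL initial value) exists because, in that case, the first non-NULL input is used directly as the initial transition value. A minimal standalone sketch of that behavior follows; `NullableInt`, `int_larger`, and `aggregate_strict` are hypothetical names, and plain `int` stands in for both the input type and the transition type.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical nullable integer, standing in for a (Datum, isnull) pair. */
typedef struct
{
    bool is_null;
    int  value;
} NullableInt;

/* A "strict" transition function: it is never called with a NULL argument. */
static int
int_larger(int state, int input)
{
    return state > input ? state : input;
}

/* Aggregate with a strict transfn and a NULL initial condition. */
static NullableInt
aggregate_strict(const NullableInt *inputs, size_t n)
{
    NullableInt state = { true, 0 };    /* initcond is NULL */

    for (size_t i = 0; i < n; i++)
    {
        if (inputs[i].is_null)
            continue;                   /* strict: NULL inputs are skipped */

        if (state.is_null)
        {
            /*
             * The first non-NULL input becomes the state without calling
             * the transfn. This is only safe when the input type and the
             * transition type are the same or binary-compatible.
             */
            state.is_null = false;
            state.value = inputs[i].value;
            continue;
        }

        state.value = int_larger(state.value, inputs[i].value);
    }

    return state;   /* still NULL if every input was NULL */
}
```

If the two types were not compatible, the copy in the `state.is_null` branch would reinterpret an input value as a state value, which is the situation the ereport in the original guards against.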
3469
3470
3471
static Datum
3472
GetAggInitVal(Datum textInitVal, Oid transtype)
3473
649
{
3474
649
  Oid     typinput,
3475
649
        typioparam;
3476
649
  char     *strInitVal;
3477
649
  Datum   initVal;
3478
3479
649
  getTypeInputInfo(transtype, &typinput, &typioparam);
3480
649
  strInitVal = TextDatumGetCString(textInitVal);
3481
649
  initVal = OidInputFunctionCall(typinput, strInitVal,
3482
649
                   typioparam, -1);
3483
649
  pfree(strInitVal);
3484
649
  return initVal;
3485
649
}
3486
3487
/*
3488
 * find_compatible_peragg - search for a previously initialized per-Agg struct
3489
 *
3490
 * Searches the previously examined aggregates to find one which is compatible
3491
 * with this one, with the same input parameters. If no compatible aggregate
3492
 * can be found, returns -1.
3493
 *
3494
 * As a side-effect, this also collects a list of existing, shareable per-Trans
3495
 * structs with matching inputs. If no identical Aggref is found, the list is
3496
 * passed later to find_compatible_pertrans, to see if we can at least reuse
3497
 * the state value of another aggregate.
3498
 */
3499
static int
3500
find_compatible_peragg(Aggref *newagg, AggState *aggstate,
3501
             int lastaggno, List **same_input_transnos)
3502
2.18k
{
3503
2.18k
  int     aggno;
3504
2.18k
  AggStatePerAgg peraggs;
3505
3506
2.18k
  *same_input_transnos = NIL;
3507
3508
  /* we mustn't reuse the aggref if it contains volatile function calls */
3509
2.18k
  if (contain_volatile_functions((Node *) newagg))
3510
6
    return -1;
3511
3512
2.17k
  peraggs = aggstate->peragg;
3513
3514
  /*
3515
   * Search through the list of already seen aggregates. If we find an
3516
   * existing identical aggregate call, then we can re-use that one. While
3517
   * searching, we'll also collect a list of Aggrefs with the same input
3518
   * parameters. If no matching Aggref is found, the caller can potentially
3519
   * still re-use the transition state of one of them.  (At this stage we
3520
   * just compare the parsetrees; whether different aggregates share the
3521
   * same transition function will be checked later.)
3522
   */
3523
2.55k
  for (aggno = 0; aggno <= lastaggno; aggno++)
3524
389
  {
3525
389
    AggStatePerAgg peragg;
3526
389
    Aggref     *existingRef;
3527
3528
389
    peragg = &peraggs[aggno];
3529
389
    existingRef = peragg->aggref;
3530
3531
    /* all of the following must be the same or it's no match */
3532
389
    if (newagg->inputcollid != existingRef->inputcollid ||
3533
342
      newagg->aggtranstype != existingRef->aggtranstype ||
3534
164
      newagg->aggstar != existingRef->aggstar ||
3535
152
      newagg->aggvariadic != existingRef->aggvariadic ||
3536
152
      newagg->aggkind != existingRef->aggkind ||
3537
149
      !equal(newagg->args, existingRef->args) ||
3538
76
      !equal(newagg->aggorder, existingRef->aggorder) ||
3539
76
      !equal(newagg->aggdistinct, existingRef->aggdistinct) ||
3540
76
      !equal(newagg->aggfilter, existingRef->aggfilter))
3541
316
      continue;
3542
3543
    /* if it's the same aggregate function then report exact match */
3544
73
    if (newagg->aggfnoid == existingRef->aggfnoid &&
3545
16
      newagg->aggtype == existingRef->aggtype &&
3546
16
      newagg->aggcollid == existingRef->aggcollid &&
3547
16
      equal(newagg->aggdirectargs, existingRef->aggdirectargs))
3548
16
    {
3549
16
      list_free(*same_input_transnos);
3550
16
      *same_input_transnos = NIL;
3551
16
      return aggno;
3552
16
    }
3553
3554
    /*
3555
     * Not identical, but it had the same inputs.  If the final function
3556
     * permits sharing, return its transno to the caller, in case we can
3557
     * re-use its per-trans state.  (If there's already sharing going on,
3558
     * we might report a transno more than once.  find_compatible_pertrans
3559
     * is cheap enough that it's not worth spending cycles to avoid that.)
3560
     */
3561
57
    if (peragg->shareable)
3562
56
      *same_input_transnos = lappend_int(*same_input_transnos,
3563
56
                         peragg->transno);
3564
57
  }
3565
3566
2.16k
  return -1;
3567
2.17k
}
3568
3569
/*
3570
 * find_compatible_pertrans - search for a previously initialized per-Trans
3571
 * struct
3572
 *
3573
 * Searches the list of transnos for a per-Trans struct with the same
3574
 * transition function and initial condition. (The inputs have already been
3575
 * verified to match.)
3576
 */
3577
static int
3578
find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable,
3579
             Oid aggtransfn, Oid aggtranstype,
3580
             Oid aggserialfn, Oid aggdeserialfn,
3581
             Datum initValue, bool initValueIsNull,
3582
             List *transnos)
3583
2.16k
{
3584
2.16k
  ListCell   *lc;
3585
3586
  /* If this aggregate can't share transition states, give up */
3587
2.16k
  if (!shareable)
3588
13
    return -1;
3589
3590
2.15k
  foreach(lc, transnos)
3591
56
  {
3592
56
    int     transno = lfirst_int(lc);
3593
56
    AggStatePerTrans pertrans = &aggstate->pertrans[transno];
3594
3595
    /*
3596
     * if the transfns or transition state types are not the same then the
3597
     * state can't be shared.
3598
     */
3599
56
    if (aggtransfn != pertrans->transfn_oid ||
3600
10
      aggtranstype != pertrans->aggtranstype)
3601
46
      continue;
3602
3603
    /*
3604
     * The serialization and deserialization functions must match, if
3605
     * present, as we're unable to share the trans state for aggregates
3606
     * which will serialize or deserialize into different formats.
3607
     * Remember that these will be InvalidOid if they're not required for
3608
     * this agg node.
3609
     */
3610
10
    if (aggserialfn != pertrans->serialfn_oid ||
3611
10
      aggdeserialfn != pertrans->deserialfn_oid)
3612
0
      continue;
3613
3614
    /*
3615
     * Check that the initial condition matches, too.
3616
     */
3617
10
    if (initValueIsNull && pertrans->initValueIsNull)
3618
5
      return transno;
3619
3620
5
    if (!initValueIsNull && !pertrans->initValueIsNull &&
3621
5
      datumIsEqual(initValue, pertrans->initValue,
3622
5
             pertrans->transtypeByVal, pertrans->transtypeLen))
3623
4
      return transno;
3624
5
  }
3625
2.14k
  return -1;
3626
2.15k
}
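The matching rules in find_compatible_pertrans can be sketched as a standalone function. This is an illustration under simplifying assumptions, not the executor's code: `TransKey` and `find_shared_trans` are hypothetical names, OIDs are modeled as `unsigned`, and the initial value is assumed to be a pass-by-value `long`, so `==` replaces the `datumIsEqual` call.

```c
#include <stdbool.h>

/* Hypothetical summary of the fields compared when sharing a state. */
typedef struct
{
    unsigned transfn;       /* transition function id */
    unsigned transtype;     /* transition state type id */
    unsigned serialfn;      /* 0 when not required */
    unsigned deserialfn;    /* 0 when not required */
    bool     init_is_null;
    long     init_value;    /* assumed pass-by-value for this sketch */
} TransKey;

/*
 * Return the index of a state in 'states' that 'want' can share, looking
 * only at the indexes listed in 'candidates'; return -1 if there is none.
 */
static int
find_shared_trans(const TransKey *states, const int *candidates,
                  int ncandidates, const TransKey *want, bool shareable)
{
    if (!shareable)
        return -1;

    for (int i = 0; i < ncandidates; i++)
    {
        int             transno = candidates[i];
        const TransKey *have = &states[transno];

        /* transition function and state type must match */
        if (want->transfn != have->transfn ||
            want->transtype != have->transtype)
            continue;

        /* so must the serialization functions, if any */
        if (want->serialfn != have->serialfn ||
            want->deserialfn != have->deserialfn)
            continue;

        /* finally, the initial condition: both NULL, or both equal */
        if (want->init_is_null && have->init_is_null)
            return transno;
        if (!want->init_is_null && !have->init_is_null &&
            want->init_value == have->init_value)
            return transno;
    }

    return -1;
}
```

A candidate with the same transition function but a different initial value is rejected, so the new aggregate would get its own per-trans state.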
3627
3628
void
3629
ExecEndAgg(AggState *node)
3630
1.96k
{
3631
1.96k
  PlanState  *outerPlan;
3632
1.96k
  int     transno;
3633
1.96k
  int     numGroupingSets = Max(node->maxsets, 1);
3634
1.96k
  int     setno;
3635
3636
  /* Make sure we have closed any open tuplesorts */
3637
3638
1.96k
  if (node->sort_in)
3639
0
    tuplesort_end(node->sort_in);
3640
1.96k
  if (node->sort_out)
3641
0
    tuplesort_end(node->sort_out);
3642
3643
4.06k
  for (transno = 0; transno < node->numtrans; transno++)
3644
2.10k
  {
3645
2.10k
    AggStatePerTrans pertrans = &node->pertrans[transno];
3646
3647
4.20k
    for (setno = 0; setno < numGroupingSets; setno++)
3648
2.10k
    {
3649
2.10k
      if (pertrans->sortstates[setno])
3650
0
        tuplesort_end(pertrans->sortstates[setno]);
3651
2.10k
    }
3652
2.10k
  }
3653
3654
  /* And ensure any agg shutdown callbacks have been called */
3655
3.93k
  for (setno = 0; setno < numGroupingSets; setno++)
3656
1.96k
    ReScanExprContext(node->aggcontexts[setno]);
3657
1.96k
  if (node->hashcontext)
3658
129
    ReScanExprContext(node->hashcontext);
3659
3660
  /*
3661
   * We don't actually free any ExprContexts here (see comment in
3662
   * ExecFreeExprContext), just unlinking the output one from the plan node
3663
   * suffices.
3664
   */
3665
1.96k
  ExecFreeExprContext(&node->ss.ps);
3666
3667
  /* clean up tuple table */
3668
1.96k
  ExecClearTuple(node->ss.ss_ScanTupleSlot);
3669
3670
1.96k
  outerPlan = outerPlanState(node);
3671
1.96k
  ExecEndNode(outerPlan);
3672
1.96k
}
3673
3674
void
3675
ExecReScanAgg(AggState *node)
3676
1.04k
{
3677
1.04k
  ExprContext *econtext = node->ss.ps.ps_ExprContext;
3678
1.04k
  PlanState  *outerPlan = outerPlanState(node);
3679
1.04k
  Agg      *aggnode = (Agg *) node->ss.ps.plan;
3680
1.04k
  int     transno;
3681
1.04k
  int     numGroupingSets = Max(node->maxsets, 1);
3682
1.04k
  int     setno;
3683
3684
1.04k
  node->agg_done = false;
3685
3686
1.04k
  if (node->aggstrategy == AGG_HASHED)
3687
1.00k
  {
3688
    /*
3689
     * In the hashed case, if we haven't yet built the hash table then we
3690
     * can just return; nothing done yet, so nothing to undo. If subnode's
3691
     * chgParam is not NULL then it will be re-scanned by ExecProcNode,
3692
     * else no reason to re-scan it at all.
3693
     */
3694
1.00k
    if (!node->table_filled)
3695
3
      return;
3696
3697
    /*
3698
     * If we do have the hash table, and the subplan does not have any
3699
     * parameter changes, and none of our own parameter changes affect
3700
     * input expressions of the aggregated functions, then we can just
3701
     * rescan the existing hash table; no need to build it again.
3702
     */
3703
1.00k
    if (outerPlan->chgParam == NULL &&
3704
4
      !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
3705
0
    {
3706
0
      ResetTupleHashIterator(node->perhash[0].hashtable,
3707
0
                   &node->perhash[0].hashiter);
3708
0
      select_current_set(node, 0, true);
3709
0
      return;
3710
0
    }
3711
1.04k
  }
3712
3713
  /* Make sure we have closed any open tuplesorts */
3714
1.08k
  for (transno = 0; transno < node->numtrans; transno++)
3715
42
  {
3716
84
    for (setno = 0; setno < numGroupingSets; setno++)
3717
42
    {
3718
42
      AggStatePerTrans pertrans = &node->pertrans[transno];
3719
3720
42
      if (pertrans->sortstates[setno])
3721
0
      {
3722
0
        tuplesort_end(pertrans->sortstates[setno]);
3723
0
        pertrans->sortstates[setno] = NULL;
3724
0
      }
3725
42
    }
3726
42
  }
3727
3728
  /*
3729
   * We don't need to ReScanExprContext the output tuple context here;
3730
   * ExecReScan already did it. But we do need to reset our per-grouping-set
3731
   * contexts, which may have transvalues stored in them. (We use rescan
3732
   * rather than just reset because transfns may have registered callbacks
3733
   * that need to be run now.) For the AGG_HASHED case, see below.
3734
   */
3735
3736
2.08k
  for (setno = 0; setno < numGroupingSets; setno++)
3737
1.04k
  {
3738
1.04k
    ReScanExprContext(node->aggcontexts[setno]);
3739
1.04k
  }
3740
3741
  /* Release first tuple of group, if we have made a copy */
3742
1.04k
  if (node->grp_firstTuple != NULL)
3743
0
  {
3744
0
    heap_freetuple(node->grp_firstTuple);
3745
0
    node->grp_firstTuple = NULL;
3746
0
  }
3747
1.04k
  ExecClearTuple(node->ss.ss_ScanTupleSlot);
3748
3749
  /* Forget current agg values */
3750
1.04k
  MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
3751
1.04k
  MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
3752
3753
  /*
3754
   * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
3755
   * the hashcontext. This used to be an issue, but now, resetting a context
3756
   * automatically deletes sub-contexts too.
3757
   */
3758
1.04k
  if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
3759
1.00k
  {
3760
1.00k
    ReScanExprContext(node->hashcontext);
3761
    /* Rebuild an empty hash table */
3762
1.00k
    build_hash_table(node);
3763
1.00k
    node->table_filled = false;
3764
    /* iterator will be reset when the table is filled */
3765
1.00k
  }
3766
3767
1.04k
  if (node->aggstrategy != AGG_HASHED)
3768
38
  {
3769
    /*
3770
     * Reset the per-group state (in particular, mark transvalues null)
3771
     */
3772
76
    for (setno = 0; setno < numGroupingSets; setno++)
3773
38
    {
3774
38
      MemSet(node->pergroups[setno], 0,
3775
38
           sizeof(AggStatePerGroupData) * node->numaggs);
3776
38
    }
3777
3778
    /* reset to phase 1 */
3779
38
    initialize_phase(node, 1);
3780
3781
38
    node->input_done = false;
3782
38
    node->projected_set = -1;
3783
38
  }
3784
3785
1.04k
  if (outerPlan->chgParam == NULL)
3786
6
    ExecReScan(outerPlan);
3787
1.04k
}
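For the AGG_HASHED case, ExecReScanAgg picks one of three outcomes before doing any other work. A minimal sketch of that decision follows, assuming parameter sets are modeled as bitmasks in an `unsigned long` (the original uses Bitmapset and bms_overlap); `RescanAction` and `hashed_rescan_action` are hypothetical names.

```c
#include <stdbool.h>

typedef enum
{
    RESCAN_NOTHING_TO_UNDO,  /* hash table was never built: just return */
    RESCAN_REUSE_TABLE,      /* reset the iterator and scan the table again */
    RESCAN_REBUILD           /* fall through: reset state, rebuild the table */
} RescanAction;

/*
 * table_filled     - has the hash table been built yet?
 * outer_chg_params - changed parameters of the child plan (0 means none)
 * node_chg_params  - changed parameters of this Agg node
 * agg_params       - parameters used in the aggregates' input expressions
 */
static RescanAction
hashed_rescan_action(bool table_filled,
                     unsigned long outer_chg_params,
                     unsigned long node_chg_params,
                     unsigned long agg_params)
{
    if (!table_filled)
        return RESCAN_NOTHING_TO_UNDO;

    /* reuse only if no changed parameter can affect the aggregated input */
    if (outer_chg_params == 0 && (node_chg_params & agg_params) == 0)
        return RESCAN_REUSE_TABLE;

    return RESCAN_REBUILD;
}
```

In the coverage data above, the reuse branch has a hit count of 0, so only the other two outcomes were exercised by this test run.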
3788
3789
3790
/***********************************************************************
3791
 * API exposed to aggregate functions
3792
 ***********************************************************************/
3793
3794
3795
/*
3796
 * AggCheckCallContext - test if a SQL function is being called as an aggregate
3797
 *
3798
 * The transition and/or final functions of an aggregate may want to verify
3799
 * that they are being called as aggregates, rather than as plain SQL
3800
 * functions.  They should use this function to do so.  The return value
3801
 * is nonzero if being called as an aggregate, or zero if not.  (Specific
3802
 * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
3803
 * values could conceivably appear in future.)
3804
 *
3805
 * If aggcontext isn't NULL, the function also stores at *aggcontext the
3806
 * identity of the memory context that aggregate transition values are being
3807
 * stored in.  Note that the same aggregate call site (flinfo) may be called
3808
 * interleaved on different transition values in different contexts, so it's
3809
 * not kosher to cache aggcontext under fn_extra.  It is, however, kosher to
3810
 * cache it in the transvalue itself (for internal-type transvalues).
3811
 */
3812
int
3813
AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
3814
25.2k
{
3815
25.2k
  if (fcinfo->context && IsA(fcinfo->context, AggState))
3816
25.1k
  {
3817
25.1k
    if (aggcontext)
3818
1.35k
    {
3819
1.35k
      AggState   *aggstate = ((AggState *) fcinfo->context);
3820
1.35k
      ExprContext *cxt = aggstate->curaggcontext;
3821
3822
1.35k
      *aggcontext = cxt->ecxt_per_tuple_memory;
3823
1.35k
    }
3824
25.1k
    return AGG_CONTEXT_AGGREGATE;
3825
25.1k
  }
3826
133
  if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
3827
60
  {
3828
60
    if (aggcontext)
3829
40
      *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
3830
60
    return AGG_CONTEXT_WINDOW;
3831
60
  }
3832
3833
  /* this is just to prevent "uninitialized variable" warnings */
3834
73
  if (aggcontext)
3835
73
    *aggcontext = NULL;
3836
73
  return 0;
3837
73
}
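The dispatch performed by AggCheckCallContext can be illustrated without the PostgreSQL headers. The sketch below is a mock, not the real API: `MockTag`, `MockNode`, `mock_check_call_context`, and the `CTX_*` values are hypothetical stand-ins for the node tags, the call context, and the AGG_CONTEXT_* return codes.

```c
#include <stddef.h>

/* Mock node tags; the real code tests the context node with IsA(). */
typedef enum { TAG_OTHER, TAG_AGG_STATE, TAG_WINDOW_AGG_STATE } MockTag;

typedef struct
{
    MockTag tag;
    void   *aggcontext;  /* memory context holding the transition values */
} MockNode;

/* Mock return codes: zero means "not called as an aggregate". */
enum { CTX_NONE = 0, CTX_AGGREGATE = 1, CTX_WINDOW = 2 };

static int
mock_check_call_context(const MockNode *context, void **aggcontext)
{
    if (context != NULL && context->tag == TAG_AGG_STATE)
    {
        if (aggcontext)
            *aggcontext = context->aggcontext;
        return CTX_AGGREGATE;
    }
    if (context != NULL && context->tag == TAG_WINDOW_AGG_STATE)
    {
        if (aggcontext)
            *aggcontext = context->aggcontext;
        return CTX_WINDOW;
    }

    /* plain function call: clear the output and report zero */
    if (aggcontext)
        *aggcontext = NULL;
    return CTX_NONE;
}
```

A transition function written against the real API would typically call the check first and raise an error when the result is zero, before allocating anything in the returned context.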
3838
3839
/*
3840
 * AggGetAggref - allow an aggregate support function to get its Aggref
3841
 *
3842
 * If the function is being called as an aggregate support function,
3843
 * return the Aggref node for the aggregate call.  Otherwise, return NULL.
3844
 *
3845
 * Aggregates sharing the same inputs and transition functions can get
3846
 * merged into a single transition calculation.  If the transition function
3847
 * calls AggGetAggref, it will get an arbitrary one of the Aggrefs for which it is
3848
 * executing.  It must therefore not pay attention to the Aggref fields that
3849
 * relate to the final function, as those are indeterminate.  But if a final
3850
 * function calls AggGetAggref, it will get a precise result.
3851
 *
3852
 * Note that if an aggregate is being used as a window function, this will
3853
 * return NULL.  We could provide a similar function to return the relevant
3854
 * WindowFunc node in such cases, but it's not needed yet.
3855
 */
3856
Aggref *
3857
AggGetAggref(FunctionCallInfo fcinfo)
3858
29
{
3859
29
  if (fcinfo->context && IsA(fcinfo->context, AggState))
3860
29
  {
3861
29
    AggState   *aggstate = (AggState *) fcinfo->context;
3862
29
    AggStatePerAgg curperagg;
3863
29
    AggStatePerTrans curpertrans;
3864
3865
    /* check curperagg (valid when in a final function) */
3866
29
    curperagg = aggstate->curperagg;
3867
3868
29
    if (curperagg)
3869
0
      return curperagg->aggref;
3870
3871
    /* check curpertrans (valid when in a transition function) */
3872
29
    curpertrans = aggstate->curpertrans;
3873
3874
29
    if (curpertrans)
3875
29
      return curpertrans->aggref;
3876
0
  }
3877
0
  return NULL;
3878
0
}
3879
3880
/*
3881
 * AggGetTempMemoryContext - fetch short-term memory context for aggregates
3882
 *
3883
 * This is useful in agg final functions; the context returned is one that
3884
 * the final function can safely reset as desired.  This isn't useful for
3885
 * transition functions, since the context returned MAY (we don't promise)
3886
 * be the same as the context those are called in.
3887
 *
3888
 * As above, this is currently not useful for aggs called as window functions.
3889
 */
3890
MemoryContext
3891
AggGetTempMemoryContext(FunctionCallInfo fcinfo)
3892
0
{
3893
0
  if (fcinfo->context && IsA(fcinfo->context, AggState))
3894
0
  {
3895
0
    AggState   *aggstate = (AggState *) fcinfo->context;
3896
3897
0
    return aggstate->tmpcontext->ecxt_per_tuple_memory;
3898
0
  }
3899
0
  return NULL;
3900
0
}
3901
3902
/*
3903
 * AggStateIsShared - find out whether transition state is shared
3904
 *
3905
 * If the function is being called as an aggregate support function,
3906
 * return true if the aggregate's transition state is shared across
3907
 * multiple aggregates, false if it is not.
3908
 *
3909
 * Returns true if not called as an aggregate support function.
3910
 * This is intended as a conservative answer, ie "no you'd better not
3911
 * scribble on your input".  In particular, will return true if the
3912
 * aggregate is being used as a window function, which is a scenario
3913
 * in which changing the transition state is a bad idea.  We might
3914
 * want to refine the behavior for the window case in future.
3915
 */
3916
bool
3917
AggStateIsShared(FunctionCallInfo fcinfo)
3918
29
{
3919
29
  if (fcinfo->context && IsA(fcinfo->context, AggState))
3920
29
  {
3921
29
    AggState   *aggstate = (AggState *) fcinfo->context;
3922
29
    AggStatePerAgg curperagg;
3923
29
    AggStatePerTrans curpertrans;
3924
3925
    /* check curperagg (valid when in a final function) */
3926
29
    curperagg = aggstate->curperagg;
3927
3928
29
    if (curperagg)
3929
0
      return aggstate->pertrans[curperagg->transno].aggshared;
3930
3931
    /* check curpertrans (valid when in a transition function) */
3932
29
    curpertrans = aggstate->curpertrans;
3933
3934
29
    if (curpertrans)
3935
29
      return curpertrans->aggshared;
3936
0
  }
3937
0
  return true;
3938
0
}
3939
3940
/*
3941
 * AggRegisterCallback - register a cleanup callback for an aggregate
3942
 *
3943
 * This is useful for aggs to register shutdown callbacks, which will ensure
3944
 * that non-memory resources are freed.  The callback will occur just before
3945
 * the associated aggcontext (as returned by AggCheckCallContext) is reset,
3946
 * either between groups or as a result of rescanning the query.  The callback
3947
 * will NOT be called on error paths.  The typical use-case is for freeing of
3948
 * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
3949
 * created by the agg functions.  (The callback will not be called until after
3950
 * the result of the finalfn is no longer needed, so it's safe for the finalfn
3951
 * to return data that will be freed by the callback.)
3952
 *
3953
 * As above, this is currently not useful for aggs called as window functions.
3954
 */
3955
void
3956
AggRegisterCallback(FunctionCallInfo fcinfo,
3957
          ExprContextCallbackFunction func,
3958
          Datum arg)
3959
84
{
3960
84
  if (fcinfo->context && IsA(fcinfo->context, AggState))
3961
84
  {
3962
84
    AggState   *aggstate = (AggState *) fcinfo->context;
3963
84
    ExprContext *cxt = aggstate->curaggcontext;
3964
3965
84
    RegisterExprContextCallback(cxt, func, arg);
3966
3967
84
    return;
3968
84
  }
3969
0
  elog(ERROR, "aggregate function cannot register a callback in this context");
3970
0
}
3971
3972
3973
/*
3974
 * aggregate_dummy - dummy execution routine for aggregate functions
3975
 *
3976
 * This function is listed as the implementation (prosrc field) of pg_proc
3977
 * entries for aggregate functions.  Its only purpose is to throw an error
3978
 * if someone mistakenly executes such a function in the normal way.
3979
 *
3980
 * Perhaps someday we could assign real meaning to the prosrc field of
3981
 * an aggregate?
3982
 */
3983
Datum
3984
aggregate_dummy(PG_FUNCTION_ARGS)
3985
0
{
3986
0
  elog(ERROR, "aggregate function %u called as normal function",
3987
0
     fcinfo->flinfo->fn_oid);
3988
0
  return (Datum) 0;     /* keep compiler quiet */
3989
0
}