Coverage Report

Created: 2025-07-03 06:49

/src/postgres/src/backend/executor/nodeWindowAgg.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * nodeWindowAgg.c
4
 *    routines to handle WindowAgg nodes.
5
 *
6
 * A WindowAgg node evaluates "window functions" across suitable partitions
7
 * of the input tuple set.  Any one WindowAgg works for just a single window
8
 * specification, though it can evaluate multiple window functions sharing
9
 * identical window specifications.  The input tuples are required to be
10
 * delivered in sorted order, with the PARTITION BY columns (if any) as
11
 * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12
 * (The planner generates a stack of WindowAggs with intervening Sort nodes
13
 * as needed, if a query involves more than one window specification.)
14
 *
15
 * Since window functions can require access to any or all of the rows in
16
 * the current partition, we accumulate rows of the partition into a
17
 * tuplestore.  The window functions are called using the WindowObject API
18
 * so that they can access those rows as needed.
19
 *
20
 * We also support using plain aggregate functions as window functions.
21
 * For these, the regular Agg-node environment is emulated for each partition.
22
 * As required by the SQL spec, the output represents the value of the
23
 * aggregate function over all rows in the current row's window frame.
24
 *
25
 *
26
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
27
 * Portions Copyright (c) 1994, Regents of the University of California
28
 *
29
 * IDENTIFICATION
30
 *    src/backend/executor/nodeWindowAgg.c
31
 *
32
 *-------------------------------------------------------------------------
33
 */
34
#include "postgres.h"
35
36
#include "access/htup_details.h"
37
#include "catalog/objectaccess.h"
38
#include "catalog/pg_aggregate.h"
39
#include "catalog/pg_proc.h"
40
#include "executor/executor.h"
41
#include "executor/nodeWindowAgg.h"
42
#include "miscadmin.h"
43
#include "nodes/nodeFuncs.h"
44
#include "optimizer/clauses.h"
45
#include "optimizer/optimizer.h"
46
#include "parser/parse_agg.h"
47
#include "parser/parse_coerce.h"
48
#include "utils/acl.h"
49
#include "utils/builtins.h"
50
#include "utils/datum.h"
51
#include "utils/expandeddatum.h"
52
#include "utils/lsyscache.h"
53
#include "utils/memutils.h"
54
#include "utils/regproc.h"
55
#include "utils/syscache.h"
56
#include "windowapi.h"
57
58
/*
 * All the window function APIs are called with this object, which is passed
 * to window functions as fcinfo->context.
 *
 * Each window function owns one tuplestore mark pointer (markptr) and one
 * tuplestore read pointer (readptr).  markpos and seekpos record which row
 * each of those pointers is currently positioned on, so the pointers can be
 * repositioned without rescanning.
 */
typedef struct WindowObjectData
{
  NodeTag   type;       /* node tag of this object */
  WindowAggState *winstate; /* parent WindowAggState */
  List     *argstates;    /* ExprState trees for fn's arguments */
  void     *localmem;   /* WinGetPartitionLocalMemory's chunk */
  int     markptr;    /* tuplestore mark pointer for this fn */
  int     readptr;    /* tuplestore read pointer for this fn */
  int64   markpos;    /* row that markptr is positioned on */
  int64   seekpos;    /* row that readptr is positioned on */
} WindowObjectData;
73
74
/*
 * We have one WindowStatePerFunc struct for each window function and
 * window aggregate handled by this node.
 *
 * For a plain aggregate (plain_agg = true), aggno links to the matching
 * WindowStatePerAggData, which holds the transition-function state.
 */
typedef struct WindowStatePerFuncData
{
  /* Links to WindowFunc expr and state nodes this working state is for */
  WindowFuncExprState *wfuncstate;
  WindowFunc *wfunc;

  /*
   * Number of arguments.  For plain aggregates the transition functions are
   * invoked with numArguments + 1 arguments: the transition value in slot 0
   * followed by these.
   */
  int     numArguments; /* number of arguments */

  FmgrInfo  flinfo;     /* fmgr lookup data for window function */

  /* Passed as the call collation to the aggregate support functions too */
  Oid     winCollation; /* collation derived for window function */

  /*
   * We need the len and byval info for the result of each function in order
   * to know how to copy/delete values.
   */
  int16   resulttypeLen;
  bool    resulttypeByVal;

  bool    plain_agg;    /* is it just a plain aggregate function? */
  int     aggno;      /* if so, index of its WindowStatePerAggData */

  WindowObject winobj;    /* object used in window function API */
} WindowStatePerFuncData;
102
103
/*
 * For plain aggregate window functions, we also have one of these.
 *
 * It holds the running transition state for one aggregate: the transition,
 * inverse-transition and final functions, the initial value, the current
 * transition value, and the number of rows currently folded into it.
 */
typedef struct WindowStatePerAggData
{
  /* Oids of transition functions */
  Oid     transfn_oid;

  /*
   * When valid, the aggregate runs in moving-aggregate mode: rows can be
   * removed via invtransfn, and transfn is not allowed to return NULL
   * (advance_windowaggregate raises an error if it does).
   */
  Oid     invtransfn_oid; /* may be InvalidOid */
  Oid     finalfn_oid;  /* may be InvalidOid */

  /*
   * fmgr lookup data for transition functions --- only valid when
   * corresponding oid is not InvalidOid.  Note in particular that fn_strict
   * flags are kept here.
   */
  FmgrInfo  transfn;
  FmgrInfo  invtransfn;
  FmgrInfo  finalfn;

  /*
   * Number of arguments to pass to finalfn.  Argument 0 is the transition
   * value; any further positions are passed as NULLs.
   */
  int     numFinalArgs; /* number of arguments to pass to finalfn */

  /*
   * initial value from pg_aggregate entry
   */
  Datum   initValue;
  bool    initValueIsNull;

  /*
   * cached value for current frame boundaries
   * (reset to a NULL (Datum) 0 whenever the aggregate is re-initialized)
   */
  Datum   resultValue;
  bool    resultValueIsNull;

  /*
   * We need the len and byval info for the agg's input, result, and
   * transition data types in order to know how to copy/delete values.
   */
  int16   inputtypeLen,
        resulttypeLen,
        transtypeLen;
  bool    inputtypeByVal,
        resulttypeByVal,
        transtypeByVal;

  int     wfuncno;    /* index of associated WindowStatePerFuncData */

  /* Context holding transition value and possibly other subsidiary data */
  MemoryContext aggcontext; /* may be private, or winstate->aggcontext */

  /* Current transition value */
  Datum   transValue;   /* current transition value */
  bool    transValueIsNull;

  /*
   * Number of currently-aggregated rows.  Removing the last one
   * re-initializes the aggregate rather than calling invtransfn.
   */
  int64   transValueCount;  /* number of currently-aggregated rows */

  /* Data local to eval_windowaggregates() */
  bool    restart;    /* need to restart this agg in this cycle? */
} WindowStatePerAggData;
161
162
/*
 * Forward declarations for this file's static helpers.
 *
 * Transition-state management for plain aggregates used as window
 * functions: reset to the initial value, add a row, remove a row (via the
 * inverse transition function), and compute the current result.
 */
static void initialize_windowaggregate(WindowAggState *winstate,
                     WindowStatePerFunc perfuncstate,
                     WindowStatePerAgg peraggstate);
static void advance_windowaggregate(WindowAggState *winstate,
                  WindowStatePerFunc perfuncstate,
                  WindowStatePerAgg peraggstate);
static bool advance_windowaggregate_base(WindowAggState *winstate,
                     WindowStatePerFunc perfuncstate,
                     WindowStatePerAgg peraggstate);
static void finalize_windowaggregate(WindowAggState *winstate,
                   WindowStatePerFunc perfuncstate,
                   WindowStatePerAgg peraggstate,
                   Datum *result, bool *isnull);

/* Per-row evaluation entry points */
static void eval_windowaggregates(WindowAggState *winstate);
static void eval_windowfunction(WindowAggState *winstate,
                WindowStatePerFunc perfuncstate,
                Datum *result, bool *isnull);

/* Partition handling */
static void begin_partition(WindowAggState *winstate);
static void spool_tuples(WindowAggState *winstate, int64 pos);
static void release_partition(WindowAggState *winstate);

/* Frame boundary handling */
static int  row_is_in_frame(WindowAggState *winstate, int64 pos,
              TupleTableSlot *slot);
static void update_frameheadpos(WindowAggState *winstate);
static void update_frametailpos(WindowAggState *winstate);
static void update_grouptailpos(WindowAggState *winstate);

/* Aggregate setup */
static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
                        WindowFunc *wfunc,
                        WindowStatePerAgg peraggstate);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);

/* Row comparison and tuplestore access */
static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
            TupleTableSlot *slot2);
static bool window_gettupleslot(WindowObject winobj, int64 pos,
                TupleTableSlot *slot);
200
201
202
/*
203
 * initialize_windowaggregate
204
 * parallel to initialize_aggregates in nodeAgg.c
205
 */
206
static void
207
initialize_windowaggregate(WindowAggState *winstate,
208
               WindowStatePerFunc perfuncstate,
209
               WindowStatePerAgg peraggstate)
210
0
{
211
0
  MemoryContext oldContext;
212
213
  /*
214
   * If we're using a private aggcontext, we may reset it here.  But if the
215
   * context is shared, we don't know which other aggregates may still need
216
   * it, so we must leave it to the caller to reset at an appropriate time.
217
   */
218
0
  if (peraggstate->aggcontext != winstate->aggcontext)
219
0
    MemoryContextReset(peraggstate->aggcontext);
220
221
0
  if (peraggstate->initValueIsNull)
222
0
    peraggstate->transValue = peraggstate->initValue;
223
0
  else
224
0
  {
225
0
    oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
226
0
    peraggstate->transValue = datumCopy(peraggstate->initValue,
227
0
                      peraggstate->transtypeByVal,
228
0
                      peraggstate->transtypeLen);
229
0
    MemoryContextSwitchTo(oldContext);
230
0
  }
231
0
  peraggstate->transValueIsNull = peraggstate->initValueIsNull;
232
0
  peraggstate->transValueCount = 0;
233
0
  peraggstate->resultValue = (Datum) 0;
234
0
  peraggstate->resultValueIsNull = true;
235
0
}
236
237
/*
 * advance_windowaggregate
 * parallel to advance_aggregates in nodeAgg.c
 *
 * Fold the current input row into one plain aggregate's transition value by
 * calling its transition function.  Argument and FILTER expressions are
 * evaluated in winstate->tmpcontext.  The caller is presumably responsible
 * for having loaded the row into that context; confirm against the callers.
 *
 * The row is ignored (state unchanged) when:
 *  - the FILTER clause yields NULL or false, or
 *  - the transition function is strict and any argument is NULL.
 * With a strict transfn and a NULL initial state, the first non-NULL input
 * is copied in as the state and transValueCount becomes 1.
 *
 * A pass-by-reference result is kept in peraggstate->aggcontext.
 * transValueCount is incremented each time the transition function is
 * actually called.
 *
 * Raises ERROR if the aggregate has an inverse transition function (moving
 * mode) and the transition function returns NULL.
 */
static void
advance_windowaggregate(WindowAggState *winstate,
            WindowStatePerFunc perfuncstate,
            WindowStatePerAgg peraggstate)
{
  LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
  WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
  int     numArguments = perfuncstate->numArguments;
  Datum   newVal;
  ListCell   *arg;
  int     i;
  MemoryContext oldContext;
  ExprContext *econtext = winstate->tmpcontext;
  ExprState  *filter = wfuncstate->aggfilter;

  /* Expression evaluation garbage goes into the per-tuple memory */
  oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

  /* Skip anything FILTERed out */
  if (filter)
  {
    bool    isnull;
    Datum   res = ExecEvalExpr(filter, econtext, &isnull);

    if (isnull || !DatumGetBool(res))
    {
      MemoryContextSwitchTo(oldContext);
      return;
    }
  }

  /* We start from 1, since the 0th arg will be the transition value */
  i = 1;
  foreach(arg, wfuncstate->args)
  {
    ExprState  *argstate = (ExprState *) lfirst(arg);

    fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
                       &fcinfo->args[i].isnull);
    i++;
  }

  if (peraggstate->transfn.fn_strict)
  {
    /*
     * For a strict transfn, nothing happens when there's a NULL input; we
     * just keep the prior transValue.  Note transValueCount doesn't
     * change either.
     */
    for (i = 1; i <= numArguments; i++)
    {
      if (fcinfo->args[i].isnull)
      {
        MemoryContextSwitchTo(oldContext);
        return;
      }
    }

    /*
     * For strict transition functions with initial value NULL we use the
     * first non-NULL input as the initial state.  (We already checked
     * that the agg's input type is binary-compatible with its transtype,
     * so straight copy here is OK.)
     *
     * We must copy the datum into aggcontext if it is pass-by-ref.  We do
     * not need to pfree the old transValue, since it's NULL.
     */
    if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull)
    {
      MemoryContextSwitchTo(peraggstate->aggcontext);
      peraggstate->transValue = datumCopy(fcinfo->args[1].value,
                        peraggstate->transtypeByVal,
                        peraggstate->transtypeLen);
      peraggstate->transValueIsNull = false;
      peraggstate->transValueCount = 1;
      MemoryContextSwitchTo(oldContext);
      return;
    }

    if (peraggstate->transValueIsNull)
    {
      /*
       * Don't call a strict function with NULL inputs.  Note it is
       * possible to get here despite the above tests, if the transfn is
       * strict *and* returned a NULL on a prior cycle.  If that happens
       * we will propagate the NULL all the way to the end.  That can
       * only happen if there's no inverse transition function, though,
       * since we disallow transitions back to NULL when there is one.
       */
      MemoryContextSwitchTo(oldContext);
      Assert(!OidIsValid(peraggstate->invtransfn_oid));
      return;
    }
  }

  /*
   * OK to call the transition function.  Set winstate->curaggcontext while
   * calling it, for possible use by AggCheckCallContext.
   */
  InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
               numArguments + 1,
               perfuncstate->winCollation,
               (Node *) winstate, NULL);
  fcinfo->args[0].value = peraggstate->transValue;
  fcinfo->args[0].isnull = peraggstate->transValueIsNull;
  winstate->curaggcontext = peraggstate->aggcontext;
  newVal = FunctionCallInvoke(fcinfo);
  winstate->curaggcontext = NULL;

  /*
   * Moving-aggregate transition functions must not return null, see
   * advance_windowaggregate_base().
   */
  if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid))
    ereport(ERROR,
        (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
         errmsg("moving-aggregate transition function must not return null")));

  /*
   * We must track the number of rows included in transValue, since to
   * remove the last input, advance_windowaggregate_base() mustn't call the
   * inverse transition function, but simply reset transValue back to its
   * initial value.
   */
  peraggstate->transValueCount++;

  /*
   * If pass-by-ref datatype, must copy the new value into aggcontext and
   * free the prior transValue.  But if transfn returned a pointer to its
   * first input, we don't need to do anything.  Also, if transfn returned a
   * pointer to a R/W expanded object that is already a child of the
   * aggcontext, assume we can adopt that value without copying it.  (See
   * comments for ExecAggCopyTransValue, which this code duplicates.)
   */
  if (!peraggstate->transtypeByVal &&
    DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
  {
    if (!fcinfo->isnull)
    {
      /* CurrentMemoryContext is aggcontext from here until the switch back */
      MemoryContextSwitchTo(peraggstate->aggcontext);
      if (DatumIsReadWriteExpandedObject(newVal,
                         false,
                         peraggstate->transtypeLen) &&
        MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
         /* do nothing */ ;
      else
        newVal = datumCopy(newVal,
                   peraggstate->transtypeByVal,
                   peraggstate->transtypeLen);
    }
    if (!peraggstate->transValueIsNull)
    {
      /* Release the superseded state */
      if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
                         false,
                         peraggstate->transtypeLen))
        DeleteExpandedObject(peraggstate->transValue);
      else
        pfree(DatumGetPointer(peraggstate->transValue));
    }
  }

  MemoryContextSwitchTo(oldContext);
  peraggstate->transValue = newVal;
  peraggstate->transValueIsNull = fcinfo->isnull;
}
405
406
/*
 * advance_windowaggregate_base
 * Remove the oldest tuple from an aggregation.
 *
 * This is very much like advance_windowaggregate, except that we will call
 * the inverse transition function (which caller must have checked is
 * available).
 *
 * Returns true if we successfully removed the current row from this
 * aggregate, false if not (in the latter case, caller is responsible
 * for cleaning up by restarting the aggregation).
 *
 * Details:
 *  - Rows rejected by FILTER, or with a NULL argument when invtransfn is
 *    strict, are treated as never having been added, and true is returned.
 *    NOTE(review): this mirrors advance_windowaggregate only if transfn and
 *    invtransfn have the same strictness.  That is presumably enforced when
 *    the aggregate is set up (not visible here); confirm.
 *  - Raises ERROR if the transition value is NULL at this point.
 *  - When exactly one row remains aggregated, the aggregate is
 *    re-initialized instead of calling invtransfn.
 *  - If invtransfn returns NULL, false is returned and the state is left
 *    untouched.
 */
static bool
advance_windowaggregate_base(WindowAggState *winstate,
               WindowStatePerFunc perfuncstate,
               WindowStatePerAgg peraggstate)
{
  LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
  WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
  int     numArguments = perfuncstate->numArguments;
  Datum   newVal;
  ListCell   *arg;
  int     i;
  MemoryContext oldContext;
  ExprContext *econtext = winstate->tmpcontext;
  ExprState  *filter = wfuncstate->aggfilter;

  /* Expression evaluation garbage goes into the per-tuple memory */
  oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

  /* Skip anything FILTERed out */
  if (filter)
  {
    bool    isnull;
    Datum   res = ExecEvalExpr(filter, econtext, &isnull);

    if (isnull || !DatumGetBool(res))
    {
      MemoryContextSwitchTo(oldContext);
      return true;
    }
  }

  /* We start from 1, since the 0th arg will be the transition value */
  i = 1;
  foreach(arg, wfuncstate->args)
  {
    ExprState  *argstate = (ExprState *) lfirst(arg);

    fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
                       &fcinfo->args[i].isnull);
    i++;
  }

  if (peraggstate->invtransfn.fn_strict)
  {
    /*
     * For a strict (inv)transfn, nothing happens when there's a NULL
     * input; we just keep the prior transValue.  Note transValueCount
     * doesn't change either.
     */
    for (i = 1; i <= numArguments; i++)
    {
      if (fcinfo->args[i].isnull)
      {
        MemoryContextSwitchTo(oldContext);
        return true;
      }
    }
  }

  /* There should still be an added but not yet removed value */
  Assert(peraggstate->transValueCount > 0);

  /*
   * In moving-aggregate mode, the state must never be NULL, except possibly
   * before any rows have been aggregated (which is surely not the case at
   * this point).  This restriction allows us to interpret a NULL result
   * from the inverse function as meaning "sorry, can't do an inverse
   * transition in this case".  We already checked this in
   * advance_windowaggregate, but just for safety, check again.
   */
  if (peraggstate->transValueIsNull)
    elog(ERROR, "aggregate transition value is NULL before inverse transition");

  /*
   * We mustn't use the inverse transition function to remove the last
   * input.  Doing so would yield a non-NULL state, whereas we should be in
   * the initial state afterwards which may very well be NULL.  So instead,
   * we simply re-initialize the aggregate in this case.
   */
  if (peraggstate->transValueCount == 1)
  {
    MemoryContextSwitchTo(oldContext);
    initialize_windowaggregate(winstate,
                   &winstate->perfunc[peraggstate->wfuncno],
                   peraggstate);
    return true;
  }

  /*
   * OK to call the inverse transition function.  Set
   * winstate->curaggcontext while calling it, for possible use by
   * AggCheckCallContext.
   */
  InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn),
               numArguments + 1,
               perfuncstate->winCollation,
               (Node *) winstate, NULL);
  fcinfo->args[0].value = peraggstate->transValue;
  fcinfo->args[0].isnull = peraggstate->transValueIsNull;
  winstate->curaggcontext = peraggstate->aggcontext;
  newVal = FunctionCallInvoke(fcinfo);
  winstate->curaggcontext = NULL;

  /*
   * If the function returns NULL, report failure, forcing a restart.
   */
  if (fcinfo->isnull)
  {
    MemoryContextSwitchTo(oldContext);
    return false;
  }

  /* Update number of rows included in transValue */
  peraggstate->transValueCount--;

  /*
   * If pass-by-ref datatype, must copy the new value into aggcontext and
   * free the prior transValue.  But if invtransfn returned a pointer to its
   * first input, we don't need to do anything.  Also, if invtransfn
   * returned a pointer to a R/W expanded object that is already a child of
   * the aggcontext, assume we can adopt that value without copying it. (See
   * comments for ExecAggCopyTransValue, which this code duplicates.)
   *
   * Note: the checks for null values here will never fire, but it seems
   * best to have this stanza look just like advance_windowaggregate.
   */
  if (!peraggstate->transtypeByVal &&
    DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
  {
    if (!fcinfo->isnull)
    {
      /* CurrentMemoryContext is aggcontext from here until the switch back */
      MemoryContextSwitchTo(peraggstate->aggcontext);
      if (DatumIsReadWriteExpandedObject(newVal,
                         false,
                         peraggstate->transtypeLen) &&
        MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
         /* do nothing */ ;
      else
        newVal = datumCopy(newVal,
                   peraggstate->transtypeByVal,
                   peraggstate->transtypeLen);
    }
    if (!peraggstate->transValueIsNull)
    {
      /* Release the superseded state */
      if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
                         false,
                         peraggstate->transtypeLen))
        DeleteExpandedObject(peraggstate->transValue);
      else
        pfree(DatumGetPointer(peraggstate->transValue));
    }
  }

  MemoryContextSwitchTo(oldContext);
  peraggstate->transValue = newVal;
  peraggstate->transValueIsNull = fcinfo->isnull;

  return true;
}
576
577
/*
578
 * finalize_windowaggregate
579
 * parallel to finalize_aggregate in nodeAgg.c
580
 */
581
static void
582
finalize_windowaggregate(WindowAggState *winstate,
583
             WindowStatePerFunc perfuncstate,
584
             WindowStatePerAgg peraggstate,
585
             Datum *result, bool *isnull)
586
0
{
587
0
  MemoryContext oldContext;
588
589
0
  oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
590
591
  /*
592
   * Apply the agg's finalfn if one is provided, else return transValue.
593
   */
594
0
  if (OidIsValid(peraggstate->finalfn_oid))
595
0
  {
596
0
    LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
597
0
    int     numFinalArgs = peraggstate->numFinalArgs;
598
0
    bool    anynull;
599
0
    int     i;
600
601
0
    InitFunctionCallInfoData(fcinfodata.fcinfo, &(peraggstate->finalfn),
602
0
                 numFinalArgs,
603
0
                 perfuncstate->winCollation,
604
0
                 (Node *) winstate, NULL);
605
0
    fcinfo->args[0].value =
606
0
      MakeExpandedObjectReadOnly(peraggstate->transValue,
607
0
                     peraggstate->transValueIsNull,
608
0
                     peraggstate->transtypeLen);
609
0
    fcinfo->args[0].isnull = peraggstate->transValueIsNull;
610
0
    anynull = peraggstate->transValueIsNull;
611
612
    /* Fill any remaining argument positions with nulls */
613
0
    for (i = 1; i < numFinalArgs; i++)
614
0
    {
615
0
      fcinfo->args[i].value = (Datum) 0;
616
0
      fcinfo->args[i].isnull = true;
617
0
      anynull = true;
618
0
    }
619
620
0
    if (fcinfo->flinfo->fn_strict && anynull)
621
0
    {
622
      /* don't call a strict function with NULL inputs */
623
0
      *result = (Datum) 0;
624
0
      *isnull = true;
625
0
    }
626
0
    else
627
0
    {
628
0
      Datum   res;
629
630
0
      winstate->curaggcontext = peraggstate->aggcontext;
631
0
      res = FunctionCallInvoke(fcinfo);
632
0
      winstate->curaggcontext = NULL;
633
0
      *isnull = fcinfo->isnull;
634
0
      *result = MakeExpandedObjectReadOnly(res,
635
0
                         fcinfo->isnull,
636
0
                         peraggstate->resulttypeLen);
637
0
    }
638
0
  }
639
0
  else
640
0
  {
641
0
    *result =
642
0
      MakeExpandedObjectReadOnly(peraggstate->transValue,
643
0
                     peraggstate->transValueIsNull,
644
0
                     peraggstate->transtypeLen);
645
0
    *isnull = peraggstate->transValueIsNull;
646
0
  }
647
648
0
  MemoryContextSwitchTo(oldContext);
649
0
}
650
651
/*
 * eval_windowaggregates
 * evaluate plain aggregates being used as window functions
 *
 * This differs from nodeAgg.c in two ways.  First, if the window's frame
 * start position moves, we use the inverse transition function (if it exists)
 * to remove rows from the transition value.  And second, we expect to be
 * able to call aggregate final functions repeatedly after aggregating more
 * data onto the same transition value.  This is not a behavior required by
 * nodeAgg.c.
 *
 * Output: for every plain aggregate of this WindowAgg, the value for the
 * current row's frame is stored in ps_ExprContext->ecxt_aggvalues[wfuncno]
 * and ecxt_aggnulls[wfuncno].  The same value is also remembered in
 * peraggstate->resultValue / resultValueIsNull, so that a following row
 * whose frame is provably identical can reuse it without recomputation.
 *
 * Raises ERROR if the frame head is found to have moved backwards, or if a
 * row that was already aggregated can no longer be re-fetched for removal.
 */
static void
eval_windowaggregates(WindowAggState *winstate)
{
	WindowStatePerAgg peraggstate;
	int			wfuncno,
				numaggs,
				numaggs_restart,
				i;
	int64		aggregatedupto_nonrestarted;
	MemoryContext oldContext;
	ExprContext *econtext;
	WindowObject agg_winobj;
	TupleTableSlot *agg_row_slot;
	TupleTableSlot *temp_slot;

	numaggs = winstate->numaggs;
	if (numaggs == 0)
		return;					/* nothing to do */

	/* final output execution is in ps_ExprContext */
	econtext = winstate->ss.ps.ps_ExprContext;
	agg_winobj = winstate->agg_winobj;
	/* agg_row_slot scans forward through the frame; temp_slot re-reads rows being removed */
	agg_row_slot = winstate->agg_row_slot;
	temp_slot = winstate->temp_slot_1;

	/*
	 * If the window's frame start clause is UNBOUNDED_PRECEDING and no
	 * exclusion clause is specified, then the window frame consists of a
	 * contiguous group of rows extending forward from the start of the
	 * partition, and rows only enter the frame, never exit it, as the current
	 * row advances forward.  This makes it possible to use an incremental
	 * strategy for evaluating aggregates: we run the transition function for
	 * each row added to the frame, and run the final function whenever we
	 * need the current aggregate value.  This is considerably more efficient
	 * than the naive approach of re-running the entire aggregate calculation
	 * for each current row.  It does assume that the final function doesn't
	 * damage the running transition value, but we have the same assumption in
	 * nodeAgg.c too (when it rescans an existing hash table).
	 *
	 * If the frame start does sometimes move, we can still optimize as above
	 * whenever successive rows share the same frame head, but if the frame
	 * head moves beyond the previous head we try to remove those rows using
	 * the aggregate's inverse transition function.  This function restores
	 * the aggregate's current state to what it would be if the removed row
	 * had never been aggregated in the first place.  Inverse transition
	 * functions may optionally return NULL, indicating that the function was
	 * unable to remove the tuple from aggregation.  If this happens, or if
	 * the aggregate doesn't have an inverse transition function at all, we
	 * must perform the aggregation all over again for all tuples within the
	 * new frame boundaries.
	 *
	 * If there's any exclusion clause, then we may have to aggregate over a
	 * non-contiguous set of rows, so we punt and recalculate for every row.
	 * (For some frame end choices, it might be that the frame is always
	 * contiguous anyway, but that's an optimization to investigate later.)
	 *
	 * In many common cases, multiple rows share the same frame and hence the
	 * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
	 * window, then all rows are peers and so they all have window frame equal
	 * to the whole partition.)  We optimize such cases by calculating the
	 * aggregate value once when we reach the first row of a peer group, and
	 * then returning the saved value for all subsequent rows.
	 *
	 * 'aggregatedupto' keeps track of the first row that has not yet been
	 * accumulated into the aggregate transition values.  Whenever we start a
	 * new peer group, we accumulate forward to the end of the peer group.
	 *
	 * Correspondingly, 'aggregatedbase' is the first row that IS still
	 * accumulated, so the running transition values cover the half-open range
	 * [aggregatedbase, aggregatedupto).
	 */

	/*
	 * First, update the frame head position.
	 *
	 * The frame head should never move backwards, and the code below wouldn't
	 * cope if it did, so for safety we complain if it does.
	 */
	update_frameheadpos(winstate);
	if (winstate->frameheadpos < winstate->aggregatedbase)
		elog(ERROR, "window frame head moved backward");

	/*
	 * If the frame didn't change compared to the previous row, we can re-use
	 * the result values that were previously saved at the bottom of this
	 * function.  Since we don't know the current frame's end yet, this is not
	 * possible to check for fully.  But if the frame end mode is UNBOUNDED
	 * FOLLOWING or CURRENT ROW, no exclusion clause is specified, and the
	 * current row lies within the previous row's frame, then the two frames'
	 * ends must coincide.  Note that on the first row aggregatedbase ==
	 * aggregatedupto, meaning this test must fail, so we don't need to check
	 * the "there was no previous row" case explicitly here.
	 */
	if (winstate->aggregatedbase == winstate->frameheadpos &&
		(winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
								   FRAMEOPTION_END_CURRENT_ROW)) &&
		!(winstate->frameOptions & FRAMEOPTION_EXCLUSION) &&
		winstate->aggregatedbase <= winstate->currentpos &&
		winstate->aggregatedupto > winstate->currentpos)
	{
		/* Fast path: publish the cached results; no transition work at all */
		for (i = 0; i < numaggs; i++)
		{
			peraggstate = &winstate->peragg[i];
			wfuncno = peraggstate->wfuncno;
			econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
			econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
		}
		return;
	}

	/*----------
	 * Initialize restart flags.
	 *
	 * We restart the aggregation:
	 *	 - if we're processing the first row in the partition, or
	 *	 - if the frame's head moved and we cannot use an inverse
	 *	   transition function, or
	 *	 - we have an EXCLUSION clause, or
	 *	 - if the new frame doesn't overlap the old one
	 *
	 * Note that we don't strictly need to restart in the last case, but if
	 * we're going to remove all rows from the aggregation anyway, a restart
	 * surely is faster.
	 *
	 * numaggs_restart counts the aggregates flagged here (and, below, those
	 * whose inverse transition fails).
	 *----------
	 */
	numaggs_restart = 0;
	for (i = 0; i < numaggs; i++)
	{
		peraggstate = &winstate->peragg[i];
		if (winstate->currentpos == 0 ||
			(winstate->aggregatedbase != winstate->frameheadpos &&
			 !OidIsValid(peraggstate->invtransfn_oid)) ||
			(winstate->frameOptions & FRAMEOPTION_EXCLUSION) ||
			winstate->aggregatedupto <= winstate->frameheadpos)
		{
			peraggstate->restart = true;
			numaggs_restart++;
		}
		else
			peraggstate->restart = false;
	}

	/*
	 * If we have any possibly-moving aggregates, attempt to advance
	 * aggregatedbase to match the frame's head by removing input rows that
	 * fell off the top of the frame from the aggregations.  This can fail,
	 * i.e. advance_windowaggregate_base() can return false, in which case
	 * we'll restart that aggregate below.
	 *
	 * The loop stops as soon as every aggregate is marked for restart, since
	 * further inverse transitions would then be thrown away anyway.
	 */
	while (numaggs_restart < numaggs &&
		   winstate->aggregatedbase < winstate->frameheadpos)
	{
		/*
		 * Fetch the next tuple of those being removed. This should never fail
		 * as we should have been here before.
		 */
		if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
								 temp_slot))
			elog(ERROR, "could not re-fetch previously fetched frame row");

		/* Set tuple context for evaluation of aggregate arguments */
		winstate->tmpcontext->ecxt_outertuple = temp_slot;

		/*
		 * Perform the inverse transition for each aggregate function in the
		 * window, unless it has already been marked as needing a restart.
		 */
		for (i = 0; i < numaggs; i++)
		{
			bool		ok;

			peraggstate = &winstate->peragg[i];
			if (peraggstate->restart)
				continue;

			wfuncno = peraggstate->wfuncno;
			ok = advance_windowaggregate_base(winstate,
											  &winstate->perfunc[wfuncno],
											  peraggstate);
			if (!ok)
			{
				/* Inverse transition function has failed, must restart */
				peraggstate->restart = true;
				numaggs_restart++;
			}
		}

		/* Reset per-input-tuple context after each tuple */
		ResetExprContext(winstate->tmpcontext);

		/* And advance the aggregated-row state */
		winstate->aggregatedbase++;
		ExecClearTuple(temp_slot);
	}

	/*
	 * If we successfully advanced the base rows of all the aggregates,
	 * aggregatedbase now equals frameheadpos; but if we failed for any, we
	 * must forcibly update aggregatedbase.
	 */
	winstate->aggregatedbase = winstate->frameheadpos;

	/*
	 * If we created a mark pointer for aggregates, keep it pushed up to frame
	 * head, so that tuplestore can discard unnecessary rows.
	 */
	if (agg_winobj->markptr >= 0)
		WinSetMarkPosition(agg_winobj, winstate->frameheadpos);

	/*
	 * Now restart the aggregates that require it.
	 *
	 * We assume that aggregates using the shared context always restart if
	 * *any* aggregate restarts, and we may thus clean up the shared
	 * aggcontext if that is the case.  Private aggcontexts are reset by
	 * initialize_windowaggregate() if their owning aggregate restarts. If we
	 * aren't restarting an aggregate, we need to free any previously saved
	 * result for it, else we'll leak memory.
	 */
	if (numaggs_restart > 0)
		MemoryContextReset(winstate->aggcontext);
	for (i = 0; i < numaggs; i++)
	{
		peraggstate = &winstate->peragg[i];

		/* Aggregates using the shared ctx must restart if *any* agg does */
		Assert(peraggstate->aggcontext != winstate->aggcontext ||
			   numaggs_restart == 0 ||
			   peraggstate->restart);

		if (peraggstate->restart)
		{
			wfuncno = peraggstate->wfuncno;
			initialize_windowaggregate(winstate,
									   &winstate->perfunc[wfuncno],
									   peraggstate);
		}
		else if (!peraggstate->resultValueIsNull)
		{
			/* Saved by-ref results were datumCopy'd into aggcontext; free them */
			if (!peraggstate->resulttypeByVal)
				pfree(DatumGetPointer(peraggstate->resultValue));
			peraggstate->resultValue = (Datum) 0;
			peraggstate->resultValueIsNull = true;
		}
	}

	/*
	 * Non-restarted aggregates now contain the rows between aggregatedbase
	 * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
	 * contain no rows.  If there are any restarted aggregates, we must thus
	 * begin aggregating anew at frameheadpos, otherwise we may simply
	 * continue at aggregatedupto.  We must remember the old value of
	 * aggregatedupto to know how long to skip advancing non-restarted
	 * aggregates.  If we modify aggregatedupto, we must also clear
	 * agg_row_slot, per the loop invariant below.
	 */
	aggregatedupto_nonrestarted = winstate->aggregatedupto;
	if (numaggs_restart > 0 &&
		winstate->aggregatedupto != winstate->frameheadpos)
	{
		winstate->aggregatedupto = winstate->frameheadpos;
		ExecClearTuple(agg_row_slot);
	}

	/*
	 * Advance until we reach a row not in frame (or end of partition).
	 *
	 * Note the loop invariant: agg_row_slot is either empty or holds the row
	 * at position aggregatedupto.  We advance aggregatedupto after processing
	 * a row.
	 */
	for (;;)
	{
		int			ret;

		/* Fetch next row if we didn't already */
		if (TupIsNull(agg_row_slot))
		{
			if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
									 agg_row_slot))
				break;			/* must be end of partition */
		}

		/*
		 * Exit loop if no more rows can be in frame.  Skip aggregation if
		 * current row is not in frame but there might be more in the frame.
		 * (row_is_in_frame: -1 = this and all later rows out of frame,
		 * 0 = this row out of frame but later ones may be in, 1 = in frame.)
		 */
		ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot);
		if (ret < 0)
			break;
		if (ret == 0)
			goto next_tuple;

		/* Set tuple context for evaluation of aggregate arguments */
		winstate->tmpcontext->ecxt_outertuple = agg_row_slot;

		/* Accumulate row into the aggregates */
		for (i = 0; i < numaggs; i++)
		{
			peraggstate = &winstate->peragg[i];

			/* Non-restarted aggs skip until aggregatedupto_nonrestarted */
			if (!peraggstate->restart &&
				winstate->aggregatedupto < aggregatedupto_nonrestarted)
				continue;

			wfuncno = peraggstate->wfuncno;
			advance_windowaggregate(winstate,
									&winstate->perfunc[wfuncno],
									peraggstate);
		}

next_tuple:
		/* Reset per-input-tuple context after each tuple */
		ResetExprContext(winstate->tmpcontext);

		/* And advance the aggregated-row state */
		winstate->aggregatedupto++;
		ExecClearTuple(agg_row_slot);
	}

	/* The frame's end is not supposed to move backwards, ever */
	Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);

	/*
	 * finalize aggregates and fill result/isnull fields.
	 */
	for (i = 0; i < numaggs; i++)
	{
		Datum	   *result;
		bool	   *isnull;

		peraggstate = &winstate->peragg[i];
		wfuncno = peraggstate->wfuncno;
		result = &econtext->ecxt_aggvalues[wfuncno];
		isnull = &econtext->ecxt_aggnulls[wfuncno];
		finalize_windowaggregate(winstate,
								 &winstate->perfunc[wfuncno],
								 peraggstate,
								 result, isnull);

		/*
		 * save the result in case next row shares the same frame.
		 *
		 * Non-null by-reference results are copied into the aggregate's
		 * aggcontext so the saved copy outlives the current output tuple;
		 * by-value and null results need no copy.
		 *
		 * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
		 * advance that the next row can't possibly share the same frame. Is
		 * it worth detecting that and skipping this code?
		 */
		if (!peraggstate->resulttypeByVal && !*isnull)
		{
			oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
			peraggstate->resultValue =
				datumCopy(*result,
						  peraggstate->resulttypeByVal,
						  peraggstate->resulttypeLen);
			MemoryContextSwitchTo(oldContext);
		}
		else
		{
			peraggstate->resultValue = *result;
		}
		peraggstate->resultValueIsNull = *isnull;
	}
}
1022
1023
/*
1024
 * eval_windowfunction
1025
 *
1026
 * Arguments of window functions are not evaluated here, because a window
1027
 * function can need random access to arbitrary rows in the partition.
1028
 * The window function uses the special WinGetFuncArgInPartition and
1029
 * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
1030
 * it wants.
1031
 */
1032
static void
1033
eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
1034
          Datum *result, bool *isnull)
1035
0
{
1036
0
  LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
1037
0
  MemoryContext oldContext;
1038
1039
0
  oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1040
1041
  /*
1042
   * We don't pass any normal arguments to a window function, but we do pass
1043
   * it the number of arguments, in order to permit window function
1044
   * implementations to support varying numbers of arguments.  The real info
1045
   * goes through the WindowObject, which is passed via fcinfo->context.
1046
   */
1047
0
  InitFunctionCallInfoData(*fcinfo, &(perfuncstate->flinfo),
1048
0
               perfuncstate->numArguments,
1049
0
               perfuncstate->winCollation,
1050
0
               (Node *) perfuncstate->winobj, NULL);
1051
  /* Just in case, make all the regular argument slots be null */
1052
0
  for (int argno = 0; argno < perfuncstate->numArguments; argno++)
1053
0
    fcinfo->args[argno].isnull = true;
1054
  /* Window functions don't have a current aggregate context, either */
1055
0
  winstate->curaggcontext = NULL;
1056
1057
0
  *result = FunctionCallInvoke(fcinfo);
1058
0
  *isnull = fcinfo->isnull;
1059
1060
  /*
1061
   * The window function might have returned a pass-by-ref result that's
1062
   * just a pointer into one of the WindowObject's temporary slots.  That's
1063
   * not a problem if it's the only window function using the WindowObject;
1064
   * but if there's more than one function, we'd better copy the result to
1065
   * ensure it's not clobbered by later window functions.
1066
   */
1067
0
  if (!perfuncstate->resulttypeByVal && !fcinfo->isnull &&
1068
0
    winstate->numfuncs > 1)
1069
0
    *result = datumCopy(*result,
1070
0
              perfuncstate->resulttypeByVal,
1071
0
              perfuncstate->resulttypeLen);
1072
1073
0
  MemoryContextSwitchTo(oldContext);
1074
0
}
1075
1076
/*
1077
 * prepare_tuplestore
1078
 *    Prepare the tuplestore and all of the required read pointers for the
1079
 *    WindowAggState's frameOptions.
1080
 *
1081
 * Note: We use pg_noinline to avoid bloating the calling function with code
1082
 * which is only called once.
1083
 */
1084
static pg_noinline void
1085
prepare_tuplestore(WindowAggState *winstate)
1086
0
{
1087
0
  WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1088
0
  int     frameOptions = winstate->frameOptions;
1089
0
  int     numfuncs = winstate->numfuncs;
1090
1091
  /* we shouldn't be called if this was done already */
1092
0
  Assert(winstate->buffer == NULL);
1093
1094
  /* Create new tuplestore */
1095
0
  winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
1096
1097
  /*
1098
   * Set up read pointers for the tuplestore.  The current pointer doesn't
1099
   * need BACKWARD capability, but the per-window-function read pointers do,
1100
   * and the aggregate pointer does if we might need to restart aggregation.
1101
   */
1102
0
  winstate->current_ptr = 0;  /* read pointer 0 is pre-allocated */
1103
1104
  /* reset default REWIND capability bit for current ptr */
1105
0
  tuplestore_set_eflags(winstate->buffer, 0);
1106
1107
  /* create read pointers for aggregates, if needed */
1108
0
  if (winstate->numaggs > 0)
1109
0
  {
1110
0
    WindowObject agg_winobj = winstate->agg_winobj;
1111
0
    int     readptr_flags = 0;
1112
1113
    /*
1114
     * If the frame head is potentially movable, or we have an EXCLUSION
1115
     * clause, we might need to restart aggregation ...
1116
     */
1117
0
    if (!(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) ||
1118
0
      (frameOptions & FRAMEOPTION_EXCLUSION))
1119
0
    {
1120
      /* ... so create a mark pointer to track the frame head */
1121
0
      agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
1122
      /* and the read pointer will need BACKWARD capability */
1123
0
      readptr_flags |= EXEC_FLAG_BACKWARD;
1124
0
    }
1125
1126
0
    agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1127
0
                              readptr_flags);
1128
0
  }
1129
1130
  /* create mark and read pointers for each real window function */
1131
0
  for (int i = 0; i < numfuncs; i++)
1132
0
  {
1133
0
    WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1134
1135
0
    if (!perfuncstate->plain_agg)
1136
0
    {
1137
0
      WindowObject winobj = perfuncstate->winobj;
1138
1139
0
      winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
1140
0
                              0);
1141
0
      winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1142
0
                              EXEC_FLAG_BACKWARD);
1143
0
    }
1144
0
  }
1145
1146
  /*
1147
   * If we are in RANGE or GROUPS mode, then determining frame boundaries
1148
   * requires physical access to the frame endpoint rows, except in certain
1149
   * degenerate cases.  We create read pointers to point to those rows, to
1150
   * simplify access and ensure that the tuplestore doesn't discard the
1151
   * endpoint rows prematurely.  (Must create pointers in exactly the same
1152
   * cases that update_frameheadpos and update_frametailpos need them.)
1153
   */
1154
0
  winstate->framehead_ptr = winstate->frametail_ptr = -1; /* if not used */
1155
1156
0
  if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1157
0
  {
1158
0
    if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) &&
1159
0
       node->ordNumCols != 0) ||
1160
0
      (frameOptions & FRAMEOPTION_START_OFFSET))
1161
0
      winstate->framehead_ptr =
1162
0
        tuplestore_alloc_read_pointer(winstate->buffer, 0);
1163
0
    if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
1164
0
       node->ordNumCols != 0) ||
1165
0
      (frameOptions & FRAMEOPTION_END_OFFSET))
1166
0
      winstate->frametail_ptr =
1167
0
        tuplestore_alloc_read_pointer(winstate->buffer, 0);
1168
0
  }
1169
1170
  /*
1171
   * If we have an exclusion clause that requires knowing the boundaries of
1172
   * the current row's peer group, we create a read pointer to track the
1173
   * tail position of the peer group (i.e., first row of the next peer
1174
   * group).  The head position does not require its own pointer because we
1175
   * maintain that as a side effect of advancing the current row.
1176
   */
1177
0
  winstate->grouptail_ptr = -1;
1178
1179
0
  if ((frameOptions & (FRAMEOPTION_EXCLUDE_GROUP |
1180
0
             FRAMEOPTION_EXCLUDE_TIES)) &&
1181
0
    node->ordNumCols != 0)
1182
0
  {
1183
0
    winstate->grouptail_ptr =
1184
0
      tuplestore_alloc_read_pointer(winstate->buffer, 0);
1185
0
  }
1186
0
}
1187
1188
/*
1189
 * begin_partition
1190
 * Start buffering rows of the next partition.
1191
 */
1192
static void
1193
begin_partition(WindowAggState *winstate)
1194
0
{
1195
0
  PlanState  *outerPlan = outerPlanState(winstate);
1196
0
  int     numfuncs = winstate->numfuncs;
1197
1198
0
  winstate->partition_spooled = false;
1199
0
  winstate->framehead_valid = false;
1200
0
  winstate->frametail_valid = false;
1201
0
  winstate->grouptail_valid = false;
1202
0
  winstate->spooled_rows = 0;
1203
0
  winstate->currentpos = 0;
1204
0
  winstate->frameheadpos = 0;
1205
0
  winstate->frametailpos = 0;
1206
0
  winstate->currentgroup = 0;
1207
0
  winstate->frameheadgroup = 0;
1208
0
  winstate->frametailgroup = 0;
1209
0
  winstate->groupheadpos = 0;
1210
0
  winstate->grouptailpos = -1;  /* see update_grouptailpos */
1211
0
  ExecClearTuple(winstate->agg_row_slot);
1212
0
  if (winstate->framehead_slot)
1213
0
    ExecClearTuple(winstate->framehead_slot);
1214
0
  if (winstate->frametail_slot)
1215
0
    ExecClearTuple(winstate->frametail_slot);
1216
1217
  /*
1218
   * If this is the very first partition, we need to fetch the first input
1219
   * row to store in first_part_slot.
1220
   */
1221
0
  if (TupIsNull(winstate->first_part_slot))
1222
0
  {
1223
0
    TupleTableSlot *outerslot = ExecProcNode(outerPlan);
1224
1225
0
    if (!TupIsNull(outerslot))
1226
0
      ExecCopySlot(winstate->first_part_slot, outerslot);
1227
0
    else
1228
0
    {
1229
      /* outer plan is empty, so we have nothing to do */
1230
0
      winstate->partition_spooled = true;
1231
0
      winstate->more_partitions = false;
1232
0
      return;
1233
0
    }
1234
0
  }
1235
1236
  /* Create new tuplestore if not done already. */
1237
0
  if (unlikely(winstate->buffer == NULL))
1238
0
    prepare_tuplestore(winstate);
1239
1240
0
  winstate->next_partition = false;
1241
1242
0
  if (winstate->numaggs > 0)
1243
0
  {
1244
0
    WindowObject agg_winobj = winstate->agg_winobj;
1245
1246
    /* reset mark and see positions for aggregate functions */
1247
0
    agg_winobj->markpos = -1;
1248
0
    agg_winobj->seekpos = -1;
1249
1250
    /* Also reset the row counters for aggregates */
1251
0
    winstate->aggregatedbase = 0;
1252
0
    winstate->aggregatedupto = 0;
1253
0
  }
1254
1255
  /* reset mark and seek positions for each real window function */
1256
0
  for (int i = 0; i < numfuncs; i++)
1257
0
  {
1258
0
    WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1259
1260
0
    if (!perfuncstate->plain_agg)
1261
0
    {
1262
0
      WindowObject winobj = perfuncstate->winobj;
1263
1264
0
      winobj->markpos = -1;
1265
0
      winobj->seekpos = -1;
1266
0
    }
1267
0
  }
1268
1269
  /*
1270
   * Store the first tuple into the tuplestore (it's always available now;
1271
   * we either read it above, or saved it at the end of previous partition)
1272
   */
1273
0
  tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
1274
0
  winstate->spooled_rows++;
1275
0
}
1276
1277
/*
1278
 * Read tuples from the outer node, up to and including position 'pos', and
1279
 * store them into the tuplestore. If pos is -1, reads the whole partition.
1280
 */
1281
static void
1282
spool_tuples(WindowAggState *winstate, int64 pos)
1283
0
{
1284
0
  WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1285
0
  PlanState  *outerPlan;
1286
0
  TupleTableSlot *outerslot;
1287
0
  MemoryContext oldcontext;
1288
1289
0
  if (!winstate->buffer)
1290
0
    return;         /* just a safety check */
1291
0
  if (winstate->partition_spooled)
1292
0
    return;         /* whole partition done already */
1293
1294
  /*
1295
   * When in pass-through mode we can just exhaust all tuples in the current
1296
   * partition.  We don't need these tuples for any further window function
1297
   * evaluation, however, we do need to keep them around if we're not the
1298
   * top-level window as another WindowAgg node above must see these.
1299
   */
1300
0
  if (winstate->status != WINDOWAGG_RUN)
1301
0
  {
1302
0
    Assert(winstate->status == WINDOWAGG_PASSTHROUGH ||
1303
0
         winstate->status == WINDOWAGG_PASSTHROUGH_STRICT);
1304
1305
0
    pos = -1;
1306
0
  }
1307
1308
  /*
1309
   * If the tuplestore has spilled to disk, alternate reading and writing
1310
   * becomes quite expensive due to frequent buffer flushes.  It's cheaper
1311
   * to force the entire partition to get spooled in one go.
1312
   *
1313
   * XXX this is a horrid kluge --- it'd be better to fix the performance
1314
   * problem inside tuplestore.  FIXME
1315
   */
1316
0
  else if (!tuplestore_in_memory(winstate->buffer))
1317
0
    pos = -1;
1318
1319
0
  outerPlan = outerPlanState(winstate);
1320
1321
  /* Must be in query context to call outerplan */
1322
0
  oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1323
1324
0
  while (winstate->spooled_rows <= pos || pos == -1)
1325
0
  {
1326
0
    outerslot = ExecProcNode(outerPlan);
1327
0
    if (TupIsNull(outerslot))
1328
0
    {
1329
      /* reached the end of the last partition */
1330
0
      winstate->partition_spooled = true;
1331
0
      winstate->more_partitions = false;
1332
0
      break;
1333
0
    }
1334
1335
0
    if (node->partNumCols > 0)
1336
0
    {
1337
0
      ExprContext *econtext = winstate->tmpcontext;
1338
1339
0
      econtext->ecxt_innertuple = winstate->first_part_slot;
1340
0
      econtext->ecxt_outertuple = outerslot;
1341
1342
      /* Check if this tuple still belongs to the current partition */
1343
0
      if (!ExecQualAndReset(winstate->partEqfunction, econtext))
1344
0
      {
1345
        /*
1346
         * end of partition; copy the tuple for the next cycle.
1347
         */
1348
0
        ExecCopySlot(winstate->first_part_slot, outerslot);
1349
0
        winstate->partition_spooled = true;
1350
0
        winstate->more_partitions = true;
1351
0
        break;
1352
0
      }
1353
0
    }
1354
1355
    /*
1356
     * Remember the tuple unless we're the top-level window and we're in
1357
     * pass-through mode.
1358
     */
1359
0
    if (winstate->status != WINDOWAGG_PASSTHROUGH_STRICT)
1360
0
    {
1361
      /* Still in partition, so save it into the tuplestore */
1362
0
      tuplestore_puttupleslot(winstate->buffer, outerslot);
1363
0
      winstate->spooled_rows++;
1364
0
    }
1365
0
  }
1366
1367
0
  MemoryContextSwitchTo(oldcontext);
1368
0
}
1369
1370
/*
1371
 * release_partition
1372
 * clear information kept within a partition, including
1373
 * tuplestore and aggregate results.
1374
 */
1375
static void
1376
release_partition(WindowAggState *winstate)
1377
0
{
1378
0
  int     i;
1379
1380
0
  for (i = 0; i < winstate->numfuncs; i++)
1381
0
  {
1382
0
    WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1383
1384
    /* Release any partition-local state of this window function */
1385
0
    if (perfuncstate->winobj)
1386
0
      perfuncstate->winobj->localmem = NULL;
1387
0
  }
1388
1389
  /*
1390
   * Release all partition-local memory (in particular, any partition-local
1391
   * state that we might have trashed our pointers to in the above loop, and
1392
   * any aggregate temp data).  We don't rely on retail pfree because some
1393
   * aggregates might have allocated data we don't have direct pointers to.
1394
   */
1395
0
  MemoryContextReset(winstate->partcontext);
1396
0
  MemoryContextReset(winstate->aggcontext);
1397
0
  for (i = 0; i < winstate->numaggs; i++)
1398
0
  {
1399
0
    if (winstate->peragg[i].aggcontext != winstate->aggcontext)
1400
0
      MemoryContextReset(winstate->peragg[i].aggcontext);
1401
0
  }
1402
1403
0
  if (winstate->buffer)
1404
0
    tuplestore_clear(winstate->buffer);
1405
0
  winstate->partition_spooled = false;
1406
0
  winstate->next_partition = true;
1407
0
}
1408
1409
/*
1410
 * row_is_in_frame
1411
 * Determine whether a row is in the current row's window frame according
1412
 * to our window framing rule
1413
 *
1414
 * The caller must have already determined that the row is in the partition
1415
 * and fetched it into a slot.  This function just encapsulates the framing
1416
 * rules.
1417
 *
1418
 * Returns:
1419
 * -1, if the row is out of frame and no succeeding rows can be in frame
1420
 * 0, if the row is out of frame but succeeding rows might be in frame
1421
 * 1, if the row is in frame
1422
 *
1423
 * May clobber winstate->temp_slot_2.
1424
 */
1425
static int
1426
row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
1427
0
{
1428
0
  int     frameOptions = winstate->frameOptions;
1429
1430
0
  Assert(pos >= 0);     /* else caller error */
1431
1432
  /*
1433
   * First, check frame starting conditions.  We might as well delegate this
1434
   * to update_frameheadpos always; it doesn't add any notable cost.
1435
   */
1436
0
  update_frameheadpos(winstate);
1437
0
  if (pos < winstate->frameheadpos)
1438
0
    return 0;
1439
1440
  /*
1441
   * Okay so far, now check frame ending conditions.  Here, we avoid calling
1442
   * update_frametailpos in simple cases, so as not to spool tuples further
1443
   * ahead than necessary.
1444
   */
1445
0
  if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1446
0
  {
1447
0
    if (frameOptions & FRAMEOPTION_ROWS)
1448
0
    {
1449
      /* rows after current row are out of frame */
1450
0
      if (pos > winstate->currentpos)
1451
0
        return -1;
1452
0
    }
1453
0
    else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1454
0
    {
1455
      /* following row that is not peer is out of frame */
1456
0
      if (pos > winstate->currentpos &&
1457
0
        !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1458
0
        return -1;
1459
0
    }
1460
0
    else
1461
0
      Assert(false);
1462
0
  }
1463
0
  else if (frameOptions & FRAMEOPTION_END_OFFSET)
1464
0
  {
1465
0
    if (frameOptions & FRAMEOPTION_ROWS)
1466
0
    {
1467
0
      int64   offset = DatumGetInt64(winstate->endOffsetValue);
1468
1469
      /* rows after current row + offset are out of frame */
1470
0
      if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1471
0
        offset = -offset;
1472
1473
0
      if (pos > winstate->currentpos + offset)
1474
0
        return -1;
1475
0
    }
1476
0
    else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1477
0
    {
1478
      /* hard cases, so delegate to update_frametailpos */
1479
0
      update_frametailpos(winstate);
1480
0
      if (pos >= winstate->frametailpos)
1481
0
        return -1;
1482
0
    }
1483
0
    else
1484
0
      Assert(false);
1485
0
  }
1486
1487
  /* Check exclusion clause */
1488
0
  if (frameOptions & FRAMEOPTION_EXCLUDE_CURRENT_ROW)
1489
0
  {
1490
0
    if (pos == winstate->currentpos)
1491
0
      return 0;
1492
0
  }
1493
0
  else if ((frameOptions & FRAMEOPTION_EXCLUDE_GROUP) ||
1494
0
       ((frameOptions & FRAMEOPTION_EXCLUDE_TIES) &&
1495
0
        pos != winstate->currentpos))
1496
0
  {
1497
0
    WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1498
1499
    /* If no ORDER BY, all rows are peers with each other */
1500
0
    if (node->ordNumCols == 0)
1501
0
      return 0;
1502
    /* Otherwise, check the group boundaries */
1503
0
    if (pos >= winstate->groupheadpos)
1504
0
    {
1505
0
      update_grouptailpos(winstate);
1506
0
      if (pos < winstate->grouptailpos)
1507
0
        return 0;
1508
0
    }
1509
0
  }
1510
1511
  /* If we get here, it's in frame */
1512
0
  return 1;
1513
0
}
1514
1515
/*
1516
 * update_frameheadpos
1517
 * make frameheadpos valid for the current row
1518
 *
1519
 * Note that frameheadpos is computed without regard for any window exclusion
1520
 * clause; the current row and/or its peers are considered part of the frame
1521
 * for this purpose even if they must be excluded later.
1522
 *
1523
 * May clobber winstate->temp_slot_2.
1524
 */
1525
static void
1526
update_frameheadpos(WindowAggState *winstate)
1527
0
{
1528
0
  WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1529
0
  int     frameOptions = winstate->frameOptions;
1530
0
  MemoryContext oldcontext;
1531
1532
0
  if (winstate->framehead_valid)
1533
0
    return;         /* already known for current row */
1534
1535
  /* We may be called in a short-lived context */
1536
0
  oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1537
1538
0
  if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
1539
0
  {
1540
    /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
1541
0
    winstate->frameheadpos = 0;
1542
0
    winstate->framehead_valid = true;
1543
0
  }
1544
0
  else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1545
0
  {
1546
0
    if (frameOptions & FRAMEOPTION_ROWS)
1547
0
    {
1548
      /* In ROWS mode, frame head is the same as current */
1549
0
      winstate->frameheadpos = winstate->currentpos;
1550
0
      winstate->framehead_valid = true;
1551
0
    }
1552
0
    else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1553
0
    {
1554
      /* If no ORDER BY, all rows are peers with each other */
1555
0
      if (node->ordNumCols == 0)
1556
0
      {
1557
0
        winstate->frameheadpos = 0;
1558
0
        winstate->framehead_valid = true;
1559
0
        MemoryContextSwitchTo(oldcontext);
1560
0
        return;
1561
0
      }
1562
1563
      /*
1564
       * In RANGE or GROUPS START_CURRENT_ROW mode, frame head is the
1565
       * first row that is a peer of current row.  We keep a copy of the
1566
       * last-known frame head row in framehead_slot, and advance as
1567
       * necessary.  Note that if we reach end of partition, we will
1568
       * leave frameheadpos = end+1 and framehead_slot empty.
1569
       */
1570
0
      tuplestore_select_read_pointer(winstate->buffer,
1571
0
                       winstate->framehead_ptr);
1572
0
      if (winstate->frameheadpos == 0 &&
1573
0
        TupIsNull(winstate->framehead_slot))
1574
0
      {
1575
        /* fetch first row into framehead_slot, if we didn't already */
1576
0
        if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1577
0
                       winstate->framehead_slot))
1578
0
          elog(ERROR, "unexpected end of tuplestore");
1579
0
      }
1580
1581
0
      while (!TupIsNull(winstate->framehead_slot))
1582
0
      {
1583
0
        if (are_peers(winstate, winstate->framehead_slot,
1584
0
                winstate->ss.ss_ScanTupleSlot))
1585
0
          break;   /* this row is the correct frame head */
1586
        /* Note we advance frameheadpos even if the fetch fails */
1587
0
        winstate->frameheadpos++;
1588
0
        spool_tuples(winstate, winstate->frameheadpos);
1589
0
        if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1590
0
                       winstate->framehead_slot))
1591
0
          break;   /* end of partition */
1592
0
      }
1593
0
      winstate->framehead_valid = true;
1594
0
    }
1595
0
    else
1596
0
      Assert(false);
1597
0
  }
1598
0
  else if (frameOptions & FRAMEOPTION_START_OFFSET)
1599
0
  {
1600
0
    if (frameOptions & FRAMEOPTION_ROWS)
1601
0
    {
1602
      /* In ROWS mode, bound is physically n before/after current */
1603
0
      int64   offset = DatumGetInt64(winstate->startOffsetValue);
1604
1605
0
      if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1606
0
        offset = -offset;
1607
1608
0
      winstate->frameheadpos = winstate->currentpos + offset;
1609
      /* frame head can't go before first row */
1610
0
      if (winstate->frameheadpos < 0)
1611
0
        winstate->frameheadpos = 0;
1612
0
      else if (winstate->frameheadpos > winstate->currentpos + 1)
1613
0
      {
1614
        /* make sure frameheadpos is not past end of partition */
1615
0
        spool_tuples(winstate, winstate->frameheadpos - 1);
1616
0
        if (winstate->frameheadpos > winstate->spooled_rows)
1617
0
          winstate->frameheadpos = winstate->spooled_rows;
1618
0
      }
1619
0
      winstate->framehead_valid = true;
1620
0
    }
1621
0
    else if (frameOptions & FRAMEOPTION_RANGE)
1622
0
    {
1623
      /*
1624
       * In RANGE START_OFFSET mode, frame head is the first row that
1625
       * satisfies the in_range constraint relative to the current row.
1626
       * We keep a copy of the last-known frame head row in
1627
       * framehead_slot, and advance as necessary.  Note that if we
1628
       * reach end of partition, we will leave frameheadpos = end+1 and
1629
       * framehead_slot empty.
1630
       */
1631
0
      int     sortCol = node->ordColIdx[0];
1632
0
      bool    sub,
1633
0
            less;
1634
1635
      /* We must have an ordering column */
1636
0
      Assert(node->ordNumCols == 1);
1637
1638
      /* Precompute flags for in_range checks */
1639
0
      if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1640
0
        sub = true;   /* subtract startOffset from current row */
1641
0
      else
1642
0
        sub = false; /* add it */
1643
0
      less = false;   /* normally, we want frame head >= sum */
1644
      /* If sort order is descending, flip both flags */
1645
0
      if (!winstate->inRangeAsc)
1646
0
      {
1647
0
        sub = !sub;
1648
0
        less = true;
1649
0
      }
1650
1651
0
      tuplestore_select_read_pointer(winstate->buffer,
1652
0
                       winstate->framehead_ptr);
1653
0
      if (winstate->frameheadpos == 0 &&
1654
0
        TupIsNull(winstate->framehead_slot))
1655
0
      {
1656
        /* fetch first row into framehead_slot, if we didn't already */
1657
0
        if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1658
0
                       winstate->framehead_slot))
1659
0
          elog(ERROR, "unexpected end of tuplestore");
1660
0
      }
1661
1662
0
      while (!TupIsNull(winstate->framehead_slot))
1663
0
      {
1664
0
        Datum   headval,
1665
0
              currval;
1666
0
        bool    headisnull,
1667
0
              currisnull;
1668
1669
0
        headval = slot_getattr(winstate->framehead_slot, sortCol,
1670
0
                     &headisnull);
1671
0
        currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol,
1672
0
                     &currisnull);
1673
0
        if (headisnull || currisnull)
1674
0
        {
1675
          /* order of the rows depends only on nulls_first */
1676
0
          if (winstate->inRangeNullsFirst)
1677
0
          {
1678
            /* advance head if head is null and curr is not */
1679
0
            if (!headisnull || currisnull)
1680
0
              break;
1681
0
          }
1682
0
          else
1683
0
          {
1684
            /* advance head if head is not null and curr is null */
1685
0
            if (headisnull || !currisnull)
1686
0
              break;
1687
0
          }
1688
0
        }
1689
0
        else
1690
0
        {
1691
0
          if (DatumGetBool(FunctionCall5Coll(&winstate->startInRangeFunc,
1692
0
                             winstate->inRangeColl,
1693
0
                             headval,
1694
0
                             currval,
1695
0
                             winstate->startOffsetValue,
1696
0
                             BoolGetDatum(sub),
1697
0
                             BoolGetDatum(less))))
1698
0
            break; /* this row is the correct frame head */
1699
0
        }
1700
        /* Note we advance frameheadpos even if the fetch fails */
1701
0
        winstate->frameheadpos++;
1702
0
        spool_tuples(winstate, winstate->frameheadpos);
1703
0
        if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1704
0
                       winstate->framehead_slot))
1705
0
          break;   /* end of partition */
1706
0
      }
1707
0
      winstate->framehead_valid = true;
1708
0
    }
1709
0
    else if (frameOptions & FRAMEOPTION_GROUPS)
1710
0
    {
1711
      /*
1712
       * In GROUPS START_OFFSET mode, frame head is the first row of the
1713
       * first peer group whose number satisfies the offset constraint.
1714
       * We keep a copy of the last-known frame head row in
1715
       * framehead_slot, and advance as necessary.  Note that if we
1716
       * reach end of partition, we will leave frameheadpos = end+1 and
1717
       * framehead_slot empty.
1718
       */
1719
0
      int64   offset = DatumGetInt64(winstate->startOffsetValue);
1720
0
      int64   minheadgroup;
1721
1722
0
      if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1723
0
        minheadgroup = winstate->currentgroup - offset;
1724
0
      else
1725
0
        minheadgroup = winstate->currentgroup + offset;
1726
1727
0
      tuplestore_select_read_pointer(winstate->buffer,
1728
0
                       winstate->framehead_ptr);
1729
0
      if (winstate->frameheadpos == 0 &&
1730
0
        TupIsNull(winstate->framehead_slot))
1731
0
      {
1732
        /* fetch first row into framehead_slot, if we didn't already */
1733
0
        if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1734
0
                       winstate->framehead_slot))
1735
0
          elog(ERROR, "unexpected end of tuplestore");
1736
0
      }
1737
1738
0
      while (!TupIsNull(winstate->framehead_slot))
1739
0
      {
1740
0
        if (winstate->frameheadgroup >= minheadgroup)
1741
0
          break;   /* this row is the correct frame head */
1742
0
        ExecCopySlot(winstate->temp_slot_2, winstate->framehead_slot);
1743
        /* Note we advance frameheadpos even if the fetch fails */
1744
0
        winstate->frameheadpos++;
1745
0
        spool_tuples(winstate, winstate->frameheadpos);
1746
0
        if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1747
0
                       winstate->framehead_slot))
1748
0
          break;   /* end of partition */
1749
0
        if (!are_peers(winstate, winstate->temp_slot_2,
1750
0
                 winstate->framehead_slot))
1751
0
          winstate->frameheadgroup++;
1752
0
      }
1753
0
      ExecClearTuple(winstate->temp_slot_2);
1754
0
      winstate->framehead_valid = true;
1755
0
    }
1756
0
    else
1757
0
      Assert(false);
1758
0
  }
1759
0
  else
1760
0
    Assert(false);
1761
1762
0
  MemoryContextSwitchTo(oldcontext);
1763
0
}
1764
1765
/*
1766
 * update_frametailpos
1767
 * make frametailpos valid for the current row
1768
 *
1769
 * Note that frametailpos is computed without regard for any window exclusion
1770
 * clause; the current row and/or its peers are considered part of the frame
1771
 * for this purpose even if they must be excluded later.
1772
 *
1773
 * May clobber winstate->temp_slot_2.
1774
 */
1775
static void
1776
update_frametailpos(WindowAggState *winstate)
1777
0
{
1778
0
  WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1779
0
  int     frameOptions = winstate->frameOptions;
1780
0
  MemoryContext oldcontext;
1781
1782
0
  if (winstate->frametail_valid)
1783
0
    return;         /* already known for current row */
1784
1785
  /* We may be called in a short-lived context */
1786
0
  oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1787
1788
0
  if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
1789
0
  {
1790
    /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
1791
0
    spool_tuples(winstate, -1);
1792
0
    winstate->frametailpos = winstate->spooled_rows;
1793
0
    winstate->frametail_valid = true;
1794
0
  }
1795
0
  else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1796
0
  {
1797
0
    if (frameOptions & FRAMEOPTION_ROWS)
1798
0
    {
1799
      /* In ROWS mode, exactly the rows up to current are in frame */
1800
0
      winstate->frametailpos = winstate->currentpos + 1;
1801
0
      winstate->frametail_valid = true;
1802
0
    }
1803
0
    else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1804
0
    {
1805
      /* If no ORDER BY, all rows are peers with each other */
1806
0
      if (node->ordNumCols == 0)
1807
0
      {
1808
0
        spool_tuples(winstate, -1);
1809
0
        winstate->frametailpos = winstate->spooled_rows;
1810
0
        winstate->frametail_valid = true;
1811
0
        MemoryContextSwitchTo(oldcontext);
1812
0
        return;
1813
0
      }
1814
1815
      /*
1816
       * In RANGE or GROUPS END_CURRENT_ROW mode, frame end is the last
1817
       * row that is a peer of current row, frame tail is the row after
1818
       * that (if any).  We keep a copy of the last-known frame tail row
1819
       * in frametail_slot, and advance as necessary.  Note that if we
1820
       * reach end of partition, we will leave frametailpos = end+1 and
1821
       * frametail_slot empty.
1822
       */
1823
0
      tuplestore_select_read_pointer(winstate->buffer,
1824
0
                       winstate->frametail_ptr);
1825
0
      if (winstate->frametailpos == 0 &&
1826
0
        TupIsNull(winstate->frametail_slot))
1827
0
      {
1828
        /* fetch first row into frametail_slot, if we didn't already */
1829
0
        if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1830
0
                       winstate->frametail_slot))
1831
0
          elog(ERROR, "unexpected end of tuplestore");
1832
0
      }
1833
1834
0
      while (!TupIsNull(winstate->frametail_slot))
1835
0
      {
1836
0
        if (winstate->frametailpos > winstate->currentpos &&
1837
0
          !are_peers(winstate, winstate->frametail_slot,
1838
0
                 winstate->ss.ss_ScanTupleSlot))
1839
0
          break;   /* this row is the frame tail */
1840
        /* Note we advance frametailpos even if the fetch fails */
1841
0
        winstate->frametailpos++;
1842
0
        spool_tuples(winstate, winstate->frametailpos);
1843
0
        if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1844
0
                       winstate->frametail_slot))
1845
0
          break;   /* end of partition */
1846
0
      }
1847
0
      winstate->frametail_valid = true;
1848
0
    }
1849
0
    else
1850
0
      Assert(false);
1851
0
  }
1852
0
  else if (frameOptions & FRAMEOPTION_END_OFFSET)
1853
0
  {
1854
0
    if (frameOptions & FRAMEOPTION_ROWS)
1855
0
    {
1856
      /* In ROWS mode, bound is physically n before/after current */
1857
0
      int64   offset = DatumGetInt64(winstate->endOffsetValue);
1858
1859
0
      if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1860
0
        offset = -offset;
1861
1862
0
      winstate->frametailpos = winstate->currentpos + offset + 1;
1863
      /* smallest allowable value of frametailpos is 0 */
1864
0
      if (winstate->frametailpos < 0)
1865
0
        winstate->frametailpos = 0;
1866
0
      else if (winstate->frametailpos > winstate->currentpos + 1)
1867
0
      {
1868
        /* make sure frametailpos is not past end of partition */
1869
0
        spool_tuples(winstate, winstate->frametailpos - 1);
1870
0
        if (winstate->frametailpos > winstate->spooled_rows)
1871
0
          winstate->frametailpos = winstate->spooled_rows;
1872
0
      }
1873
0
      winstate->frametail_valid = true;
1874
0
    }
1875
0
    else if (frameOptions & FRAMEOPTION_RANGE)
1876
0
    {
1877
      /*
1878
       * In RANGE END_OFFSET mode, frame end is the last row that
1879
       * satisfies the in_range constraint relative to the current row,
1880
       * frame tail is the row after that (if any).  We keep a copy of
1881
       * the last-known frame tail row in frametail_slot, and advance as
1882
       * necessary.  Note that if we reach end of partition, we will
1883
       * leave frametailpos = end+1 and frametail_slot empty.
1884
       */
1885
0
      int     sortCol = node->ordColIdx[0];
1886
0
      bool    sub,
1887
0
            less;
1888
1889
      /* We must have an ordering column */
1890
0
      Assert(node->ordNumCols == 1);
1891
1892
      /* Precompute flags for in_range checks */
1893
0
      if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1894
0
        sub = true;   /* subtract endOffset from current row */
1895
0
      else
1896
0
        sub = false; /* add it */
1897
0
      less = true;    /* normally, we want frame tail <= sum */
1898
      /* If sort order is descending, flip both flags */
1899
0
      if (!winstate->inRangeAsc)
1900
0
      {
1901
0
        sub = !sub;
1902
0
        less = false;
1903
0
      }
1904
1905
0
      tuplestore_select_read_pointer(winstate->buffer,
1906
0
                       winstate->frametail_ptr);
1907
0
      if (winstate->frametailpos == 0 &&
1908
0
        TupIsNull(winstate->frametail_slot))
1909
0
      {
1910
        /* fetch first row into frametail_slot, if we didn't already */
1911
0
        if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1912
0
                       winstate->frametail_slot))
1913
0
          elog(ERROR, "unexpected end of tuplestore");
1914
0
      }
1915
1916
0
      while (!TupIsNull(winstate->frametail_slot))
1917
0
      {
1918
0
        Datum   tailval,
1919
0
              currval;
1920
0
        bool    tailisnull,
1921
0
              currisnull;
1922
1923
0
        tailval = slot_getattr(winstate->frametail_slot, sortCol,
1924
0
                     &tailisnull);
1925
0
        currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol,
1926
0
                     &currisnull);
1927
0
        if (tailisnull || currisnull)
1928
0
        {
1929
          /* order of the rows depends only on nulls_first */
1930
0
          if (winstate->inRangeNullsFirst)
1931
0
          {
1932
            /* advance tail if tail is null or curr is not */
1933
0
            if (!tailisnull)
1934
0
              break;
1935
0
          }
1936
0
          else
1937
0
          {
1938
            /* advance tail if tail is not null or curr is null */
1939
0
            if (!currisnull)
1940
0
              break;
1941
0
          }
1942
0
        }
1943
0
        else
1944
0
        {
1945
0
          if (!DatumGetBool(FunctionCall5Coll(&winstate->endInRangeFunc,
1946
0
                            winstate->inRangeColl,
1947
0
                            tailval,
1948
0
                            currval,
1949
0
                            winstate->endOffsetValue,
1950
0
                            BoolGetDatum(sub),
1951
0
                            BoolGetDatum(less))))
1952
0
            break; /* this row is the correct frame tail */
1953
0
        }
1954
        /* Note we advance frametailpos even if the fetch fails */
1955
0
        winstate->frametailpos++;
1956
0
        spool_tuples(winstate, winstate->frametailpos);
1957
0
        if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1958
0
                       winstate->frametail_slot))
1959
0
          break;   /* end of partition */
1960
0
      }
1961
0
      winstate->frametail_valid = true;
1962
0
    }
1963
0
    else if (frameOptions & FRAMEOPTION_GROUPS)
1964
0
    {
1965
      /*
1966
       * In GROUPS END_OFFSET mode, frame end is the last row of the
1967
       * last peer group whose number satisfies the offset constraint,
1968
       * and frame tail is the row after that (if any).  We keep a copy
1969
       * of the last-known frame tail row in frametail_slot, and advance
1970
       * as necessary.  Note that if we reach end of partition, we will
1971
       * leave frametailpos = end+1 and frametail_slot empty.
1972
       */
1973
0
      int64   offset = DatumGetInt64(winstate->endOffsetValue);
1974
0
      int64   maxtailgroup;
1975
1976
0
      if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1977
0
        maxtailgroup = winstate->currentgroup - offset;
1978
0
      else
1979
0
        maxtailgroup = winstate->currentgroup + offset;
1980
1981
0
      tuplestore_select_read_pointer(winstate->buffer,
1982
0
                       winstate->frametail_ptr);
1983
0
      if (winstate->frametailpos == 0 &&
1984
0
        TupIsNull(winstate->frametail_slot))
1985
0
      {
1986
        /* fetch first row into frametail_slot, if we didn't already */
1987
0
        if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1988
0
                       winstate->frametail_slot))
1989
0
          elog(ERROR, "unexpected end of tuplestore");
1990
0
      }
1991
1992
0
      while (!TupIsNull(winstate->frametail_slot))
1993
0
      {
1994
0
        if (winstate->frametailgroup > maxtailgroup)
1995
0
          break;   /* this row is the correct frame tail */
1996
0
        ExecCopySlot(winstate->temp_slot_2, winstate->frametail_slot);
1997
        /* Note we advance frametailpos even if the fetch fails */
1998
0
        winstate->frametailpos++;
1999
0
        spool_tuples(winstate, winstate->frametailpos);
2000
0
        if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2001
0
                       winstate->frametail_slot))
2002
0
          break;   /* end of partition */
2003
0
        if (!are_peers(winstate, winstate->temp_slot_2,
2004
0
                 winstate->frametail_slot))
2005
0
          winstate->frametailgroup++;
2006
0
      }
2007
0
      ExecClearTuple(winstate->temp_slot_2);
2008
0
      winstate->frametail_valid = true;
2009
0
    }
2010
0
    else
2011
0
      Assert(false);
2012
0
  }
2013
0
  else
2014
0
    Assert(false);
2015
2016
0
  MemoryContextSwitchTo(oldcontext);
2017
0
}
2018
2019
/*
2020
 * update_grouptailpos
2021
 * make grouptailpos valid for the current row
2022
 *
2023
 * May clobber winstate->temp_slot_2.
2024
 */
2025
static void
2026
update_grouptailpos(WindowAggState *winstate)
2027
0
{
2028
0
  WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
2029
0
  MemoryContext oldcontext;
2030
2031
0
  if (winstate->grouptail_valid)
2032
0
    return;         /* already known for current row */
2033
2034
  /* We may be called in a short-lived context */
2035
0
  oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
2036
2037
  /* If no ORDER BY, all rows are peers with each other */
2038
0
  if (node->ordNumCols == 0)
2039
0
  {
2040
0
    spool_tuples(winstate, -1);
2041
0
    winstate->grouptailpos = winstate->spooled_rows;
2042
0
    winstate->grouptail_valid = true;
2043
0
    MemoryContextSwitchTo(oldcontext);
2044
0
    return;
2045
0
  }
2046
2047
  /*
2048
   * Because grouptail_valid is reset only when current row advances into a
2049
   * new peer group, we always reach here knowing that grouptailpos needs to
2050
   * be advanced by at least one row.  Hence, unlike the otherwise similar
2051
   * case for frame tail tracking, we do not need persistent storage of the
2052
   * group tail row.
2053
   */
2054
0
  Assert(winstate->grouptailpos <= winstate->currentpos);
2055
0
  tuplestore_select_read_pointer(winstate->buffer,
2056
0
                   winstate->grouptail_ptr);
2057
0
  for (;;)
2058
0
  {
2059
    /* Note we advance grouptailpos even if the fetch fails */
2060
0
    winstate->grouptailpos++;
2061
0
    spool_tuples(winstate, winstate->grouptailpos);
2062
0
    if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2063
0
                   winstate->temp_slot_2))
2064
0
      break;       /* end of partition */
2065
0
    if (winstate->grouptailpos > winstate->currentpos &&
2066
0
      !are_peers(winstate, winstate->temp_slot_2,
2067
0
             winstate->ss.ss_ScanTupleSlot))
2068
0
      break;       /* this row is the group tail */
2069
0
  }
2070
0
  ExecClearTuple(winstate->temp_slot_2);
2071
0
  winstate->grouptail_valid = true;
2072
2073
0
  MemoryContextSwitchTo(oldcontext);
2074
0
}
2075
2076
/*
2077
 * calculate_frame_offsets
2078
 *    Determine the startOffsetValue and endOffsetValue values for the
2079
 *    WindowAgg's frame options.
2080
 */
2081
static pg_noinline void
2082
calculate_frame_offsets(PlanState *pstate)
2083
0
{
2084
0
  WindowAggState *winstate = castNode(WindowAggState, pstate);
2085
0
  ExprContext *econtext;
2086
0
  int     frameOptions = winstate->frameOptions;
2087
0
  Datum   value;
2088
0
  bool    isnull;
2089
0
  int16   len;
2090
0
  bool    byval;
2091
2092
  /* Ensure we've not been called before for this scan */
2093
0
  Assert(winstate->all_first);
2094
2095
0
  econtext = winstate->ss.ps.ps_ExprContext;
2096
2097
0
  if (frameOptions & FRAMEOPTION_START_OFFSET)
2098
0
  {
2099
0
    Assert(winstate->startOffset != NULL);
2100
0
    value = ExecEvalExprSwitchContext(winstate->startOffset,
2101
0
                      econtext,
2102
0
                      &isnull);
2103
0
    if (isnull)
2104
0
      ereport(ERROR,
2105
0
          (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
2106
0
           errmsg("frame starting offset must not be null")));
2107
    /* copy value into query-lifespan context */
2108
0
    get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
2109
0
            &len,
2110
0
            &byval);
2111
0
    winstate->startOffsetValue = datumCopy(value, byval, len);
2112
0
    if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
2113
0
    {
2114
      /* value is known to be int8 */
2115
0
      int64   offset = DatumGetInt64(value);
2116
2117
0
      if (offset < 0)
2118
0
        ereport(ERROR,
2119
0
            (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
2120
0
             errmsg("frame starting offset must not be negative")));
2121
0
    }
2122
0
  }
2123
2124
0
  if (frameOptions & FRAMEOPTION_END_OFFSET)
2125
0
  {
2126
0
    Assert(winstate->endOffset != NULL);
2127
0
    value = ExecEvalExprSwitchContext(winstate->endOffset,
2128
0
                      econtext,
2129
0
                      &isnull);
2130
0
    if (isnull)
2131
0
      ereport(ERROR,
2132
0
          (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
2133
0
           errmsg("frame ending offset must not be null")));
2134
    /* copy value into query-lifespan context */
2135
0
    get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
2136
0
            &len,
2137
0
            &byval);
2138
0
    winstate->endOffsetValue = datumCopy(value, byval, len);
2139
0
    if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
2140
0
    {
2141
      /* value is known to be int8 */
2142
0
      int64   offset = DatumGetInt64(value);
2143
2144
0
      if (offset < 0)
2145
0
        ereport(ERROR,
2146
0
            (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
2147
0
             errmsg("frame ending offset must not be negative")));
2148
0
    }
2149
0
  }
2150
0
  winstate->all_first = false;
2151
0
}
2152
2153
/* -----------------
2154
 * ExecWindowAgg
2155
 *
2156
 *  ExecWindowAgg receives tuples from its outer subplan and
2157
 *  stores them into a tuplestore, then processes window functions.
2158
 *  This node doesn't reduce nor qualify any row so the number of
2159
 *  returned rows is exactly the same as its outer subplan's result.
2160
 * -----------------
2161
 */
2162
static TupleTableSlot *
2163
ExecWindowAgg(PlanState *pstate)
2164
0
{
2165
0
  WindowAggState *winstate = castNode(WindowAggState, pstate);
2166
0
  TupleTableSlot *slot;
2167
0
  ExprContext *econtext;
2168
0
  int     i;
2169
0
  int     numfuncs;
2170
2171
0
  CHECK_FOR_INTERRUPTS();
2172
2173
0
  if (winstate->status == WINDOWAGG_DONE)
2174
0
    return NULL;
2175
2176
  /*
2177
   * Compute frame offset values, if any, during first call (or after a
2178
   * rescan).  These are assumed to hold constant throughout the scan; if
2179
   * user gives us a volatile expression, we'll only use its initial value.
2180
   */
2181
0
  if (unlikely(winstate->all_first))
2182
0
    calculate_frame_offsets(pstate);
2183
2184
  /* We need to loop as the runCondition or qual may filter out tuples */
2185
0
  for (;;)
2186
0
  {
2187
0
    if (winstate->next_partition)
2188
0
    {
2189
      /* Initialize for first partition and set current row = 0 */
2190
0
      begin_partition(winstate);
2191
      /* If there are no input rows, we'll detect that and exit below */
2192
0
    }
2193
0
    else
2194
0
    {
2195
      /* Advance current row within partition */
2196
0
      winstate->currentpos++;
2197
      /* This might mean that the frame moves, too */
2198
0
      winstate->framehead_valid = false;
2199
0
      winstate->frametail_valid = false;
2200
      /* we don't need to invalidate grouptail here; see below */
2201
0
    }
2202
2203
    /*
2204
     * Spool all tuples up to and including the current row, if we haven't
2205
     * already
2206
     */
2207
0
    spool_tuples(winstate, winstate->currentpos);
2208
2209
    /* Move to the next partition if we reached the end of this partition */
2210
0
    if (winstate->partition_spooled &&
2211
0
      winstate->currentpos >= winstate->spooled_rows)
2212
0
    {
2213
0
      release_partition(winstate);
2214
2215
0
      if (winstate->more_partitions)
2216
0
      {
2217
0
        begin_partition(winstate);
2218
0
        Assert(winstate->spooled_rows > 0);
2219
2220
        /* Come out of pass-through mode when changing partition */
2221
0
        winstate->status = WINDOWAGG_RUN;
2222
0
      }
2223
0
      else
2224
0
      {
2225
        /* No further partitions?  We're done */
2226
0
        winstate->status = WINDOWAGG_DONE;
2227
0
        return NULL;
2228
0
      }
2229
0
    }
2230
2231
    /* final output execution is in ps_ExprContext */
2232
0
    econtext = winstate->ss.ps.ps_ExprContext;
2233
2234
    /* Clear the per-output-tuple context for current row */
2235
0
    ResetExprContext(econtext);
2236
2237
    /*
2238
     * Read the current row from the tuplestore, and save in
2239
     * ScanTupleSlot. (We can't rely on the outerplan's output slot
2240
     * because we may have to read beyond the current row.  Also, we have
2241
     * to actually copy the row out of the tuplestore, since window
2242
     * function evaluation might cause the tuplestore to dump its state to
2243
     * disk.)
2244
     *
2245
     * In GROUPS mode, or when tracking a group-oriented exclusion clause,
2246
     * we must also detect entering a new peer group and update associated
2247
     * state when that happens.  We use temp_slot_2 to temporarily hold
2248
     * the previous row for this purpose.
2249
     *
2250
     * Current row must be in the tuplestore, since we spooled it above.
2251
     */
2252
0
    tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
2253
0
    if ((winstate->frameOptions & (FRAMEOPTION_GROUPS |
2254
0
                     FRAMEOPTION_EXCLUDE_GROUP |
2255
0
                     FRAMEOPTION_EXCLUDE_TIES)) &&
2256
0
      winstate->currentpos > 0)
2257
0
    {
2258
0
      ExecCopySlot(winstate->temp_slot_2, winstate->ss.ss_ScanTupleSlot);
2259
0
      if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2260
0
                     winstate->ss.ss_ScanTupleSlot))
2261
0
        elog(ERROR, "unexpected end of tuplestore");
2262
0
      if (!are_peers(winstate, winstate->temp_slot_2,
2263
0
               winstate->ss.ss_ScanTupleSlot))
2264
0
      {
2265
0
        winstate->currentgroup++;
2266
0
        winstate->groupheadpos = winstate->currentpos;
2267
0
        winstate->grouptail_valid = false;
2268
0
      }
2269
0
      ExecClearTuple(winstate->temp_slot_2);
2270
0
    }
2271
0
    else
2272
0
    {
2273
0
      if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2274
0
                     winstate->ss.ss_ScanTupleSlot))
2275
0
        elog(ERROR, "unexpected end of tuplestore");
2276
0
    }
2277
2278
    /* don't evaluate the window functions when we're in pass-through mode */
2279
0
    if (winstate->status == WINDOWAGG_RUN)
2280
0
    {
2281
      /*
2282
       * Evaluate true window functions
2283
       */
2284
0
      numfuncs = winstate->numfuncs;
2285
0
      for (i = 0; i < numfuncs; i++)
2286
0
      {
2287
0
        WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
2288
2289
0
        if (perfuncstate->plain_agg)
2290
0
          continue;
2291
0
        eval_windowfunction(winstate, perfuncstate,
2292
0
                  &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
2293
0
                  &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
2294
0
      }
2295
2296
      /*
2297
       * Evaluate aggregates
2298
       */
2299
0
      if (winstate->numaggs > 0)
2300
0
        eval_windowaggregates(winstate);
2301
0
    }
2302
2303
    /*
2304
     * If we have created auxiliary read pointers for the frame or group
2305
     * boundaries, force them to be kept up-to-date, because we don't know
2306
     * whether the window function(s) will do anything that requires that.
2307
     * Failing to advance the pointers would result in being unable to
2308
     * trim data from the tuplestore, which is bad.  (If we could know in
2309
     * advance whether the window functions will use frame boundary info,
2310
     * we could skip creating these pointers in the first place ... but
2311
     * unfortunately the window function API doesn't require that.)
2312
     */
2313
0
    if (winstate->framehead_ptr >= 0)
2314
0
      update_frameheadpos(winstate);
2315
0
    if (winstate->frametail_ptr >= 0)
2316
0
      update_frametailpos(winstate);
2317
0
    if (winstate->grouptail_ptr >= 0)
2318
0
      update_grouptailpos(winstate);
2319
2320
    /*
2321
     * Truncate any no-longer-needed rows from the tuplestore.
2322
     */
2323
0
    tuplestore_trim(winstate->buffer);
2324
2325
    /*
2326
     * Form and return a projection tuple using the windowfunc results and
2327
     * the current row.  Setting ecxt_outertuple arranges that any Vars
2328
     * will be evaluated with respect to that row.
2329
     */
2330
0
    econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
2331
2332
0
    slot = ExecProject(winstate->ss.ps.ps_ProjInfo);
2333
2334
0
    if (winstate->status == WINDOWAGG_RUN)
2335
0
    {
2336
0
      econtext->ecxt_scantuple = slot;
2337
2338
      /*
2339
       * Now evaluate the run condition to see if we need to go into
2340
       * pass-through mode, or maybe stop completely.
2341
       */
2342
0
      if (!ExecQual(winstate->runcondition, econtext))
2343
0
      {
2344
        /*
2345
         * Determine which mode to move into.  If there is no
2346
         * PARTITION BY clause and we're the top-level WindowAgg then
2347
         * we're done.  This tuple and any future tuples cannot
2348
         * possibly match the runcondition.  However, when there is a
2349
         * PARTITION BY clause or we're not the top-level window we
2350
         * can't just stop as we need to either process other
2351
         * partitions or ensure WindowAgg nodes above us receive all
2352
         * of the tuples they need to process their WindowFuncs.
2353
         */
2354
0
        if (winstate->use_pass_through)
2355
0
        {
2356
          /*
2357
           * When switching into a pass-through mode, we'd better
2358
           * NULLify the aggregate results as these are no longer
2359
           * updated and NULLifying them avoids the old stale
2360
           * results lingering.  Some of these might be byref types
2361
           * so we can't have them pointing to free'd memory.  The
2362
           * planner insisted that quals used in the runcondition
2363
           * are strict, so the top-level WindowAgg will always
2364
           * filter these NULLs out in the filter clause.
2365
           */
2366
0
          numfuncs = winstate->numfuncs;
2367
0
          for (i = 0; i < numfuncs; i++)
2368
0
          {
2369
0
            econtext->ecxt_aggvalues[i] = (Datum) 0;
2370
0
            econtext->ecxt_aggnulls[i] = true;
2371
0
          }
2372
2373
          /*
2374
           * STRICT pass-through mode is required for the top window
2375
           * when there is a PARTITION BY clause.  Otherwise we must
2376
           * ensure we store tuples that don't match the
2377
           * runcondition so they're available to WindowAggs above.
2378
           */
2379
0
          if (winstate->top_window)
2380
0
          {
2381
0
            winstate->status = WINDOWAGG_PASSTHROUGH_STRICT;
2382
0
            continue;
2383
0
          }
2384
0
          else
2385
0
          {
2386
0
            winstate->status = WINDOWAGG_PASSTHROUGH;
2387
0
          }
2388
0
        }
2389
0
        else
2390
0
        {
2391
          /*
2392
           * Pass-through not required.  We can just return NULL.
2393
           * Nothing else will match the runcondition.
2394
           */
2395
0
          winstate->status = WINDOWAGG_DONE;
2396
0
          return NULL;
2397
0
        }
2398
0
      }
2399
2400
      /*
2401
       * Filter out any tuples we don't need in the top-level WindowAgg.
2402
       */
2403
0
      if (!ExecQual(winstate->ss.ps.qual, econtext))
2404
0
      {
2405
0
        InstrCountFiltered1(winstate, 1);
2406
0
        continue;
2407
0
      }
2408
2409
0
      break;
2410
0
    }
2411
2412
    /*
2413
     * When not in WINDOWAGG_RUN mode, we must still return this tuple if
2414
     * we're anything apart from the top window.
2415
     */
2416
0
    else if (!winstate->top_window)
2417
0
      break;
2418
0
  }
2419
2420
0
  return slot;
2421
0
}
2422
2423
/* -----------------
2424
 * ExecInitWindowAgg
2425
 *
2426
 *  Creates the run-time information for the WindowAgg node produced by the
2427
 *  planner and initializes its outer subtree
2428
 * -----------------
2429
 */
2430
WindowAggState *
2431
ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
2432
0
{
2433
0
  WindowAggState *winstate;
2434
0
  Plan     *outerPlan;
2435
0
  ExprContext *econtext;
2436
0
  ExprContext *tmpcontext;
2437
0
  WindowStatePerFunc perfunc;
2438
0
  WindowStatePerAgg peragg;
2439
0
  int     frameOptions = node->frameOptions;
2440
0
  int     numfuncs,
2441
0
        wfuncno,
2442
0
        numaggs,
2443
0
        aggno;
2444
0
  TupleDesc scanDesc;
2445
0
  ListCell   *l;
2446
2447
  /* check for unsupported flags */
2448
0
  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
2449
2450
  /*
2451
   * create state structure
2452
   */
2453
0
  winstate = makeNode(WindowAggState);
2454
0
  winstate->ss.ps.plan = (Plan *) node;
2455
0
  winstate->ss.ps.state = estate;
2456
0
  winstate->ss.ps.ExecProcNode = ExecWindowAgg;
2457
2458
  /* copy frame options to state node for easy access */
2459
0
  winstate->frameOptions = frameOptions;
2460
2461
  /*
2462
   * Create expression contexts.  We need two, one for per-input-tuple
2463
   * processing and one for per-output-tuple processing.  We cheat a little
2464
   * by using ExecAssignExprContext() to build both.
2465
   */
2466
0
  ExecAssignExprContext(estate, &winstate->ss.ps);
2467
0
  tmpcontext = winstate->ss.ps.ps_ExprContext;
2468
0
  winstate->tmpcontext = tmpcontext;
2469
0
  ExecAssignExprContext(estate, &winstate->ss.ps);
2470
2471
  /* Create long-lived context for storage of partition-local memory etc */
2472
0
  winstate->partcontext =
2473
0
    AllocSetContextCreate(CurrentMemoryContext,
2474
0
                "WindowAgg Partition",
2475
0
                ALLOCSET_DEFAULT_SIZES);
2476
2477
  /*
2478
   * Create mid-lived context for aggregate trans values etc.
2479
   *
2480
   * Note that moving aggregates each use their own private context, not
2481
   * this one.
2482
   */
2483
0
  winstate->aggcontext =
2484
0
    AllocSetContextCreate(CurrentMemoryContext,
2485
0
                "WindowAgg Aggregates",
2486
0
                ALLOCSET_DEFAULT_SIZES);
2487
2488
  /* Only the top-level WindowAgg may have a qual */
2489
0
  Assert(node->plan.qual == NIL || node->topWindow);
2490
2491
  /* Initialize the qual */
2492
0
  winstate->ss.ps.qual = ExecInitQual(node->plan.qual,
2493
0
                    (PlanState *) winstate);
2494
2495
  /*
2496
   * Setup the run condition, if we received one from the query planner.
2497
   * When set, this may allow us to move into pass-through mode so that we
2498
   * don't have to perform any further evaluation of WindowFuncs in the
2499
   * current partition or possibly stop returning tuples altogether when all
2500
   * tuples are in the same partition.
2501
   */
2502
0
  winstate->runcondition = ExecInitQual(node->runCondition,
2503
0
                      (PlanState *) winstate);
2504
2505
  /*
2506
   * When we're not the top-level WindowAgg node or we are but have a
2507
   * PARTITION BY clause we must move into one of the WINDOWAGG_PASSTHROUGH*
2508
   * modes when the runCondition becomes false.
2509
   */
2510
0
  winstate->use_pass_through = !node->topWindow || node->partNumCols > 0;
2511
2512
  /* remember if we're the top-window or we are below the top-window */
2513
0
  winstate->top_window = node->topWindow;
2514
2515
  /*
2516
   * initialize child nodes
2517
   */
2518
0
  outerPlan = outerPlan(node);
2519
0
  outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
2520
2521
  /*
2522
   * initialize source tuple type (which is also the tuple type that we'll
2523
   * store in the tuplestore and use in all our working slots).
2524
   */
2525
0
  ExecCreateScanSlotFromOuterPlan(estate, &winstate->ss, &TTSOpsMinimalTuple);
2526
0
  scanDesc = winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2527
2528
  /* the outer tuple isn't the child's tuple, but always a minimal tuple */
2529
0
  winstate->ss.ps.outeropsset = true;
2530
0
  winstate->ss.ps.outerops = &TTSOpsMinimalTuple;
2531
0
  winstate->ss.ps.outeropsfixed = true;
2532
2533
  /*
2534
   * tuple table initialization
2535
   */
2536
0
  winstate->first_part_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2537
0
                             &TTSOpsMinimalTuple);
2538
0
  winstate->agg_row_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2539
0
                          &TTSOpsMinimalTuple);
2540
0
  winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate, scanDesc,
2541
0
                           &TTSOpsMinimalTuple);
2542
0
  winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate, scanDesc,
2543
0
                           &TTSOpsMinimalTuple);
2544
2545
  /*
2546
   * create frame head and tail slots only if needed (must create slots in
2547
   * exactly the same cases that update_frameheadpos and update_frametailpos
2548
   * need them)
2549
   */
2550
0
  winstate->framehead_slot = winstate->frametail_slot = NULL;
2551
2552
0
  if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
2553
0
  {
2554
0
    if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) &&
2555
0
       node->ordNumCols != 0) ||
2556
0
      (frameOptions & FRAMEOPTION_START_OFFSET))
2557
0
      winstate->framehead_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2558
0
                                &TTSOpsMinimalTuple);
2559
0
    if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
2560
0
       node->ordNumCols != 0) ||
2561
0
      (frameOptions & FRAMEOPTION_END_OFFSET))
2562
0
      winstate->frametail_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2563
0
                                &TTSOpsMinimalTuple);
2564
0
  }
2565
2566
  /*
2567
   * Initialize result slot, type and projection.
2568
   */
2569
0
  ExecInitResultTupleSlotTL(&winstate->ss.ps, &TTSOpsVirtual);
2570
0
  ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
2571
2572
  /* Set up data for comparing tuples */
2573
0
  if (node->partNumCols > 0)
2574
0
    winstate->partEqfunction =
2575
0
      execTuplesMatchPrepare(scanDesc,
2576
0
                   node->partNumCols,
2577
0
                   node->partColIdx,
2578
0
                   node->partOperators,
2579
0
                   node->partCollations,
2580
0
                   &winstate->ss.ps);
2581
2582
0
  if (node->ordNumCols > 0)
2583
0
    winstate->ordEqfunction =
2584
0
      execTuplesMatchPrepare(scanDesc,
2585
0
                   node->ordNumCols,
2586
0
                   node->ordColIdx,
2587
0
                   node->ordOperators,
2588
0
                   node->ordCollations,
2589
0
                   &winstate->ss.ps);
2590
2591
  /*
2592
   * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
2593
   */
2594
0
  numfuncs = winstate->numfuncs;
2595
0
  numaggs = winstate->numaggs;
2596
0
  econtext = winstate->ss.ps.ps_ExprContext;
2597
0
  econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
2598
0
  econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
2599
2600
  /*
2601
   * allocate per-wfunc/per-agg state information.
2602
   */
2603
0
  perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
2604
0
  peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
2605
0
  winstate->perfunc = perfunc;
2606
0
  winstate->peragg = peragg;
2607
2608
0
  wfuncno = -1;
2609
0
  aggno = -1;
2610
0
  foreach(l, winstate->funcs)
2611
0
  {
2612
0
    WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
2613
0
    WindowFunc *wfunc = wfuncstate->wfunc;
2614
0
    WindowStatePerFunc perfuncstate;
2615
0
    AclResult aclresult;
2616
0
    int     i;
2617
2618
0
    if (wfunc->winref != node->winref) /* planner screwed up? */
2619
0
      elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
2620
0
         wfunc->winref, node->winref);
2621
2622
    /* Look for a previous duplicate window function */
2623
0
    for (i = 0; i <= wfuncno; i++)
2624
0
    {
2625
0
      if (equal(wfunc, perfunc[i].wfunc) &&
2626
0
        !contain_volatile_functions((Node *) wfunc))
2627
0
        break;
2628
0
    }
2629
0
    if (i <= wfuncno)
2630
0
    {
2631
      /* Found a match to an existing entry, so just mark it */
2632
0
      wfuncstate->wfuncno = i;
2633
0
      continue;
2634
0
    }
2635
2636
    /* Nope, so assign a new PerAgg record */
2637
0
    perfuncstate = &perfunc[++wfuncno];
2638
2639
    /* Mark WindowFunc state node with assigned index in the result array */
2640
0
    wfuncstate->wfuncno = wfuncno;
2641
2642
    /* Check permission to call window function */
2643
0
    aclresult = object_aclcheck(ProcedureRelationId, wfunc->winfnoid, GetUserId(),
2644
0
                  ACL_EXECUTE);
2645
0
    if (aclresult != ACLCHECK_OK)
2646
0
      aclcheck_error(aclresult, OBJECT_FUNCTION,
2647
0
               get_func_name(wfunc->winfnoid));
2648
0
    InvokeFunctionExecuteHook(wfunc->winfnoid);
2649
2650
    /* Fill in the perfuncstate data */
2651
0
    perfuncstate->wfuncstate = wfuncstate;
2652
0
    perfuncstate->wfunc = wfunc;
2653
0
    perfuncstate->numArguments = list_length(wfuncstate->args);
2654
0
    perfuncstate->winCollation = wfunc->inputcollid;
2655
2656
0
    get_typlenbyval(wfunc->wintype,
2657
0
            &perfuncstate->resulttypeLen,
2658
0
            &perfuncstate->resulttypeByVal);
2659
2660
    /*
2661
     * If it's really just a plain aggregate function, we'll emulate the
2662
     * Agg environment for it.
2663
     */
2664
0
    perfuncstate->plain_agg = wfunc->winagg;
2665
0
    if (wfunc->winagg)
2666
0
    {
2667
0
      WindowStatePerAgg peraggstate;
2668
2669
0
      perfuncstate->aggno = ++aggno;
2670
0
      peraggstate = &winstate->peragg[aggno];
2671
0
      initialize_peragg(winstate, wfunc, peraggstate);
2672
0
      peraggstate->wfuncno = wfuncno;
2673
0
    }
2674
0
    else
2675
0
    {
2676
0
      WindowObject winobj = makeNode(WindowObjectData);
2677
2678
0
      winobj->winstate = winstate;
2679
0
      winobj->argstates = wfuncstate->args;
2680
0
      winobj->localmem = NULL;
2681
0
      perfuncstate->winobj = winobj;
2682
2683
      /* It's a real window function, so set up to call it. */
2684
0
      fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
2685
0
              econtext->ecxt_per_query_memory);
2686
0
      fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
2687
0
    }
2688
0
  }
2689
2690
  /* Update numfuncs, numaggs to match number of unique functions found */
2691
0
  winstate->numfuncs = wfuncno + 1;
2692
0
  winstate->numaggs = aggno + 1;
2693
2694
  /* Set up WindowObject for aggregates, if needed */
2695
0
  if (winstate->numaggs > 0)
2696
0
  {
2697
0
    WindowObject agg_winobj = makeNode(WindowObjectData);
2698
2699
0
    agg_winobj->winstate = winstate;
2700
0
    agg_winobj->argstates = NIL;
2701
0
    agg_winobj->localmem = NULL;
2702
    /* make sure markptr = -1 to invalidate. It may not get used */
2703
0
    agg_winobj->markptr = -1;
2704
0
    agg_winobj->readptr = -1;
2705
0
    winstate->agg_winobj = agg_winobj;
2706
0
  }
2707
2708
  /* Set the status to running */
2709
0
  winstate->status = WINDOWAGG_RUN;
2710
2711
  /* initialize frame bound offset expressions */
2712
0
  winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
2713
0
                     (PlanState *) winstate);
2714
0
  winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
2715
0
                     (PlanState *) winstate);
2716
2717
  /* Lookup in_range support functions if needed */
2718
0
  if (OidIsValid(node->startInRangeFunc))
2719
0
    fmgr_info(node->startInRangeFunc, &winstate->startInRangeFunc);
2720
0
  if (OidIsValid(node->endInRangeFunc))
2721
0
    fmgr_info(node->endInRangeFunc, &winstate->endInRangeFunc);
2722
0
  winstate->inRangeColl = node->inRangeColl;
2723
0
  winstate->inRangeAsc = node->inRangeAsc;
2724
0
  winstate->inRangeNullsFirst = node->inRangeNullsFirst;
2725
2726
0
  winstate->all_first = true;
2727
0
  winstate->partition_spooled = false;
2728
0
  winstate->more_partitions = false;
2729
0
  winstate->next_partition = true;
2730
2731
0
  return winstate;
2732
0
}
2733
2734
/* -----------------
2735
 * ExecEndWindowAgg
2736
 * -----------------
2737
 */
2738
void
2739
ExecEndWindowAgg(WindowAggState *node)
2740
0
{
2741
0
  PlanState  *outerPlan;
2742
0
  int     i;
2743
2744
0
  if (node->buffer != NULL)
2745
0
  {
2746
0
    tuplestore_end(node->buffer);
2747
2748
    /* nullify so that release_partition skips the tuplestore_clear() */
2749
0
    node->buffer = NULL;
2750
0
  }
2751
2752
0
  release_partition(node);
2753
2754
0
  for (i = 0; i < node->numaggs; i++)
2755
0
  {
2756
0
    if (node->peragg[i].aggcontext != node->aggcontext)
2757
0
      MemoryContextDelete(node->peragg[i].aggcontext);
2758
0
  }
2759
0
  MemoryContextDelete(node->partcontext);
2760
0
  MemoryContextDelete(node->aggcontext);
2761
2762
0
  pfree(node->perfunc);
2763
0
  pfree(node->peragg);
2764
2765
0
  outerPlan = outerPlanState(node);
2766
0
  ExecEndNode(outerPlan);
2767
0
}
2768
2769
/* -----------------
2770
 * ExecReScanWindowAgg
2771
 * -----------------
2772
 */
2773
void
2774
ExecReScanWindowAgg(WindowAggState *node)
2775
0
{
2776
0
  PlanState  *outerPlan = outerPlanState(node);
2777
0
  ExprContext *econtext = node->ss.ps.ps_ExprContext;
2778
2779
0
  node->status = WINDOWAGG_RUN;
2780
0
  node->all_first = true;
2781
2782
  /* release tuplestore et al */
2783
0
  release_partition(node);
2784
2785
  /* release all temp tuples, but especially first_part_slot */
2786
0
  ExecClearTuple(node->ss.ss_ScanTupleSlot);
2787
0
  ExecClearTuple(node->first_part_slot);
2788
0
  ExecClearTuple(node->agg_row_slot);
2789
0
  ExecClearTuple(node->temp_slot_1);
2790
0
  ExecClearTuple(node->temp_slot_2);
2791
0
  if (node->framehead_slot)
2792
0
    ExecClearTuple(node->framehead_slot);
2793
0
  if (node->frametail_slot)
2794
0
    ExecClearTuple(node->frametail_slot);
2795
2796
  /* Forget current wfunc values */
2797
0
  MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
2798
0
  MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
2799
2800
  /*
2801
   * if chgParam of subnode is not null then plan will be re-scanned by
2802
   * first ExecProcNode.
2803
   */
2804
0
  if (outerPlan->chgParam == NULL)
2805
0
    ExecReScan(outerPlan);
2806
0
}
2807
2808
/*
2809
 * initialize_peragg
2810
 *
2811
 * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
2812
 */
2813
static WindowStatePerAggData *
2814
initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
2815
          WindowStatePerAgg peraggstate)
2816
0
{
2817
0
  Oid     inputTypes[FUNC_MAX_ARGS];
2818
0
  int     numArguments;
2819
0
  HeapTuple aggTuple;
2820
0
  Form_pg_aggregate aggform;
2821
0
  Oid     aggtranstype;
2822
0
  AttrNumber  initvalAttNo;
2823
0
  AclResult aclresult;
2824
0
  bool    use_ma_code;
2825
0
  Oid     transfn_oid,
2826
0
        invtransfn_oid,
2827
0
        finalfn_oid;
2828
0
  bool    finalextra;
2829
0
  char    finalmodify;
2830
0
  Expr     *transfnexpr,
2831
0
         *invtransfnexpr,
2832
0
         *finalfnexpr;
2833
0
  Datum   textInitVal;
2834
0
  int     i;
2835
0
  ListCell   *lc;
2836
2837
0
  numArguments = list_length(wfunc->args);
2838
2839
0
  i = 0;
2840
0
  foreach(lc, wfunc->args)
2841
0
  {
2842
0
    inputTypes[i++] = exprType((Node *) lfirst(lc));
2843
0
  }
2844
2845
0
  aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
2846
0
  if (!HeapTupleIsValid(aggTuple))
2847
0
    elog(ERROR, "cache lookup failed for aggregate %u",
2848
0
       wfunc->winfnoid);
2849
0
  aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2850
2851
  /*
2852
   * Figure out whether we want to use the moving-aggregate implementation,
2853
   * and collect the right set of fields from the pg_aggregate entry.
2854
   *
2855
   * It's possible that an aggregate would supply a safe moving-aggregate
2856
   * implementation and an unsafe normal one, in which case our hand is
2857
   * forced.  Otherwise, if the frame head can't move, we don't need
2858
   * moving-aggregate code.  Even if we'd like to use it, don't do so if the
2859
   * aggregate's arguments (and FILTER clause if any) contain any calls to
2860
   * volatile functions.  Otherwise, the difference between restarting and
2861
   * not restarting the aggregation would be user-visible.
2862
   *
2863
   * We also don't risk using moving aggregates when there are subplans in
2864
   * the arguments or FILTER clause.  This is partly because
2865
   * contain_volatile_functions() doesn't look inside subplans; but there
2866
   * are other reasons why a subplan's output might be volatile.  For
2867
   * example, syncscan mode can render the results nonrepeatable.
2868
   */
2869
0
  if (!OidIsValid(aggform->aggminvtransfn))
2870
0
    use_ma_code = false; /* sine qua non */
2871
0
  else if (aggform->aggmfinalmodify == AGGMODIFY_READ_ONLY &&
2872
0
       aggform->aggfinalmodify != AGGMODIFY_READ_ONLY)
2873
0
    use_ma_code = true;   /* decision forced by safety */
2874
0
  else if (winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
2875
0
    use_ma_code = false; /* non-moving frame head */
2876
0
  else if (contain_volatile_functions((Node *) wfunc))
2877
0
    use_ma_code = false; /* avoid possible behavioral change */
2878
0
  else if (contain_subplans((Node *) wfunc))
2879
0
    use_ma_code = false; /* subplans might contain volatile functions */
2880
0
  else
2881
0
    use_ma_code = true;   /* yes, let's use it */
2882
0
  if (use_ma_code)
2883
0
  {
2884
0
    peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
2885
0
    peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
2886
0
    peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
2887
0
    finalextra = aggform->aggmfinalextra;
2888
0
    finalmodify = aggform->aggmfinalmodify;
2889
0
    aggtranstype = aggform->aggmtranstype;
2890
0
    initvalAttNo = Anum_pg_aggregate_aggminitval;
2891
0
  }
2892
0
  else
2893
0
  {
2894
0
    peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
2895
0
    peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
2896
0
    peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2897
0
    finalextra = aggform->aggfinalextra;
2898
0
    finalmodify = aggform->aggfinalmodify;
2899
0
    aggtranstype = aggform->aggtranstype;
2900
0
    initvalAttNo = Anum_pg_aggregate_agginitval;
2901
0
  }
2902
2903
  /*
2904
   * ExecInitWindowAgg already checked permission to call aggregate function
2905
   * ... but we still need to check the component functions
2906
   */
2907
2908
  /* Check that aggregate owner has permission to call component fns */
2909
0
  {
2910
0
    HeapTuple procTuple;
2911
0
    Oid     aggOwner;
2912
2913
0
    procTuple = SearchSysCache1(PROCOID,
2914
0
                  ObjectIdGetDatum(wfunc->winfnoid));
2915
0
    if (!HeapTupleIsValid(procTuple))
2916
0
      elog(ERROR, "cache lookup failed for function %u",
2917
0
         wfunc->winfnoid);
2918
0
    aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2919
0
    ReleaseSysCache(procTuple);
2920
2921
0
    aclresult = object_aclcheck(ProcedureRelationId, transfn_oid, aggOwner,
2922
0
                  ACL_EXECUTE);
2923
0
    if (aclresult != ACLCHECK_OK)
2924
0
      aclcheck_error(aclresult, OBJECT_FUNCTION,
2925
0
               get_func_name(transfn_oid));
2926
0
    InvokeFunctionExecuteHook(transfn_oid);
2927
2928
0
    if (OidIsValid(invtransfn_oid))
2929
0
    {
2930
0
      aclresult = object_aclcheck(ProcedureRelationId, invtransfn_oid, aggOwner,
2931
0
                    ACL_EXECUTE);
2932
0
      if (aclresult != ACLCHECK_OK)
2933
0
        aclcheck_error(aclresult, OBJECT_FUNCTION,
2934
0
                 get_func_name(invtransfn_oid));
2935
0
      InvokeFunctionExecuteHook(invtransfn_oid);
2936
0
    }
2937
2938
0
    if (OidIsValid(finalfn_oid))
2939
0
    {
2940
0
      aclresult = object_aclcheck(ProcedureRelationId, finalfn_oid, aggOwner,
2941
0
                    ACL_EXECUTE);
2942
0
      if (aclresult != ACLCHECK_OK)
2943
0
        aclcheck_error(aclresult, OBJECT_FUNCTION,
2944
0
                 get_func_name(finalfn_oid));
2945
0
      InvokeFunctionExecuteHook(finalfn_oid);
2946
0
    }
2947
0
  }
2948
2949
  /*
2950
   * If the selected finalfn isn't read-only, we can't run this aggregate as
2951
   * a window function.  This is a user-facing error, so we take a bit more
2952
   * care with the error message than elsewhere in this function.
2953
   */
2954
0
  if (finalmodify != AGGMODIFY_READ_ONLY)
2955
0
    ereport(ERROR,
2956
0
        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2957
0
         errmsg("aggregate function %s does not support use as a window function",
2958
0
            format_procedure(wfunc->winfnoid))));
2959
2960
  /* Detect how many arguments to pass to the finalfn */
2961
0
  if (finalextra)
2962
0
    peraggstate->numFinalArgs = numArguments + 1;
2963
0
  else
2964
0
    peraggstate->numFinalArgs = 1;
2965
2966
  /* resolve actual type of transition state, if polymorphic */
2967
0
  aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid,
2968
0
                         aggtranstype,
2969
0
                         inputTypes,
2970
0
                         numArguments);
2971
2972
  /* build expression trees using actual argument & result types */
2973
0
  build_aggregate_transfn_expr(inputTypes,
2974
0
                 numArguments,
2975
0
                 0, /* no ordered-set window functions yet */
2976
0
                 false, /* no variadic window functions yet */
2977
0
                 aggtranstype,
2978
0
                 wfunc->inputcollid,
2979
0
                 transfn_oid,
2980
0
                 invtransfn_oid,
2981
0
                 &transfnexpr,
2982
0
                 &invtransfnexpr);
2983
2984
  /* set up infrastructure for calling the transfn(s) and finalfn */
2985
0
  fmgr_info(transfn_oid, &peraggstate->transfn);
2986
0
  fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
2987
2988
0
  if (OidIsValid(invtransfn_oid))
2989
0
  {
2990
0
    fmgr_info(invtransfn_oid, &peraggstate->invtransfn);
2991
0
    fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn);
2992
0
  }
2993
2994
0
  if (OidIsValid(finalfn_oid))
2995
0
  {
2996
0
    build_aggregate_finalfn_expr(inputTypes,
2997
0
                   peraggstate->numFinalArgs,
2998
0
                   aggtranstype,
2999
0
                   wfunc->wintype,
3000
0
                   wfunc->inputcollid,
3001
0
                   finalfn_oid,
3002
0
                   &finalfnexpr);
3003
0
    fmgr_info(finalfn_oid, &peraggstate->finalfn);
3004
0
    fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
3005
0
  }
3006
3007
  /* get info about relevant datatypes */
3008
0
  get_typlenbyval(wfunc->wintype,
3009
0
          &peraggstate->resulttypeLen,
3010
0
          &peraggstate->resulttypeByVal);
3011
0
  get_typlenbyval(aggtranstype,
3012
0
          &peraggstate->transtypeLen,
3013
0
          &peraggstate->transtypeByVal);
3014
3015
  /*
3016
   * initval is potentially null, so don't try to access it as a struct
3017
   * field. Must do it the hard way with SysCacheGetAttr.
3018
   */
3019
0
  textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo,
3020
0
                  &peraggstate->initValueIsNull);
3021
3022
0
  if (peraggstate->initValueIsNull)
3023
0
    peraggstate->initValue = (Datum) 0;
3024
0
  else
3025
0
    peraggstate->initValue = GetAggInitVal(textInitVal,
3026
0
                         aggtranstype);
3027
3028
  /*
3029
   * If the transfn is strict and the initval is NULL, make sure input type
3030
   * and transtype are the same (or at least binary-compatible), so that
3031
   * it's OK to use the first input value as the initial transValue.  This
3032
   * should have been checked at agg definition time, but we must check
3033
   * again in case the transfn's strictness property has been changed.
3034
   */
3035
0
  if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
3036
0
  {
3037
0
    if (numArguments < 1 ||
3038
0
      !IsBinaryCoercible(inputTypes[0], aggtranstype))
3039
0
      ereport(ERROR,
3040
0
          (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3041
0
           errmsg("aggregate %u needs to have compatible input type and transition type",
3042
0
              wfunc->winfnoid)));
3043
0
  }
3044
3045
  /*
3046
   * Insist that forward and inverse transition functions have the same
3047
   * strictness setting.  Allowing them to differ would require handling
3048
   * more special cases in advance_windowaggregate and
3049
   * advance_windowaggregate_base, for no discernible benefit.  This should
3050
   * have been checked at agg definition time, but we must check again in
3051
   * case either function's strictness property has been changed.
3052
   */
3053
0
  if (OidIsValid(invtransfn_oid) &&
3054
0
    peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict)
3055
0
    ereport(ERROR,
3056
0
        (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3057
0
         errmsg("strictness of aggregate's forward and inverse transition functions must match")));
3058
3059
  /*
3060
   * Moving aggregates use their own aggcontext.
3061
   *
3062
   * This is necessary because they might restart at different times, so we
3063
   * might never be able to reset the shared context otherwise.  We can't
3064
   * make it the aggregates' responsibility to clean up after themselves,
3065
   * because strict aggregates must be restarted whenever we remove their
3066
   * last non-NULL input, which the aggregate won't be aware is happening.
3067
   * Also, just pfree()ing the transValue upon restarting wouldn't help,
3068
   * since we'd miss any indirectly referenced data.  We could, in theory,
3069
   * make the memory allocation rules for moving aggregates different than
3070
   * they have historically been for plain aggregates, but that seems grotty
3071
   * and likely to lead to memory leaks.
3072
   */
3073
0
  if (OidIsValid(invtransfn_oid))
3074
0
    peraggstate->aggcontext =
3075
0
      AllocSetContextCreate(CurrentMemoryContext,
3076
0
                  "WindowAgg Per Aggregate",
3077
0
                  ALLOCSET_DEFAULT_SIZES);
3078
0
  else
3079
0
    peraggstate->aggcontext = winstate->aggcontext;
3080
3081
0
  ReleaseSysCache(aggTuple);
3082
3083
0
  return peraggstate;
3084
0
}
3085
3086
static Datum
3087
GetAggInitVal(Datum textInitVal, Oid transtype)
3088
0
{
3089
0
  Oid     typinput,
3090
0
        typioparam;
3091
0
  char     *strInitVal;
3092
0
  Datum   initVal;
3093
3094
0
  getTypeInputInfo(transtype, &typinput, &typioparam);
3095
0
  strInitVal = TextDatumGetCString(textInitVal);
3096
0
  initVal = OidInputFunctionCall(typinput, strInitVal,
3097
0
                   typioparam, -1);
3098
0
  pfree(strInitVal);
3099
0
  return initVal;
3100
0
}
3101
3102
/*
3103
 * are_peers
3104
 * compare two rows to see if they are equal according to the ORDER BY clause
3105
 *
3106
 * NB: this does not consider the window frame mode.
3107
 */
3108
static bool
3109
are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
3110
      TupleTableSlot *slot2)
3111
0
{
3112
0
  WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
3113
0
  ExprContext *econtext = winstate->tmpcontext;
3114
3115
  /* If no ORDER BY, all rows are peers with each other */
3116
0
  if (node->ordNumCols == 0)
3117
0
    return true;
3118
3119
0
  econtext->ecxt_outertuple = slot1;
3120
0
  econtext->ecxt_innertuple = slot2;
3121
0
  return ExecQualAndReset(winstate->ordEqfunction, econtext);
3122
0
}
3123
3124
/*
3125
 * window_gettupleslot
3126
 *  Fetch the pos'th tuple of the current partition into the slot,
3127
 *  using the winobj's read pointer
3128
 *
3129
 * Returns true if successful, false if no such row
3130
 */
3131
static bool
3132
window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
3133
0
{
3134
0
  WindowAggState *winstate = winobj->winstate;
3135
0
  MemoryContext oldcontext;
3136
3137
  /* often called repeatedly in a row */
3138
0
  CHECK_FOR_INTERRUPTS();
3139
3140
  /* Don't allow passing -1 to spool_tuples here */
3141
0
  if (pos < 0)
3142
0
    return false;
3143
3144
  /* If necessary, fetch the tuple into the spool */
3145
0
  spool_tuples(winstate, pos);
3146
3147
0
  if (pos >= winstate->spooled_rows)
3148
0
    return false;
3149
3150
0
  if (pos < winobj->markpos)
3151
0
    elog(ERROR, "cannot fetch row before WindowObject's mark position");
3152
3153
0
  oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
3154
3155
0
  tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
3156
3157
  /*
3158
   * Advance or rewind until we are within one tuple of the one we want.
3159
   */
3160
0
  if (winobj->seekpos < pos - 1)
3161
0
  {
3162
0
    if (!tuplestore_skiptuples(winstate->buffer,
3163
0
                   pos - 1 - winobj->seekpos,
3164
0
                   true))
3165
0
      elog(ERROR, "unexpected end of tuplestore");
3166
0
    winobj->seekpos = pos - 1;
3167
0
  }
3168
0
  else if (winobj->seekpos > pos + 1)
3169
0
  {
3170
0
    if (!tuplestore_skiptuples(winstate->buffer,
3171
0
                   winobj->seekpos - (pos + 1),
3172
0
                   false))
3173
0
      elog(ERROR, "unexpected end of tuplestore");
3174
0
    winobj->seekpos = pos + 1;
3175
0
  }
3176
0
  else if (winobj->seekpos == pos)
3177
0
  {
3178
    /*
3179
     * There's no API to refetch the tuple at the current position.  We
3180
     * have to move one tuple forward, and then one backward.  (We don't
3181
     * do it the other way because we might try to fetch the row before
3182
     * our mark, which isn't allowed.)  XXX this case could stand to be
3183
     * optimized.
3184
     */
3185
0
    tuplestore_advance(winstate->buffer, true);
3186
0
    winobj->seekpos++;
3187
0
  }
3188
3189
  /*
3190
   * Now we should be on the tuple immediately before or after the one we
3191
   * want, so just fetch forwards or backwards as appropriate.
3192
   *
3193
   * Notice that we tell tuplestore_gettupleslot to make a physical copy of
3194
   * the fetched tuple.  This ensures that the slot's contents remain valid
3195
   * through manipulations of the tuplestore, which some callers depend on.
3196
   */
3197
0
  if (winobj->seekpos > pos)
3198
0
  {
3199
0
    if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
3200
0
      elog(ERROR, "unexpected end of tuplestore");
3201
0
    winobj->seekpos--;
3202
0
  }
3203
0
  else
3204
0
  {
3205
0
    if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
3206
0
      elog(ERROR, "unexpected end of tuplestore");
3207
0
    winobj->seekpos++;
3208
0
  }
3209
3210
0
  Assert(winobj->seekpos == pos);
3211
3212
0
  MemoryContextSwitchTo(oldcontext);
3213
3214
0
  return true;
3215
0
}
3216
3217
3218
/***********************************************************************
3219
 * API exposed to window functions
3220
 ***********************************************************************/
3221
3222
3223
/*
3224
 * WinGetPartitionLocalMemory
3225
 *    Get working memory that lives till end of partition processing
3226
 *
3227
 * On first call within a given partition, this allocates and zeroes the
3228
 * requested amount of space.  Subsequent calls just return the same chunk.
3229
 *
3230
 * Memory obtained this way is normally used to hold state that should be
3231
 * automatically reset for each new partition.  If a window function wants
3232
 * to hold state across the whole query, fcinfo->fn_extra can be used in the
3233
 * usual way for that.
3234
 */
3235
void *
3236
WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
3237
0
{
3238
0
  Assert(WindowObjectIsValid(winobj));
3239
0
  if (winobj->localmem == NULL)
3240
0
    winobj->localmem =
3241
0
      MemoryContextAllocZero(winobj->winstate->partcontext, sz);
3242
0
  return winobj->localmem;
3243
0
}
3244
3245
/*
3246
 * WinGetCurrentPosition
3247
 *    Return the current row's position (counting from 0) within the current
3248
 *    partition.
3249
 */
3250
int64
3251
WinGetCurrentPosition(WindowObject winobj)
3252
0
{
3253
0
  Assert(WindowObjectIsValid(winobj));
3254
0
  return winobj->winstate->currentpos;
3255
0
}
3256
3257
/*
3258
 * WinGetPartitionRowCount
3259
 *    Return total number of rows contained in the current partition.
3260
 *
3261
 * Note: this is a relatively expensive operation because it forces the
3262
 * whole partition to be "spooled" into the tuplestore at once.  Once
3263
 * executed, however, additional calls within the same partition are cheap.
3264
 */
3265
int64
3266
WinGetPartitionRowCount(WindowObject winobj)
3267
0
{
3268
0
  Assert(WindowObjectIsValid(winobj));
3269
0
  spool_tuples(winobj->winstate, -1);
3270
0
  return winobj->winstate->spooled_rows;
3271
0
}
3272
3273
/*
3274
 * WinSetMarkPosition
3275
 *    Set the "mark" position for the window object, which is the oldest row
3276
 *    number (counting from 0) it is allowed to fetch during all subsequent
3277
 *    operations within the current partition.
3278
 *
3279
 * Window functions do not have to call this, but are encouraged to move the
3280
 * mark forward when possible to keep the tuplestore size down and prevent
3281
 * having to spill rows to disk.
3282
 */
3283
void
3284
WinSetMarkPosition(WindowObject winobj, int64 markpos)
3285
0
{
3286
0
  WindowAggState *winstate;
3287
3288
0
  Assert(WindowObjectIsValid(winobj));
3289
0
  winstate = winobj->winstate;
3290
3291
0
  if (markpos < winobj->markpos)
3292
0
    elog(ERROR, "cannot move WindowObject's mark position backward");
3293
0
  tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
3294
0
  if (markpos > winobj->markpos)
3295
0
  {
3296
0
    tuplestore_skiptuples(winstate->buffer,
3297
0
                markpos - winobj->markpos,
3298
0
                true);
3299
0
    winobj->markpos = markpos;
3300
0
  }
3301
0
  tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
3302
0
  if (markpos > winobj->seekpos)
3303
0
  {
3304
0
    tuplestore_skiptuples(winstate->buffer,
3305
0
                markpos - winobj->seekpos,
3306
0
                true);
3307
0
    winobj->seekpos = markpos;
3308
0
  }
3309
0
}
3310
3311
/*
3312
 * WinRowsArePeers
3313
 *    Compare two rows (specified by absolute position in partition) to see
3314
 *    if they are equal according to the ORDER BY clause.
3315
 *
3316
 * NB: this does not consider the window frame mode.
3317
 */
3318
bool
3319
WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
3320
0
{
3321
0
  WindowAggState *winstate;
3322
0
  WindowAgg  *node;
3323
0
  TupleTableSlot *slot1;
3324
0
  TupleTableSlot *slot2;
3325
0
  bool    res;
3326
3327
0
  Assert(WindowObjectIsValid(winobj));
3328
0
  winstate = winobj->winstate;
3329
0
  node = (WindowAgg *) winstate->ss.ps.plan;
3330
3331
  /* If no ORDER BY, all rows are peers; don't bother to fetch them */
3332
0
  if (node->ordNumCols == 0)
3333
0
    return true;
3334
3335
  /*
3336
   * Note: OK to use temp_slot_2 here because we aren't calling any
3337
   * frame-related functions (those tend to clobber temp_slot_2).
3338
   */
3339
0
  slot1 = winstate->temp_slot_1;
3340
0
  slot2 = winstate->temp_slot_2;
3341
3342
0
  if (!window_gettupleslot(winobj, pos1, slot1))
3343
0
    elog(ERROR, "specified position is out of window: " INT64_FORMAT,
3344
0
       pos1);
3345
0
  if (!window_gettupleslot(winobj, pos2, slot2))
3346
0
    elog(ERROR, "specified position is out of window: " INT64_FORMAT,
3347
0
       pos2);
3348
3349
0
  res = are_peers(winstate, slot1, slot2);
3350
3351
0
  ExecClearTuple(slot1);
3352
0
  ExecClearTuple(slot2);
3353
3354
0
  return res;
3355
0
}
3356
3357
/*
3358
 * WinGetFuncArgInPartition
3359
 *    Evaluate a window function's argument expression on a specified
3360
 *    row of the partition.  The row is identified in lseek(2) style,
3361
 *    i.e. relative to the current, first, or last row.
3362
 *
3363
 * argno: argument number to evaluate (counted from 0)
3364
 * relpos: signed rowcount offset from the seek position
3365
 * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
3366
 * set_mark: If the row is found and set_mark is true, the mark is moved to
3367
 *    the row as a side-effect.
3368
 * isnull: output argument, receives isnull status of result
3369
 * isout: output argument, set to indicate whether target row position
3370
 *    is out of partition (can pass NULL if caller doesn't care about this)
3371
 *
3372
 * Specifying a nonexistent row is not an error, it just causes a null result
3373
 * (plus setting *isout true, if isout isn't NULL).
3374
 */
3375
Datum
3376
WinGetFuncArgInPartition(WindowObject winobj, int argno,
3377
             int relpos, int seektype, bool set_mark,
3378
             bool *isnull, bool *isout)
3379
0
{
3380
0
  WindowAggState *winstate;
3381
0
  ExprContext *econtext;
3382
0
  TupleTableSlot *slot;
3383
0
  bool    gottuple;
3384
0
  int64   abs_pos;
3385
3386
0
  Assert(WindowObjectIsValid(winobj));
3387
0
  winstate = winobj->winstate;
3388
0
  econtext = winstate->ss.ps.ps_ExprContext;
3389
0
  slot = winstate->temp_slot_1;
3390
3391
0
  switch (seektype)
3392
0
  {
3393
0
    case WINDOW_SEEK_CURRENT:
3394
0
      abs_pos = winstate->currentpos + relpos;
3395
0
      break;
3396
0
    case WINDOW_SEEK_HEAD:
3397
0
      abs_pos = relpos;
3398
0
      break;
3399
0
    case WINDOW_SEEK_TAIL:
3400
0
      spool_tuples(winstate, -1);
3401
0
      abs_pos = winstate->spooled_rows - 1 + relpos;
3402
0
      break;
3403
0
    default:
3404
0
      elog(ERROR, "unrecognized window seek type: %d", seektype);
3405
0
      abs_pos = 0;    /* keep compiler quiet */
3406
0
      break;
3407
0
  }
3408
3409
0
  gottuple = window_gettupleslot(winobj, abs_pos, slot);
3410
3411
0
  if (!gottuple)
3412
0
  {
3413
0
    if (isout)
3414
0
      *isout = true;
3415
0
    *isnull = true;
3416
0
    return (Datum) 0;
3417
0
  }
3418
0
  else
3419
0
  {
3420
0
    if (isout)
3421
0
      *isout = false;
3422
0
    if (set_mark)
3423
0
      WinSetMarkPosition(winobj, abs_pos);
3424
0
    econtext->ecxt_outertuple = slot;
3425
0
    return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3426
0
              econtext, isnull);
3427
0
  }
3428
0
}
3429
3430
/*
3431
 * WinGetFuncArgInFrame
3432
 *    Evaluate a window function's argument expression on a specified
3433
 *    row of the window frame.  The row is identified in lseek(2) style,
3434
 *    i.e. relative to the first or last row of the frame.  (We do not
3435
 *    support WINDOW_SEEK_CURRENT here, because it's not very clear what
3436
 *    that should mean if the current row isn't part of the frame.)
3437
 *
3438
 * argno: argument number to evaluate (counted from 0)
3439
 * relpos: signed rowcount offset from the seek position
3440
 * seektype: WINDOW_SEEK_HEAD or WINDOW_SEEK_TAIL
3441
 * set_mark: If the row is found/in frame and set_mark is true, the mark is
3442
 *    moved to the row as a side-effect.
3443
 * isnull: output argument, receives isnull status of result
3444
 * isout: output argument, set to indicate whether target row position
3445
 *    is out of frame (can pass NULL if caller doesn't care about this)
3446
 *
3447
 * Specifying a nonexistent or not-in-frame row is not an error, it just
3448
 * causes a null result (plus setting *isout true, if isout isn't NULL).
3449
 *
3450
 * Note that some exclusion-clause options lead to situations where the
3451
 * rows that are in-frame are not consecutive in the partition.  But we
3452
 * count only in-frame rows when measuring relpos.
3453
 *
3454
 * The set_mark flag is interpreted as meaning that the caller will specify
3455
 * a constant (or, perhaps, monotonically increasing) relpos in successive
3456
 * calls, so that *if there is no exclusion clause* there will be no need
3457
 * to fetch a row before the previously fetched row.  But we do not expect
3458
 * the caller to know how to account for exclusion clauses.  Therefore,
3459
 * if there is an exclusion clause we take responsibility for adjusting the
3460
 * mark request to something that will be safe given the above assumption
3461
 * about relpos.
3462
 */
3463
Datum
3464
WinGetFuncArgInFrame(WindowObject winobj, int argno,
3465
           int relpos, int seektype, bool set_mark,
3466
           bool *isnull, bool *isout)
3467
0
{
3468
0
  WindowAggState *winstate;
3469
0
  ExprContext *econtext;
3470
0
  TupleTableSlot *slot;
3471
0
  int64   abs_pos;
3472
0
  int64   mark_pos;
3473
3474
0
  Assert(WindowObjectIsValid(winobj));
3475
0
  winstate = winobj->winstate;
3476
0
  econtext = winstate->ss.ps.ps_ExprContext;
3477
0
  slot = winstate->temp_slot_1;
3478
3479
0
  switch (seektype)
3480
0
  {
3481
0
    case WINDOW_SEEK_CURRENT:
3482
0
      elog(ERROR, "WINDOW_SEEK_CURRENT is not supported for WinGetFuncArgInFrame");
3483
0
      abs_pos = mark_pos = 0; /* keep compiler quiet */
3484
0
      break;
3485
0
    case WINDOW_SEEK_HEAD:
3486
      /* rejecting relpos < 0 is easy and simplifies code below */
3487
0
      if (relpos < 0)
3488
0
        goto out_of_frame;
3489
0
      update_frameheadpos(winstate);
3490
0
      abs_pos = winstate->frameheadpos + relpos;
3491
0
      mark_pos = abs_pos;
3492
3493
      /*
3494
       * Account for exclusion option if one is active, but advance only
3495
       * abs_pos not mark_pos.  This prevents changes of the current
3496
       * row's peer group from resulting in trying to fetch a row before
3497
       * some previous mark position.
3498
       *
3499
       * Note that in some corner cases such as current row being
3500
       * outside frame, these calculations are theoretically too simple,
3501
       * but it doesn't matter because we'll end up deciding the row is
3502
       * out of frame.  We do not attempt to avoid fetching rows past
3503
       * end of frame; that would happen in some cases anyway.
3504
       */
3505
0
      switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION)
3506
0
      {
3507
0
        case 0:
3508
          /* no adjustment needed */
3509
0
          break;
3510
0
        case FRAMEOPTION_EXCLUDE_CURRENT_ROW:
3511
0
          if (abs_pos >= winstate->currentpos &&
3512
0
            winstate->currentpos >= winstate->frameheadpos)
3513
0
            abs_pos++;
3514
0
          break;
3515
0
        case FRAMEOPTION_EXCLUDE_GROUP:
3516
0
          update_grouptailpos(winstate);
3517
0
          if (abs_pos >= winstate->groupheadpos &&
3518
0
            winstate->grouptailpos > winstate->frameheadpos)
3519
0
          {
3520
0
            int64   overlapstart = Max(winstate->groupheadpos,
3521
0
                             winstate->frameheadpos);
3522
3523
0
            abs_pos += winstate->grouptailpos - overlapstart;
3524
0
          }
3525
0
          break;
3526
0
        case FRAMEOPTION_EXCLUDE_TIES:
3527
0
          update_grouptailpos(winstate);
3528
0
          if (abs_pos >= winstate->groupheadpos &&
3529
0
            winstate->grouptailpos > winstate->frameheadpos)
3530
0
          {
3531
0
            int64   overlapstart = Max(winstate->groupheadpos,
3532
0
                             winstate->frameheadpos);
3533
3534
0
            if (abs_pos == overlapstart)
3535
0
              abs_pos = winstate->currentpos;
3536
0
            else
3537
0
              abs_pos += winstate->grouptailpos - overlapstart - 1;
3538
0
          }
3539
0
          break;
3540
0
        default:
3541
0
          elog(ERROR, "unrecognized frame option state: 0x%x",
3542
0
             winstate->frameOptions);
3543
0
          break;
3544
0
      }
3545
0
      break;
3546
0
    case WINDOW_SEEK_TAIL:
3547
      /* rejecting relpos > 0 is easy and simplifies code below */
3548
0
      if (relpos > 0)
3549
0
        goto out_of_frame;
3550
0
      update_frametailpos(winstate);
3551
0
      abs_pos = winstate->frametailpos - 1 + relpos;
3552
3553
      /*
3554
       * Account for exclusion option if one is active.  If there is no
3555
       * exclusion, we can safely set the mark at the accessed row.  But
3556
       * if there is, we can only mark the frame start, because we can't
3557
       * be sure how far back in the frame the exclusion might cause us
3558
       * to fetch in future.  Furthermore, we have to actually check
3559
       * against frameheadpos here, since it's unsafe to try to fetch a
3560
       * row before frame start if the mark might be there already.
3561
       */
3562
0
      switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION)
3563
0
      {
3564
0
        case 0:
3565
          /* no adjustment needed */
3566
0
          mark_pos = abs_pos;
3567
0
          break;
3568
0
        case FRAMEOPTION_EXCLUDE_CURRENT_ROW:
3569
0
          if (abs_pos <= winstate->currentpos &&
3570
0
            winstate->currentpos < winstate->frametailpos)
3571
0
            abs_pos--;
3572
0
          update_frameheadpos(winstate);
3573
0
          if (abs_pos < winstate->frameheadpos)
3574
0
            goto out_of_frame;
3575
0
          mark_pos = winstate->frameheadpos;
3576
0
          break;
3577
0
        case FRAMEOPTION_EXCLUDE_GROUP:
3578
0
          update_grouptailpos(winstate);
3579
0
          if (abs_pos < winstate->grouptailpos &&
3580
0
            winstate->groupheadpos < winstate->frametailpos)
3581
0
          {
3582
0
            int64   overlapend = Min(winstate->grouptailpos,
3583
0
                           winstate->frametailpos);
3584
3585
0
            abs_pos -= overlapend - winstate->groupheadpos;
3586
0
          }
3587
0
          update_frameheadpos(winstate);
3588
0
          if (abs_pos < winstate->frameheadpos)
3589
0
            goto out_of_frame;
3590
0
          mark_pos = winstate->frameheadpos;
3591
0
          break;
3592
0
        case FRAMEOPTION_EXCLUDE_TIES:
3593
0
          update_grouptailpos(winstate);
3594
0
          if (abs_pos < winstate->grouptailpos &&
3595
0
            winstate->groupheadpos < winstate->frametailpos)
3596
0
          {
3597
0
            int64   overlapend = Min(winstate->grouptailpos,
3598
0
                           winstate->frametailpos);
3599
3600
0
            if (abs_pos == overlapend - 1)
3601
0
              abs_pos = winstate->currentpos;
3602
0
            else
3603
0
              abs_pos -= overlapend - 1 - winstate->groupheadpos;
3604
0
          }
3605
0
          update_frameheadpos(winstate);
3606
0
          if (abs_pos < winstate->frameheadpos)
3607
0
            goto out_of_frame;
3608
0
          mark_pos = winstate->frameheadpos;
3609
0
          break;
3610
0
        default:
3611
0
          elog(ERROR, "unrecognized frame option state: 0x%x",
3612
0
             winstate->frameOptions);
3613
0
          mark_pos = 0; /* keep compiler quiet */
3614
0
          break;
3615
0
      }
3616
0
      break;
3617
0
    default:
3618
0
      elog(ERROR, "unrecognized window seek type: %d", seektype);
3619
0
      abs_pos = mark_pos = 0; /* keep compiler quiet */
3620
0
      break;
3621
0
  }
3622
3623
0
  if (!window_gettupleslot(winobj, abs_pos, slot))
3624
0
    goto out_of_frame;
3625
3626
  /* The code above does not detect all out-of-frame cases, so check */
3627
0
  if (row_is_in_frame(winstate, abs_pos, slot) <= 0)
3628
0
    goto out_of_frame;
3629
3630
0
  if (isout)
3631
0
    *isout = false;
3632
0
  if (set_mark)
3633
0
    WinSetMarkPosition(winobj, mark_pos);
3634
0
  econtext->ecxt_outertuple = slot;
3635
0
  return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3636
0
            econtext, isnull);
3637
3638
0
out_of_frame:
3639
0
  if (isout)
3640
0
    *isout = true;
3641
0
  *isnull = true;
3642
0
  return (Datum) 0;
3643
0
}
3644
3645
/*
3646
 * WinGetFuncArgCurrent
3647
 *    Evaluate a window function's argument expression on the current row.
3648
 *
3649
 * argno: argument number to evaluate (counted from 0)
3650
 * isnull: output argument, receives isnull status of result
3651
 *
3652
 * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
3653
 * WinGetFuncArgInFrame targeting the current row, because it will succeed
3654
 * even if the WindowObject's mark has been set beyond the current row.
3655
 * This should generally be used for "ordinary" arguments of a window
3656
 * function, such as the offset argument of lead() or lag().
3657
 */
3658
Datum
3659
WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
3660
0
{
3661
0
  WindowAggState *winstate;
3662
0
  ExprContext *econtext;
3663
3664
0
  Assert(WindowObjectIsValid(winobj));
3665
0
  winstate = winobj->winstate;
3666
3667
0
  econtext = winstate->ss.ps.ps_ExprContext;
3668
3669
0
  econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
3670
0
  return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3671
0
            econtext, isnull);
3672
0
}