Coverage Report

Created: 2025-10-09 06:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/postgres/src/backend/commands/matview.c
Line
Count
Source
1
/*-------------------------------------------------------------------------
2
 *
3
 * matview.c
4
 *    materialized view support
5
 *
6
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7
 * Portions Copyright (c) 1994, Regents of the University of California
8
 *
9
 *
10
 * IDENTIFICATION
11
 *    src/backend/commands/matview.c
12
 *
13
 *-------------------------------------------------------------------------
14
 */
15
#include "postgres.h"
16
17
#include "access/genam.h"
18
#include "access/heapam.h"
19
#include "access/htup_details.h"
20
#include "access/multixact.h"
21
#include "access/tableam.h"
22
#include "access/xact.h"
23
#include "catalog/indexing.h"
24
#include "catalog/namespace.h"
25
#include "catalog/pg_am.h"
26
#include "catalog/pg_opclass.h"
27
#include "commands/cluster.h"
28
#include "commands/matview.h"
29
#include "commands/tablecmds.h"
30
#include "commands/tablespace.h"
31
#include "executor/executor.h"
32
#include "executor/spi.h"
33
#include "miscadmin.h"
34
#include "pgstat.h"
35
#include "rewrite/rewriteHandler.h"
36
#include "storage/lmgr.h"
37
#include "tcop/tcopprot.h"
38
#include "utils/builtins.h"
39
#include "utils/lsyscache.h"
40
#include "utils/rel.h"
41
#include "utils/snapmgr.h"
42
#include "utils/syscache.h"
43
44
45
typedef struct
46
{
47
  DestReceiver pub;     /* publicly-known function pointers */
48
  Oid     transientoid; /* OID of new heap into which to store */
49
  /* These fields are filled by transientrel_startup: */
50
  Relation  transientrel; /* relation to write to */
51
  CommandId output_cid;   /* cmin to insert in output tuples */
52
  int     ti_options;   /* table_tuple_insert performance options */
53
  BulkInsertState bistate;  /* bulk insert state */
54
} DR_transientrel;
55
56
/*
 * Backend-local nesting counter for matview maintenance.
 * RefreshMatViewByOid() remembers its value before a concurrent refresh and
 * puts the remembered value back if an error escapes the merge step.
 * NOTE(review): presumably incremented and decremented by
 * OpenMatViewIncrementalMaintenance() / CloseMatViewIncrementalMaintenance(),
 * whose bodies are not part of this excerpt -- confirm.
 */
static int  matview_maintenance_depth = 0;

/* DestReceiver callbacks operating on a DR_transientrel */
static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
static void transientrel_shutdown(DestReceiver *self);
static void transientrel_destroy(DestReceiver *self);
/* Runs the matview's query and returns the number of rows it produced */
static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
                     const char *queryString, bool is_create);
/* Builds "<tempname>_<n>" as a palloc'd string */
static char *make_temptable_name_n(char *tempname, int n);
/* The two ways of installing freshly generated data into the matview */
static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
                   int save_sec_context);
static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
/* Tests whether an index qualifies for REFRESH ... CONCURRENTLY */
static bool is_usable_unique_index(Relation indexRel);
/* Bracket the DML that refresh_by_match_merge() issues against the matview */
static void OpenMatViewIncrementalMaintenance(void);
static void CloseMatViewIncrementalMaintenance(void);
71
72
/*
73
 * SetMatViewPopulatedState
74
 *    Mark a materialized view as populated, or not.
75
 *
76
 * NOTE: caller must be holding an appropriate lock on the relation.
77
 */
78
void
79
SetMatViewPopulatedState(Relation relation, bool newstate)
80
0
{
81
0
  Relation  pgrel;
82
0
  HeapTuple tuple;
83
84
0
  Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
85
86
  /*
87
   * Update relation's pg_class entry.  Crucial side-effect: other backends
88
   * (and this one too!) are sent SI message to make them rebuild relcache
89
   * entries.
90
   */
91
0
  pgrel = table_open(RelationRelationId, RowExclusiveLock);
92
0
  tuple = SearchSysCacheCopy1(RELOID,
93
0
                ObjectIdGetDatum(RelationGetRelid(relation)));
94
0
  if (!HeapTupleIsValid(tuple))
95
0
    elog(ERROR, "cache lookup failed for relation %u",
96
0
       RelationGetRelid(relation));
97
98
0
  ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
99
100
0
  CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
101
102
0
  heap_freetuple(tuple);
103
0
  table_close(pgrel, RowExclusiveLock);
104
105
  /*
106
   * Advance command counter to make the updated pg_class row locally
107
   * visible.
108
   */
109
0
  CommandCounterIncrement();
110
0
}
111
112
/*
113
 * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
114
 *
115
 * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
116
 * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
117
 * statement associated with the materialized view.  The statement node's
118
 * skipData field shows whether the clause was used.
119
 */
120
ObjectAddress
121
ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
122
           QueryCompletion *qc)
123
0
{
124
0
  Oid     matviewOid;
125
0
  LOCKMODE  lockmode;
126
127
  /* Determine strength of lock needed. */
128
0
  lockmode = stmt->concurrent ? ExclusiveLock : AccessExclusiveLock;
129
130
  /*
131
   * Get a lock until end of transaction.
132
   */
133
0
  matviewOid = RangeVarGetRelidExtended(stmt->relation,
134
0
                      lockmode, 0,
135
0
                      RangeVarCallbackMaintainsTable,
136
0
                      NULL);
137
138
0
  return RefreshMatViewByOid(matviewOid, false, stmt->skipData,
139
0
                 stmt->concurrent, queryString, qc);
140
0
}
141
142
/*
143
 * RefreshMatViewByOid -- refresh materialized view by OID
144
 *
145
 * This refreshes the materialized view by creating a new table and swapping
146
 * the relfilenumbers of the new table and the old materialized view, so the OID
147
 * of the original materialized view is preserved. Thus we do not lose GRANT
148
 * nor references to this materialized view.
149
 *
150
 * If skipData is true, this is effectively like a TRUNCATE; otherwise it is
151
 * like a TRUNCATE followed by an INSERT using the SELECT statement associated
152
 * with the materialized view.
153
 *
154
 * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
155
 * the new heap, it's better to create the indexes afterwards than to fill them
156
 * incrementally while we load.
157
 *
158
 * The matview's "populated" state is changed based on whether the contents
159
 * reflect the result set of the materialized view's query.
160
 *
161
 * This is also used to populate the materialized view created by CREATE
162
 * MATERIALIZED VIEW command.
163
 */
164
ObjectAddress
165
RefreshMatViewByOid(Oid matviewOid, bool is_create, bool skipData,
166
          bool concurrent, const char *queryString,
167
          QueryCompletion *qc)
168
0
{
169
0
  Relation  matviewRel;
170
0
  RewriteRule *rule;
171
0
  List     *actions;
172
0
  Query    *dataQuery;
173
0
  Oid     tableSpace;
174
0
  Oid     relowner;
175
0
  Oid     OIDNewHeap;
176
0
  uint64    processed = 0;
177
0
  char    relpersistence;
178
0
  Oid     save_userid;
179
0
  int     save_sec_context;
180
0
  int     save_nestlevel;
181
0
  ObjectAddress address;
182
183
0
  matviewRel = table_open(matviewOid, NoLock);
184
0
  relowner = matviewRel->rd_rel->relowner;
185
186
  /*
187
   * Switch to the owner's userid, so that any functions are run as that
188
   * user.  Also lock down security-restricted operations and arrange to
189
   * make GUC variable changes local to this command.
190
   */
191
0
  GetUserIdAndSecContext(&save_userid, &save_sec_context);
192
0
  SetUserIdAndSecContext(relowner,
193
0
               save_sec_context | SECURITY_RESTRICTED_OPERATION);
194
0
  save_nestlevel = NewGUCNestLevel();
195
0
  RestrictSearchPath();
196
197
  /* Make sure it is a materialized view. */
198
0
  if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
199
0
    ereport(ERROR,
200
0
        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
201
0
         errmsg("\"%s\" is not a materialized view",
202
0
            RelationGetRelationName(matviewRel))));
203
204
  /* Check that CONCURRENTLY is not specified if not populated. */
205
0
  if (concurrent && !RelationIsPopulated(matviewRel))
206
0
    ereport(ERROR,
207
0
        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
208
0
         errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
209
210
  /* Check that conflicting options have not been specified. */
211
0
  if (concurrent && skipData)
212
0
    ereport(ERROR,
213
0
        (errcode(ERRCODE_SYNTAX_ERROR),
214
0
         errmsg("%s and %s options cannot be used together",
215
0
            "CONCURRENTLY", "WITH NO DATA")));
216
217
  /*
218
   * Check that everything is correct for a refresh. Problems at this point
219
   * are internal errors, so elog is sufficient.
220
   */
221
0
  if (matviewRel->rd_rel->relhasrules == false ||
222
0
    matviewRel->rd_rules->numLocks < 1)
223
0
    elog(ERROR,
224
0
       "materialized view \"%s\" is missing rewrite information",
225
0
       RelationGetRelationName(matviewRel));
226
227
0
  if (matviewRel->rd_rules->numLocks > 1)
228
0
    elog(ERROR,
229
0
       "materialized view \"%s\" has too many rules",
230
0
       RelationGetRelationName(matviewRel));
231
232
0
  rule = matviewRel->rd_rules->rules[0];
233
0
  if (rule->event != CMD_SELECT || !(rule->isInstead))
234
0
    elog(ERROR,
235
0
       "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
236
0
       RelationGetRelationName(matviewRel));
237
238
0
  actions = rule->actions;
239
0
  if (list_length(actions) != 1)
240
0
    elog(ERROR,
241
0
       "the rule for materialized view \"%s\" is not a single action",
242
0
       RelationGetRelationName(matviewRel));
243
244
  /*
245
   * Check that there is a unique index with no WHERE clause on one or more
246
   * columns of the materialized view if CONCURRENTLY is specified.
247
   */
248
0
  if (concurrent)
249
0
  {
250
0
    List     *indexoidlist = RelationGetIndexList(matviewRel);
251
0
    ListCell   *indexoidscan;
252
0
    bool    hasUniqueIndex = false;
253
254
0
    Assert(!is_create);
255
256
0
    foreach(indexoidscan, indexoidlist)
257
0
    {
258
0
      Oid     indexoid = lfirst_oid(indexoidscan);
259
0
      Relation  indexRel;
260
261
0
      indexRel = index_open(indexoid, AccessShareLock);
262
0
      hasUniqueIndex = is_usable_unique_index(indexRel);
263
0
      index_close(indexRel, AccessShareLock);
264
0
      if (hasUniqueIndex)
265
0
        break;
266
0
    }
267
268
0
    list_free(indexoidlist);
269
270
0
    if (!hasUniqueIndex)
271
0
      ereport(ERROR,
272
0
          (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
273
0
           errmsg("cannot refresh materialized view \"%s\" concurrently",
274
0
              quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
275
0
                             RelationGetRelationName(matviewRel))),
276
0
           errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
277
0
  }
278
279
  /*
280
   * The stored query was rewritten at the time of the MV definition, but
281
   * has not been scribbled on by the planner.
282
   */
283
0
  dataQuery = linitial_node(Query, actions);
284
285
  /*
286
   * Check for active uses of the relation in the current transaction, such
287
   * as open scans.
288
   *
289
   * NB: We count on this to protect us against problems with refreshing the
290
   * data using TABLE_INSERT_FROZEN.
291
   */
292
0
  CheckTableNotInUse(matviewRel,
293
0
             is_create ? "CREATE MATERIALIZED VIEW" :
294
0
             "REFRESH MATERIALIZED VIEW");
295
296
  /*
297
   * Tentatively mark the matview as populated or not (this will roll back
298
   * if we fail later).
299
   */
300
0
  SetMatViewPopulatedState(matviewRel, !skipData);
301
302
  /* Concurrent refresh builds new data in temp tablespace, and does diff. */
303
0
  if (concurrent)
304
0
  {
305
0
    tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
306
0
    relpersistence = RELPERSISTENCE_TEMP;
307
0
  }
308
0
  else
309
0
  {
310
0
    tableSpace = matviewRel->rd_rel->reltablespace;
311
0
    relpersistence = matviewRel->rd_rel->relpersistence;
312
0
  }
313
314
  /*
315
   * Create the transient table that will receive the regenerated data. Lock
316
   * it against access by any other process until commit (by which time it
317
   * will be gone).
318
   */
319
0
  OIDNewHeap = make_new_heap(matviewOid, tableSpace,
320
0
                 matviewRel->rd_rel->relam,
321
0
                 relpersistence, ExclusiveLock);
322
0
  Assert(CheckRelationOidLockedByMe(OIDNewHeap, AccessExclusiveLock, false));
323
324
  /* Generate the data, if wanted. */
325
0
  if (!skipData)
326
0
  {
327
0
    DestReceiver *dest;
328
329
0
    dest = CreateTransientRelDestReceiver(OIDNewHeap);
330
0
    processed = refresh_matview_datafill(dest, dataQuery, queryString,
331
0
                       is_create);
332
0
  }
333
334
  /* Make the matview match the newly generated data. */
335
0
  if (concurrent)
336
0
  {
337
0
    int     old_depth = matview_maintenance_depth;
338
339
0
    PG_TRY();
340
0
    {
341
0
      refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
342
0
                   save_sec_context);
343
0
    }
344
0
    PG_CATCH();
345
0
    {
346
0
      matview_maintenance_depth = old_depth;
347
0
      PG_RE_THROW();
348
0
    }
349
0
    PG_END_TRY();
350
0
    Assert(matview_maintenance_depth == old_depth);
351
0
  }
352
0
  else
353
0
  {
354
0
    refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
355
356
    /*
357
     * Inform cumulative stats system about our activity: basically, we
358
     * truncated the matview and inserted some new data.  (The concurrent
359
     * code path above doesn't need to worry about this because the
360
     * inserts and deletes it issues get counted by lower-level code.)
361
     */
362
0
    pgstat_count_truncate(matviewRel);
363
0
    if (!skipData)
364
0
      pgstat_count_heap_insert(matviewRel, processed);
365
0
  }
366
367
0
  table_close(matviewRel, NoLock);
368
369
  /* Roll back any GUC changes */
370
0
  AtEOXact_GUC(false, save_nestlevel);
371
372
  /* Restore userid and security context */
373
0
  SetUserIdAndSecContext(save_userid, save_sec_context);
374
375
0
  ObjectAddressSet(address, RelationRelationId, matviewOid);
376
377
  /*
378
   * Save the rowcount so that pg_stat_statements can track the total number
379
   * of rows processed by REFRESH MATERIALIZED VIEW command. Note that we
380
   * still don't display the rowcount in the command completion tag output,
381
   * i.e., the display_rowcount flag of CMDTAG_REFRESH_MATERIALIZED_VIEW
382
   * command tag is left false in cmdtaglist.h. Otherwise, the change of
383
   * completion tag output might break applications using it.
384
   *
385
   * When called from CREATE MATERIALIZED VIEW command, the rowcount is
386
   * displayed with the command tag CMDTAG_SELECT.
387
   */
388
0
  if (qc)
389
0
    SetQueryCompletion(qc,
390
0
               is_create ? CMDTAG_SELECT : CMDTAG_REFRESH_MATERIALIZED_VIEW,
391
0
               processed);
392
393
0
  return address;
394
0
}
395
396
/*
397
 * refresh_matview_datafill
398
 *
399
 * Execute the given query, sending result rows to "dest" (which will
400
 * insert them into the target matview).
401
 *
402
 * Returns number of rows inserted.
403
 */
404
static uint64
405
refresh_matview_datafill(DestReceiver *dest, Query *query,
406
             const char *queryString, bool is_create)
407
0
{
408
0
  List     *rewritten;
409
0
  PlannedStmt *plan;
410
0
  QueryDesc  *queryDesc;
411
0
  Query    *copied_query;
412
0
  uint64    processed;
413
414
  /* Lock and rewrite, using a copy to preserve the original query. */
415
0
  copied_query = copyObject(query);
416
0
  AcquireRewriteLocks(copied_query, true, false);
417
0
  rewritten = QueryRewrite(copied_query);
418
419
  /* SELECT should never rewrite to more or less than one SELECT query */
420
0
  if (list_length(rewritten) != 1)
421
0
    elog(ERROR, "unexpected rewrite result for %s",
422
0
       is_create ? "CREATE MATERIALIZED VIEW " : "REFRESH MATERIALIZED VIEW");
423
0
  query = (Query *) linitial(rewritten);
424
425
  /* Check for user-requested abort. */
426
0
  CHECK_FOR_INTERRUPTS();
427
428
  /* Plan the query which will generate data for the refresh. */
429
0
  plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL, NULL);
430
431
  /*
432
   * Use a snapshot with an updated command ID to ensure this query sees
433
   * results of any previously executed queries.  (This could only matter if
434
   * the planner executed an allegedly-stable function that changed the
435
   * database contents, but let's do it anyway to be safe.)
436
   */
437
0
  PushCopiedSnapshot(GetActiveSnapshot());
438
0
  UpdateActiveSnapshotCommandId();
439
440
  /* Create a QueryDesc, redirecting output to our tuple receiver */
441
0
  queryDesc = CreateQueryDesc(plan, queryString,
442
0
                GetActiveSnapshot(), InvalidSnapshot,
443
0
                dest, NULL, NULL, 0);
444
445
  /* call ExecutorStart to prepare the plan for execution */
446
0
  ExecutorStart(queryDesc, 0);
447
448
  /* run the plan */
449
0
  ExecutorRun(queryDesc, ForwardScanDirection, 0);
450
451
0
  processed = queryDesc->estate->es_processed;
452
453
  /* and clean up */
454
0
  ExecutorFinish(queryDesc);
455
0
  ExecutorEnd(queryDesc);
456
457
0
  FreeQueryDesc(queryDesc);
458
459
0
  PopActiveSnapshot();
460
461
0
  return processed;
462
0
}
463
464
DestReceiver *
465
CreateTransientRelDestReceiver(Oid transientoid)
466
0
{
467
0
  DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
468
469
0
  self->pub.receiveSlot = transientrel_receive;
470
0
  self->pub.rStartup = transientrel_startup;
471
0
  self->pub.rShutdown = transientrel_shutdown;
472
0
  self->pub.rDestroy = transientrel_destroy;
473
0
  self->pub.mydest = DestTransientRel;
474
0
  self->transientoid = transientoid;
475
476
0
  return (DestReceiver *) self;
477
0
}
478
479
/*
480
 * transientrel_startup --- executor startup
481
 */
482
static void
483
transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
484
0
{
485
0
  DR_transientrel *myState = (DR_transientrel *) self;
486
0
  Relation  transientrel;
487
488
0
  transientrel = table_open(myState->transientoid, NoLock);
489
490
  /*
491
   * Fill private fields of myState for use by later routines
492
   */
493
0
  myState->transientrel = transientrel;
494
0
  myState->output_cid = GetCurrentCommandId(true);
495
0
  myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
496
0
  myState->bistate = GetBulkInsertState();
497
498
  /*
499
   * Valid smgr_targblock implies something already wrote to the relation.
500
   * This may be harmless, but this function hasn't planned for it.
501
   */
502
0
  Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
503
0
}
504
505
/*
506
 * transientrel_receive --- receive one tuple
507
 */
508
static bool
509
transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
510
0
{
511
0
  DR_transientrel *myState = (DR_transientrel *) self;
512
513
  /*
514
   * Note that the input slot might not be of the type of the target
515
   * relation. That's supported by table_tuple_insert(), but slightly less
516
   * efficient than inserting with the right slot - but the alternative
517
   * would be to copy into a slot of the right type, which would not be
518
   * cheap either. This also doesn't allow accessing per-AM data (say a
519
   * tuple's xmin), but since we don't do that here...
520
   */
521
522
0
  table_tuple_insert(myState->transientrel,
523
0
             slot,
524
0
             myState->output_cid,
525
0
             myState->ti_options,
526
0
             myState->bistate);
527
528
  /* We know this is a newly created relation, so there are no indexes */
529
530
0
  return true;
531
0
}
532
533
/*
534
 * transientrel_shutdown --- executor end
535
 */
536
static void
537
transientrel_shutdown(DestReceiver *self)
538
0
{
539
0
  DR_transientrel *myState = (DR_transientrel *) self;
540
541
0
  FreeBulkInsertState(myState->bistate);
542
543
0
  table_finish_bulk_insert(myState->transientrel, myState->ti_options);
544
545
  /* close transientrel, but keep lock until commit */
546
0
  table_close(myState->transientrel, NoLock);
547
0
  myState->transientrel = NULL;
548
0
}
549
550
/*
551
 * transientrel_destroy --- release DestReceiver object
552
 */
553
static void
554
transientrel_destroy(DestReceiver *self)
555
0
{
556
0
  pfree(self);
557
0
}
558
559
560
/*
561
 * Given a qualified temporary table name, append an underscore followed by
562
 * the given integer, to make a new table name based on the old one.
563
 * The result is a palloc'd string.
564
 *
565
 * As coded, this would fail to make a valid SQL name if the given name were,
566
 * say, "FOO"."BAR".  Currently, the table name portion of the input will
567
 * never be double-quoted because it's of the form "pg_temp_NNN", cf
568
 * make_new_heap().  But we might have to work harder someday.
569
 */
570
static char *
571
make_temptable_name_n(char *tempname, int n)
572
0
{
573
0
  StringInfoData namebuf;
574
575
0
  initStringInfo(&namebuf);
576
0
  appendStringInfoString(&namebuf, tempname);
577
0
  appendStringInfo(&namebuf, "_%d", n);
578
0
  return namebuf.data;
579
0
}
580
581
/*
582
 * refresh_by_match_merge
583
 *
584
 * Refresh a materialized view with transactional semantics, while allowing
585
 * concurrent reads.
586
 *
587
 * This is called after a new version of the data has been created in a
588
 * temporary table.  It performs a full outer join against the old version of
589
 * the data, producing "diff" results.  This join cannot work if there are any
590
 * duplicated rows in either the old or new versions, in the sense that every
591
 * column would compare as equal between the two rows.  It does work correctly
592
 * in the face of rows which have at least one NULL value, with all non-NULL
593
 * columns equal.  The behavior of NULLs on equality tests and on UNIQUE
594
 * indexes turns out to be quite convenient here; the tests we need to make
595
 * are consistent with default behavior.  If there is at least one UNIQUE
596
 * index on the materialized view, we have exactly the guarantee we need.
597
 *
598
 * The temporary table used to hold the diff results contains just the TID of
599
 * the old record (if matched) and the ROW from the new table as a single
600
 * column of complex record type (if matched).
601
 *
602
 * Once we have the diff table, we perform set-based DELETE and INSERT
603
 * operations against the materialized view, and discard both temporary
604
 * tables.
605
 *
606
 * Everything from the generation of the new data to applying the differences
607
 * takes place under cover of an ExclusiveLock, since it seems as though we
608
 * would want to prohibit not only concurrent REFRESH operations, but also
609
 * incremental maintenance.  It also doesn't seem reasonable or safe to allow
610
 * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
611
 * this command.
612
 */
613
static void
614
refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
615
             int save_sec_context)
616
0
{
617
0
  StringInfoData querybuf;
618
0
  Relation  matviewRel;
619
0
  Relation  tempRel;
620
0
  char     *matviewname;
621
0
  char     *tempname;
622
0
  char     *diffname;
623
0
  TupleDesc tupdesc;
624
0
  bool    foundUniqueIndex;
625
0
  List     *indexoidlist;
626
0
  ListCell   *indexoidscan;
627
0
  int16   relnatts;
628
0
  Oid      *opUsedForQual;
629
630
0
  initStringInfo(&querybuf);
631
0
  matviewRel = table_open(matviewOid, NoLock);
632
0
  matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
633
0
                       RelationGetRelationName(matviewRel));
634
0
  tempRel = table_open(tempOid, NoLock);
635
0
  tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
636
0
                      RelationGetRelationName(tempRel));
637
0
  diffname = make_temptable_name_n(tempname, 2);
638
639
0
  relnatts = RelationGetNumberOfAttributes(matviewRel);
640
641
  /* Open SPI context. */
642
0
  SPI_connect();
643
644
  /* Analyze the temp table with the new contents. */
645
0
  appendStringInfo(&querybuf, "ANALYZE %s", tempname);
646
0
  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
647
0
    elog(ERROR, "SPI_exec failed: %s", querybuf.data);
648
649
  /*
650
   * We need to ensure that there are not duplicate rows without NULLs in
651
   * the new data set before we can count on the "diff" results.  Check for
652
   * that in a way that allows showing the first duplicated row found.  Even
653
   * after we pass this test, a unique index on the materialized view may
654
   * find a duplicate key problem.
655
   *
656
   * Note: here and below, we use "tablename.*::tablerowtype" as a hack to
657
   * keep ".*" from being expanded into multiple columns in a SELECT list.
658
   * Compare ruleutils.c's get_variable().
659
   */
660
0
  resetStringInfo(&querybuf);
661
0
  appendStringInfo(&querybuf,
662
0
           "SELECT newdata.*::%s FROM %s newdata "
663
0
           "WHERE newdata.* IS NOT NULL AND EXISTS "
664
0
           "(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL "
665
0
           "AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* "
666
0
           "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
667
0
           "newdata.ctid)",
668
0
           tempname, tempname, tempname);
669
0
  if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
670
0
    elog(ERROR, "SPI_exec failed: %s", querybuf.data);
671
0
  if (SPI_processed > 0)
672
0
  {
673
    /*
674
     * Note that this ereport() is returning data to the user.  Generally,
675
     * we would want to make sure that the user has been granted access to
676
     * this data.  However, REFRESH MAT VIEW is only able to be run by the
677
     * owner of the mat view (or a superuser) and therefore there is no
678
     * need to check for access to data in the mat view.
679
     */
680
0
    ereport(ERROR,
681
0
        (errcode(ERRCODE_CARDINALITY_VIOLATION),
682
0
         errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
683
0
            RelationGetRelationName(matviewRel)),
684
0
         errdetail("Row: %s",
685
0
               SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
686
0
  }
687
688
  /*
689
   * Create the temporary "diff" table.
690
   *
691
   * Temporarily switch out of the SECURITY_RESTRICTED_OPERATION context,
692
   * because you cannot create temp tables in SRO context.  For extra
693
   * paranoia, add the composite type column only after switching back to
694
   * SRO context.
695
   */
696
0
  SetUserIdAndSecContext(relowner,
697
0
               save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
698
0
  resetStringInfo(&querybuf);
699
0
  appendStringInfo(&querybuf,
700
0
           "CREATE TEMP TABLE %s (tid pg_catalog.tid)",
701
0
           diffname);
702
0
  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
703
0
    elog(ERROR, "SPI_exec failed: %s", querybuf.data);
704
0
  SetUserIdAndSecContext(relowner,
705
0
               save_sec_context | SECURITY_RESTRICTED_OPERATION);
706
0
  resetStringInfo(&querybuf);
707
0
  appendStringInfo(&querybuf,
708
0
           "ALTER TABLE %s ADD COLUMN newdata %s",
709
0
           diffname, tempname);
710
0
  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
711
0
    elog(ERROR, "SPI_exec failed: %s", querybuf.data);
712
713
  /* Start building the query for populating the diff table. */
714
0
  resetStringInfo(&querybuf);
715
0
  appendStringInfo(&querybuf,
716
0
           "INSERT INTO %s "
717
0
           "SELECT mv.ctid AS tid, newdata.*::%s AS newdata "
718
0
           "FROM %s mv FULL JOIN %s newdata ON (",
719
0
           diffname, tempname, matviewname, tempname);
720
721
  /*
722
   * Get the list of index OIDs for the table from the relcache, and look up
723
   * each one in the pg_index syscache.  We will test for equality on all
724
   * columns present in all unique indexes which only reference columns and
725
   * include all rows.
726
   */
727
0
  tupdesc = matviewRel->rd_att;
728
0
  opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
729
0
  foundUniqueIndex = false;
730
731
0
  indexoidlist = RelationGetIndexList(matviewRel);
732
733
0
  foreach(indexoidscan, indexoidlist)
734
0
  {
735
0
    Oid     indexoid = lfirst_oid(indexoidscan);
736
0
    Relation  indexRel;
737
738
0
    indexRel = index_open(indexoid, RowExclusiveLock);
739
0
    if (is_usable_unique_index(indexRel))
740
0
    {
741
0
      Form_pg_index indexStruct = indexRel->rd_index;
742
0
      int     indnkeyatts = indexStruct->indnkeyatts;
743
0
      oidvector  *indclass;
744
0
      Datum   indclassDatum;
745
0
      int     i;
746
747
      /* Must get indclass the hard way. */
748
0
      indclassDatum = SysCacheGetAttrNotNull(INDEXRELID,
749
0
                           indexRel->rd_indextuple,
750
0
                           Anum_pg_index_indclass);
751
0
      indclass = (oidvector *) DatumGetPointer(indclassDatum);
752
753
      /* Add quals for all columns from this index. */
754
0
      for (i = 0; i < indnkeyatts; i++)
755
0
      {
756
0
        int     attnum = indexStruct->indkey.values[i];
757
0
        Oid     opclass = indclass->values[i];
758
0
        Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
759
0
        Oid     attrtype = attr->atttypid;
760
0
        HeapTuple cla_ht;
761
0
        Form_pg_opclass cla_tup;
762
0
        Oid     opfamily;
763
0
        Oid     opcintype;
764
0
        Oid     op;
765
0
        const char *leftop;
766
0
        const char *rightop;
767
768
        /*
769
         * Identify the equality operator associated with this index
770
         * column.  First we need to look up the column's opclass.
771
         */
772
0
        cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
773
0
        if (!HeapTupleIsValid(cla_ht))
774
0
          elog(ERROR, "cache lookup failed for opclass %u", opclass);
775
0
        cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
776
0
        opfamily = cla_tup->opcfamily;
777
0
        opcintype = cla_tup->opcintype;
778
0
        ReleaseSysCache(cla_ht);
779
780
0
        op = get_opfamily_member_for_cmptype(opfamily, opcintype, opcintype, COMPARE_EQ);
781
0
        if (!OidIsValid(op))
782
0
          elog(ERROR, "missing equality operator for (%u,%u) in opfamily %u",
783
0
             opcintype, opcintype, opfamily);
784
785
        /*
786
         * If we find the same column with the same equality semantics
787
         * in more than one index, we only need to emit the equality
788
         * clause once.
789
         *
790
         * Since we only remember the last equality operator, this
791
         * code could be fooled into emitting duplicate clauses given
792
         * multiple indexes with several different opclasses ... but
793
         * that's so unlikely it doesn't seem worth spending extra
794
         * code to avoid.
795
         */
796
0
        if (opUsedForQual[attnum - 1] == op)
797
0
          continue;
798
0
        opUsedForQual[attnum - 1] = op;
799
800
        /*
801
         * Actually add the qual, ANDed with any others.
802
         */
803
0
        if (foundUniqueIndex)
804
0
          appendStringInfoString(&querybuf, " AND ");
805
806
0
        leftop = quote_qualified_identifier("newdata",
807
0
                          NameStr(attr->attname));
808
0
        rightop = quote_qualified_identifier("mv",
809
0
                           NameStr(attr->attname));
810
811
0
        generate_operator_clause(&querybuf,
812
0
                     leftop, attrtype,
813
0
                     op,
814
0
                     rightop, attrtype);
815
816
0
        foundUniqueIndex = true;
817
0
      }
818
0
    }
819
820
    /* Keep the locks, since we're about to run DML which needs them. */
821
0
    index_close(indexRel, NoLock);
822
0
  }
823
824
0
  list_free(indexoidlist);
825
826
  /*
827
   * There must be at least one usable unique index on the matview.
828
   *
829
   * ExecRefreshMatView() checks that after taking the exclusive lock on the
830
   * matview. So at least one unique index is guaranteed to exist here
831
   * because the lock is still being held.  (One known exception is if a
832
   * function called as part of refreshing the matview drops the index.
833
   * That's a pretty silly thing to do.)
834
   */
835
0
  if (!foundUniqueIndex)
836
0
    ereport(ERROR,
837
0
        errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
838
0
        errmsg("could not find suitable unique index on materialized view \"%s\"",
839
0
             RelationGetRelationName(matviewRel)));
840
841
0
  appendStringInfoString(&querybuf,
842
0
               " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
843
0
               "WHERE newdata.* IS NULL OR mv.* IS NULL "
844
0
               "ORDER BY tid");
845
846
  /* Populate the temporary "diff" table. */
847
0
  if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
848
0
    elog(ERROR, "SPI_exec failed: %s", querybuf.data);
849
850
  /*
851
   * We have no further use for data from the "full-data" temp table, but we
852
   * must keep it around because its type is referenced from the diff table.
853
   */
854
855
  /* Analyze the diff table. */
856
0
  resetStringInfo(&querybuf);
857
0
  appendStringInfo(&querybuf, "ANALYZE %s", diffname);
858
0
  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
859
0
    elog(ERROR, "SPI_exec failed: %s", querybuf.data);
860
861
0
  OpenMatViewIncrementalMaintenance();
862
863
  /* Deletes must come before inserts; do them first. */
864
0
  resetStringInfo(&querybuf);
865
0
  appendStringInfo(&querybuf,
866
0
           "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
867
0
           "(SELECT diff.tid FROM %s diff "
868
0
           "WHERE diff.tid IS NOT NULL "
869
0
           "AND diff.newdata IS NULL)",
870
0
           matviewname, diffname);
871
0
  if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
872
0
    elog(ERROR, "SPI_exec failed: %s", querybuf.data);
873
874
  /* Inserts go last. */
875
0
  resetStringInfo(&querybuf);
876
0
  appendStringInfo(&querybuf,
877
0
           "INSERT INTO %s SELECT (diff.newdata).* "
878
0
           "FROM %s diff WHERE tid IS NULL",
879
0
           matviewname, diffname);
880
0
  if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
881
0
    elog(ERROR, "SPI_exec failed: %s", querybuf.data);
882
883
  /* We're done maintaining the materialized view. */
884
0
  CloseMatViewIncrementalMaintenance();
885
0
  table_close(tempRel, NoLock);
886
0
  table_close(matviewRel, NoLock);
887
888
  /* Clean up temp tables. */
889
0
  resetStringInfo(&querybuf);
890
0
  appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
891
0
  if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
892
0
    elog(ERROR, "SPI_exec failed: %s", querybuf.data);
893
894
  /* Close SPI context. */
895
0
  if (SPI_finish() != SPI_OK_FINISH)
896
0
    elog(ERROR, "SPI_finish failed");
897
0
}
898
899
/*
900
 * Swap the physical files of the target and transient tables, then rebuild
901
 * the target's indexes and throw away the transient table.  Security context
902
 * swapping is handled by the called function, so it is not needed here.
903
 */
904
static void
905
refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
906
0
{
907
0
  finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
908
0
           RecentXmin, ReadNextMultiXactId(), relpersistence);
909
0
}
910
911
/*
912
 * Check whether specified index is usable for match merge.
913
 */
914
static bool
915
is_usable_unique_index(Relation indexRel)
916
0
{
917
0
  Form_pg_index indexStruct = indexRel->rd_index;
918
919
  /*
920
   * Must be unique, valid, immediate, non-partial, and be defined over
921
   * plain user columns (not expressions).
922
   */
923
0
  if (indexStruct->indisunique &&
924
0
    indexStruct->indimmediate &&
925
0
    indexStruct->indisvalid &&
926
0
    RelationGetIndexPredicate(indexRel) == NIL &&
927
0
    indexStruct->indnatts > 0)
928
0
  {
929
    /*
930
     * The point of groveling through the index columns individually is to
931
     * reject both index expressions and system columns.  Currently,
932
     * matviews couldn't have OID columns so there's no way to create an
933
     * index on a system column; but maybe someday that wouldn't be true,
934
     * so let's be safe.
935
     */
936
0
    int     numatts = indexStruct->indnatts;
937
0
    int     i;
938
939
0
    for (i = 0; i < numatts; i++)
940
0
    {
941
0
      int     attnum = indexStruct->indkey.values[i];
942
943
0
      if (attnum <= 0)
944
0
        return false;
945
0
    }
946
0
    return true;
947
0
  }
948
0
  return false;
949
0
}
950
951
952
/*
953
 * This should be used to test whether the backend is in a context where it is
954
 * OK to allow DML statements to modify materialized views.  We only want to
955
 * allow that for internal code driven by the materialized view definition,
956
 * not for arbitrary user-supplied code.
957
 *
958
 * While the function names reflect the fact that their main intended use is
959
 * incremental maintenance of materialized views (in response to changes to
960
 * the data in referenced relations), they are initially used to allow REFRESH
961
 * without blocking concurrent reads.
962
 */
963
bool
964
MatViewIncrementalMaintenanceIsEnabled(void)
965
0
{
966
0
  return matview_maintenance_depth > 0;
967
0
}
968
969
static void
970
OpenMatViewIncrementalMaintenance(void)
971
0
{
972
0
  matview_maintenance_depth++;
973
0
}
974
975
static void
976
CloseMatViewIncrementalMaintenance(void)
977
0
{
978
0
  matview_maintenance_depth--;
979
  Assert(matview_maintenance_depth >= 0);
980
0
}