Coverage Report

Created: 2025-09-27 06:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/postgres/src/backend/commands/publicationcmds.c
Line
Count
Source
1
/*-------------------------------------------------------------------------
2
 *
3
 * publicationcmds.c
4
 *    publication manipulation
5
 *
6
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7
 * Portions Copyright (c) 1994, Regents of the University of California
8
 *
9
 * IDENTIFICATION
10
 *    src/backend/commands/publicationcmds.c
11
 *
12
 *-------------------------------------------------------------------------
13
 */
14
15
#include "postgres.h"
16
17
#include "access/htup_details.h"
18
#include "access/table.h"
19
#include "access/xact.h"
20
#include "catalog/catalog.h"
21
#include "catalog/indexing.h"
22
#include "catalog/namespace.h"
23
#include "catalog/objectaccess.h"
24
#include "catalog/objectaddress.h"
25
#include "catalog/pg_database.h"
26
#include "catalog/pg_inherits.h"
27
#include "catalog/pg_namespace.h"
28
#include "catalog/pg_proc.h"
29
#include "catalog/pg_publication.h"
30
#include "catalog/pg_publication_namespace.h"
31
#include "catalog/pg_publication_rel.h"
32
#include "commands/defrem.h"
33
#include "commands/event_trigger.h"
34
#include "commands/publicationcmds.h"
35
#include "miscadmin.h"
36
#include "nodes/nodeFuncs.h"
37
#include "parser/parse_clause.h"
38
#include "parser/parse_collate.h"
39
#include "parser/parse_relation.h"
40
#include "rewrite/rewriteHandler.h"
41
#include "storage/lmgr.h"
42
#include "utils/acl.h"
43
#include "utils/builtins.h"
44
#include "utils/inval.h"
45
#include "utils/lsyscache.h"
46
#include "utils/rel.h"
47
#include "utils/syscache.h"
48
#include "utils/varlena.h"
49
50
51
/*
52
 * Information used to validate the columns in the row filter expression. See
53
 * contain_invalid_rfcolumn_walker for details.
54
 */
55
typedef struct rf_context
56
{
57
  Bitmapset  *bms_replident;  /* bitset of replica identity columns */
58
  bool    pubviaroot;   /* true if we are validating the parent
59
                 * relation's row filter */
60
  Oid     relid;      /* relid of the relation */
61
  Oid     parentid;   /* relid of the parent relation */
62
} rf_context;
63
64
static List *OpenTableList(List *tables);
65
static void CloseTableList(List *rels);
66
static void LockSchemaList(List *schemalist);
67
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
68
                 AlterPublicationStmt *stmt);
69
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
70
static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
71
                  AlterPublicationStmt *stmt);
72
static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
73
static char defGetGeneratedColsOption(DefElem *def);
74
75
76
static void
77
parse_publication_options(ParseState *pstate,
78
              List *options,
79
              bool *publish_given,
80
              PublicationActions *pubactions,
81
              bool *publish_via_partition_root_given,
82
              bool *publish_via_partition_root,
83
              bool *publish_generated_columns_given,
84
              char *publish_generated_columns)
85
0
{
86
0
  ListCell   *lc;
87
88
0
  *publish_given = false;
89
0
  *publish_via_partition_root_given = false;
90
0
  *publish_generated_columns_given = false;
91
92
  /* defaults */
93
0
  pubactions->pubinsert = true;
94
0
  pubactions->pubupdate = true;
95
0
  pubactions->pubdelete = true;
96
0
  pubactions->pubtruncate = true;
97
0
  *publish_via_partition_root = false;
98
0
  *publish_generated_columns = PUBLISH_GENCOLS_NONE;
99
100
  /* Parse options */
101
0
  foreach(lc, options)
102
0
  {
103
0
    DefElem    *defel = (DefElem *) lfirst(lc);
104
105
0
    if (strcmp(defel->defname, "publish") == 0)
106
0
    {
107
0
      char     *publish;
108
0
      List     *publish_list;
109
0
      ListCell   *lc2;
110
111
0
      if (*publish_given)
112
0
        errorConflictingDefElem(defel, pstate);
113
114
      /*
115
       * If publish option was given only the explicitly listed actions
116
       * should be published.
117
       */
118
0
      pubactions->pubinsert = false;
119
0
      pubactions->pubupdate = false;
120
0
      pubactions->pubdelete = false;
121
0
      pubactions->pubtruncate = false;
122
123
0
      *publish_given = true;
124
0
      publish = defGetString(defel);
125
126
0
      if (!SplitIdentifierString(publish, ',', &publish_list))
127
0
        ereport(ERROR,
128
0
            (errcode(ERRCODE_SYNTAX_ERROR),
129
0
             errmsg("invalid list syntax in parameter \"%s\"",
130
0
                "publish")));
131
132
      /* Process the option list. */
133
0
      foreach(lc2, publish_list)
134
0
      {
135
0
        char     *publish_opt = (char *) lfirst(lc2);
136
137
0
        if (strcmp(publish_opt, "insert") == 0)
138
0
          pubactions->pubinsert = true;
139
0
        else if (strcmp(publish_opt, "update") == 0)
140
0
          pubactions->pubupdate = true;
141
0
        else if (strcmp(publish_opt, "delete") == 0)
142
0
          pubactions->pubdelete = true;
143
0
        else if (strcmp(publish_opt, "truncate") == 0)
144
0
          pubactions->pubtruncate = true;
145
0
        else
146
0
          ereport(ERROR,
147
0
              (errcode(ERRCODE_SYNTAX_ERROR),
148
0
               errmsg("unrecognized value for publication option \"%s\": \"%s\"",
149
0
                  "publish", publish_opt)));
150
0
      }
151
0
    }
152
0
    else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
153
0
    {
154
0
      if (*publish_via_partition_root_given)
155
0
        errorConflictingDefElem(defel, pstate);
156
0
      *publish_via_partition_root_given = true;
157
0
      *publish_via_partition_root = defGetBoolean(defel);
158
0
    }
159
0
    else if (strcmp(defel->defname, "publish_generated_columns") == 0)
160
0
    {
161
0
      if (*publish_generated_columns_given)
162
0
        errorConflictingDefElem(defel, pstate);
163
0
      *publish_generated_columns_given = true;
164
0
      *publish_generated_columns = defGetGeneratedColsOption(defel);
165
0
    }
166
0
    else
167
0
      ereport(ERROR,
168
0
          (errcode(ERRCODE_SYNTAX_ERROR),
169
0
           errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
170
0
  }
171
0
}
172
173
/*
174
 * Convert the PublicationObjSpecType list into schema oid list and
175
 * PublicationTable list.
176
 */
177
static void
178
ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
179
               List **rels, List **schemas)
180
0
{
181
0
  ListCell   *cell;
182
0
  PublicationObjSpec *pubobj;
183
184
0
  if (!pubobjspec_list)
185
0
    return;
186
187
0
  foreach(cell, pubobjspec_list)
188
0
  {
189
0
    Oid     schemaid;
190
0
    List     *search_path;
191
192
0
    pubobj = (PublicationObjSpec *) lfirst(cell);
193
194
0
    switch (pubobj->pubobjtype)
195
0
    {
196
0
      case PUBLICATIONOBJ_TABLE:
197
0
        *rels = lappend(*rels, pubobj->pubtable);
198
0
        break;
199
0
      case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
200
0
        schemaid = get_namespace_oid(pubobj->name, false);
201
202
        /* Filter out duplicates if user specifies "sch1, sch1" */
203
0
        *schemas = list_append_unique_oid(*schemas, schemaid);
204
0
        break;
205
0
      case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
206
0
        search_path = fetch_search_path(false);
207
0
        if (search_path == NIL) /* nothing valid in search_path? */
208
0
          ereport(ERROR,
209
0
              errcode(ERRCODE_UNDEFINED_SCHEMA),
210
0
              errmsg("no schema has been selected for CURRENT_SCHEMA"));
211
212
0
        schemaid = linitial_oid(search_path);
213
0
        list_free(search_path);
214
215
        /* Filter out duplicates if user specifies "sch1, sch1" */
216
0
        *schemas = list_append_unique_oid(*schemas, schemaid);
217
0
        break;
218
0
      default:
219
        /* shouldn't happen */
220
0
        elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
221
0
        break;
222
0
    }
223
0
  }
224
0
}
225
226
/*
227
 * Returns true if any of the columns used in the row filter WHERE expression is
228
 * not part of REPLICA IDENTITY, false otherwise.
229
 */
230
static bool
231
contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
232
0
{
233
0
  if (node == NULL)
234
0
    return false;
235
236
0
  if (IsA(node, Var))
237
0
  {
238
0
    Var      *var = (Var *) node;
239
0
    AttrNumber  attnum = var->varattno;
240
241
    /*
242
     * If pubviaroot is true, we are validating the row filter of the
243
     * parent table, but the bitmap contains the replica identity
244
     * information of the child table. So, get the column number of the
245
     * child table as parent and child column order could be different.
246
     */
247
0
    if (context->pubviaroot)
248
0
    {
249
0
      char     *colname = get_attname(context->parentid, attnum, false);
250
251
0
      attnum = get_attnum(context->relid, colname);
252
0
    }
253
254
0
    if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
255
0
               context->bms_replident))
256
0
      return true;
257
0
  }
258
259
0
  return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
260
0
                  context);
261
0
}
262
263
/*
264
 * Check if all columns referenced in the filter expression are part of the
265
 * REPLICA IDENTITY index or not.
266
 *
267
 * Returns true if any invalid column is found.
268
 */
269
bool
270
pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
271
                 bool pubviaroot)
272
0
{
273
0
  HeapTuple rftuple;
274
0
  Oid     relid = RelationGetRelid(relation);
275
0
  Oid     publish_as_relid = RelationGetRelid(relation);
276
0
  bool    result = false;
277
0
  Datum   rfdatum;
278
0
  bool    rfisnull;
279
280
  /*
281
   * FULL means all columns are in the REPLICA IDENTITY, so all columns are
282
   * allowed in the row filter and we can skip the validation.
283
   */
284
0
  if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
285
0
    return false;
286
287
  /*
288
   * For a partition, if pubviaroot is true, find the topmost ancestor that
289
   * is published via this publication as we need to use its row filter
290
   * expression to filter the partition's changes.
291
   *
292
   * Note that even though the row filter used is for an ancestor, the
293
   * REPLICA IDENTITY used will be for the actual child table.
294
   */
295
0
  if (pubviaroot && relation->rd_rel->relispartition)
296
0
  {
297
0
    publish_as_relid
298
0
      = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
299
300
0
    if (!OidIsValid(publish_as_relid))
301
0
      publish_as_relid = relid;
302
0
  }
303
304
0
  rftuple = SearchSysCache2(PUBLICATIONRELMAP,
305
0
                ObjectIdGetDatum(publish_as_relid),
306
0
                ObjectIdGetDatum(pubid));
307
308
0
  if (!HeapTupleIsValid(rftuple))
309
0
    return false;
310
311
0
  rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
312
0
                Anum_pg_publication_rel_prqual,
313
0
                &rfisnull);
314
315
0
  if (!rfisnull)
316
0
  {
317
0
    rf_context  context = {0};
318
0
    Node     *rfnode;
319
0
    Bitmapset  *bms = NULL;
320
321
0
    context.pubviaroot = pubviaroot;
322
0
    context.parentid = publish_as_relid;
323
0
    context.relid = relid;
324
325
    /* Remember columns that are part of the REPLICA IDENTITY */
326
0
    bms = RelationGetIndexAttrBitmap(relation,
327
0
                     INDEX_ATTR_BITMAP_IDENTITY_KEY);
328
329
0
    context.bms_replident = bms;
330
0
    rfnode = stringToNode(TextDatumGetCString(rfdatum));
331
0
    result = contain_invalid_rfcolumn_walker(rfnode, &context);
332
0
  }
333
334
0
  ReleaseSysCache(rftuple);
335
336
0
  return result;
337
0
}
338
339
/*
340
 * Check for invalid columns in the publication table definition.
341
 *
342
 * This function evaluates two conditions:
343
 *
344
 * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
345
 *    by the column list. If any column is missing, *invalid_column_list is set
346
 *    to true.
347
 * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
348
 *    are published, either by being explicitly named in the column list or, if
349
 *    no column list is specified, by setting the option
350
 *    publish_generated_columns to stored. If any unpublished
351
 *    generated column is found, *invalid_gen_col is set to true.
352
 *
353
 * Returns true if any of the above conditions are not met.
354
 */
355
bool
356
pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
357
              bool pubviaroot, char pubgencols_type,
358
              bool *invalid_column_list,
359
              bool *invalid_gen_col)
360
0
{
361
0
  Oid     relid = RelationGetRelid(relation);
362
0
  Oid     publish_as_relid = RelationGetRelid(relation);
363
0
  Bitmapset  *idattrs;
364
0
  Bitmapset  *columns = NULL;
365
0
  TupleDesc desc = RelationGetDescr(relation);
366
0
  Publication *pub;
367
0
  int     x;
368
369
0
  *invalid_column_list = false;
370
0
  *invalid_gen_col = false;
371
372
  /*
373
   * For a partition, if pubviaroot is true, find the topmost ancestor that
374
   * is published via this publication as we need to use its column list for
375
   * the changes.
376
   *
377
   * Note that even though the column list used is for an ancestor, the
378
   * REPLICA IDENTITY used will be for the actual child table.
379
   */
380
0
  if (pubviaroot && relation->rd_rel->relispartition)
381
0
  {
382
0
    publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
383
384
0
    if (!OidIsValid(publish_as_relid))
385
0
      publish_as_relid = relid;
386
0
  }
387
388
  /* Fetch the column list */
389
0
  pub = GetPublication(pubid);
390
0
  check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
391
392
0
  if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
393
0
  {
394
    /* With REPLICA IDENTITY FULL, no column list is allowed. */
395
0
    *invalid_column_list = (columns != NULL);
396
397
    /*
398
     * As we don't allow a column list with REPLICA IDENTITY FULL, the
399
     * publish_generated_columns option must be set to stored if the table
400
     * has any stored generated columns.
401
     */
402
0
    if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
403
0
      relation->rd_att->constr &&
404
0
      relation->rd_att->constr->has_generated_stored)
405
0
      *invalid_gen_col = true;
406
407
    /*
408
     * Virtual generated columns are currently not supported for logical
409
     * replication at all.
410
     */
411
0
    if (relation->rd_att->constr &&
412
0
      relation->rd_att->constr->has_generated_virtual)
413
0
      *invalid_gen_col = true;
414
415
0
    if (*invalid_gen_col && *invalid_column_list)
416
0
      return true;
417
0
  }
418
419
  /* Remember columns that are part of the REPLICA IDENTITY */
420
0
  idattrs = RelationGetIndexAttrBitmap(relation,
421
0
                     INDEX_ATTR_BITMAP_IDENTITY_KEY);
422
423
  /*
424
   * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
425
   * (to handle system columns the usual way), while column list does not
426
   * use offset, so we can't do bms_is_subset(). Instead, we have to loop
427
   * over the idattrs and check all of them are in the list.
428
   */
429
0
  x = -1;
430
0
  while ((x = bms_next_member(idattrs, x)) >= 0)
431
0
  {
432
0
    AttrNumber  attnum = (x + FirstLowInvalidHeapAttributeNumber);
433
0
    Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
434
435
0
    if (columns == NULL)
436
0
    {
437
      /*
438
       * The publish_generated_columns option must be set to stored if
439
       * the REPLICA IDENTITY contains any stored generated column.
440
       */
441
0
      if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
442
0
      {
443
0
        *invalid_gen_col = true;
444
0
        break;
445
0
      }
446
447
      /*
448
       * The equivalent setting for virtual generated columns does not
449
       * exist yet.
450
       */
451
0
      if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
452
0
      {
453
0
        *invalid_gen_col = true;
454
0
        break;
455
0
      }
456
457
      /* Skip validating the column list since it is not defined */
458
0
      continue;
459
0
    }
460
461
    /*
462
     * If pubviaroot is true, we are validating the column list of the
463
     * parent table, but the bitmap contains the replica identity
464
     * information of the child table. The parent/child attnums may not
465
     * match, so translate them to the parent - get the attname from the
466
     * child, and look it up in the parent.
467
     */
468
0
    if (pubviaroot)
469
0
    {
470
      /* attribute name in the child table */
471
0
      char     *colname = get_attname(relid, attnum, false);
472
473
      /*
474
       * Determine the attnum for the attribute name in parent (we are
475
       * using the column list defined on the parent).
476
       */
477
0
      attnum = get_attnum(publish_as_relid, colname);
478
0
    }
479
480
    /* replica identity column, not covered by the column list */
481
0
    *invalid_column_list |= !bms_is_member(attnum, columns);
482
483
0
    if (*invalid_column_list && *invalid_gen_col)
484
0
      break;
485
0
  }
486
487
0
  bms_free(columns);
488
0
  bms_free(idattrs);
489
490
0
  return *invalid_column_list || *invalid_gen_col;
491
0
}
492
493
/*
494
 * Invalidate entries in the RelationSyncCache for relations included in the
495
 * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
496
 *
497
 * If 'puballtables' is true, invalidate all cache entries.
498
 */
499
void
500
InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
501
0
{
502
0
  if (puballtables)
503
0
  {
504
0
    CacheInvalidateRelSyncAll();
505
0
  }
506
0
  else
507
0
  {
508
0
    List     *relids = NIL;
509
0
    List     *schemarelids = NIL;
510
511
    /*
512
     * For partitioned tables, we must invalidate all partitions and
513
     * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
514
     * a target. However, WAL records for TRUNCATE specify both a root and
515
     * its leaves.
516
     */
517
0
    relids = GetPublicationRelations(pubid,
518
0
                     PUBLICATION_PART_ALL);
519
0
    schemarelids = GetAllSchemaPublicationRelations(pubid,
520
0
                            PUBLICATION_PART_ALL);
521
522
0
    relids = list_concat_unique_oid(relids, schemarelids);
523
524
    /* Invalidate the relsyncache */
525
0
    foreach_oid(relid, relids)
526
0
      CacheInvalidateRelSync(relid);
527
0
  }
528
529
0
  return;
530
0
}
531
532
/* check_functions_in_node callback */
533
static bool
534
contain_mutable_or_user_functions_checker(Oid func_id, void *context)
535
0
{
536
0
  return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
537
0
      func_id >= FirstNormalObjectId);
538
0
}
539
540
/*
541
 * The row filter walker checks if the row filter expression is a "simple
542
 * expression".
543
 *
544
 * It allows only simple or compound expressions such as:
545
 * - (Var Op Const)
546
 * - (Var Op Var)
547
 * - (Var Op Const) AND/OR (Var Op Const)
548
 * - etc
549
 * (where Var is a column of the table this filter belongs to)
550
 *
551
 * The simple expression has the following restrictions:
552
 * - User-defined operators are not allowed;
553
 * - User-defined functions are not allowed;
554
 * - User-defined types are not allowed;
555
 * - User-defined collations are not allowed;
556
 * - Non-immutable built-in functions are not allowed;
557
 * - System columns are not allowed.
558
 *
559
 * NOTES
560
 *
561
 * We don't allow user-defined functions/operators/types/collations because
562
 * (a) if a user drops a user-defined object used in a row filter expression or
563
 * if there is any other error while using it, the logical decoding
564
 * infrastructure won't be able to recover from such an error even if the
565
 * object is recreated again because a historic snapshot is used to evaluate
566
 * the row filter;
567
 * (b) a user-defined function can be used to access tables that could have
568
 * unpleasant results because a historic snapshot is used. That's why only
569
 * immutable built-in functions are allowed in row filter expressions.
570
 *
571
 * We don't allow system columns because currently, we don't have that
572
 * information in the tuple passed to downstream. Also, as we don't replicate
573
 * those to subscribers, there doesn't seem to be a need for a filter on those
574
 * columns.
575
 *
576
 * We can allow other node types after more analysis and testing.
577
 */
578
static bool
579
check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
580
0
{
581
0
  char     *errdetail_msg = NULL;
582
583
0
  if (node == NULL)
584
0
    return false;
585
586
0
  switch (nodeTag(node))
587
0
  {
588
0
    case T_Var:
589
      /* System columns are not allowed. */
590
0
      if (((Var *) node)->varattno < InvalidAttrNumber)
591
0
        errdetail_msg = _("System columns are not allowed.");
592
0
      break;
593
0
    case T_OpExpr:
594
0
    case T_DistinctExpr:
595
0
    case T_NullIfExpr:
596
      /* OK, except user-defined operators are not allowed. */
597
0
      if (((OpExpr *) node)->opno >= FirstNormalObjectId)
598
0
        errdetail_msg = _("User-defined operators are not allowed.");
599
0
      break;
600
0
    case T_ScalarArrayOpExpr:
601
      /* OK, except user-defined operators are not allowed. */
602
0
      if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
603
0
        errdetail_msg = _("User-defined operators are not allowed.");
604
605
      /*
606
       * We don't need to check the hashfuncid and negfuncid of
607
       * ScalarArrayOpExpr as those functions are only built for a
608
       * subquery.
609
       */
610
0
      break;
611
0
    case T_RowCompareExpr:
612
0
      {
613
0
        ListCell   *opid;
614
615
        /* OK, except user-defined operators are not allowed. */
616
0
        foreach(opid, ((RowCompareExpr *) node)->opnos)
617
0
        {
618
0
          if (lfirst_oid(opid) >= FirstNormalObjectId)
619
0
          {
620
0
            errdetail_msg = _("User-defined operators are not allowed.");
621
0
            break;
622
0
          }
623
0
        }
624
0
      }
625
0
      break;
626
0
    case T_Const:
627
0
    case T_FuncExpr:
628
0
    case T_BoolExpr:
629
0
    case T_RelabelType:
630
0
    case T_CollateExpr:
631
0
    case T_CaseExpr:
632
0
    case T_CaseTestExpr:
633
0
    case T_ArrayExpr:
634
0
    case T_RowExpr:
635
0
    case T_CoalesceExpr:
636
0
    case T_MinMaxExpr:
637
0
    case T_XmlExpr:
638
0
    case T_NullTest:
639
0
    case T_BooleanTest:
640
0
    case T_List:
641
      /* OK, supported */
642
0
      break;
643
0
    default:
644
0
      errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
645
0
      break;
646
0
  }
647
648
  /*
649
   * For all the supported nodes, if we haven't already found a problem,
650
   * check the types, functions, and collations used in it.  We check List
651
   * by walking through each element.
652
   */
653
0
  if (!errdetail_msg && !IsA(node, List))
654
0
  {
655
0
    if (exprType(node) >= FirstNormalObjectId)
656
0
      errdetail_msg = _("User-defined types are not allowed.");
657
0
    else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
658
0
                     pstate))
659
0
      errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
660
0
    else if (exprCollation(node) >= FirstNormalObjectId ||
661
0
         exprInputCollation(node) >= FirstNormalObjectId)
662
0
      errdetail_msg = _("User-defined collations are not allowed.");
663
0
  }
664
665
  /*
666
   * If we found a problem in this node, throw error now. Otherwise keep
667
   * going.
668
   */
669
0
  if (errdetail_msg)
670
0
    ereport(ERROR,
671
0
        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
672
0
         errmsg("invalid publication WHERE expression"),
673
0
         errdetail_internal("%s", errdetail_msg),
674
0
         parser_errposition(pstate, exprLocation(node))));
675
676
0
  return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
677
0
                  pstate);
678
0
}
679
680
/*
681
 * Check if the row filter expression is a "simple expression".
682
 *
683
 * See check_simple_rowfilter_expr_walker for details.
684
 */
685
static bool
686
check_simple_rowfilter_expr(Node *node, ParseState *pstate)
687
0
{
688
0
  return check_simple_rowfilter_expr_walker(node, pstate);
689
0
}
690
691
/*
692
 * Transform the publication WHERE expression for all the relations in the list,
693
 * ensuring it is coerced to boolean and necessary collation information is
694
 * added if required, and add a new nsitem/RTE for the associated relation to
695
 * the ParseState's namespace list.
696
 *
697
 * Also check the publication row filter expression and throw an error if
698
 * anything not permitted or unexpected is encountered.
699
 */
700
static void
701
TransformPubWhereClauses(List *tables, const char *queryString,
702
             bool pubviaroot)
703
0
{
704
0
  ListCell   *lc;
705
706
0
  foreach(lc, tables)
707
0
  {
708
0
    ParseNamespaceItem *nsitem;
709
0
    Node     *whereclause = NULL;
710
0
    ParseState *pstate;
711
0
    PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
712
713
0
    if (pri->whereClause == NULL)
714
0
      continue;
715
716
    /*
717
     * If the publication doesn't publish changes via the root partitioned
718
     * table, the partition's row filter will be used. So disallow using
719
     * WHERE clause on partitioned table in this case.
720
     */
721
0
    if (!pubviaroot &&
722
0
      pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
723
0
      ereport(ERROR,
724
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
725
0
           errmsg("cannot use publication WHERE clause for relation \"%s\"",
726
0
              RelationGetRelationName(pri->relation)),
727
0
           errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
728
0
                 "publish_via_partition_root")));
729
730
    /*
731
     * A fresh pstate is required so that we only have "this" table in its
732
     * rangetable
733
     */
734
0
    pstate = make_parsestate(NULL);
735
0
    pstate->p_sourcetext = queryString;
736
0
    nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
737
0
                         AccessShareLock, NULL,
738
0
                         false, false);
739
0
    addNSItemToQuery(pstate, nsitem, false, true, true);
740
741
0
    whereclause = transformWhereClause(pstate,
742
0
                       copyObject(pri->whereClause),
743
0
                       EXPR_KIND_WHERE,
744
0
                       "PUBLICATION WHERE");
745
746
    /* Fix up collation information */
747
0
    assign_expr_collations(pstate, whereclause);
748
749
0
    whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
750
751
    /*
752
     * We allow only simple expressions in row filters. See
753
     * check_simple_rowfilter_expr_walker.
754
     */
755
0
    check_simple_rowfilter_expr(whereclause, pstate);
756
757
0
    free_parsestate(pstate);
758
759
0
    pri->whereClause = whereclause;
760
0
  }
761
0
}
762
763
764
/*
765
 * Given a list of tables that are going to be added to a publication,
766
 * verify that they fulfill the necessary preconditions, namely: no tables
767
 * have a column list if any schema is published; and partitioned tables do
768
 * not have column lists if publish_via_partition_root is not set.
769
 *
770
 * 'publish_schema' indicates that the publication contains any TABLES IN
771
 * SCHEMA elements (newly added in this command, or preexisting).
772
 * 'pubviaroot' is the value of publish_via_partition_root.
773
 */
774
static void
775
CheckPubRelationColumnList(char *pubname, List *tables,
776
               bool publish_schema, bool pubviaroot)
777
0
{
778
0
  ListCell   *lc;
779
780
0
  foreach(lc, tables)
781
0
  {
782
0
    PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
783
784
0
    if (pri->columns == NIL)
785
0
      continue;
786
787
    /*
788
     * Disallow specifying column list if any schema is in the
789
     * publication.
790
     *
791
     * XXX We could instead just forbid the case when the publication
792
     * tries to publish the table with a column list and a schema for that
793
     * table. However, if we do that then we need a restriction during
794
     * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
795
     * seem to be a good idea.
796
     */
797
0
    if (publish_schema)
798
0
      ereport(ERROR,
799
0
          errcode(ERRCODE_INVALID_PARAMETER_VALUE),
800
0
          errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
801
0
               get_namespace_name(RelationGetNamespace(pri->relation)),
802
0
               RelationGetRelationName(pri->relation), pubname),
803
0
          errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
804
805
    /*
806
     * If the publication doesn't publish changes via the root partitioned
807
     * table, the partition's column list will be used. So disallow using
808
     * a column list on the partitioned table in this case.
809
     */
810
0
    if (!pubviaroot &&
811
0
      pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
812
0
      ereport(ERROR,
813
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
814
0
           errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
815
0
              get_namespace_name(RelationGetNamespace(pri->relation)),
816
0
              RelationGetRelationName(pri->relation), pubname),
817
0
           errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
818
0
                 "publish_via_partition_root")));
819
0
  }
820
0
}
821
822
/*
823
 * Create new publication.
824
 */
825
ObjectAddress
826
CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
827
{
828
  Relation  rel;
829
  ObjectAddress myself;
830
  Oid     puboid;
831
  bool    nulls[Natts_pg_publication];
832
  Datum   values[Natts_pg_publication];
833
  HeapTuple tup;
834
  bool    publish_given;
835
  PublicationActions pubactions;
836
  bool    publish_via_partition_root_given;
837
  bool    publish_via_partition_root;
838
  bool    publish_generated_columns_given;
839
  char    publish_generated_columns;
840
  AclResult aclresult;
841
  List     *relations = NIL;
842
  List     *schemaidlist = NIL;
843
844
  /* must have CREATE privilege on database */
845
  aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
846
  if (aclresult != ACLCHECK_OK)
847
    aclcheck_error(aclresult, OBJECT_DATABASE,
848
             get_database_name(MyDatabaseId));
849
850
  /* FOR ALL TABLES requires superuser */
851
  if (stmt->for_all_tables && !superuser())
852
    ereport(ERROR,
853
        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
854
         errmsg("must be superuser to create FOR ALL TABLES publication")));
855
856
  rel = table_open(PublicationRelationId, RowExclusiveLock);
857
858
  /* Check if name is used */
859
  puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
860
               CStringGetDatum(stmt->pubname));
861
  if (OidIsValid(puboid))
862
    ereport(ERROR,
863
        (errcode(ERRCODE_DUPLICATE_OBJECT),
864
         errmsg("publication \"%s\" already exists",
865
            stmt->pubname)));
866
867
  /* Form a tuple. */
868
  memset(values, 0, sizeof(values));
869
  memset(nulls, false, sizeof(nulls));
870
871
  values[Anum_pg_publication_pubname - 1] =
872
    DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
873
  values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
874
875
  parse_publication_options(pstate,
876
                stmt->options,
877
                &publish_given, &pubactions,
878
                &publish_via_partition_root_given,
879
                &publish_via_partition_root,
880
                &publish_generated_columns_given,
881
                &publish_generated_columns);
882
883
  puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
884
                Anum_pg_publication_oid);
885
  values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
886
  values[Anum_pg_publication_puballtables - 1] =
887
    BoolGetDatum(stmt->for_all_tables);
888
  values[Anum_pg_publication_pubinsert - 1] =
889
    BoolGetDatum(pubactions.pubinsert);
890
  values[Anum_pg_publication_pubupdate - 1] =
891
    BoolGetDatum(pubactions.pubupdate);
892
  values[Anum_pg_publication_pubdelete - 1] =
893
    BoolGetDatum(pubactions.pubdelete);
894
  values[Anum_pg_publication_pubtruncate - 1] =
895
    BoolGetDatum(pubactions.pubtruncate);
896
  values[Anum_pg_publication_pubviaroot - 1] =
897
    BoolGetDatum(publish_via_partition_root);
898
  values[Anum_pg_publication_pubgencols - 1] =
899
    CharGetDatum(publish_generated_columns);
900
901
  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
902
903
  /* Insert tuple into catalog. */
904
  CatalogTupleInsert(rel, tup);
905
  heap_freetuple(tup);
906
907
  recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
908
909
  ObjectAddressSet(myself, PublicationRelationId, puboid);
910
911
  /* Make the changes visible. */
912
  CommandCounterIncrement();
913
914
  /* Associate objects with the publication. */
915
  if (stmt->for_all_tables)
916
  {
917
    /* Invalidate relcache so that publication info is rebuilt. */
918
    CacheInvalidateRelcacheAll();
919
  }
920
  else
921
  {
922
    ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
923
                   &schemaidlist);
924
925
    /* FOR TABLES IN SCHEMA requires superuser */
926
    if (schemaidlist != NIL && !superuser())
927
      ereport(ERROR,
928
          errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
929
          errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
930
931
    if (relations != NIL)
932
    {
933
      List     *rels;
934
935
      rels = OpenTableList(relations);
936
      TransformPubWhereClauses(rels, pstate->p_sourcetext,
937
                   publish_via_partition_root);
938
939
      CheckPubRelationColumnList(stmt->pubname, rels,
940
                     schemaidlist != NIL,
941
                     publish_via_partition_root);
942
943
      PublicationAddTables(puboid, rels, true, NULL);
944
      CloseTableList(rels);
945
    }
946
947
    if (schemaidlist != NIL)
948
    {
949
      /*
950
       * Schema lock is held until the publication is created to prevent
951
       * concurrent schema deletion.
952
       */
953
      LockSchemaList(schemaidlist);
954
      PublicationAddSchemas(puboid, schemaidlist, true, NULL);
955
    }
956
  }
957
958
  table_close(rel, RowExclusiveLock);
959
960
  InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
961
962
  if (wal_level != WAL_LEVEL_LOGICAL)
963
    ereport(WARNING,
964
        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
965
         errmsg("\"wal_level\" is insufficient to publish logical changes"),
966
         errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
967
968
  return myself;
969
}
970
971
/*
972
 * Change options of a publication.
973
 */
974
static void
975
AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
976
            Relation rel, HeapTuple tup)
977
0
{
978
0
  bool    nulls[Natts_pg_publication];
979
0
  bool    replaces[Natts_pg_publication];
980
0
  Datum   values[Natts_pg_publication];
981
0
  bool    publish_given;
982
0
  PublicationActions pubactions;
983
0
  bool    publish_via_partition_root_given;
984
0
  bool    publish_via_partition_root;
985
0
  bool    publish_generated_columns_given;
986
0
  char    publish_generated_columns;
987
0
  ObjectAddress obj;
988
0
  Form_pg_publication pubform;
989
0
  List     *root_relids = NIL;
990
0
  ListCell   *lc;
991
992
0
  parse_publication_options(pstate,
993
0
                stmt->options,
994
0
                &publish_given, &pubactions,
995
0
                &publish_via_partition_root_given,
996
0
                &publish_via_partition_root,
997
0
                &publish_generated_columns_given,
998
0
                &publish_generated_columns);
999
1000
0
  pubform = (Form_pg_publication) GETSTRUCT(tup);
1001
1002
  /*
1003
   * If the publication doesn't publish changes via the root partitioned
1004
   * table, the partition's row filter and column list will be used. So
1005
   * disallow using WHERE clause and column lists on partitioned table in
1006
   * this case.
1007
   */
1008
0
  if (!pubform->puballtables && publish_via_partition_root_given &&
1009
0
    !publish_via_partition_root)
1010
0
  {
1011
    /*
1012
     * Lock the publication so nobody else can do anything with it. This
1013
     * prevents concurrent alter to add partitioned table(s) with WHERE
1014
     * clause(s) and/or column lists which we don't allow when not
1015
     * publishing via root.
1016
     */
1017
0
    LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
1018
0
               AccessShareLock);
1019
1020
0
    root_relids = GetPublicationRelations(pubform->oid,
1021
0
                        PUBLICATION_PART_ROOT);
1022
1023
0
    foreach(lc, root_relids)
1024
0
    {
1025
0
      Oid     relid = lfirst_oid(lc);
1026
0
      HeapTuple rftuple;
1027
0
      char    relkind;
1028
0
      char     *relname;
1029
0
      bool    has_rowfilter;
1030
0
      bool    has_collist;
1031
1032
      /*
1033
       * Beware: we don't have lock on the relations, so cope silently
1034
       * with the cache lookups returning NULL.
1035
       */
1036
1037
0
      rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1038
0
                    ObjectIdGetDatum(relid),
1039
0
                    ObjectIdGetDatum(pubform->oid));
1040
0
      if (!HeapTupleIsValid(rftuple))
1041
0
        continue;
1042
0
      has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
1043
0
      has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
1044
0
      if (!has_rowfilter && !has_collist)
1045
0
      {
1046
0
        ReleaseSysCache(rftuple);
1047
0
        continue;
1048
0
      }
1049
1050
0
      relkind = get_rel_relkind(relid);
1051
0
      if (relkind != RELKIND_PARTITIONED_TABLE)
1052
0
      {
1053
0
        ReleaseSysCache(rftuple);
1054
0
        continue;
1055
0
      }
1056
0
      relname = get_rel_name(relid);
1057
0
      if (relname == NULL) /* table concurrently dropped */
1058
0
      {
1059
0
        ReleaseSysCache(rftuple);
1060
0
        continue;
1061
0
      }
1062
1063
0
      if (has_rowfilter)
1064
0
        ereport(ERROR,
1065
0
            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1066
0
             errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1067
0
                "publish_via_partition_root",
1068
0
                stmt->pubname),
1069
0
             errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1070
0
                   relname, "publish_via_partition_root")));
1071
0
      Assert(has_collist);
1072
0
      ereport(ERROR,
1073
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1074
0
           errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1075
0
              "publish_via_partition_root",
1076
0
              stmt->pubname),
1077
0
           errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1078
0
                 relname, "publish_via_partition_root")));
1079
0
    }
1080
0
  }
1081
1082
  /* Everything ok, form a new tuple. */
1083
0
  memset(values, 0, sizeof(values));
1084
0
  memset(nulls, false, sizeof(nulls));
1085
0
  memset(replaces, false, sizeof(replaces));
1086
1087
0
  if (publish_given)
1088
0
  {
1089
0
    values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
1090
0
    replaces[Anum_pg_publication_pubinsert - 1] = true;
1091
1092
0
    values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
1093
0
    replaces[Anum_pg_publication_pubupdate - 1] = true;
1094
1095
0
    values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
1096
0
    replaces[Anum_pg_publication_pubdelete - 1] = true;
1097
1098
0
    values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
1099
0
    replaces[Anum_pg_publication_pubtruncate - 1] = true;
1100
0
  }
1101
1102
0
  if (publish_via_partition_root_given)
1103
0
  {
1104
0
    values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1105
0
    replaces[Anum_pg_publication_pubviaroot - 1] = true;
1106
0
  }
1107
1108
0
  if (publish_generated_columns_given)
1109
0
  {
1110
0
    values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
1111
0
    replaces[Anum_pg_publication_pubgencols - 1] = true;
1112
0
  }
1113
1114
0
  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1115
0
              replaces);
1116
1117
  /* Update the catalog. */
1118
0
  CatalogTupleUpdate(rel, &tup->t_self, tup);
1119
1120
0
  CommandCounterIncrement();
1121
1122
0
  pubform = (Form_pg_publication) GETSTRUCT(tup);
1123
1124
  /* Invalidate the relcache. */
1125
0
  if (pubform->puballtables)
1126
0
  {
1127
0
    CacheInvalidateRelcacheAll();
1128
0
  }
1129
0
  else
1130
0
  {
1131
0
    List     *relids = NIL;
1132
0
    List     *schemarelids = NIL;
1133
1134
    /*
1135
     * For any partitioned tables contained in the publication, we must
1136
     * invalidate all partitions contained in the respective partition
1137
     * trees, not just those explicitly mentioned in the publication.
1138
     */
1139
0
    if (root_relids == NIL)
1140
0
      relids = GetPublicationRelations(pubform->oid,
1141
0
                       PUBLICATION_PART_ALL);
1142
0
    else
1143
0
    {
1144
      /*
1145
       * We already got tables explicitly mentioned in the publication.
1146
       * Now get all partitions for the partitioned table in the list.
1147
       */
1148
0
      foreach(lc, root_relids)
1149
0
        relids = GetPubPartitionOptionRelations(relids,
1150
0
                            PUBLICATION_PART_ALL,
1151
0
                            lfirst_oid(lc));
1152
0
    }
1153
1154
0
    schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1155
0
                            PUBLICATION_PART_ALL);
1156
0
    relids = list_concat_unique_oid(relids, schemarelids);
1157
1158
0
    InvalidatePublicationRels(relids);
1159
0
  }
1160
1161
0
  ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1162
0
  EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1163
0
                   (Node *) stmt);
1164
1165
0
  InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1166
0
}
1167
1168
/*
1169
 * Invalidate the relations.
1170
 */
1171
void
1172
InvalidatePublicationRels(List *relids)
1173
0
{
1174
  /*
1175
   * We don't want to send too many individual messages, at some point it's
1176
   * cheaper to just reset whole relcache.
1177
   */
1178
0
  if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1179
0
  {
1180
0
    ListCell   *lc;
1181
1182
0
    foreach(lc, relids)
1183
0
      CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1184
0
  }
1185
0
  else
1186
0
    CacheInvalidateRelcacheAll();
1187
0
}
1188
1189
/*
1190
 * Add or remove table to/from publication.
1191
 */
1192
static void
1193
AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1194
             List *tables, const char *queryString,
1195
             bool publish_schema)
1196
0
{
1197
0
  List     *rels = NIL;
1198
0
  Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1199
0
  Oid     pubid = pubform->oid;
1200
1201
  /*
1202
   * Nothing to do if no objects, except in SET: for that it is quite
1203
   * possible that user has not specified any tables in which case we need
1204
   * to remove all the existing tables.
1205
   */
1206
0
  if (!tables && stmt->action != AP_SetObjects)
1207
0
    return;
1208
1209
0
  rels = OpenTableList(tables);
1210
1211
0
  if (stmt->action == AP_AddObjects)
1212
0
  {
1213
0
    TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1214
1215
0
    publish_schema |= is_schema_publication(pubid);
1216
1217
0
    CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1218
0
                   pubform->pubviaroot);
1219
1220
0
    PublicationAddTables(pubid, rels, false, stmt);
1221
0
  }
1222
0
  else if (stmt->action == AP_DropObjects)
1223
0
    PublicationDropTables(pubid, rels, false);
1224
0
  else            /* AP_SetObjects */
1225
0
  {
1226
0
    List     *oldrelids = GetPublicationRelations(pubid,
1227
0
                            PUBLICATION_PART_ROOT);
1228
0
    List     *delrels = NIL;
1229
0
    ListCell   *oldlc;
1230
1231
0
    TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1232
1233
0
    CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1234
0
                   pubform->pubviaroot);
1235
1236
    /*
1237
     * To recreate the relation list for the publication, look for
1238
     * existing relations that do not need to be dropped.
1239
     */
1240
0
    foreach(oldlc, oldrelids)
1241
0
    {
1242
0
      Oid     oldrelid = lfirst_oid(oldlc);
1243
0
      ListCell   *newlc;
1244
0
      PublicationRelInfo *oldrel;
1245
0
      bool    found = false;
1246
0
      HeapTuple rftuple;
1247
0
      Node     *oldrelwhereclause = NULL;
1248
0
      Bitmapset  *oldcolumns = NULL;
1249
1250
      /* look up the cache for the old relmap */
1251
0
      rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1252
0
                    ObjectIdGetDatum(oldrelid),
1253
0
                    ObjectIdGetDatum(pubid));
1254
1255
      /*
1256
       * See if the existing relation currently has a WHERE clause or a
1257
       * column list. We need to compare those too.
1258
       */
1259
0
      if (HeapTupleIsValid(rftuple))
1260
0
      {
1261
0
        bool    isnull = true;
1262
0
        Datum   whereClauseDatum;
1263
0
        Datum   columnListDatum;
1264
1265
        /* Load the WHERE clause for this table. */
1266
0
        whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1267
0
                           Anum_pg_publication_rel_prqual,
1268
0
                           &isnull);
1269
0
        if (!isnull)
1270
0
          oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1271
1272
        /* Transform the int2vector column list to a bitmap. */
1273
0
        columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1274
0
                          Anum_pg_publication_rel_prattrs,
1275
0
                          &isnull);
1276
1277
0
        if (!isnull)
1278
0
          oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1279
1280
0
        ReleaseSysCache(rftuple);
1281
0
      }
1282
1283
0
      foreach(newlc, rels)
1284
0
      {
1285
0
        PublicationRelInfo *newpubrel;
1286
0
        Oid     newrelid;
1287
0
        Bitmapset  *newcolumns = NULL;
1288
1289
0
        newpubrel = (PublicationRelInfo *) lfirst(newlc);
1290
0
        newrelid = RelationGetRelid(newpubrel->relation);
1291
1292
        /*
1293
         * Validate the column list.  If the column list or WHERE
1294
         * clause changes, then the validation done here will be
1295
         * duplicated inside PublicationAddTables().  The validation
1296
         * is cheap enough that that seems harmless.
1297
         */
1298
0
        newcolumns = pub_collist_validate(newpubrel->relation,
1299
0
                          newpubrel->columns);
1300
1301
        /*
1302
         * Check if any of the new set of relations matches with the
1303
         * existing relations in the publication. Additionally, if the
1304
         * relation has an associated WHERE clause, check the WHERE
1305
         * expressions also match. Same for the column list. Drop the
1306
         * rest.
1307
         */
1308
0
        if (newrelid == oldrelid)
1309
0
        {
1310
0
          if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1311
0
            bms_equal(oldcolumns, newcolumns))
1312
0
          {
1313
0
            found = true;
1314
0
            break;
1315
0
          }
1316
0
        }
1317
0
      }
1318
1319
      /*
1320
       * Add the non-matched relations to a list so that they can be
1321
       * dropped.
1322
       */
1323
0
      if (!found)
1324
0
      {
1325
0
        oldrel = palloc(sizeof(PublicationRelInfo));
1326
0
        oldrel->whereClause = NULL;
1327
0
        oldrel->columns = NIL;
1328
0
        oldrel->relation = table_open(oldrelid,
1329
0
                        ShareUpdateExclusiveLock);
1330
0
        delrels = lappend(delrels, oldrel);
1331
0
      }
1332
0
    }
1333
1334
    /* And drop them. */
1335
0
    PublicationDropTables(pubid, delrels, true);
1336
1337
    /*
1338
     * Don't bother calculating the difference for adding, we'll catch and
1339
     * skip existing ones when doing catalog update.
1340
     */
1341
0
    PublicationAddTables(pubid, rels, true, stmt);
1342
1343
0
    CloseTableList(delrels);
1344
0
  }
1345
1346
0
  CloseTableList(rels);
1347
0
}
1348
1349
/*
1350
 * Alter the publication schemas.
1351
 *
1352
 * Add or remove schemas to/from publication.
1353
 */
1354
static void
1355
AlterPublicationSchemas(AlterPublicationStmt *stmt,
1356
            HeapTuple tup, List *schemaidlist)
1357
0
{
1358
0
  Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1359
1360
  /*
1361
   * Nothing to do if no objects, except in SET: for that it is quite
1362
   * possible that user has not specified any schemas in which case we need
1363
   * to remove all the existing schemas.
1364
   */
1365
0
  if (!schemaidlist && stmt->action != AP_SetObjects)
1366
0
    return;
1367
1368
  /*
1369
   * Schema lock is held until the publication is altered to prevent
1370
   * concurrent schema deletion.
1371
   */
1372
0
  LockSchemaList(schemaidlist);
1373
0
  if (stmt->action == AP_AddObjects)
1374
0
  {
1375
0
    ListCell   *lc;
1376
0
    List     *reloids;
1377
1378
0
    reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1379
1380
0
    foreach(lc, reloids)
1381
0
    {
1382
0
      HeapTuple coltuple;
1383
1384
0
      coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1385
0
                     ObjectIdGetDatum(lfirst_oid(lc)),
1386
0
                     ObjectIdGetDatum(pubform->oid));
1387
1388
0
      if (!HeapTupleIsValid(coltuple))
1389
0
        continue;
1390
1391
      /*
1392
       * Disallow adding schema if column list is already part of the
1393
       * publication. See CheckPubRelationColumnList.
1394
       */
1395
0
      if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1396
0
        ereport(ERROR,
1397
0
            errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1398
0
            errmsg("cannot add schema to publication \"%s\"",
1399
0
                 stmt->pubname),
1400
0
            errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1401
1402
0
      ReleaseSysCache(coltuple);
1403
0
    }
1404
1405
0
    PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1406
0
  }
1407
0
  else if (stmt->action == AP_DropObjects)
1408
0
    PublicationDropSchemas(pubform->oid, schemaidlist, false);
1409
0
  else            /* AP_SetObjects */
1410
0
  {
1411
0
    List     *oldschemaids = GetPublicationSchemas(pubform->oid);
1412
0
    List     *delschemas = NIL;
1413
1414
    /* Identify which schemas should be dropped */
1415
0
    delschemas = list_difference_oid(oldschemaids, schemaidlist);
1416
1417
    /*
1418
     * Schema lock is held until the publication is altered to prevent
1419
     * concurrent schema deletion.
1420
     */
1421
0
    LockSchemaList(delschemas);
1422
1423
    /* And drop them */
1424
0
    PublicationDropSchemas(pubform->oid, delschemas, true);
1425
1426
    /*
1427
     * Don't bother calculating the difference for adding, we'll catch and
1428
     * skip existing ones when doing catalog update.
1429
     */
1430
0
    PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1431
0
  }
1432
0
}
1433
1434
/*
1435
 * Check if relations and schemas can be in a given publication and throw
1436
 * appropriate error if not.
1437
 */
1438
static void
1439
CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
1440
            List *tables, List *schemaidlist)
1441
0
{
1442
0
  Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1443
1444
0
  if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1445
0
    schemaidlist && !superuser())
1446
0
    ereport(ERROR,
1447
0
        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1448
0
         errmsg("must be superuser to add or set schemas")));
1449
1450
  /*
1451
   * Check that user is allowed to manipulate the publication tables in
1452
   * schema
1453
   */
1454
0
  if (schemaidlist && pubform->puballtables)
1455
0
    ereport(ERROR,
1456
0
        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1457
0
         errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1458
0
            NameStr(pubform->pubname)),
1459
0
         errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1460
1461
  /* Check that user is allowed to manipulate the publication tables. */
1462
0
  if (tables && pubform->puballtables)
1463
0
    ereport(ERROR,
1464
0
        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1465
0
         errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1466
0
            NameStr(pubform->pubname)),
1467
0
         errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1468
0
}
1469
1470
/*
1471
 * Alter the existing publication.
1472
 *
1473
 * This is dispatcher function for AlterPublicationOptions,
1474
 * AlterPublicationSchemas and AlterPublicationTables.
1475
 */
1476
void
1477
AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1478
0
{
1479
0
  Relation  rel;
1480
0
  HeapTuple tup;
1481
0
  Form_pg_publication pubform;
1482
1483
0
  rel = table_open(PublicationRelationId, RowExclusiveLock);
1484
1485
0
  tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1486
0
                CStringGetDatum(stmt->pubname));
1487
1488
0
  if (!HeapTupleIsValid(tup))
1489
0
    ereport(ERROR,
1490
0
        (errcode(ERRCODE_UNDEFINED_OBJECT),
1491
0
         errmsg("publication \"%s\" does not exist",
1492
0
            stmt->pubname)));
1493
1494
0
  pubform = (Form_pg_publication) GETSTRUCT(tup);
1495
1496
  /* must be owner */
1497
0
  if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1498
0
    aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1499
0
             stmt->pubname);
1500
1501
0
  if (stmt->options)
1502
0
    AlterPublicationOptions(pstate, stmt, rel, tup);
1503
0
  else
1504
0
  {
1505
0
    List     *relations = NIL;
1506
0
    List     *schemaidlist = NIL;
1507
0
    Oid     pubid = pubform->oid;
1508
1509
0
    ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1510
0
                   &schemaidlist);
1511
1512
0
    CheckAlterPublication(stmt, tup, relations, schemaidlist);
1513
1514
0
    heap_freetuple(tup);
1515
1516
    /* Lock the publication so nobody else can do anything with it. */
1517
0
    LockDatabaseObject(PublicationRelationId, pubid, 0,
1518
0
               AccessExclusiveLock);
1519
1520
    /*
1521
     * It is possible that by the time we acquire the lock on publication,
1522
     * concurrent DDL has removed it. We can test this by checking the
1523
     * existence of publication. We get the tuple again to avoid the risk
1524
     * of any publication option getting changed.
1525
     */
1526
0
    tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1527
0
    if (!HeapTupleIsValid(tup))
1528
0
      ereport(ERROR,
1529
0
          errcode(ERRCODE_UNDEFINED_OBJECT),
1530
0
          errmsg("publication \"%s\" does not exist",
1531
0
               stmt->pubname));
1532
1533
0
    AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1534
0
                 schemaidlist != NIL);
1535
0
    AlterPublicationSchemas(stmt, tup, schemaidlist);
1536
0
  }
1537
1538
  /* Cleanup. */
1539
0
  heap_freetuple(tup);
1540
0
  table_close(rel, RowExclusiveLock);
1541
0
}
1542
1543
/*
1544
 * Remove relation from publication by mapping OID.
1545
 */
1546
void
1547
RemovePublicationRelById(Oid proid)
1548
0
{
1549
0
  Relation  rel;
1550
0
  HeapTuple tup;
1551
0
  Form_pg_publication_rel pubrel;
1552
0
  List     *relids = NIL;
1553
1554
0
  rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1555
1556
0
  tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1557
1558
0
  if (!HeapTupleIsValid(tup))
1559
0
    elog(ERROR, "cache lookup failed for publication table %u",
1560
0
       proid);
1561
1562
0
  pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1563
1564
  /*
1565
   * Invalidate relcache so that publication info is rebuilt.
1566
   *
1567
   * For the partitioned tables, we must invalidate all partitions contained
1568
   * in the respective partition hierarchies, not just the one explicitly
1569
   * mentioned in the publication. This is required because we implicitly
1570
   * publish the child tables when the parent table is published.
1571
   */
1572
0
  relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1573
0
                      pubrel->prrelid);
1574
1575
0
  InvalidatePublicationRels(relids);
1576
1577
0
  CatalogTupleDelete(rel, &tup->t_self);
1578
1579
0
  ReleaseSysCache(tup);
1580
1581
0
  table_close(rel, RowExclusiveLock);
1582
0
}
1583
1584
/*
1585
 * Remove the publication by mapping OID.
1586
 */
1587
void
1588
RemovePublicationById(Oid pubid)
1589
0
{
1590
0
  Relation  rel;
1591
0
  HeapTuple tup;
1592
0
  Form_pg_publication pubform;
1593
1594
0
  rel = table_open(PublicationRelationId, RowExclusiveLock);
1595
1596
0
  tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1597
0
  if (!HeapTupleIsValid(tup))
1598
0
    elog(ERROR, "cache lookup failed for publication %u", pubid);
1599
1600
0
  pubform = (Form_pg_publication) GETSTRUCT(tup);
1601
1602
  /* Invalidate relcache so that publication info is rebuilt. */
1603
0
  if (pubform->puballtables)
1604
0
    CacheInvalidateRelcacheAll();
1605
1606
0
  CatalogTupleDelete(rel, &tup->t_self);
1607
1608
0
  ReleaseSysCache(tup);
1609
1610
0
  table_close(rel, RowExclusiveLock);
1611
0
}
1612
1613
/*
1614
 * Remove schema from publication by mapping OID.
1615
 */
1616
void
1617
RemovePublicationSchemaById(Oid psoid)
1618
0
{
1619
0
  Relation  rel;
1620
0
  HeapTuple tup;
1621
0
  List     *schemaRels = NIL;
1622
0
  Form_pg_publication_namespace pubsch;
1623
1624
0
  rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1625
1626
0
  tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1627
1628
0
  if (!HeapTupleIsValid(tup))
1629
0
    elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1630
1631
0
  pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1632
1633
  /*
1634
   * Invalidate relcache so that publication info is rebuilt. See
1635
   * RemovePublicationRelById for why we need to consider all the
1636
   * partitions.
1637
   */
1638
0
  schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1639
0
                         PUBLICATION_PART_ALL);
1640
0
  InvalidatePublicationRels(schemaRels);
1641
1642
0
  CatalogTupleDelete(rel, &tup->t_self);
1643
1644
0
  ReleaseSysCache(tup);
1645
1646
0
  table_close(rel, RowExclusiveLock);
1647
0
}
1648
1649
/*
1650
 * Open relations specified by a PublicationTable list.
1651
 * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1652
 * add them to a publication.
1653
 */
1654
static List *
1655
OpenTableList(List *tables)
1656
0
{
1657
0
  List     *relids = NIL;
1658
0
  List     *rels = NIL;
1659
0
  ListCell   *lc;
1660
0
  List     *relids_with_rf = NIL;
1661
0
  List     *relids_with_collist = NIL;
1662
1663
  /*
1664
   * Open, share-lock, and check all the explicitly-specified relations
1665
   */
1666
0
  foreach(lc, tables)
1667
0
  {
1668
0
    PublicationTable *t = lfirst_node(PublicationTable, lc);
1669
0
    bool    recurse = t->relation->inh;
1670
0
    Relation  rel;
1671
0
    Oid     myrelid;
1672
0
    PublicationRelInfo *pub_rel;
1673
1674
    /* Allow query cancel in case this takes a long time */
1675
0
    CHECK_FOR_INTERRUPTS();
1676
1677
0
    rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
1678
0
    myrelid = RelationGetRelid(rel);
1679
1680
    /*
1681
     * Filter out duplicates if user specifies "foo, foo".
1682
     *
1683
     * Note that this algorithm is known to not be very efficient (O(N^2))
1684
     * but given that it only works on list of tables given to us by user
1685
     * it's deemed acceptable.
1686
     */
1687
0
    if (list_member_oid(relids, myrelid))
1688
0
    {
1689
      /* Disallow duplicate tables if there are any with row filters. */
1690
0
      if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1691
0
        ereport(ERROR,
1692
0
            (errcode(ERRCODE_DUPLICATE_OBJECT),
1693
0
             errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1694
0
                RelationGetRelationName(rel))));
1695
1696
      /* Disallow duplicate tables if there are any with column lists. */
1697
0
      if (t->columns || list_member_oid(relids_with_collist, myrelid))
1698
0
        ereport(ERROR,
1699
0
            (errcode(ERRCODE_DUPLICATE_OBJECT),
1700
0
             errmsg("conflicting or redundant column lists for table \"%s\"",
1701
0
                RelationGetRelationName(rel))));
1702
1703
0
      table_close(rel, ShareUpdateExclusiveLock);
1704
0
      continue;
1705
0
    }
1706
1707
0
    pub_rel = palloc(sizeof(PublicationRelInfo));
1708
0
    pub_rel->relation = rel;
1709
0
    pub_rel->whereClause = t->whereClause;
1710
0
    pub_rel->columns = t->columns;
1711
0
    rels = lappend(rels, pub_rel);
1712
0
    relids = lappend_oid(relids, myrelid);
1713
1714
0
    if (t->whereClause)
1715
0
      relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1716
1717
0
    if (t->columns)
1718
0
      relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1719
1720
    /*
1721
     * Add children of this rel, if requested, so that they too are added
1722
     * to the publication.  A partitioned table can't have any inheritance
1723
     * children other than its partitions, which need not be explicitly
1724
     * added to the publication.
1725
     */
1726
0
    if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1727
0
    {
1728
0
      List     *children;
1729
0
      ListCell   *child;
1730
1731
0
      children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1732
0
                       NULL);
1733
1734
0
      foreach(child, children)
1735
0
      {
1736
0
        Oid     childrelid = lfirst_oid(child);
1737
1738
        /* Allow query cancel in case this takes a long time */
1739
0
        CHECK_FOR_INTERRUPTS();
1740
1741
        /*
1742
         * Skip duplicates if user specified both parent and child
1743
         * tables.
1744
         */
1745
0
        if (list_member_oid(relids, childrelid))
1746
0
        {
1747
          /*
1748
           * We don't allow to specify row filter for both parent
1749
           * and child table at the same time as it is not very
1750
           * clear which one should be given preference.
1751
           */
1752
0
          if (childrelid != myrelid &&
1753
0
            (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1754
0
            ereport(ERROR,
1755
0
                (errcode(ERRCODE_DUPLICATE_OBJECT),
1756
0
                 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1757
0
                    RelationGetRelationName(rel))));
1758
1759
          /*
1760
           * We don't allow to specify column list for both parent
1761
           * and child table at the same time as it is not very
1762
           * clear which one should be given preference.
1763
           */
1764
0
          if (childrelid != myrelid &&
1765
0
            (t->columns || list_member_oid(relids_with_collist, childrelid)))
1766
0
            ereport(ERROR,
1767
0
                (errcode(ERRCODE_DUPLICATE_OBJECT),
1768
0
                 errmsg("conflicting or redundant column lists for table \"%s\"",
1769
0
                    RelationGetRelationName(rel))));
1770
1771
0
          continue;
1772
0
        }
1773
1774
        /* find_all_inheritors already got lock */
1775
0
        rel = table_open(childrelid, NoLock);
1776
0
        pub_rel = palloc(sizeof(PublicationRelInfo));
1777
0
        pub_rel->relation = rel;
1778
        /* child inherits WHERE clause from parent */
1779
0
        pub_rel->whereClause = t->whereClause;
1780
1781
        /* child inherits column list from parent */
1782
0
        pub_rel->columns = t->columns;
1783
0
        rels = lappend(rels, pub_rel);
1784
0
        relids = lappend_oid(relids, childrelid);
1785
1786
0
        if (t->whereClause)
1787
0
          relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1788
1789
0
        if (t->columns)
1790
0
          relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1791
0
      }
1792
0
    }
1793
0
  }
1794
1795
0
  list_free(relids);
1796
0
  list_free(relids_with_rf);
1797
1798
0
  return rels;
1799
0
}
1800
1801
/*
1802
 * Close all relations in the list.
1803
 */
1804
static void
1805
CloseTableList(List *rels)
1806
0
{
1807
0
  ListCell   *lc;
1808
1809
0
  foreach(lc, rels)
1810
0
  {
1811
0
    PublicationRelInfo *pub_rel;
1812
1813
0
    pub_rel = (PublicationRelInfo *) lfirst(lc);
1814
0
    table_close(pub_rel->relation, NoLock);
1815
0
  }
1816
1817
0
  list_free_deep(rels);
1818
0
}
1819
1820
/*
1821
 * Lock the schemas specified in the schema list in AccessShareLock mode in
1822
 * order to prevent concurrent schema deletion.
1823
 */
1824
static void
1825
LockSchemaList(List *schemalist)
1826
0
{
1827
0
  ListCell   *lc;
1828
1829
0
  foreach(lc, schemalist)
1830
0
  {
1831
0
    Oid     schemaid = lfirst_oid(lc);
1832
1833
    /* Allow query cancel in case this takes a long time */
1834
0
    CHECK_FOR_INTERRUPTS();
1835
0
    LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1836
1837
    /*
1838
     * It is possible that by the time we acquire the lock on schema,
1839
     * concurrent DDL has removed it. We can test this by checking the
1840
     * existence of schema.
1841
     */
1842
0
    if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1843
0
      ereport(ERROR,
1844
0
          errcode(ERRCODE_UNDEFINED_SCHEMA),
1845
0
          errmsg("schema with OID %u does not exist", schemaid));
1846
0
  }
1847
0
}
1848
1849
/*
1850
 * Add listed tables to the publication.
1851
 */
1852
static void
1853
PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1854
           AlterPublicationStmt *stmt)
1855
0
{
1856
0
  ListCell   *lc;
1857
1858
0
  foreach(lc, rels)
1859
0
  {
1860
0
    PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1861
0
    Relation  rel = pub_rel->relation;
1862
0
    ObjectAddress obj;
1863
1864
    /* Must be owner of the table or superuser. */
1865
0
    if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1866
0
      aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
1867
0
               RelationGetRelationName(rel));
1868
1869
0
    obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1870
0
    if (stmt)
1871
0
    {
1872
0
      EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1873
0
                       (Node *) stmt);
1874
1875
0
      InvokeObjectPostCreateHook(PublicationRelRelationId,
1876
0
                     obj.objectId, 0);
1877
0
    }
1878
0
  }
1879
0
}
1880
1881
/*
1882
 * Remove listed tables from the publication.
1883
 */
1884
static void
1885
PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1886
0
{
1887
0
  ObjectAddress obj;
1888
0
  ListCell   *lc;
1889
0
  Oid     prid;
1890
1891
0
  foreach(lc, rels)
1892
0
  {
1893
0
    PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1894
0
    Relation  rel = pubrel->relation;
1895
0
    Oid     relid = RelationGetRelid(rel);
1896
1897
0
    if (pubrel->columns)
1898
0
      ereport(ERROR,
1899
0
          errcode(ERRCODE_SYNTAX_ERROR),
1900
0
          errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1901
1902
0
    prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1903
0
                 ObjectIdGetDatum(relid),
1904
0
                 ObjectIdGetDatum(pubid));
1905
0
    if (!OidIsValid(prid))
1906
0
    {
1907
0
      if (missing_ok)
1908
0
        continue;
1909
1910
0
      ereport(ERROR,
1911
0
          (errcode(ERRCODE_UNDEFINED_OBJECT),
1912
0
           errmsg("relation \"%s\" is not part of the publication",
1913
0
              RelationGetRelationName(rel))));
1914
0
    }
1915
1916
0
    if (pubrel->whereClause)
1917
0
      ereport(ERROR,
1918
0
          (errcode(ERRCODE_SYNTAX_ERROR),
1919
0
           errmsg("cannot use a WHERE clause when removing a table from a publication")));
1920
1921
0
    ObjectAddressSet(obj, PublicationRelRelationId, prid);
1922
0
    performDeletion(&obj, DROP_CASCADE, 0);
1923
0
  }
1924
0
}
1925
1926
/*
1927
 * Add listed schemas to the publication.
1928
 */
1929
static void
1930
PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1931
            AlterPublicationStmt *stmt)
1932
0
{
1933
0
  ListCell   *lc;
1934
1935
0
  foreach(lc, schemas)
1936
0
  {
1937
0
    Oid     schemaid = lfirst_oid(lc);
1938
0
    ObjectAddress obj;
1939
1940
0
    obj = publication_add_schema(pubid, schemaid, if_not_exists);
1941
0
    if (stmt)
1942
0
    {
1943
0
      EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1944
0
                       (Node *) stmt);
1945
1946
0
      InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
1947
0
                     obj.objectId, 0);
1948
0
    }
1949
0
  }
1950
0
}
1951
1952
/*
1953
 * Remove listed schemas from the publication.
1954
 */
1955
static void
1956
PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
1957
0
{
1958
0
  ObjectAddress obj;
1959
0
  ListCell   *lc;
1960
0
  Oid     psid;
1961
1962
0
  foreach(lc, schemas)
1963
0
  {
1964
0
    Oid     schemaid = lfirst_oid(lc);
1965
1966
0
    psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
1967
0
                 Anum_pg_publication_namespace_oid,
1968
0
                 ObjectIdGetDatum(schemaid),
1969
0
                 ObjectIdGetDatum(pubid));
1970
0
    if (!OidIsValid(psid))
1971
0
    {
1972
0
      if (missing_ok)
1973
0
        continue;
1974
1975
0
      ereport(ERROR,
1976
0
          (errcode(ERRCODE_UNDEFINED_OBJECT),
1977
0
           errmsg("tables from schema \"%s\" are not part of the publication",
1978
0
              get_namespace_name(schemaid))));
1979
0
    }
1980
1981
0
    ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
1982
0
    performDeletion(&obj, DROP_CASCADE, 0);
1983
0
  }
1984
0
}
1985
1986
/*
1987
 * Internal workhorse for changing a publication owner
1988
 */
1989
static void
1990
AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1991
0
{
1992
0
  Form_pg_publication form;
1993
1994
0
  form = (Form_pg_publication) GETSTRUCT(tup);
1995
1996
0
  if (form->pubowner == newOwnerId)
1997
0
    return;
1998
1999
0
  if (!superuser())
2000
0
  {
2001
0
    AclResult aclresult;
2002
2003
    /* Must be owner */
2004
0
    if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
2005
0
      aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
2006
0
               NameStr(form->pubname));
2007
2008
    /* Must be able to become new owner */
2009
0
    check_can_set_role(GetUserId(), newOwnerId);
2010
2011
    /* New owner must have CREATE privilege on database */
2012
0
    aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
2013
0
    if (aclresult != ACLCHECK_OK)
2014
0
      aclcheck_error(aclresult, OBJECT_DATABASE,
2015
0
               get_database_name(MyDatabaseId));
2016
2017
0
    if (form->puballtables && !superuser_arg(newOwnerId))
2018
0
      ereport(ERROR,
2019
0
          (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2020
0
           errmsg("permission denied to change owner of publication \"%s\"",
2021
0
              NameStr(form->pubname)),
2022
0
           errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
2023
2024
0
    if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
2025
0
      ereport(ERROR,
2026
0
          (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2027
0
           errmsg("permission denied to change owner of publication \"%s\"",
2028
0
              NameStr(form->pubname)),
2029
0
           errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
2030
0
  }
2031
2032
0
  form->pubowner = newOwnerId;
2033
0
  CatalogTupleUpdate(rel, &tup->t_self, tup);
2034
2035
  /* Update owner dependency reference */
2036
0
  changeDependencyOnOwner(PublicationRelationId,
2037
0
              form->oid,
2038
0
              newOwnerId);
2039
2040
0
  InvokeObjectPostAlterHook(PublicationRelationId,
2041
0
                form->oid, 0);
2042
0
}
2043
2044
/*
2045
 * Change publication owner -- by name
2046
 */
2047
ObjectAddress
2048
AlterPublicationOwner(const char *name, Oid newOwnerId)
2049
0
{
2050
0
  Oid     pubid;
2051
0
  HeapTuple tup;
2052
0
  Relation  rel;
2053
0
  ObjectAddress address;
2054
0
  Form_pg_publication pubform;
2055
2056
0
  rel = table_open(PublicationRelationId, RowExclusiveLock);
2057
2058
0
  tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2059
2060
0
  if (!HeapTupleIsValid(tup))
2061
0
    ereport(ERROR,
2062
0
        (errcode(ERRCODE_UNDEFINED_OBJECT),
2063
0
         errmsg("publication \"%s\" does not exist", name)));
2064
2065
0
  pubform = (Form_pg_publication) GETSTRUCT(tup);
2066
0
  pubid = pubform->oid;
2067
2068
0
  AlterPublicationOwner_internal(rel, tup, newOwnerId);
2069
2070
0
  ObjectAddressSet(address, PublicationRelationId, pubid);
2071
2072
0
  heap_freetuple(tup);
2073
2074
0
  table_close(rel, RowExclusiveLock);
2075
2076
0
  return address;
2077
0
}
2078
2079
/*
2080
 * Change publication owner -- by OID
2081
 */
2082
void
2083
AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
2084
0
{
2085
0
  HeapTuple tup;
2086
0
  Relation  rel;
2087
2088
0
  rel = table_open(PublicationRelationId, RowExclusiveLock);
2089
2090
0
  tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
2091
2092
0
  if (!HeapTupleIsValid(tup))
2093
0
    ereport(ERROR,
2094
0
        (errcode(ERRCODE_UNDEFINED_OBJECT),
2095
0
         errmsg("publication with OID %u does not exist", pubid)));
2096
2097
0
  AlterPublicationOwner_internal(rel, tup, newOwnerId);
2098
2099
0
  heap_freetuple(tup);
2100
2101
0
  table_close(rel, RowExclusiveLock);
2102
0
}
2103
2104
/*
2105
 * Extract the publish_generated_columns option value from a DefElem. "stored"
2106
 * and "none" values are accepted.
2107
 */
2108
static char
2109
defGetGeneratedColsOption(DefElem *def)
2110
0
{
2111
0
  char     *sval = "";
2112
2113
  /*
2114
   * A parameter value is required.
2115
   */
2116
0
  if (def->arg)
2117
0
  {
2118
0
    sval = defGetString(def);
2119
2120
0
    if (pg_strcasecmp(sval, "none") == 0)
2121
0
      return PUBLISH_GENCOLS_NONE;
2122
0
    if (pg_strcasecmp(sval, "stored") == 0)
2123
0
      return PUBLISH_GENCOLS_STORED;
2124
0
  }
2125
2126
0
  ereport(ERROR,
2127
0
      errcode(ERRCODE_SYNTAX_ERROR),
2128
0
      errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
2129
0
      errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
2130
2131
0
  return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */
2132
0
}