/src/postgres/src/backend/commands/publicationcmds.c
Line | Count | Source |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * publicationcmds.c |
4 | | * publication manipulation |
5 | | * |
6 | | * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group |
7 | | * Portions Copyright (c) 1994, Regents of the University of California |
8 | | * |
9 | | * IDENTIFICATION |
10 | | * src/backend/commands/publicationcmds.c |
11 | | * |
12 | | *------------------------------------------------------------------------- |
13 | | */ |
14 | | |
15 | | #include "postgres.h" |
16 | | |
17 | | #include "access/htup_details.h" |
18 | | #include "access/table.h" |
19 | | #include "access/xact.h" |
20 | | #include "catalog/catalog.h" |
21 | | #include "catalog/indexing.h" |
22 | | #include "catalog/namespace.h" |
23 | | #include "catalog/objectaccess.h" |
24 | | #include "catalog/objectaddress.h" |
25 | | #include "catalog/pg_database.h" |
26 | | #include "catalog/pg_inherits.h" |
27 | | #include "catalog/pg_namespace.h" |
28 | | #include "catalog/pg_proc.h" |
29 | | #include "catalog/pg_publication.h" |
30 | | #include "catalog/pg_publication_namespace.h" |
31 | | #include "catalog/pg_publication_rel.h" |
32 | | #include "commands/defrem.h" |
33 | | #include "commands/event_trigger.h" |
34 | | #include "commands/publicationcmds.h" |
35 | | #include "miscadmin.h" |
36 | | #include "nodes/nodeFuncs.h" |
37 | | #include "parser/parse_clause.h" |
38 | | #include "parser/parse_collate.h" |
39 | | #include "parser/parse_relation.h" |
40 | | #include "rewrite/rewriteHandler.h" |
41 | | #include "storage/lmgr.h" |
42 | | #include "utils/acl.h" |
43 | | #include "utils/builtins.h" |
44 | | #include "utils/inval.h" |
45 | | #include "utils/lsyscache.h" |
46 | | #include "utils/rel.h" |
47 | | #include "utils/syscache.h" |
48 | | #include "utils/varlena.h" |
49 | | |
50 | | |
51 | | /* |
52 | | * Information used to validate the columns in the row filter expression. See |
53 | | * contain_invalid_rfcolumn_walker for details. |
54 | | */ |
55 | | typedef struct rf_context |
56 | | { |
57 | | Bitmapset *bms_replident; /* bitset of replica identity columns */ |
58 | | bool pubviaroot; /* true if we are validating the parent |
59 | | * relation's row filter */ |
60 | | Oid relid; /* relid of the relation */ |
61 | | Oid parentid; /* relid of the parent relation */ |
62 | | } rf_context; |
63 | | |
64 | | static List *OpenTableList(List *tables); |
65 | | static void CloseTableList(List *rels); |
66 | | static void LockSchemaList(List *schemalist); |
67 | | static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, |
68 | | AlterPublicationStmt *stmt); |
69 | | static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); |
70 | | static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, |
71 | | AlterPublicationStmt *stmt); |
72 | | static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok); |
73 | | static char defGetGeneratedColsOption(DefElem *def); |
74 | | |
75 | | |
76 | | static void |
77 | | parse_publication_options(ParseState *pstate, |
78 | | List *options, |
79 | | bool *publish_given, |
80 | | PublicationActions *pubactions, |
81 | | bool *publish_via_partition_root_given, |
82 | | bool *publish_via_partition_root, |
83 | | bool *publish_generated_columns_given, |
84 | | char *publish_generated_columns) |
85 | 0 | { |
86 | 0 | ListCell *lc; |
87 | |
|
88 | 0 | *publish_given = false; |
89 | 0 | *publish_via_partition_root_given = false; |
90 | 0 | *publish_generated_columns_given = false; |
91 | | |
92 | | /* defaults */ |
93 | 0 | pubactions->pubinsert = true; |
94 | 0 | pubactions->pubupdate = true; |
95 | 0 | pubactions->pubdelete = true; |
96 | 0 | pubactions->pubtruncate = true; |
97 | 0 | *publish_via_partition_root = false; |
98 | 0 | *publish_generated_columns = PUBLISH_GENCOLS_NONE; |
99 | | |
100 | | /* Parse options */ |
101 | 0 | foreach(lc, options) |
102 | 0 | { |
103 | 0 | DefElem *defel = (DefElem *) lfirst(lc); |
104 | |
|
105 | 0 | if (strcmp(defel->defname, "publish") == 0) |
106 | 0 | { |
107 | 0 | char *publish; |
108 | 0 | List *publish_list; |
109 | 0 | ListCell *lc2; |
110 | |
|
111 | 0 | if (*publish_given) |
112 | 0 | errorConflictingDefElem(defel, pstate); |
113 | | |
114 | | /* |
115 | | * If publish option was given only the explicitly listed actions |
116 | | * should be published. |
117 | | */ |
118 | 0 | pubactions->pubinsert = false; |
119 | 0 | pubactions->pubupdate = false; |
120 | 0 | pubactions->pubdelete = false; |
121 | 0 | pubactions->pubtruncate = false; |
122 | |
|
123 | 0 | *publish_given = true; |
124 | 0 | publish = defGetString(defel); |
125 | |
|
126 | 0 | if (!SplitIdentifierString(publish, ',', &publish_list)) |
127 | 0 | ereport(ERROR, |
128 | 0 | (errcode(ERRCODE_SYNTAX_ERROR), |
129 | 0 | errmsg("invalid list syntax in parameter \"%s\"", |
130 | 0 | "publish"))); |
131 | | |
132 | | /* Process the option list. */ |
133 | 0 | foreach(lc2, publish_list) |
134 | 0 | { |
135 | 0 | char *publish_opt = (char *) lfirst(lc2); |
136 | |
|
137 | 0 | if (strcmp(publish_opt, "insert") == 0) |
138 | 0 | pubactions->pubinsert = true; |
139 | 0 | else if (strcmp(publish_opt, "update") == 0) |
140 | 0 | pubactions->pubupdate = true; |
141 | 0 | else if (strcmp(publish_opt, "delete") == 0) |
142 | 0 | pubactions->pubdelete = true; |
143 | 0 | else if (strcmp(publish_opt, "truncate") == 0) |
144 | 0 | pubactions->pubtruncate = true; |
145 | 0 | else |
146 | 0 | ereport(ERROR, |
147 | 0 | (errcode(ERRCODE_SYNTAX_ERROR), |
148 | 0 | errmsg("unrecognized value for publication option \"%s\": \"%s\"", |
149 | 0 | "publish", publish_opt))); |
150 | 0 | } |
151 | 0 | } |
152 | 0 | else if (strcmp(defel->defname, "publish_via_partition_root") == 0) |
153 | 0 | { |
154 | 0 | if (*publish_via_partition_root_given) |
155 | 0 | errorConflictingDefElem(defel, pstate); |
156 | 0 | *publish_via_partition_root_given = true; |
157 | 0 | *publish_via_partition_root = defGetBoolean(defel); |
158 | 0 | } |
159 | 0 | else if (strcmp(defel->defname, "publish_generated_columns") == 0) |
160 | 0 | { |
161 | 0 | if (*publish_generated_columns_given) |
162 | 0 | errorConflictingDefElem(defel, pstate); |
163 | 0 | *publish_generated_columns_given = true; |
164 | 0 | *publish_generated_columns = defGetGeneratedColsOption(defel); |
165 | 0 | } |
166 | 0 | else |
167 | 0 | ereport(ERROR, |
168 | 0 | (errcode(ERRCODE_SYNTAX_ERROR), |
169 | 0 | errmsg("unrecognized publication parameter: \"%s\"", defel->defname))); |
170 | 0 | } |
171 | 0 | } |
172 | | |
173 | | /* |
174 | | * Convert the PublicationObjSpecType list into schema oid list and |
175 | | * PublicationTable list. |
176 | | */ |
177 | | static void |
178 | | ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, |
179 | | List **rels, List **schemas) |
180 | 0 | { |
181 | 0 | ListCell *cell; |
182 | 0 | PublicationObjSpec *pubobj; |
183 | |
|
184 | 0 | if (!pubobjspec_list) |
185 | 0 | return; |
186 | | |
187 | 0 | foreach(cell, pubobjspec_list) |
188 | 0 | { |
189 | 0 | Oid schemaid; |
190 | 0 | List *search_path; |
191 | |
|
192 | 0 | pubobj = (PublicationObjSpec *) lfirst(cell); |
193 | |
|
194 | 0 | switch (pubobj->pubobjtype) |
195 | 0 | { |
196 | 0 | case PUBLICATIONOBJ_TABLE: |
197 | 0 | *rels = lappend(*rels, pubobj->pubtable); |
198 | 0 | break; |
199 | 0 | case PUBLICATIONOBJ_TABLES_IN_SCHEMA: |
200 | 0 | schemaid = get_namespace_oid(pubobj->name, false); |
201 | | |
202 | | /* Filter out duplicates if user specifies "sch1, sch1" */ |
203 | 0 | *schemas = list_append_unique_oid(*schemas, schemaid); |
204 | 0 | break; |
205 | 0 | case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA: |
206 | 0 | search_path = fetch_search_path(false); |
207 | 0 | if (search_path == NIL) /* nothing valid in search_path? */ |
208 | 0 | ereport(ERROR, |
209 | 0 | errcode(ERRCODE_UNDEFINED_SCHEMA), |
210 | 0 | errmsg("no schema has been selected for CURRENT_SCHEMA")); |
211 | | |
212 | 0 | schemaid = linitial_oid(search_path); |
213 | 0 | list_free(search_path); |
214 | | |
215 | | /* Filter out duplicates if user specifies "sch1, sch1" */ |
216 | 0 | *schemas = list_append_unique_oid(*schemas, schemaid); |
217 | 0 | break; |
218 | 0 | default: |
219 | | /* shouldn't happen */ |
220 | 0 | elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype); |
221 | 0 | break; |
222 | 0 | } |
223 | 0 | } |
224 | 0 | } |
225 | | |
226 | | /* |
227 | | * Returns true if any of the columns used in the row filter WHERE expression is |
228 | | * not part of REPLICA IDENTITY, false otherwise. |
229 | | */ |
230 | | static bool |
231 | | contain_invalid_rfcolumn_walker(Node *node, rf_context *context) |
232 | 0 | { |
233 | 0 | if (node == NULL) |
234 | 0 | return false; |
235 | | |
236 | 0 | if (IsA(node, Var)) |
237 | 0 | { |
238 | 0 | Var *var = (Var *) node; |
239 | 0 | AttrNumber attnum = var->varattno; |
240 | | |
241 | | /* |
242 | | * If pubviaroot is true, we are validating the row filter of the |
243 | | * parent table, but the bitmap contains the replica identity |
244 | | * information of the child table. So, get the column number of the |
245 | | * child table as parent and child column order could be different. |
246 | | */ |
247 | 0 | if (context->pubviaroot) |
248 | 0 | { |
249 | 0 | char *colname = get_attname(context->parentid, attnum, false); |
250 | |
|
251 | 0 | attnum = get_attnum(context->relid, colname); |
252 | 0 | } |
253 | |
|
254 | 0 | if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, |
255 | 0 | context->bms_replident)) |
256 | 0 | return true; |
257 | 0 | } |
258 | | |
259 | 0 | return expression_tree_walker(node, contain_invalid_rfcolumn_walker, |
260 | 0 | context); |
261 | 0 | } |
262 | | |
263 | | /* |
264 | | * Check if all columns referenced in the filter expression are part of the |
265 | | * REPLICA IDENTITY index or not. |
266 | | * |
267 | | * Returns true if any invalid column is found. |
268 | | */ |
269 | | bool |
270 | | pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, |
271 | | bool pubviaroot) |
272 | 0 | { |
273 | 0 | HeapTuple rftuple; |
274 | 0 | Oid relid = RelationGetRelid(relation); |
275 | 0 | Oid publish_as_relid = RelationGetRelid(relation); |
276 | 0 | bool result = false; |
277 | 0 | Datum rfdatum; |
278 | 0 | bool rfisnull; |
279 | | |
280 | | /* |
281 | | * FULL means all columns are in the REPLICA IDENTITY, so all columns are |
282 | | * allowed in the row filter and we can skip the validation. |
283 | | */ |
284 | 0 | if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) |
285 | 0 | return false; |
286 | | |
287 | | /* |
288 | | * For a partition, if pubviaroot is true, find the topmost ancestor that |
289 | | * is published via this publication as we need to use its row filter |
290 | | * expression to filter the partition's changes. |
291 | | * |
292 | | * Note that even though the row filter used is for an ancestor, the |
293 | | * REPLICA IDENTITY used will be for the actual child table. |
294 | | */ |
295 | 0 | if (pubviaroot && relation->rd_rel->relispartition) |
296 | 0 | { |
297 | 0 | publish_as_relid |
298 | 0 | = GetTopMostAncestorInPublication(pubid, ancestors, NULL); |
299 | |
|
300 | 0 | if (!OidIsValid(publish_as_relid)) |
301 | 0 | publish_as_relid = relid; |
302 | 0 | } |
303 | |
|
304 | 0 | rftuple = SearchSysCache2(PUBLICATIONRELMAP, |
305 | 0 | ObjectIdGetDatum(publish_as_relid), |
306 | 0 | ObjectIdGetDatum(pubid)); |
307 | |
|
308 | 0 | if (!HeapTupleIsValid(rftuple)) |
309 | 0 | return false; |
310 | | |
311 | 0 | rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, |
312 | 0 | Anum_pg_publication_rel_prqual, |
313 | 0 | &rfisnull); |
314 | |
|
315 | 0 | if (!rfisnull) |
316 | 0 | { |
317 | 0 | rf_context context = {0}; |
318 | 0 | Node *rfnode; |
319 | 0 | Bitmapset *bms = NULL; |
320 | |
|
321 | 0 | context.pubviaroot = pubviaroot; |
322 | 0 | context.parentid = publish_as_relid; |
323 | 0 | context.relid = relid; |
324 | | |
325 | | /* Remember columns that are part of the REPLICA IDENTITY */ |
326 | 0 | bms = RelationGetIndexAttrBitmap(relation, |
327 | 0 | INDEX_ATTR_BITMAP_IDENTITY_KEY); |
328 | |
|
329 | 0 | context.bms_replident = bms; |
330 | 0 | rfnode = stringToNode(TextDatumGetCString(rfdatum)); |
331 | 0 | result = contain_invalid_rfcolumn_walker(rfnode, &context); |
332 | 0 | } |
333 | |
|
334 | 0 | ReleaseSysCache(rftuple); |
335 | |
|
336 | 0 | return result; |
337 | 0 | } |
338 | | |
339 | | /* |
340 | | * Check for invalid columns in the publication table definition. |
341 | | * |
342 | | * This function evaluates two conditions: |
343 | | * |
344 | | * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered |
345 | | * by the column list. If any column is missing, *invalid_column_list is set |
346 | | * to true. |
347 | | * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY |
348 | | * are published, either by being explicitly named in the column list or, if |
349 | | * no column list is specified, by setting the option |
350 | | * publish_generated_columns to stored. If any unpublished |
351 | | * generated column is found, *invalid_gen_col is set to true. |
352 | | * |
353 | | * Returns true if any of the above conditions are not met. |
354 | | */ |
355 | | bool |
356 | | pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, |
357 | | bool pubviaroot, char pubgencols_type, |
358 | | bool *invalid_column_list, |
359 | | bool *invalid_gen_col) |
360 | 0 | { |
361 | 0 | Oid relid = RelationGetRelid(relation); |
362 | 0 | Oid publish_as_relid = RelationGetRelid(relation); |
363 | 0 | Bitmapset *idattrs; |
364 | 0 | Bitmapset *columns = NULL; |
365 | 0 | TupleDesc desc = RelationGetDescr(relation); |
366 | 0 | Publication *pub; |
367 | 0 | int x; |
368 | |
|
369 | 0 | *invalid_column_list = false; |
370 | 0 | *invalid_gen_col = false; |
371 | | |
372 | | /* |
373 | | * For a partition, if pubviaroot is true, find the topmost ancestor that |
374 | | * is published via this publication as we need to use its column list for |
375 | | * the changes. |
376 | | * |
377 | | * Note that even though the column list used is for an ancestor, the |
378 | | * REPLICA IDENTITY used will be for the actual child table. |
379 | | */ |
380 | 0 | if (pubviaroot && relation->rd_rel->relispartition) |
381 | 0 | { |
382 | 0 | publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL); |
383 | |
|
384 | 0 | if (!OidIsValid(publish_as_relid)) |
385 | 0 | publish_as_relid = relid; |
386 | 0 | } |
387 | | |
388 | | /* Fetch the column list */ |
389 | 0 | pub = GetPublication(pubid); |
390 | 0 | check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns); |
391 | |
|
392 | 0 | if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) |
393 | 0 | { |
394 | | /* With REPLICA IDENTITY FULL, no column list is allowed. */ |
395 | 0 | *invalid_column_list = (columns != NULL); |
396 | | |
397 | | /* |
398 | | * As we don't allow a column list with REPLICA IDENTITY FULL, the |
399 | | * publish_generated_columns option must be set to stored if the table |
400 | | * has any stored generated columns. |
401 | | */ |
402 | 0 | if (pubgencols_type != PUBLISH_GENCOLS_STORED && |
403 | 0 | relation->rd_att->constr && |
404 | 0 | relation->rd_att->constr->has_generated_stored) |
405 | 0 | *invalid_gen_col = true; |
406 | | |
407 | | /* |
408 | | * Virtual generated columns are currently not supported for logical |
409 | | * replication at all. |
410 | | */ |
411 | 0 | if (relation->rd_att->constr && |
412 | 0 | relation->rd_att->constr->has_generated_virtual) |
413 | 0 | *invalid_gen_col = true; |
414 | |
|
415 | 0 | if (*invalid_gen_col && *invalid_column_list) |
416 | 0 | return true; |
417 | 0 | } |
418 | | |
419 | | /* Remember columns that are part of the REPLICA IDENTITY */ |
420 | 0 | idattrs = RelationGetIndexAttrBitmap(relation, |
421 | 0 | INDEX_ATTR_BITMAP_IDENTITY_KEY); |
422 | | |
423 | | /* |
424 | | * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset |
425 | | * (to handle system columns the usual way), while column list does not |
426 | | * use offset, so we can't do bms_is_subset(). Instead, we have to loop |
427 | | * over the idattrs and check all of them are in the list. |
428 | | */ |
429 | 0 | x = -1; |
430 | 0 | while ((x = bms_next_member(idattrs, x)) >= 0) |
431 | 0 | { |
432 | 0 | AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber); |
433 | 0 | Form_pg_attribute att = TupleDescAttr(desc, attnum - 1); |
434 | |
|
435 | 0 | if (columns == NULL) |
436 | 0 | { |
437 | | /* |
438 | | * The publish_generated_columns option must be set to stored if |
439 | | * the REPLICA IDENTITY contains any stored generated column. |
440 | | */ |
441 | 0 | if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED) |
442 | 0 | { |
443 | 0 | *invalid_gen_col = true; |
444 | 0 | break; |
445 | 0 | } |
446 | | |
447 | | /* |
448 | | * The equivalent setting for virtual generated columns does not |
449 | | * exist yet. |
450 | | */ |
451 | 0 | if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL) |
452 | 0 | { |
453 | 0 | *invalid_gen_col = true; |
454 | 0 | break; |
455 | 0 | } |
456 | | |
457 | | /* Skip validating the column list since it is not defined */ |
458 | 0 | continue; |
459 | 0 | } |
460 | | |
461 | | /* |
462 | | * If pubviaroot is true, we are validating the column list of the |
463 | | * parent table, but the bitmap contains the replica identity |
464 | | * information of the child table. The parent/child attnums may not |
465 | | * match, so translate them to the parent - get the attname from the |
466 | | * child, and look it up in the parent. |
467 | | */ |
468 | 0 | if (pubviaroot) |
469 | 0 | { |
470 | | /* attribute name in the child table */ |
471 | 0 | char *colname = get_attname(relid, attnum, false); |
472 | | |
473 | | /* |
474 | | * Determine the attnum for the attribute name in parent (we are |
475 | | * using the column list defined on the parent). |
476 | | */ |
477 | 0 | attnum = get_attnum(publish_as_relid, colname); |
478 | 0 | } |
479 | | |
480 | | /* replica identity column, not covered by the column list */ |
481 | 0 | *invalid_column_list |= !bms_is_member(attnum, columns); |
482 | |
|
483 | 0 | if (*invalid_column_list && *invalid_gen_col) |
484 | 0 | break; |
485 | 0 | } |
486 | |
|
487 | 0 | bms_free(columns); |
488 | 0 | bms_free(idattrs); |
489 | |
|
490 | 0 | return *invalid_column_list || *invalid_gen_col; |
491 | 0 | } |
492 | | |
493 | | /* |
494 | | * Invalidate entries in the RelationSyncCache for relations included in the |
495 | | * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA. |
496 | | * |
497 | | * If 'puballtables' is true, invalidate all cache entries. |
498 | | */ |
499 | | void |
500 | | InvalidatePubRelSyncCache(Oid pubid, bool puballtables) |
501 | 0 | { |
502 | 0 | if (puballtables) |
503 | 0 | { |
504 | 0 | CacheInvalidateRelSyncAll(); |
505 | 0 | } |
506 | 0 | else |
507 | 0 | { |
508 | 0 | List *relids = NIL; |
509 | 0 | List *schemarelids = NIL; |
510 | | |
511 | | /* |
512 | | * For partitioned tables, we must invalidate all partitions and |
513 | | * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as |
514 | | * a target. However, WAL records for TRUNCATE specify both a root and |
515 | | * its leaves. |
516 | | */ |
517 | 0 | relids = GetPublicationRelations(pubid, |
518 | 0 | PUBLICATION_PART_ALL); |
519 | 0 | schemarelids = GetAllSchemaPublicationRelations(pubid, |
520 | 0 | PUBLICATION_PART_ALL); |
521 | |
|
522 | 0 | relids = list_concat_unique_oid(relids, schemarelids); |
523 | | |
524 | | /* Invalidate the relsyncache */ |
525 | 0 | foreach_oid(relid, relids) |
526 | 0 | CacheInvalidateRelSync(relid); |
527 | 0 | } |
528 | |
|
529 | 0 | return; |
530 | 0 | } |
531 | | |
532 | | /* check_functions_in_node callback */ |
533 | | static bool |
534 | | contain_mutable_or_user_functions_checker(Oid func_id, void *context) |
535 | 0 | { |
536 | 0 | return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE || |
537 | 0 | func_id >= FirstNormalObjectId); |
538 | 0 | } |
539 | | |
540 | | /* |
541 | | * The row filter walker checks if the row filter expression is a "simple |
542 | | * expression". |
543 | | * |
544 | | * It allows only simple or compound expressions such as: |
545 | | * - (Var Op Const) |
546 | | * - (Var Op Var) |
547 | | * - (Var Op Const) AND/OR (Var Op Const) |
548 | | * - etc |
549 | | * (where Var is a column of the table this filter belongs to) |
550 | | * |
551 | | * The simple expression has the following restrictions: |
552 | | * - User-defined operators are not allowed; |
553 | | * - User-defined functions are not allowed; |
554 | | * - User-defined types are not allowed; |
555 | | * - User-defined collations are not allowed; |
556 | | * - Non-immutable built-in functions are not allowed; |
557 | | * - System columns are not allowed. |
558 | | * |
559 | | * NOTES |
560 | | * |
561 | | * We don't allow user-defined functions/operators/types/collations because |
562 | | * (a) if a user drops a user-defined object used in a row filter expression or |
563 | | * if there is any other error while using it, the logical decoding |
564 | | * infrastructure won't be able to recover from such an error even if the |
565 | | * object is recreated again because a historic snapshot is used to evaluate |
566 | | * the row filter; |
567 | | * (b) a user-defined function can be used to access tables that could have |
568 | | * unpleasant results because a historic snapshot is used. That's why only |
569 | | * immutable built-in functions are allowed in row filter expressions. |
570 | | * |
571 | | * We don't allow system columns because currently, we don't have that |
572 | | * information in the tuple passed to downstream. Also, as we don't replicate |
573 | | * those to subscribers, there doesn't seem to be a need for a filter on those |
574 | | * columns. |
575 | | * |
576 | | * We can allow other node types after more analysis and testing. |
577 | | */ |
578 | | static bool |
579 | | check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate) |
580 | 0 | { |
581 | 0 | char *errdetail_msg = NULL; |
582 | |
|
583 | 0 | if (node == NULL) |
584 | 0 | return false; |
585 | | |
586 | 0 | switch (nodeTag(node)) |
587 | 0 | { |
588 | 0 | case T_Var: |
589 | | /* System columns are not allowed. */ |
590 | 0 | if (((Var *) node)->varattno < InvalidAttrNumber) |
591 | 0 | errdetail_msg = _("System columns are not allowed."); |
592 | 0 | break; |
593 | 0 | case T_OpExpr: |
594 | 0 | case T_DistinctExpr: |
595 | 0 | case T_NullIfExpr: |
596 | | /* OK, except user-defined operators are not allowed. */ |
597 | 0 | if (((OpExpr *) node)->opno >= FirstNormalObjectId) |
598 | 0 | errdetail_msg = _("User-defined operators are not allowed."); |
599 | 0 | break; |
600 | 0 | case T_ScalarArrayOpExpr: |
601 | | /* OK, except user-defined operators are not allowed. */ |
602 | 0 | if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId) |
603 | 0 | errdetail_msg = _("User-defined operators are not allowed."); |
604 | | |
605 | | /* |
606 | | * We don't need to check the hashfuncid and negfuncid of |
607 | | * ScalarArrayOpExpr as those functions are only built for a |
608 | | * subquery. |
609 | | */ |
610 | 0 | break; |
611 | 0 | case T_RowCompareExpr: |
612 | 0 | { |
613 | 0 | ListCell *opid; |
614 | | |
615 | | /* OK, except user-defined operators are not allowed. */ |
616 | 0 | foreach(opid, ((RowCompareExpr *) node)->opnos) |
617 | 0 | { |
618 | 0 | if (lfirst_oid(opid) >= FirstNormalObjectId) |
619 | 0 | { |
620 | 0 | errdetail_msg = _("User-defined operators are not allowed."); |
621 | 0 | break; |
622 | 0 | } |
623 | 0 | } |
624 | 0 | } |
625 | 0 | break; |
626 | 0 | case T_Const: |
627 | 0 | case T_FuncExpr: |
628 | 0 | case T_BoolExpr: |
629 | 0 | case T_RelabelType: |
630 | 0 | case T_CollateExpr: |
631 | 0 | case T_CaseExpr: |
632 | 0 | case T_CaseTestExpr: |
633 | 0 | case T_ArrayExpr: |
634 | 0 | case T_RowExpr: |
635 | 0 | case T_CoalesceExpr: |
636 | 0 | case T_MinMaxExpr: |
637 | 0 | case T_XmlExpr: |
638 | 0 | case T_NullTest: |
639 | 0 | case T_BooleanTest: |
640 | 0 | case T_List: |
641 | | /* OK, supported */ |
642 | 0 | break; |
643 | 0 | default: |
644 | 0 | errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed."); |
645 | 0 | break; |
646 | 0 | } |
647 | | |
648 | | /* |
649 | | * For all the supported nodes, if we haven't already found a problem, |
650 | | * check the types, functions, and collations used in it. We check List |
651 | | * by walking through each element. |
652 | | */ |
653 | 0 | if (!errdetail_msg && !IsA(node, List)) |
654 | 0 | { |
655 | 0 | if (exprType(node) >= FirstNormalObjectId) |
656 | 0 | errdetail_msg = _("User-defined types are not allowed."); |
657 | 0 | else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker, |
658 | 0 | pstate)) |
659 | 0 | errdetail_msg = _("User-defined or built-in mutable functions are not allowed."); |
660 | 0 | else if (exprCollation(node) >= FirstNormalObjectId || |
661 | 0 | exprInputCollation(node) >= FirstNormalObjectId) |
662 | 0 | errdetail_msg = _("User-defined collations are not allowed."); |
663 | 0 | } |
664 | | |
665 | | /* |
666 | | * If we found a problem in this node, throw error now. Otherwise keep |
667 | | * going. |
668 | | */ |
669 | 0 | if (errdetail_msg) |
670 | 0 | ereport(ERROR, |
671 | 0 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
672 | 0 | errmsg("invalid publication WHERE expression"), |
673 | 0 | errdetail_internal("%s", errdetail_msg), |
674 | 0 | parser_errposition(pstate, exprLocation(node)))); |
675 | | |
676 | 0 | return expression_tree_walker(node, check_simple_rowfilter_expr_walker, |
677 | 0 | pstate); |
678 | 0 | } |
679 | | |
680 | | /* |
681 | | * Check if the row filter expression is a "simple expression". |
682 | | * |
683 | | * See check_simple_rowfilter_expr_walker for details. |
684 | | */ |
685 | | static bool |
686 | | check_simple_rowfilter_expr(Node *node, ParseState *pstate) |
687 | 0 | { |
688 | 0 | return check_simple_rowfilter_expr_walker(node, pstate); |
689 | 0 | } |
690 | | |
691 | | /* |
692 | | * Transform the publication WHERE expression for all the relations in the list, |
693 | | * ensuring it is coerced to boolean and necessary collation information is |
694 | | * added if required, and add a new nsitem/RTE for the associated relation to |
695 | | * the ParseState's namespace list. |
696 | | * |
697 | | * Also check the publication row filter expression and throw an error if |
698 | | * anything not permitted or unexpected is encountered. |
699 | | */ |
700 | | static void |
701 | | TransformPubWhereClauses(List *tables, const char *queryString, |
702 | | bool pubviaroot) |
703 | 0 | { |
704 | 0 | ListCell *lc; |
705 | |
|
706 | 0 | foreach(lc, tables) |
707 | 0 | { |
708 | 0 | ParseNamespaceItem *nsitem; |
709 | 0 | Node *whereclause = NULL; |
710 | 0 | ParseState *pstate; |
711 | 0 | PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc); |
712 | |
|
713 | 0 | if (pri->whereClause == NULL) |
714 | 0 | continue; |
715 | | |
716 | | /* |
717 | | * If the publication doesn't publish changes via the root partitioned |
718 | | * table, the partition's row filter will be used. So disallow using |
719 | | * WHERE clause on partitioned table in this case. |
720 | | */ |
721 | 0 | if (!pubviaroot && |
722 | 0 | pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) |
723 | 0 | ereport(ERROR, |
724 | 0 | (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
725 | 0 | errmsg("cannot use publication WHERE clause for relation \"%s\"", |
726 | 0 | RelationGetRelationName(pri->relation)), |
727 | 0 | errdetail("WHERE clause cannot be used for a partitioned table when %s is false.", |
728 | 0 | "publish_via_partition_root"))); |
729 | | |
730 | | /* |
731 | | * A fresh pstate is required so that we only have "this" table in its |
732 | | * rangetable |
733 | | */ |
734 | 0 | pstate = make_parsestate(NULL); |
735 | 0 | pstate->p_sourcetext = queryString; |
736 | 0 | nsitem = addRangeTableEntryForRelation(pstate, pri->relation, |
737 | 0 | AccessShareLock, NULL, |
738 | 0 | false, false); |
739 | 0 | addNSItemToQuery(pstate, nsitem, false, true, true); |
740 | |
|
741 | 0 | whereclause = transformWhereClause(pstate, |
742 | 0 | copyObject(pri->whereClause), |
743 | 0 | EXPR_KIND_WHERE, |
744 | 0 | "PUBLICATION WHERE"); |
745 | | |
746 | | /* Fix up collation information */ |
747 | 0 | assign_expr_collations(pstate, whereclause); |
748 | |
|
749 | 0 | whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1); |
750 | | |
751 | | /* |
752 | | * We allow only simple expressions in row filters. See |
753 | | * check_simple_rowfilter_expr_walker. |
754 | | */ |
755 | 0 | check_simple_rowfilter_expr(whereclause, pstate); |
756 | |
|
757 | 0 | free_parsestate(pstate); |
758 | |
|
759 | 0 | pri->whereClause = whereclause; |
760 | 0 | } |
761 | 0 | } |
762 | | |
763 | | |
764 | | /* |
765 | | * Given a list of tables that are going to be added to a publication, |
766 | | * verify that they fulfill the necessary preconditions, namely: no tables |
767 | | * have a column list if any schema is published; and partitioned tables do |
768 | | * not have column lists if publish_via_partition_root is not set. |
769 | | * |
770 | | * 'publish_schema' indicates that the publication contains any TABLES IN |
771 | | * SCHEMA elements (newly added in this command, or preexisting). |
772 | | * 'pubviaroot' is the value of publish_via_partition_root. |
773 | | */ |
774 | | static void |
775 | | CheckPubRelationColumnList(char *pubname, List *tables, |
776 | | bool publish_schema, bool pubviaroot) |
777 | 0 | { |
778 | 0 | ListCell *lc; |
779 | |
|
780 | 0 | foreach(lc, tables) |
781 | 0 | { |
782 | 0 | PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc); |
783 | |
|
784 | 0 | if (pri->columns == NIL) |
785 | 0 | continue; |
786 | | |
787 | | /* |
788 | | * Disallow specifying column list if any schema is in the |
789 | | * publication. |
790 | | * |
791 | | * XXX We could instead just forbid the case when the publication |
792 | | * tries to publish the table with a column list and a schema for that |
793 | | * table. However, if we do that then we need a restriction during |
794 | | * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't |
795 | | * seem to be a good idea. |
796 | | */ |
797 | 0 | if (publish_schema) |
798 | 0 | ereport(ERROR, |
799 | 0 | errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
800 | 0 | errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"", |
801 | 0 | get_namespace_name(RelationGetNamespace(pri->relation)), |
802 | 0 | RelationGetRelationName(pri->relation), pubname), |
803 | 0 | errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements.")); |
804 | | |
805 | | /* |
806 | | * If the publication doesn't publish changes via the root partitioned |
807 | | * table, the partition's column list will be used. So disallow using |
808 | | * a column list on the partitioned table in this case. |
809 | | */ |
810 | 0 | if (!pubviaroot && |
811 | 0 | pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) |
812 | 0 | ereport(ERROR, |
813 | 0 | (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
814 | 0 | errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"", |
815 | 0 | get_namespace_name(RelationGetNamespace(pri->relation)), |
816 | 0 | RelationGetRelationName(pri->relation), pubname), |
817 | 0 | errdetail("Column lists cannot be specified for partitioned tables when %s is false.", |
818 | 0 | "publish_via_partition_root"))); |
819 | 0 | } |
820 | 0 | } |
821 | | |
822 | | /* |
823 | | * Create new publication. |
824 | | */ |
825 | | ObjectAddress |
826 | | CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) |
827 | | { |
828 | | Relation rel; |
829 | | ObjectAddress myself; |
830 | | Oid puboid; |
831 | | bool nulls[Natts_pg_publication]; |
832 | | Datum values[Natts_pg_publication]; |
833 | | HeapTuple tup; |
834 | | bool publish_given; |
835 | | PublicationActions pubactions; |
836 | | bool publish_via_partition_root_given; |
837 | | bool publish_via_partition_root; |
838 | | bool publish_generated_columns_given; |
839 | | char publish_generated_columns; |
840 | | AclResult aclresult; |
841 | | List *relations = NIL; |
842 | | List *schemaidlist = NIL; |
843 | | |
844 | | /* must have CREATE privilege on database */ |
845 | | aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE); |
846 | | if (aclresult != ACLCHECK_OK) |
847 | | aclcheck_error(aclresult, OBJECT_DATABASE, |
848 | | get_database_name(MyDatabaseId)); |
849 | | |
850 | | /* FOR ALL TABLES requires superuser */ |
851 | | if (stmt->for_all_tables && !superuser()) |
852 | | ereport(ERROR, |
853 | | (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), |
854 | | errmsg("must be superuser to create FOR ALL TABLES publication"))); |
855 | | |
856 | | rel = table_open(PublicationRelationId, RowExclusiveLock); |
857 | | |
858 | | /* Check if name is used */ |
859 | | puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid, |
860 | | CStringGetDatum(stmt->pubname)); |
861 | | if (OidIsValid(puboid)) |
862 | | ereport(ERROR, |
863 | | (errcode(ERRCODE_DUPLICATE_OBJECT), |
864 | | errmsg("publication \"%s\" already exists", |
865 | | stmt->pubname))); |
866 | | |
867 | | /* Form a tuple. */ |
868 | | memset(values, 0, sizeof(values)); |
869 | | memset(nulls, false, sizeof(nulls)); |
870 | | |
871 | | values[Anum_pg_publication_pubname - 1] = |
872 | | DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname)); |
873 | | values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId()); |
874 | | |
875 | | parse_publication_options(pstate, |
876 | | stmt->options, |
877 | | &publish_given, &pubactions, |
878 | | &publish_via_partition_root_given, |
879 | | &publish_via_partition_root, |
880 | | &publish_generated_columns_given, |
881 | | &publish_generated_columns); |
882 | | |
883 | | puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId, |
884 | | Anum_pg_publication_oid); |
885 | | values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid); |
886 | | values[Anum_pg_publication_puballtables - 1] = |
887 | | BoolGetDatum(stmt->for_all_tables); |
888 | | values[Anum_pg_publication_pubinsert - 1] = |
889 | | BoolGetDatum(pubactions.pubinsert); |
890 | | values[Anum_pg_publication_pubupdate - 1] = |
891 | | BoolGetDatum(pubactions.pubupdate); |
892 | | values[Anum_pg_publication_pubdelete - 1] = |
893 | | BoolGetDatum(pubactions.pubdelete); |
894 | | values[Anum_pg_publication_pubtruncate - 1] = |
895 | | BoolGetDatum(pubactions.pubtruncate); |
896 | | values[Anum_pg_publication_pubviaroot - 1] = |
897 | | BoolGetDatum(publish_via_partition_root); |
898 | | values[Anum_pg_publication_pubgencols - 1] = |
899 | | CharGetDatum(publish_generated_columns); |
900 | | |
901 | | tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); |
902 | | |
903 | | /* Insert tuple into catalog. */ |
904 | | CatalogTupleInsert(rel, tup); |
905 | | heap_freetuple(tup); |
906 | | |
907 | | recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId()); |
908 | | |
909 | | ObjectAddressSet(myself, PublicationRelationId, puboid); |
910 | | |
911 | | /* Make the changes visible. */ |
912 | | CommandCounterIncrement(); |
913 | | |
914 | | /* Associate objects with the publication. */ |
915 | | if (stmt->for_all_tables) |
916 | | { |
917 | | /* Invalidate relcache so that publication info is rebuilt. */ |
918 | | CacheInvalidateRelcacheAll(); |
919 | | } |
920 | | else |
921 | | { |
922 | | ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, |
923 | | &schemaidlist); |
924 | | |
925 | | /* FOR TABLES IN SCHEMA requires superuser */ |
926 | | if (schemaidlist != NIL && !superuser()) |
927 | | ereport(ERROR, |
928 | | errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), |
929 | | errmsg("must be superuser to create FOR TABLES IN SCHEMA publication")); |
930 | | |
931 | | if (relations != NIL) |
932 | | { |
933 | | List *rels; |
934 | | |
935 | | rels = OpenTableList(relations); |
936 | | TransformPubWhereClauses(rels, pstate->p_sourcetext, |
937 | | publish_via_partition_root); |
938 | | |
939 | | CheckPubRelationColumnList(stmt->pubname, rels, |
940 | | schemaidlist != NIL, |
941 | | publish_via_partition_root); |
942 | | |
943 | | PublicationAddTables(puboid, rels, true, NULL); |
944 | | CloseTableList(rels); |
945 | | } |
946 | | |
947 | | if (schemaidlist != NIL) |
948 | | { |
949 | | /* |
950 | | * Schema lock is held until the publication is created to prevent |
951 | | * concurrent schema deletion. |
952 | | */ |
953 | | LockSchemaList(schemaidlist); |
954 | | PublicationAddSchemas(puboid, schemaidlist, true, NULL); |
955 | | } |
956 | | } |
957 | | |
958 | | table_close(rel, RowExclusiveLock); |
959 | | |
960 | | InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0); |
961 | | |
962 | | if (wal_level != WAL_LEVEL_LOGICAL) |
963 | | ereport(WARNING, |
964 | | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
965 | | errmsg("\"wal_level\" is insufficient to publish logical changes"), |
966 | | errhint("Set \"wal_level\" to \"logical\" before creating subscriptions."))); |
967 | | |
968 | | return myself; |
969 | | } |
970 | | |
971 | | /* |
972 | | * Change options of a publication. |
973 | | */ |
974 | | static void |
975 | | AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, |
976 | | Relation rel, HeapTuple tup) |
977 | 0 | { |
978 | 0 | bool nulls[Natts_pg_publication]; |
979 | 0 | bool replaces[Natts_pg_publication]; |
980 | 0 | Datum values[Natts_pg_publication]; |
981 | 0 | bool publish_given; |
982 | 0 | PublicationActions pubactions; |
983 | 0 | bool publish_via_partition_root_given; |
984 | 0 | bool publish_via_partition_root; |
985 | 0 | bool publish_generated_columns_given; |
986 | 0 | char publish_generated_columns; |
987 | 0 | ObjectAddress obj; |
988 | 0 | Form_pg_publication pubform; |
989 | 0 | List *root_relids = NIL; |
990 | 0 | ListCell *lc; |
991 | |
|
992 | 0 | parse_publication_options(pstate, |
993 | 0 | stmt->options, |
994 | 0 | &publish_given, &pubactions, |
995 | 0 | &publish_via_partition_root_given, |
996 | 0 | &publish_via_partition_root, |
997 | 0 | &publish_generated_columns_given, |
998 | 0 | &publish_generated_columns); |
999 | |
|
1000 | 0 | pubform = (Form_pg_publication) GETSTRUCT(tup); |
1001 | | |
1002 | | /* |
1003 | | * If the publication doesn't publish changes via the root partitioned |
1004 | | * table, the partition's row filter and column list will be used. So |
1005 | | * disallow using WHERE clause and column lists on partitioned table in |
1006 | | * this case. |
1007 | | */ |
1008 | 0 | if (!pubform->puballtables && publish_via_partition_root_given && |
1009 | 0 | !publish_via_partition_root) |
1010 | 0 | { |
1011 | | /* |
1012 | | * Lock the publication so nobody else can do anything with it. This |
1013 | | * prevents concurrent alter to add partitioned table(s) with WHERE |
1014 | | * clause(s) and/or column lists which we don't allow when not |
1015 | | * publishing via root. |
1016 | | */ |
1017 | 0 | LockDatabaseObject(PublicationRelationId, pubform->oid, 0, |
1018 | 0 | AccessShareLock); |
1019 | |
|
1020 | 0 | root_relids = GetPublicationRelations(pubform->oid, |
1021 | 0 | PUBLICATION_PART_ROOT); |
1022 | |
|
1023 | 0 | foreach(lc, root_relids) |
1024 | 0 | { |
1025 | 0 | Oid relid = lfirst_oid(lc); |
1026 | 0 | HeapTuple rftuple; |
1027 | 0 | char relkind; |
1028 | 0 | char *relname; |
1029 | 0 | bool has_rowfilter; |
1030 | 0 | bool has_collist; |
1031 | | |
1032 | | /* |
1033 | | * Beware: we don't have lock on the relations, so cope silently |
1034 | | * with the cache lookups returning NULL. |
1035 | | */ |
1036 | |
|
1037 | 0 | rftuple = SearchSysCache2(PUBLICATIONRELMAP, |
1038 | 0 | ObjectIdGetDatum(relid), |
1039 | 0 | ObjectIdGetDatum(pubform->oid)); |
1040 | 0 | if (!HeapTupleIsValid(rftuple)) |
1041 | 0 | continue; |
1042 | 0 | has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL); |
1043 | 0 | has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL); |
1044 | 0 | if (!has_rowfilter && !has_collist) |
1045 | 0 | { |
1046 | 0 | ReleaseSysCache(rftuple); |
1047 | 0 | continue; |
1048 | 0 | } |
1049 | | |
1050 | 0 | relkind = get_rel_relkind(relid); |
1051 | 0 | if (relkind != RELKIND_PARTITIONED_TABLE) |
1052 | 0 | { |
1053 | 0 | ReleaseSysCache(rftuple); |
1054 | 0 | continue; |
1055 | 0 | } |
1056 | 0 | relname = get_rel_name(relid); |
1057 | 0 | if (relname == NULL) /* table concurrently dropped */ |
1058 | 0 | { |
1059 | 0 | ReleaseSysCache(rftuple); |
1060 | 0 | continue; |
1061 | 0 | } |
1062 | | |
1063 | 0 | if (has_rowfilter) |
1064 | 0 | ereport(ERROR, |
1065 | 0 | (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
1066 | 0 | errmsg("cannot set parameter \"%s\" to false for publication \"%s\"", |
1067 | 0 | "publish_via_partition_root", |
1068 | 0 | stmt->pubname), |
1069 | 0 | errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.", |
1070 | 0 | relname, "publish_via_partition_root"))); |
1071 | 0 | Assert(has_collist); |
1072 | 0 | ereport(ERROR, |
1073 | 0 | (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
1074 | 0 | errmsg("cannot set parameter \"%s\" to false for publication \"%s\"", |
1075 | 0 | "publish_via_partition_root", |
1076 | 0 | stmt->pubname), |
1077 | 0 | errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.", |
1078 | 0 | relname, "publish_via_partition_root"))); |
1079 | 0 | } |
1080 | 0 | } |
1081 | | |
1082 | | /* Everything ok, form a new tuple. */ |
1083 | 0 | memset(values, 0, sizeof(values)); |
1084 | 0 | memset(nulls, false, sizeof(nulls)); |
1085 | 0 | memset(replaces, false, sizeof(replaces)); |
1086 | |
|
1087 | 0 | if (publish_given) |
1088 | 0 | { |
1089 | 0 | values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert); |
1090 | 0 | replaces[Anum_pg_publication_pubinsert - 1] = true; |
1091 | |
|
1092 | 0 | values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate); |
1093 | 0 | replaces[Anum_pg_publication_pubupdate - 1] = true; |
1094 | |
|
1095 | 0 | values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete); |
1096 | 0 | replaces[Anum_pg_publication_pubdelete - 1] = true; |
1097 | |
|
1098 | 0 | values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate); |
1099 | 0 | replaces[Anum_pg_publication_pubtruncate - 1] = true; |
1100 | 0 | } |
1101 | |
|
1102 | 0 | if (publish_via_partition_root_given) |
1103 | 0 | { |
1104 | 0 | values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root); |
1105 | 0 | replaces[Anum_pg_publication_pubviaroot - 1] = true; |
1106 | 0 | } |
1107 | |
|
1108 | 0 | if (publish_generated_columns_given) |
1109 | 0 | { |
1110 | 0 | values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns); |
1111 | 0 | replaces[Anum_pg_publication_pubgencols - 1] = true; |
1112 | 0 | } |
1113 | |
|
1114 | 0 | tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, |
1115 | 0 | replaces); |
1116 | | |
1117 | | /* Update the catalog. */ |
1118 | 0 | CatalogTupleUpdate(rel, &tup->t_self, tup); |
1119 | |
|
1120 | 0 | CommandCounterIncrement(); |
1121 | |
|
1122 | 0 | pubform = (Form_pg_publication) GETSTRUCT(tup); |
1123 | | |
1124 | | /* Invalidate the relcache. */ |
1125 | 0 | if (pubform->puballtables) |
1126 | 0 | { |
1127 | 0 | CacheInvalidateRelcacheAll(); |
1128 | 0 | } |
1129 | 0 | else |
1130 | 0 | { |
1131 | 0 | List *relids = NIL; |
1132 | 0 | List *schemarelids = NIL; |
1133 | | |
1134 | | /* |
1135 | | * For any partitioned tables contained in the publication, we must |
1136 | | * invalidate all partitions contained in the respective partition |
1137 | | * trees, not just those explicitly mentioned in the publication. |
1138 | | */ |
1139 | 0 | if (root_relids == NIL) |
1140 | 0 | relids = GetPublicationRelations(pubform->oid, |
1141 | 0 | PUBLICATION_PART_ALL); |
1142 | 0 | else |
1143 | 0 | { |
1144 | | /* |
1145 | | * We already got tables explicitly mentioned in the publication. |
1146 | | * Now get all partitions for the partitioned table in the list. |
1147 | | */ |
1148 | 0 | foreach(lc, root_relids) |
1149 | 0 | relids = GetPubPartitionOptionRelations(relids, |
1150 | 0 | PUBLICATION_PART_ALL, |
1151 | 0 | lfirst_oid(lc)); |
1152 | 0 | } |
1153 | |
|
1154 | 0 | schemarelids = GetAllSchemaPublicationRelations(pubform->oid, |
1155 | 0 | PUBLICATION_PART_ALL); |
1156 | 0 | relids = list_concat_unique_oid(relids, schemarelids); |
1157 | |
|
1158 | 0 | InvalidatePublicationRels(relids); |
1159 | 0 | } |
1160 | |
|
1161 | 0 | ObjectAddressSet(obj, PublicationRelationId, pubform->oid); |
1162 | 0 | EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, |
1163 | 0 | (Node *) stmt); |
1164 | |
|
1165 | 0 | InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0); |
1166 | 0 | } |
1167 | | |
1168 | | /* |
1169 | | * Invalidate the relations. |
1170 | | */ |
1171 | | void |
1172 | | InvalidatePublicationRels(List *relids) |
1173 | 0 | { |
1174 | | /* |
1175 | | * We don't want to send too many individual messages, at some point it's |
1176 | | * cheaper to just reset whole relcache. |
1177 | | */ |
1178 | 0 | if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS) |
1179 | 0 | { |
1180 | 0 | ListCell *lc; |
1181 | |
|
1182 | 0 | foreach(lc, relids) |
1183 | 0 | CacheInvalidateRelcacheByRelid(lfirst_oid(lc)); |
1184 | 0 | } |
1185 | 0 | else |
1186 | 0 | CacheInvalidateRelcacheAll(); |
1187 | 0 | } |
1188 | | |
1189 | | /* |
1190 | | * Add or remove table to/from publication. |
1191 | | */ |
1192 | | static void |
1193 | | AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, |
1194 | | List *tables, const char *queryString, |
1195 | | bool publish_schema) |
1196 | 0 | { |
1197 | 0 | List *rels = NIL; |
1198 | 0 | Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); |
1199 | 0 | Oid pubid = pubform->oid; |
1200 | | |
1201 | | /* |
1202 | | * Nothing to do if no objects, except in SET: for that it is quite |
1203 | | * possible that user has not specified any tables in which case we need |
1204 | | * to remove all the existing tables. |
1205 | | */ |
1206 | 0 | if (!tables && stmt->action != AP_SetObjects) |
1207 | 0 | return; |
1208 | | |
1209 | 0 | rels = OpenTableList(tables); |
1210 | |
|
1211 | 0 | if (stmt->action == AP_AddObjects) |
1212 | 0 | { |
1213 | 0 | TransformPubWhereClauses(rels, queryString, pubform->pubviaroot); |
1214 | |
|
1215 | 0 | publish_schema |= is_schema_publication(pubid); |
1216 | |
|
1217 | 0 | CheckPubRelationColumnList(stmt->pubname, rels, publish_schema, |
1218 | 0 | pubform->pubviaroot); |
1219 | |
|
1220 | 0 | PublicationAddTables(pubid, rels, false, stmt); |
1221 | 0 | } |
1222 | 0 | else if (stmt->action == AP_DropObjects) |
1223 | 0 | PublicationDropTables(pubid, rels, false); |
1224 | 0 | else /* AP_SetObjects */ |
1225 | 0 | { |
1226 | 0 | List *oldrelids = GetPublicationRelations(pubid, |
1227 | 0 | PUBLICATION_PART_ROOT); |
1228 | 0 | List *delrels = NIL; |
1229 | 0 | ListCell *oldlc; |
1230 | |
|
1231 | 0 | TransformPubWhereClauses(rels, queryString, pubform->pubviaroot); |
1232 | |
|
1233 | 0 | CheckPubRelationColumnList(stmt->pubname, rels, publish_schema, |
1234 | 0 | pubform->pubviaroot); |
1235 | | |
1236 | | /* |
1237 | | * To recreate the relation list for the publication, look for |
1238 | | * existing relations that do not need to be dropped. |
1239 | | */ |
1240 | 0 | foreach(oldlc, oldrelids) |
1241 | 0 | { |
1242 | 0 | Oid oldrelid = lfirst_oid(oldlc); |
1243 | 0 | ListCell *newlc; |
1244 | 0 | PublicationRelInfo *oldrel; |
1245 | 0 | bool found = false; |
1246 | 0 | HeapTuple rftuple; |
1247 | 0 | Node *oldrelwhereclause = NULL; |
1248 | 0 | Bitmapset *oldcolumns = NULL; |
1249 | | |
1250 | | /* look up the cache for the old relmap */ |
1251 | 0 | rftuple = SearchSysCache2(PUBLICATIONRELMAP, |
1252 | 0 | ObjectIdGetDatum(oldrelid), |
1253 | 0 | ObjectIdGetDatum(pubid)); |
1254 | | |
1255 | | /* |
1256 | | * See if the existing relation currently has a WHERE clause or a |
1257 | | * column list. We need to compare those too. |
1258 | | */ |
1259 | 0 | if (HeapTupleIsValid(rftuple)) |
1260 | 0 | { |
1261 | 0 | bool isnull = true; |
1262 | 0 | Datum whereClauseDatum; |
1263 | 0 | Datum columnListDatum; |
1264 | | |
1265 | | /* Load the WHERE clause for this table. */ |
1266 | 0 | whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, |
1267 | 0 | Anum_pg_publication_rel_prqual, |
1268 | 0 | &isnull); |
1269 | 0 | if (!isnull) |
1270 | 0 | oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum)); |
1271 | | |
1272 | | /* Transform the int2vector column list to a bitmap. */ |
1273 | 0 | columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, |
1274 | 0 | Anum_pg_publication_rel_prattrs, |
1275 | 0 | &isnull); |
1276 | |
|
1277 | 0 | if (!isnull) |
1278 | 0 | oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL); |
1279 | |
|
1280 | 0 | ReleaseSysCache(rftuple); |
1281 | 0 | } |
1282 | |
|
1283 | 0 | foreach(newlc, rels) |
1284 | 0 | { |
1285 | 0 | PublicationRelInfo *newpubrel; |
1286 | 0 | Oid newrelid; |
1287 | 0 | Bitmapset *newcolumns = NULL; |
1288 | |
|
1289 | 0 | newpubrel = (PublicationRelInfo *) lfirst(newlc); |
1290 | 0 | newrelid = RelationGetRelid(newpubrel->relation); |
1291 | | |
1292 | | /* |
1293 | | * Validate the column list. If the column list or WHERE |
1294 | | * clause changes, then the validation done here will be |
1295 | | * duplicated inside PublicationAddTables(). The validation |
1296 | | * is cheap enough that that seems harmless. |
1297 | | */ |
1298 | 0 | newcolumns = pub_collist_validate(newpubrel->relation, |
1299 | 0 | newpubrel->columns); |
1300 | | |
1301 | | /* |
1302 | | * Check if any of the new set of relations matches with the |
1303 | | * existing relations in the publication. Additionally, if the |
1304 | | * relation has an associated WHERE clause, check the WHERE |
1305 | | * expressions also match. Same for the column list. Drop the |
1306 | | * rest. |
1307 | | */ |
1308 | 0 | if (newrelid == oldrelid) |
1309 | 0 | { |
1310 | 0 | if (equal(oldrelwhereclause, newpubrel->whereClause) && |
1311 | 0 | bms_equal(oldcolumns, newcolumns)) |
1312 | 0 | { |
1313 | 0 | found = true; |
1314 | 0 | break; |
1315 | 0 | } |
1316 | 0 | } |
1317 | 0 | } |
1318 | | |
1319 | | /* |
1320 | | * Add the non-matched relations to a list so that they can be |
1321 | | * dropped. |
1322 | | */ |
1323 | 0 | if (!found) |
1324 | 0 | { |
1325 | 0 | oldrel = palloc(sizeof(PublicationRelInfo)); |
1326 | 0 | oldrel->whereClause = NULL; |
1327 | 0 | oldrel->columns = NIL; |
1328 | 0 | oldrel->relation = table_open(oldrelid, |
1329 | 0 | ShareUpdateExclusiveLock); |
1330 | 0 | delrels = lappend(delrels, oldrel); |
1331 | 0 | } |
1332 | 0 | } |
1333 | | |
1334 | | /* And drop them. */ |
1335 | 0 | PublicationDropTables(pubid, delrels, true); |
1336 | | |
1337 | | /* |
1338 | | * Don't bother calculating the difference for adding, we'll catch and |
1339 | | * skip existing ones when doing catalog update. |
1340 | | */ |
1341 | 0 | PublicationAddTables(pubid, rels, true, stmt); |
1342 | |
|
1343 | 0 | CloseTableList(delrels); |
1344 | 0 | } |
1345 | |
|
1346 | 0 | CloseTableList(rels); |
1347 | 0 | } |
1348 | | |
1349 | | /* |
1350 | | * Alter the publication schemas. |
1351 | | * |
1352 | | * Add or remove schemas to/from publication. |
1353 | | */ |
1354 | | static void |
1355 | | AlterPublicationSchemas(AlterPublicationStmt *stmt, |
1356 | | HeapTuple tup, List *schemaidlist) |
1357 | 0 | { |
1358 | 0 | Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); |
1359 | | |
1360 | | /* |
1361 | | * Nothing to do if no objects, except in SET: for that it is quite |
1362 | | * possible that user has not specified any schemas in which case we need |
1363 | | * to remove all the existing schemas. |
1364 | | */ |
1365 | 0 | if (!schemaidlist && stmt->action != AP_SetObjects) |
1366 | 0 | return; |
1367 | | |
1368 | | /* |
1369 | | * Schema lock is held until the publication is altered to prevent |
1370 | | * concurrent schema deletion. |
1371 | | */ |
1372 | 0 | LockSchemaList(schemaidlist); |
1373 | 0 | if (stmt->action == AP_AddObjects) |
1374 | 0 | { |
1375 | 0 | ListCell *lc; |
1376 | 0 | List *reloids; |
1377 | |
|
1378 | 0 | reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT); |
1379 | |
|
1380 | 0 | foreach(lc, reloids) |
1381 | 0 | { |
1382 | 0 | HeapTuple coltuple; |
1383 | |
|
1384 | 0 | coltuple = SearchSysCache2(PUBLICATIONRELMAP, |
1385 | 0 | ObjectIdGetDatum(lfirst_oid(lc)), |
1386 | 0 | ObjectIdGetDatum(pubform->oid)); |
1387 | |
|
1388 | 0 | if (!HeapTupleIsValid(coltuple)) |
1389 | 0 | continue; |
1390 | | |
1391 | | /* |
1392 | | * Disallow adding schema if column list is already part of the |
1393 | | * publication. See CheckPubRelationColumnList. |
1394 | | */ |
1395 | 0 | if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL)) |
1396 | 0 | ereport(ERROR, |
1397 | 0 | errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
1398 | 0 | errmsg("cannot add schema to publication \"%s\"", |
1399 | 0 | stmt->pubname), |
1400 | 0 | errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication.")); |
1401 | | |
1402 | 0 | ReleaseSysCache(coltuple); |
1403 | 0 | } |
1404 | | |
1405 | 0 | PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt); |
1406 | 0 | } |
1407 | 0 | else if (stmt->action == AP_DropObjects) |
1408 | 0 | PublicationDropSchemas(pubform->oid, schemaidlist, false); |
1409 | 0 | else /* AP_SetObjects */ |
1410 | 0 | { |
1411 | 0 | List *oldschemaids = GetPublicationSchemas(pubform->oid); |
1412 | 0 | List *delschemas = NIL; |
1413 | | |
1414 | | /* Identify which schemas should be dropped */ |
1415 | 0 | delschemas = list_difference_oid(oldschemaids, schemaidlist); |
1416 | | |
1417 | | /* |
1418 | | * Schema lock is held until the publication is altered to prevent |
1419 | | * concurrent schema deletion. |
1420 | | */ |
1421 | 0 | LockSchemaList(delschemas); |
1422 | | |
1423 | | /* And drop them */ |
1424 | 0 | PublicationDropSchemas(pubform->oid, delschemas, true); |
1425 | | |
1426 | | /* |
1427 | | * Don't bother calculating the difference for adding, we'll catch and |
1428 | | * skip existing ones when doing catalog update. |
1429 | | */ |
1430 | 0 | PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt); |
1431 | 0 | } |
1432 | 0 | } |
1433 | | |
1434 | | /* |
1435 | | * Check if relations and schemas can be in a given publication and throw |
1436 | | * appropriate error if not. |
1437 | | */ |
1438 | | static void |
1439 | | CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, |
1440 | | List *tables, List *schemaidlist) |
1441 | 0 | { |
1442 | 0 | Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); |
1443 | |
|
1444 | 0 | if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) && |
1445 | 0 | schemaidlist && !superuser()) |
1446 | 0 | ereport(ERROR, |
1447 | 0 | (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), |
1448 | 0 | errmsg("must be superuser to add or set schemas"))); |
1449 | | |
1450 | | /* |
1451 | | * Check that user is allowed to manipulate the publication tables in |
1452 | | * schema |
1453 | | */ |
1454 | 0 | if (schemaidlist && pubform->puballtables) |
1455 | 0 | ereport(ERROR, |
1456 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
1457 | 0 | errmsg("publication \"%s\" is defined as FOR ALL TABLES", |
1458 | 0 | NameStr(pubform->pubname)), |
1459 | 0 | errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."))); |
1460 | | |
1461 | | /* Check that user is allowed to manipulate the publication tables. */ |
1462 | 0 | if (tables && pubform->puballtables) |
1463 | 0 | ereport(ERROR, |
1464 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
1465 | 0 | errmsg("publication \"%s\" is defined as FOR ALL TABLES", |
1466 | 0 | NameStr(pubform->pubname)), |
1467 | 0 | errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); |
1468 | 0 | } |
1469 | | |
1470 | | /* |
1471 | | * Alter the existing publication. |
1472 | | * |
1473 | | * This is dispatcher function for AlterPublicationOptions, |
1474 | | * AlterPublicationSchemas and AlterPublicationTables. |
1475 | | */ |
1476 | | void |
1477 | | AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) |
1478 | 0 | { |
1479 | 0 | Relation rel; |
1480 | 0 | HeapTuple tup; |
1481 | 0 | Form_pg_publication pubform; |
1482 | |
|
1483 | 0 | rel = table_open(PublicationRelationId, RowExclusiveLock); |
1484 | |
|
1485 | 0 | tup = SearchSysCacheCopy1(PUBLICATIONNAME, |
1486 | 0 | CStringGetDatum(stmt->pubname)); |
1487 | |
|
1488 | 0 | if (!HeapTupleIsValid(tup)) |
1489 | 0 | ereport(ERROR, |
1490 | 0 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
1491 | 0 | errmsg("publication \"%s\" does not exist", |
1492 | 0 | stmt->pubname))); |
1493 | | |
1494 | 0 | pubform = (Form_pg_publication) GETSTRUCT(tup); |
1495 | | |
1496 | | /* must be owner */ |
1497 | 0 | if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId())) |
1498 | 0 | aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION, |
1499 | 0 | stmt->pubname); |
1500 | |
|
1501 | 0 | if (stmt->options) |
1502 | 0 | AlterPublicationOptions(pstate, stmt, rel, tup); |
1503 | 0 | else |
1504 | 0 | { |
1505 | 0 | List *relations = NIL; |
1506 | 0 | List *schemaidlist = NIL; |
1507 | 0 | Oid pubid = pubform->oid; |
1508 | |
|
1509 | 0 | ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, |
1510 | 0 | &schemaidlist); |
1511 | |
|
1512 | 0 | CheckAlterPublication(stmt, tup, relations, schemaidlist); |
1513 | |
|
1514 | 0 | heap_freetuple(tup); |
1515 | | |
1516 | | /* Lock the publication so nobody else can do anything with it. */ |
1517 | 0 | LockDatabaseObject(PublicationRelationId, pubid, 0, |
1518 | 0 | AccessExclusiveLock); |
1519 | | |
1520 | | /* |
1521 | | * It is possible that by the time we acquire the lock on publication, |
1522 | | * concurrent DDL has removed it. We can test this by checking the |
1523 | | * existence of publication. We get the tuple again to avoid the risk |
1524 | | * of any publication option getting changed. |
1525 | | */ |
1526 | 0 | tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); |
1527 | 0 | if (!HeapTupleIsValid(tup)) |
1528 | 0 | ereport(ERROR, |
1529 | 0 | errcode(ERRCODE_UNDEFINED_OBJECT), |
1530 | 0 | errmsg("publication \"%s\" does not exist", |
1531 | 0 | stmt->pubname)); |
1532 | | |
1533 | 0 | AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext, |
1534 | 0 | schemaidlist != NIL); |
1535 | 0 | AlterPublicationSchemas(stmt, tup, schemaidlist); |
1536 | 0 | } |
1537 | | |
1538 | | /* Cleanup. */ |
1539 | 0 | heap_freetuple(tup); |
1540 | 0 | table_close(rel, RowExclusiveLock); |
1541 | 0 | } |
1542 | | |
1543 | | /* |
1544 | | * Remove relation from publication by mapping OID. |
1545 | | */ |
1546 | | void |
1547 | | RemovePublicationRelById(Oid proid) |
1548 | 0 | { |
1549 | 0 | Relation rel; |
1550 | 0 | HeapTuple tup; |
1551 | 0 | Form_pg_publication_rel pubrel; |
1552 | 0 | List *relids = NIL; |
1553 | |
|
1554 | 0 | rel = table_open(PublicationRelRelationId, RowExclusiveLock); |
1555 | |
|
1556 | 0 | tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid)); |
1557 | |
|
1558 | 0 | if (!HeapTupleIsValid(tup)) |
1559 | 0 | elog(ERROR, "cache lookup failed for publication table %u", |
1560 | 0 | proid); |
1561 | | |
1562 | 0 | pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); |
1563 | | |
1564 | | /* |
1565 | | * Invalidate relcache so that publication info is rebuilt. |
1566 | | * |
1567 | | * For the partitioned tables, we must invalidate all partitions contained |
1568 | | * in the respective partition hierarchies, not just the one explicitly |
1569 | | * mentioned in the publication. This is required because we implicitly |
1570 | | * publish the child tables when the parent table is published. |
1571 | | */ |
1572 | 0 | relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL, |
1573 | 0 | pubrel->prrelid); |
1574 | |
|
1575 | 0 | InvalidatePublicationRels(relids); |
1576 | |
|
1577 | 0 | CatalogTupleDelete(rel, &tup->t_self); |
1578 | |
|
1579 | 0 | ReleaseSysCache(tup); |
1580 | |
|
1581 | 0 | table_close(rel, RowExclusiveLock); |
1582 | 0 | } |
1583 | | |
1584 | | /* |
1585 | | * Remove the publication by mapping OID. |
1586 | | */ |
1587 | | void |
1588 | | RemovePublicationById(Oid pubid) |
1589 | 0 | { |
1590 | 0 | Relation rel; |
1591 | 0 | HeapTuple tup; |
1592 | 0 | Form_pg_publication pubform; |
1593 | |
|
1594 | 0 | rel = table_open(PublicationRelationId, RowExclusiveLock); |
1595 | |
|
1596 | 0 | tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); |
1597 | 0 | if (!HeapTupleIsValid(tup)) |
1598 | 0 | elog(ERROR, "cache lookup failed for publication %u", pubid); |
1599 | | |
1600 | 0 | pubform = (Form_pg_publication) GETSTRUCT(tup); |
1601 | | |
1602 | | /* Invalidate relcache so that publication info is rebuilt. */ |
1603 | 0 | if (pubform->puballtables) |
1604 | 0 | CacheInvalidateRelcacheAll(); |
1605 | |
|
1606 | 0 | CatalogTupleDelete(rel, &tup->t_self); |
1607 | |
|
1608 | 0 | ReleaseSysCache(tup); |
1609 | |
|
1610 | 0 | table_close(rel, RowExclusiveLock); |
1611 | 0 | } |
1612 | | |
1613 | | /* |
1614 | | * Remove schema from publication by mapping OID. |
1615 | | */ |
1616 | | void |
1617 | | RemovePublicationSchemaById(Oid psoid) |
1618 | 0 | { |
1619 | 0 | Relation rel; |
1620 | 0 | HeapTuple tup; |
1621 | 0 | List *schemaRels = NIL; |
1622 | 0 | Form_pg_publication_namespace pubsch; |
1623 | |
|
1624 | 0 | rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock); |
1625 | |
|
1626 | 0 | tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid)); |
1627 | |
|
1628 | 0 | if (!HeapTupleIsValid(tup)) |
1629 | 0 | elog(ERROR, "cache lookup failed for publication schema %u", psoid); |
1630 | | |
1631 | 0 | pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup); |
1632 | | |
1633 | | /* |
1634 | | * Invalidate relcache so that publication info is rebuilt. See |
1635 | | * RemovePublicationRelById for why we need to consider all the |
1636 | | * partitions. |
1637 | | */ |
1638 | 0 | schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid, |
1639 | 0 | PUBLICATION_PART_ALL); |
1640 | 0 | InvalidatePublicationRels(schemaRels); |
1641 | |
|
1642 | 0 | CatalogTupleDelete(rel, &tup->t_self); |
1643 | |
|
1644 | 0 | ReleaseSysCache(tup); |
1645 | |
|
1646 | 0 | table_close(rel, RowExclusiveLock); |
1647 | 0 | } |
1648 | | |
1649 | | /* |
1650 | | * Open relations specified by a PublicationTable list. |
1651 | | * The returned tables are locked in ShareUpdateExclusiveLock mode in order to |
1652 | | * add them to a publication. |
1653 | | */ |
1654 | | static List * |
1655 | | OpenTableList(List *tables) |
1656 | 0 | { |
1657 | 0 | List *relids = NIL; |
1658 | 0 | List *rels = NIL; |
1659 | 0 | ListCell *lc; |
1660 | 0 | List *relids_with_rf = NIL; |
1661 | 0 | List *relids_with_collist = NIL; |
1662 | | |
1663 | | /* |
1664 | | * Open, share-lock, and check all the explicitly-specified relations |
1665 | | */ |
1666 | 0 | foreach(lc, tables) |
1667 | 0 | { |
1668 | 0 | PublicationTable *t = lfirst_node(PublicationTable, lc); |
1669 | 0 | bool recurse = t->relation->inh; |
1670 | 0 | Relation rel; |
1671 | 0 | Oid myrelid; |
1672 | 0 | PublicationRelInfo *pub_rel; |
1673 | | |
1674 | | /* Allow query cancel in case this takes a long time */ |
1675 | 0 | CHECK_FOR_INTERRUPTS(); |
1676 | |
|
1677 | 0 | rel = table_openrv(t->relation, ShareUpdateExclusiveLock); |
1678 | 0 | myrelid = RelationGetRelid(rel); |
1679 | | |
1680 | | /* |
1681 | | * Filter out duplicates if user specifies "foo, foo". |
1682 | | * |
1683 | | * Note that this algorithm is known to not be very efficient (O(N^2)) |
1684 | | * but given that it only works on list of tables given to us by user |
1685 | | * it's deemed acceptable. |
1686 | | */ |
1687 | 0 | if (list_member_oid(relids, myrelid)) |
1688 | 0 | { |
1689 | | /* Disallow duplicate tables if there are any with row filters. */ |
1690 | 0 | if (t->whereClause || list_member_oid(relids_with_rf, myrelid)) |
1691 | 0 | ereport(ERROR, |
1692 | 0 | (errcode(ERRCODE_DUPLICATE_OBJECT), |
1693 | 0 | errmsg("conflicting or redundant WHERE clauses for table \"%s\"", |
1694 | 0 | RelationGetRelationName(rel)))); |
1695 | | |
1696 | | /* Disallow duplicate tables if there are any with column lists. */ |
1697 | 0 | if (t->columns || list_member_oid(relids_with_collist, myrelid)) |
1698 | 0 | ereport(ERROR, |
1699 | 0 | (errcode(ERRCODE_DUPLICATE_OBJECT), |
1700 | 0 | errmsg("conflicting or redundant column lists for table \"%s\"", |
1701 | 0 | RelationGetRelationName(rel)))); |
1702 | | |
1703 | 0 | table_close(rel, ShareUpdateExclusiveLock); |
1704 | 0 | continue; |
1705 | 0 | } |
1706 | | |
1707 | 0 | pub_rel = palloc(sizeof(PublicationRelInfo)); |
1708 | 0 | pub_rel->relation = rel; |
1709 | 0 | pub_rel->whereClause = t->whereClause; |
1710 | 0 | pub_rel->columns = t->columns; |
1711 | 0 | rels = lappend(rels, pub_rel); |
1712 | 0 | relids = lappend_oid(relids, myrelid); |
1713 | |
|
1714 | 0 | if (t->whereClause) |
1715 | 0 | relids_with_rf = lappend_oid(relids_with_rf, myrelid); |
1716 | |
|
1717 | 0 | if (t->columns) |
1718 | 0 | relids_with_collist = lappend_oid(relids_with_collist, myrelid); |
1719 | | |
1720 | | /* |
1721 | | * Add children of this rel, if requested, so that they too are added |
1722 | | * to the publication. A partitioned table can't have any inheritance |
1723 | | * children other than its partitions, which need not be explicitly |
1724 | | * added to the publication. |
1725 | | */ |
1726 | 0 | if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) |
1727 | 0 | { |
1728 | 0 | List *children; |
1729 | 0 | ListCell *child; |
1730 | |
|
1731 | 0 | children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock, |
1732 | 0 | NULL); |
1733 | |
|
1734 | 0 | foreach(child, children) |
1735 | 0 | { |
1736 | 0 | Oid childrelid = lfirst_oid(child); |
1737 | | |
1738 | | /* Allow query cancel in case this takes a long time */ |
1739 | 0 | CHECK_FOR_INTERRUPTS(); |
1740 | | |
1741 | | /* |
1742 | | * Skip duplicates if user specified both parent and child |
1743 | | * tables. |
1744 | | */ |
1745 | 0 | if (list_member_oid(relids, childrelid)) |
1746 | 0 | { |
1747 | | /* |
1748 | | * We don't allow to specify row filter for both parent |
1749 | | * and child table at the same time as it is not very |
1750 | | * clear which one should be given preference. |
1751 | | */ |
1752 | 0 | if (childrelid != myrelid && |
1753 | 0 | (t->whereClause || list_member_oid(relids_with_rf, childrelid))) |
1754 | 0 | ereport(ERROR, |
1755 | 0 | (errcode(ERRCODE_DUPLICATE_OBJECT), |
1756 | 0 | errmsg("conflicting or redundant WHERE clauses for table \"%s\"", |
1757 | 0 | RelationGetRelationName(rel)))); |
1758 | | |
1759 | | /* |
1760 | | * We don't allow to specify column list for both parent |
1761 | | * and child table at the same time as it is not very |
1762 | | * clear which one should be given preference. |
1763 | | */ |
1764 | 0 | if (childrelid != myrelid && |
1765 | 0 | (t->columns || list_member_oid(relids_with_collist, childrelid))) |
1766 | 0 | ereport(ERROR, |
1767 | 0 | (errcode(ERRCODE_DUPLICATE_OBJECT), |
1768 | 0 | errmsg("conflicting or redundant column lists for table \"%s\"", |
1769 | 0 | RelationGetRelationName(rel)))); |
1770 | | |
1771 | 0 | continue; |
1772 | 0 | } |
1773 | | |
1774 | | /* find_all_inheritors already got lock */ |
1775 | 0 | rel = table_open(childrelid, NoLock); |
1776 | 0 | pub_rel = palloc(sizeof(PublicationRelInfo)); |
1777 | 0 | pub_rel->relation = rel; |
1778 | | /* child inherits WHERE clause from parent */ |
1779 | 0 | pub_rel->whereClause = t->whereClause; |
1780 | | |
1781 | | /* child inherits column list from parent */ |
1782 | 0 | pub_rel->columns = t->columns; |
1783 | 0 | rels = lappend(rels, pub_rel); |
1784 | 0 | relids = lappend_oid(relids, childrelid); |
1785 | |
|
1786 | 0 | if (t->whereClause) |
1787 | 0 | relids_with_rf = lappend_oid(relids_with_rf, childrelid); |
1788 | |
|
1789 | 0 | if (t->columns) |
1790 | 0 | relids_with_collist = lappend_oid(relids_with_collist, childrelid); |
1791 | 0 | } |
1792 | 0 | } |
1793 | 0 | } |
1794 | | |
1795 | 0 | list_free(relids); |
1796 | 0 | list_free(relids_with_rf); |
1797 | |
|
1798 | 0 | return rels; |
1799 | 0 | } |
1800 | | |
1801 | | /* |
1802 | | * Close all relations in the list. |
1803 | | */ |
1804 | | static void |
1805 | | CloseTableList(List *rels) |
1806 | 0 | { |
1807 | 0 | ListCell *lc; |
1808 | |
|
1809 | 0 | foreach(lc, rels) |
1810 | 0 | { |
1811 | 0 | PublicationRelInfo *pub_rel; |
1812 | |
|
1813 | 0 | pub_rel = (PublicationRelInfo *) lfirst(lc); |
1814 | 0 | table_close(pub_rel->relation, NoLock); |
1815 | 0 | } |
1816 | |
|
1817 | 0 | list_free_deep(rels); |
1818 | 0 | } |
1819 | | |
1820 | | /* |
1821 | | * Lock the schemas specified in the schema list in AccessShareLock mode in |
1822 | | * order to prevent concurrent schema deletion. |
1823 | | */ |
1824 | | static void |
1825 | | LockSchemaList(List *schemalist) |
1826 | 0 | { |
1827 | 0 | ListCell *lc; |
1828 | |
|
1829 | 0 | foreach(lc, schemalist) |
1830 | 0 | { |
1831 | 0 | Oid schemaid = lfirst_oid(lc); |
1832 | | |
1833 | | /* Allow query cancel in case this takes a long time */ |
1834 | 0 | CHECK_FOR_INTERRUPTS(); |
1835 | 0 | LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock); |
1836 | | |
1837 | | /* |
1838 | | * It is possible that by the time we acquire the lock on schema, |
1839 | | * concurrent DDL has removed it. We can test this by checking the |
1840 | | * existence of schema. |
1841 | | */ |
1842 | 0 | if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid))) |
1843 | 0 | ereport(ERROR, |
1844 | 0 | errcode(ERRCODE_UNDEFINED_SCHEMA), |
1845 | 0 | errmsg("schema with OID %u does not exist", schemaid)); |
1846 | 0 | } |
1847 | 0 | } |
1848 | | |
1849 | | /* |
1850 | | * Add listed tables to the publication. |
1851 | | */ |
1852 | | static void |
1853 | | PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, |
1854 | | AlterPublicationStmt *stmt) |
1855 | 0 | { |
1856 | 0 | ListCell *lc; |
1857 | |
|
1858 | 0 | foreach(lc, rels) |
1859 | 0 | { |
1860 | 0 | PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc); |
1861 | 0 | Relation rel = pub_rel->relation; |
1862 | 0 | ObjectAddress obj; |
1863 | | |
1864 | | /* Must be owner of the table or superuser. */ |
1865 | 0 | if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId())) |
1866 | 0 | aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind), |
1867 | 0 | RelationGetRelationName(rel)); |
1868 | |
|
1869 | 0 | obj = publication_add_relation(pubid, pub_rel, if_not_exists); |
1870 | 0 | if (stmt) |
1871 | 0 | { |
1872 | 0 | EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, |
1873 | 0 | (Node *) stmt); |
1874 | |
|
1875 | 0 | InvokeObjectPostCreateHook(PublicationRelRelationId, |
1876 | 0 | obj.objectId, 0); |
1877 | 0 | } |
1878 | 0 | } |
1879 | 0 | } |
1880 | | |
1881 | | /* |
1882 | | * Remove listed tables from the publication. |
1883 | | */ |
1884 | | static void |
1885 | | PublicationDropTables(Oid pubid, List *rels, bool missing_ok) |
1886 | 0 | { |
1887 | 0 | ObjectAddress obj; |
1888 | 0 | ListCell *lc; |
1889 | 0 | Oid prid; |
1890 | |
|
1891 | 0 | foreach(lc, rels) |
1892 | 0 | { |
1893 | 0 | PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc); |
1894 | 0 | Relation rel = pubrel->relation; |
1895 | 0 | Oid relid = RelationGetRelid(rel); |
1896 | |
|
1897 | 0 | if (pubrel->columns) |
1898 | 0 | ereport(ERROR, |
1899 | 0 | errcode(ERRCODE_SYNTAX_ERROR), |
1900 | 0 | errmsg("column list must not be specified in ALTER PUBLICATION ... DROP")); |
1901 | | |
1902 | 0 | prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, |
1903 | 0 | ObjectIdGetDatum(relid), |
1904 | 0 | ObjectIdGetDatum(pubid)); |
1905 | 0 | if (!OidIsValid(prid)) |
1906 | 0 | { |
1907 | 0 | if (missing_ok) |
1908 | 0 | continue; |
1909 | | |
1910 | 0 | ereport(ERROR, |
1911 | 0 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
1912 | 0 | errmsg("relation \"%s\" is not part of the publication", |
1913 | 0 | RelationGetRelationName(rel)))); |
1914 | 0 | } |
1915 | | |
1916 | 0 | if (pubrel->whereClause) |
1917 | 0 | ereport(ERROR, |
1918 | 0 | (errcode(ERRCODE_SYNTAX_ERROR), |
1919 | 0 | errmsg("cannot use a WHERE clause when removing a table from a publication"))); |
1920 | | |
1921 | 0 | ObjectAddressSet(obj, PublicationRelRelationId, prid); |
1922 | 0 | performDeletion(&obj, DROP_CASCADE, 0); |
1923 | 0 | } |
1924 | 0 | } |
1925 | | |
1926 | | /* |
1927 | | * Add listed schemas to the publication. |
1928 | | */ |
1929 | | static void |
1930 | | PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, |
1931 | | AlterPublicationStmt *stmt) |
1932 | 0 | { |
1933 | 0 | ListCell *lc; |
1934 | |
|
1935 | 0 | foreach(lc, schemas) |
1936 | 0 | { |
1937 | 0 | Oid schemaid = lfirst_oid(lc); |
1938 | 0 | ObjectAddress obj; |
1939 | |
|
1940 | 0 | obj = publication_add_schema(pubid, schemaid, if_not_exists); |
1941 | 0 | if (stmt) |
1942 | 0 | { |
1943 | 0 | EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, |
1944 | 0 | (Node *) stmt); |
1945 | |
|
1946 | 0 | InvokeObjectPostCreateHook(PublicationNamespaceRelationId, |
1947 | 0 | obj.objectId, 0); |
1948 | 0 | } |
1949 | 0 | } |
1950 | 0 | } |
1951 | | |
1952 | | /* |
1953 | | * Remove listed schemas from the publication. |
1954 | | */ |
1955 | | static void |
1956 | | PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok) |
1957 | 0 | { |
1958 | 0 | ObjectAddress obj; |
1959 | 0 | ListCell *lc; |
1960 | 0 | Oid psid; |
1961 | |
|
1962 | 0 | foreach(lc, schemas) |
1963 | 0 | { |
1964 | 0 | Oid schemaid = lfirst_oid(lc); |
1965 | |
|
1966 | 0 | psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP, |
1967 | 0 | Anum_pg_publication_namespace_oid, |
1968 | 0 | ObjectIdGetDatum(schemaid), |
1969 | 0 | ObjectIdGetDatum(pubid)); |
1970 | 0 | if (!OidIsValid(psid)) |
1971 | 0 | { |
1972 | 0 | if (missing_ok) |
1973 | 0 | continue; |
1974 | | |
1975 | 0 | ereport(ERROR, |
1976 | 0 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
1977 | 0 | errmsg("tables from schema \"%s\" are not part of the publication", |
1978 | 0 | get_namespace_name(schemaid)))); |
1979 | 0 | } |
1980 | | |
1981 | 0 | ObjectAddressSet(obj, PublicationNamespaceRelationId, psid); |
1982 | 0 | performDeletion(&obj, DROP_CASCADE, 0); |
1983 | 0 | } |
1984 | 0 | } |
1985 | | |
1986 | | /* |
1987 | | * Internal workhorse for changing a publication owner |
1988 | | */ |
1989 | | static void |
1990 | | AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) |
1991 | 0 | { |
1992 | 0 | Form_pg_publication form; |
1993 | |
|
1994 | 0 | form = (Form_pg_publication) GETSTRUCT(tup); |
1995 | |
|
1996 | 0 | if (form->pubowner == newOwnerId) |
1997 | 0 | return; |
1998 | | |
1999 | 0 | if (!superuser()) |
2000 | 0 | { |
2001 | 0 | AclResult aclresult; |
2002 | | |
2003 | | /* Must be owner */ |
2004 | 0 | if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId())) |
2005 | 0 | aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION, |
2006 | 0 | NameStr(form->pubname)); |
2007 | | |
2008 | | /* Must be able to become new owner */ |
2009 | 0 | check_can_set_role(GetUserId(), newOwnerId); |
2010 | | |
2011 | | /* New owner must have CREATE privilege on database */ |
2012 | 0 | aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE); |
2013 | 0 | if (aclresult != ACLCHECK_OK) |
2014 | 0 | aclcheck_error(aclresult, OBJECT_DATABASE, |
2015 | 0 | get_database_name(MyDatabaseId)); |
2016 | |
|
2017 | 0 | if (form->puballtables && !superuser_arg(newOwnerId)) |
2018 | 0 | ereport(ERROR, |
2019 | 0 | (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), |
2020 | 0 | errmsg("permission denied to change owner of publication \"%s\"", |
2021 | 0 | NameStr(form->pubname)), |
2022 | 0 | errhint("The owner of a FOR ALL TABLES publication must be a superuser."))); |
2023 | | |
2024 | 0 | if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid)) |
2025 | 0 | ereport(ERROR, |
2026 | 0 | (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), |
2027 | 0 | errmsg("permission denied to change owner of publication \"%s\"", |
2028 | 0 | NameStr(form->pubname)), |
2029 | 0 | errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser."))); |
2030 | 0 | } |
2031 | | |
2032 | 0 | form->pubowner = newOwnerId; |
2033 | 0 | CatalogTupleUpdate(rel, &tup->t_self, tup); |
2034 | | |
2035 | | /* Update owner dependency reference */ |
2036 | 0 | changeDependencyOnOwner(PublicationRelationId, |
2037 | 0 | form->oid, |
2038 | 0 | newOwnerId); |
2039 | |
|
2040 | 0 | InvokeObjectPostAlterHook(PublicationRelationId, |
2041 | 0 | form->oid, 0); |
2042 | 0 | } |
2043 | | |
2044 | | /* |
2045 | | * Change publication owner -- by name |
2046 | | */ |
2047 | | ObjectAddress |
2048 | | AlterPublicationOwner(const char *name, Oid newOwnerId) |
2049 | 0 | { |
2050 | 0 | Oid pubid; |
2051 | 0 | HeapTuple tup; |
2052 | 0 | Relation rel; |
2053 | 0 | ObjectAddress address; |
2054 | 0 | Form_pg_publication pubform; |
2055 | |
|
2056 | 0 | rel = table_open(PublicationRelationId, RowExclusiveLock); |
2057 | |
|
2058 | 0 | tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name)); |
2059 | |
|
2060 | 0 | if (!HeapTupleIsValid(tup)) |
2061 | 0 | ereport(ERROR, |
2062 | 0 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
2063 | 0 | errmsg("publication \"%s\" does not exist", name))); |
2064 | | |
2065 | 0 | pubform = (Form_pg_publication) GETSTRUCT(tup); |
2066 | 0 | pubid = pubform->oid; |
2067 | |
|
2068 | 0 | AlterPublicationOwner_internal(rel, tup, newOwnerId); |
2069 | |
|
2070 | 0 | ObjectAddressSet(address, PublicationRelationId, pubid); |
2071 | |
|
2072 | 0 | heap_freetuple(tup); |
2073 | |
|
2074 | 0 | table_close(rel, RowExclusiveLock); |
2075 | |
|
2076 | 0 | return address; |
2077 | 0 | } |
2078 | | |
2079 | | /* |
2080 | | * Change publication owner -- by OID |
2081 | | */ |
2082 | | void |
2083 | | AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId) |
2084 | 0 | { |
2085 | 0 | HeapTuple tup; |
2086 | 0 | Relation rel; |
2087 | |
|
2088 | 0 | rel = table_open(PublicationRelationId, RowExclusiveLock); |
2089 | |
|
2090 | 0 | tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); |
2091 | |
|
2092 | 0 | if (!HeapTupleIsValid(tup)) |
2093 | 0 | ereport(ERROR, |
2094 | 0 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
2095 | 0 | errmsg("publication with OID %u does not exist", pubid))); |
2096 | | |
2097 | 0 | AlterPublicationOwner_internal(rel, tup, newOwnerId); |
2098 | |
|
2099 | 0 | heap_freetuple(tup); |
2100 | |
|
2101 | 0 | table_close(rel, RowExclusiveLock); |
2102 | 0 | } |
2103 | | |
2104 | | /* |
2105 | | * Extract the publish_generated_columns option value from a DefElem. "stored" |
2106 | | * and "none" values are accepted. |
2107 | | */ |
2108 | | static char |
2109 | | defGetGeneratedColsOption(DefElem *def) |
2110 | 0 | { |
2111 | 0 | char *sval = ""; |
2112 | | |
2113 | | /* |
2114 | | * A parameter value is required. |
2115 | | */ |
2116 | 0 | if (def->arg) |
2117 | 0 | { |
2118 | 0 | sval = defGetString(def); |
2119 | |
|
2120 | 0 | if (pg_strcasecmp(sval, "none") == 0) |
2121 | 0 | return PUBLISH_GENCOLS_NONE; |
2122 | 0 | if (pg_strcasecmp(sval, "stored") == 0) |
2123 | 0 | return PUBLISH_GENCOLS_STORED; |
2124 | 0 | } |
2125 | | |
2126 | 0 | ereport(ERROR, |
2127 | 0 | errcode(ERRCODE_SYNTAX_ERROR), |
2128 | 0 | errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval), |
2129 | 0 | errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored")); |
2130 | | |
2131 | 0 | return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */ |
2132 | 0 | } |