/src/postgres/src/backend/replication/logical/relation.c
/*-------------------------------------------------------------------------
 * relation.c
 *    PostgreSQL logical replication relation mapping cache
 *
 * Copyright (c) 2016-2025, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *    src/backend/replication/logical/relation.c
 *
 * NOTES
 *    Routines in this file mainly have to do with mapping the properties
 *    of local replication target relations to the properties of their
 *    remote counterparts.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/amapi.h"
#include "access/genam.h"
#include "access/table.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription_rel.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "replication/logicalrelation.h"
#include "replication/worker_internal.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"

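/*
 * Map of remote relations to local ones, keyed by remote relation ID.  The
 * cache and all its entries live in LogicalRepRelMapContext, a child of
 * CacheMemoryContext (see logicalrep_relmap_init below).
 */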
static MemoryContext LogicalRepRelMapContext = NULL;

static HTAB *LogicalRepRelMap = NULL;

/*
 * Partition map (LogicalRepPartMap)
 *
 * When a partitioned table is used as a replication target, replicated
 * operations are actually performed on its leaf partitions, which requires
 * the partitions to also be mapped to the remote relation.  The parent's
 * entry (LogicalRepRelMapEntry) cannot be used as-is for all partitions,
 * because individual partitions may have different attribute numbers, which
 * means attribute mappings to the remote relation's attributes must be
 * maintained separately for each partition.
 */
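/*
 * For example, if a column was dropped from the parent before a partition
 * was created, the partition does not inherit the dropped-column slot, so
 * the same user column can have different attribute numbers in the parent
 * and in the partition.
 */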
static MemoryContext LogicalRepPartMapContext = NULL;
static HTAB *LogicalRepPartMap = NULL;
typedef struct LogicalRepPartMapEntry
{
    Oid         partoid;        /* LogicalRepPartMap's key */
    LogicalRepRelMapEntry relmapentry;
} LogicalRepPartMapEntry;

static Oid  FindLogicalRepLocalIndex(Relation localrel,
                                     LogicalRepRelation *remoterel,
                                     AttrMap *attrMap);

/*
 * Relcache invalidation callback for our relation map cache.
 */
static void
logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
{
    LogicalRepRelMapEntry *entry;

    /* Just to be sure. */
    if (LogicalRepRelMap == NULL)
        return;

    if (reloid != InvalidOid)
    {
        HASH_SEQ_STATUS status;

        hash_seq_init(&status, LogicalRepRelMap);

        /* TODO, use inverse lookup hashtable? */
        while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
        {
            if (entry->localreloid == reloid)
            {
                entry->localrelvalid = false;
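                /* End the seq scan explicitly since we break out early. */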
                hash_seq_term(&status);
                break;
            }
        }
    }
    else
    {
        /* invalidate all cache entries */
        HASH_SEQ_STATUS status;

        hash_seq_init(&status, LogicalRepRelMap);

        while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
            entry->localrelvalid = false;
    }
}

/*
 * Initialize the relation map cache.
 */
static void
logicalrep_relmap_init(void)
{
    HASHCTL     ctl;

    if (!LogicalRepRelMapContext)
        LogicalRepRelMapContext =
            AllocSetContextCreate(CacheMemoryContext,
                                  "LogicalRepRelMapContext",
                                  ALLOCSET_DEFAULT_SIZES);

    /* Initialize the relation hash table. */
    ctl.keysize = sizeof(LogicalRepRelId);
    ctl.entrysize = sizeof(LogicalRepRelMapEntry);
    ctl.hcxt = LogicalRepRelMapContext;

    LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl,
                                   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

    /* Watch for invalidation events. */
    CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
                                  (Datum) 0);
}

/*
 * Free the contents of a relation map cache entry.
 */
static void
logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
{
    LogicalRepRelation *remoterel;

    remoterel = &entry->remoterel;

    pfree(remoterel->nspname);
    pfree(remoterel->relname);

    if (remoterel->natts > 0)
    {
        int         i;

        for (i = 0; i < remoterel->natts; i++)
            pfree(remoterel->attnames[i]);

        pfree(remoterel->attnames);
        pfree(remoterel->atttyps);
    }
    bms_free(remoterel->attkeys);

    if (entry->attrmap)
        free_attrmap(entry->attrmap);
}

/*
 * Add a new entry to, or update an existing entry in, the relation map cache.
 *
 * Called when a new relation mapping is sent by the publisher, to update our
 * expected view of incoming data from said publisher.
 */
void
logicalrep_relmap_update(LogicalRepRelation *remoterel)
{
    MemoryContext oldctx;
    LogicalRepRelMapEntry *entry;
    bool        found;
    int         i;

    if (LogicalRepRelMap == NULL)
        logicalrep_relmap_init();

    /*
     * HASH_ENTER returns the existing entry if present or creates a new one.
     */
    entry = hash_search(LogicalRepRelMap, &remoterel->remoteid,
                        HASH_ENTER, &found);

    if (found)
        logicalrep_relmap_free_entry(entry);
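
    /* Zero the entry so no pointers to the just-freed data survive. */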
    memset(entry, 0, sizeof(LogicalRepRelMapEntry));

    /* Make cached copy of the data */
    oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
    entry->remoterel.remoteid = remoterel->remoteid;
    entry->remoterel.nspname = pstrdup(remoterel->nspname);
    entry->remoterel.relname = pstrdup(remoterel->relname);
    entry->remoterel.natts = remoterel->natts;
    entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
    entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
    for (i = 0; i < remoterel->natts; i++)
    {
        entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
        entry->remoterel.atttyps[i] = remoterel->atttyps[i];
    }
    entry->remoterel.replident = remoterel->replident;
    entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
    MemoryContextSwitchTo(oldctx);
}

/*
 * Find the remote attribute index in a LogicalRepRelation by attribute name.
 *
 * Returns -1 if not found.
 */
static int
logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
{
    int         i;

    for (i = 0; i < remoterel->natts; i++)
    {
        if (strcmp(remoterel->attnames[i], attname) == 0)
            return i;
    }

    return -1;
}

/*
 * Returns a comma-separated string of attribute names based on the provided
 * relation and bitmap indicating which attributes to include.
 */
static char *
logicalrep_get_attrs_str(LogicalRepRelation *remoterel, Bitmapset *atts)
{
    StringInfoData attsbuf;
    int         attcnt = 0;
    int         i = -1;

    Assert(!bms_is_empty(atts));

    initStringInfo(&attsbuf);

    while ((i = bms_next_member(atts, i)) >= 0)
    {
        attcnt++;
        if (attcnt > 1)
            appendStringInfoString(&attsbuf, _(", "));

        appendStringInfo(&attsbuf, _("\"%s\""), remoterel->attnames[i]);
    }

    return attsbuf.data;
}

/*
 * If attempting to replicate missing or generated columns, report an error.
 * If both occur, report the 'missing' error first; the prioritization is
 * arbitrary.
 */
static void
logicalrep_report_missing_or_gen_attrs(LogicalRepRelation *remoterel,
                                       Bitmapset *missingatts,
                                       Bitmapset *generatedatts)
{
    if (!bms_is_empty(missingatts))
        ereport(ERROR,
                errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                errmsg_plural("logical replication target relation \"%s.%s\" is missing replicated column: %s",
                              "logical replication target relation \"%s.%s\" is missing replicated columns: %s",
                              bms_num_members(missingatts),
                              remoterel->nspname,
                              remoterel->relname,
                              logicalrep_get_attrs_str(remoterel,
                                                       missingatts)));

    if (!bms_is_empty(generatedatts))
        ereport(ERROR,
                errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                errmsg_plural("logical replication target relation \"%s.%s\" has incompatible generated column: %s",
                              "logical replication target relation \"%s.%s\" has incompatible generated columns: %s",
                              bms_num_members(generatedatts),
                              remoterel->nspname,
                              remoterel->relname,
                              logicalrep_get_attrs_str(remoterel,
                                                       generatedatts)));
}

/*
 * Check if the replica identity matches and mark the updatable flag.
 *
 * We allow for a stricter replica identity (fewer columns) on the subscriber,
 * as that will not stop us from finding a unique tuple.  I.e., if the
 * publisher has identity (id, timestamp) and the subscriber just (id), this
 * will not be a problem, but in the opposite scenario it will.
 *
 * We just mark the relation entry as not updatable here if the local replica
 * identity is found to be insufficient for applying updates/deletes (inserts
 * don't care!) and leave it to check_relation_updatable() to throw the
 * actual error if needed.
 */
static void
logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
{
    Bitmapset  *idkey;
    LogicalRepRelation *remoterel = &entry->remoterel;
    int         i;

    entry->updatable = true;

    idkey = RelationGetIndexAttrBitmap(entry->localrel,
                                       INDEX_ATTR_BITMAP_IDENTITY_KEY);
    /* fallback to PK if no replica identity */
    if (idkey == NULL)
    {
        idkey = RelationGetIndexAttrBitmap(entry->localrel,
                                           INDEX_ATTR_BITMAP_PRIMARY_KEY);

        /*
         * If there is no replica identity index and no PK, the published
         * table must have replica identity FULL.
         */
        if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
            entry->updatable = false;
    }
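
    /*
     * Every local replica identity column must be replicated, i.e. must map
     * to a remote attribute that is part of the remote relation's key.
     */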
    i = -1;
    while ((i = bms_next_member(idkey, i)) >= 0)
    {
        int         attnum = i + FirstLowInvalidHeapAttributeNumber;

        if (!AttrNumberIsForUserDefinedAttr(attnum))
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     errmsg("logical replication target relation \"%s.%s\" uses "
                            "system columns in REPLICA IDENTITY index",
                            remoterel->nspname, remoterel->relname)));

        attnum = AttrNumberGetAttrOffset(attnum);

        if (entry->attrmap->attnums[attnum] < 0 ||
            !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
        {
            entry->updatable = false;
            break;
        }
    }
}

/*
 * Open the local relation associated with the remote one.
 *
 * Rebuilds the relation mapping if it was invalidated by local DDL.
 */
LogicalRepRelMapEntry *
logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
{
    LogicalRepRelMapEntry *entry;
    bool        found;
    LogicalRepRelation *remoterel;

    if (LogicalRepRelMap == NULL)
        logicalrep_relmap_init();

    /* Search for existing entry. */
    entry = hash_search(LogicalRepRelMap, &remoteid,
                        HASH_FIND, &found);

    if (!found)
        elog(ERROR, "no relation map entry for remote relation ID %u",
             remoteid);

    remoterel = &entry->remoterel;

    /* Ensure we don't leak a relcache refcount. */
    if (entry->localrel)
        elog(ERROR, "remote relation ID %u is already open", remoteid);

    /*
     * When opening and locking a relation, pending invalidation messages are
     * processed, which can invalidate the relation.  Hence, if the entry is
     * currently considered valid, try to open the local relation by OID and
     * see if invalidation ensues.
     */
    if (entry->localrelvalid)
    {
        entry->localrel = try_table_open(entry->localreloid, lockmode);
        if (!entry->localrel)
        {
            /* Table was renamed or dropped. */
            entry->localrelvalid = false;
        }
        else if (!entry->localrelvalid)
        {
            /* Note we release the no-longer-useful lock here. */
            table_close(entry->localrel, lockmode);
            entry->localrel = NULL;
        }
    }

    /*
     * If the entry has been marked invalid since we last had a lock on it,
     * re-open the local relation by name and rebuild all derived data.
     */
    if (!entry->localrelvalid)
    {
        Oid         relid;
        TupleDesc   desc;
        MemoryContext oldctx;
        int         i;
        Bitmapset  *missingatts;
        Bitmapset  *generatedattrs = NULL;

        /* Release the no-longer-useful attrmap, if any. */
        if (entry->attrmap)
        {
            free_attrmap(entry->attrmap);
            entry->attrmap = NULL;
        }

        /* Try to find and lock the relation by name. */
        relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
                                              remoterel->relname, -1),
                                 lockmode, true);
        if (!OidIsValid(relid))
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     errmsg("logical replication target relation \"%s.%s\" does not exist",
                            remoterel->nspname, remoterel->relname)));
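
        /* RangeVarGetRelid already took the lock, so NoLock suffices here. */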
        entry->localrel = table_open(relid, NoLock);
        entry->localreloid = relid;

        /* Check for supported relkind. */
        CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
                                 remoterel->nspname, remoterel->relname);

        /*
         * Build the mapping of local attribute numbers to remote attribute
         * numbers and validate that we don't miss any replicated columns, as
         * that would result in potentially unwanted data loss.
         */
        desc = RelationGetDescr(entry->localrel);
        oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
        entry->attrmap = make_attrmap(desc->natts);
        MemoryContextSwitchTo(oldctx);

        /* check and report missing attrs, if any */
        missingatts = bms_add_range(NULL, 0, remoterel->natts - 1);
        for (i = 0; i < desc->natts; i++)
        {
            int         attnum;
            Form_pg_attribute attr = TupleDescAttr(desc, i);

            if (attr->attisdropped)
            {
                entry->attrmap->attnums[i] = -1;
                continue;
            }

            attnum = logicalrep_rel_att_by_name(remoterel,
                                                NameStr(attr->attname));

            entry->attrmap->attnums[i] = attnum;
            if (attnum >= 0)
            {
                /* Remember which subscriber columns are generated. */
                if (attr->attgenerated)
                    generatedattrs = bms_add_member(generatedattrs, attnum);

                missingatts = bms_del_member(missingatts, attnum);
            }
        }

        logicalrep_report_missing_or_gen_attrs(remoterel, missingatts,
                                               generatedattrs);

        /* be tidy */
        bms_free(generatedattrs);
        bms_free(missingatts);

        /*
         * Set if the table's replica identity is enough to apply
         * update/delete.
         */
        logicalrep_rel_mark_updatable(entry);

        /*
         * Finding a usable index is an infrequent task.  It occurs when an
         * operation is first performed on the relation, or after
         * invalidation of the relation cache entry (such as ANALYZE or
         * CREATE/DROP INDEX on the relation).
         */
        entry->localindexoid = FindLogicalRepLocalIndex(entry->localrel, remoterel,
                                                        entry->attrmap);

        entry->localrelvalid = true;
    }

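    /* Refresh the relation's subscription sync state unless already READY. */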
    if (entry->state != SUBREL_STATE_READY)
        entry->state = GetSubscriptionRelState(MySubscription->oid,
                                               entry->localreloid,
                                               &entry->statelsn);

    return entry;
}

/*
 * Close the previously opened logical relation.
 */
void
logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
{
    table_close(rel->localrel, lockmode);
    rel->localrel = NULL;
}

/*
 * Partition cache: look up partition LogicalRepRelMapEntry's
 *
 * Unlike the relation map cache, this is keyed by partition OID, not remote
 * relation OID, because we only have to use this cache in the case where
 * partitions are not directly mapped to any remote relation, such as when
 * replication is occurring with one of their ancestors as the target.
 */

/*
 * Relcache invalidation callback
 */
static void
logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
{
    LogicalRepPartMapEntry *entry;

    /* Just to be sure. */
    if (LogicalRepPartMap == NULL)
        return;

    if (reloid != InvalidOid)
    {
        HASH_SEQ_STATUS status;

        hash_seq_init(&status, LogicalRepPartMap);

        /* TODO, use inverse lookup hashtable? */
        while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
        {
            if (entry->relmapentry.localreloid == reloid)
            {
                entry->relmapentry.localrelvalid = false;
                hash_seq_term(&status);
                break;
            }
        }
    }
    else
    {
        /* invalidate all cache entries */
        HASH_SEQ_STATUS status;

        hash_seq_init(&status, LogicalRepPartMap);

        while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
            entry->relmapentry.localrelvalid = false;
    }
}

/*
 * Reset the entries in the partition map that refer to remoterel.
 *
 * Called when a new relation mapping is sent by the publisher, to update our
 * expected view of incoming data from said publisher.
 *
 * Note that we don't update the remoterel information in the entry here; we
 * update it in logicalrep_partition_open() to avoid unnecessary work.
 */
void
logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
{
    HASH_SEQ_STATUS status;
    LogicalRepPartMapEntry *part_entry;
    LogicalRepRelMapEntry *entry;

    if (LogicalRepPartMap == NULL)
        return;

    hash_seq_init(&status, LogicalRepPartMap);
    while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
    {
        entry = &part_entry->relmapentry;

        if (entry->remoterel.remoteid != remoterel->remoteid)
            continue;

        logicalrep_relmap_free_entry(entry);
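
        /* Zero only the relmapentry; the hash key (partoid) stays intact. */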
        memset(entry, 0, sizeof(LogicalRepRelMapEntry));
    }
}

/*
 * Initialize the partition map cache.
 */
static void
logicalrep_partmap_init(void)
{
    HASHCTL     ctl;

    if (!LogicalRepPartMapContext)
        LogicalRepPartMapContext =
            AllocSetContextCreate(CacheMemoryContext,
                                  "LogicalRepPartMapContext",
                                  ALLOCSET_DEFAULT_SIZES);

    /* Initialize the relation hash table. */
    ctl.keysize = sizeof(Oid);  /* partition OID */
    ctl.entrysize = sizeof(LogicalRepPartMapEntry);
    ctl.hcxt = LogicalRepPartMapContext;

    LogicalRepPartMap = hash_create("logicalrep partition map cache", 64, &ctl,
                                    HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

    /* Watch for invalidation events. */
    CacheRegisterRelcacheCallback(logicalrep_partmap_invalidate_cb,
                                  (Datum) 0);
}

/*
 * logicalrep_partition_open
 *
 * The returned entry reuses most of the values of the root table's entry,
 * except the attribute map, which can be different for the partition.
 * However, we must physically copy all the data, in case the root table's
 * entry gets freed/rebuilt.
 *
 * Note there's no logicalrep_partition_close, because the caller closes the
 * component relation.
 */
LogicalRepRelMapEntry *
logicalrep_partition_open(LogicalRepRelMapEntry *root,
                          Relation partrel, AttrMap *map)
{
    LogicalRepRelMapEntry *entry;
    LogicalRepPartMapEntry *part_entry;
    LogicalRepRelation *remoterel = &root->remoterel;
    Oid         partOid = RelationGetRelid(partrel);
    AttrMap    *attrmap = root->attrmap;
    bool        found;
    MemoryContext oldctx;

    if (LogicalRepPartMap == NULL)
        logicalrep_partmap_init();

    /* Search for existing entry. */
    part_entry = (LogicalRepPartMapEntry *) hash_search(LogicalRepPartMap,
                                                        &partOid,
                                                        HASH_ENTER, &found);

    entry = &part_entry->relmapentry;

    /*
     * We must always overwrite entry->localrel with the latest partition
     * Relation pointer, because the Relation pointed to by the old value may
     * have been cleared after the caller closed the partition relation
     * following the last use of this entry.  Note that localrelvalid is only
     * updated by the relcache invalidation callback, so it may still be true
     * irrespective of whether the Relation pointed to by localrel has been
     * cleared or not.
     */
    if (found && entry->localrelvalid)
    {
        entry->localrel = partrel;
        return entry;
    }

    /* Switch to longer-lived context. */
    oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);

    if (!found)
    {
        memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
        part_entry->partoid = partOid;
    }

    /* Release the no-longer-useful attrmap, if any. */
    if (entry->attrmap)
    {
        free_attrmap(entry->attrmap);
        entry->attrmap = NULL;
    }

    if (!entry->remoterel.remoteid)
    {
        int         i;

        /* Remote relation is copied as-is from the root entry. */
        entry->remoterel.remoteid = remoterel->remoteid;
        entry->remoterel.nspname = pstrdup(remoterel->nspname);
        entry->remoterel.relname = pstrdup(remoterel->relname);
        entry->remoterel.natts = remoterel->natts;
        entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
        entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
        for (i = 0; i < remoterel->natts; i++)
        {
            entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
            entry->remoterel.atttyps[i] = remoterel->atttyps[i];
        }
        entry->remoterel.replident = remoterel->replident;
        entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
    }

    entry->localrel = partrel;
    entry->localreloid = partOid;

    /*
     * If the partition's attributes don't match the root relation's, we'll
     * need to make a new attrmap, which maps partition attribute numbers to
     * remoterel's, instead of the original, which maps the root relation's
     * attribute numbers to remoterel's.
     *
     * Note that 'map', which comes from the tuple routing data structure,
     * contains 1-based attribute numbers (of the parent relation), whereas
     * the map in 'entry', a logical replication data structure, contains
     * 0-based attribute numbers (of the remote relation).
     */
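    /*
     * Example: if the partition's column at 0-based offset 2 matches the
     * root's second column, then map->attnums[2] == 2, and the new map gets
     * entry->attrmap->attnums[2] = attrmap->attnums[1], i.e. the remote
     * attribute that the root's second column maps to.
     */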
    if (map)
    {
        AttrNumber  attno;

        entry->attrmap = make_attrmap(map->maplen);
        for (attno = 0; attno < entry->attrmap->maplen; attno++)
        {
            AttrNumber  root_attno = map->attnums[attno];

            /* 0 means it's a dropped attribute.  See comments atop AttrMap. */
            if (root_attno == 0)
                entry->attrmap->attnums[attno] = -1;
            else
                entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
        }
    }
    else
    {
        /* Lacking copy_attmap, do this the hard way. */
        entry->attrmap = make_attrmap(attrmap->maplen);
        memcpy(entry->attrmap->attnums, attrmap->attnums,
               attrmap->maplen * sizeof(AttrNumber));
    }

    /* Set if the table's replica identity is enough to apply update/delete. */
    logicalrep_rel_mark_updatable(entry);

    /* state and statelsn are left set to 0. */
    MemoryContextSwitchTo(oldctx);

    /*
     * Finding a usable index is an infrequent task.  It occurs when an
     * operation is first performed on the relation, or after invalidation of
     * the relation cache entry (such as ANALYZE or CREATE/DROP INDEX on the
     * relation).
     *
     * We also prefer to run this code in the oldctx so that we do not leak
     * anything in LogicalRepPartMapContext (and hence CacheMemoryContext).
     */
    entry->localindexoid = FindLogicalRepLocalIndex(partrel, remoterel,
                                                    entry->attrmap);

    entry->localrelvalid = true;

    return entry;
}

/*
 * Returns the OID of an index that can be used by the apply worker to scan
 * the relation.
 *
 * We expect to call this function when REPLICA IDENTITY FULL is defined for
 * the remote relation.
 *
 * If no suitable index is found, returns InvalidOid.
 */
static Oid
FindUsableIndexForReplicaIdentityFull(Relation localrel, AttrMap *attrmap)
{
    List       *idxlist = RelationGetIndexList(localrel);

    foreach_oid(idxoid, idxlist)
    {
        bool        isUsableIdx;
        Relation    idxRel;

        idxRel = index_open(idxoid, AccessShareLock);
        isUsableIdx = IsIndexUsableForReplicaIdentityFull(idxRel, attrmap);
        index_close(idxRel, AccessShareLock);

        /* Return the first eligible index found */
        if (isUsableIdx)
            return idxoid;
    }

    return InvalidOid;
}

/*
 * Returns true if the index is usable for replica identity full.
 *
 * The index must have an equality strategy for each key column, must be
 * non-partial, and its leftmost field must be a column (not an expression)
 * that references a remote relation column.  These limitations help to keep
 * the index scan similar to PK/RI index scans.
 *
 * attrmap is a map of local attributes to remote ones.  We can consult this
 * map to check whether the local index attribute has a corresponding remote
 * attribute.
 *
 * Note that the limitations of index scans for replica identity full only
 * adhere to a subset of the limitations of PK/RI.  For example, we support
 * columns that are marked as [NULL], and we are not interested in the [NOT
 * DEFERRABLE] aspect of constraints here.  This works for us because we
 * always compare the tuples for non-PK/RI index scans.  See
 * RelationFindReplTupleByIndex().
 *
 * XXX: To support partial indexes, the required changes are likely to be
 * larger.  If none of the tuples satisfy the expression for the index scan,
 * we fall back to sequential execution, which might not be a good idea in
 * some cases.
 */
bool
IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap)
{
    AttrNumber  keycol;
    oidvector  *indclass;

    /* The index must not be a partial index */
    if (!heap_attisnull(idxrel->rd_indextuple, Anum_pg_index_indpred, NULL))
        return false;

    Assert(idxrel->rd_index->indnatts >= 1);

    indclass = (oidvector *) DatumGetPointer(SysCacheGetAttrNotNull(INDEXRELID,
                                                                    idxrel->rd_indextuple,
                                                                    Anum_pg_index_indclass));

    /* Ensure that the index has a valid equal strategy for each key column */
    for (int i = 0; i < idxrel->rd_index->indnkeyatts; i++)
    {
        Oid         opfamily;

        opfamily = get_opclass_family(indclass->values[i]);
        if (IndexAmTranslateCompareType(COMPARE_EQ, idxrel->rd_rel->relam, opfamily, true) == InvalidStrategy)
            return false;
    }

    /*
     * For indexes other than PK and REPLICA IDENTITY, we need to match the
     * local and remote tuples.  The equality routine tuples_equal() cannot
     * accept a data type where the type cache cannot provide an equality
     * operator.
     */
    for (int i = 0; i < idxrel->rd_att->natts; i++)
    {
        TypeCacheEntry *typentry;

        typentry = lookup_type_cache(TupleDescAttr(idxrel->rd_att, i)->atttypid, TYPECACHE_EQ_OPR_FINFO);
        if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
            return false;
    }

    /* The leftmost index field must not be an expression */
    keycol = idxrel->rd_index->indkey.values[0];
    if (!AttributeNumberIsValid(keycol))
        return false;

    /*
     * And the leftmost index field must reference the remote relation
     * column.  This is because if it doesn't, the sequential scan is
     * favorable over index scan in most cases.
     */
    if (attrmap->maplen <= AttrNumberGetAttrOffset(keycol) ||
        attrmap->attnums[AttrNumberGetAttrOffset(keycol)] < 0)
        return false;

    /*
     * The given index access method must implement "amgettuple", which will
     * be used later to fetch the tuples.  See RelationFindReplTupleByIndex().
     */
    if (GetIndexAmRoutineByAmId(idxrel->rd_rel->relam, false)->amgettuple == NULL)
        return false;

    return true;
}

/*
 * Return the OID of the replica identity index if one is defined;
 * the OID of the PK if one exists and is not deferrable;
 * otherwise, InvalidOid.
 */
Oid
GetRelationIdentityOrPK(Relation rel)
{
    Oid         idxoid;

    idxoid = RelationGetReplicaIndex(rel);
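
    /* No replica identity index: fall back to a non-deferrable primary key. */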
    if (!OidIsValid(idxoid))
        idxoid = RelationGetPrimaryKeyIndex(rel, false);

    return idxoid;
}

/*
 * Returns the index OID if we can use an index for the subscriber; otherwise,
 * returns InvalidOid.
 */
static Oid
FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,
                         AttrMap *attrMap)
{
    Oid         idxoid;

    /*
     * We never need an index OID for partitioned tables; we always rely on
     * the leaf partition's index.
     */
    if (localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
        return InvalidOid;

    /*
     * Simple case: we already have a primary key or a replica identity
     * index.
     */
    idxoid = GetRelationIdentityOrPK(localrel);
    if (OidIsValid(idxoid))
        return idxoid;

    if (remoterel->replident == REPLICA_IDENTITY_FULL)
    {
        /*
         * We are looking for one more opportunity to use an index.  If there
         * are any indexes defined on the local relation, try to pick a
         * suitable one.
         *
         * The index selection safely assumes that all the columns are going
         * to be available for the index scan, given that the remote relation
         * has replica identity full.
         *
         * Note that we are not using the planner to find the cheapest method
         * to scan the relation, as that would require us either to use
         * lower-level planner functions, which would be a maintenance burden
         * in the long run, or to use the full-fledged planner, which could
         * cause overhead.
         */
        return FindUsableIndexForReplicaIdentityFull(localrel, attrMap);
    }

    return InvalidOid;
}