Coverage Report

Created: 2025-09-27 06:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/postgres/src/backend/replication/logical/relation.c
Line
Count
Source
1
/*-------------------------------------------------------------------------
2
 * relation.c
3
 *     PostgreSQL logical replication relation mapping cache
4
 *
5
 * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6
 *
7
 * IDENTIFICATION
8
 *    src/backend/replication/logical/relation.c
9
 *
10
 * NOTES
11
 *    Routines in this file mainly have to do with mapping the properties
12
 *    of local replication target relations to the properties of their
13
 *    remote counterpart.
14
 *
15
 *-------------------------------------------------------------------------
16
 */
17
18
#include "postgres.h"
19
20
#include "access/amapi.h"
21
#include "access/genam.h"
22
#include "access/table.h"
23
#include "catalog/namespace.h"
24
#include "catalog/pg_subscription_rel.h"
25
#include "executor/executor.h"
26
#include "nodes/makefuncs.h"
27
#include "replication/logicalrelation.h"
28
#include "replication/worker_internal.h"
29
#include "utils/inval.h"
30
#include "utils/lsyscache.h"
31
#include "utils/syscache.h"
32
33
34
static MemoryContext LogicalRepRelMapContext = NULL;
35
36
static HTAB *LogicalRepRelMap = NULL;
37
38
/*
39
 * Partition map (LogicalRepPartMap)
40
 *
41
 * When a partitioned table is used as replication target, replicated
42
 * operations are actually performed on its leaf partitions, which requires
43
 * the partitions to also be mapped to the remote relation.  Parent's entry
44
 * (LogicalRepRelMapEntry) cannot be used as-is for all partitions, because
45
 * individual partitions may have different attribute numbers, which means
46
 * attribute mappings to remote relation's attributes must be maintained
47
 * separately for each partition.
48
 */
49
static MemoryContext LogicalRepPartMapContext = NULL;
50
static HTAB *LogicalRepPartMap = NULL;
51
typedef struct LogicalRepPartMapEntry
52
{
53
  Oid     partoid;    /* LogicalRepPartMap's key */
54
  LogicalRepRelMapEntry relmapentry;
55
} LogicalRepPartMapEntry;
56
57
static Oid  FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,
58
                   AttrMap *attrMap);
59
60
/*
61
 * Relcache invalidation callback for our relation map cache.
62
 */
63
static void
64
logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
65
0
{
66
0
  LogicalRepRelMapEntry *entry;
67
68
  /* Just to be sure. */
69
0
  if (LogicalRepRelMap == NULL)
70
0
    return;
71
72
0
  if (reloid != InvalidOid)
73
0
  {
74
0
    HASH_SEQ_STATUS status;
75
76
0
    hash_seq_init(&status, LogicalRepRelMap);
77
78
    /* TODO, use inverse lookup hashtable? */
79
0
    while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
80
0
    {
81
0
      if (entry->localreloid == reloid)
82
0
      {
83
0
        entry->localrelvalid = false;
84
0
        hash_seq_term(&status);
85
0
        break;
86
0
      }
87
0
    }
88
0
  }
89
0
  else
90
0
  {
91
    /* invalidate all cache entries */
92
0
    HASH_SEQ_STATUS status;
93
94
0
    hash_seq_init(&status, LogicalRepRelMap);
95
96
0
    while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
97
0
      entry->localrelvalid = false;
98
0
  }
99
0
}
100
101
/*
 * Initialize the relation map cache.
 *
 * Creates the long-lived memory context (child of CacheMemoryContext) on
 * first use, builds the hash table keyed by remote relation ID, and
 * registers the relcache invalidation callback.
 */
static void
logicalrep_relmap_init(void)
{
	HASHCTL		ctl;

	/* Context survives for the backend's lifetime; created only once. */
	if (!LogicalRepRelMapContext)
		LogicalRepRelMapContext =
			AllocSetContextCreate(CacheMemoryContext,
								  "LogicalRepRelMapContext",
								  ALLOCSET_DEFAULT_SIZES);

	/* Initialize the relation hash table. */
	ctl.keysize = sizeof(LogicalRepRelId);
	ctl.entrysize = sizeof(LogicalRepRelMapEntry);
	ctl.hcxt = LogicalRepRelMapContext;

	LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl,
								   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

	/* Watch for invalidation events. */
	CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
								  (Datum) 0);
}
127
128
/*
129
 * Free the entry of a relation map cache.
130
 */
131
static void
132
logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
133
0
{
134
0
  LogicalRepRelation *remoterel;
135
136
0
  remoterel = &entry->remoterel;
137
138
0
  pfree(remoterel->nspname);
139
0
  pfree(remoterel->relname);
140
141
0
  if (remoterel->natts > 0)
142
0
  {
143
0
    int     i;
144
145
0
    for (i = 0; i < remoterel->natts; i++)
146
0
      pfree(remoterel->attnames[i]);
147
148
0
    pfree(remoterel->attnames);
149
0
    pfree(remoterel->atttyps);
150
0
  }
151
0
  bms_free(remoterel->attkeys);
152
153
0
  if (entry->attrmap)
154
0
    free_attrmap(entry->attrmap);
155
0
}
156
157
/*
 * Add new entry or update existing entry in the relation map cache.
 *
 * Called when new relation mapping is sent by the publisher to update
 * our expected view of incoming data from said publisher.
 */
void
logicalrep_relmap_update(LogicalRepRelation *remoterel)
{
	MemoryContext oldctx;
	LogicalRepRelMapEntry *entry;
	bool		found;
	int			i;

	if (LogicalRepRelMap == NULL)
		logicalrep_relmap_init();

	/*
	 * HASH_ENTER returns the existing entry if present or creates a new one.
	 */
	entry = hash_search(LogicalRepRelMap, &remoterel->remoteid,
						HASH_ENTER, &found);

	/* Release the stale copy's allocations before overwriting. */
	if (found)
		logicalrep_relmap_free_entry(entry);

	memset(entry, 0, sizeof(LogicalRepRelMapEntry));

	/* Make cached copy of the data, in the cache's long-lived context. */
	oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
	entry->remoterel.remoteid = remoterel->remoteid;
	entry->remoterel.nspname = pstrdup(remoterel->nspname);
	entry->remoterel.relname = pstrdup(remoterel->relname);
	entry->remoterel.natts = remoterel->natts;
	entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
	entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
	for (i = 0; i < remoterel->natts; i++)
	{
		entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
		entry->remoterel.atttyps[i] = remoterel->atttyps[i];
	}
	entry->remoterel.replident = remoterel->replident;
	entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
	MemoryContextSwitchTo(oldctx);
}
202
203
/*
204
 * Find attribute index in TupleDesc struct by attribute name.
205
 *
206
 * Returns -1 if not found.
207
 */
208
static int
209
logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
210
0
{
211
0
  int     i;
212
213
0
  for (i = 0; i < remoterel->natts; i++)
214
0
  {
215
0
    if (strcmp(remoterel->attnames[i], attname) == 0)
216
0
      return i;
217
0
  }
218
219
0
  return -1;
220
0
}
221
222
/*
223
 * Returns a comma-separated string of attribute names based on the provided
224
 * relation and bitmap indicating which attributes to include.
225
 */
226
static char *
227
logicalrep_get_attrs_str(LogicalRepRelation *remoterel, Bitmapset *atts)
228
0
{
229
0
  StringInfoData attsbuf;
230
0
  int     attcnt = 0;
231
0
  int     i = -1;
232
233
0
  Assert(!bms_is_empty(atts));
234
235
0
  initStringInfo(&attsbuf);
236
237
0
  while ((i = bms_next_member(atts, i)) >= 0)
238
0
  {
239
0
    attcnt++;
240
0
    if (attcnt > 1)
241
0
      appendStringInfoString(&attsbuf, _(", "));
242
243
0
    appendStringInfo(&attsbuf, _("\"%s\""), remoterel->attnames[i]);
244
0
  }
245
246
0
  return attsbuf.data;
247
0
}
248
249
/*
 * If attempting to replicate missing or generated columns, report an error.
 * Prioritize 'missing' errors if both occur though the prioritization is
 * arbitrary.
 *
 * Does not return if either bitmap is non-empty (ereport ERROR).
 */
static void
logicalrep_report_missing_or_gen_attrs(LogicalRepRelation *remoterel,
									   Bitmapset *missingatts,
									   Bitmapset *generatedatts)
{
	if (!bms_is_empty(missingatts))
		ereport(ERROR,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg_plural("logical replication target relation \"%s.%s\" is missing replicated column: %s",
							  "logical replication target relation \"%s.%s\" is missing replicated columns: %s",
							  bms_num_members(missingatts),
							  remoterel->nspname,
							  remoterel->relname,
							  logicalrep_get_attrs_str(remoterel,
													   missingatts)));

	if (!bms_is_empty(generatedatts))
		ereport(ERROR,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg_plural("logical replication target relation \"%s.%s\" has incompatible generated column: %s",
							  "logical replication target relation \"%s.%s\" has incompatible generated columns: %s",
							  bms_num_members(generatedatts),
							  remoterel->nspname,
							  remoterel->relname,
							  logicalrep_get_attrs_str(remoterel,
													   generatedatts)));
}
281
282
/*
 * Check if replica identity matches and mark the updatable flag.
 *
 * We allow for stricter replica identity (fewer columns) on subscriber as
 * that will not stop us from finding unique tuple. IE, if publisher has
 * identity (id,timestamp) and subscriber just (id) this will not be a
 * problem, but in the opposite scenario it will.
 *
 * We just mark the relation entry as not updatable here if the local
 * replica identity is found to be insufficient for applying
 * updates/deletes (inserts don't care!) and leave it to
 * check_relation_updatable() to throw the actual error if needed.
 */
static void
logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
{
	Bitmapset  *idkey;
	LogicalRepRelation *remoterel = &entry->remoterel;
	int			i;

	entry->updatable = true;

	idkey = RelationGetIndexAttrBitmap(entry->localrel,
									   INDEX_ATTR_BITMAP_IDENTITY_KEY);
	/* fallback to PK if no replica identity */
	if (idkey == NULL)
	{
		idkey = RelationGetIndexAttrBitmap(entry->localrel,
										   INDEX_ATTR_BITMAP_PRIMARY_KEY);

		/*
		 * If no replica identity index and no PK, the published table must
		 * have replica identity FULL.
		 */
		if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
			entry->updatable = false;
	}

	/*
	 * Every local identity-key column must map to a remote column that is
	 * part of the remote identity key; otherwise updates/deletes cannot be
	 * matched reliably.  Bitmap members are offset by
	 * FirstLowInvalidHeapAttributeNumber, as is usual for attr bitmaps.
	 */
	i = -1;
	while ((i = bms_next_member(idkey, i)) >= 0)
	{
		int			attnum = i + FirstLowInvalidHeapAttributeNumber;

		if (!AttrNumberIsForUserDefinedAttr(attnum))
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("logical replication target relation \"%s.%s\" uses "
							"system columns in REPLICA IDENTITY index",
							remoterel->nspname, remoterel->relname)));

		attnum = AttrNumberGetAttrOffset(attnum);

		/* Local column unmapped, or mapped to a non-key remote column? */
		if (entry->attrmap->attnums[attnum] < 0 ||
			!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
		{
			entry->updatable = false;
			break;
		}
	}
}
342
343
/*
 * Open the local relation associated with the remote one.
 *
 * Rebuilds the Relcache mapping if it was invalidated by local DDL.
 *
 * Returns the cache entry with entry->localrel opened and locked in
 * 'lockmode'; caller must pair this with logicalrep_rel_close().
 * Errors out if no mapping exists for 'remoteid' or if the local
 * relation is missing or incompatible.
 */
LogicalRepRelMapEntry *
logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
{
	LogicalRepRelMapEntry *entry;
	bool		found;
	LogicalRepRelation *remoterel;

	if (LogicalRepRelMap == NULL)
		logicalrep_relmap_init();

	/* Search for existing entry. */
	entry = hash_search(LogicalRepRelMap, &remoteid,
						HASH_FIND, &found);

	if (!found)
		elog(ERROR, "no relation map entry for remote relation ID %u",
			 remoteid);

	remoterel = &entry->remoterel;

	/* Ensure we don't leak a relcache refcount. */
	if (entry->localrel)
		elog(ERROR, "remote relation ID %u is already open", remoteid);

	/*
	 * When opening and locking a relation, pending invalidation messages are
	 * processed which can invalidate the relation.  Hence, if the entry is
	 * currently considered valid, try to open the local relation by OID and
	 * see if invalidation ensues.
	 */
	if (entry->localrelvalid)
	{
		entry->localrel = try_table_open(entry->localreloid, lockmode);
		if (!entry->localrel)
		{
			/* Table was renamed or dropped. */
			entry->localrelvalid = false;
		}
		else if (!entry->localrelvalid)
		{
			/* Note we release the no-longer-useful lock here. */
			table_close(entry->localrel, lockmode);
			entry->localrel = NULL;
		}
	}

	/*
	 * If the entry has been marked invalid since we last had lock on it,
	 * re-open the local relation by name and rebuild all derived data.
	 */
	if (!entry->localrelvalid)
	{
		Oid			relid;
		TupleDesc	desc;
		MemoryContext oldctx;
		int			i;
		Bitmapset  *missingatts;
		Bitmapset  *generatedattrs = NULL;

		/* Release the no-longer-useful attrmap, if any. */
		if (entry->attrmap)
		{
			free_attrmap(entry->attrmap);
			entry->attrmap = NULL;
		}

		/* Try to find and lock the relation by name. */
		relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
											  remoterel->relname, -1),
								 lockmode, true);
		if (!OidIsValid(relid))
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("logical replication target relation \"%s.%s\" does not exist",
							remoterel->nspname, remoterel->relname)));
		/* Already locked above, so no additional lock needed here. */
		entry->localrel = table_open(relid, NoLock);
		entry->localreloid = relid;

		/* Check for supported relkind. */
		CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
								 remoterel->nspname, remoterel->relname);

		/*
		 * Build the mapping of local attribute numbers to remote attribute
		 * numbers and validate that we don't miss any replicated columns as
		 * that would result in potentially unwanted data loss.
		 */
		desc = RelationGetDescr(entry->localrel);
		oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
		entry->attrmap = make_attrmap(desc->natts);
		MemoryContextSwitchTo(oldctx);

		/* check and report missing attrs, if any */
		missingatts = bms_add_range(NULL, 0, remoterel->natts - 1);
		for (i = 0; i < desc->natts; i++)
		{
			int			attnum;
			Form_pg_attribute attr = TupleDescAttr(desc, i);

			if (attr->attisdropped)
			{
				/* Dropped local columns never map to remote ones. */
				entry->attrmap->attnums[i] = -1;
				continue;
			}

			attnum = logicalrep_rel_att_by_name(remoterel,
												NameStr(attr->attname));

			entry->attrmap->attnums[i] = attnum;
			if (attnum >= 0)
			{
				/* Remember which subscriber columns are generated. */
				if (attr->attgenerated)
					generatedattrs = bms_add_member(generatedattrs, attnum);

				missingatts = bms_del_member(missingatts, attnum);
			}
		}

		/* Errors out if anything remains missing or generated. */
		logicalrep_report_missing_or_gen_attrs(remoterel, missingatts,
											   generatedattrs);

		/* be tidy */
		bms_free(generatedattrs);
		bms_free(missingatts);

		/*
		 * Set if the table's replica identity is enough to apply
		 * update/delete.
		 */
		logicalrep_rel_mark_updatable(entry);

		/*
		 * Finding a usable index is an infrequent task. It occurs when an
		 * operation is first performed on the relation, or after invalidation
		 * of the relation cache entry (such as ANALYZE or CREATE/DROP index
		 * on the relation).
		 */
		entry->localindexoid = FindLogicalRepLocalIndex(entry->localrel, remoterel,
														entry->attrmap);

		entry->localrelvalid = true;
	}

	/* Refresh subscription-relation state unless it is already READY. */
	if (entry->state != SUBREL_STATE_READY)
		entry->state = GetSubscriptionRelState(MySubscription->oid,
											   entry->localreloid,
											   &entry->statelsn);

	return entry;
}
499
500
/*
 * Close the previously opened logical relation.
 *
 * Releases the lock taken by logicalrep_rel_open() and clears the cached
 * Relation pointer so the entry can be reopened later.
 */
void
logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
{
	table_close(rel->localrel, lockmode);
	rel->localrel = NULL;
}
509
510
/*
511
 * Partition cache: look up partition LogicalRepRelMapEntry's
512
 *
513
 * Unlike relation map cache, this is keyed by partition OID, not remote
514
 * relation OID, because we only have to use this cache in the case where
515
 * partitions are not directly mapped to any remote relation, such as when
516
 * replication is occurring with one of their ancestors as target.
517
 */
518
519
/*
520
 * Relcache invalidation callback
521
 */
522
static void
523
logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
524
0
{
525
0
  LogicalRepPartMapEntry *entry;
526
527
  /* Just to be sure. */
528
0
  if (LogicalRepPartMap == NULL)
529
0
    return;
530
531
0
  if (reloid != InvalidOid)
532
0
  {
533
0
    HASH_SEQ_STATUS status;
534
535
0
    hash_seq_init(&status, LogicalRepPartMap);
536
537
    /* TODO, use inverse lookup hashtable? */
538
0
    while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
539
0
    {
540
0
      if (entry->relmapentry.localreloid == reloid)
541
0
      {
542
0
        entry->relmapentry.localrelvalid = false;
543
0
        hash_seq_term(&status);
544
0
        break;
545
0
      }
546
0
    }
547
0
  }
548
0
  else
549
0
  {
550
    /* invalidate all cache entries */
551
0
    HASH_SEQ_STATUS status;
552
553
0
    hash_seq_init(&status, LogicalRepPartMap);
554
555
0
    while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
556
0
      entry->relmapentry.localrelvalid = false;
557
0
  }
558
0
}
559
560
/*
561
 * Reset the entries in the partition map that refer to remoterel.
562
 *
563
 * Called when new relation mapping is sent by the publisher to update our
564
 * expected view of incoming data from said publisher.
565
 *
566
 * Note that we don't update the remoterel information in the entry here,
567
 * we will update the information in logicalrep_partition_open to avoid
568
 * unnecessary work.
569
 */
570
void
571
logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
572
0
{
573
0
  HASH_SEQ_STATUS status;
574
0
  LogicalRepPartMapEntry *part_entry;
575
0
  LogicalRepRelMapEntry *entry;
576
577
0
  if (LogicalRepPartMap == NULL)
578
0
    return;
579
580
0
  hash_seq_init(&status, LogicalRepPartMap);
581
0
  while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
582
0
  {
583
0
    entry = &part_entry->relmapentry;
584
585
0
    if (entry->remoterel.remoteid != remoterel->remoteid)
586
0
      continue;
587
588
0
    logicalrep_relmap_free_entry(entry);
589
590
0
    memset(entry, 0, sizeof(LogicalRepRelMapEntry));
591
0
  }
592
0
}
593
594
/*
 * Initialize the partition map cache.
 *
 * Mirrors logicalrep_relmap_init(), but the hash table is keyed by
 * partition OID rather than remote relation ID.
 */
static void
logicalrep_partmap_init(void)
{
	HASHCTL		ctl;

	/* Context survives for the backend's lifetime; created only once. */
	if (!LogicalRepPartMapContext)
		LogicalRepPartMapContext =
			AllocSetContextCreate(CacheMemoryContext,
								  "LogicalRepPartMapContext",
								  ALLOCSET_DEFAULT_SIZES);

	/* Initialize the relation hash table. */
	ctl.keysize = sizeof(Oid);	/* partition OID */
	ctl.entrysize = sizeof(LogicalRepPartMapEntry);
	ctl.hcxt = LogicalRepPartMapContext;

	LogicalRepPartMap = hash_create("logicalrep partition map cache", 64, &ctl,
									HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

	/* Watch for invalidation events. */
	CacheRegisterRelcacheCallback(logicalrep_partmap_invalidate_cb,
								  (Datum) 0);
}
620
621
/*
 * logicalrep_partition_open
 *
 * Returned entry reuses most of the values of the root table's entry, save
 * the attribute map, which can be different for the partition.  However,
 * we must physically copy all the data, in case the root table's entry
 * gets freed/rebuilt.
 *
 * Note there's no logicalrep_partition_close, because the caller closes the
 * component relation.
 *
 * 'map', when given, maps the partition's attribute numbers to the root
 * relation's (1-based, from tuple routing); NULL means the partition's
 * attributes line up with the root's.
 */
LogicalRepRelMapEntry *
logicalrep_partition_open(LogicalRepRelMapEntry *root,
						  Relation partrel, AttrMap *map)
{
	LogicalRepRelMapEntry *entry;
	LogicalRepPartMapEntry *part_entry;
	LogicalRepRelation *remoterel = &root->remoterel;
	Oid			partOid = RelationGetRelid(partrel);
	AttrMap    *attrmap = root->attrmap;
	bool		found;
	MemoryContext oldctx;

	if (LogicalRepPartMap == NULL)
		logicalrep_partmap_init();

	/* Search for existing entry. */
	part_entry = (LogicalRepPartMapEntry *) hash_search(LogicalRepPartMap,
														&partOid,
														HASH_ENTER, &found);

	entry = &part_entry->relmapentry;

	/*
	 * We must always overwrite entry->localrel with the latest partition
	 * Relation pointer, because the Relation pointed to by the old value may
	 * have been cleared after the caller would have closed the partition
	 * relation after the last use of this entry.  Note that localrelvalid is
	 * only updated by the relcache invalidation callback, so it may still be
	 * true irrespective of whether the Relation pointed to by localrel has
	 * been cleared or not.
	 */
	if (found && entry->localrelvalid)
	{
		entry->localrel = partrel;
		return entry;
	}

	/* Switch to longer-lived context. */
	oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);

	if (!found)
	{
		memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
		part_entry->partoid = partOid;
	}

	/* Release the no-longer-useful attrmap, if any. */
	if (entry->attrmap)
	{
		free_attrmap(entry->attrmap);
		entry->attrmap = NULL;
	}

	/* remoteid == 0 means the remote-relation copy was never filled in. */
	if (!entry->remoterel.remoteid)
	{
		int			i;

		/* Remote relation is copied as-is from the root entry. */
		entry->remoterel.remoteid = remoterel->remoteid;
		entry->remoterel.nspname = pstrdup(remoterel->nspname);
		entry->remoterel.relname = pstrdup(remoterel->relname);
		entry->remoterel.natts = remoterel->natts;
		entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
		entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
		for (i = 0; i < remoterel->natts; i++)
		{
			entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
			entry->remoterel.atttyps[i] = remoterel->atttyps[i];
		}
		entry->remoterel.replident = remoterel->replident;
		entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
	}

	entry->localrel = partrel;
	entry->localreloid = partOid;

	/*
	 * If the partition's attributes don't match the root relation's, we'll
	 * need to make a new attrmap which maps partition attribute numbers to
	 * remoterel's, instead of the original which maps root relation's
	 * attribute numbers to remoterel's.
	 *
	 * Note that 'map' which comes from the tuple routing data structure
	 * contains 1-based attribute numbers (of the parent relation).  However,
	 * the map in 'entry', a logical replication data structure, contains
	 * 0-based attribute numbers (of the remote relation).
	 */
	if (map)
	{
		AttrNumber	attno;

		entry->attrmap = make_attrmap(map->maplen);
		for (attno = 0; attno < entry->attrmap->maplen; attno++)
		{
			AttrNumber	root_attno = map->attnums[attno];

			/* 0 means it's a dropped attribute.  See comments atop AttrMap. */
			if (root_attno == 0)
				entry->attrmap->attnums[attno] = -1;
			else
				entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
		}
	}
	else
	{
		/* Lacking copy_attmap, do this the hard way. */
		entry->attrmap = make_attrmap(attrmap->maplen);
		memcpy(entry->attrmap->attnums, attrmap->attnums,
			   attrmap->maplen * sizeof(AttrNumber));
	}

	/* Set if the table's replica identity is enough to apply update/delete. */
	logicalrep_rel_mark_updatable(entry);

	/* state and statelsn are left set to 0. */
	MemoryContextSwitchTo(oldctx);

	/*
	 * Finding a usable index is an infrequent task. It occurs when an
	 * operation is first performed on the relation, or after invalidation of
	 * the relation cache entry (such as ANALYZE or CREATE/DROP index on the
	 * relation).
	 *
	 * We also prefer to run this code on the oldctx so that we do not leak
	 * anything in the LogicalRepPartMapContext (hence CacheMemoryContext).
	 */
	entry->localindexoid = FindLogicalRepLocalIndex(partrel, remoterel,
													entry->attrmap);

	entry->localrelvalid = true;

	return entry;
}
765
766
/*
767
 * Returns the oid of an index that can be used by the apply worker to scan
768
 * the relation.
769
 *
770
 * We expect to call this function when REPLICA IDENTITY FULL is defined for
771
 * the remote relation.
772
 *
773
 * If no suitable index is found, returns InvalidOid.
774
 */
775
static Oid
776
FindUsableIndexForReplicaIdentityFull(Relation localrel, AttrMap *attrmap)
777
0
{
778
0
  List     *idxlist = RelationGetIndexList(localrel);
779
780
0
  foreach_oid(idxoid, idxlist)
781
0
  {
782
0
    bool    isUsableIdx;
783
0
    Relation  idxRel;
784
785
0
    idxRel = index_open(idxoid, AccessShareLock);
786
0
    isUsableIdx = IsIndexUsableForReplicaIdentityFull(idxRel, attrmap);
787
0
    index_close(idxRel, AccessShareLock);
788
789
    /* Return the first eligible index found */
790
0
    if (isUsableIdx)
791
0
      return idxoid;
792
0
  }
793
794
0
  return InvalidOid;
795
0
}
796
797
/*
 * Returns true if the index is usable for replica identity full.
 *
 * The index must have an equal strategy for each key column, be non-partial,
 * and the leftmost field must be a column (not an expression) that references
 * the remote relation column. These limitations help to keep the index scan
 * similar to PK/RI index scans.
 *
 * attrmap is a map of local attributes to remote ones. We can consult this
 * map to check whether the local index attribute has a corresponding remote
 * attribute.
 *
 * Note that the limitations of index scans for replica identity full only
 * adheres to a subset of the limitations of PK/RI. For example, we support
 * columns that are marked as [NULL] or we are not interested in the [NOT
 * DEFERRABLE] aspect of constraints here. It works for us because we always
 * compare the tuples for non-PK/RI index scans. See
 * RelationFindReplTupleByIndex().
 *
 * XXX: To support partial indexes, the required changes are likely to be larger.
 * If none of the tuples satisfy the expression for the index scan, we fall-back
 * to sequential execution, which might not be a good idea in some cases.
 */
bool
IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap)
{
	AttrNumber	keycol;
	oidvector  *indclass;

	/* The index must not be a partial index */
	if (!heap_attisnull(idxrel->rd_indextuple, Anum_pg_index_indpred, NULL))
		return false;

	Assert(idxrel->rd_index->indnatts >= 1);

	indclass = (oidvector *) DatumGetPointer(SysCacheGetAttrNotNull(INDEXRELID,
																	idxrel->rd_indextuple,
																	Anum_pg_index_indclass));

	/* Ensure that the index has a valid equal strategy for each key column */
	for (int i = 0; i < idxrel->rd_index->indnkeyatts; i++)
	{
		Oid			opfamily;

		opfamily = get_opclass_family(indclass->values[i]);
		if (IndexAmTranslateCompareType(COMPARE_EQ, idxrel->rd_rel->relam, opfamily, true) == InvalidStrategy)
			return false;
	}

	/*
	 * For indexes other than PK and REPLICA IDENTITY, we need to match the
	 * local and remote tuples.  The equality routine tuples_equal() cannot
	 * accept a data type where the type cache cannot provide an equality
	 * operator.
	 */
	for (int i = 0; i < idxrel->rd_att->natts; i++)
	{
		TypeCacheEntry *typentry;

		typentry = lookup_type_cache(TupleDescAttr(idxrel->rd_att, i)->atttypid, TYPECACHE_EQ_OPR_FINFO);
		if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
			return false;
	}

	/* The leftmost index field must not be an expression */
	keycol = idxrel->rd_index->indkey.values[0];
	if (!AttributeNumberIsValid(keycol))
		return false;

	/*
	 * And the leftmost index field must reference the remote relation column.
	 * This is because if it doesn't, the sequential scan is favorable over
	 * index scan in most cases.
	 */
	if (attrmap->maplen <= AttrNumberGetAttrOffset(keycol) ||
		attrmap->attnums[AttrNumberGetAttrOffset(keycol)] < 0)
		return false;

	/*
	 * The given index access method must implement "amgettuple", which will
	 * be used later to fetch the tuples.  See RelationFindReplTupleByIndex().
	 */
	if (GetIndexAmRoutineByAmId(idxrel->rd_rel->relam, false)->amgettuple == NULL)
		return false;

	return true;
}
884
885
/*
886
 * Return the OID of the replica identity index if one is defined;
887
 * the OID of the PK if one exists and is not deferrable;
888
 * otherwise, InvalidOid.
889
 */
890
Oid
891
GetRelationIdentityOrPK(Relation rel)
892
0
{
893
0
  Oid     idxoid;
894
895
0
  idxoid = RelationGetReplicaIndex(rel);
896
897
0
  if (!OidIsValid(idxoid))
898
0
    idxoid = RelationGetPrimaryKeyIndex(rel, false);
899
900
0
  return idxoid;
901
0
}
902
903
/*
904
 * Returns the index oid if we can use an index for subscriber. Otherwise,
905
 * returns InvalidOid.
906
 */
907
static Oid
908
FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,
909
             AttrMap *attrMap)
910
0
{
911
0
  Oid     idxoid;
912
913
  /*
914
   * We never need index oid for partitioned tables, always rely on leaf
915
   * partition's index.
916
   */
917
0
  if (localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
918
0
    return InvalidOid;
919
920
  /*
921
   * Simple case, we already have a primary key or a replica identity index.
922
   */
923
0
  idxoid = GetRelationIdentityOrPK(localrel);
924
0
  if (OidIsValid(idxoid))
925
0
    return idxoid;
926
927
0
  if (remoterel->replident == REPLICA_IDENTITY_FULL)
928
0
  {
929
    /*
930
     * We are looking for one more opportunity for using an index. If
931
     * there are any indexes defined on the local relation, try to pick a
932
     * suitable index.
933
     *
934
     * The index selection safely assumes that all the columns are going
935
     * to be available for the index scan given that remote relation has
936
     * replica identity full.
937
     *
938
     * Note that we are not using the planner to find the cheapest method
939
     * to scan the relation as that would require us to either use lower
940
     * level planner functions which would be a maintenance burden in the
941
     * long run or use the full-fledged planner which could cause
942
     * overhead.
943
     */
944
0
    return FindUsableIndexForReplicaIdentityFull(localrel, attrMap);
945
0
  }
946
947
0
  return InvalidOid;
948
0
}