Coverage Report

Created: 2025-10-09 06:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/postgres/src/backend/catalog/heap.c
Line
Count
Source
1
/*-------------------------------------------------------------------------
2
 *
3
 * heap.c
4
 *    code to create and destroy POSTGRES heap relations
5
 *
6
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7
 * Portions Copyright (c) 1994, Regents of the University of California
8
 *
9
 *
10
 * IDENTIFICATION
11
 *    src/backend/catalog/heap.c
12
 *
13
 *
14
 * INTERFACE ROUTINES
15
 *    heap_create()     - Create an uncataloged heap relation
16
 *    heap_create_with_catalog() - Create a cataloged relation
17
 *    heap_drop_with_catalog() - Removes named relation from catalogs
18
 *
19
 * NOTES
20
 *    this code taken from access/heap/create.c, which contains
21
 *    the old heap_create_with_catalog, amcreate, and amdestroy.
22
 *    those routines will soon call these routines using the function
23
 *    manager,
24
 *    just like the poorly named "NewXXX" routines do.  The
25
 *    "New" routines are all going to die soon, once and for all!
26
 *    -cim 1/13/91
27
 *
28
 *-------------------------------------------------------------------------
29
 */
30
#include "postgres.h"
31
32
#include "access/genam.h"
33
#include "access/multixact.h"
34
#include "access/relation.h"
35
#include "access/table.h"
36
#include "access/tableam.h"
37
#include "catalog/binary_upgrade.h"
38
#include "catalog/catalog.h"
39
#include "catalog/heap.h"
40
#include "catalog/index.h"
41
#include "catalog/objectaccess.h"
42
#include "catalog/partition.h"
43
#include "catalog/pg_am.h"
44
#include "catalog/pg_attrdef.h"
45
#include "catalog/pg_collation.h"
46
#include "catalog/pg_constraint.h"
47
#include "catalog/pg_foreign_table.h"
48
#include "catalog/pg_inherits.h"
49
#include "catalog/pg_namespace.h"
50
#include "catalog/pg_opclass.h"
51
#include "catalog/pg_partitioned_table.h"
52
#include "catalog/pg_statistic.h"
53
#include "catalog/pg_subscription_rel.h"
54
#include "catalog/pg_tablespace.h"
55
#include "catalog/pg_type.h"
56
#include "catalog/storage.h"
57
#include "commands/tablecmds.h"
58
#include "commands/typecmds.h"
59
#include "common/int.h"
60
#include "miscadmin.h"
61
#include "nodes/nodeFuncs.h"
62
#include "optimizer/optimizer.h"
63
#include "parser/parse_coerce.h"
64
#include "parser/parse_collate.h"
65
#include "parser/parse_expr.h"
66
#include "parser/parse_relation.h"
67
#include "parser/parsetree.h"
68
#include "partitioning/partdesc.h"
69
#include "pgstat.h"
70
#include "storage/lmgr.h"
71
#include "storage/predicate.h"
72
#include "utils/array.h"
73
#include "utils/builtins.h"
74
#include "utils/fmgroids.h"
75
#include "utils/inval.h"
76
#include "utils/lsyscache.h"
77
#include "utils/syscache.h"
78
79
80
/* Potentially set by pg_upgrade_support functions */
81
Oid     binary_upgrade_next_heap_pg_class_oid = InvalidOid;
82
Oid     binary_upgrade_next_toast_pg_class_oid = InvalidOid;
83
RelFileNumber binary_upgrade_next_heap_pg_class_relfilenumber = InvalidRelFileNumber;
84
RelFileNumber binary_upgrade_next_toast_pg_class_relfilenumber = InvalidRelFileNumber;
85
86
static void AddNewRelationTuple(Relation pg_class_desc,
87
                Relation new_rel_desc,
88
                Oid new_rel_oid,
89
                Oid new_type_oid,
90
                Oid reloftype,
91
                Oid relowner,
92
                char relkind,
93
                TransactionId relfrozenxid,
94
                TransactionId relminmxid,
95
                Datum relacl,
96
                Datum reloptions);
97
static ObjectAddress AddNewRelationType(const char *typeName,
98
                    Oid typeNamespace,
99
                    Oid new_rel_oid,
100
                    char new_rel_kind,
101
                    Oid ownerid,
102
                    Oid new_row_type,
103
                    Oid new_array_type);
104
static void RelationRemoveInheritance(Oid relid);
105
static Oid  StoreRelCheck(Relation rel, const char *ccname, Node *expr,
106
              bool is_enforced, bool is_validated, bool is_local,
107
              int16 inhcount, bool is_no_inherit, bool is_internal);
108
static void StoreConstraints(Relation rel, List *cooked_constraints,
109
               bool is_internal);
110
static bool MergeWithExistingConstraint(Relation rel, const char *ccname, Node *expr,
111
                    bool allow_merge, bool is_local,
112
                    bool is_enforced,
113
                    bool is_initially_valid,
114
                    bool is_no_inherit);
115
static void SetRelationNumChecks(Relation rel, int numchecks);
116
static Node *cookConstraint(ParseState *pstate,
117
              Node *raw_constraint,
118
              char *relname);
119
120
121
/* ----------------------------------------------------------------
122
 *        XXX UGLY HARD CODED BADNESS FOLLOWS XXX
123
 *
124
 *    these should all be moved to someplace in the lib/catalog
125
 *    module, if not obliterated first.
126
 * ----------------------------------------------------------------
127
 */
128
129
130
/*
131
 * Note:
132
 *    Should the system special case these attributes in the future?
133
 *    Advantage:  consume much less space in the ATTRIBUTE relation.
134
 *    Disadvantage:  special cases will be all over the place.
135
 */
136
137
/*
138
 * The initializers below do not include trailing variable length fields,
139
 * but that's OK - we're never going to reference anything beyond the
140
 * fixed-size portion of the structure anyway.  Fields that can default
141
 * to zeroes are also not mentioned.
142
 */
143
144
static const FormData_pg_attribute a1 = {
145
  .attname = {"ctid"},
146
  .atttypid = TIDOID,
147
  .attlen = sizeof(ItemPointerData),
148
  .attnum = SelfItemPointerAttributeNumber,
149
  .atttypmod = -1,
150
  .attbyval = false,
151
  .attalign = TYPALIGN_SHORT,
152
  .attstorage = TYPSTORAGE_PLAIN,
153
  .attnotnull = true,
154
  .attislocal = true,
155
};
156
157
static const FormData_pg_attribute a2 = {
158
  .attname = {"xmin"},
159
  .atttypid = XIDOID,
160
  .attlen = sizeof(TransactionId),
161
  .attnum = MinTransactionIdAttributeNumber,
162
  .atttypmod = -1,
163
  .attbyval = true,
164
  .attalign = TYPALIGN_INT,
165
  .attstorage = TYPSTORAGE_PLAIN,
166
  .attnotnull = true,
167
  .attislocal = true,
168
};
169
170
static const FormData_pg_attribute a3 = {
171
  .attname = {"cmin"},
172
  .atttypid = CIDOID,
173
  .attlen = sizeof(CommandId),
174
  .attnum = MinCommandIdAttributeNumber,
175
  .atttypmod = -1,
176
  .attbyval = true,
177
  .attalign = TYPALIGN_INT,
178
  .attstorage = TYPSTORAGE_PLAIN,
179
  .attnotnull = true,
180
  .attislocal = true,
181
};
182
183
static const FormData_pg_attribute a4 = {
184
  .attname = {"xmax"},
185
  .atttypid = XIDOID,
186
  .attlen = sizeof(TransactionId),
187
  .attnum = MaxTransactionIdAttributeNumber,
188
  .atttypmod = -1,
189
  .attbyval = true,
190
  .attalign = TYPALIGN_INT,
191
  .attstorage = TYPSTORAGE_PLAIN,
192
  .attnotnull = true,
193
  .attislocal = true,
194
};
195
196
static const FormData_pg_attribute a5 = {
197
  .attname = {"cmax"},
198
  .atttypid = CIDOID,
199
  .attlen = sizeof(CommandId),
200
  .attnum = MaxCommandIdAttributeNumber,
201
  .atttypmod = -1,
202
  .attbyval = true,
203
  .attalign = TYPALIGN_INT,
204
  .attstorage = TYPSTORAGE_PLAIN,
205
  .attnotnull = true,
206
  .attislocal = true,
207
};
208
209
/*
210
 * We decided to call this attribute "tableoid" rather than say
211
 * "classoid" on the basis that in the future there may be more than one
212
 * table of a particular class/type. In any case table is still the word
213
 * used in SQL.
214
 */
215
static const FormData_pg_attribute a6 = {
216
  .attname = {"tableoid"},
217
  .atttypid = OIDOID,
218
  .attlen = sizeof(Oid),
219
  .attnum = TableOidAttributeNumber,
220
  .atttypmod = -1,
221
  .attbyval = true,
222
  .attalign = TYPALIGN_INT,
223
  .attstorage = TYPSTORAGE_PLAIN,
224
  .attnotnull = true,
225
  .attislocal = true,
226
};
227
228
static const FormData_pg_attribute *const SysAtt[] = {&a1, &a2, &a3, &a4, &a5, &a6};
229
230
/*
231
 * This function returns a Form_pg_attribute pointer for a system attribute.
232
 * Note that we elog if the presented attno is invalid, which would only
233
 * happen if there's a problem upstream.
234
 */
235
const FormData_pg_attribute *
236
SystemAttributeDefinition(AttrNumber attno)
237
0
{
238
0
  if (attno >= 0 || attno < -(int) lengthof(SysAtt))
239
0
    elog(ERROR, "invalid system attribute number %d", attno);
240
0
  return SysAtt[-attno - 1];
241
0
}
242
243
/*
244
 * If the given name is a system attribute name, return a Form_pg_attribute
245
 * pointer for a prototype definition.  If not, return NULL.
246
 */
247
const FormData_pg_attribute *
248
SystemAttributeByName(const char *attname)
249
0
{
250
0
  int     j;
251
252
0
  for (j = 0; j < (int) lengthof(SysAtt); j++)
253
0
  {
254
0
    const FormData_pg_attribute *att = SysAtt[j];
255
256
0
    if (strcmp(NameStr(att->attname), attname) == 0)
257
0
      return att;
258
0
  }
259
260
0
  return NULL;
261
0
}
262
263
264
/* ----------------------------------------------------------------
265
 *        XXX END OF UGLY HARD CODED BADNESS XXX
266
 * ---------------------------------------------------------------- */
267
268
269
/* ----------------------------------------------------------------
270
 *    heap_create   - Create an uncataloged heap relation
271
 *
272
 *    Note API change: the caller must now always provide the OID
273
 *    to use for the relation.  The relfilenumber may be (and in
274
 *    the simplest cases is) left unspecified.
275
 *
276
 *    create_storage indicates whether or not to create the storage.
277
 *    However, even if create_storage is true, no storage will be
278
 *    created if the relkind is one that doesn't have storage.
279
 *
280
 *    rel->rd_rel is initialized by RelationBuildLocalRelation,
281
 *    and is mostly zeroes at return.
282
 * ----------------------------------------------------------------
283
 */
284
Relation
285
heap_create(const char *relname,
286
      Oid relnamespace,
287
      Oid reltablespace,
288
      Oid relid,
289
      RelFileNumber relfilenumber,
290
      Oid accessmtd,
291
      TupleDesc tupDesc,
292
      char relkind,
293
      char relpersistence,
294
      bool shared_relation,
295
      bool mapped_relation,
296
      bool allow_system_table_mods,
297
      TransactionId *relfrozenxid,
298
      MultiXactId *relminmxid,
299
      bool create_storage)
300
0
{
301
0
  Relation  rel;
302
303
  /* The caller must have provided an OID for the relation. */
304
0
  Assert(OidIsValid(relid));
305
306
  /*
307
   * Don't allow creating relations in pg_catalog directly, even though it
308
   * is allowed to move user defined relations there. Semantics with search
309
   * paths including pg_catalog are too confusing for now.
310
   *
311
   * But allow creating indexes on relations in pg_catalog even if
312
   * allow_system_table_mods = off, upper layers already guarantee it's on a
313
   * user defined relation, not a system one.
314
   */
315
0
  if (!allow_system_table_mods &&
316
0
    ((IsCatalogNamespace(relnamespace) && relkind != RELKIND_INDEX) ||
317
0
     IsToastNamespace(relnamespace)) &&
318
0
    IsNormalProcessingMode())
319
0
    ereport(ERROR,
320
0
        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
321
0
         errmsg("permission denied to create \"%s.%s\"",
322
0
            get_namespace_name(relnamespace), relname),
323
0
         errdetail("System catalog modifications are currently disallowed.")));
324
325
0
  *relfrozenxid = InvalidTransactionId;
326
0
  *relminmxid = InvalidMultiXactId;
327
328
  /*
329
   * Force reltablespace to zero if the relation kind does not support
330
   * tablespaces.  This is mainly just for cleanliness' sake.
331
   */
332
0
  if (!RELKIND_HAS_TABLESPACE(relkind))
333
0
    reltablespace = InvalidOid;
334
335
  /* Don't create storage for relkinds without physical storage. */
336
0
  if (!RELKIND_HAS_STORAGE(relkind))
337
0
    create_storage = false;
338
0
  else
339
0
  {
340
    /*
341
     * If relfilenumber is unspecified by the caller then create storage
342
     * with oid same as relid.
343
     */
344
0
    if (!RelFileNumberIsValid(relfilenumber))
345
0
      relfilenumber = relid;
346
0
  }
347
348
  /*
349
   * Never allow a pg_class entry to explicitly specify the database's
350
   * default tablespace in reltablespace; force it to zero instead. This
351
   * ensures that if the database is cloned with a different default
352
   * tablespace, the pg_class entry will still match where CREATE DATABASE
353
   * will put the physically copied relation.
354
   *
355
   * Yes, this is a bit of a hack.
356
   */
357
0
  if (reltablespace == MyDatabaseTableSpace)
358
0
    reltablespace = InvalidOid;
359
360
  /*
361
   * build the relcache entry.
362
   */
363
0
  rel = RelationBuildLocalRelation(relname,
364
0
                   relnamespace,
365
0
                   tupDesc,
366
0
                   relid,
367
0
                   accessmtd,
368
0
                   relfilenumber,
369
0
                   reltablespace,
370
0
                   shared_relation,
371
0
                   mapped_relation,
372
0
                   relpersistence,
373
0
                   relkind);
374
375
  /*
376
   * Have the storage manager create the relation's disk file, if needed.
377
   *
378
   * For tables, the AM callback creates both the main and the init fork.
379
   * For others, only the main fork is created; the other forks will be
380
   * created on demand.
381
   */
382
0
  if (create_storage)
383
0
  {
384
0
    if (RELKIND_HAS_TABLE_AM(rel->rd_rel->relkind))
385
0
      table_relation_set_new_filelocator(rel, &rel->rd_locator,
386
0
                         relpersistence,
387
0
                         relfrozenxid, relminmxid);
388
0
    else if (RELKIND_HAS_STORAGE(rel->rd_rel->relkind))
389
0
      RelationCreateStorage(rel->rd_locator, relpersistence, true);
390
0
    else
391
0
      Assert(false);
392
0
  }
393
394
  /*
395
   * If a tablespace is specified, removal of that tablespace is normally
396
   * protected by the existence of a physical file; but for relations with
397
   * no files, add a pg_shdepend entry to account for that.
398
   */
399
0
  if (!create_storage && reltablespace != InvalidOid)
400
0
    recordDependencyOnTablespace(RelationRelationId, relid,
401
0
                   reltablespace);
402
403
  /* ensure that stats are dropped if transaction aborts */
404
0
  pgstat_create_relation(rel);
405
406
0
  return rel;
407
0
}
408
409
/* ----------------------------------------------------------------
410
 *    heap_create_with_catalog    - Create a cataloged relation
411
 *
412
 *    this is done in multiple steps:
413
 *
414
 *    1) CheckAttributeNamesTypes() is used to make certain the tuple
415
 *       descriptor contains a valid set of attribute names and types
416
 *
417
 *    2) pg_class is opened and get_relname_relid()
418
 *       performs a scan to ensure that no relation with the
419
 *       same name already exists.
420
 *
421
 *    3) heap_create() is called to create the new relation on disk.
422
 *
423
 *    4) TypeCreate() is called to define a new type corresponding
424
 *       to the new relation.
425
 *
426
 *    5) AddNewRelationTuple() is called to register the
427
 *       relation in pg_class.
428
 *
429
 *    6) AddNewAttributeTuples() is called to register the
430
 *       new relation's schema in pg_attribute.
431
 *
432
 *    7) StoreConstraints() is called     - vadim 08/22/97
433
 *
434
 *    8) the relations are closed and the new relation's oid
435
 *       is returned.
436
 *
437
 * ----------------------------------------------------------------
438
 */
439
440
/* --------------------------------
441
 *    CheckAttributeNamesTypes
442
 *
443
 *    this is used to make certain the tuple descriptor contains a
444
 *    valid set of attribute names and datatypes.  a problem simply
445
 *    generates ereport(ERROR) which aborts the current transaction.
446
 *
447
 *    relkind is the relkind of the relation to be created.
448
 *    flags controls which datatypes are allowed, cf CheckAttributeType.
449
 * --------------------------------
450
 */
451
void
452
CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
453
             int flags)
454
0
{
455
0
  int     i;
456
0
  int     j;
457
0
  int     natts = tupdesc->natts;
458
459
  /* Sanity check on column count */
460
0
  if (natts < 0 || natts > MaxHeapAttributeNumber)
461
0
    ereport(ERROR,
462
0
        (errcode(ERRCODE_TOO_MANY_COLUMNS),
463
0
         errmsg("tables can have at most %d columns",
464
0
            MaxHeapAttributeNumber)));
465
466
  /*
467
   * first check for collision with system attribute names
468
   *
469
   * Skip this for a view or type relation, since those don't have system
470
   * attributes.
471
   */
472
0
  if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
473
0
  {
474
0
    for (i = 0; i < natts; i++)
475
0
    {
476
0
      Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
477
478
0
      if (SystemAttributeByName(NameStr(attr->attname)) != NULL)
479
0
        ereport(ERROR,
480
0
            (errcode(ERRCODE_DUPLICATE_COLUMN),
481
0
             errmsg("column name \"%s\" conflicts with a system column name",
482
0
                NameStr(attr->attname))));
483
0
    }
484
0
  }
485
486
  /*
487
   * next check for repeated attribute names
488
   */
489
0
  for (i = 1; i < natts; i++)
490
0
  {
491
0
    for (j = 0; j < i; j++)
492
0
    {
493
0
      if (strcmp(NameStr(TupleDescAttr(tupdesc, j)->attname),
494
0
             NameStr(TupleDescAttr(tupdesc, i)->attname)) == 0)
495
0
        ereport(ERROR,
496
0
            (errcode(ERRCODE_DUPLICATE_COLUMN),
497
0
             errmsg("column name \"%s\" specified more than once",
498
0
                NameStr(TupleDescAttr(tupdesc, j)->attname))));
499
0
    }
500
0
  }
501
502
  /*
503
   * next check the attribute types
504
   */
505
0
  for (i = 0; i < natts; i++)
506
0
  {
507
0
    CheckAttributeType(NameStr(TupleDescAttr(tupdesc, i)->attname),
508
0
               TupleDescAttr(tupdesc, i)->atttypid,
509
0
               TupleDescAttr(tupdesc, i)->attcollation,
510
0
               NIL, /* assume we're creating a new rowtype */
511
0
               flags | (TupleDescAttr(tupdesc, i)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL ? CHKATYPE_IS_VIRTUAL : 0));
512
0
  }
513
0
}
514
515
/* --------------------------------
516
 *    CheckAttributeType
517
 *
518
 *    Verify that the proposed datatype of an attribute is legal.
519
 *    This is needed mainly because there are types (and pseudo-types)
520
 *    in the catalogs that we do not support as elements of real tuples.
521
 *    We also check some other properties required of a table column.
522
 *
523
 * If the attribute is being proposed for addition to an existing table or
524
 * composite type, pass a one-element list of the rowtype OID as
525
 * containing_rowtypes.  When checking a to-be-created rowtype, it's
526
 * sufficient to pass NIL, because there could not be any recursive reference
527
 * to a not-yet-existing rowtype.
528
 *
529
 * flags is a bitmask controlling which datatypes we allow.  For the most
530
 * part, pseudo-types are disallowed as attribute types, but there are some
531
 * exceptions: ANYARRAYOID, RECORDOID, and RECORDARRAYOID can be allowed
532
 * in some cases.  (This works because values of those type classes are
533
 * self-identifying to some extent.  However, RECORDOID and RECORDARRAYOID
534
 * are reliably identifiable only within a session, since the identity info
535
 * may use a typmod that is only locally assigned.  The caller is expected
536
 * to know whether these cases are safe.)
537
 *
538
 * flags can also control the phrasing of the error messages.  If
539
 * CHKATYPE_IS_PARTKEY is specified, "attname" should be a partition key
540
 * column number as text, not a real column name.
541
 * --------------------------------
542
 */
543
void
544
CheckAttributeType(const char *attname,
545
           Oid atttypid, Oid attcollation,
546
           List *containing_rowtypes,
547
           int flags)
548
0
{
549
0
  char    att_typtype = get_typtype(atttypid);
550
0
  Oid     att_typelem;
551
552
  /* since this function recurses, it could be driven to stack overflow */
553
0
  check_stack_depth();
554
555
0
  if (att_typtype == TYPTYPE_PSEUDO)
556
0
  {
557
    /*
558
     * We disallow pseudo-type columns, with the exception of ANYARRAY,
559
     * RECORD, and RECORD[] when the caller says that those are OK.
560
     *
561
     * We don't need to worry about recursive containment for RECORD and
562
     * RECORD[] because (a) no named composite type should be allowed to
563
     * contain those, and (b) two "anonymous" record types couldn't be
564
     * considered to be the same type, so infinite recursion isn't
565
     * possible.
566
     */
567
0
    if (!((atttypid == ANYARRAYOID && (flags & CHKATYPE_ANYARRAY)) ||
568
0
        (atttypid == RECORDOID && (flags & CHKATYPE_ANYRECORD)) ||
569
0
        (atttypid == RECORDARRAYOID && (flags & CHKATYPE_ANYRECORD))))
570
0
    {
571
0
      if (flags & CHKATYPE_IS_PARTKEY)
572
0
        ereport(ERROR,
573
0
            (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
574
        /* translator: first %s is an integer not a name */
575
0
             errmsg("partition key column %s has pseudo-type %s",
576
0
                attname, format_type_be(atttypid))));
577
0
      else
578
0
        ereport(ERROR,
579
0
            (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
580
0
             errmsg("column \"%s\" has pseudo-type %s",
581
0
                attname, format_type_be(atttypid))));
582
0
    }
583
0
  }
584
0
  else if (att_typtype == TYPTYPE_DOMAIN)
585
0
  {
586
    /*
587
     * Prevent virtual generated columns from having a domain type.  We
588
     * would have to enforce domain constraints when columns underlying
589
     * the generated column change.  This could possibly be implemented,
590
     * but it's not.
591
     */
592
0
    if (flags & CHKATYPE_IS_VIRTUAL)
593
0
      ereport(ERROR,
594
0
          errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
595
0
          errmsg("virtual generated column \"%s\" cannot have a domain type", attname));
596
597
    /*
598
     * If it's a domain, recurse to check its base type.
599
     */
600
0
    CheckAttributeType(attname, getBaseType(atttypid), attcollation,
601
0
               containing_rowtypes,
602
0
               flags);
603
0
  }
604
0
  else if (att_typtype == TYPTYPE_COMPOSITE)
605
0
  {
606
    /*
607
     * For a composite type, recurse into its attributes.
608
     */
609
0
    Relation  relation;
610
0
    TupleDesc tupdesc;
611
0
    int     i;
612
613
    /*
614
     * Check for self-containment.  Eventually we might be able to allow
615
     * this (just return without complaint, if so) but it's not clear how
616
     * many other places would require anti-recursion defenses before it
617
     * would be safe to allow tables to contain their own rowtype.
618
     */
619
0
    if (list_member_oid(containing_rowtypes, atttypid))
620
0
      ereport(ERROR,
621
0
          (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
622
0
           errmsg("composite type %s cannot be made a member of itself",
623
0
              format_type_be(atttypid))));
624
625
0
    containing_rowtypes = lappend_oid(containing_rowtypes, atttypid);
626
627
0
    relation = relation_open(get_typ_typrelid(atttypid), AccessShareLock);
628
629
0
    tupdesc = RelationGetDescr(relation);
630
631
0
    for (i = 0; i < tupdesc->natts; i++)
632
0
    {
633
0
      Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
634
635
0
      if (attr->attisdropped)
636
0
        continue;
637
0
      CheckAttributeType(NameStr(attr->attname),
638
0
                 attr->atttypid, attr->attcollation,
639
0
                 containing_rowtypes,
640
0
                 flags & ~CHKATYPE_IS_PARTKEY);
641
0
    }
642
643
0
    relation_close(relation, AccessShareLock);
644
645
0
    containing_rowtypes = list_delete_last(containing_rowtypes);
646
0
  }
647
0
  else if (att_typtype == TYPTYPE_RANGE)
648
0
  {
649
    /*
650
     * If it's a range, recurse to check its subtype.
651
     */
652
0
    CheckAttributeType(attname, get_range_subtype(atttypid),
653
0
               get_range_collation(atttypid),
654
0
               containing_rowtypes,
655
0
               flags);
656
0
  }
657
0
  else if (OidIsValid((att_typelem = get_element_type(atttypid))))
658
0
  {
659
    /*
660
     * Must recurse into array types, too, in case they are composite.
661
     */
662
0
    CheckAttributeType(attname, att_typelem, attcollation,
663
0
               containing_rowtypes,
664
0
               flags);
665
0
  }
666
667
  /*
668
   * For consistency with check_virtual_generated_security().
669
   */
670
0
  if ((flags & CHKATYPE_IS_VIRTUAL) && atttypid >= FirstUnpinnedObjectId)
671
0
    ereport(ERROR,
672
0
        errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
673
0
        errmsg("virtual generated column \"%s\" cannot have a user-defined type", attname),
674
0
        errdetail("Virtual generated columns that make use of user-defined types are not yet supported."));
675
676
  /*
677
   * This might not be strictly invalid per SQL standard, but it is pretty
678
   * useless, and it cannot be dumped, so we must disallow it.
679
   */
680
0
  if (!OidIsValid(attcollation) && type_is_collatable(atttypid))
681
0
  {
682
0
    if (flags & CHKATYPE_IS_PARTKEY)
683
0
      ereport(ERROR,
684
0
          (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
685
      /* translator: first %s is an integer not a name */
686
0
           errmsg("no collation was derived for partition key column %s with collatable type %s",
687
0
              attname, format_type_be(atttypid)),
688
0
           errhint("Use the COLLATE clause to set the collation explicitly.")));
689
0
    else
690
0
      ereport(ERROR,
691
0
          (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
692
0
           errmsg("no collation was derived for column \"%s\" with collatable type %s",
693
0
              attname, format_type_be(atttypid)),
694
0
           errhint("Use the COLLATE clause to set the collation explicitly.")));
695
0
  }
696
0
}
697
698
/*
699
 * InsertPgAttributeTuples
700
 *    Construct and insert a set of tuples in pg_attribute.
701
 *
702
 * Caller has already opened and locked pg_attribute.  tupdesc contains the
703
 * attributes to insert.  tupdesc_extra supplies the values for certain
704
 * variable-length/nullable pg_attribute fields and must contain the same
705
 * number of elements as tupdesc or be NULL.  The other variable-length fields
706
 * of pg_attribute are always initialized to null values.
707
 *
708
 * indstate is the index state for CatalogTupleInsertWithInfo.  It can be
709
 * passed as NULL, in which case we'll fetch the necessary info.  (Don't do
710
 * this when inserting multiple attributes, because it's a tad more
711
 * expensive.)
712
 *
713
 * new_rel_oid is the relation OID assigned to the attributes inserted.
714
 * If set to InvalidOid, the relation OID from tupdesc is used instead.
715
 */
716
void
717
InsertPgAttributeTuples(Relation pg_attribute_rel,
718
            TupleDesc tupdesc,
719
            Oid new_rel_oid,
720
            const FormExtraData_pg_attribute tupdesc_extra[],
721
            CatalogIndexState indstate)
722
0
{
723
0
  TupleTableSlot **slot;
724
0
  TupleDesc td;
725
0
  int     nslots;
726
0
  int     natts = 0;
727
0
  int     slotCount = 0;
728
0
  bool    close_index = false;
729
730
0
  td = RelationGetDescr(pg_attribute_rel);
731
732
  /* Initialize the number of slots to use */
733
0
  nslots = Min(tupdesc->natts,
734
0
         (MAX_CATALOG_MULTI_INSERT_BYTES / sizeof(FormData_pg_attribute)));
735
0
  slot = palloc(sizeof(TupleTableSlot *) * nslots);
736
0
  for (int i = 0; i < nslots; i++)
737
0
    slot[i] = MakeSingleTupleTableSlot(td, &TTSOpsHeapTuple);
738
739
0
  while (natts < tupdesc->natts)
740
0
  {
741
0
    Form_pg_attribute attrs = TupleDescAttr(tupdesc, natts);
742
0
    const FormExtraData_pg_attribute *attrs_extra = tupdesc_extra ? &tupdesc_extra[natts] : NULL;
743
744
0
    ExecClearTuple(slot[slotCount]);
745
746
0
    memset(slot[slotCount]->tts_isnull, false,
747
0
         slot[slotCount]->tts_tupleDescriptor->natts * sizeof(bool));
748
749
0
    if (new_rel_oid != InvalidOid)
750
0
      slot[slotCount]->tts_values[Anum_pg_attribute_attrelid - 1] = ObjectIdGetDatum(new_rel_oid);
751
0
    else
752
0
      slot[slotCount]->tts_values[Anum_pg_attribute_attrelid - 1] = ObjectIdGetDatum(attrs->attrelid);
753
754
0
    slot[slotCount]->tts_values[Anum_pg_attribute_attname - 1] = NameGetDatum(&attrs->attname);
755
0
    slot[slotCount]->tts_values[Anum_pg_attribute_atttypid - 1] = ObjectIdGetDatum(attrs->atttypid);
756
0
    slot[slotCount]->tts_values[Anum_pg_attribute_attlen - 1] = Int16GetDatum(attrs->attlen);
757
0
    slot[slotCount]->tts_values[Anum_pg_attribute_attnum - 1] = Int16GetDatum(attrs->attnum);
758
0
    slot[slotCount]->tts_values[Anum_pg_attribute_atttypmod - 1] = Int32GetDatum(attrs->atttypmod);
759
0
    slot[slotCount]->tts_values[Anum_pg_attribute_attndims - 1] = Int16GetDatum(attrs->attndims);
760
0
    slot[slotCount]->tts_values[Anum_pg_attribute_attbyval - 1] = BoolGetDatum(attrs->attbyval);
761
0
    slot[slotCount]->tts_values[Anum_pg_attribute_attalign - 1] = CharGetDatum(attrs->attalign);
762
0
    slot[slotCount]->tts_values[Anum_pg_attribute_attstorage - 1] = CharGetDatum(attrs->attstorage);
763
0
    slot[slotCount]->tts_values[Anum_pg_attribute_attcompression - 1] = CharGetDatum(attrs->attcompression);
764
0
    slot[slotCount]->tts_values[Anum_pg_attribute_attnotnull - 1] = BoolGetDatum(attrs->attnotnull);
765
0
    slot[slotCount]->tts_values[Anum_pg_attribute_atthasdef - 1] = BoolGetDatum(attrs->atthasdef);
766
0
    slot[slotCount]->tts_values[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(attrs->atthasmissing);
767
0
    slot[slotCount]->tts_values[Anum_pg_attribute_attidentity - 1] = CharGetDatum(attrs->attidentity);
768
0
    slot[slotCount]->tts_values[Anum_pg_attribute_attgenerated - 1] = CharGetDatum(attrs->attgenerated);
769
0
    slot[slotCount]->tts_values[Anum_pg_attribute_attisdropped - 1] = BoolGetDatum(attrs->attisdropped);
770
0
    slot[slotCount]->tts_values[Anum_pg_attribute_attislocal - 1] = BoolGetDatum(attrs->attislocal);
771
0
    slot[slotCount]->tts_values[Anum_pg_attribute_attinhcount - 1] = Int16GetDatum(attrs->attinhcount);
772
0
    slot[slotCount]->tts_values[Anum_pg_attribute_attcollation - 1] = ObjectIdGetDatum(attrs->attcollation);
773
0
    if (attrs_extra)
774
0
    {
775
0
      slot[slotCount]->tts_values[Anum_pg_attribute_attstattarget - 1] = attrs_extra->attstattarget.value;
776
0
      slot[slotCount]->tts_isnull[Anum_pg_attribute_attstattarget - 1] = attrs_extra->attstattarget.isnull;
777
778
0
      slot[slotCount]->tts_values[Anum_pg_attribute_attoptions - 1] = attrs_extra->attoptions.value;
779
0
      slot[slotCount]->tts_isnull[Anum_pg_attribute_attoptions - 1] = attrs_extra->attoptions.isnull;
780
0
    }
781
0
    else
782
0
    {
783
0
      slot[slotCount]->tts_isnull[Anum_pg_attribute_attstattarget - 1] = true;
784
0
      slot[slotCount]->tts_isnull[Anum_pg_attribute_attoptions - 1] = true;
785
0
    }
786
787
    /*
788
     * The remaining fields are not set for new columns.
789
     */
790
0
    slot[slotCount]->tts_isnull[Anum_pg_attribute_attacl - 1] = true;
791
0
    slot[slotCount]->tts_isnull[Anum_pg_attribute_attfdwoptions - 1] = true;
792
0
    slot[slotCount]->tts_isnull[Anum_pg_attribute_attmissingval - 1] = true;
793
794
0
    ExecStoreVirtualTuple(slot[slotCount]);
795
0
    slotCount++;
796
797
    /*
798
     * If slots are full or the end of processing has been reached, insert
799
     * a batch of tuples.
800
     */
801
0
    if (slotCount == nslots || natts == tupdesc->natts - 1)
802
0
    {
803
      /* fetch index info only when we know we need it */
804
0
      if (!indstate)
805
0
      {
806
0
        indstate = CatalogOpenIndexes(pg_attribute_rel);
807
0
        close_index = true;
808
0
      }
809
810
      /* insert the new tuples and update the indexes */
811
0
      CatalogTuplesMultiInsertWithInfo(pg_attribute_rel, slot, slotCount,
812
0
                       indstate);
813
0
      slotCount = 0;
814
0
    }
815
816
0
    natts++;
817
0
  }
818
819
0
  if (close_index)
820
0
    CatalogCloseIndexes(indstate);
821
0
  for (int i = 0; i < nslots; i++)
822
0
    ExecDropSingleTupleTableSlot(slot[i]);
823
0
  pfree(slot);
824
0
}
825
826
/* --------------------------------
827
 *    AddNewAttributeTuples
828
 *
829
 *    this registers the new relation's schema by adding
830
 *    tuples to pg_attribute.
831
 * --------------------------------
832
 */
833
static void
834
AddNewAttributeTuples(Oid new_rel_oid,
835
            TupleDesc tupdesc,
836
            char relkind)
837
0
{
838
0
  Relation  rel;
839
0
  CatalogIndexState indstate;
840
0
  int     natts = tupdesc->natts;
841
0
  ObjectAddress myself,
842
0
        referenced;
843
844
  /*
845
   * open pg_attribute and its indexes.
846
   */
847
0
  rel = table_open(AttributeRelationId, RowExclusiveLock);
848
849
0
  indstate = CatalogOpenIndexes(rel);
850
851
0
  InsertPgAttributeTuples(rel, tupdesc, new_rel_oid, NULL, indstate);
852
853
  /* add dependencies on their datatypes and collations */
854
0
  for (int i = 0; i < natts; i++)
855
0
  {
856
0
    Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
857
858
    /* Add dependency info */
859
0
    ObjectAddressSubSet(myself, RelationRelationId, new_rel_oid, i + 1);
860
0
    ObjectAddressSet(referenced, TypeRelationId, attr->atttypid);
861
0
    recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
862
863
    /* The default collation is pinned, so don't bother recording it */
864
0
    if (OidIsValid(attr->attcollation) &&
865
0
      attr->attcollation != DEFAULT_COLLATION_OID)
866
0
    {
867
0
      ObjectAddressSet(referenced, CollationRelationId,
868
0
               attr->attcollation);
869
0
      recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
870
0
    }
871
0
  }
872
873
  /*
874
   * Next we add the system attributes.  Skip all for a view or type
875
   * relation.  We don't bother with making datatype dependencies here,
876
   * since presumably all these types are pinned.
877
   */
878
0
  if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
879
0
  {
880
0
    TupleDesc td;
881
882
0
    td = CreateTupleDesc(lengthof(SysAtt), (FormData_pg_attribute **) &SysAtt);
883
884
0
    InsertPgAttributeTuples(rel, td, new_rel_oid, NULL, indstate);
885
0
    FreeTupleDesc(td);
886
0
  }
887
888
  /*
889
   * clean up
890
   */
891
0
  CatalogCloseIndexes(indstate);
892
893
0
  table_close(rel, RowExclusiveLock);
894
0
}
895
896
/* --------------------------------
897
 *    InsertPgClassTuple
898
 *
899
 *    Construct and insert a new tuple in pg_class.
900
 *
901
 * Caller has already opened and locked pg_class.
902
 * Tuple data is taken from new_rel_desc->rd_rel, except for the
903
 * variable-width fields which are not present in a cached reldesc.
904
 * relacl and reloptions are passed in Datum form (to avoid having
905
 * to reference the data types in heap.h).  Pass (Datum) 0 to set them
906
 * to NULL.
907
 * --------------------------------
908
 */
909
void
910
InsertPgClassTuple(Relation pg_class_desc,
911
           Relation new_rel_desc,
912
           Oid new_rel_oid,
913
           Datum relacl,
914
           Datum reloptions)
915
0
{
916
0
  Form_pg_class rd_rel = new_rel_desc->rd_rel;
917
0
  Datum   values[Natts_pg_class];
918
0
  bool    nulls[Natts_pg_class];
919
0
  HeapTuple tup;
920
921
  /* This is a tad tedious, but way cleaner than what we used to do... */
922
0
  memset(values, 0, sizeof(values));
923
0
  memset(nulls, false, sizeof(nulls));
924
925
0
  values[Anum_pg_class_oid - 1] = ObjectIdGetDatum(new_rel_oid);
926
0
  values[Anum_pg_class_relname - 1] = NameGetDatum(&rd_rel->relname);
927
0
  values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(rd_rel->relnamespace);
928
0
  values[Anum_pg_class_reltype - 1] = ObjectIdGetDatum(rd_rel->reltype);
929
0
  values[Anum_pg_class_reloftype - 1] = ObjectIdGetDatum(rd_rel->reloftype);
930
0
  values[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(rd_rel->relowner);
931
0
  values[Anum_pg_class_relam - 1] = ObjectIdGetDatum(rd_rel->relam);
932
0
  values[Anum_pg_class_relfilenode - 1] = ObjectIdGetDatum(rd_rel->relfilenode);
933
0
  values[Anum_pg_class_reltablespace - 1] = ObjectIdGetDatum(rd_rel->reltablespace);
934
0
  values[Anum_pg_class_relpages - 1] = Int32GetDatum(rd_rel->relpages);
935
0
  values[Anum_pg_class_reltuples - 1] = Float4GetDatum(rd_rel->reltuples);
936
0
  values[Anum_pg_class_relallvisible - 1] = Int32GetDatum(rd_rel->relallvisible);
937
0
  values[Anum_pg_class_relallfrozen - 1] = Int32GetDatum(rd_rel->relallfrozen);
938
0
  values[Anum_pg_class_reltoastrelid - 1] = ObjectIdGetDatum(rd_rel->reltoastrelid);
939
0
  values[Anum_pg_class_relhasindex - 1] = BoolGetDatum(rd_rel->relhasindex);
940
0
  values[Anum_pg_class_relisshared - 1] = BoolGetDatum(rd_rel->relisshared);
941
0
  values[Anum_pg_class_relpersistence - 1] = CharGetDatum(rd_rel->relpersistence);
942
0
  values[Anum_pg_class_relkind - 1] = CharGetDatum(rd_rel->relkind);
943
0
  values[Anum_pg_class_relnatts - 1] = Int16GetDatum(rd_rel->relnatts);
944
0
  values[Anum_pg_class_relchecks - 1] = Int16GetDatum(rd_rel->relchecks);
945
0
  values[Anum_pg_class_relhasrules - 1] = BoolGetDatum(rd_rel->relhasrules);
946
0
  values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
947
0
  values[Anum_pg_class_relrowsecurity - 1] = BoolGetDatum(rd_rel->relrowsecurity);
948
0
  values[Anum_pg_class_relforcerowsecurity - 1] = BoolGetDatum(rd_rel->relforcerowsecurity);
949
0
  values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
950
0
  values[Anum_pg_class_relispopulated - 1] = BoolGetDatum(rd_rel->relispopulated);
951
0
  values[Anum_pg_class_relreplident - 1] = CharGetDatum(rd_rel->relreplident);
952
0
  values[Anum_pg_class_relispartition - 1] = BoolGetDatum(rd_rel->relispartition);
953
0
  values[Anum_pg_class_relrewrite - 1] = ObjectIdGetDatum(rd_rel->relrewrite);
954
0
  values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
955
0
  values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid);
956
0
  if (relacl != (Datum) 0)
957
0
    values[Anum_pg_class_relacl - 1] = relacl;
958
0
  else
959
0
    nulls[Anum_pg_class_relacl - 1] = true;
960
0
  if (reloptions != (Datum) 0)
961
0
    values[Anum_pg_class_reloptions - 1] = reloptions;
962
0
  else
963
0
    nulls[Anum_pg_class_reloptions - 1] = true;
964
965
  /* relpartbound is set by updating this tuple, if necessary */
966
0
  nulls[Anum_pg_class_relpartbound - 1] = true;
967
968
0
  tup = heap_form_tuple(RelationGetDescr(pg_class_desc), values, nulls);
969
970
  /* finally insert the new tuple, update the indexes, and clean up */
971
0
  CatalogTupleInsert(pg_class_desc, tup);
972
973
0
  heap_freetuple(tup);
974
0
}
975
976
/* --------------------------------
977
 *    AddNewRelationTuple
978
 *
979
 *    this registers the new relation in the catalogs by
980
 *    adding a tuple to pg_class.
981
 * --------------------------------
982
 */
983
static void
984
AddNewRelationTuple(Relation pg_class_desc,
985
          Relation new_rel_desc,
986
          Oid new_rel_oid,
987
          Oid new_type_oid,
988
          Oid reloftype,
989
          Oid relowner,
990
          char relkind,
991
          TransactionId relfrozenxid,
992
          TransactionId relminmxid,
993
          Datum relacl,
994
          Datum reloptions)
995
0
{
996
0
  Form_pg_class new_rel_reltup;
997
998
  /*
999
   * first we update some of the information in our uncataloged relation's
1000
   * relation descriptor.
1001
   */
1002
0
  new_rel_reltup = new_rel_desc->rd_rel;
1003
1004
  /* The relation is empty */
1005
0
  new_rel_reltup->relpages = 0;
1006
0
  new_rel_reltup->reltuples = -1;
1007
0
  new_rel_reltup->relallvisible = 0;
1008
0
  new_rel_reltup->relallfrozen = 0;
1009
1010
  /* Sequences always have a known size */
1011
0
  if (relkind == RELKIND_SEQUENCE)
1012
0
  {
1013
0
    new_rel_reltup->relpages = 1;
1014
0
    new_rel_reltup->reltuples = 1;
1015
0
  }
1016
1017
0
  new_rel_reltup->relfrozenxid = relfrozenxid;
1018
0
  new_rel_reltup->relminmxid = relminmxid;
1019
0
  new_rel_reltup->relowner = relowner;
1020
0
  new_rel_reltup->reltype = new_type_oid;
1021
0
  new_rel_reltup->reloftype = reloftype;
1022
1023
  /* relispartition is always set by updating this tuple later */
1024
0
  new_rel_reltup->relispartition = false;
1025
1026
  /* fill rd_att's type ID with something sane even if reltype is zero */
1027
0
  new_rel_desc->rd_att->tdtypeid = new_type_oid ? new_type_oid : RECORDOID;
1028
0
  new_rel_desc->rd_att->tdtypmod = -1;
1029
1030
  /* Now build and insert the tuple */
1031
0
  InsertPgClassTuple(pg_class_desc, new_rel_desc, new_rel_oid,
1032
0
             relacl, reloptions);
1033
0
}
1034
1035
1036
/* --------------------------------
1037
 *    AddNewRelationType -
1038
 *
1039
 *    define a composite type corresponding to the new relation
1040
 * --------------------------------
1041
 */
1042
static ObjectAddress
1043
AddNewRelationType(const char *typeName,
1044
           Oid typeNamespace,
1045
           Oid new_rel_oid,
1046
           char new_rel_kind,
1047
           Oid ownerid,
1048
           Oid new_row_type,
1049
           Oid new_array_type)
1050
0
{
1051
0
  return
1052
0
    TypeCreate(new_row_type,  /* optional predetermined OID */
1053
0
           typeName,  /* type name */
1054
0
           typeNamespace, /* type namespace */
1055
0
           new_rel_oid, /* relation oid */
1056
0
           new_rel_kind,  /* relation kind */
1057
0
           ownerid,   /* owner's ID */
1058
0
           -1,      /* internal size (varlena) */
1059
0
           TYPTYPE_COMPOSITE, /* type-type (composite) */
1060
0
           TYPCATEGORY_COMPOSITE, /* type-category (ditto) */
1061
0
           false,    /* composite types are never preferred */
1062
0
           DEFAULT_TYPDELIM, /* default array delimiter */
1063
0
           F_RECORD_IN, /* input procedure */
1064
0
           F_RECORD_OUT, /* output procedure */
1065
0
           F_RECORD_RECV, /* receive procedure */
1066
0
           F_RECORD_SEND, /* send procedure */
1067
0
           InvalidOid, /* typmodin procedure - none */
1068
0
           InvalidOid, /* typmodout procedure - none */
1069
0
           InvalidOid, /* analyze procedure - default */
1070
0
           InvalidOid, /* subscript procedure - none */
1071
0
           InvalidOid, /* array element type - irrelevant */
1072
0
           false,    /* this is not an array type */
1073
0
           new_array_type,  /* array type if any */
1074
0
           InvalidOid, /* domain base type - irrelevant */
1075
0
           NULL,   /* default value - none */
1076
0
           NULL,   /* default binary representation */
1077
0
           false,    /* passed by reference */
1078
0
           TYPALIGN_DOUBLE, /* alignment - must be the largest! */
1079
0
           TYPSTORAGE_EXTENDED, /* fully TOASTable */
1080
0
           -1,      /* typmod */
1081
0
           0,     /* array dimensions for typBaseType */
1082
0
           false,    /* Type NOT NULL */
1083
0
           InvalidOid); /* rowtypes never have a collation */
1084
0
}
1085
1086
/* --------------------------------
1087
 *    heap_create_with_catalog
1088
 *
1089
 *    creates a new cataloged relation.  see comments above.
1090
 *
1091
 * Arguments:
1092
 *  relname: name to give to new rel
1093
 *  relnamespace: OID of namespace it goes in
1094
 *  reltablespace: OID of tablespace it goes in
1095
 *  relid: OID to assign to new rel, or InvalidOid to select a new OID
1096
 *  reltypeid: OID to assign to rel's rowtype, or InvalidOid to select one
1097
 *  reloftypeid: if a typed table, OID of underlying type; else InvalidOid
1098
 *  ownerid: OID of new rel's owner
1099
 *  accessmtd: OID of new rel's access method
1100
 *  tupdesc: tuple descriptor (source of column definitions)
1101
 *  cooked_constraints: list of precooked check constraints and defaults
1102
 *  relkind: relkind for new rel
1103
 *  relpersistence: rel's persistence status (permanent, temp, or unlogged)
1104
 *  shared_relation: true if it's to be a shared relation
1105
 *  mapped_relation: true if the relation will use the relfilenumber map
1106
 *  oncommit: ON COMMIT marking (only relevant if it's a temp table)
1107
 *  reloptions: reloptions in Datum form, or (Datum) 0 if none
1108
 *  use_user_acl: true if should look for user-defined default permissions;
1109
 *    if false, relacl is always set NULL
1110
 *  allow_system_table_mods: true to allow creation in system namespaces
1111
 *  is_internal: is this a system-generated catalog?
1112
 *  relrewrite: link to original relation during a table rewrite
1113
 *
1114
 * Output parameters:
1115
 *  typaddress: if not null, gets the object address of the new pg_type entry
1116
 *  (this must be null if the relkind is one that doesn't get a pg_type entry)
1117
 *
1118
 * Returns the OID of the new relation
1119
 * --------------------------------
1120
 */
1121
Oid
1122
heap_create_with_catalog(const char *relname,
1123
             Oid relnamespace,
1124
             Oid reltablespace,
1125
             Oid relid,
1126
             Oid reltypeid,
1127
             Oid reloftypeid,
1128
             Oid ownerid,
1129
             Oid accessmtd,
1130
             TupleDesc tupdesc,
1131
             List *cooked_constraints,
1132
             char relkind,
1133
             char relpersistence,
1134
             bool shared_relation,
1135
             bool mapped_relation,
1136
             OnCommitAction oncommit,
1137
             Datum reloptions,
1138
             bool use_user_acl,
1139
             bool allow_system_table_mods,
1140
             bool is_internal,
1141
             Oid relrewrite,
1142
             ObjectAddress *typaddress)
1143
0
{
1144
0
  Relation  pg_class_desc;
1145
0
  Relation  new_rel_desc;
1146
0
  Acl      *relacl;
1147
0
  Oid     existing_relid;
1148
0
  Oid     old_type_oid;
1149
0
  Oid     new_type_oid;
1150
1151
  /* By default set to InvalidOid unless overridden by binary-upgrade */
1152
0
  RelFileNumber relfilenumber = InvalidRelFileNumber;
1153
0
  TransactionId relfrozenxid;
1154
0
  MultiXactId relminmxid;
1155
1156
0
  pg_class_desc = table_open(RelationRelationId, RowExclusiveLock);
1157
1158
  /*
1159
   * sanity checks
1160
   */
1161
0
  Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode());
1162
1163
  /*
1164
   * Validate proposed tupdesc for the desired relkind.  If
1165
   * allow_system_table_mods is on, allow ANYARRAY to be used; this is a
1166
   * hack to allow creating pg_statistic and cloning it during VACUUM FULL.
1167
   */
1168
0
  CheckAttributeNamesTypes(tupdesc, relkind,
1169
0
               allow_system_table_mods ? CHKATYPE_ANYARRAY : 0);
1170
1171
  /*
1172
   * This would fail later on anyway, if the relation already exists.  But
1173
   * by catching it here we can emit a nicer error message.
1174
   */
1175
0
  existing_relid = get_relname_relid(relname, relnamespace);
1176
0
  if (existing_relid != InvalidOid)
1177
0
    ereport(ERROR,
1178
0
        (errcode(ERRCODE_DUPLICATE_TABLE),
1179
0
         errmsg("relation \"%s\" already exists", relname)));
1180
1181
  /*
1182
   * Since we are going to create a rowtype as well, also check for
1183
   * collision with an existing type name.  If there is one and it's an
1184
   * autogenerated array, we can rename it out of the way; otherwise we can
1185
   * at least give a good error message.
1186
   */
1187
0
  old_type_oid = GetSysCacheOid2(TYPENAMENSP, Anum_pg_type_oid,
1188
0
                   CStringGetDatum(relname),
1189
0
                   ObjectIdGetDatum(relnamespace));
1190
0
  if (OidIsValid(old_type_oid))
1191
0
  {
1192
0
    if (!moveArrayTypeName(old_type_oid, relname, relnamespace))
1193
0
      ereport(ERROR,
1194
0
          (errcode(ERRCODE_DUPLICATE_OBJECT),
1195
0
           errmsg("type \"%s\" already exists", relname),
1196
0
           errhint("A relation has an associated type of the same name, "
1197
0
               "so you must use a name that doesn't conflict "
1198
0
               "with any existing type.")));
1199
0
  }
1200
1201
  /*
1202
   * Shared relations must be in pg_global (last-ditch check)
1203
   */
1204
0
  if (shared_relation && reltablespace != GLOBALTABLESPACE_OID)
1205
0
    elog(ERROR, "shared relations must be placed in pg_global tablespace");
1206
1207
  /*
1208
   * Allocate an OID for the relation, unless we were told what to use.
1209
   *
1210
   * The OID will be the relfilenumber as well, so make sure it doesn't
1211
   * collide with either pg_class OIDs or existing physical files.
1212
   */
1213
0
  if (!OidIsValid(relid))
1214
0
  {
1215
    /* Use binary-upgrade override for pg_class.oid and relfilenumber */
1216
0
    if (IsBinaryUpgrade)
1217
0
    {
1218
      /*
1219
       * Indexes are not supported here; they use
1220
       * binary_upgrade_next_index_pg_class_oid.
1221
       */
1222
0
      Assert(relkind != RELKIND_INDEX);
1223
0
      Assert(relkind != RELKIND_PARTITIONED_INDEX);
1224
1225
0
      if (relkind == RELKIND_TOASTVALUE)
1226
0
      {
1227
        /* There might be no TOAST table, so we have to test for it. */
1228
0
        if (OidIsValid(binary_upgrade_next_toast_pg_class_oid))
1229
0
        {
1230
0
          relid = binary_upgrade_next_toast_pg_class_oid;
1231
0
          binary_upgrade_next_toast_pg_class_oid = InvalidOid;
1232
1233
0
          if (!RelFileNumberIsValid(binary_upgrade_next_toast_pg_class_relfilenumber))
1234
0
            ereport(ERROR,
1235
0
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1236
0
                 errmsg("toast relfilenumber value not set when in binary upgrade mode")));
1237
1238
0
          relfilenumber = binary_upgrade_next_toast_pg_class_relfilenumber;
1239
0
          binary_upgrade_next_toast_pg_class_relfilenumber = InvalidRelFileNumber;
1240
0
        }
1241
0
      }
1242
0
      else
1243
0
      {
1244
0
        if (!OidIsValid(binary_upgrade_next_heap_pg_class_oid))
1245
0
          ereport(ERROR,
1246
0
              (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1247
0
               errmsg("pg_class heap OID value not set when in binary upgrade mode")));
1248
1249
0
        relid = binary_upgrade_next_heap_pg_class_oid;
1250
0
        binary_upgrade_next_heap_pg_class_oid = InvalidOid;
1251
1252
0
        if (RELKIND_HAS_STORAGE(relkind))
1253
0
        {
1254
0
          if (!RelFileNumberIsValid(binary_upgrade_next_heap_pg_class_relfilenumber))
1255
0
            ereport(ERROR,
1256
0
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1257
0
                 errmsg("relfilenumber value not set when in binary upgrade mode")));
1258
1259
0
          relfilenumber = binary_upgrade_next_heap_pg_class_relfilenumber;
1260
0
          binary_upgrade_next_heap_pg_class_relfilenumber = InvalidRelFileNumber;
1261
0
        }
1262
0
      }
1263
0
    }
1264
1265
0
    if (!OidIsValid(relid))
1266
0
      relid = GetNewRelFileNumber(reltablespace, pg_class_desc,
1267
0
                    relpersistence);
1268
0
  }
1269
1270
  /*
1271
   * Other sessions' catalog scans can't find this until we commit.  Hence,
1272
   * it doesn't hurt to hold AccessExclusiveLock.  Do it here so callers
1273
   * can't accidentally vary in their lock mode or acquisition timing.
1274
   */
1275
0
  LockRelationOid(relid, AccessExclusiveLock);
1276
1277
  /*
1278
   * Determine the relation's initial permissions.
1279
   */
1280
0
  if (use_user_acl)
1281
0
  {
1282
0
    switch (relkind)
1283
0
    {
1284
0
      case RELKIND_RELATION:
1285
0
      case RELKIND_VIEW:
1286
0
      case RELKIND_MATVIEW:
1287
0
      case RELKIND_FOREIGN_TABLE:
1288
0
      case RELKIND_PARTITIONED_TABLE:
1289
0
        relacl = get_user_default_acl(OBJECT_TABLE, ownerid,
1290
0
                        relnamespace);
1291
0
        break;
1292
0
      case RELKIND_SEQUENCE:
1293
0
        relacl = get_user_default_acl(OBJECT_SEQUENCE, ownerid,
1294
0
                        relnamespace);
1295
0
        break;
1296
0
      default:
1297
0
        relacl = NULL;
1298
0
        break;
1299
0
    }
1300
0
  }
1301
0
  else
1302
0
    relacl = NULL;
1303
1304
  /*
1305
   * Create the relcache entry (mostly dummy at this point) and the physical
1306
   * disk file.  (If we fail further down, it's the smgr's responsibility to
1307
   * remove the disk file again.)
1308
   *
1309
   * NB: Note that passing create_storage = true is correct even for binary
1310
   * upgrade.  The storage we create here will be replaced later, but we
1311
   * need to have something on disk in the meanwhile.
1312
   */
1313
0
  new_rel_desc = heap_create(relname,
1314
0
                 relnamespace,
1315
0
                 reltablespace,
1316
0
                 relid,
1317
0
                 relfilenumber,
1318
0
                 accessmtd,
1319
0
                 tupdesc,
1320
0
                 relkind,
1321
0
                 relpersistence,
1322
0
                 shared_relation,
1323
0
                 mapped_relation,
1324
0
                 allow_system_table_mods,
1325
0
                 &relfrozenxid,
1326
0
                 &relminmxid,
1327
0
                 true);
1328
1329
0
  Assert(relid == RelationGetRelid(new_rel_desc));
1330
1331
0
  new_rel_desc->rd_rel->relrewrite = relrewrite;
1332
1333
  /*
1334
   * Decide whether to create a pg_type entry for the relation's rowtype.
1335
   * These types are made except where the use of a relation as such is an
1336
   * implementation detail: toast tables, sequences and indexes.
1337
   */
1338
0
  if (!(relkind == RELKIND_SEQUENCE ||
1339
0
      relkind == RELKIND_TOASTVALUE ||
1340
0
      relkind == RELKIND_INDEX ||
1341
0
      relkind == RELKIND_PARTITIONED_INDEX))
1342
0
  {
1343
0
    Oid     new_array_oid;
1344
0
    ObjectAddress new_type_addr;
1345
0
    char     *relarrayname;
1346
1347
    /*
1348
     * We'll make an array over the composite type, too.  For largely
1349
     * historical reasons, the array type's OID is assigned first.
1350
     */
1351
0
    new_array_oid = AssignTypeArrayOid();
1352
1353
    /*
1354
     * Make the pg_type entry for the composite type.  The OID of the
1355
     * composite type can be preselected by the caller, but if reltypeid
1356
     * is InvalidOid, we'll generate a new OID for it.
1357
     *
1358
     * NOTE: we could get a unique-index failure here, in case someone
1359
     * else is creating the same type name in parallel but hadn't
1360
     * committed yet when we checked for a duplicate name above.
1361
     */
1362
0
    new_type_addr = AddNewRelationType(relname,
1363
0
                       relnamespace,
1364
0
                       relid,
1365
0
                       relkind,
1366
0
                       ownerid,
1367
0
                       reltypeid,
1368
0
                       new_array_oid);
1369
0
    new_type_oid = new_type_addr.objectId;
1370
0
    if (typaddress)
1371
0
      *typaddress = new_type_addr;
1372
1373
    /* Now create the array type. */
1374
0
    relarrayname = makeArrayTypeName(relname, relnamespace);
1375
1376
0
    TypeCreate(new_array_oid, /* force the type's OID to this */
1377
0
           relarrayname,  /* Array type name */
1378
0
           relnamespace,  /* Same namespace as parent */
1379
0
           InvalidOid, /* Not composite, no relationOid */
1380
0
           0,     /* relkind, also N/A here */
1381
0
           ownerid,   /* owner's ID */
1382
0
           -1,      /* Internal size (varlena) */
1383
0
           TYPTYPE_BASE, /* Not composite - typelem is */
1384
0
           TYPCATEGORY_ARRAY, /* type-category (array) */
1385
0
           false,    /* array types are never preferred */
1386
0
           DEFAULT_TYPDELIM, /* default array delimiter */
1387
0
           F_ARRAY_IN, /* array input proc */
1388
0
           F_ARRAY_OUT, /* array output proc */
1389
0
           F_ARRAY_RECV, /* array recv (bin) proc */
1390
0
           F_ARRAY_SEND, /* array send (bin) proc */
1391
0
           InvalidOid, /* typmodin procedure - none */
1392
0
           InvalidOid, /* typmodout procedure - none */
1393
0
           F_ARRAY_TYPANALYZE, /* array analyze procedure */
1394
0
           F_ARRAY_SUBSCRIPT_HANDLER, /* array subscript procedure */
1395
0
           new_type_oid,  /* array element type - the rowtype */
1396
0
           true,   /* yes, this is an array type */
1397
0
           InvalidOid, /* this has no array type */
1398
0
           InvalidOid, /* domain base type - irrelevant */
1399
0
           NULL,   /* default value - none */
1400
0
           NULL,   /* default binary representation */
1401
0
           false,    /* passed by reference */
1402
0
           TYPALIGN_DOUBLE, /* alignment - must be the largest! */
1403
0
           TYPSTORAGE_EXTENDED, /* fully TOASTable */
1404
0
           -1,      /* typmod */
1405
0
           0,     /* array dimensions for typBaseType */
1406
0
           false,    /* Type NOT NULL */
1407
0
           InvalidOid); /* rowtypes never have a collation */
1408
1409
0
    pfree(relarrayname);
1410
0
  }
1411
0
  else
1412
0
  {
1413
    /* Caller should not be expecting a type to be created. */
1414
0
    Assert(reltypeid == InvalidOid);
1415
0
    Assert(typaddress == NULL);
1416
1417
0
    new_type_oid = InvalidOid;
1418
0
  }
1419
1420
  /*
1421
   * now create an entry in pg_class for the relation.
1422
   *
1423
   * NOTE: we could get a unique-index failure here, in case someone else is
1424
   * creating the same relation name in parallel but hadn't committed yet
1425
   * when we checked for a duplicate name above.
1426
   */
1427
0
  AddNewRelationTuple(pg_class_desc,
1428
0
            new_rel_desc,
1429
0
            relid,
1430
0
            new_type_oid,
1431
0
            reloftypeid,
1432
0
            ownerid,
1433
0
            relkind,
1434
0
            relfrozenxid,
1435
0
            relminmxid,
1436
0
            PointerGetDatum(relacl),
1437
0
            reloptions);
1438
1439
  /*
1440
   * now add tuples to pg_attribute for the attributes in our new relation.
1441
   */
1442
0
  AddNewAttributeTuples(relid, new_rel_desc->rd_att, relkind);
1443
1444
  /*
1445
   * Make a dependency link to force the relation to be deleted if its
1446
   * namespace is.  Also make a dependency link to its owner, as well as
1447
   * dependencies for any roles mentioned in the default ACL.
1448
   *
1449
   * For composite types, these dependencies are tracked for the pg_type
1450
   * entry, so we needn't record them here.  Likewise, TOAST tables don't
1451
   * need a namespace dependency (they live in a pinned namespace) nor an
1452
   * owner dependency (they depend indirectly through the parent table), nor
1453
   * should they have any ACL entries.  The same applies for extension
1454
   * dependencies.
1455
   *
1456
   * Also, skip this in bootstrap mode, since we don't make dependencies
1457
   * while bootstrapping.
1458
   */
1459
0
  if (relkind != RELKIND_COMPOSITE_TYPE &&
1460
0
    relkind != RELKIND_TOASTVALUE &&
1461
0
    !IsBootstrapProcessingMode())
1462
0
  {
1463
0
    ObjectAddress myself,
1464
0
          referenced;
1465
0
    ObjectAddresses *addrs;
1466
1467
0
    ObjectAddressSet(myself, RelationRelationId, relid);
1468
1469
0
    recordDependencyOnOwner(RelationRelationId, relid, ownerid);
1470
1471
0
    recordDependencyOnNewAcl(RelationRelationId, relid, 0, ownerid, relacl);
1472
1473
0
    recordDependencyOnCurrentExtension(&myself, false);
1474
1475
0
    addrs = new_object_addresses();
1476
1477
0
    ObjectAddressSet(referenced, NamespaceRelationId, relnamespace);
1478
0
    add_exact_object_address(&referenced, addrs);
1479
1480
0
    if (reloftypeid)
1481
0
    {
1482
0
      ObjectAddressSet(referenced, TypeRelationId, reloftypeid);
1483
0
      add_exact_object_address(&referenced, addrs);
1484
0
    }
1485
1486
    /*
1487
     * Make a dependency link to force the relation to be deleted if its
1488
     * access method is.
1489
     *
1490
     * No need to add an explicit dependency for the toast table, as the
1491
     * main table depends on it.  Partitioned tables may not have an
1492
     * access method set.
1493
     */
1494
0
    if ((RELKIND_HAS_TABLE_AM(relkind) && relkind != RELKIND_TOASTVALUE) ||
1495
0
      (relkind == RELKIND_PARTITIONED_TABLE && OidIsValid(accessmtd)))
1496
0
    {
1497
0
      ObjectAddressSet(referenced, AccessMethodRelationId, accessmtd);
1498
0
      add_exact_object_address(&referenced, addrs);
1499
0
    }
1500
1501
0
    record_object_address_dependencies(&myself, addrs, DEPENDENCY_NORMAL);
1502
0
    free_object_addresses(addrs);
1503
0
  }
1504
1505
  /* Post creation hook for new relation */
1506
0
  InvokeObjectPostCreateHookArg(RelationRelationId, relid, 0, is_internal);
1507
1508
  /*
1509
   * Store any supplied CHECK constraints and defaults.
1510
   *
1511
   * NB: this may do a CommandCounterIncrement and rebuild the relcache
1512
   * entry, so the relation must be valid and self-consistent at this point.
1513
   * In particular, there are not yet constraints and defaults anywhere.
1514
   */
1515
0
  StoreConstraints(new_rel_desc, cooked_constraints, is_internal);
1516
1517
  /*
1518
   * If there's a special on-commit action, remember it
1519
   */
1520
0
  if (oncommit != ONCOMMIT_NOOP)
1521
0
    register_on_commit_action(relid, oncommit);
1522
1523
  /*
1524
   * ok, the relation has been cataloged, so close our relations and return
1525
   * the OID of the newly created relation.
1526
   */
1527
0
  table_close(new_rel_desc, NoLock);  /* do not unlock till end of xact */
1528
0
  table_close(pg_class_desc, RowExclusiveLock);
1529
1530
0
  return relid;
1531
0
}
1532
1533
/*
1534
 *    RelationRemoveInheritance
1535
 *
1536
 * Formerly, this routine checked for child relations and aborted the
1537
 * deletion if any were found.  Now we rely on the dependency mechanism
1538
 * to check for or delete child relations.  By the time we get here,
1539
 * there are no children and we need only remove any pg_inherits rows
1540
 * linking this relation to its parent(s).
1541
 */
1542
static void
1543
RelationRemoveInheritance(Oid relid)
1544
0
{
1545
0
  Relation  catalogRelation;
1546
0
  SysScanDesc scan;
1547
0
  ScanKeyData key;
1548
0
  HeapTuple tuple;
1549
1550
0
  catalogRelation = table_open(InheritsRelationId, RowExclusiveLock);
1551
1552
0
  ScanKeyInit(&key,
1553
0
        Anum_pg_inherits_inhrelid,
1554
0
        BTEqualStrategyNumber, F_OIDEQ,
1555
0
        ObjectIdGetDatum(relid));
1556
1557
0
  scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true,
1558
0
                NULL, 1, &key);
1559
1560
0
  while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1561
0
    CatalogTupleDelete(catalogRelation, &tuple->t_self);
1562
1563
0
  systable_endscan(scan);
1564
0
  table_close(catalogRelation, RowExclusiveLock);
1565
0
}
1566
1567
/*
1568
 *    DeleteRelationTuple
1569
 *
1570
 * Remove pg_class row for the given relid.
1571
 *
1572
 * Note: this is shared by relation deletion and index deletion.  It's
1573
 * not intended for use anyplace else.
1574
 */
1575
void
1576
DeleteRelationTuple(Oid relid)
1577
0
{
1578
0
  Relation  pg_class_desc;
1579
0
  HeapTuple tup;
1580
1581
  /* Grab an appropriate lock on the pg_class relation */
1582
0
  pg_class_desc = table_open(RelationRelationId, RowExclusiveLock);
1583
1584
0
  tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
1585
0
  if (!HeapTupleIsValid(tup))
1586
0
    elog(ERROR, "cache lookup failed for relation %u", relid);
1587
1588
  /* delete the relation tuple from pg_class, and finish up */
1589
0
  CatalogTupleDelete(pg_class_desc, &tup->t_self);
1590
1591
0
  ReleaseSysCache(tup);
1592
1593
0
  table_close(pg_class_desc, RowExclusiveLock);
1594
0
}
1595
1596
/*
1597
 *    DeleteAttributeTuples
1598
 *
1599
 * Remove pg_attribute rows for the given relid.
1600
 *
1601
 * Note: this is shared by relation deletion and index deletion.  It's
1602
 * not intended for use anyplace else.
1603
 */
1604
void
1605
DeleteAttributeTuples(Oid relid)
1606
0
{
1607
0
  Relation  attrel;
1608
0
  SysScanDesc scan;
1609
0
  ScanKeyData key[1];
1610
0
  HeapTuple atttup;
1611
1612
  /* Grab an appropriate lock on the pg_attribute relation */
1613
0
  attrel = table_open(AttributeRelationId, RowExclusiveLock);
1614
1615
  /* Use the index to scan only attributes of the target relation */
1616
0
  ScanKeyInit(&key[0],
1617
0
        Anum_pg_attribute_attrelid,
1618
0
        BTEqualStrategyNumber, F_OIDEQ,
1619
0
        ObjectIdGetDatum(relid));
1620
1621
0
  scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
1622
0
                NULL, 1, key);
1623
1624
  /* Delete all the matching tuples */
1625
0
  while ((atttup = systable_getnext(scan)) != NULL)
1626
0
    CatalogTupleDelete(attrel, &atttup->t_self);
1627
1628
  /* Clean up after the scan */
1629
0
  systable_endscan(scan);
1630
0
  table_close(attrel, RowExclusiveLock);
1631
0
}
1632
1633
/*
1634
 *    DeleteSystemAttributeTuples
1635
 *
1636
 * Remove pg_attribute rows for system columns of the given relid.
1637
 *
1638
 * Note: this is only used when converting a table to a view.  Views don't
1639
 * have system columns, so we should remove them from pg_attribute.
1640
 */
1641
void
1642
DeleteSystemAttributeTuples(Oid relid)
1643
0
{
1644
0
  Relation  attrel;
1645
0
  SysScanDesc scan;
1646
0
  ScanKeyData key[2];
1647
0
  HeapTuple atttup;
1648
1649
  /* Grab an appropriate lock on the pg_attribute relation */
1650
0
  attrel = table_open(AttributeRelationId, RowExclusiveLock);
1651
1652
  /* Use the index to scan only system attributes of the target relation */
1653
0
  ScanKeyInit(&key[0],
1654
0
        Anum_pg_attribute_attrelid,
1655
0
        BTEqualStrategyNumber, F_OIDEQ,
1656
0
        ObjectIdGetDatum(relid));
1657
0
  ScanKeyInit(&key[1],
1658
0
        Anum_pg_attribute_attnum,
1659
0
        BTLessEqualStrategyNumber, F_INT2LE,
1660
0
        Int16GetDatum(0));
1661
1662
0
  scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
1663
0
                NULL, 2, key);
1664
1665
  /* Delete all the matching tuples */
1666
0
  while ((atttup = systable_getnext(scan)) != NULL)
1667
0
    CatalogTupleDelete(attrel, &atttup->t_self);
1668
1669
  /* Clean up after the scan */
1670
0
  systable_endscan(scan);
1671
0
  table_close(attrel, RowExclusiveLock);
1672
0
}
1673
1674
/*
1675
 *    RemoveAttributeById
1676
 *
1677
 * This is the guts of ALTER TABLE DROP COLUMN: actually mark the attribute
1678
 * deleted in pg_attribute.  We also remove pg_statistic entries for it.
1679
 * (Everything else needed, such as getting rid of any pg_attrdef entry,
1680
 * is handled by dependency.c.)
1681
 */
1682
void
1683
RemoveAttributeById(Oid relid, AttrNumber attnum)
1684
0
{
1685
0
  Relation  rel;
1686
0
  Relation  attr_rel;
1687
0
  HeapTuple tuple;
1688
0
  Form_pg_attribute attStruct;
1689
0
  char    newattname[NAMEDATALEN];
1690
0
  Datum   valuesAtt[Natts_pg_attribute] = {0};
1691
0
  bool    nullsAtt[Natts_pg_attribute] = {0};
1692
0
  bool    replacesAtt[Natts_pg_attribute] = {0};
1693
1694
  /*
1695
   * Grab an exclusive lock on the target table, which we will NOT release
1696
   * until end of transaction.  (In the simple case where we are directly
1697
   * dropping this column, ATExecDropColumn already did this ... but when
1698
   * cascading from a drop of some other object, we may not have any lock.)
1699
   */
1700
0
  rel = relation_open(relid, AccessExclusiveLock);
1701
1702
0
  attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
1703
1704
0
  tuple = SearchSysCacheCopy2(ATTNUM,
1705
0
                ObjectIdGetDatum(relid),
1706
0
                Int16GetDatum(attnum));
1707
0
  if (!HeapTupleIsValid(tuple))  /* shouldn't happen */
1708
0
    elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1709
0
       attnum, relid);
1710
0
  attStruct = (Form_pg_attribute) GETSTRUCT(tuple);
1711
1712
  /* Mark the attribute as dropped */
1713
0
  attStruct->attisdropped = true;
1714
1715
  /*
1716
   * Set the type OID to invalid.  A dropped attribute's type link cannot be
1717
   * relied on (once the attribute is dropped, the type might be too).
1718
   * Fortunately we do not need the type row --- the only really essential
1719
   * information is the type's typlen and typalign, which are preserved in
1720
   * the attribute's attlen and attalign.  We set atttypid to zero here as a
1721
   * means of catching code that incorrectly expects it to be valid.
1722
   */
1723
0
  attStruct->atttypid = InvalidOid;
1724
1725
  /* Remove any not-null constraint the column may have */
1726
0
  attStruct->attnotnull = false;
1727
1728
  /* Unset this so no one tries to look up the generation expression */
1729
0
  attStruct->attgenerated = '\0';
1730
1731
  /*
1732
   * Change the column name to something that isn't likely to conflict
1733
   */
1734
0
  snprintf(newattname, sizeof(newattname),
1735
0
       "........pg.dropped.%d........", attnum);
1736
0
  namestrcpy(&(attStruct->attname), newattname);
1737
1738
  /* Clear the missing value */
1739
0
  attStruct->atthasmissing = false;
1740
0
  nullsAtt[Anum_pg_attribute_attmissingval - 1] = true;
1741
0
  replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
1742
1743
  /*
1744
   * Clear the other nullable fields.  This saves some space in pg_attribute
1745
   * and removes no longer useful information.
1746
   */
1747
0
  nullsAtt[Anum_pg_attribute_attstattarget - 1] = true;
1748
0
  replacesAtt[Anum_pg_attribute_attstattarget - 1] = true;
1749
0
  nullsAtt[Anum_pg_attribute_attacl - 1] = true;
1750
0
  replacesAtt[Anum_pg_attribute_attacl - 1] = true;
1751
0
  nullsAtt[Anum_pg_attribute_attoptions - 1] = true;
1752
0
  replacesAtt[Anum_pg_attribute_attoptions - 1] = true;
1753
0
  nullsAtt[Anum_pg_attribute_attfdwoptions - 1] = true;
1754
0
  replacesAtt[Anum_pg_attribute_attfdwoptions - 1] = true;
1755
1756
0
  tuple = heap_modify_tuple(tuple, RelationGetDescr(attr_rel),
1757
0
                valuesAtt, nullsAtt, replacesAtt);
1758
1759
0
  CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
1760
1761
  /*
1762
   * Because updating the pg_attribute row will trigger a relcache flush for
1763
   * the target relation, we need not do anything else to notify other
1764
   * backends of the change.
1765
   */
1766
1767
0
  table_close(attr_rel, RowExclusiveLock);
1768
1769
0
  RemoveStatistics(relid, attnum);
1770
1771
0
  relation_close(rel, NoLock);
1772
0
}
1773
1774
/*
1775
 * heap_drop_with_catalog - removes specified relation from catalogs
1776
 *
1777
 * Note that this routine is not responsible for dropping objects that are
1778
 * linked to the pg_class entry via dependencies (for example, indexes and
1779
 * constraints).  Those are deleted by the dependency-tracing logic in
1780
 * dependency.c before control gets here.  In general, therefore, this routine
1781
 * should never be called directly; go through performDeletion() instead.
1782
 */
1783
void
1784
heap_drop_with_catalog(Oid relid)
1785
0
{
1786
0
  Relation  rel;
1787
0
  HeapTuple tuple;
1788
0
  Oid     parentOid = InvalidOid,
1789
0
        defaultPartOid = InvalidOid;
1790
1791
  /*
1792
   * To drop a partition safely, we must grab exclusive lock on its parent,
1793
   * because another backend might be about to execute a query on the parent
1794
   * table.  If it relies on previously cached partition descriptor, then it
1795
   * could attempt to access the just-dropped relation as its partition. We
1796
   * must therefore take a table lock strong enough to prevent all queries
1797
   * on the table from proceeding until we commit and send out a
1798
   * shared-cache-inval notice that will make them update their partition
1799
   * descriptors.
1800
   */
1801
0
  tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
1802
0
  if (!HeapTupleIsValid(tuple))
1803
0
    elog(ERROR, "cache lookup failed for relation %u", relid);
1804
0
  if (((Form_pg_class) GETSTRUCT(tuple))->relispartition)
1805
0
  {
1806
    /*
1807
     * We have to lock the parent if the partition is being detached,
1808
     * because it's possible that some query still has a partition
1809
     * descriptor that includes this partition.
1810
     */
1811
0
    parentOid = get_partition_parent(relid, true);
1812
0
    LockRelationOid(parentOid, AccessExclusiveLock);
1813
1814
    /*
1815
     * If this is not the default partition, dropping it will change the
1816
     * default partition's partition constraint, so we must lock it.
1817
     */
1818
0
    defaultPartOid = get_default_partition_oid(parentOid);
1819
0
    if (OidIsValid(defaultPartOid) && relid != defaultPartOid)
1820
0
      LockRelationOid(defaultPartOid, AccessExclusiveLock);
1821
0
  }
1822
1823
0
  ReleaseSysCache(tuple);
1824
1825
  /*
1826
   * Open and lock the relation.
1827
   */
1828
0
  rel = relation_open(relid, AccessExclusiveLock);
1829
1830
  /*
1831
   * There can no longer be anyone *else* touching the relation, but we
1832
   * might still have open queries or cursors, or pending trigger events, in
1833
   * our own session.
1834
   */
1835
0
  CheckTableNotInUse(rel, "DROP TABLE");
1836
1837
  /*
1838
   * This effectively deletes all rows in the table, and may be done in a
1839
   * serializable transaction.  In that case we must record a rw-conflict in
1840
   * to this transaction from each transaction holding a predicate lock on
1841
   * the table.
1842
   */
1843
0
  CheckTableForSerializableConflictIn(rel);
1844
1845
  /*
1846
   * Delete pg_foreign_table tuple first.
1847
   */
1848
0
  if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1849
0
  {
1850
0
    Relation  ftrel;
1851
0
    HeapTuple fttuple;
1852
1853
0
    ftrel = table_open(ForeignTableRelationId, RowExclusiveLock);
1854
1855
0
    fttuple = SearchSysCache1(FOREIGNTABLEREL, ObjectIdGetDatum(relid));
1856
0
    if (!HeapTupleIsValid(fttuple))
1857
0
      elog(ERROR, "cache lookup failed for foreign table %u", relid);
1858
1859
0
    CatalogTupleDelete(ftrel, &fttuple->t_self);
1860
1861
0
    ReleaseSysCache(fttuple);
1862
0
    table_close(ftrel, RowExclusiveLock);
1863
0
  }
1864
1865
  /*
1866
   * If a partitioned table, delete the pg_partitioned_table tuple.
1867
   */
1868
0
  if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1869
0
    RemovePartitionKeyByRelId(relid);
1870
1871
  /*
1872
   * If the relation being dropped is the default partition itself,
1873
   * invalidate its entry in pg_partitioned_table.
1874
   */
1875
0
  if (relid == defaultPartOid)
1876
0
    update_default_partition_oid(parentOid, InvalidOid);
1877
1878
  /*
1879
   * Schedule unlinking of the relation's physical files at commit.
1880
   */
1881
0
  if (RELKIND_HAS_STORAGE(rel->rd_rel->relkind))
1882
0
    RelationDropStorage(rel);
1883
1884
  /* ensure that stats are dropped if transaction commits */
1885
0
  pgstat_drop_relation(rel);
1886
1887
  /*
1888
   * Close relcache entry, but *keep* AccessExclusiveLock on the relation
1889
   * until transaction commit.  This ensures no one else will try to do
1890
   * something with the doomed relation.
1891
   */
1892
0
  relation_close(rel, NoLock);
1893
1894
  /*
1895
   * Remove any associated relation synchronization states.
1896
   */
1897
0
  RemoveSubscriptionRel(InvalidOid, relid);
1898
1899
  /*
1900
   * Forget any ON COMMIT action for the rel
1901
   */
1902
0
  remove_on_commit_action(relid);
1903
1904
  /*
1905
   * Flush the relation from the relcache.  We want to do this before
1906
   * starting to remove catalog entries, just to be certain that no relcache
1907
   * entry rebuild will happen partway through.  (That should not really
1908
   * matter, since we don't do CommandCounterIncrement here, but let's be
1909
   * safe.)
1910
   */
1911
0
  RelationForgetRelation(relid);
1912
1913
  /*
1914
   * remove inheritance information
1915
   */
1916
0
  RelationRemoveInheritance(relid);
1917
1918
  /*
1919
   * delete statistics
1920
   */
1921
0
  RemoveStatistics(relid, 0);
1922
1923
  /*
1924
   * delete attribute tuples
1925
   */
1926
0
  DeleteAttributeTuples(relid);
1927
1928
  /*
1929
   * delete relation tuple
1930
   */
1931
0
  DeleteRelationTuple(relid);
1932
1933
0
  if (OidIsValid(parentOid))
1934
0
  {
1935
    /*
1936
     * If this is not the default partition, the partition constraint of
1937
     * the default partition has changed to include the portion of the key
1938
     * space previously covered by the dropped partition.
1939
     */
1940
0
    if (OidIsValid(defaultPartOid) && relid != defaultPartOid)
1941
0
      CacheInvalidateRelcacheByRelid(defaultPartOid);
1942
1943
    /*
1944
     * Invalidate the parent's relcache so that the partition is no longer
1945
     * included in its partition descriptor.
1946
     */
1947
0
    CacheInvalidateRelcacheByRelid(parentOid);
1948
    /* keep the lock */
1949
0
  }
1950
0
}
1951
1952
1953
/*
1954
 * RelationClearMissing
1955
 *
1956
 * Set atthasmissing and attmissingval to false/null for all attributes
1957
 * where they are currently set. This can be safely and usefully done if
1958
 * the table is rewritten (e.g. by VACUUM FULL or CLUSTER) where we know there
1959
 * are no rows left with less than a full complement of attributes.
1960
 *
1961
 * The caller must have an AccessExclusive lock on the relation.
1962
 */
1963
void
1964
RelationClearMissing(Relation rel)
1965
0
{
1966
0
  Relation  attr_rel;
1967
0
  Oid     relid = RelationGetRelid(rel);
1968
0
  int     natts = RelationGetNumberOfAttributes(rel);
1969
0
  int     attnum;
1970
0
  Datum   repl_val[Natts_pg_attribute];
1971
0
  bool    repl_null[Natts_pg_attribute];
1972
0
  bool    repl_repl[Natts_pg_attribute];
1973
0
  Form_pg_attribute attrtuple;
1974
0
  HeapTuple tuple,
1975
0
        newtuple;
1976
1977
0
  memset(repl_val, 0, sizeof(repl_val));
1978
0
  memset(repl_null, false, sizeof(repl_null));
1979
0
  memset(repl_repl, false, sizeof(repl_repl));
1980
1981
0
  repl_val[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(false);
1982
0
  repl_null[Anum_pg_attribute_attmissingval - 1] = true;
1983
1984
0
  repl_repl[Anum_pg_attribute_atthasmissing - 1] = true;
1985
0
  repl_repl[Anum_pg_attribute_attmissingval - 1] = true;
1986
1987
1988
  /* Get a lock on pg_attribute */
1989
0
  attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
1990
1991
  /* process each non-system attribute, including any dropped columns */
1992
0
  for (attnum = 1; attnum <= natts; attnum++)
1993
0
  {
1994
0
    tuple = SearchSysCache2(ATTNUM,
1995
0
                ObjectIdGetDatum(relid),
1996
0
                Int16GetDatum(attnum));
1997
0
    if (!HeapTupleIsValid(tuple))  /* shouldn't happen */
1998
0
      elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1999
0
         attnum, relid);
2000
2001
0
    attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
2002
2003
    /* ignore any where atthasmissing is not true */
2004
0
    if (attrtuple->atthasmissing)
2005
0
    {
2006
0
      newtuple = heap_modify_tuple(tuple, RelationGetDescr(attr_rel),
2007
0
                     repl_val, repl_null, repl_repl);
2008
2009
0
      CatalogTupleUpdate(attr_rel, &newtuple->t_self, newtuple);
2010
2011
0
      heap_freetuple(newtuple);
2012
0
    }
2013
2014
0
    ReleaseSysCache(tuple);
2015
0
  }
2016
2017
  /*
2018
   * Our update of the pg_attribute rows will force a relcache rebuild, so
2019
   * there's nothing else to do here.
2020
   */
2021
0
  table_close(attr_rel, RowExclusiveLock);
2022
0
}
2023
2024
/*
2025
 * StoreAttrMissingVal
2026
 *
2027
 * Set the missing value of a single attribute.
2028
 */
2029
void
2030
StoreAttrMissingVal(Relation rel, AttrNumber attnum, Datum missingval)
2031
0
{
2032
0
  Datum   valuesAtt[Natts_pg_attribute] = {0};
2033
0
  bool    nullsAtt[Natts_pg_attribute] = {0};
2034
0
  bool    replacesAtt[Natts_pg_attribute] = {0};
2035
0
  Relation  attrrel;
2036
0
  Form_pg_attribute attStruct;
2037
0
  HeapTuple atttup,
2038
0
        newtup;
2039
2040
  /* This is only supported for plain tables */
2041
0
  Assert(rel->rd_rel->relkind == RELKIND_RELATION);
2042
2043
  /* Fetch the pg_attribute row */
2044
0
  attrrel = table_open(AttributeRelationId, RowExclusiveLock);
2045
2046
0
  atttup = SearchSysCache2(ATTNUM,
2047
0
               ObjectIdGetDatum(RelationGetRelid(rel)),
2048
0
               Int16GetDatum(attnum));
2049
0
  if (!HeapTupleIsValid(atttup)) /* shouldn't happen */
2050
0
    elog(ERROR, "cache lookup failed for attribute %d of relation %u",
2051
0
       attnum, RelationGetRelid(rel));
2052
0
  attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
2053
2054
  /* Make a one-element array containing the value */
2055
0
  missingval = PointerGetDatum(construct_array(&missingval,
2056
0
                         1,
2057
0
                         attStruct->atttypid,
2058
0
                         attStruct->attlen,
2059
0
                         attStruct->attbyval,
2060
0
                         attStruct->attalign));
2061
2062
  /* Update the pg_attribute row */
2063
0
  valuesAtt[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(true);
2064
0
  replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
2065
2066
0
  valuesAtt[Anum_pg_attribute_attmissingval - 1] = missingval;
2067
0
  replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
2068
2069
0
  newtup = heap_modify_tuple(atttup, RelationGetDescr(attrrel),
2070
0
                 valuesAtt, nullsAtt, replacesAtt);
2071
0
  CatalogTupleUpdate(attrrel, &newtup->t_self, newtup);
2072
2073
  /* clean up */
2074
0
  ReleaseSysCache(atttup);
2075
0
  table_close(attrrel, RowExclusiveLock);
2076
0
}
2077
2078
/*
2079
 * SetAttrMissing
2080
 *
2081
 * Set the missing value of a single attribute. This should only be used by
2082
 * binary upgrade. Takes an AccessExclusive lock on the relation owning the
2083
 * attribute.
2084
 */
2085
void
2086
SetAttrMissing(Oid relid, char *attname, char *value)
2087
0
{
2088
0
  Datum   valuesAtt[Natts_pg_attribute] = {0};
2089
0
  bool    nullsAtt[Natts_pg_attribute] = {0};
2090
0
  bool    replacesAtt[Natts_pg_attribute] = {0};
2091
0
  Datum   missingval;
2092
0
  Form_pg_attribute attStruct;
2093
0
  Relation  attrrel,
2094
0
        tablerel;
2095
0
  HeapTuple atttup,
2096
0
        newtup;
2097
2098
  /* lock the table the attribute belongs to */
2099
0
  tablerel = table_open(relid, AccessExclusiveLock);
2100
2101
  /* Don't do anything unless it's a plain table */
2102
0
  if (tablerel->rd_rel->relkind != RELKIND_RELATION)
2103
0
  {
2104
0
    table_close(tablerel, AccessExclusiveLock);
2105
0
    return;
2106
0
  }
2107
2108
  /* Lock the attribute row and get the data */
2109
0
  attrrel = table_open(AttributeRelationId, RowExclusiveLock);
2110
0
  atttup = SearchSysCacheAttName(relid, attname);
2111
0
  if (!HeapTupleIsValid(atttup))
2112
0
    elog(ERROR, "cache lookup failed for attribute %s of relation %u",
2113
0
       attname, relid);
2114
0
  attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
2115
2116
  /* get an array value from the value string */
2117
0
  missingval = OidFunctionCall3(F_ARRAY_IN,
2118
0
                  CStringGetDatum(value),
2119
0
                  ObjectIdGetDatum(attStruct->atttypid),
2120
0
                  Int32GetDatum(attStruct->atttypmod));
2121
2122
  /* update the tuple - set atthasmissing and attmissingval */
2123
0
  valuesAtt[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(true);
2124
0
  replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
2125
0
  valuesAtt[Anum_pg_attribute_attmissingval - 1] = missingval;
2126
0
  replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
2127
2128
0
  newtup = heap_modify_tuple(atttup, RelationGetDescr(attrrel),
2129
0
                 valuesAtt, nullsAtt, replacesAtt);
2130
0
  CatalogTupleUpdate(attrrel, &newtup->t_self, newtup);
2131
2132
  /* clean up */
2133
0
  ReleaseSysCache(atttup);
2134
0
  table_close(attrrel, RowExclusiveLock);
2135
0
  table_close(tablerel, AccessExclusiveLock);
2136
0
}
2137
2138
/*
2139
 * Store a check-constraint expression for the given relation.
2140
 *
2141
 * Caller is responsible for updating the count of constraints
2142
 * in the pg_class entry for the relation.
2143
 *
2144
 * The OID of the new constraint is returned.
2145
 */
2146
static Oid
2147
StoreRelCheck(Relation rel, const char *ccname, Node *expr,
2148
        bool is_enforced, bool is_validated, bool is_local,
2149
        int16 inhcount, bool is_no_inherit, bool is_internal)
2150
0
{
2151
0
  char     *ccbin;
2152
0
  List     *varList;
2153
0
  int     keycount;
2154
0
  int16    *attNos;
2155
0
  Oid     constrOid;
2156
2157
  /*
2158
   * Flatten expression to string form for storage.
2159
   */
2160
0
  ccbin = nodeToString(expr);
2161
2162
  /*
2163
   * Find columns of rel that are used in expr
2164
   *
2165
   * NB: pull_var_clause is okay here only because we don't allow subselects
2166
   * in check constraints; it would fail to examine the contents of
2167
   * subselects.
2168
   */
2169
0
  varList = pull_var_clause(expr, 0);
2170
0
  keycount = list_length(varList);
2171
2172
0
  if (keycount > 0)
2173
0
  {
2174
0
    ListCell   *vl;
2175
0
    int     i = 0;
2176
2177
0
    attNos = (int16 *) palloc(keycount * sizeof(int16));
2178
0
    foreach(vl, varList)
2179
0
    {
2180
0
      Var      *var = (Var *) lfirst(vl);
2181
0
      int     j;
2182
2183
0
      for (j = 0; j < i; j++)
2184
0
        if (attNos[j] == var->varattno)
2185
0
          break;
2186
0
      if (j == i)
2187
0
        attNos[i++] = var->varattno;
2188
0
    }
2189
0
    keycount = i;
2190
0
  }
2191
0
  else
2192
0
    attNos = NULL;
2193
2194
  /*
2195
   * Partitioned tables do not contain any rows themselves, so a NO INHERIT
2196
   * constraint makes no sense.
2197
   */
2198
0
  if (is_no_inherit &&
2199
0
    rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2200
0
    ereport(ERROR,
2201
0
        (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
2202
0
         errmsg("cannot add NO INHERIT constraint to partitioned table \"%s\"",
2203
0
            RelationGetRelationName(rel))));
2204
2205
  /*
2206
   * Create the Check Constraint
2207
   */
2208
0
  constrOid =
2209
0
    CreateConstraintEntry(ccname, /* Constraint Name */
2210
0
                RelationGetNamespace(rel),  /* namespace */
2211
0
                CONSTRAINT_CHECK, /* Constraint Type */
2212
0
                false,  /* Is Deferrable */
2213
0
                false,  /* Is Deferred */
2214
0
                is_enforced,  /* Is Enforced */
2215
0
                is_validated,
2216
0
                InvalidOid, /* no parent constraint */
2217
0
                RelationGetRelid(rel),  /* relation */
2218
0
                attNos, /* attrs in the constraint */
2219
0
                keycount, /* # key attrs in the constraint */
2220
0
                keycount, /* # total attrs in the constraint */
2221
0
                InvalidOid, /* not a domain constraint */
2222
0
                InvalidOid, /* no associated index */
2223
0
                InvalidOid, /* Foreign key fields */
2224
0
                NULL,
2225
0
                NULL,
2226
0
                NULL,
2227
0
                NULL,
2228
0
                0,
2229
0
                ' ',
2230
0
                ' ',
2231
0
                NULL,
2232
0
                0,
2233
0
                ' ',
2234
0
                NULL, /* not an exclusion constraint */
2235
0
                expr, /* Tree form of check constraint */
2236
0
                ccbin,  /* Binary form of check constraint */
2237
0
                is_local, /* conislocal */
2238
0
                inhcount, /* coninhcount */
2239
0
                is_no_inherit,  /* connoinherit */
2240
0
                false,  /* conperiod */
2241
0
                is_internal); /* internally constructed? */
2242
2243
0
  pfree(ccbin);
2244
2245
0
  return constrOid;
2246
0
}
2247
2248
/*
2249
 * Store a not-null constraint for the given relation
2250
 *
2251
 * The OID of the new constraint is returned.
2252
 */
2253
static Oid
2254
StoreRelNotNull(Relation rel, const char *nnname, AttrNumber attnum,
2255
        bool is_validated, bool is_local, int inhcount,
2256
        bool is_no_inherit)
2257
0
{
2258
0
  Oid     constrOid;
2259
2260
0
  Assert(attnum > InvalidAttrNumber);
2261
2262
0
  constrOid =
2263
0
    CreateConstraintEntry(nnname,
2264
0
                RelationGetNamespace(rel),
2265
0
                CONSTRAINT_NOTNULL,
2266
0
                false,
2267
0
                false,
2268
0
                true, /* Is Enforced */
2269
0
                is_validated,
2270
0
                InvalidOid,
2271
0
                RelationGetRelid(rel),
2272
0
                &attnum,
2273
0
                1,
2274
0
                1,
2275
0
                InvalidOid, /* not a domain constraint */
2276
0
                InvalidOid, /* no associated index */
2277
0
                InvalidOid, /* Foreign key fields */
2278
0
                NULL,
2279
0
                NULL,
2280
0
                NULL,
2281
0
                NULL,
2282
0
                0,
2283
0
                ' ',
2284
0
                ' ',
2285
0
                NULL,
2286
0
                0,
2287
0
                ' ',
2288
0
                NULL, /* not an exclusion constraint */
2289
0
                NULL,
2290
0
                NULL,
2291
0
                is_local,
2292
0
                inhcount,
2293
0
                is_no_inherit,
2294
0
                false,
2295
0
                false);
2296
0
  return constrOid;
2297
0
}
2298
2299
/*
2300
 * Store defaults and CHECK constraints (passed as a list of CookedConstraint).
2301
 *
2302
 * Each CookedConstraint struct is modified to store the new catalog tuple OID.
2303
 *
2304
 * NOTE: only pre-cooked expressions will be passed this way, which is to
2305
 * say expressions inherited from an existing relation.  Newly parsed
2306
 * expressions can be added later, by direct calls to StoreAttrDefault
2307
 * and StoreRelCheck (see AddRelationNewConstraints()).
2308
 */
2309
static void
2310
StoreConstraints(Relation rel, List *cooked_constraints, bool is_internal)
2311
0
{
2312
0
  int     numchecks = 0;
2313
0
  ListCell   *lc;
2314
2315
0
  if (cooked_constraints == NIL)
2316
0
    return;         /* nothing to do */
2317
2318
  /*
2319
   * Deparsing of constraint expressions will fail unless the just-created
2320
   * pg_attribute tuples for this relation are made visible.  So, bump the
2321
   * command counter.  CAUTION: this will cause a relcache entry rebuild.
2322
   */
2323
0
  CommandCounterIncrement();
2324
2325
0
  foreach(lc, cooked_constraints)
2326
0
  {
2327
0
    CookedConstraint *con = (CookedConstraint *) lfirst(lc);
2328
2329
0
    switch (con->contype)
2330
0
    {
2331
0
      case CONSTR_DEFAULT:
2332
0
        con->conoid = StoreAttrDefault(rel, con->attnum, con->expr,
2333
0
                         is_internal);
2334
0
        break;
2335
0
      case CONSTR_CHECK:
2336
0
        con->conoid =
2337
0
          StoreRelCheck(rel, con->name, con->expr,
2338
0
                  con->is_enforced, !con->skip_validation,
2339
0
                  con->is_local, con->inhcount,
2340
0
                  con->is_no_inherit, is_internal);
2341
0
        numchecks++;
2342
0
        break;
2343
2344
0
      default:
2345
0
        elog(ERROR, "unrecognized constraint type: %d",
2346
0
           (int) con->contype);
2347
0
    }
2348
0
  }
2349
2350
0
  if (numchecks > 0)
2351
0
    SetRelationNumChecks(rel, numchecks);
2352
0
}
2353
2354
/*
2355
 * AddRelationNewConstraints
2356
 *
2357
 * Add new column default expressions and/or constraint check expressions
2358
 * to an existing relation.  This is defined to do both for efficiency in
2359
 * DefineRelation, but of course you can do just one or the other by passing
2360
 * empty lists.
2361
 *
2362
 * rel: relation to be modified
2363
 * newColDefaults: list of RawColumnDefault structures
2364
 * newConstraints: list of Constraint nodes
2365
 * allow_merge: true if check constraints may be merged with existing ones
2366
 * is_local: true if definition is local, false if it's inherited
2367
 * is_internal: true if result of some internal process, not a user request
2368
 * queryString: used during expression transformation of default values and
2369
 *    cooked CHECK constraints
2370
 *
2371
 * All entries in newColDefaults will be processed.  Entries in newConstraints
2372
 * will be processed only if they are CONSTR_CHECK or CONSTR_NOTNULL types.
2373
 *
2374
 * Returns a list of CookedConstraint nodes that shows the cooked form of
2375
 * the default and constraint expressions added to the relation.
2376
 *
2377
 * NB: caller should have opened rel with some self-conflicting lock mode,
2378
 * and should hold that lock till end of transaction; for normal cases that'll
2379
 * be AccessExclusiveLock, but if caller knows that the constraint is already
2380
 * enforced by some other means, it can be ShareUpdateExclusiveLock.  Also, we
2381
 * assume the caller has done a CommandCounterIncrement if necessary to make
2382
 * the relation's catalog tuples visible.
2383
 */
2384
List *
2385
AddRelationNewConstraints(Relation rel,
2386
              List *newColDefaults,
2387
              List *newConstraints,
2388
              bool allow_merge,
2389
              bool is_local,
2390
              bool is_internal,
2391
              const char *queryString)
2392
0
{
2393
0
  List     *cookedConstraints = NIL;
2394
0
  TupleDesc tupleDesc;
2395
0
  TupleConstr *oldconstr;
2396
0
  int     numoldchecks;
2397
0
  ParseState *pstate;
2398
0
  ParseNamespaceItem *nsitem;
2399
0
  int     numchecks;
2400
0
  List     *checknames;
2401
0
  List     *nnnames;
2402
0
  Node     *expr;
2403
0
  CookedConstraint *cooked;
2404
2405
  /*
2406
   * Get info about existing constraints.
2407
   */
2408
0
  tupleDesc = RelationGetDescr(rel);
2409
0
  oldconstr = tupleDesc->constr;
2410
0
  if (oldconstr)
2411
0
    numoldchecks = oldconstr->num_check;
2412
0
  else
2413
0
    numoldchecks = 0;
2414
2415
  /*
2416
   * Create a dummy ParseState and insert the target relation as its sole
2417
   * rangetable entry.  We need a ParseState for transformExpr.
2418
   */
2419
0
  pstate = make_parsestate(NULL);
2420
0
  pstate->p_sourcetext = queryString;
2421
0
  nsitem = addRangeTableEntryForRelation(pstate,
2422
0
                       rel,
2423
0
                       AccessShareLock,
2424
0
                       NULL,
2425
0
                       false,
2426
0
                       true);
2427
0
  addNSItemToQuery(pstate, nsitem, true, true, true);
2428
2429
  /*
2430
   * Process column default expressions.
2431
   */
2432
0
  foreach_ptr(RawColumnDefault, colDef, newColDefaults)
2433
0
  {
2434
0
    Form_pg_attribute atp = TupleDescAttr(rel->rd_att, colDef->attnum - 1);
2435
0
    Oid     defOid;
2436
2437
0
    expr = cookDefault(pstate, colDef->raw_default,
2438
0
               atp->atttypid, atp->atttypmod,
2439
0
               NameStr(atp->attname),
2440
0
               atp->attgenerated);
2441
2442
    /*
2443
     * If the expression is just a NULL constant, we do not bother to make
2444
     * an explicit pg_attrdef entry, since the default behavior is
2445
     * equivalent.  This applies to column defaults, but not for
2446
     * generation expressions.
2447
     *
2448
     * Note a nonobvious property of this test: if the column is of a
2449
     * domain type, what we'll get is not a bare null Const but a
2450
     * CoerceToDomain expr, so we will not discard the default.  This is
2451
     * critical because the column default needs to be retained to
2452
     * override any default that the domain might have.
2453
     */
2454
0
    if (expr == NULL ||
2455
0
      (!colDef->generated &&
2456
0
       IsA(expr, Const) &&
2457
0
       castNode(Const, expr)->constisnull))
2458
0
      continue;
2459
2460
0
    defOid = StoreAttrDefault(rel, colDef->attnum, expr, is_internal);
2461
2462
0
    cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2463
0
    cooked->contype = CONSTR_DEFAULT;
2464
0
    cooked->conoid = defOid;
2465
0
    cooked->name = NULL;
2466
0
    cooked->attnum = colDef->attnum;
2467
0
    cooked->expr = expr;
2468
0
    cooked->is_enforced = true;
2469
0
    cooked->skip_validation = false;
2470
0
    cooked->is_local = is_local;
2471
0
    cooked->inhcount = is_local ? 0 : 1;
2472
0
    cooked->is_no_inherit = false;
2473
0
    cookedConstraints = lappend(cookedConstraints, cooked);
2474
0
  }
2475
2476
  /*
2477
   * Process constraint expressions.
2478
   */
2479
0
  numchecks = numoldchecks;
2480
0
  checknames = NIL;
2481
0
  nnnames = NIL;
2482
0
  foreach_node(Constraint, cdef, newConstraints)
2483
0
  {
2484
0
    Oid     constrOid;
2485
2486
0
    if (cdef->contype == CONSTR_CHECK)
2487
0
    {
2488
0
      char     *ccname;
2489
2490
0
      if (cdef->raw_expr != NULL)
2491
0
      {
2492
0
        Assert(cdef->cooked_expr == NULL);
2493
2494
        /*
2495
         * Transform raw parsetree to executable expression, and
2496
         * verify it's valid as a CHECK constraint.
2497
         */
2498
0
        expr = cookConstraint(pstate, cdef->raw_expr,
2499
0
                    RelationGetRelationName(rel));
2500
0
      }
2501
0
      else
2502
0
      {
2503
0
        Assert(cdef->cooked_expr != NULL);
2504
2505
        /*
2506
         * Here, we assume the parser will only pass us valid CHECK
2507
         * expressions, so we do no particular checking.
2508
         */
2509
0
        expr = stringToNode(cdef->cooked_expr);
2510
0
      }
2511
2512
      /*
2513
       * Check name uniqueness, or generate a name if none was given.
2514
       */
2515
0
      if (cdef->conname != NULL)
2516
0
      {
2517
0
        ccname = cdef->conname;
2518
        /* Check against other new constraints */
2519
        /* Needed because we don't do CommandCounterIncrement in loop */
2520
0
        foreach_ptr(char, chkname, checknames)
2521
0
        {
2522
0
          if (strcmp(chkname, ccname) == 0)
2523
0
            ereport(ERROR,
2524
0
                (errcode(ERRCODE_DUPLICATE_OBJECT),
2525
0
                 errmsg("check constraint \"%s\" already exists",
2526
0
                    ccname)));
2527
0
        }
2528
2529
        /* save name for future checks */
2530
0
        checknames = lappend(checknames, ccname);
2531
2532
        /*
2533
         * Check against pre-existing constraints.  If we are allowed
2534
         * to merge with an existing constraint, there's no more to do
2535
         * here. (We omit the duplicate constraint from the result,
2536
         * which is what ATAddCheckNNConstraint wants.)
2537
         */
2538
0
        if (MergeWithExistingConstraint(rel, ccname, expr,
2539
0
                        allow_merge, is_local,
2540
0
                        cdef->is_enforced,
2541
0
                        cdef->initially_valid,
2542
0
                        cdef->is_no_inherit))
2543
0
          continue;
2544
0
      }
2545
0
      else
2546
0
      {
2547
        /*
2548
         * When generating a name, we want to create "tab_col_check"
2549
         * for a column constraint and "tab_check" for a table
2550
         * constraint.  We no longer have any info about the syntactic
2551
         * positioning of the constraint phrase, so we approximate
2552
         * this by seeing whether the expression references more than
2553
         * one column.  (If the user played by the rules, the result
2554
         * is the same...)
2555
         *
2556
         * Note: pull_var_clause() doesn't descend into sublinks, but
2557
         * we eliminated those above; and anyway this only needs to be
2558
         * an approximate answer.
2559
         */
2560
0
        List     *vars;
2561
0
        char     *colname;
2562
2563
0
        vars = pull_var_clause(expr, 0);
2564
2565
        /* eliminate duplicates */
2566
0
        vars = list_union(NIL, vars);
2567
2568
0
        if (list_length(vars) == 1)
2569
0
          colname = get_attname(RelationGetRelid(rel),
2570
0
                      ((Var *) linitial(vars))->varattno,
2571
0
                      true);
2572
0
        else
2573
0
          colname = NULL;
2574
2575
0
        ccname = ChooseConstraintName(RelationGetRelationName(rel),
2576
0
                        colname,
2577
0
                        "check",
2578
0
                        RelationGetNamespace(rel),
2579
0
                        checknames);
2580
2581
        /* save name for future checks */
2582
0
        checknames = lappend(checknames, ccname);
2583
0
      }
2584
2585
      /*
2586
       * OK, store it.
2587
       */
2588
0
      constrOid =
2589
0
        StoreRelCheck(rel, ccname, expr, cdef->is_enforced,
2590
0
                cdef->initially_valid, is_local,
2591
0
                is_local ? 0 : 1, cdef->is_no_inherit,
2592
0
                is_internal);
2593
2594
0
      numchecks++;
2595
2596
0
      cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2597
0
      cooked->contype = CONSTR_CHECK;
2598
0
      cooked->conoid = constrOid;
2599
0
      cooked->name = ccname;
2600
0
      cooked->attnum = 0;
2601
0
      cooked->expr = expr;
2602
0
      cooked->is_enforced = cdef->is_enforced;
2603
0
      cooked->skip_validation = cdef->skip_validation;
2604
0
      cooked->is_local = is_local;
2605
0
      cooked->inhcount = is_local ? 0 : 1;
2606
0
      cooked->is_no_inherit = cdef->is_no_inherit;
2607
0
      cookedConstraints = lappend(cookedConstraints, cooked);
2608
0
    }
2609
0
    else if (cdef->contype == CONSTR_NOTNULL)
2610
0
    {
2611
0
      CookedConstraint *nncooked;
2612
0
      AttrNumber  colnum;
2613
0
      int16   inhcount = is_local ? 0 : 1;
2614
0
      char     *nnname;
2615
2616
      /* Determine which column to modify */
2617
0
      colnum = get_attnum(RelationGetRelid(rel), strVal(linitial(cdef->keys)));
2618
0
      if (colnum == InvalidAttrNumber)
2619
0
        ereport(ERROR,
2620
0
            errcode(ERRCODE_UNDEFINED_COLUMN),
2621
0
            errmsg("column \"%s\" of relation \"%s\" does not exist",
2622
0
                 strVal(linitial(cdef->keys)), RelationGetRelationName(rel)));
2623
0
      if (colnum < InvalidAttrNumber)
2624
0
        ereport(ERROR,
2625
0
            errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2626
0
            errmsg("cannot add not-null constraint on system column \"%s\"",
2627
0
                 strVal(linitial(cdef->keys))));
2628
2629
0
      Assert(cdef->initially_valid != cdef->skip_validation);
2630
2631
      /*
2632
       * If the column already has a not-null constraint, we don't want
2633
       * to add another one; adjust inheritance status as needed.  This
2634
       * also checks whether the existing constraint matches the
2635
       * requested validity.
2636
       */
2637
0
      if (AdjustNotNullInheritance(RelationGetRelid(rel), colnum,
2638
0
                     is_local, cdef->is_no_inherit,
2639
0
                     cdef->skip_validation))
2640
0
        continue;
2641
2642
      /*
2643
       * If a constraint name is specified, check that it isn't already
2644
       * used.  Otherwise, choose a non-conflicting one ourselves.
2645
       */
2646
0
      if (cdef->conname)
2647
0
      {
2648
0
        if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
2649
0
                     RelationGetRelid(rel),
2650
0
                     cdef->conname))
2651
0
          ereport(ERROR,
2652
0
              errcode(ERRCODE_DUPLICATE_OBJECT),
2653
0
              errmsg("constraint \"%s\" for relation \"%s\" already exists",
2654
0
                   cdef->conname, RelationGetRelationName(rel)));
2655
0
        nnname = cdef->conname;
2656
0
      }
2657
0
      else
2658
0
        nnname = ChooseConstraintName(RelationGetRelationName(rel),
2659
0
                        strVal(linitial(cdef->keys)),
2660
0
                        "not_null",
2661
0
                        RelationGetNamespace(rel),
2662
0
                        nnnames);
2663
0
      nnnames = lappend(nnnames, nnname);
2664
2665
0
      constrOid =
2666
0
        StoreRelNotNull(rel, nnname, colnum,
2667
0
                cdef->initially_valid,
2668
0
                is_local,
2669
0
                inhcount,
2670
0
                cdef->is_no_inherit);
2671
2672
0
      nncooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2673
0
      nncooked->contype = CONSTR_NOTNULL;
2674
0
      nncooked->conoid = constrOid;
2675
0
      nncooked->name = nnname;
2676
0
      nncooked->attnum = colnum;
2677
0
      nncooked->expr = NULL;
2678
0
      nncooked->is_enforced = true;
2679
0
      nncooked->skip_validation = cdef->skip_validation;
2680
0
      nncooked->is_local = is_local;
2681
0
      nncooked->inhcount = inhcount;
2682
0
      nncooked->is_no_inherit = cdef->is_no_inherit;
2683
2684
0
      cookedConstraints = lappend(cookedConstraints, nncooked);
2685
0
    }
2686
0
  }
2687
2688
  /*
2689
   * Update the count of constraints in the relation's pg_class tuple. We do
2690
   * this even if there was no change, in order to ensure that an SI update
2691
   * message is sent out for the pg_class tuple, which will force other
2692
   * backends to rebuild their relcache entries for the rel. (This is
2693
   * critical if we added defaults but not constraints.)
2694
   */
2695
0
  SetRelationNumChecks(rel, numchecks);
2696
2697
0
  return cookedConstraints;
2698
0
}
2699
2700
/*
2701
 * Check for a pre-existing check constraint that conflicts with a proposed
2702
 * new one, and either adjust its conislocal/coninhcount settings or throw
2703
 * error as needed.
2704
 *
2705
 * Returns true if merged (constraint is a duplicate), or false if it's
2706
 * got a so-far-unique name, or throws error if conflict.
2707
 *
2708
 * XXX See MergeConstraintsIntoExisting too if you change this code.
2709
 */
2710
static bool
2711
MergeWithExistingConstraint(Relation rel, const char *ccname, Node *expr,
2712
              bool allow_merge, bool is_local,
2713
              bool is_enforced,
2714
              bool is_initially_valid,
2715
              bool is_no_inherit)
2716
0
{
2717
0
  bool    found;
2718
0
  Relation  conDesc;
2719
0
  SysScanDesc conscan;
2720
0
  ScanKeyData skey[3];
2721
0
  HeapTuple tup;
2722
2723
  /* Search for a pg_constraint entry with same name and relation */
2724
0
  conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
2725
2726
0
  found = false;
2727
2728
0
  ScanKeyInit(&skey[0],
2729
0
        Anum_pg_constraint_conrelid,
2730
0
        BTEqualStrategyNumber, F_OIDEQ,
2731
0
        ObjectIdGetDatum(RelationGetRelid(rel)));
2732
0
  ScanKeyInit(&skey[1],
2733
0
        Anum_pg_constraint_contypid,
2734
0
        BTEqualStrategyNumber, F_OIDEQ,
2735
0
        ObjectIdGetDatum(InvalidOid));
2736
0
  ScanKeyInit(&skey[2],
2737
0
        Anum_pg_constraint_conname,
2738
0
        BTEqualStrategyNumber, F_NAMEEQ,
2739
0
        CStringGetDatum(ccname));
2740
2741
0
  conscan = systable_beginscan(conDesc, ConstraintRelidTypidNameIndexId, true,
2742
0
                 NULL, 3, skey);
2743
2744
  /* There can be at most one matching row */
2745
0
  if (HeapTupleIsValid(tup = systable_getnext(conscan)))
2746
0
  {
2747
0
    Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
2748
2749
    /* Found it.  Conflicts if not identical check constraint */
2750
0
    if (con->contype == CONSTRAINT_CHECK)
2751
0
    {
2752
0
      Datum   val;
2753
0
      bool    isnull;
2754
2755
0
      val = fastgetattr(tup,
2756
0
                Anum_pg_constraint_conbin,
2757
0
                conDesc->rd_att, &isnull);
2758
0
      if (isnull)
2759
0
        elog(ERROR, "null conbin for rel %s",
2760
0
           RelationGetRelationName(rel));
2761
0
      if (equal(expr, stringToNode(TextDatumGetCString(val))))
2762
0
        found = true;
2763
0
    }
2764
2765
    /*
2766
     * If the existing constraint is purely inherited (no local
2767
     * definition) then interpret addition of a local constraint as a
2768
     * legal merge.  This allows ALTER ADD CONSTRAINT on parent and child
2769
     * tables to be given in either order with same end state.  However if
2770
     * the relation is a partition, all inherited constraints are always
2771
     * non-local, including those that were merged.
2772
     */
2773
0
    if (is_local && !con->conislocal && !rel->rd_rel->relispartition)
2774
0
      allow_merge = true;
2775
2776
0
    if (!found || !allow_merge)
2777
0
      ereport(ERROR,
2778
0
          (errcode(ERRCODE_DUPLICATE_OBJECT),
2779
0
           errmsg("constraint \"%s\" for relation \"%s\" already exists",
2780
0
              ccname, RelationGetRelationName(rel))));
2781
2782
    /* If the child constraint is "no inherit" then cannot merge */
2783
0
    if (con->connoinherit)
2784
0
      ereport(ERROR,
2785
0
          (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2786
0
           errmsg("constraint \"%s\" conflicts with non-inherited constraint on relation \"%s\"",
2787
0
              ccname, RelationGetRelationName(rel))));
2788
2789
    /*
2790
     * Must not change an existing inherited constraint to "no inherit"
2791
     * status.  That's because inherited constraints should be able to
2792
     * propagate to lower-level children.
2793
     */
2794
0
    if (con->coninhcount > 0 && is_no_inherit)
2795
0
      ereport(ERROR,
2796
0
          (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2797
0
           errmsg("constraint \"%s\" conflicts with inherited constraint on relation \"%s\"",
2798
0
              ccname, RelationGetRelationName(rel))));
2799
2800
    /*
2801
     * If the child constraint is "not valid" then cannot merge with a
2802
     * valid parent constraint.
2803
     */
2804
0
    if (is_initially_valid && con->conenforced && !con->convalidated)
2805
0
      ereport(ERROR,
2806
0
          (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2807
0
           errmsg("constraint \"%s\" conflicts with NOT VALID constraint on relation \"%s\"",
2808
0
              ccname, RelationGetRelationName(rel))));
2809
2810
    /*
2811
     * A non-enforced child constraint cannot be merged with an enforced
2812
     * parent constraint. However, the reverse is allowed, where the child
2813
     * constraint is enforced.
2814
     */
2815
0
    if ((!is_local && is_enforced && !con->conenforced) ||
2816
0
      (is_local && !is_enforced && con->conenforced))
2817
0
      ereport(ERROR,
2818
0
          (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2819
0
           errmsg("constraint \"%s\" conflicts with NOT ENFORCED constraint on relation \"%s\"",
2820
0
              ccname, RelationGetRelationName(rel))));
2821
2822
    /* OK to update the tuple */
2823
0
    ereport(NOTICE,
2824
0
        (errmsg("merging constraint \"%s\" with inherited definition",
2825
0
            ccname)));
2826
2827
0
    tup = heap_copytuple(tup);
2828
0
    con = (Form_pg_constraint) GETSTRUCT(tup);
2829
2830
    /*
2831
     * In case of partitions, an inherited constraint must be inherited
2832
     * only once since it cannot have multiple parents and it is never
2833
     * considered local.
2834
     */
2835
0
    if (rel->rd_rel->relispartition)
2836
0
    {
2837
0
      con->coninhcount = 1;
2838
0
      con->conislocal = false;
2839
0
    }
2840
0
    else
2841
0
    {
2842
0
      if (is_local)
2843
0
        con->conislocal = true;
2844
0
      else if (pg_add_s16_overflow(con->coninhcount, 1,
2845
0
                     &con->coninhcount))
2846
0
        ereport(ERROR,
2847
0
            errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2848
0
            errmsg("too many inheritance parents"));
2849
0
    }
2850
2851
0
    if (is_no_inherit)
2852
0
    {
2853
0
      Assert(is_local);
2854
0
      con->connoinherit = true;
2855
0
    }
2856
2857
    /*
2858
     * If the child constraint is required to be enforced while the parent
2859
     * constraint is not, this should be allowed by marking the child
2860
     * constraint as enforced. In the reverse case, an error would have
2861
     * already been thrown before reaching this point.
2862
     */
2863
0
    if (is_enforced && !con->conenforced)
2864
0
    {
2865
0
      Assert(is_local);
2866
0
      con->conenforced = true;
2867
0
      con->convalidated = true;
2868
0
    }
2869
2870
0
    CatalogTupleUpdate(conDesc, &tup->t_self, tup);
2871
0
  }
2872
2873
0
  systable_endscan(conscan);
2874
0
  table_close(conDesc, RowExclusiveLock);
2875
2876
0
  return found;
2877
0
}
2878
2879
/*
2880
 * Create the not-null constraints when creating a new relation
2881
 *
2882
 * These come from two sources: the 'constraints' list (of Constraint) is
2883
 * specified directly by the user; the 'old_notnulls' list (of
2884
 * CookedConstraint) comes from inheritance.  We create one constraint
2885
 * for each column, giving priority to user-specified ones, and setting
2886
 * inhcount according to how many parents cause each column to get a
2887
 * not-null constraint.  If a user-specified name clashes with another
2888
 * user-specified name, an error is raised.
2889
 *
2890
 * Returns a list of AttrNumber for columns that need to have the attnotnull
2891
 * flag set.
2892
 */
2893
List *
2894
AddRelationNotNullConstraints(Relation rel, List *constraints,
2895
                List *old_notnulls)
2896
0
{
2897
0
  List     *givennames;
2898
0
  List     *nnnames;
2899
0
  List     *nncols = NIL;
2900
2901
  /*
2902
   * We track two lists of names: nnnames keeps all the constraint names,
2903
   * givennames tracks user-generated names.  The distinction is important,
2904
   * because we must raise error for user-generated name conflicts, but for
2905
   * system-generated name conflicts we just generate another.
2906
   */
2907
0
  nnnames = NIL;
2908
0
  givennames = NIL;
2909
2910
  /*
2911
   * First, create all not-null constraints that are directly specified by
2912
   * the user.  Note that inheritance might have given us another source for
2913
   * each, so we must scan the old_notnulls list and increment inhcount for
2914
   * each element with identical attnum.  We delete from there any element
2915
   * that we process.
2916
   *
2917
   * We don't use foreach() here because we have two nested loops over the
2918
   * constraint list, with possible element deletions in the inner one. If
2919
   * we used foreach_delete_current() it could only fix up the state of one
2920
   * of the loops, so it seems cleaner to use looping over list indexes for
2921
   * both loops.  Note that any deletion will happen beyond where the outer
2922
   * loop is, so its index never needs adjustment.
2923
   */
2924
0
  for (int outerpos = 0; outerpos < list_length(constraints); outerpos++)
2925
0
  {
2926
0
    Constraint *constr;
2927
0
    AttrNumber  attnum;
2928
0
    char     *conname;
2929
0
    int     inhcount = 0;
2930
2931
0
    constr = list_nth_node(Constraint, constraints, outerpos);
2932
2933
0
    Assert(constr->contype == CONSTR_NOTNULL);
2934
2935
0
    attnum = get_attnum(RelationGetRelid(rel),
2936
0
              strVal(linitial(constr->keys)));
2937
0
    if (attnum == InvalidAttrNumber)
2938
0
      ereport(ERROR,
2939
0
          errcode(ERRCODE_UNDEFINED_COLUMN),
2940
0
          errmsg("column \"%s\" of relation \"%s\" does not exist",
2941
0
               strVal(linitial(constr->keys)),
2942
0
               RelationGetRelationName(rel)));
2943
0
    if (attnum < InvalidAttrNumber)
2944
0
      ereport(ERROR,
2945
0
          errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2946
0
          errmsg("cannot add not-null constraint on system column \"%s\"",
2947
0
               strVal(linitial(constr->keys))));
2948
2949
    /*
2950
     * A column can only have one not-null constraint, so discard any
2951
     * additional ones that appear for columns we already saw; but check
2952
     * that the NO INHERIT flags match.
2953
     */
2954
0
    for (int restpos = outerpos + 1; restpos < list_length(constraints);)
2955
0
    {
2956
0
      Constraint *other;
2957
2958
0
      other = list_nth_node(Constraint, constraints, restpos);
2959
0
      if (strcmp(strVal(linitial(constr->keys)),
2960
0
             strVal(linitial(other->keys))) == 0)
2961
0
      {
2962
0
        if (other->is_no_inherit != constr->is_no_inherit)
2963
0
          ereport(ERROR,
2964
0
              errcode(ERRCODE_SYNTAX_ERROR),
2965
0
              errmsg("conflicting NO INHERIT declaration for not-null constraint on column \"%s\"",
2966
0
                   strVal(linitial(constr->keys))));
2967
2968
        /*
2969
         * Preserve constraint name if one is specified, but raise an
2970
         * error if conflicting ones are specified.
2971
         */
2972
0
        if (other->conname)
2973
0
        {
2974
0
          if (!constr->conname)
2975
0
            constr->conname = pstrdup(other->conname);
2976
0
          else if (strcmp(constr->conname, other->conname) != 0)
2977
0
            ereport(ERROR,
2978
0
                errcode(ERRCODE_SYNTAX_ERROR),
2979
0
                errmsg("conflicting not-null constraint names \"%s\" and \"%s\"",
2980
0
                     constr->conname, other->conname));
2981
0
        }
2982
2983
        /* XXX do we need to verify any other fields? */
2984
0
        constraints = list_delete_nth_cell(constraints, restpos);
2985
0
      }
2986
0
      else
2987
0
        restpos++;
2988
0
    }
2989
2990
    /*
2991
     * Search in the list of inherited constraints for any entries on the
2992
     * same column; determine an inheritance count from that.  Also, if at
2993
     * least one parent has a constraint for this column, then we must not
2994
     * accept a user specification for a NO INHERIT one.  Any constraint
2995
     * from parents that we process here is deleted from the list: we no
2996
     * longer need to process it in the loop below.
2997
     */
2998
0
    foreach_ptr(CookedConstraint, old, old_notnulls)
2999
0
    {
3000
0
      if (old->attnum == attnum)
3001
0
      {
3002
        /*
3003
         * If we get a constraint from the parent, having a local NO
3004
         * INHERIT one doesn't work.
3005
         */
3006
0
        if (constr->is_no_inherit)
3007
0
          ereport(ERROR,
3008
0
              (errcode(ERRCODE_DATATYPE_MISMATCH),
3009
0
               errmsg("cannot define not-null constraint with NO INHERIT on column \"%s\"",
3010
0
                  strVal(linitial(constr->keys))),
3011
0
               errdetail("The column has an inherited not-null constraint.")));
3012
3013
0
        inhcount++;
3014
0
        old_notnulls = foreach_delete_current(old_notnulls, old);
3015
0
      }
3016
0
    }
3017
3018
    /*
3019
     * Determine a constraint name, which may have been specified by the
3020
     * user, or raise an error if a conflict exists with another
3021
     * user-specified name.
3022
     */
3023
0
    if (constr->conname)
3024
0
    {
3025
0
      foreach_ptr(char, thisname, givennames)
3026
0
      {
3027
0
        if (strcmp(thisname, constr->conname) == 0)
3028
0
          ereport(ERROR,
3029
0
              errcode(ERRCODE_DUPLICATE_OBJECT),
3030
0
              errmsg("constraint \"%s\" for relation \"%s\" already exists",
3031
0
                   constr->conname,
3032
0
                   RelationGetRelationName(rel)));
3033
0
      }
3034
3035
0
      conname = constr->conname;
3036
0
      givennames = lappend(givennames, conname);
3037
0
    }
3038
0
    else
3039
0
      conname = ChooseConstraintName(RelationGetRelationName(rel),
3040
0
                       get_attname(RelationGetRelid(rel),
3041
0
                             attnum, false),
3042
0
                       "not_null",
3043
0
                       RelationGetNamespace(rel),
3044
0
                       nnnames);
3045
0
    nnnames = lappend(nnnames, conname);
3046
3047
0
    StoreRelNotNull(rel, conname,
3048
0
            attnum, true, true,
3049
0
            inhcount, constr->is_no_inherit);
3050
3051
0
    nncols = lappend_int(nncols, attnum);
3052
0
  }
3053
3054
  /*
3055
   * If any column remains in the old_notnulls list, we must create a not-
3056
   * null constraint marked not-local for that column.  Because multiple
3057
   * parents could specify a not-null constraint for the same column, we
3058
   * must count how many there are and set an appropriate inhcount
3059
   * accordingly, deleting elements we've already processed.
3060
   *
3061
   * We don't use foreach() here because we have two nested loops over the
3062
   * constraint list, with possible element deletions in the inner one. If
3063
   * we used foreach_delete_current() it could only fix up the state of one
3064
   * of the loops, so it seems cleaner to use looping over list indexes for
3065
   * both loops.  Note that any deletion will happen beyond where the outer
3066
   * loop is, so its index never needs adjustment.
3067
   */
3068
0
  for (int outerpos = 0; outerpos < list_length(old_notnulls); outerpos++)
3069
0
  {
3070
0
    CookedConstraint *cooked;
3071
0
    char     *conname = NULL;
3072
0
    int     inhcount = 1;
3073
3074
0
    cooked = (CookedConstraint *) list_nth(old_notnulls, outerpos);
3075
0
    Assert(cooked->contype == CONSTR_NOTNULL);
3076
0
    Assert(cooked->name);
3077
3078
    /*
3079
     * Preserve the first non-conflicting constraint name we come across.
3080
     */
3081
0
    if (conname == NULL)
3082
0
      conname = cooked->name;
3083
3084
0
    for (int restpos = outerpos + 1; restpos < list_length(old_notnulls);)
3085
0
    {
3086
0
      CookedConstraint *other;
3087
3088
0
      other = (CookedConstraint *) list_nth(old_notnulls, restpos);
3089
0
      Assert(other->name);
3090
0
      if (other->attnum == cooked->attnum)
3091
0
      {
3092
0
        if (conname == NULL)
3093
0
          conname = other->name;
3094
3095
0
        inhcount++;
3096
0
        old_notnulls = list_delete_nth_cell(old_notnulls, restpos);
3097
0
      }
3098
0
      else
3099
0
        restpos++;
3100
0
    }
3101
3102
    /* If we got a name, make sure it isn't one we've already used */
3103
0
    if (conname != NULL)
3104
0
    {
3105
0
      foreach_ptr(char, thisname, nnnames)
3106
0
      {
3107
0
        if (strcmp(thisname, conname) == 0)
3108
0
        {
3109
0
          conname = NULL;
3110
0
          break;
3111
0
        }
3112
0
      }
3113
0
    }
3114
3115
    /* and choose a name, if needed */
3116
0
    if (conname == NULL)
3117
0
      conname = ChooseConstraintName(RelationGetRelationName(rel),
3118
0
                       get_attname(RelationGetRelid(rel),
3119
0
                             cooked->attnum, false),
3120
0
                       "not_null",
3121
0
                       RelationGetNamespace(rel),
3122
0
                       nnnames);
3123
0
    nnnames = lappend(nnnames, conname);
3124
3125
    /* ignore the origin constraint's is_local and inhcount */
3126
0
    StoreRelNotNull(rel, conname, cooked->attnum, true,
3127
0
            false, inhcount, false);
3128
3129
0
    nncols = lappend_int(nncols, cooked->attnum);
3130
0
  }
3131
3132
0
  return nncols;
3133
0
}
3134
3135
/*
3136
 * Update the count of constraints in the relation's pg_class tuple.
3137
 *
3138
 * Caller had better hold exclusive lock on the relation.
3139
 *
3140
 * An important side effect is that a SI update message will be sent out for
3141
 * the pg_class tuple, which will force other backends to rebuild their
3142
 * relcache entries for the rel.  Also, this backend will rebuild its
3143
 * own relcache entry at the next CommandCounterIncrement.
3144
 */
3145
static void
3146
SetRelationNumChecks(Relation rel, int numchecks)
3147
0
{
3148
0
  Relation  relrel;
3149
0
  HeapTuple reltup;
3150
0
  Form_pg_class relStruct;
3151
3152
0
  relrel = table_open(RelationRelationId, RowExclusiveLock);
3153
0
  reltup = SearchSysCacheCopy1(RELOID,
3154
0
                 ObjectIdGetDatum(RelationGetRelid(rel)));
3155
0
  if (!HeapTupleIsValid(reltup))
3156
0
    elog(ERROR, "cache lookup failed for relation %u",
3157
0
       RelationGetRelid(rel));
3158
0
  relStruct = (Form_pg_class) GETSTRUCT(reltup);
3159
3160
0
  if (relStruct->relchecks != numchecks)
3161
0
  {
3162
0
    relStruct->relchecks = numchecks;
3163
3164
0
    CatalogTupleUpdate(relrel, &reltup->t_self, reltup);
3165
0
  }
3166
0
  else
3167
0
  {
3168
    /* Skip the disk update, but force relcache inval anyway */
3169
0
    CacheInvalidateRelcache(rel);
3170
0
  }
3171
3172
0
  heap_freetuple(reltup);
3173
0
  table_close(relrel, RowExclusiveLock);
3174
0
}
3175
3176
/*
3177
 * Check for references to generated columns
3178
 */
3179
static bool
3180
check_nested_generated_walker(Node *node, void *context)
3181
0
{
3182
0
  ParseState *pstate = context;
3183
3184
0
  if (node == NULL)
3185
0
    return false;
3186
0
  else if (IsA(node, Var))
3187
0
  {
3188
0
    Var      *var = (Var *) node;
3189
0
    Oid     relid;
3190
0
    AttrNumber  attnum;
3191
3192
0
    relid = rt_fetch(var->varno, pstate->p_rtable)->relid;
3193
0
    if (!OidIsValid(relid))
3194
0
      return false;   /* XXX shouldn't we raise an error? */
3195
3196
0
    attnum = var->varattno;
3197
3198
0
    if (attnum > 0 && get_attgenerated(relid, attnum))
3199
0
      ereport(ERROR,
3200
0
          (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3201
0
           errmsg("cannot use generated column \"%s\" in column generation expression",
3202
0
              get_attname(relid, attnum, false)),
3203
0
           errdetail("A generated column cannot reference another generated column."),
3204
0
           parser_errposition(pstate, var->location)));
3205
    /* A whole-row Var is necessarily self-referential, so forbid it */
3206
0
    if (attnum == 0)
3207
0
      ereport(ERROR,
3208
0
          (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3209
0
           errmsg("cannot use whole-row variable in column generation expression"),
3210
0
           errdetail("This would cause the generated column to depend on its own value."),
3211
0
           parser_errposition(pstate, var->location)));
3212
    /* System columns were already checked in the parser */
3213
3214
0
    return false;
3215
0
  }
3216
0
  else
3217
0
    return expression_tree_walker(node, check_nested_generated_walker,
3218
0
                    context);
3219
0
}
3220
3221
static void
3222
check_nested_generated(ParseState *pstate, Node *node)
3223
0
{
3224
0
  check_nested_generated_walker(node, pstate);
3225
0
}
3226
3227
/*
3228
 * Check security of virtual generated column expression.
3229
 *
3230
 * Just like selecting from a view is exploitable (CVE-2024-7348), selecting
3231
 * from a table with virtual generated columns is exploitable.  Users who are
3232
 * concerned about this can avoid selecting from views, but telling them to
3233
 * avoid selecting from tables is less practical.
3234
 *
3235
 * To address this, generation expressions for virtual
3236
 * generated columns are restricted to using built-in functions and types.  We
3237
 * assume that built-in functions and types cannot be exploited for this
3238
 * purpose.  Note the overall security also requires that all functions in use
3239
 * are immutable.  (For example, there are some built-in non-immutable functions
3240
 * that can run arbitrary SQL.)  The immutability is checked elsewhere, since
3241
 * that is a property that needs to hold independent of security
3242
 * considerations.
3243
 *
3244
 * In the future, this could be expanded by some new mechanism to declare
3245
 * other functions and types as safe or trusted for this purpose, but that is
3246
 * to be designed.
3247
 */
3248
3249
/*
3250
 * Callback for check_functions_in_node() that determines whether a function
3251
 * is user-defined.
3252
 */
3253
static bool
3254
contains_user_functions_checker(Oid func_id, void *context)
3255
0
{
3256
0
  return (func_id >= FirstUnpinnedObjectId);
3257
0
}
3258
3259
/*
3260
 * Checks for all the things we don't want in the generation expressions of
3261
 * virtual generated columns for security reasons.  Errors out if it finds
3262
 * one.
3263
 */
3264
static bool
3265
check_virtual_generated_security_walker(Node *node, void *context)
3266
0
{
3267
0
  ParseState *pstate = context;
3268
3269
0
  if (node == NULL)
3270
0
    return false;
3271
3272
0
  if (!IsA(node, List))
3273
0
  {
3274
0
    if (check_functions_in_node(node, contains_user_functions_checker, NULL))
3275
0
      ereport(ERROR,
3276
0
          errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3277
0
          errmsg("generation expression uses user-defined function"),
3278
0
          errdetail("Virtual generated columns that make use of user-defined functions are not yet supported."),
3279
0
          parser_errposition(pstate, exprLocation(node)));
3280
3281
    /*
3282
     * check_functions_in_node() doesn't check some node types (see
3283
     * comment there).  We handle CoerceToDomain and MinMaxExpr by
3284
     * checking for built-in types.  The other listed node types cannot
3285
     * call user-definable SQL-visible functions.
3286
     *
3287
     * We furthermore need this type check to handle built-in, immutable
3288
     * polymorphic functions such as array_eq().
3289
     */
3290
0
    if (exprType(node) >= FirstUnpinnedObjectId)
3291
0
      ereport(ERROR,
3292
0
          errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3293
0
          errmsg("generation expression uses user-defined type"),
3294
0
          errdetail("Virtual generated columns that make use of user-defined types are not yet supported."),
3295
0
          parser_errposition(pstate, exprLocation(node)));
3296
0
  }
3297
3298
0
  return expression_tree_walker(node, check_virtual_generated_security_walker, context);
3299
0
}
3300
3301
static void
3302
check_virtual_generated_security(ParseState *pstate, Node *node)
3303
0
{
3304
0
  check_virtual_generated_security_walker(node, pstate);
3305
0
}
3306
3307
/*
3308
 * Take a raw default and convert it to a cooked format ready for
3309
 * storage.
3310
 *
3311
 * Parse state should be set up to recognize any vars that might appear
3312
 * in the expression.  (Even though we plan to reject vars, it's more
3313
 * user-friendly to give the correct error message than "unknown var".)
3314
 *
3315
 * If atttypid is not InvalidOid, coerce the expression to the specified
3316
 * type (and typmod atttypmod).   attname is only needed in this case:
3317
 * it is used in the error message, if any.
3318
 */
3319
Node *
3320
cookDefault(ParseState *pstate,
3321
      Node *raw_default,
3322
      Oid atttypid,
3323
      int32 atttypmod,
3324
      const char *attname,
3325
      char attgenerated)
3326
0
{
3327
0
  Node     *expr;
3328
3329
0
  Assert(raw_default != NULL);
3330
3331
  /*
3332
   * Transform raw parsetree to executable expression.
3333
   */
3334
0
  expr = transformExpr(pstate, raw_default, attgenerated ? EXPR_KIND_GENERATED_COLUMN : EXPR_KIND_COLUMN_DEFAULT);
3335
3336
0
  if (attgenerated)
3337
0
  {
3338
    /* Disallow refs to other generated columns */
3339
0
    check_nested_generated(pstate, expr);
3340
3341
    /* Disallow mutable functions */
3342
0
    if (contain_mutable_functions_after_planning((Expr *) expr))
3343
0
      ereport(ERROR,
3344
0
          (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3345
0
           errmsg("generation expression is not immutable")));
3346
3347
    /* Check security of expressions for virtual generated column */
3348
0
    if (attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
3349
0
      check_virtual_generated_security(pstate, expr);
3350
0
  }
3351
0
  else
3352
0
  {
3353
    /*
3354
     * For a default expression, transformExpr() should have rejected
3355
     * column references.
3356
     */
3357
0
    Assert(!contain_var_clause(expr));
3358
0
  }
3359
3360
  /*
3361
   * Coerce the expression to the correct type and typmod, if given. This
3362
   * should match the parser's processing of non-defaulted expressions ---
3363
   * see transformAssignedExpr().
3364
   */
3365
0
  if (OidIsValid(atttypid))
3366
0
  {
3367
0
    Oid     type_id = exprType(expr);
3368
3369
0
    expr = coerce_to_target_type(pstate, expr, type_id,
3370
0
                   atttypid, atttypmod,
3371
0
                   COERCION_ASSIGNMENT,
3372
0
                   COERCE_IMPLICIT_CAST,
3373
0
                   -1);
3374
0
    if (expr == NULL)
3375
0
      ereport(ERROR,
3376
0
          (errcode(ERRCODE_DATATYPE_MISMATCH),
3377
0
           errmsg("column \"%s\" is of type %s"
3378
0
              " but default expression is of type %s",
3379
0
              attname,
3380
0
              format_type_be(atttypid),
3381
0
              format_type_be(type_id)),
3382
0
           errhint("You will need to rewrite or cast the expression.")));
3383
0
  }
3384
3385
  /*
3386
   * Finally, take care of collations in the finished expression.
3387
   */
3388
0
  assign_expr_collations(pstate, expr);
3389
3390
0
  return expr;
3391
0
}
3392
3393
/*
3394
 * Take a raw CHECK constraint expression and convert it to a cooked format
3395
 * ready for storage.
3396
 *
3397
 * Parse state must be set up to recognize any vars that might appear
3398
 * in the expression.
3399
 */
3400
static Node *
3401
cookConstraint(ParseState *pstate,
3402
         Node *raw_constraint,
3403
         char *relname)
3404
0
{
3405
0
  Node     *expr;
3406
3407
  /*
3408
   * Transform raw parsetree to executable expression.
3409
   */
3410
0
  expr = transformExpr(pstate, raw_constraint, EXPR_KIND_CHECK_CONSTRAINT);
3411
3412
  /*
3413
   * Make sure it yields a boolean result.
3414
   */
3415
0
  expr = coerce_to_boolean(pstate, expr, "CHECK");
3416
3417
  /*
3418
   * Take care of collations.
3419
   */
3420
0
  assign_expr_collations(pstate, expr);
3421
3422
  /*
3423
   * Make sure no outside relations are referred to (this is probably dead
3424
   * code now that add_missing_from is history).
3425
   */
3426
0
  if (list_length(pstate->p_rtable) != 1)
3427
0
    ereport(ERROR,
3428
0
        (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
3429
0
         errmsg("only table \"%s\" can be referenced in check constraint",
3430
0
            relname)));
3431
3432
0
  return expr;
3433
0
}
3434
3435
/*
3436
 * CopyStatistics --- copy entries in pg_statistic from one rel to another
3437
 */
3438
void
3439
CopyStatistics(Oid fromrelid, Oid torelid)
3440
0
{
3441
0
  HeapTuple tup;
3442
0
  SysScanDesc scan;
3443
0
  ScanKeyData key[1];
3444
0
  Relation  statrel;
3445
0
  CatalogIndexState indstate = NULL;
3446
3447
0
  statrel = table_open(StatisticRelationId, RowExclusiveLock);
3448
3449
  /* Now search for stat records */
3450
0
  ScanKeyInit(&key[0],
3451
0
        Anum_pg_statistic_starelid,
3452
0
        BTEqualStrategyNumber, F_OIDEQ,
3453
0
        ObjectIdGetDatum(fromrelid));
3454
3455
0
  scan = systable_beginscan(statrel, StatisticRelidAttnumInhIndexId,
3456
0
                true, NULL, 1, key);
3457
3458
0
  while (HeapTupleIsValid((tup = systable_getnext(scan))))
3459
0
  {
3460
0
    Form_pg_statistic statform;
3461
3462
    /* make a modifiable copy */
3463
0
    tup = heap_copytuple(tup);
3464
0
    statform = (Form_pg_statistic) GETSTRUCT(tup);
3465
3466
    /* update the copy of the tuple and insert it */
3467
0
    statform->starelid = torelid;
3468
3469
    /* fetch index information when we know we need it */
3470
0
    if (indstate == NULL)
3471
0
      indstate = CatalogOpenIndexes(statrel);
3472
3473
0
    CatalogTupleInsertWithInfo(statrel, tup, indstate);
3474
3475
0
    heap_freetuple(tup);
3476
0
  }
3477
3478
0
  systable_endscan(scan);
3479
3480
0
  if (indstate != NULL)
3481
0
    CatalogCloseIndexes(indstate);
3482
0
  table_close(statrel, RowExclusiveLock);
3483
0
}
3484
3485
/*
3486
 * RemoveStatistics --- remove entries in pg_statistic for a rel or column
3487
 *
3488
 * If attnum is zero, remove all entries for rel; else remove only the one(s)
3489
 * for that column.
3490
 */
3491
void
3492
RemoveStatistics(Oid relid, AttrNumber attnum)
3493
0
{
3494
0
  Relation  pgstatistic;
3495
0
  SysScanDesc scan;
3496
0
  ScanKeyData key[2];
3497
0
  int     nkeys;
3498
0
  HeapTuple tuple;
3499
3500
0
  pgstatistic = table_open(StatisticRelationId, RowExclusiveLock);
3501
3502
0
  ScanKeyInit(&key[0],
3503
0
        Anum_pg_statistic_starelid,
3504
0
        BTEqualStrategyNumber, F_OIDEQ,
3505
0
        ObjectIdGetDatum(relid));
3506
3507
0
  if (attnum == 0)
3508
0
    nkeys = 1;
3509
0
  else
3510
0
  {
3511
0
    ScanKeyInit(&key[1],
3512
0
          Anum_pg_statistic_staattnum,
3513
0
          BTEqualStrategyNumber, F_INT2EQ,
3514
0
          Int16GetDatum(attnum));
3515
0
    nkeys = 2;
3516
0
  }
3517
3518
0
  scan = systable_beginscan(pgstatistic, StatisticRelidAttnumInhIndexId, true,
3519
0
                NULL, nkeys, key);
3520
3521
  /* we must loop even when attnum != 0, in case of inherited stats */
3522
0
  while (HeapTupleIsValid(tuple = systable_getnext(scan)))
3523
0
    CatalogTupleDelete(pgstatistic, &tuple->t_self);
3524
3525
0
  systable_endscan(scan);
3526
3527
0
  table_close(pgstatistic, RowExclusiveLock);
3528
0
}
3529
3530
3531
/*
3532
 * RelationTruncateIndexes - truncate all indexes associated
3533
 * with the heap relation to zero tuples.
3534
 *
3535
 * The routine will truncate and then reconstruct the indexes on
3536
 * the specified relation.  Caller must hold exclusive lock on rel.
3537
 */
3538
static void
3539
RelationTruncateIndexes(Relation heapRelation)
3540
0
{
3541
0
  ListCell   *indlist;
3542
3543
  /* Ask the relcache to produce a list of the indexes of the rel */
3544
0
  foreach(indlist, RelationGetIndexList(heapRelation))
3545
0
  {
3546
0
    Oid     indexId = lfirst_oid(indlist);
3547
0
    Relation  currentIndex;
3548
0
    IndexInfo  *indexInfo;
3549
3550
    /* Open the index relation; use exclusive lock, just to be sure */
3551
0
    currentIndex = index_open(indexId, AccessExclusiveLock);
3552
3553
    /*
3554
     * Fetch info needed for index_build.  Since we know there are no
3555
     * tuples that actually need indexing, we can use a dummy IndexInfo.
3556
     * This is slightly cheaper to build, but the real point is to avoid
3557
     * possibly running user-defined code in index expressions or
3558
     * predicates.  We might be getting invoked during ON COMMIT
3559
     * processing, and we don't want to run any such code then.
3560
     */
3561
0
    indexInfo = BuildDummyIndexInfo(currentIndex);
3562
3563
    /*
3564
     * Now truncate the actual file (and discard buffers).
3565
     */
3566
0
    RelationTruncate(currentIndex, 0);
3567
3568
    /* Initialize the index and rebuild */
3569
    /* Note: we do not need to re-establish pkey setting */
3570
0
    index_build(heapRelation, currentIndex, indexInfo, true, false);
3571
3572
    /* We're done with this index */
3573
0
    index_close(currentIndex, NoLock);
3574
0
  }
3575
0
}
3576
3577
/*
3578
 *   heap_truncate
3579
 *
3580
 *   This routine deletes all data within all the specified relations.
3581
 *
3582
 * This is not transaction-safe!  There is another, transaction-safe
3583
 * implementation in commands/tablecmds.c.  We now use this only for
3584
 * ON COMMIT truncation of temporary tables, where it doesn't matter.
3585
 */
3586
void
3587
heap_truncate(List *relids)
3588
0
{
3589
0
  List     *relations = NIL;
3590
0
  ListCell   *cell;
3591
3592
  /* Open relations for processing, and grab exclusive access on each */
3593
0
  foreach(cell, relids)
3594
0
  {
3595
0
    Oid     rid = lfirst_oid(cell);
3596
0
    Relation  rel;
3597
3598
0
    rel = table_open(rid, AccessExclusiveLock);
3599
0
    relations = lappend(relations, rel);
3600
0
  }
3601
3602
  /* Don't allow truncate on tables that are referenced by foreign keys */
3603
0
  heap_truncate_check_FKs(relations, true);
3604
3605
  /* OK to do it */
3606
0
  foreach(cell, relations)
3607
0
  {
3608
0
    Relation  rel = lfirst(cell);
3609
3610
    /* Truncate the relation */
3611
0
    heap_truncate_one_rel(rel);
3612
3613
    /* Close the relation, but keep exclusive lock on it until commit */
3614
0
    table_close(rel, NoLock);
3615
0
  }
3616
0
}
3617
3618
/*
3619
 *   heap_truncate_one_rel
3620
 *
3621
 *   This routine deletes all data within the specified relation.
3622
 *
3623
 * This is not transaction-safe, because the truncation is done immediately
3624
 * and cannot be rolled back later.  Caller is responsible for having
3625
 * checked permissions etc, and must have obtained AccessExclusiveLock.
3626
 */
3627
void
3628
heap_truncate_one_rel(Relation rel)
3629
0
{
3630
0
  Oid     toastrelid;
3631
3632
  /*
3633
   * Truncate the relation.  Partitioned tables have no storage, so there is
3634
   * nothing to do for them here.
3635
   */
3636
0
  if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3637
0
    return;
3638
3639
  /* Truncate the underlying relation */
3640
0
  table_relation_nontransactional_truncate(rel);
3641
3642
  /* If the relation has indexes, truncate the indexes too */
3643
0
  RelationTruncateIndexes(rel);
3644
3645
  /* If there is a toast table, truncate that too */
3646
0
  toastrelid = rel->rd_rel->reltoastrelid;
3647
0
  if (OidIsValid(toastrelid))
3648
0
  {
3649
0
    Relation  toastrel = table_open(toastrelid, AccessExclusiveLock);
3650
3651
0
    table_relation_nontransactional_truncate(toastrel);
3652
0
    RelationTruncateIndexes(toastrel);
3653
    /* keep the lock... */
3654
0
    table_close(toastrel, NoLock);
3655
0
  }
3656
0
}
3657
3658
/*
3659
 * heap_truncate_check_FKs
3660
 *    Check for foreign keys referencing a list of relations that
3661
 *    are to be truncated, and raise error if there are any
3662
 *
3663
 * We disallow such FKs (except self-referential ones) since the whole point
3664
 * of TRUNCATE is to not scan the individual rows to be thrown away.
3665
 *
3666
 * This is split out so it can be shared by both implementations of truncate.
3667
 * Caller should already hold a suitable lock on the relations.
3668
 *
3669
 * tempTables is only used to select an appropriate error message.
3670
 */
3671
void
3672
heap_truncate_check_FKs(List *relations, bool tempTables)
3673
0
{
3674
0
  List     *oids = NIL;
3675
0
  List     *dependents;
3676
0
  ListCell   *cell;
3677
3678
  /*
3679
   * Build a list of OIDs of the interesting relations.
3680
   *
3681
   * If a relation has no triggers, then it can neither have FKs nor be
3682
   * referenced by a FK from another table, so we can ignore it.  For
3683
   * partitioned tables, FKs have no triggers, so we must include them
3684
   * anyway.
3685
   */
3686
0
  foreach(cell, relations)
3687
0
  {
3688
0
    Relation  rel = lfirst(cell);
3689
3690
0
    if (rel->rd_rel->relhastriggers ||
3691
0
      rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3692
0
      oids = lappend_oid(oids, RelationGetRelid(rel));
3693
0
  }
3694
3695
  /*
3696
   * Fast path: if no relation has triggers, none has FKs either.
3697
   */
3698
0
  if (oids == NIL)
3699
0
    return;
3700
3701
  /*
3702
   * Otherwise, must scan pg_constraint.  We make one pass with all the
3703
   * relations considered; if this finds nothing, then all is well.
3704
   */
3705
0
  dependents = heap_truncate_find_FKs(oids);
3706
0
  if (dependents == NIL)
3707
0
    return;
3708
3709
  /*
3710
   * Otherwise we repeat the scan once per relation to identify a particular
3711
   * pair of relations to complain about.  This is pretty slow, but
3712
   * performance shouldn't matter much in a failure path.  The reason for
3713
   * doing things this way is to ensure that the message produced is not
3714
   * dependent on chance row locations within pg_constraint.
3715
   */
3716
0
  foreach(cell, oids)
3717
0
  {
3718
0
    Oid     relid = lfirst_oid(cell);
3719
0
    ListCell   *cell2;
3720
3721
0
    dependents = heap_truncate_find_FKs(list_make1_oid(relid));
3722
3723
0
    foreach(cell2, dependents)
3724
0
    {
3725
0
      Oid     relid2 = lfirst_oid(cell2);
3726
3727
0
      if (!list_member_oid(oids, relid2))
3728
0
      {
3729
0
        char     *relname = get_rel_name(relid);
3730
0
        char     *relname2 = get_rel_name(relid2);
3731
3732
0
        if (tempTables)
3733
0
          ereport(ERROR,
3734
0
              (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3735
0
               errmsg("unsupported ON COMMIT and foreign key combination"),
3736
0
               errdetail("Table \"%s\" references \"%s\", but they do not have the same ON COMMIT setting.",
3737
0
                     relname2, relname)));
3738
0
        else
3739
0
          ereport(ERROR,
3740
0
              (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3741
0
               errmsg("cannot truncate a table referenced in a foreign key constraint"),
3742
0
               errdetail("Table \"%s\" references \"%s\".",
3743
0
                     relname2, relname),
3744
0
               errhint("Truncate table \"%s\" at the same time, "
3745
0
                   "or use TRUNCATE ... CASCADE.",
3746
0
                   relname2)));
3747
0
      }
3748
0
    }
3749
0
  }
3750
0
}
3751
3752
/*
3753
 * heap_truncate_find_FKs
3754
 *    Find relations having foreign keys referencing any of the given rels
3755
 *
3756
 * Input and result are both lists of relation OIDs.  The result contains
3757
 * no duplicates, does *not* include any rels that were already in the input
3758
 * list, and is sorted in OID order.  (The last property is enforced mainly
3759
 * to guarantee consistent behavior in the regression tests; we don't want
3760
 * behavior to change depending on chance locations of rows in pg_constraint.)
3761
 *
3762
 * Note: caller should already have appropriate lock on all rels mentioned
3763
 * in relationIds.  Since adding or dropping an FK requires exclusive lock
3764
 * on both rels, this ensures that the answer will be stable.
3765
 */
3766
List *
3767
heap_truncate_find_FKs(List *relationIds)
3768
0
{
3769
0
  List     *result = NIL;
3770
0
  List     *oids;
3771
0
  List     *parent_cons;
3772
0
  ListCell   *cell;
3773
0
  ScanKeyData key;
3774
0
  Relation  fkeyRel;
3775
0
  SysScanDesc fkeyScan;
3776
0
  HeapTuple tuple;
3777
0
  bool    restart;
3778
3779
0
  oids = list_copy(relationIds);
3780
3781
  /*
3782
   * Must scan pg_constraint.  Right now, it is a seqscan because there is
3783
   * no available index on confrelid.
3784
   */
3785
0
  fkeyRel = table_open(ConstraintRelationId, AccessShareLock);
3786
3787
0
restart:
3788
0
  restart = false;
3789
0
  parent_cons = NIL;
3790
3791
0
  fkeyScan = systable_beginscan(fkeyRel, InvalidOid, false,
3792
0
                  NULL, 0, NULL);
3793
3794
0
  while (HeapTupleIsValid(tuple = systable_getnext(fkeyScan)))
3795
0
  {
3796
0
    Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
3797
3798
    /* Not a foreign key */
3799
0
    if (con->contype != CONSTRAINT_FOREIGN)
3800
0
      continue;
3801
3802
    /* Not referencing one of our list of tables */
3803
0
    if (!list_member_oid(oids, con->confrelid))
3804
0
      continue;
3805
3806
    /*
3807
     * If this constraint has a parent constraint which we have not seen
3808
     * yet, keep track of it for the second loop, below.  Tracking parent
3809
     * constraints allows us to climb up to the top-level constraint and
3810
     * look for all possible relations referencing the partitioned table.
3811
     */
3812
0
    if (OidIsValid(con->conparentid) &&
3813
0
      !list_member_oid(parent_cons, con->conparentid))
3814
0
      parent_cons = lappend_oid(parent_cons, con->conparentid);
3815
3816
    /*
3817
     * Add referencer to result, unless present in input list.  (Don't
3818
     * worry about dupes: we'll fix that below).
3819
     */
3820
0
    if (!list_member_oid(relationIds, con->conrelid))
3821
0
      result = lappend_oid(result, con->conrelid);
3822
0
  }
3823
3824
0
  systable_endscan(fkeyScan);
3825
3826
  /*
3827
   * Process each parent constraint we found to add the list of referenced
3828
   * relations by them to the oids list.  If we do add any new such
3829
   * relations, redo the first loop above.  Also, if we see that the parent
3830
   * constraint in turn has a parent, add that so that we process all
3831
   * relations in a single additional pass.
3832
   */
3833
0
  foreach(cell, parent_cons)
3834
0
  {
3835
0
    Oid     parent = lfirst_oid(cell);
3836
3837
0
    ScanKeyInit(&key,
3838
0
          Anum_pg_constraint_oid,
3839
0
          BTEqualStrategyNumber, F_OIDEQ,
3840
0
          ObjectIdGetDatum(parent));
3841
3842
0
    fkeyScan = systable_beginscan(fkeyRel, ConstraintOidIndexId,
3843
0
                    true, NULL, 1, &key);
3844
3845
0
    tuple = systable_getnext(fkeyScan);
3846
0
    if (HeapTupleIsValid(tuple))
3847
0
    {
3848
0
      Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
3849
3850
      /*
3851
       * pg_constraint rows always appear for partitioned hierarchies
3852
       * this way: on the each side of the constraint, one row appears
3853
       * for each partition that points to the top-most table on the
3854
       * other side.
3855
       *
3856
       * Because of this arrangement, we can correctly catch all
3857
       * relevant relations by adding to 'parent_cons' all rows with
3858
       * valid conparentid, and to the 'oids' list all rows with a zero
3859
       * conparentid.  If any oids are added to 'oids', redo the first
3860
       * loop above by setting 'restart'.
3861
       */
3862
0
      if (OidIsValid(con->conparentid))
3863
0
        parent_cons = list_append_unique_oid(parent_cons,
3864
0
                           con->conparentid);
3865
0
      else if (!list_member_oid(oids, con->confrelid))
3866
0
      {
3867
0
        oids = lappend_oid(oids, con->confrelid);
3868
0
        restart = true;
3869
0
      }
3870
0
    }
3871
3872
0
    systable_endscan(fkeyScan);
3873
0
  }
3874
3875
0
  list_free(parent_cons);
3876
0
  if (restart)
3877
0
    goto restart;
3878
3879
0
  table_close(fkeyRel, AccessShareLock);
3880
0
  list_free(oids);
3881
3882
  /* Now sort and de-duplicate the result list */
3883
0
  list_sort(result, list_oid_cmp);
3884
0
  list_deduplicate_oid(result);
3885
3886
0
  return result;
3887
0
}
3888
3889
/*
3890
 * StorePartitionKey
3891
 *    Store information about the partition key rel into the catalog
3892
 */
3893
void
3894
StorePartitionKey(Relation rel,
3895
          char strategy,
3896
          int16 partnatts,
3897
          AttrNumber *partattrs,
3898
          List *partexprs,
3899
          Oid *partopclass,
3900
          Oid *partcollation)
3901
0
{
3902
0
  int     i;
3903
0
  int2vector *partattrs_vec;
3904
0
  oidvector  *partopclass_vec;
3905
0
  oidvector  *partcollation_vec;
3906
0
  Datum   partexprDatum;
3907
0
  Relation  pg_partitioned_table;
3908
0
  HeapTuple tuple;
3909
0
  Datum   values[Natts_pg_partitioned_table];
3910
0
  bool    nulls[Natts_pg_partitioned_table] = {0};
3911
0
  ObjectAddress myself;
3912
0
  ObjectAddress referenced;
3913
0
  ObjectAddresses *addrs;
3914
3915
0
  Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
3916
3917
  /* Copy the partition attribute numbers, opclass OIDs into arrays */
3918
0
  partattrs_vec = buildint2vector(partattrs, partnatts);
3919
0
  partopclass_vec = buildoidvector(partopclass, partnatts);
3920
0
  partcollation_vec = buildoidvector(partcollation, partnatts);
3921
3922
  /* Convert the expressions (if any) to a text datum */
3923
0
  if (partexprs)
3924
0
  {
3925
0
    char     *exprString;
3926
3927
0
    exprString = nodeToString(partexprs);
3928
0
    partexprDatum = CStringGetTextDatum(exprString);
3929
0
    pfree(exprString);
3930
0
  }
3931
0
  else
3932
0
    partexprDatum = (Datum) 0;
3933
3934
0
  pg_partitioned_table = table_open(PartitionedRelationId, RowExclusiveLock);
3935
3936
  /* Only this can ever be NULL */
3937
0
  if (!partexprDatum)
3938
0
    nulls[Anum_pg_partitioned_table_partexprs - 1] = true;
3939
3940
0
  values[Anum_pg_partitioned_table_partrelid - 1] = ObjectIdGetDatum(RelationGetRelid(rel));
3941
0
  values[Anum_pg_partitioned_table_partstrat - 1] = CharGetDatum(strategy);
3942
0
  values[Anum_pg_partitioned_table_partnatts - 1] = Int16GetDatum(partnatts);
3943
0
  values[Anum_pg_partitioned_table_partdefid - 1] = ObjectIdGetDatum(InvalidOid);
3944
0
  values[Anum_pg_partitioned_table_partattrs - 1] = PointerGetDatum(partattrs_vec);
3945
0
  values[Anum_pg_partitioned_table_partclass - 1] = PointerGetDatum(partopclass_vec);
3946
0
  values[Anum_pg_partitioned_table_partcollation - 1] = PointerGetDatum(partcollation_vec);
3947
0
  values[Anum_pg_partitioned_table_partexprs - 1] = partexprDatum;
3948
3949
0
  tuple = heap_form_tuple(RelationGetDescr(pg_partitioned_table), values, nulls);
3950
3951
0
  CatalogTupleInsert(pg_partitioned_table, tuple);
3952
0
  table_close(pg_partitioned_table, RowExclusiveLock);
3953
3954
  /* Mark this relation as dependent on a few things as follows */
3955
0
  addrs = new_object_addresses();
3956
0
  ObjectAddressSet(myself, RelationRelationId, RelationGetRelid(rel));
3957
3958
  /* Operator class and collation per key column */
3959
0
  for (i = 0; i < partnatts; i++)
3960
0
  {
3961
0
    ObjectAddressSet(referenced, OperatorClassRelationId, partopclass[i]);
3962
0
    add_exact_object_address(&referenced, addrs);
3963
3964
    /* The default collation is pinned, so don't bother recording it */
3965
0
    if (OidIsValid(partcollation[i]) &&
3966
0
      partcollation[i] != DEFAULT_COLLATION_OID)
3967
0
    {
3968
0
      ObjectAddressSet(referenced, CollationRelationId, partcollation[i]);
3969
0
      add_exact_object_address(&referenced, addrs);
3970
0
    }
3971
0
  }
3972
3973
0
  record_object_address_dependencies(&myself, addrs, DEPENDENCY_NORMAL);
3974
0
  free_object_addresses(addrs);
3975
3976
  /*
3977
   * The partitioning columns are made internally dependent on the table,
3978
   * because we cannot drop any of them without dropping the whole table.
3979
   * (ATExecDropColumn independently enforces that, but it's not bulletproof
3980
   * so we need the dependencies too.)
3981
   */
3982
0
  for (i = 0; i < partnatts; i++)
3983
0
  {
3984
0
    if (partattrs[i] == 0)
3985
0
      continue;     /* ignore expressions here */
3986
3987
0
    ObjectAddressSubSet(referenced, RelationRelationId,
3988
0
              RelationGetRelid(rel), partattrs[i]);
3989
0
    recordDependencyOn(&referenced, &myself, DEPENDENCY_INTERNAL);
3990
0
  }
3991
3992
  /*
3993
   * Also consider anything mentioned in partition expressions.  External
3994
   * references (e.g. functions) get NORMAL dependencies.  Table columns
3995
   * mentioned in the expressions are handled the same as plain partitioning
3996
   * columns, i.e. they become internally dependent on the whole table.
3997
   */
3998
0
  if (partexprs)
3999
0
    recordDependencyOnSingleRelExpr(&myself,
4000
0
                    (Node *) partexprs,
4001
0
                    RelationGetRelid(rel),
4002
0
                    DEPENDENCY_NORMAL,
4003
0
                    DEPENDENCY_INTERNAL,
4004
0
                    true /* reverse the self-deps */ );
4005
4006
  /*
4007
   * We must invalidate the relcache so that the next
4008
   * CommandCounterIncrement() will cause the same to be rebuilt using the
4009
   * information in just created catalog entry.
4010
   */
4011
0
  CacheInvalidateRelcache(rel);
4012
0
}
4013
4014
/*
4015
 *  RemovePartitionKeyByRelId
4016
 *    Remove pg_partitioned_table entry for a relation
4017
 */
4018
void
4019
RemovePartitionKeyByRelId(Oid relid)
4020
0
{
4021
0
  Relation  rel;
4022
0
  HeapTuple tuple;
4023
4024
0
  rel = table_open(PartitionedRelationId, RowExclusiveLock);
4025
4026
0
  tuple = SearchSysCache1(PARTRELID, ObjectIdGetDatum(relid));
4027
0
  if (!HeapTupleIsValid(tuple))
4028
0
    elog(ERROR, "cache lookup failed for partition key of relation %u",
4029
0
       relid);
4030
4031
0
  CatalogTupleDelete(rel, &tuple->t_self);
4032
4033
0
  ReleaseSysCache(tuple);
4034
0
  table_close(rel, RowExclusiveLock);
4035
0
}
4036
4037
/*
4038
 * StorePartitionBound
4039
 *    Update pg_class tuple of rel to store the partition bound and set
4040
 *    relispartition to true
4041
 *
4042
 * If this is the default partition, also update the default partition OID in
4043
 * pg_partitioned_table.
4044
 *
4045
 * Also, invalidate the parent's relcache, so that the next rebuild will load
4046
 * the new partition's info into its partition descriptor.  If there is a
4047
 * default partition, we must invalidate its relcache entry as well.
4048
 */
4049
void
4050
StorePartitionBound(Relation rel, Relation parent, PartitionBoundSpec *bound)
4051
0
{
4052
0
  Relation  classRel;
4053
0
  HeapTuple tuple,
4054
0
        newtuple;
4055
0
  Datum   new_val[Natts_pg_class];
4056
0
  bool    new_null[Natts_pg_class],
4057
0
        new_repl[Natts_pg_class];
4058
0
  Oid     defaultPartOid;
4059
4060
  /* Update pg_class tuple */
4061
0
  classRel = table_open(RelationRelationId, RowExclusiveLock);
4062
0
  tuple = SearchSysCacheCopy1(RELOID,
4063
0
                ObjectIdGetDatum(RelationGetRelid(rel)));
4064
0
  if (!HeapTupleIsValid(tuple))
4065
0
    elog(ERROR, "cache lookup failed for relation %u",
4066
0
       RelationGetRelid(rel));
4067
4068
#ifdef USE_ASSERT_CHECKING
4069
  {
4070
    Form_pg_class classForm;
4071
    bool    isnull;
4072
4073
    classForm = (Form_pg_class) GETSTRUCT(tuple);
4074
    Assert(!classForm->relispartition);
4075
    (void) SysCacheGetAttr(RELOID, tuple, Anum_pg_class_relpartbound,
4076
                 &isnull);
4077
    Assert(isnull);
4078
  }
4079
#endif
4080
4081
  /* Fill in relpartbound value */
4082
0
  memset(new_val, 0, sizeof(new_val));
4083
0
  memset(new_null, false, sizeof(new_null));
4084
0
  memset(new_repl, false, sizeof(new_repl));
4085
0
  new_val[Anum_pg_class_relpartbound - 1] = CStringGetTextDatum(nodeToString(bound));
4086
0
  new_null[Anum_pg_class_relpartbound - 1] = false;
4087
0
  new_repl[Anum_pg_class_relpartbound - 1] = true;
4088
0
  newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
4089
0
                 new_val, new_null, new_repl);
4090
  /* Also set the flag */
4091
0
  ((Form_pg_class) GETSTRUCT(newtuple))->relispartition = true;
4092
4093
  /*
4094
   * We already checked for no inheritance children, but reset
4095
   * relhassubclass in case it was left over.
4096
   */
4097
0
  if (rel->rd_rel->relkind == RELKIND_RELATION && rel->rd_rel->relhassubclass)
4098
0
    ((Form_pg_class) GETSTRUCT(newtuple))->relhassubclass = false;
4099
4100
0
  CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
4101
0
  heap_freetuple(newtuple);
4102
0
  table_close(classRel, RowExclusiveLock);
4103
4104
  /*
4105
   * If we're storing bounds for the default partition, update
4106
   * pg_partitioned_table too.
4107
   */
4108
0
  if (bound->is_default)
4109
0
    update_default_partition_oid(RelationGetRelid(parent),
4110
0
                   RelationGetRelid(rel));
4111
4112
  /* Make these updates visible */
4113
0
  CommandCounterIncrement();
4114
4115
  /*
4116
   * The partition constraint for the default partition depends on the
4117
   * partition bounds of every other partition, so we must invalidate the
4118
   * relcache entry for that partition every time a partition is added or
4119
   * removed.
4120
   */
4121
0
  defaultPartOid =
4122
0
    get_default_oid_from_partdesc(RelationGetPartitionDesc(parent, true));
4123
0
  if (OidIsValid(defaultPartOid))
4124
0
    CacheInvalidateRelcacheByRelid(defaultPartOid);
4125
4126
0
  CacheInvalidateRelcache(parent);
4127
0
}