Coverage Report

Created: 2025-06-15 06:31

/src/postgres/src/backend/commands/schemacmds.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * schemacmds.c
4
 *    schema creation/manipulation commands
5
 *
6
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7
 * Portions Copyright (c) 1994, Regents of the University of California
8
 *
9
 *
10
 * IDENTIFICATION
11
 *    src/backend/commands/schemacmds.c
12
 *
13
 *-------------------------------------------------------------------------
14
 */
15
#include "postgres.h"
16
17
#include "access/htup_details.h"
18
#include "access/table.h"
19
#include "access/xact.h"
20
#include "catalog/catalog.h"
21
#include "catalog/dependency.h"
22
#include "catalog/indexing.h"
23
#include "catalog/namespace.h"
24
#include "catalog/objectaccess.h"
25
#include "catalog/pg_authid.h"
26
#include "catalog/pg_database.h"
27
#include "catalog/pg_namespace.h"
28
#include "commands/dbcommands.h"
29
#include "commands/event_trigger.h"
30
#include "commands/schemacmds.h"
31
#include "miscadmin.h"
32
#include "parser/parse_utilcmd.h"
33
#include "parser/scansup.h"
34
#include "tcop/utility.h"
35
#include "utils/acl.h"
36
#include "utils/builtins.h"
37
#include "utils/rel.h"
38
#include "utils/syscache.h"
39
40
static void AlterSchemaOwner_internal(HeapTuple tup, Relation rel, Oid newOwnerId);
41
42
/*
43
 * CREATE SCHEMA
44
 *
45
 * Note: caller should pass in location information for the whole
46
 * CREATE SCHEMA statement, which in turn we pass down as the location
47
 * of the component commands.  This comports with our general plan of
48
 * reporting location/len for the whole command even when executing
49
 * a subquery.
50
 */
51
Oid
52
CreateSchemaCommand(CreateSchemaStmt *stmt, const char *queryString,
53
          int stmt_location, int stmt_len)
54
0
{
55
0
  const char *schemaName = stmt->schemaname;
56
0
  Oid     namespaceId;
57
0
  List     *parsetree_list;
58
0
  ListCell   *parsetree_item;
59
0
  Oid     owner_uid;
60
0
  Oid     saved_uid;
61
0
  int     save_sec_context;
62
0
  int     save_nestlevel;
63
0
  char     *nsp = namespace_search_path;
64
0
  AclResult aclresult;
65
0
  ObjectAddress address;
66
0
  StringInfoData pathbuf;
67
68
0
  GetUserIdAndSecContext(&saved_uid, &save_sec_context);
69
70
  /*
71
   * Who is supposed to own the new schema?
72
   */
73
0
  if (stmt->authrole)
74
0
    owner_uid = get_rolespec_oid(stmt->authrole, false);
75
0
  else
76
0
    owner_uid = saved_uid;
77
78
  /* fill schema name with the user name if not specified */
79
0
  if (!schemaName)
80
0
  {
81
0
    HeapTuple tuple;
82
83
0
    tuple = SearchSysCache1(AUTHOID, ObjectIdGetDatum(owner_uid));
84
0
    if (!HeapTupleIsValid(tuple))
85
0
      elog(ERROR, "cache lookup failed for role %u", owner_uid);
86
0
    schemaName =
87
0
      pstrdup(NameStr(((Form_pg_authid) GETSTRUCT(tuple))->rolname));
88
0
    ReleaseSysCache(tuple);
89
0
  }
90
91
  /*
92
   * To create a schema, must have schema-create privilege on the current
93
   * database and must be able to become the target role (this does not
94
   * imply that the target role itself must have create-schema privilege).
95
   * The latter provision guards against "giveaway" attacks.  Note that a
96
   * superuser will always have both of these privileges a fortiori.
97
   */
98
0
  aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, saved_uid, ACL_CREATE);
99
0
  if (aclresult != ACLCHECK_OK)
100
0
    aclcheck_error(aclresult, OBJECT_DATABASE,
101
0
             get_database_name(MyDatabaseId));
102
103
0
  check_can_set_role(saved_uid, owner_uid);
104
105
  /* Additional check to protect reserved schema names */
106
0
  if (!allowSystemTableMods && IsReservedName(schemaName))
107
0
    ereport(ERROR,
108
0
        (errcode(ERRCODE_RESERVED_NAME),
109
0
         errmsg("unacceptable schema name \"%s\"", schemaName),
110
0
         errdetail("The prefix \"pg_\" is reserved for system schemas.")));
111
112
  /*
113
   * If if_not_exists was given and the schema already exists, bail out.
114
   * (Note: we needn't check this when not if_not_exists, because
115
   * NamespaceCreate will complain anyway.)  We could do this before making
116
   * the permissions checks, but since CREATE TABLE IF NOT EXISTS makes its
117
   * creation-permission check first, we do likewise.
118
   */
119
0
  if (stmt->if_not_exists)
120
0
  {
121
0
    namespaceId = get_namespace_oid(schemaName, true);
122
0
    if (OidIsValid(namespaceId))
123
0
    {
124
      /*
125
       * If we are in an extension script, insist that the pre-existing
126
       * object be a member of the extension, to avoid security risks.
127
       */
128
0
      ObjectAddressSet(address, NamespaceRelationId, namespaceId);
129
0
      checkMembershipInCurrentExtension(&address);
130
131
      /* OK to skip */
132
0
      ereport(NOTICE,
133
0
          (errcode(ERRCODE_DUPLICATE_SCHEMA),
134
0
           errmsg("schema \"%s\" already exists, skipping",
135
0
              schemaName)));
136
0
      return InvalidOid;
137
0
    }
138
0
  }
139
140
  /*
141
   * If the requested authorization is different from the current user,
142
   * temporarily set the current user so that the object(s) will be created
143
   * with the correct ownership.
144
   *
145
   * (The setting will be restored at the end of this routine, or in case of
146
   * error, transaction abort will clean things up.)
147
   */
148
0
  if (saved_uid != owner_uid)
149
0
    SetUserIdAndSecContext(owner_uid,
150
0
                 save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
151
152
  /* Create the schema's namespace */
153
0
  namespaceId = NamespaceCreate(schemaName, owner_uid, false);
154
155
  /* Advance cmd counter to make the namespace visible */
156
0
  CommandCounterIncrement();
157
158
  /*
159
   * Prepend the new schema to the current search path.
160
   *
161
   * We use the equivalent of a function SET option to allow the setting to
162
   * persist for exactly the duration of the schema creation.  guc.c also
163
   * takes care of undoing the setting on error.
164
   */
165
0
  save_nestlevel = NewGUCNestLevel();
166
167
0
  initStringInfo(&pathbuf);
168
0
  appendStringInfoString(&pathbuf, quote_identifier(schemaName));
169
170
0
  while (scanner_isspace(*nsp))
171
0
    nsp++;
172
173
0
  if (*nsp != '\0')
174
0
    appendStringInfo(&pathbuf, ", %s", nsp);
175
176
0
  (void) set_config_option("search_path", pathbuf.data,
177
0
               PGC_USERSET, PGC_S_SESSION,
178
0
               GUC_ACTION_SAVE, true, 0, false);
179
180
  /*
181
   * Report the new schema to possibly interested event triggers.  Note we
182
   * must do this here and not in ProcessUtilitySlow because otherwise the
183
   * objects created below are reported before the schema, which would be
184
   * wrong.
185
   */
186
0
  ObjectAddressSet(address, NamespaceRelationId, namespaceId);
187
0
  EventTriggerCollectSimpleCommand(address, InvalidObjectAddress,
188
0
                   (Node *) stmt);
189
190
  /*
191
   * Examine the list of commands embedded in the CREATE SCHEMA command, and
192
   * reorganize them into a sequentially executable order with no forward
193
   * references.  Note that the result is still a list of raw parsetrees ---
194
   * we cannot, in general, run parse analysis on one statement until we
195
   * have actually executed the prior ones.
196
   */
197
0
  parsetree_list = transformCreateSchemaStmtElements(stmt->schemaElts,
198
0
                             schemaName);
199
200
  /*
201
   * Execute each command contained in the CREATE SCHEMA.  Since the grammar
202
   * allows only utility commands in CREATE SCHEMA, there is no need to pass
203
   * them through parse_analyze_*() or the rewriter; we can just hand them
204
   * straight to ProcessUtility.
205
   */
206
0
  foreach(parsetree_item, parsetree_list)
207
0
  {
208
0
    Node     *stmt = (Node *) lfirst(parsetree_item);
209
0
    PlannedStmt *wrapper;
210
211
    /* need to make a wrapper PlannedStmt */
212
0
    wrapper = makeNode(PlannedStmt);
213
0
    wrapper->commandType = CMD_UTILITY;
214
0
    wrapper->canSetTag = false;
215
0
    wrapper->utilityStmt = stmt;
216
0
    wrapper->stmt_location = stmt_location;
217
0
    wrapper->stmt_len = stmt_len;
218
219
    /* do this step */
220
0
    ProcessUtility(wrapper,
221
0
             queryString,
222
0
             false,
223
0
             PROCESS_UTILITY_SUBCOMMAND,
224
0
             NULL,
225
0
             NULL,
226
0
             None_Receiver,
227
0
             NULL);
228
229
    /* make sure later steps can see the object created here */
230
0
    CommandCounterIncrement();
231
0
  }
232
233
  /*
234
   * Restore the GUC variable search_path we set above.
235
   */
236
0
  AtEOXact_GUC(true, save_nestlevel);
237
238
  /* Reset current user and security context */
239
0
  SetUserIdAndSecContext(saved_uid, save_sec_context);
240
241
0
  return namespaceId;
242
0
}
243
244
245
/*
246
 * Rename schema
247
 */
248
ObjectAddress
249
RenameSchema(const char *oldname, const char *newname)
250
0
{
251
0
  Oid     nspOid;
252
0
  HeapTuple tup;
253
0
  Relation  rel;
254
0
  AclResult aclresult;
255
0
  ObjectAddress address;
256
0
  Form_pg_namespace nspform;
257
258
0
  rel = table_open(NamespaceRelationId, RowExclusiveLock);
259
260
0
  tup = SearchSysCacheCopy1(NAMESPACENAME, CStringGetDatum(oldname));
261
0
  if (!HeapTupleIsValid(tup))
262
0
    ereport(ERROR,
263
0
        (errcode(ERRCODE_UNDEFINED_SCHEMA),
264
0
         errmsg("schema \"%s\" does not exist", oldname)));
265
266
0
  nspform = (Form_pg_namespace) GETSTRUCT(tup);
267
0
  nspOid = nspform->oid;
268
269
  /* make sure the new name doesn't exist */
270
0
  if (OidIsValid(get_namespace_oid(newname, true)))
271
0
    ereport(ERROR,
272
0
        (errcode(ERRCODE_DUPLICATE_SCHEMA),
273
0
         errmsg("schema \"%s\" already exists", newname)));
274
275
  /* must be owner */
276
0
  if (!object_ownercheck(NamespaceRelationId, nspOid, GetUserId()))
277
0
    aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
278
0
             oldname);
279
280
  /* must have CREATE privilege on database */
281
0
  aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
282
0
  if (aclresult != ACLCHECK_OK)
283
0
    aclcheck_error(aclresult, OBJECT_DATABASE,
284
0
             get_database_name(MyDatabaseId));
285
286
0
  if (!allowSystemTableMods && IsReservedName(newname))
287
0
    ereport(ERROR,
288
0
        (errcode(ERRCODE_RESERVED_NAME),
289
0
         errmsg("unacceptable schema name \"%s\"", newname),
290
0
         errdetail("The prefix \"pg_\" is reserved for system schemas.")));
291
292
  /* rename */
293
0
  namestrcpy(&nspform->nspname, newname);
294
0
  CatalogTupleUpdate(rel, &tup->t_self, tup);
295
296
0
  InvokeObjectPostAlterHook(NamespaceRelationId, nspOid, 0);
297
298
0
  ObjectAddressSet(address, NamespaceRelationId, nspOid);
299
300
0
  table_close(rel, NoLock);
301
0
  heap_freetuple(tup);
302
303
0
  return address;
304
0
}
305
306
void
307
AlterSchemaOwner_oid(Oid schemaoid, Oid newOwnerId)
308
0
{
309
0
  HeapTuple tup;
310
0
  Relation  rel;
311
312
0
  rel = table_open(NamespaceRelationId, RowExclusiveLock);
313
314
0
  tup = SearchSysCache1(NAMESPACEOID, ObjectIdGetDatum(schemaoid));
315
0
  if (!HeapTupleIsValid(tup))
316
0
    elog(ERROR, "cache lookup failed for schema %u", schemaoid);
317
318
0
  AlterSchemaOwner_internal(tup, rel, newOwnerId);
319
320
0
  ReleaseSysCache(tup);
321
322
0
  table_close(rel, RowExclusiveLock);
323
0
}
324
325
326
/*
327
 * Change schema owner
328
 */
329
ObjectAddress
330
AlterSchemaOwner(const char *name, Oid newOwnerId)
331
0
{
332
0
  Oid     nspOid;
333
0
  HeapTuple tup;
334
0
  Relation  rel;
335
0
  ObjectAddress address;
336
0
  Form_pg_namespace nspform;
337
338
0
  rel = table_open(NamespaceRelationId, RowExclusiveLock);
339
340
0
  tup = SearchSysCache1(NAMESPACENAME, CStringGetDatum(name));
341
0
  if (!HeapTupleIsValid(tup))
342
0
    ereport(ERROR,
343
0
        (errcode(ERRCODE_UNDEFINED_SCHEMA),
344
0
         errmsg("schema \"%s\" does not exist", name)));
345
346
0
  nspform = (Form_pg_namespace) GETSTRUCT(tup);
347
0
  nspOid = nspform->oid;
348
349
0
  AlterSchemaOwner_internal(tup, rel, newOwnerId);
350
351
0
  ObjectAddressSet(address, NamespaceRelationId, nspOid);
352
353
0
  ReleaseSysCache(tup);
354
355
0
  table_close(rel, RowExclusiveLock);
356
357
0
  return address;
358
0
}
359
360
static void
361
AlterSchemaOwner_internal(HeapTuple tup, Relation rel, Oid newOwnerId)
362
0
{
363
0
  Form_pg_namespace nspForm;
364
365
0
  Assert(tup->t_tableOid == NamespaceRelationId);
366
0
  Assert(RelationGetRelid(rel) == NamespaceRelationId);
367
368
0
  nspForm = (Form_pg_namespace) GETSTRUCT(tup);
369
370
  /*
371
   * If the new owner is the same as the existing owner, consider the
372
   * command to have succeeded.  This is for dump restoration purposes.
373
   */
374
0
  if (nspForm->nspowner != newOwnerId)
375
0
  {
376
0
    Datum   repl_val[Natts_pg_namespace];
377
0
    bool    repl_null[Natts_pg_namespace];
378
0
    bool    repl_repl[Natts_pg_namespace];
379
0
    Acl      *newAcl;
380
0
    Datum   aclDatum;
381
0
    bool    isNull;
382
0
    HeapTuple newtuple;
383
0
    AclResult aclresult;
384
385
    /* Otherwise, must be owner of the existing object */
386
0
    if (!object_ownercheck(NamespaceRelationId, nspForm->oid, GetUserId()))
387
0
      aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
388
0
               NameStr(nspForm->nspname));
389
390
    /* Must be able to become new owner */
391
0
    check_can_set_role(GetUserId(), newOwnerId);
392
393
    /*
394
     * must have create-schema rights
395
     *
396
     * NOTE: This is different from other alter-owner checks in that the
397
     * current user is checked for create privileges instead of the
398
     * destination owner.  This is consistent with the CREATE case for
399
     * schemas.  Because superusers will always have this right, we need
400
     * no special case for them.
401
     */
402
0
    aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(),
403
0
                  ACL_CREATE);
404
0
    if (aclresult != ACLCHECK_OK)
405
0
      aclcheck_error(aclresult, OBJECT_DATABASE,
406
0
               get_database_name(MyDatabaseId));
407
408
0
    memset(repl_null, false, sizeof(repl_null));
409
0
    memset(repl_repl, false, sizeof(repl_repl));
410
411
0
    repl_repl[Anum_pg_namespace_nspowner - 1] = true;
412
0
    repl_val[Anum_pg_namespace_nspowner - 1] = ObjectIdGetDatum(newOwnerId);
413
414
    /*
415
     * Determine the modified ACL for the new owner.  This is only
416
     * necessary when the ACL is non-null.
417
     */
418
0
    aclDatum = SysCacheGetAttr(NAMESPACENAME, tup,
419
0
                   Anum_pg_namespace_nspacl,
420
0
                   &isNull);
421
0
    if (!isNull)
422
0
    {
423
0
      newAcl = aclnewowner(DatumGetAclP(aclDatum),
424
0
                 nspForm->nspowner, newOwnerId);
425
0
      repl_repl[Anum_pg_namespace_nspacl - 1] = true;
426
0
      repl_val[Anum_pg_namespace_nspacl - 1] = PointerGetDatum(newAcl);
427
0
    }
428
429
0
    newtuple = heap_modify_tuple(tup, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
430
431
0
    CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
432
433
0
    heap_freetuple(newtuple);
434
435
    /* Update owner dependency reference */
436
0
    changeDependencyOnOwner(NamespaceRelationId, nspForm->oid,
437
0
                newOwnerId);
438
0
  }
439
440
0
  InvokeObjectPostAlterHook(NamespaceRelationId,
441
0
                nspForm->oid, 0);
442
0
}