Coverage Report

Created: 2025-08-12 06:43

/src/postgres/src/backend/commands/dbcommands.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * dbcommands.c
4
 *    Database management commands (create/drop database).
5
 *
6
 * Note: database creation/destruction commands use exclusive locks on
7
 * the database objects (as expressed by LockSharedObject()) to avoid
8
 * stepping on each others' toes.  Formerly we used table-level locks
9
 * on pg_database, but that's too coarse-grained.
10
 *
11
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
12
 * Portions Copyright (c) 1994, Regents of the University of California
13
 *
14
 *
15
 * IDENTIFICATION
16
 *    src/backend/commands/dbcommands.c
17
 *
18
 *-------------------------------------------------------------------------
19
 */
20
#include "postgres.h"
21
22
#include <fcntl.h>
23
#include <unistd.h>
24
#include <sys/stat.h>
25
26
#include "access/genam.h"
27
#include "access/heapam.h"
28
#include "access/htup_details.h"
29
#include "access/multixact.h"
30
#include "access/tableam.h"
31
#include "access/xact.h"
32
#include "access/xloginsert.h"
33
#include "access/xlogrecovery.h"
34
#include "access/xlogutils.h"
35
#include "catalog/catalog.h"
36
#include "catalog/dependency.h"
37
#include "catalog/indexing.h"
38
#include "catalog/objectaccess.h"
39
#include "catalog/pg_authid.h"
40
#include "catalog/pg_collation.h"
41
#include "catalog/pg_database.h"
42
#include "catalog/pg_db_role_setting.h"
43
#include "catalog/pg_subscription.h"
44
#include "catalog/pg_tablespace.h"
45
#include "commands/comment.h"
46
#include "commands/dbcommands.h"
47
#include "commands/dbcommands_xlog.h"
48
#include "commands/defrem.h"
49
#include "commands/seclabel.h"
50
#include "commands/tablespace.h"
51
#include "common/file_perm.h"
52
#include "mb/pg_wchar.h"
53
#include "miscadmin.h"
54
#include "pgstat.h"
55
#include "postmaster/bgwriter.h"
56
#include "replication/slot.h"
57
#include "storage/copydir.h"
58
#include "storage/fd.h"
59
#include "storage/ipc.h"
60
#include "storage/lmgr.h"
61
#include "storage/md.h"
62
#include "storage/procarray.h"
63
#include "storage/smgr.h"
64
#include "utils/acl.h"
65
#include "utils/builtins.h"
66
#include "utils/fmgroids.h"
67
#include "utils/pg_locale.h"
68
#include "utils/relmapper.h"
69
#include "utils/snapmgr.h"
70
#include "utils/syscache.h"
71
72
/*
 * Strategies CREATE DATABASE can use to copy the template database.
 */
typedef enum CreateDBStrategy
{
  /*
   * Copy the database at the block level, writing every copied block to
   * the write-ahead log.
   */
  CREATEDB_WAL_LOG,

  /*
   * Copy the database with a file system level copy, logging just a single
   * record per copied tablespace.  To make that safe, checkpoints are
   * triggered both before and after the operation.
   */
  CREATEDB_FILE_COPY,
} CreateDBStrategy;
87
88
typedef struct
89
{
90
  Oid     src_dboid;    /* source (template) DB */
91
  Oid     dest_dboid;   /* DB we are trying to create */
92
  CreateDBStrategy strategy;  /* create db strategy */
93
} createdb_failure_params;
94
95
typedef struct
96
{
97
  Oid     dest_dboid;   /* DB we are trying to move */
98
  Oid     dest_tsoid;   /* tablespace we are trying to move to */
99
} movedb_failure_params;
100
101
/*
102
 * Information about a relation to be copied when creating a database.
103
 */
104
typedef struct CreateDBRelInfo
105
{
106
  RelFileLocator rlocator;  /* physical relation identifier */
107
  Oid     reloid;     /* relation oid */
108
  bool    permanent;    /* relation is permanent or unlogged */
109
} CreateDBRelInfo;
110
111
112
/* non-export function prototypes */
113
static void createdb_failure_callback(int code, Datum arg);
114
static void movedb(const char *dbname, const char *tblspcname);
115
static void movedb_failure_callback(int code, Datum arg);
116
static bool get_db_info(const char *name, LOCKMODE lockmode,
117
            Oid *dbIdP, Oid *ownerIdP,
118
            int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
119
            TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
120
            Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
121
            char **dbIcurules,
122
            char *dbLocProvider,
123
            char **dbCollversion);
124
static void remove_dbtablespaces(Oid db_id);
125
static bool check_db_file_conflict(Oid db_id);
126
static int  errdetail_busy_db(int notherbackends, int npreparedxacts);
127
static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
128
                    Oid dst_tsid);
129
static List *ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath);
130
static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
131
                       Oid dbid, char *srcpath,
132
                       List *rlocatorlist, Snapshot snapshot);
133
static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
134
                             Oid tbid, Oid dbid,
135
                             char *srcpath);
136
static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
137
                  bool isRedo);
138
static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid,
139
                    Oid src_tsid, Oid dst_tsid);
140
static void recovery_create_dbdir(char *path, bool only_tblspc);
141
142
/*
143
 * Create a new database using the WAL_LOG strategy.
144
 *
145
 * Each copied block is separately written to the write-ahead log.
146
 */
147
static void
148
CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
149
              Oid src_tsid, Oid dst_tsid)
150
0
{
151
0
  char     *srcpath;
152
0
  char     *dstpath;
153
0
  List     *rlocatorlist = NULL;
154
0
  ListCell   *cell;
155
0
  LockRelId srcrelid;
156
0
  LockRelId dstrelid;
157
0
  RelFileLocator srcrlocator;
158
0
  RelFileLocator dstrlocator;
159
0
  CreateDBRelInfo *relinfo;
160
161
  /* Get source and destination database paths. */
162
0
  srcpath = GetDatabasePath(src_dboid, src_tsid);
163
0
  dstpath = GetDatabasePath(dst_dboid, dst_tsid);
164
165
  /* Create database directory and write PG_VERSION file. */
166
0
  CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
167
168
  /* Copy relmap file from source database to the destination database. */
169
0
  RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
170
171
  /* Get list of relfilelocators to copy from the source database. */
172
0
  rlocatorlist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
173
0
  Assert(rlocatorlist != NIL);
174
175
  /*
176
   * Database IDs will be the same for all relations so set them before
177
   * entering the loop.
178
   */
179
0
  srcrelid.dbId = src_dboid;
180
0
  dstrelid.dbId = dst_dboid;
181
182
  /* Loop over our list of relfilelocators and copy each one. */
183
0
  foreach(cell, rlocatorlist)
184
0
  {
185
0
    relinfo = lfirst(cell);
186
0
    srcrlocator = relinfo->rlocator;
187
188
    /*
189
     * If the relation is from the source db's default tablespace then we
190
     * need to create it in the destination db's default tablespace.
191
     * Otherwise, we need to create in the same tablespace as it is in the
192
     * source database.
193
     */
194
0
    if (srcrlocator.spcOid == src_tsid)
195
0
      dstrlocator.spcOid = dst_tsid;
196
0
    else
197
0
      dstrlocator.spcOid = srcrlocator.spcOid;
198
199
0
    dstrlocator.dbOid = dst_dboid;
200
0
    dstrlocator.relNumber = srcrlocator.relNumber;
201
202
    /*
203
     * Acquire locks on source and target relations before copying.
204
     *
205
     * We typically do not read relation data into shared_buffers without
206
     * holding a relation lock. It's unclear what could go wrong if we
207
     * skipped it in this case, because nobody can be modifying either the
208
     * source or destination database at this point, and we have locks on
209
     * both databases, too, but let's take the conservative route.
210
     */
211
0
    dstrelid.relId = srcrelid.relId = relinfo->reloid;
212
0
    LockRelationId(&srcrelid, AccessShareLock);
213
0
    LockRelationId(&dstrelid, AccessShareLock);
214
215
    /* Copy relation storage from source to the destination. */
216
0
    CreateAndCopyRelationData(srcrlocator, dstrlocator, relinfo->permanent);
217
218
    /* Release the relation locks. */
219
0
    UnlockRelationId(&srcrelid, AccessShareLock);
220
0
    UnlockRelationId(&dstrelid, AccessShareLock);
221
0
  }
222
223
0
  pfree(srcpath);
224
0
  pfree(dstpath);
225
0
  list_free_deep(rlocatorlist);
226
0
}
227
228
/*
229
 * Scan the pg_class table in the source database to identify the relations
230
 * that need to be copied to the destination database.
231
 *
232
 * This is an exception to the usual rule that cross-database access is
233
 * not possible. We can make it work here because we know that there are no
234
 * connections to the source database and (since there can't be prepared
235
 * transactions touching that database) no in-doubt tuples either. This
236
 * means that we don't need to worry about pruning removing anything from
237
 * under us, and we don't need to be too picky about our snapshot either.
238
 * As long as it sees all previously-committed XIDs as committed and all
239
 * aborted XIDs as aborted, we should be fine: nothing else is possible
240
 * here.
241
 *
242
 * We can't rely on the relcache for anything here, because that only knows
243
 * about the database to which we are connected, and can't handle access to
244
 * other databases. That also means we can't rely on the heap scan
245
 * infrastructure, which would be a bad idea anyway since it might try
246
 * to do things like HOT pruning which we definitely can't do safely in
247
 * a database to which we're not even connected.
248
 */
249
static List *
250
ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
251
0
{
252
0
  RelFileLocator rlocator;
253
0
  BlockNumber nblocks;
254
0
  BlockNumber blkno;
255
0
  Buffer    buf;
256
0
  RelFileNumber relfilenumber;
257
0
  Page    page;
258
0
  List     *rlocatorlist = NIL;
259
0
  LockRelId relid;
260
0
  Snapshot  snapshot;
261
0
  SMgrRelation smgr;
262
0
  BufferAccessStrategy bstrategy;
263
264
  /* Get pg_class relfilenumber. */
265
0
  relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
266
0
                              RelationRelationId);
267
268
  /* Don't read data into shared_buffers without holding a relation lock. */
269
0
  relid.dbId = dbid;
270
0
  relid.relId = RelationRelationId;
271
0
  LockRelationId(&relid, AccessShareLock);
272
273
  /* Prepare a RelFileLocator for the pg_class relation. */
274
0
  rlocator.spcOid = tbid;
275
0
  rlocator.dbOid = dbid;
276
0
  rlocator.relNumber = relfilenumber;
277
278
0
  smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
279
0
  nblocks = smgrnblocks(smgr, MAIN_FORKNUM);
280
0
  smgrclose(smgr);
281
282
  /* Use a buffer access strategy since this is a bulk read operation. */
283
0
  bstrategy = GetAccessStrategy(BAS_BULKREAD);
284
285
  /*
286
   * As explained in the function header comments, we need a snapshot that
287
   * will see all committed transactions as committed, and our transaction
288
   * snapshot - or the active snapshot - might not be new enough for that,
289
   * but the return value of GetLatestSnapshot() should work fine.
290
   */
291
0
  snapshot = RegisterSnapshot(GetLatestSnapshot());
292
293
  /* Process the relation block by block. */
294
0
  for (blkno = 0; blkno < nblocks; blkno++)
295
0
  {
296
0
    CHECK_FOR_INTERRUPTS();
297
298
0
    buf = ReadBufferWithoutRelcache(rlocator, MAIN_FORKNUM, blkno,
299
0
                    RBM_NORMAL, bstrategy, true);
300
301
0
    LockBuffer(buf, BUFFER_LOCK_SHARE);
302
0
    page = BufferGetPage(buf);
303
0
    if (PageIsNew(page) || PageIsEmpty(page))
304
0
    {
305
0
      UnlockReleaseBuffer(buf);
306
0
      continue;
307
0
    }
308
309
    /* Append relevant pg_class tuples for current page to rlocatorlist. */
310
0
    rlocatorlist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
311
0
                           srcpath, rlocatorlist,
312
0
                           snapshot);
313
314
0
    UnlockReleaseBuffer(buf);
315
0
  }
316
0
  UnregisterSnapshot(snapshot);
317
318
  /* Release relation lock. */
319
0
  UnlockRelationId(&relid, AccessShareLock);
320
321
0
  return rlocatorlist;
322
0
}
323
324
/*
325
 * Scan one page of the source database's pg_class relation and add relevant
326
 * entries to rlocatorlist. The return value is the updated list.
327
 */
328
static List *
329
ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
330
                char *srcpath, List *rlocatorlist,
331
                Snapshot snapshot)
332
0
{
333
0
  BlockNumber blkno = BufferGetBlockNumber(buf);
334
0
  OffsetNumber offnum;
335
0
  OffsetNumber maxoff;
336
0
  HeapTupleData tuple;
337
338
0
  maxoff = PageGetMaxOffsetNumber(page);
339
340
  /* Loop over offsets. */
341
0
  for (offnum = FirstOffsetNumber;
342
0
     offnum <= maxoff;
343
0
     offnum = OffsetNumberNext(offnum))
344
0
  {
345
0
    ItemId    itemid;
346
347
0
    itemid = PageGetItemId(page, offnum);
348
349
    /* Nothing to do if slot is empty or already dead. */
350
0
    if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
351
0
      ItemIdIsRedirected(itemid))
352
0
      continue;
353
354
0
    Assert(ItemIdIsNormal(itemid));
355
0
    ItemPointerSet(&(tuple.t_self), blkno, offnum);
356
357
    /* Initialize a HeapTupleData structure. */
358
0
    tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
359
0
    tuple.t_len = ItemIdGetLength(itemid);
360
0
    tuple.t_tableOid = RelationRelationId;
361
362
    /* Skip tuples that are not visible to this snapshot. */
363
0
    if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
364
0
    {
365
0
      CreateDBRelInfo *relinfo;
366
367
      /*
368
       * ScanSourceDatabasePgClassTuple is in charge of constructing a
369
       * CreateDBRelInfo object for this tuple, but can also decide that
370
       * this tuple isn't something we need to copy. If we do need to
371
       * copy the relation, add it to the list.
372
       */
373
0
      relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
374
0
                           srcpath);
375
0
      if (relinfo != NULL)
376
0
        rlocatorlist = lappend(rlocatorlist, relinfo);
377
0
    }
378
0
  }
379
380
0
  return rlocatorlist;
381
0
}
382
383
/*
384
 * Decide whether a certain pg_class tuple represents something that
385
 * needs to be copied from the source database to the destination database,
386
 * and if so, construct a CreateDBRelInfo for it.
387
 *
388
 * Visibility checks are handled by the caller, so our job here is just
389
 * to assess the data stored in the tuple.
390
 */
391
CreateDBRelInfo *
392
ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
393
                 char *srcpath)
394
0
{
395
0
  CreateDBRelInfo *relinfo;
396
0
  Form_pg_class classForm;
397
0
  RelFileNumber relfilenumber = InvalidRelFileNumber;
398
399
0
  classForm = (Form_pg_class) GETSTRUCT(tuple);
400
401
  /*
402
   * Return NULL if this object does not need to be copied.
403
   *
404
   * Shared objects don't need to be copied, because they are shared.
405
   * Objects without storage can't be copied, because there's nothing to
406
   * copy. Temporary relations don't need to be copied either, because they
407
   * are inaccessible outside of the session that created them, which must
408
   * be gone already, and couldn't connect to a different database if it
409
   * still existed. autovacuum will eventually remove the pg_class entries
410
   * as well.
411
   */
412
0
  if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
413
0
    !RELKIND_HAS_STORAGE(classForm->relkind) ||
414
0
    classForm->relpersistence == RELPERSISTENCE_TEMP)
415
0
    return NULL;
416
417
  /*
418
   * If relfilenumber is valid then directly use it.  Otherwise, consult the
419
   * relmap.
420
   */
421
0
  if (RelFileNumberIsValid(classForm->relfilenode))
422
0
    relfilenumber = classForm->relfilenode;
423
0
  else
424
0
    relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
425
0
                                classForm->oid);
426
427
  /* We must have a valid relfilenumber. */
428
0
  if (!RelFileNumberIsValid(relfilenumber))
429
0
    elog(ERROR, "relation with OID %u does not have a valid relfilenumber",
430
0
       classForm->oid);
431
432
  /* Prepare a rel info element and add it to the list. */
433
0
  relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo));
434
0
  if (OidIsValid(classForm->reltablespace))
435
0
    relinfo->rlocator.spcOid = classForm->reltablespace;
436
0
  else
437
0
    relinfo->rlocator.spcOid = tbid;
438
439
0
  relinfo->rlocator.dbOid = dbid;
440
0
  relinfo->rlocator.relNumber = relfilenumber;
441
0
  relinfo->reloid = classForm->oid;
442
443
  /* Temporary relations were rejected above. */
444
0
  Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
445
0
  relinfo->permanent =
446
0
    (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false;
447
448
0
  return relinfo;
449
0
}
450
451
/*
452
 * Create database directory and write out the PG_VERSION file in the database
453
 * path.  If isRedo is true, it's okay for the database directory to exist
454
 * already.
455
 */
456
static void
457
CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
458
0
{
459
0
  int     fd;
460
0
  int     nbytes;
461
0
  char    versionfile[MAXPGPATH];
462
0
  char    buf[16];
463
464
  /*
465
   * Note that we don't have to copy version data from the source database;
466
   * there's only one legal value.
467
   */
468
0
  sprintf(buf, "%s\n", PG_MAJORVERSION);
469
0
  nbytes = strlen(PG_MAJORVERSION) + 1;
470
471
  /* Create database directory. */
472
0
  if (MakePGDirectory(dbpath) < 0)
473
0
  {
474
    /* Failure other than already exists or not in WAL replay? */
475
0
    if (errno != EEXIST || !isRedo)
476
0
      ereport(ERROR,
477
0
          (errcode_for_file_access(),
478
0
           errmsg("could not create directory \"%s\": %m", dbpath)));
479
0
  }
480
481
  /*
482
   * Create PG_VERSION file in the database path.  If the file already
483
   * exists and we are in WAL replay then try again to open it in write
484
   * mode.
485
   */
486
0
  snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
487
488
0
  fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
489
0
  if (fd < 0 && errno == EEXIST && isRedo)
490
0
    fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
491
492
0
  if (fd < 0)
493
0
    ereport(ERROR,
494
0
        (errcode_for_file_access(),
495
0
         errmsg("could not create file \"%s\": %m", versionfile)));
496
497
  /* Write PG_MAJORVERSION in the PG_VERSION file. */
498
0
  pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
499
0
  errno = 0;
500
0
  if ((int) write(fd, buf, nbytes) != nbytes)
501
0
  {
502
    /* If write didn't set errno, assume problem is no disk space. */
503
0
    if (errno == 0)
504
0
      errno = ENOSPC;
505
0
    ereport(ERROR,
506
0
        (errcode_for_file_access(),
507
0
         errmsg("could not write to file \"%s\": %m", versionfile)));
508
0
  }
509
0
  pgstat_report_wait_end();
510
511
0
  pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_SYNC);
512
0
  if (pg_fsync(fd) != 0)
513
0
    ereport(data_sync_elevel(ERROR),
514
0
        (errcode_for_file_access(),
515
0
         errmsg("could not fsync file \"%s\": %m", versionfile)));
516
0
  fsync_fname(dbpath, true);
517
0
  pgstat_report_wait_end();
518
519
  /* Close the version file. */
520
0
  CloseTransientFile(fd);
521
522
  /* If we are not in WAL replay then write the WAL. */
523
0
  if (!isRedo)
524
0
  {
525
0
    xl_dbase_create_wal_log_rec xlrec;
526
527
0
    START_CRIT_SECTION();
528
529
0
    xlrec.db_id = dbid;
530
0
    xlrec.tablespace_id = tsid;
531
532
0
    XLogBeginInsert();
533
0
    XLogRegisterData(&xlrec,
534
0
             sizeof(xl_dbase_create_wal_log_rec));
535
536
0
    (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
537
538
0
    END_CRIT_SECTION();
539
0
  }
540
0
}
541
542
/*
543
 * Create a new database using the FILE_COPY strategy.
544
 *
545
 * Copy each tablespace at the filesystem level, and log a single WAL record
546
 * for each tablespace copied.  This requires a checkpoint before and after the
547
 * copy, which may be expensive, but it does greatly reduce WAL generation
548
 * if the copied database is large.
549
 */
550
static void
551
CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
552
              Oid dst_tsid)
553
0
{
554
0
  TableScanDesc scan;
555
0
  Relation  rel;
556
0
  HeapTuple tuple;
557
558
  /*
559
   * Force a checkpoint before starting the copy. This will force all dirty
560
   * buffers, including those of unlogged tables, out to disk, to ensure
561
   * source database is up-to-date on disk for the copy.
562
   * FlushDatabaseBuffers() would suffice for that, but we also want to
563
   * process any pending unlink requests. Otherwise, if a checkpoint
564
   * happened while we're copying files, a file might be deleted just when
565
   * we're about to copy it, causing the lstat() call in copydir() to fail
566
   * with ENOENT.
567
   *
568
   * In binary upgrade mode, we can skip this checkpoint because pg_upgrade
569
   * is careful to ensure that template0 is fully written to disk prior to
570
   * any CREATE DATABASE commands.
571
   */
572
0
  if (!IsBinaryUpgrade)
573
0
    RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE |
574
0
              CHECKPOINT_WAIT | CHECKPOINT_FLUSH_UNLOGGED);
575
576
  /*
577
   * Iterate through all tablespaces of the template database, and copy each
578
   * one to the new database.
579
   */
580
0
  rel = table_open(TableSpaceRelationId, AccessShareLock);
581
0
  scan = table_beginscan_catalog(rel, 0, NULL);
582
0
  while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
583
0
  {
584
0
    Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
585
0
    Oid     srctablespace = spaceform->oid;
586
0
    Oid     dsttablespace;
587
0
    char     *srcpath;
588
0
    char     *dstpath;
589
0
    struct stat st;
590
591
    /* No need to copy global tablespace */
592
0
    if (srctablespace == GLOBALTABLESPACE_OID)
593
0
      continue;
594
595
0
    srcpath = GetDatabasePath(src_dboid, srctablespace);
596
597
0
    if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
598
0
      directory_is_empty(srcpath))
599
0
    {
600
      /* Assume we can ignore it */
601
0
      pfree(srcpath);
602
0
      continue;
603
0
    }
604
605
0
    if (srctablespace == src_tsid)
606
0
      dsttablespace = dst_tsid;
607
0
    else
608
0
      dsttablespace = srctablespace;
609
610
0
    dstpath = GetDatabasePath(dst_dboid, dsttablespace);
611
612
    /*
613
     * Copy this subdirectory to the new location
614
     *
615
     * We don't need to copy subdirectories
616
     */
617
0
    copydir(srcpath, dstpath, false);
618
619
    /* Record the filesystem change in XLOG */
620
0
    {
621
0
      xl_dbase_create_file_copy_rec xlrec;
622
623
0
      xlrec.db_id = dst_dboid;
624
0
      xlrec.tablespace_id = dsttablespace;
625
0
      xlrec.src_db_id = src_dboid;
626
0
      xlrec.src_tablespace_id = srctablespace;
627
628
0
      XLogBeginInsert();
629
0
      XLogRegisterData(&xlrec,
630
0
               sizeof(xl_dbase_create_file_copy_rec));
631
632
0
      (void) XLogInsert(RM_DBASE_ID,
633
0
                XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
634
0
    }
635
0
    pfree(srcpath);
636
0
    pfree(dstpath);
637
0
  }
638
0
  table_endscan(scan);
639
0
  table_close(rel, AccessShareLock);
640
641
  /*
642
   * We force a checkpoint before committing.  This effectively means that
643
   * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
644
   * replayed (at least not in ordinary crash recovery; we still have to
645
   * make the XLOG entry for the benefit of PITR operations). This avoids
646
   * two nasty scenarios:
647
   *
648
   * #1: At wal_level=minimal, we don't XLOG the contents of newly created
649
   * relfilenodes; therefore the drop-and-recreate-whole-directory behavior
650
   * of DBASE_CREATE replay would lose such files created in the new
651
   * database between our commit and the next checkpoint.
652
   *
653
   * #2: Since we have to recopy the source database during DBASE_CREATE
654
   * replay, we run the risk of copying changes in it that were committed
655
   * after the original CREATE DATABASE command but before the system crash
656
   * that led to the replay.  This is at least unexpected and at worst could
657
   * lead to inconsistencies, eg duplicate table names.
658
   *
659
   * (Both of these were real bugs in releases 8.0 through 8.0.3.)
660
   *
661
   * In PITR replay, the first of these isn't an issue, and the second is
662
   * only a risk if the CREATE DATABASE and subsequent template database
663
   * change both occur while a base backup is being taken. There doesn't
664
   * seem to be much we can do about that except document it as a
665
   * limitation.
666
   *
667
   * In binary upgrade mode, we can skip this checkpoint because neither of
668
   * these problems applies: we don't ever replay the WAL generated during
669
   * pg_upgrade, and we don't support taking base backups during pg_upgrade
670
   * (not to mention that we don't concurrently modify template0, either).
671
   *
672
   * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
673
   * strategy that avoids these problems.
674
   */
675
0
  if (!IsBinaryUpgrade)
676
0
    RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE |
677
0
              CHECKPOINT_WAIT);
678
0
}
679
680
/*
681
 * CREATE DATABASE
682
 */
683
Oid
684
createdb(ParseState *pstate, const CreatedbStmt *stmt)
685
0
{
686
0
  Oid     src_dboid;
687
0
  Oid     src_owner;
688
0
  int     src_encoding = -1;
689
0
  char     *src_collate = NULL;
690
0
  char     *src_ctype = NULL;
691
0
  char     *src_locale = NULL;
692
0
  char     *src_icurules = NULL;
693
0
  char    src_locprovider = '\0';
694
0
  char     *src_collversion = NULL;
695
0
  bool    src_istemplate;
696
0
  bool    src_hasloginevt = false;
697
0
  bool    src_allowconn;
698
0
  TransactionId src_frozenxid = InvalidTransactionId;
699
0
  MultiXactId src_minmxid = InvalidMultiXactId;
700
0
  Oid     src_deftablespace;
701
0
  volatile Oid dst_deftablespace;
702
0
  Relation  pg_database_rel;
703
0
  HeapTuple tuple;
704
0
  Datum   new_record[Natts_pg_database] = {0};
705
0
  bool    new_record_nulls[Natts_pg_database] = {0};
706
0
  Oid     dboid = InvalidOid;
707
0
  Oid     datdba;
708
0
  ListCell   *option;
709
0
  DefElem    *tablespacenameEl = NULL;
710
0
  DefElem    *ownerEl = NULL;
711
0
  DefElem    *templateEl = NULL;
712
0
  DefElem    *encodingEl = NULL;
713
0
  DefElem    *localeEl = NULL;
714
0
  DefElem    *builtinlocaleEl = NULL;
715
0
  DefElem    *collateEl = NULL;
716
0
  DefElem    *ctypeEl = NULL;
717
0
  DefElem    *iculocaleEl = NULL;
718
0
  DefElem    *icurulesEl = NULL;
719
0
  DefElem    *locproviderEl = NULL;
720
0
  DefElem    *istemplateEl = NULL;
721
0
  DefElem    *allowconnectionsEl = NULL;
722
0
  DefElem    *connlimitEl = NULL;
723
0
  DefElem    *collversionEl = NULL;
724
0
  DefElem    *strategyEl = NULL;
725
0
  char     *dbname = stmt->dbname;
726
0
  char     *dbowner = NULL;
727
0
  const char *dbtemplate = NULL;
728
0
  char     *dbcollate = NULL;
729
0
  char     *dbctype = NULL;
730
0
  const char *dblocale = NULL;
731
0
  char     *dbicurules = NULL;
732
0
  char    dblocprovider = '\0';
733
0
  char     *canonname;
734
0
  int     encoding = -1;
735
0
  bool    dbistemplate = false;
736
0
  bool    dballowconnections = true;
737
0
  int     dbconnlimit = DATCONNLIMIT_UNLIMITED;
738
0
  char     *dbcollversion = NULL;
739
0
  int     notherbackends;
740
0
  int     npreparedxacts;
741
0
  CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
742
0
  createdb_failure_params fparms;
743
744
  /* Extract options from the statement node tree */
745
0
  foreach(option, stmt->options)
746
0
  {
747
0
    DefElem    *defel = (DefElem *) lfirst(option);
748
749
0
    if (strcmp(defel->defname, "tablespace") == 0)
750
0
    {
751
0
      if (tablespacenameEl)
752
0
        errorConflictingDefElem(defel, pstate);
753
0
      tablespacenameEl = defel;
754
0
    }
755
0
    else if (strcmp(defel->defname, "owner") == 0)
756
0
    {
757
0
      if (ownerEl)
758
0
        errorConflictingDefElem(defel, pstate);
759
0
      ownerEl = defel;
760
0
    }
761
0
    else if (strcmp(defel->defname, "template") == 0)
762
0
    {
763
0
      if (templateEl)
764
0
        errorConflictingDefElem(defel, pstate);
765
0
      templateEl = defel;
766
0
    }
767
0
    else if (strcmp(defel->defname, "encoding") == 0)
768
0
    {
769
0
      if (encodingEl)
770
0
        errorConflictingDefElem(defel, pstate);
771
0
      encodingEl = defel;
772
0
    }
773
0
    else if (strcmp(defel->defname, "locale") == 0)
774
0
    {
775
0
      if (localeEl)
776
0
        errorConflictingDefElem(defel, pstate);
777
0
      localeEl = defel;
778
0
    }
779
0
    else if (strcmp(defel->defname, "builtin_locale") == 0)
780
0
    {
781
0
      if (builtinlocaleEl)
782
0
        errorConflictingDefElem(defel, pstate);
783
0
      builtinlocaleEl = defel;
784
0
    }
785
0
    else if (strcmp(defel->defname, "lc_collate") == 0)
786
0
    {
787
0
      if (collateEl)
788
0
        errorConflictingDefElem(defel, pstate);
789
0
      collateEl = defel;
790
0
    }
791
0
    else if (strcmp(defel->defname, "lc_ctype") == 0)
792
0
    {
793
0
      if (ctypeEl)
794
0
        errorConflictingDefElem(defel, pstate);
795
0
      ctypeEl = defel;
796
0
    }
797
0
    else if (strcmp(defel->defname, "icu_locale") == 0)
798
0
    {
799
0
      if (iculocaleEl)
800
0
        errorConflictingDefElem(defel, pstate);
801
0
      iculocaleEl = defel;
802
0
    }
803
0
    else if (strcmp(defel->defname, "icu_rules") == 0)
804
0
    {
805
0
      if (icurulesEl)
806
0
        errorConflictingDefElem(defel, pstate);
807
0
      icurulesEl = defel;
808
0
    }
809
0
    else if (strcmp(defel->defname, "locale_provider") == 0)
810
0
    {
811
0
      if (locproviderEl)
812
0
        errorConflictingDefElem(defel, pstate);
813
0
      locproviderEl = defel;
814
0
    }
815
0
    else if (strcmp(defel->defname, "is_template") == 0)
816
0
    {
817
0
      if (istemplateEl)
818
0
        errorConflictingDefElem(defel, pstate);
819
0
      istemplateEl = defel;
820
0
    }
821
0
    else if (strcmp(defel->defname, "allow_connections") == 0)
822
0
    {
823
0
      if (allowconnectionsEl)
824
0
        errorConflictingDefElem(defel, pstate);
825
0
      allowconnectionsEl = defel;
826
0
    }
827
0
    else if (strcmp(defel->defname, "connection_limit") == 0)
828
0
    {
829
0
      if (connlimitEl)
830
0
        errorConflictingDefElem(defel, pstate);
831
0
      connlimitEl = defel;
832
0
    }
833
0
    else if (strcmp(defel->defname, "collation_version") == 0)
834
0
    {
835
0
      if (collversionEl)
836
0
        errorConflictingDefElem(defel, pstate);
837
0
      collversionEl = defel;
838
0
    }
839
0
    else if (strcmp(defel->defname, "location") == 0)
840
0
    {
841
0
      ereport(WARNING,
842
0
          (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
843
0
           errmsg("LOCATION is not supported anymore"),
844
0
           errhint("Consider using tablespaces instead."),
845
0
           parser_errposition(pstate, defel->location)));
846
0
    }
847
0
    else if (strcmp(defel->defname, "oid") == 0)
848
0
    {
849
0
      dboid = defGetObjectId(defel);
850
851
      /*
852
       * We don't normally permit new databases to be created with
853
       * system-assigned OIDs. pg_upgrade tries to preserve database
854
       * OIDs, so we can't allow any database to be created with an OID
855
       * that might be in use in a freshly-initialized cluster created
856
       * by some future version. We assume all such OIDs will be from
857
       * the system-managed OID range.
858
       *
859
       * As an exception, however, we permit any OID to be assigned when
860
       * allow_system_table_mods=on (so that initdb can assign system
861
       * OIDs to template0 and postgres) or when performing a binary
862
       * upgrade (so that pg_upgrade can preserve whatever OIDs it finds
863
       * in the source cluster).
864
       */
865
0
      if (dboid < FirstNormalObjectId &&
866
0
        !allowSystemTableMods && !IsBinaryUpgrade)
867
0
        ereport(ERROR,
868
0
            (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
869
0
            errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
870
0
    }
871
0
    else if (strcmp(defel->defname, "strategy") == 0)
872
0
    {
873
0
      if (strategyEl)
874
0
        errorConflictingDefElem(defel, pstate);
875
0
      strategyEl = defel;
876
0
    }
877
0
    else
878
0
      ereport(ERROR,
879
0
          (errcode(ERRCODE_SYNTAX_ERROR),
880
0
           errmsg("option \"%s\" not recognized", defel->defname),
881
0
           parser_errposition(pstate, defel->location)));
882
0
  }
883
884
0
  if (ownerEl && ownerEl->arg)
885
0
    dbowner = defGetString(ownerEl);
886
0
  if (templateEl && templateEl->arg)
887
0
    dbtemplate = defGetString(templateEl);
888
0
  if (encodingEl && encodingEl->arg)
889
0
  {
890
0
    const char *encoding_name;
891
892
0
    if (IsA(encodingEl->arg, Integer))
893
0
    {
894
0
      encoding = defGetInt32(encodingEl);
895
0
      encoding_name = pg_encoding_to_char(encoding);
896
0
      if (strcmp(encoding_name, "") == 0 ||
897
0
        pg_valid_server_encoding(encoding_name) < 0)
898
0
        ereport(ERROR,
899
0
            (errcode(ERRCODE_UNDEFINED_OBJECT),
900
0
             errmsg("%d is not a valid encoding code",
901
0
                encoding),
902
0
             parser_errposition(pstate, encodingEl->location)));
903
0
    }
904
0
    else
905
0
    {
906
0
      encoding_name = defGetString(encodingEl);
907
0
      encoding = pg_valid_server_encoding(encoding_name);
908
0
      if (encoding < 0)
909
0
        ereport(ERROR,
910
0
            (errcode(ERRCODE_UNDEFINED_OBJECT),
911
0
             errmsg("%s is not a valid encoding name",
912
0
                encoding_name),
913
0
             parser_errposition(pstate, encodingEl->location)));
914
0
    }
915
0
  }
916
0
  if (localeEl && localeEl->arg)
917
0
  {
918
0
    dbcollate = defGetString(localeEl);
919
0
    dbctype = defGetString(localeEl);
920
0
    dblocale = defGetString(localeEl);
921
0
  }
922
0
  if (builtinlocaleEl && builtinlocaleEl->arg)
923
0
    dblocale = defGetString(builtinlocaleEl);
924
0
  if (collateEl && collateEl->arg)
925
0
    dbcollate = defGetString(collateEl);
926
0
  if (ctypeEl && ctypeEl->arg)
927
0
    dbctype = defGetString(ctypeEl);
928
0
  if (iculocaleEl && iculocaleEl->arg)
929
0
    dblocale = defGetString(iculocaleEl);
930
0
  if (icurulesEl && icurulesEl->arg)
931
0
    dbicurules = defGetString(icurulesEl);
932
0
  if (locproviderEl && locproviderEl->arg)
933
0
  {
934
0
    char     *locproviderstr = defGetString(locproviderEl);
935
936
0
    if (pg_strcasecmp(locproviderstr, "builtin") == 0)
937
0
      dblocprovider = COLLPROVIDER_BUILTIN;
938
0
    else if (pg_strcasecmp(locproviderstr, "icu") == 0)
939
0
      dblocprovider = COLLPROVIDER_ICU;
940
0
    else if (pg_strcasecmp(locproviderstr, "libc") == 0)
941
0
      dblocprovider = COLLPROVIDER_LIBC;
942
0
    else
943
0
      ereport(ERROR,
944
0
          (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
945
0
           errmsg("unrecognized locale provider: %s",
946
0
              locproviderstr)));
947
0
  }
948
0
  if (istemplateEl && istemplateEl->arg)
949
0
    dbistemplate = defGetBoolean(istemplateEl);
950
0
  if (allowconnectionsEl && allowconnectionsEl->arg)
951
0
    dballowconnections = defGetBoolean(allowconnectionsEl);
952
0
  if (connlimitEl && connlimitEl->arg)
953
0
  {
954
0
    dbconnlimit = defGetInt32(connlimitEl);
955
0
    if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
956
0
      ereport(ERROR,
957
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
958
0
           errmsg("invalid connection limit: %d", dbconnlimit)));
959
0
  }
960
0
  if (collversionEl)
961
0
    dbcollversion = defGetString(collversionEl);
962
963
  /* obtain OID of proposed owner */
964
0
  if (dbowner)
965
0
    datdba = get_role_oid(dbowner, false);
966
0
  else
967
0
    datdba = GetUserId();
968
969
  /*
970
   * To create a database, must have createdb privilege and must be able to
971
   * become the target role (this does not imply that the target role itself
972
   * must have createdb privilege).  The latter provision guards against
973
   * "giveaway" attacks.  Note that a superuser will always have both of
974
   * these privileges a fortiori.
975
   */
976
0
  if (!have_createdb_privilege())
977
0
    ereport(ERROR,
978
0
        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
979
0
         errmsg("permission denied to create database")));
980
981
0
  check_can_set_role(GetUserId(), datdba);
982
983
  /*
984
   * Lookup database (template) to be cloned, and obtain share lock on it.
985
   * ShareLock allows two CREATE DATABASEs to work from the same template
986
   * concurrently, while ensuring no one is busy dropping it in parallel
987
   * (which would be Very Bad since we'd likely get an incomplete copy
988
   * without knowing it).  This also prevents any new connections from being
989
   * made to the source until we finish copying it, so we can be sure it
990
   * won't change underneath us.
991
   */
992
0
  if (!dbtemplate)
993
0
    dbtemplate = "template1"; /* Default template database name */
994
995
0
  if (!get_db_info(dbtemplate, ShareLock,
996
0
           &src_dboid, &src_owner, &src_encoding,
997
0
           &src_istemplate, &src_allowconn, &src_hasloginevt,
998
0
           &src_frozenxid, &src_minmxid, &src_deftablespace,
999
0
           &src_collate, &src_ctype, &src_locale, &src_icurules, &src_locprovider,
1000
0
           &src_collversion))
1001
0
    ereport(ERROR,
1002
0
        (errcode(ERRCODE_UNDEFINED_DATABASE),
1003
0
         errmsg("template database \"%s\" does not exist",
1004
0
            dbtemplate)));
1005
1006
  /*
1007
   * If the source database was in the process of being dropped, we can't
1008
   * use it as a template.
1009
   */
1010
0
  if (database_is_invalid_oid(src_dboid))
1011
0
    ereport(ERROR,
1012
0
        errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1013
0
        errmsg("cannot use invalid database \"%s\" as template", dbtemplate),
1014
0
        errhint("Use DROP DATABASE to drop invalid databases."));
1015
1016
  /*
1017
   * Permission check: to copy a DB that's not marked datistemplate, you
1018
   * must be superuser or the owner thereof.
1019
   */
1020
0
  if (!src_istemplate)
1021
0
  {
1022
0
    if (!object_ownercheck(DatabaseRelationId, src_dboid, GetUserId()))
1023
0
      ereport(ERROR,
1024
0
          (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1025
0
           errmsg("permission denied to copy database \"%s\"",
1026
0
              dbtemplate)));
1027
0
  }
1028
1029
  /* Validate the database creation strategy. */
1030
0
  if (strategyEl && strategyEl->arg)
1031
0
  {
1032
0
    char     *strategy;
1033
1034
0
    strategy = defGetString(strategyEl);
1035
0
    if (pg_strcasecmp(strategy, "wal_log") == 0)
1036
0
      dbstrategy = CREATEDB_WAL_LOG;
1037
0
    else if (pg_strcasecmp(strategy, "file_copy") == 0)
1038
0
      dbstrategy = CREATEDB_FILE_COPY;
1039
0
    else
1040
0
      ereport(ERROR,
1041
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1042
0
           errmsg("invalid create database strategy \"%s\"", strategy),
1043
0
           errhint("Valid strategies are \"wal_log\" and \"file_copy\".")));
1044
0
  }
1045
1046
  /* If encoding or locales are defaulted, use source's setting */
1047
0
  if (encoding < 0)
1048
0
    encoding = src_encoding;
1049
0
  if (dbcollate == NULL)
1050
0
    dbcollate = src_collate;
1051
0
  if (dbctype == NULL)
1052
0
    dbctype = src_ctype;
1053
0
  if (dblocprovider == '\0')
1054
0
    dblocprovider = src_locprovider;
1055
0
  if (dblocale == NULL && dblocprovider == src_locprovider)
1056
0
    dblocale = src_locale;
1057
0
  if (dbicurules == NULL)
1058
0
    dbicurules = src_icurules;
1059
1060
  /* Some encodings are client only */
1061
0
  if (!PG_VALID_BE_ENCODING(encoding))
1062
0
    ereport(ERROR,
1063
0
        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1064
0
         errmsg("invalid server encoding %d", encoding)));
1065
1066
  /* Check that the chosen locales are valid, and get canonical spellings */
1067
0
  if (!check_locale(LC_COLLATE, dbcollate, &canonname))
1068
0
  {
1069
0
    if (dblocprovider == COLLPROVIDER_BUILTIN)
1070
0
      ereport(ERROR,
1071
0
          (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1072
0
           errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
1073
0
           errhint("If the locale name is specific to the builtin provider, use BUILTIN_LOCALE.")));
1074
0
    else if (dblocprovider == COLLPROVIDER_ICU)
1075
0
      ereport(ERROR,
1076
0
          (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1077
0
           errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
1078
0
           errhint("If the locale name is specific to the ICU provider, use ICU_LOCALE.")));
1079
0
    else
1080
0
      ereport(ERROR,
1081
0
          (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1082
0
           errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate)));
1083
0
  }
1084
0
  dbcollate = canonname;
1085
0
  if (!check_locale(LC_CTYPE, dbctype, &canonname))
1086
0
  {
1087
0
    if (dblocprovider == COLLPROVIDER_BUILTIN)
1088
0
      ereport(ERROR,
1089
0
          (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1090
0
           errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
1091
0
           errhint("If the locale name is specific to the builtin provider, use BUILTIN_LOCALE.")));
1092
0
    else if (dblocprovider == COLLPROVIDER_ICU)
1093
0
      ereport(ERROR,
1094
0
          (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1095
0
           errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
1096
0
           errhint("If the locale name is specific to the ICU provider, use ICU_LOCALE.")));
1097
0
    else
1098
0
      ereport(ERROR,
1099
0
          (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1100
0
           errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype)));
1101
0
  }
1102
1103
0
  dbctype = canonname;
1104
1105
0
  check_encoding_locale_matches(encoding, dbcollate, dbctype);
1106
1107
  /* validate provider-specific parameters */
1108
0
  if (dblocprovider != COLLPROVIDER_BUILTIN)
1109
0
  {
1110
0
    if (builtinlocaleEl)
1111
0
      ereport(ERROR,
1112
0
          (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1113
0
           errmsg("BUILTIN_LOCALE cannot be specified unless locale provider is builtin")));
1114
0
  }
1115
1116
0
  if (dblocprovider != COLLPROVIDER_ICU)
1117
0
  {
1118
0
    if (iculocaleEl)
1119
0
      ereport(ERROR,
1120
0
          (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1121
0
           errmsg("ICU locale cannot be specified unless locale provider is ICU")));
1122
1123
0
    if (dbicurules)
1124
0
      ereport(ERROR,
1125
0
          (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1126
0
           errmsg("ICU rules cannot be specified unless locale provider is ICU")));
1127
0
  }
1128
1129
  /* validate and canonicalize locale for the provider */
1130
0
  if (dblocprovider == COLLPROVIDER_BUILTIN)
1131
0
  {
1132
    /*
1133
     * This would happen if template0 uses the libc provider but the new
1134
     * database uses builtin.
1135
     */
1136
0
    if (!dblocale)
1137
0
      ereport(ERROR,
1138
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1139
0
           errmsg("LOCALE or BUILTIN_LOCALE must be specified")));
1140
1141
0
    dblocale = builtin_validate_locale(encoding, dblocale);
1142
0
  }
1143
0
  else if (dblocprovider == COLLPROVIDER_ICU)
1144
0
  {
1145
0
    if (!(is_encoding_supported_by_icu(encoding)))
1146
0
      ereport(ERROR,
1147
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1148
0
           errmsg("encoding \"%s\" is not supported with ICU provider",
1149
0
              pg_encoding_to_char(encoding))));
1150
1151
    /*
1152
     * This would happen if template0 uses the libc provider but the new
1153
     * database uses icu.
1154
     */
1155
0
    if (!dblocale)
1156
0
      ereport(ERROR,
1157
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1158
0
           errmsg("LOCALE or ICU_LOCALE must be specified")));
1159
1160
    /*
1161
     * During binary upgrade, or when the locale came from the template
1162
     * database, preserve locale string. Otherwise, canonicalize to a
1163
     * language tag.
1164
     */
1165
0
    if (!IsBinaryUpgrade && dblocale != src_locale)
1166
0
    {
1167
0
      char     *langtag = icu_language_tag(dblocale,
1168
0
                           icu_validation_level);
1169
1170
0
      if (langtag && strcmp(dblocale, langtag) != 0)
1171
0
      {
1172
0
        ereport(NOTICE,
1173
0
            (errmsg("using standard form \"%s\" for ICU locale \"%s\"",
1174
0
                langtag, dblocale)));
1175
1176
0
        dblocale = langtag;
1177
0
      }
1178
0
    }
1179
1180
0
    icu_validate_locale(dblocale);
1181
0
  }
1182
1183
  /* for libc, locale comes from datcollate and datctype */
1184
0
  if (dblocprovider == COLLPROVIDER_LIBC)
1185
0
    dblocale = NULL;
1186
1187
  /*
1188
   * Check that the new encoding and locale settings match the source
1189
   * database.  We insist on this because we simply copy the source data ---
1190
   * any non-ASCII data would be wrongly encoded, and any indexes sorted
1191
   * according to the source locale would be wrong.
1192
   *
1193
   * However, we assume that template0 doesn't contain any non-ASCII data
1194
   * nor any indexes that depend on collation or ctype, so template0 can be
1195
   * used as template for creating a database with any encoding or locale.
1196
   */
1197
0
  if (strcmp(dbtemplate, "template0") != 0)
1198
0
  {
1199
0
    if (encoding != src_encoding)
1200
0
      ereport(ERROR,
1201
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1202
0
           errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
1203
0
              pg_encoding_to_char(encoding),
1204
0
              pg_encoding_to_char(src_encoding)),
1205
0
           errhint("Use the same encoding as in the template database, or use template0 as template.")));
1206
1207
0
    if (strcmp(dbcollate, src_collate) != 0)
1208
0
      ereport(ERROR,
1209
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1210
0
           errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
1211
0
              dbcollate, src_collate),
1212
0
           errhint("Use the same collation as in the template database, or use template0 as template.")));
1213
1214
0
    if (strcmp(dbctype, src_ctype) != 0)
1215
0
      ereport(ERROR,
1216
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1217
0
           errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
1218
0
              dbctype, src_ctype),
1219
0
           errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
1220
1221
0
    if (dblocprovider != src_locprovider)
1222
0
      ereport(ERROR,
1223
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1224
0
           errmsg("new locale provider (%s) does not match locale provider of the template database (%s)",
1225
0
              collprovider_name(dblocprovider), collprovider_name(src_locprovider)),
1226
0
           errhint("Use the same locale provider as in the template database, or use template0 as template.")));
1227
1228
0
    if (dblocprovider == COLLPROVIDER_ICU)
1229
0
    {
1230
0
      char     *val1;
1231
0
      char     *val2;
1232
1233
0
      Assert(dblocale);
1234
0
      Assert(src_locale);
1235
0
      if (strcmp(dblocale, src_locale) != 0)
1236
0
        ereport(ERROR,
1237
0
            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1238
0
             errmsg("new ICU locale (%s) is incompatible with the ICU locale of the template database (%s)",
1239
0
                dblocale, src_locale),
1240
0
             errhint("Use the same ICU locale as in the template database, or use template0 as template.")));
1241
1242
0
      val1 = dbicurules;
1243
0
      if (!val1)
1244
0
        val1 = "";
1245
0
      val2 = src_icurules;
1246
0
      if (!val2)
1247
0
        val2 = "";
1248
0
      if (strcmp(val1, val2) != 0)
1249
0
        ereport(ERROR,
1250
0
            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1251
0
             errmsg("new ICU collation rules (%s) are incompatible with the ICU collation rules of the template database (%s)",
1252
0
                val1, val2),
1253
0
             errhint("Use the same ICU collation rules as in the template database, or use template0 as template.")));
1254
0
    }
1255
0
  }
1256
1257
  /*
1258
   * If we got a collation version for the template database, check that it
1259
   * matches the actual OS collation version.  Otherwise error; the user
1260
   * needs to fix the template database first.  Don't complain if a
1261
   * collation version was specified explicitly as a statement option; that
1262
   * is used by pg_upgrade to reproduce the old state exactly.
1263
   *
1264
   * (If the template database has no collation version, then either the
1265
   * platform/provider does not support collation versioning, or it's
1266
   * template0, for which we stipulate that it does not contain
1267
   * collation-using objects.)
1268
   */
1269
0
  if (src_collversion && !collversionEl)
1270
0
  {
1271
0
    char     *actual_versionstr;
1272
0
    const char *locale;
1273
1274
0
    if (dblocprovider == COLLPROVIDER_LIBC)
1275
0
      locale = dbcollate;
1276
0
    else
1277
0
      locale = dblocale;
1278
1279
0
    actual_versionstr = get_collation_actual_version(dblocprovider, locale);
1280
0
    if (!actual_versionstr)
1281
0
      ereport(ERROR,
1282
0
          (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
1283
0
              dbtemplate)));
1284
1285
0
    if (strcmp(actual_versionstr, src_collversion) != 0)
1286
0
      ereport(ERROR,
1287
0
          (errmsg("template database \"%s\" has a collation version mismatch",
1288
0
              dbtemplate),
1289
0
           errdetail("The template database was created using collation version %s, "
1290
0
                 "but the operating system provides version %s.",
1291
0
                 src_collversion, actual_versionstr),
1292
0
           errhint("Rebuild all objects in the template database that use the default collation and run "
1293
0
               "ALTER DATABASE %s REFRESH COLLATION VERSION, "
1294
0
               "or build PostgreSQL with the right library version.",
1295
0
               quote_identifier(dbtemplate))));
1296
0
  }
1297
1298
0
  if (dbcollversion == NULL)
1299
0
    dbcollversion = src_collversion;
1300
1301
  /*
1302
   * Normally, we copy the collation version from the template database.
1303
   * This last resort only applies if the template database does not have a
1304
   * collation version, which is normally only the case for template0.
1305
   */
1306
0
  if (dbcollversion == NULL)
1307
0
  {
1308
0
    const char *locale;
1309
1310
0
    if (dblocprovider == COLLPROVIDER_LIBC)
1311
0
      locale = dbcollate;
1312
0
    else
1313
0
      locale = dblocale;
1314
1315
0
    dbcollversion = get_collation_actual_version(dblocprovider, locale);
1316
0
  }
1317
1318
  /* Resolve default tablespace for new database */
1319
0
  if (tablespacenameEl && tablespacenameEl->arg)
1320
0
  {
1321
0
    char     *tablespacename;
1322
0
    AclResult aclresult;
1323
1324
0
    tablespacename = defGetString(tablespacenameEl);
1325
0
    dst_deftablespace = get_tablespace_oid(tablespacename, false);
1326
    /* check permissions */
1327
0
    aclresult = object_aclcheck(TableSpaceRelationId, dst_deftablespace, GetUserId(),
1328
0
                  ACL_CREATE);
1329
0
    if (aclresult != ACLCHECK_OK)
1330
0
      aclcheck_error(aclresult, OBJECT_TABLESPACE,
1331
0
               tablespacename);
1332
1333
    /* pg_global must never be the default tablespace */
1334
0
    if (dst_deftablespace == GLOBALTABLESPACE_OID)
1335
0
      ereport(ERROR,
1336
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1337
0
           errmsg("pg_global cannot be used as default tablespace")));
1338
1339
    /*
1340
     * If we are trying to change the default tablespace of the template,
1341
     * we require that the template not have any files in the new default
1342
     * tablespace.  This is necessary because otherwise the copied
1343
     * database would contain pg_class rows that refer to its default
1344
     * tablespace both explicitly (by OID) and implicitly (as zero), which
1345
     * would cause problems.  For example another CREATE DATABASE using
1346
     * the copied database as template, and trying to change its default
1347
     * tablespace again, would yield outright incorrect results (it would
1348
     * improperly move tables to the new default tablespace that should
1349
     * stay in the same tablespace).
1350
     */
1351
0
    if (dst_deftablespace != src_deftablespace)
1352
0
    {
1353
0
      char     *srcpath;
1354
0
      struct stat st;
1355
1356
0
      srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
1357
1358
0
      if (stat(srcpath, &st) == 0 &&
1359
0
        S_ISDIR(st.st_mode) &&
1360
0
        !directory_is_empty(srcpath))
1361
0
        ereport(ERROR,
1362
0
            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1363
0
             errmsg("cannot assign new default tablespace \"%s\"",
1364
0
                tablespacename),
1365
0
             errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
1366
0
                   dbtemplate)));
1367
0
      pfree(srcpath);
1368
0
    }
1369
0
  }
1370
0
  else
1371
0
  {
1372
    /* Use template database's default tablespace */
1373
0
    dst_deftablespace = src_deftablespace;
1374
    /* Note there is no additional permission check in this path */
1375
0
  }
1376
1377
  /*
1378
   * If built with appropriate switch, whine when regression-testing
1379
   * conventions for database names are violated.  But don't complain during
1380
   * initdb.
1381
   */
1382
#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1383
  if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
1384
    elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1385
#endif
1386
1387
  /*
1388
   * Check for db name conflict.  This is just to give a more friendly error
1389
   * message than "unique index violation".  There's a race condition but
1390
   * we're willing to accept the less friendly message in that case.
1391
   */
1392
0
  if (OidIsValid(get_database_oid(dbname, true)))
1393
0
    ereport(ERROR,
1394
0
        (errcode(ERRCODE_DUPLICATE_DATABASE),
1395
0
         errmsg("database \"%s\" already exists", dbname)));
1396
1397
  /*
1398
   * The source DB can't have any active backends, except this one
1399
   * (exception is to allow CREATE DB while connected to template1).
1400
   * Otherwise we might copy inconsistent data.
1401
   *
1402
   * This should be last among the basic error checks, because it involves
1403
   * potential waiting; we may as well throw an error first if we're gonna
1404
   * throw one.
1405
   */
1406
0
  if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
1407
0
    ereport(ERROR,
1408
0
        (errcode(ERRCODE_OBJECT_IN_USE),
1409
0
         errmsg("source database \"%s\" is being accessed by other users",
1410
0
            dbtemplate),
1411
0
         errdetail_busy_db(notherbackends, npreparedxacts)));
1412
1413
  /*
1414
   * Select an OID for the new database, checking that it doesn't have a
1415
   * filename conflict with anything already existing in the tablespace
1416
   * directories.
1417
   */
1418
0
  pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
1419
1420
  /*
1421
   * If database OID is configured, check if the OID is already in use or
1422
   * data directory already exists.
1423
   */
1424
0
  if (OidIsValid(dboid))
1425
0
  {
1426
0
    char     *existing_dbname = get_database_name(dboid);
1427
1428
0
    if (existing_dbname != NULL)
1429
0
      ereport(ERROR,
1430
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1431
0
          errmsg("database OID %u is already in use by database \"%s\"",
1432
0
               dboid, existing_dbname));
1433
1434
0
    if (check_db_file_conflict(dboid))
1435
0
      ereport(ERROR,
1436
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1437
0
          errmsg("data directory with the specified OID %u already exists", dboid));
1438
0
  }
1439
0
  else
1440
0
  {
1441
    /* Select an OID for the new database if one is not explicitly configured. */
1442
0
    do
1443
0
    {
1444
0
      dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
1445
0
                     Anum_pg_database_oid);
1446
0
    } while (check_db_file_conflict(dboid));
1447
0
  }
1448
1449
  /*
1450
   * Insert a new tuple into pg_database.  This establishes our ownership of
1451
   * the new database name (anyone else trying to insert the same name will
1452
   * block on the unique index, and fail after we commit).
1453
   */
1454
1455
0
  Assert((dblocprovider != COLLPROVIDER_LIBC && dblocale) ||
1456
0
       (dblocprovider == COLLPROVIDER_LIBC && !dblocale));
1457
1458
  /* Form tuple */
1459
0
  new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
1460
0
  new_record[Anum_pg_database_datname - 1] =
1461
0
    DirectFunctionCall1(namein, CStringGetDatum(dbname));
1462
0
  new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
1463
0
  new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
1464
0
  new_record[Anum_pg_database_datlocprovider - 1] = CharGetDatum(dblocprovider);
1465
0
  new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
1466
0
  new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
1467
0
  new_record[Anum_pg_database_dathasloginevt - 1] = BoolGetDatum(src_hasloginevt);
1468
0
  new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
1469
0
  new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
1470
0
  new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
1471
0
  new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
1472
0
  new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
1473
0
  new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
1474
0
  if (dblocale)
1475
0
    new_record[Anum_pg_database_datlocale - 1] = CStringGetTextDatum(dblocale);
1476
0
  else
1477
0
    new_record_nulls[Anum_pg_database_datlocale - 1] = true;
1478
0
  if (dbicurules)
1479
0
    new_record[Anum_pg_database_daticurules - 1] = CStringGetTextDatum(dbicurules);
1480
0
  else
1481
0
    new_record_nulls[Anum_pg_database_daticurules - 1] = true;
1482
0
  if (dbcollversion)
1483
0
    new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
1484
0
  else
1485
0
    new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
1486
1487
  /*
1488
   * We deliberately set datacl to default (NULL), rather than copying it
1489
   * from the template database.  Copying it would be a bad idea when the
1490
   * owner is not the same as the template's owner.
1491
   */
1492
0
  new_record_nulls[Anum_pg_database_datacl - 1] = true;
1493
1494
0
  tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
1495
0
              new_record, new_record_nulls);
1496
1497
0
  CatalogTupleInsert(pg_database_rel, tuple);
1498
1499
  /*
1500
   * Now generate additional catalog entries associated with the new DB
1501
   */
1502
1503
  /* Register owner dependency */
1504
0
  recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
1505
1506
  /* Create pg_shdepend entries for objects within database */
1507
0
  copyTemplateDependencies(src_dboid, dboid);
1508
1509
  /* Post creation hook for new database */
1510
0
  InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
1511
1512
  /*
1513
   * If we're going to be reading data for the to-be-created database into
1514
   * shared_buffers, take a lock on it. Nobody should know that this
1515
   * database exists yet, but it's good to maintain the invariant that an
1516
   * AccessExclusiveLock on the database is sufficient to drop all of its
1517
   * buffers without worrying about more being read later.
1518
   *
1519
   * Note that we need to do this before entering the
1520
   * PG_ENSURE_ERROR_CLEANUP block below, because createdb_failure_callback
1521
   * expects this lock to be held already.
1522
   */
1523
0
  if (dbstrategy == CREATEDB_WAL_LOG)
1524
0
    LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
1525
1526
  /*
1527
   * Once we start copying subdirectories, we need to be able to clean 'em
1528
   * up if we fail.  Use an ENSURE block to make sure this happens.  (This
1529
   * is not a 100% solution, because of the possibility of failure during
1530
   * transaction commit after we leave this routine, but it should handle
1531
   * most scenarios.)
1532
   */
1533
0
  fparms.src_dboid = src_dboid;
1534
0
  fparms.dest_dboid = dboid;
1535
0
  fparms.strategy = dbstrategy;
1536
1537
0
  PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1538
0
              PointerGetDatum(&fparms));
1539
0
  {
1540
    /*
1541
     * If the user has asked to create a database with WAL_LOG strategy
1542
     * then call CreateDatabaseUsingWalLog, which will copy the database
1543
     * at the block level and it will WAL log each copied block.
1544
     * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
1545
     * database file by file.
1546
     */
1547
0
    if (dbstrategy == CREATEDB_WAL_LOG)
1548
0
      CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
1549
0
                    dst_deftablespace);
1550
0
    else
1551
0
      CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
1552
0
                    dst_deftablespace);
1553
1554
    /*
1555
     * Close pg_database, but keep lock till commit.
1556
     */
1557
0
    table_close(pg_database_rel, NoLock);
1558
1559
    /*
1560
     * Force synchronous commit, thus minimizing the window between
1561
     * creation of the database files and committal of the transaction. If
1562
     * we crash before committing, we'll have a DB that's taking up disk
1563
     * space but is not in pg_database, which is not good.
1564
     */
1565
0
    ForceSyncCommit();
1566
0
  }
1567
0
  PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1568
0
                PointerGetDatum(&fparms));
1569
1570
0
  return dboid;
1571
0
}
1572
1573
/*
1574
 * Check whether chosen encoding matches chosen locale settings.  This
1575
 * restriction is necessary because libc's locale-specific code usually
1576
 * fails when presented with data in an encoding it's not expecting. We
1577
 * allow mismatch in four cases:
1578
 *
1579
 * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
1580
 * which works with any encoding.
1581
 *
1582
 * 2. locale encoding = -1, which means that we couldn't determine the
1583
 * locale's encoding and have to trust the user to get it right.
1584
 *
1585
 * 3. selected encoding is UTF8 and platform is win32. This is because
1586
 * UTF8 is a pseudo codepage that is supported in all locales since it's
1587
 * converted to UTF16 before being used.
1588
 *
1589
 * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
1590
 * is risky but we have historically allowed it --- notably, the
1591
 * regression tests require it.
1592
 *
1593
 * Note: if you change this policy, fix initdb to match.
1594
 */
1595
void
1596
check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
1597
0
{
1598
0
  int     ctype_encoding = pg_get_encoding_from_locale(ctype, true);
1599
0
  int     collate_encoding = pg_get_encoding_from_locale(collate, true);
1600
1601
0
  if (!(ctype_encoding == encoding ||
1602
0
      ctype_encoding == PG_SQL_ASCII ||
1603
0
      ctype_encoding == -1 ||
1604
#ifdef WIN32
1605
      encoding == PG_UTF8 ||
1606
#endif
1607
0
      (encoding == PG_SQL_ASCII && superuser())))
1608
0
    ereport(ERROR,
1609
0
        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1610
0
         errmsg("encoding \"%s\" does not match locale \"%s\"",
1611
0
            pg_encoding_to_char(encoding),
1612
0
            ctype),
1613
0
         errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
1614
0
               pg_encoding_to_char(ctype_encoding))));
1615
1616
0
  if (!(collate_encoding == encoding ||
1617
0
      collate_encoding == PG_SQL_ASCII ||
1618
0
      collate_encoding == -1 ||
1619
#ifdef WIN32
1620
      encoding == PG_UTF8 ||
1621
#endif
1622
0
      (encoding == PG_SQL_ASCII && superuser())))
1623
0
    ereport(ERROR,
1624
0
        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1625
0
         errmsg("encoding \"%s\" does not match locale \"%s\"",
1626
0
            pg_encoding_to_char(encoding),
1627
0
            collate),
1628
0
         errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
1629
0
               pg_encoding_to_char(collate_encoding))));
1630
0
}
1631
1632
/* Error cleanup callback for createdb */
1633
static void
1634
createdb_failure_callback(int code, Datum arg)
1635
0
{
1636
0
  createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
1637
1638
  /*
1639
   * If we were copying database at block levels then drop pages for the
1640
   * destination database that are in the shared buffer cache.  And tell
1641
   * checkpointer to forget any pending fsync and unlink requests for files
1642
   * in the database.  The reasoning behind doing this is same as explained
1643
   * in dropdb function.  But unlike dropdb we don't need to call
1644
   * pgstat_drop_database because this database is still not created so
1645
   * there should not be any stat for this.
1646
   */
1647
0
  if (fparms->strategy == CREATEDB_WAL_LOG)
1648
0
  {
1649
0
    DropDatabaseBuffers(fparms->dest_dboid);
1650
0
    ForgetDatabaseSyncRequests(fparms->dest_dboid);
1651
1652
    /* Release lock on the target database. */
1653
0
    UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
1654
0
               AccessShareLock);
1655
0
  }
1656
1657
  /*
1658
   * Release lock on source database before doing recursive remove. This is
1659
   * not essential but it seems desirable to release the lock as soon as
1660
   * possible.
1661
   */
1662
0
  UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
1663
1664
  /* Throw away any successfully copied subdirectories */
1665
0
  remove_dbtablespaces(fparms->dest_dboid);
1666
0
}
1667
1668
1669
/*
1670
 * DROP DATABASE
1671
 */
1672
void
1673
dropdb(const char *dbname, bool missing_ok, bool force)
1674
0
{
1675
0
  Oid     db_id;
1676
0
  bool    db_istemplate;
1677
0
  Relation  pgdbrel;
1678
0
  HeapTuple tup;
1679
0
  ScanKeyData scankey;
1680
0
  void     *inplace_state;
1681
0
  Form_pg_database datform;
1682
0
  int     notherbackends;
1683
0
  int     npreparedxacts;
1684
0
  int     nslots,
1685
0
        nslots_active;
1686
0
  int     nsubscriptions;
1687
1688
  /*
1689
   * Look up the target database's OID, and get exclusive lock on it. We
1690
   * need this to ensure that no new backend starts up in the target
1691
   * database while we are deleting it (see postinit.c), and that no one is
1692
   * using it as a CREATE DATABASE template or trying to delete it for
1693
   * themselves.
1694
   */
1695
0
  pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
1696
1697
0
  if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1698
0
           &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1699
0
  {
1700
0
    if (!missing_ok)
1701
0
    {
1702
0
      ereport(ERROR,
1703
0
          (errcode(ERRCODE_UNDEFINED_DATABASE),
1704
0
           errmsg("database \"%s\" does not exist", dbname)));
1705
0
    }
1706
0
    else
1707
0
    {
1708
      /* Close pg_database, release the lock, since we changed nothing */
1709
0
      table_close(pgdbrel, RowExclusiveLock);
1710
0
      ereport(NOTICE,
1711
0
          (errmsg("database \"%s\" does not exist, skipping",
1712
0
              dbname)));
1713
0
      return;
1714
0
    }
1715
0
  }
1716
1717
  /*
1718
   * Permission checks
1719
   */
1720
0
  if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1721
0
    aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1722
0
             dbname);
1723
1724
  /* DROP hook for the database being removed */
1725
0
  InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
1726
1727
  /*
1728
   * Disallow dropping a DB that is marked istemplate.  This is just to
1729
   * prevent people from accidentally dropping template0 or template1; they
1730
   * can do so if they're really determined ...
1731
   */
1732
0
  if (db_istemplate)
1733
0
    ereport(ERROR,
1734
0
        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1735
0
         errmsg("cannot drop a template database")));
1736
1737
  /* Obviously can't drop my own database */
1738
0
  if (db_id == MyDatabaseId)
1739
0
    ereport(ERROR,
1740
0
        (errcode(ERRCODE_OBJECT_IN_USE),
1741
0
         errmsg("cannot drop the currently open database")));
1742
1743
  /*
1744
   * Check whether there are active logical slots that refer to the
1745
   * to-be-dropped database. The database lock we are holding prevents the
1746
   * creation of new slots using the database or existing slots becoming
1747
   * active.
1748
   */
1749
0
  (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
1750
0
  if (nslots_active)
1751
0
  {
1752
0
    ereport(ERROR,
1753
0
        (errcode(ERRCODE_OBJECT_IN_USE),
1754
0
         errmsg("database \"%s\" is used by an active logical replication slot",
1755
0
            dbname),
1756
0
         errdetail_plural("There is %d active slot.",
1757
0
                  "There are %d active slots.",
1758
0
                  nslots_active, nslots_active)));
1759
0
  }
1760
1761
  /*
1762
   * Check if there are subscriptions defined in the target database.
1763
   *
1764
   * We can't drop them automatically because they might be holding
1765
   * resources in other databases/instances.
1766
   */
1767
0
  if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
1768
0
    ereport(ERROR,
1769
0
        (errcode(ERRCODE_OBJECT_IN_USE),
1770
0
         errmsg("database \"%s\" is being used by logical replication subscription",
1771
0
            dbname),
1772
0
         errdetail_plural("There is %d subscription.",
1773
0
                  "There are %d subscriptions.",
1774
0
                  nsubscriptions, nsubscriptions)));
1775
1776
1777
  /*
1778
   * Attempt to terminate all existing connections to the target database if
1779
   * the user has requested to do so.
1780
   */
1781
0
  if (force)
1782
0
    TerminateOtherDBBackends(db_id);
1783
1784
  /*
1785
   * Check for other backends in the target database.  (Because we hold the
1786
   * database lock, no new ones can start after this.)
1787
   *
1788
   * As in CREATE DATABASE, check this after other error conditions.
1789
   */
1790
0
  if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1791
0
    ereport(ERROR,
1792
0
        (errcode(ERRCODE_OBJECT_IN_USE),
1793
0
         errmsg("database \"%s\" is being accessed by other users",
1794
0
            dbname),
1795
0
         errdetail_busy_db(notherbackends, npreparedxacts)));
1796
1797
  /*
1798
   * Delete any comments or security labels associated with the database.
1799
   */
1800
0
  DeleteSharedComments(db_id, DatabaseRelationId);
1801
0
  DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
1802
1803
  /*
1804
   * Remove settings associated with this database
1805
   */
1806
0
  DropSetting(db_id, InvalidOid);
1807
1808
  /*
1809
   * Remove shared dependency references for the database.
1810
   */
1811
0
  dropDatabaseDependencies(db_id);
1812
1813
  /*
1814
   * Tell the cumulative stats system to forget it immediately, too.
1815
   */
1816
0
  pgstat_drop_database(db_id);
1817
1818
  /*
1819
   * Except for the deletion of the catalog row, subsequent actions are not
1820
   * transactional (consider DropDatabaseBuffers() discarding modified
1821
   * buffers). But we might crash or get interrupted below. To prevent
1822
   * accesses to a database with invalid contents, mark the database as
1823
   * invalid using an in-place update.
1824
   *
1825
   * We need to flush the WAL before continuing, to guarantee the
1826
   * modification is durable before performing irreversible filesystem
1827
   * operations.
1828
   */
1829
0
  ScanKeyInit(&scankey,
1830
0
        Anum_pg_database_datname,
1831
0
        BTEqualStrategyNumber, F_NAMEEQ,
1832
0
        CStringGetDatum(dbname));
1833
0
  systable_inplace_update_begin(pgdbrel, DatabaseNameIndexId, true,
1834
0
                  NULL, 1, &scankey, &tup, &inplace_state);
1835
0
  if (!HeapTupleIsValid(tup))
1836
0
    elog(ERROR, "cache lookup failed for database %u", db_id);
1837
0
  datform = (Form_pg_database) GETSTRUCT(tup);
1838
0
  datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
1839
0
  systable_inplace_update_finish(inplace_state, tup);
1840
0
  XLogFlush(XactLastRecEnd);
1841
1842
  /*
1843
   * Also delete the tuple - transactionally. If this transaction commits,
1844
   * the row will be gone, but if we fail, dropdb() can be invoked again.
1845
   */
1846
0
  CatalogTupleDelete(pgdbrel, &tup->t_self);
1847
0
  heap_freetuple(tup);
1848
1849
  /*
1850
   * Drop db-specific replication slots.
1851
   */
1852
0
  ReplicationSlotsDropDBSlots(db_id);
1853
1854
  /*
1855
   * Drop pages for this database that are in the shared buffer cache. This
1856
   * is important to ensure that no remaining backend tries to write out a
1857
   * dirty buffer to the dead database later...
1858
   */
1859
0
  DropDatabaseBuffers(db_id);
1860
1861
  /*
1862
   * Tell checkpointer to forget any pending fsync and unlink requests for
1863
   * files in the database; else the fsyncs will fail at next checkpoint, or
1864
   * worse, it will delete files that belong to a newly created database
1865
   * with the same OID.
1866
   */
1867
0
  ForgetDatabaseSyncRequests(db_id);
1868
1869
  /*
1870
   * Force a checkpoint to make sure the checkpointer has received the
1871
   * message sent by ForgetDatabaseSyncRequests.
1872
   */
1873
0
  RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1874
1875
  /* Close all smgr fds in all backends. */
1876
0
  WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
1877
1878
  /*
1879
   * Remove all tablespace subdirs belonging to the database.
1880
   */
1881
0
  remove_dbtablespaces(db_id);
1882
1883
  /*
1884
   * Close pg_database, but keep lock till commit.
1885
   */
1886
0
  table_close(pgdbrel, NoLock);
1887
1888
  /*
1889
   * Force synchronous commit, thus minimizing the window between removal of
1890
   * the database files and committal of the transaction. If we crash before
1891
   * committing, we'll have a DB that's gone on disk but still there
1892
   * according to pg_database, which is not good.
1893
   */
1894
0
  ForceSyncCommit();
1895
0
}
1896
1897
1898
/*
1899
 * Rename database
1900
 */
1901
ObjectAddress
1902
RenameDatabase(const char *oldname, const char *newname)
1903
0
{
1904
0
  Oid     db_id;
1905
0
  HeapTuple newtup;
1906
0
  ItemPointerData otid;
1907
0
  Relation  rel;
1908
0
  int     notherbackends;
1909
0
  int     npreparedxacts;
1910
0
  ObjectAddress address;
1911
1912
  /*
1913
   * Look up the target database's OID, and get exclusive lock on it. We
1914
   * need this for the same reasons as DROP DATABASE.
1915
   */
1916
0
  rel = table_open(DatabaseRelationId, RowExclusiveLock);
1917
1918
0
  if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
1919
0
           NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1920
0
    ereport(ERROR,
1921
0
        (errcode(ERRCODE_UNDEFINED_DATABASE),
1922
0
         errmsg("database \"%s\" does not exist", oldname)));
1923
1924
  /* must be owner */
1925
0
  if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1926
0
    aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1927
0
             oldname);
1928
1929
  /* must have createdb rights */
1930
0
  if (!have_createdb_privilege())
1931
0
    ereport(ERROR,
1932
0
        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1933
0
         errmsg("permission denied to rename database")));
1934
1935
  /*
1936
   * If built with appropriate switch, whine when regression-testing
1937
   * conventions for database names are violated.
1938
   */
1939
#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1940
  if (strstr(newname, "regression") == NULL)
1941
    elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1942
#endif
1943
1944
  /*
1945
   * Make sure the new name doesn't exist.  See notes for same error in
1946
   * CREATE DATABASE.
1947
   */
1948
0
  if (OidIsValid(get_database_oid(newname, true)))
1949
0
    ereport(ERROR,
1950
0
        (errcode(ERRCODE_DUPLICATE_DATABASE),
1951
0
         errmsg("database \"%s\" already exists", newname)));
1952
1953
  /*
1954
   * XXX Client applications probably store the current database somewhere,
1955
   * so renaming it could cause confusion.  On the other hand, there may not
1956
   * be an actual problem besides a little confusion, so think about this
1957
   * and decide.
1958
   */
1959
0
  if (db_id == MyDatabaseId)
1960
0
    ereport(ERROR,
1961
0
        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1962
0
         errmsg("current database cannot be renamed")));
1963
1964
  /*
1965
   * Make sure the database does not have active sessions.  This is the same
1966
   * concern as above, but applied to other sessions.
1967
   *
1968
   * As in CREATE DATABASE, check this after other error conditions.
1969
   */
1970
0
  if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1971
0
    ereport(ERROR,
1972
0
        (errcode(ERRCODE_OBJECT_IN_USE),
1973
0
         errmsg("database \"%s\" is being accessed by other users",
1974
0
            oldname),
1975
0
         errdetail_busy_db(notherbackends, npreparedxacts)));
1976
1977
  /* rename */
1978
0
  newtup = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
1979
0
  if (!HeapTupleIsValid(newtup))
1980
0
    elog(ERROR, "cache lookup failed for database %u", db_id);
1981
0
  otid = newtup->t_self;
1982
0
  namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
1983
0
  CatalogTupleUpdate(rel, &otid, newtup);
1984
0
  UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
1985
1986
0
  InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1987
1988
0
  ObjectAddressSet(address, DatabaseRelationId, db_id);
1989
1990
  /*
1991
   * Close pg_database, but keep lock till commit.
1992
   */
1993
0
  table_close(rel, NoLock);
1994
1995
0
  return address;
1996
0
}
1997
1998
1999
/*
2000
 * ALTER DATABASE SET TABLESPACE
2001
 */
2002
static void
2003
movedb(const char *dbname, const char *tblspcname)
2004
0
{
2005
0
  Oid     db_id;
2006
0
  Relation  pgdbrel;
2007
0
  int     notherbackends;
2008
0
  int     npreparedxacts;
2009
0
  HeapTuple oldtuple,
2010
0
        newtuple;
2011
0
  Oid     src_tblspcoid,
2012
0
        dst_tblspcoid;
2013
0
  ScanKeyData scankey;
2014
0
  SysScanDesc sysscan;
2015
0
  AclResult aclresult;
2016
0
  char     *src_dbpath;
2017
0
  char     *dst_dbpath;
2018
0
  DIR      *dstdir;
2019
0
  struct dirent *xlde;
2020
0
  movedb_failure_params fparms;
2021
2022
  /*
2023
   * Look up the target database's OID, and get exclusive lock on it. We
2024
   * need this to ensure that no new backend starts up in the database while
2025
   * we are moving it, and that no one is using it as a CREATE DATABASE
2026
   * template or trying to delete it.
2027
   */
2028
0
  pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
2029
2030
0
  if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
2031
0
           NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL, NULL, NULL, NULL))
2032
0
    ereport(ERROR,
2033
0
        (errcode(ERRCODE_UNDEFINED_DATABASE),
2034
0
         errmsg("database \"%s\" does not exist", dbname)));
2035
2036
  /*
2037
   * We actually need a session lock, so that the lock will persist across
2038
   * the commit/restart below.  (We could almost get away with letting the
2039
   * lock be released at commit, except that someone could try to move
2040
   * relations of the DB back into the old directory while we rmtree() it.)
2041
   */
2042
0
  LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2043
0
                 AccessExclusiveLock);
2044
2045
  /*
2046
   * Permission checks
2047
   */
2048
0
  if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2049
0
    aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2050
0
             dbname);
2051
2052
  /*
2053
   * Obviously can't move the tables of my own database
2054
   */
2055
0
  if (db_id == MyDatabaseId)
2056
0
    ereport(ERROR,
2057
0
        (errcode(ERRCODE_OBJECT_IN_USE),
2058
0
         errmsg("cannot change the tablespace of the currently open database")));
2059
2060
  /*
2061
   * Get tablespace's oid
2062
   */
2063
0
  dst_tblspcoid = get_tablespace_oid(tblspcname, false);
2064
2065
  /*
2066
   * Permission checks
2067
   */
2068
0
  aclresult = object_aclcheck(TableSpaceRelationId, dst_tblspcoid, GetUserId(),
2069
0
                ACL_CREATE);
2070
0
  if (aclresult != ACLCHECK_OK)
2071
0
    aclcheck_error(aclresult, OBJECT_TABLESPACE,
2072
0
             tblspcname);
2073
2074
  /*
2075
   * pg_global must never be the default tablespace
2076
   */
2077
0
  if (dst_tblspcoid == GLOBALTABLESPACE_OID)
2078
0
    ereport(ERROR,
2079
0
        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2080
0
         errmsg("pg_global cannot be used as default tablespace")));
2081
2082
  /*
2083
   * No-op if same tablespace
2084
   */
2085
0
  if (src_tblspcoid == dst_tblspcoid)
2086
0
  {
2087
0
    table_close(pgdbrel, NoLock);
2088
0
    UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2089
0
                   AccessExclusiveLock);
2090
0
    return;
2091
0
  }
2092
2093
  /*
2094
   * Check for other backends in the target database.  (Because we hold the
2095
   * database lock, no new ones can start after this.)
2096
   *
2097
   * As in CREATE DATABASE, check this after other error conditions.
2098
   */
2099
0
  if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
2100
0
    ereport(ERROR,
2101
0
        (errcode(ERRCODE_OBJECT_IN_USE),
2102
0
         errmsg("database \"%s\" is being accessed by other users",
2103
0
            dbname),
2104
0
         errdetail_busy_db(notherbackends, npreparedxacts)));
2105
2106
  /*
2107
   * Get old and new database paths
2108
   */
2109
0
  src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
2110
0
  dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
2111
2112
  /*
2113
   * Force a checkpoint before proceeding. This will force all dirty
2114
   * buffers, including those of unlogged tables, out to disk, to ensure
2115
   * source database is up-to-date on disk for the copy.
2116
   * FlushDatabaseBuffers() would suffice for that, but we also want to
2117
   * process any pending unlink requests. Otherwise, the check for existing
2118
   * files in the target directory might fail unnecessarily, not to mention
2119
   * that the copy might fail due to source files getting deleted under it.
2120
   * On Windows, this also ensures that background procs don't hold any open
2121
   * files, which would cause rmdir() to fail.
2122
   */
2123
0
  RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT
2124
0
            | CHECKPOINT_FLUSH_UNLOGGED);
2125
2126
  /* Close all smgr fds in all backends. */
2127
0
  WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
2128
2129
  /*
2130
   * Now drop all buffers holding data of the target database; they should
2131
   * no longer be dirty so DropDatabaseBuffers is safe.
2132
   *
2133
   * It might seem that we could just let these buffers age out of shared
2134
   * buffers naturally, since they should not get referenced anymore.  The
2135
   * problem with that is that if the user later moves the database back to
2136
   * its original tablespace, any still-surviving buffers would appear to
2137
   * contain valid data again --- but they'd be missing any changes made in
2138
   * the database while it was in the new tablespace.  In any case, freeing
2139
   * buffers that should never be used again seems worth the cycles.
2140
   *
2141
   * Note: it'd be sufficient to get rid of buffers matching db_id and
2142
   * src_tblspcoid, but bufmgr.c presently provides no API for that.
2143
   */
2144
0
  DropDatabaseBuffers(db_id);
2145
2146
  /*
2147
   * Check for existence of files in the target directory, i.e., objects of
2148
   * this database that are already in the target tablespace.  We can't
2149
   * allow the move in such a case, because we would need to change those
2150
   * relations' pg_class.reltablespace entries to zero, and we don't have
2151
   * access to the DB's pg_class to do so.
2152
   */
2153
0
  dstdir = AllocateDir(dst_dbpath);
2154
0
  if (dstdir != NULL)
2155
0
  {
2156
0
    while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
2157
0
    {
2158
0
      if (strcmp(xlde->d_name, ".") == 0 ||
2159
0
        strcmp(xlde->d_name, "..") == 0)
2160
0
        continue;
2161
2162
0
      ereport(ERROR,
2163
0
          (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2164
0
           errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
2165
0
              dbname, tblspcname),
2166
0
           errhint("You must move them back to the database's default tablespace before using this command.")));
2167
0
    }
2168
2169
0
    FreeDir(dstdir);
2170
2171
    /*
2172
     * The directory exists but is empty. We must remove it before using
2173
     * the copydir function.
2174
     */
2175
0
    if (rmdir(dst_dbpath) != 0)
2176
0
      elog(ERROR, "could not remove directory \"%s\": %m",
2177
0
         dst_dbpath);
2178
0
  }
2179
2180
  /*
2181
   * Use an ENSURE block to make sure we remove the debris if the copy fails
2182
   * (eg, due to out-of-disk-space).  This is not a 100% solution, because
2183
   * of the possibility of failure during transaction commit, but it should
2184
   * handle most scenarios.
2185
   */
2186
0
  fparms.dest_dboid = db_id;
2187
0
  fparms.dest_tsoid = dst_tblspcoid;
2188
0
  PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2189
0
              PointerGetDatum(&fparms));
2190
0
  {
2191
0
    Datum   new_record[Natts_pg_database] = {0};
2192
0
    bool    new_record_nulls[Natts_pg_database] = {0};
2193
0
    bool    new_record_repl[Natts_pg_database] = {0};
2194
2195
    /*
2196
     * Copy files from the old tablespace to the new one
2197
     */
2198
0
    copydir(src_dbpath, dst_dbpath, false);
2199
2200
    /*
2201
     * Record the filesystem change in XLOG
2202
     */
2203
0
    {
2204
0
      xl_dbase_create_file_copy_rec xlrec;
2205
2206
0
      xlrec.db_id = db_id;
2207
0
      xlrec.tablespace_id = dst_tblspcoid;
2208
0
      xlrec.src_db_id = db_id;
2209
0
      xlrec.src_tablespace_id = src_tblspcoid;
2210
2211
0
      XLogBeginInsert();
2212
0
      XLogRegisterData(&xlrec,
2213
0
               sizeof(xl_dbase_create_file_copy_rec));
2214
2215
0
      (void) XLogInsert(RM_DBASE_ID,
2216
0
                XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
2217
0
    }
2218
2219
    /*
2220
     * Update the database's pg_database tuple
2221
     */
2222
0
    ScanKeyInit(&scankey,
2223
0
          Anum_pg_database_datname,
2224
0
          BTEqualStrategyNumber, F_NAMEEQ,
2225
0
          CStringGetDatum(dbname));
2226
0
    sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
2227
0
                   NULL, 1, &scankey);
2228
0
    oldtuple = systable_getnext(sysscan);
2229
0
    if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
2230
0
      ereport(ERROR,
2231
0
          (errcode(ERRCODE_UNDEFINED_DATABASE),
2232
0
           errmsg("database \"%s\" does not exist", dbname)));
2233
0
    LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
2234
2235
0
    new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
2236
0
    new_record_repl[Anum_pg_database_dattablespace - 1] = true;
2237
2238
0
    newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
2239
0
                   new_record,
2240
0
                   new_record_nulls, new_record_repl);
2241
0
    CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
2242
0
    UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
2243
2244
0
    InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2245
2246
0
    systable_endscan(sysscan);
2247
2248
    /*
2249
     * Force another checkpoint here.  As in CREATE DATABASE, this is to
2250
     * ensure that we don't have to replay a committed
2251
     * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
2252
     * any unlogged operations done in the new DB tablespace before the
2253
     * next checkpoint.
2254
     */
2255
0
    RequestCheckpoint(CHECKPOINT_FAST | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
2256
2257
    /*
2258
     * Force synchronous commit, thus minimizing the window between
2259
     * copying the database files and committal of the transaction. If we
2260
     * crash before committing, we'll leave an orphaned set of files on
2261
     * disk, which is not fatal but not good either.
2262
     */
2263
0
    ForceSyncCommit();
2264
2265
    /*
2266
     * Close pg_database, but keep lock till commit.
2267
     */
2268
0
    table_close(pgdbrel, NoLock);
2269
0
  }
2270
0
  PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2271
0
                PointerGetDatum(&fparms));
2272
2273
  /*
2274
   * Commit the transaction so that the pg_database update is committed. If
2275
   * we crash while removing files, the database won't be corrupt, we'll
2276
   * just leave some orphaned files in the old directory.
2277
   *
2278
   * (This is OK because we know we aren't inside a transaction block.)
2279
   *
2280
   * XXX would it be safe/better to do this inside the ensure block?  Not
2281
   * convinced it's a good idea; consider elog just after the transaction
2282
   * really commits.
2283
   */
2284
0
  PopActiveSnapshot();
2285
0
  CommitTransactionCommand();
2286
2287
  /* Start new transaction for the remaining work; don't need a snapshot */
2288
0
  StartTransactionCommand();
2289
2290
  /*
2291
   * Remove files from the old tablespace
2292
   */
2293
0
  if (!rmtree(src_dbpath, true))
2294
0
    ereport(WARNING,
2295
0
        (errmsg("some useless files may be left behind in old database directory \"%s\"",
2296
0
            src_dbpath)));
2297
2298
  /*
2299
   * Record the filesystem change in XLOG
2300
   */
2301
0
  {
2302
0
    xl_dbase_drop_rec xlrec;
2303
2304
0
    xlrec.db_id = db_id;
2305
0
    xlrec.ntablespaces = 1;
2306
2307
0
    XLogBeginInsert();
2308
0
    XLogRegisterData(&xlrec, sizeof(xl_dbase_drop_rec));
2309
0
    XLogRegisterData(&src_tblspcoid, sizeof(Oid));
2310
2311
0
    (void) XLogInsert(RM_DBASE_ID,
2312
0
              XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
2313
0
  }
2314
2315
  /* Now it's safe to release the database lock */
2316
0
  UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2317
0
                 AccessExclusiveLock);
2318
2319
0
  pfree(src_dbpath);
2320
0
  pfree(dst_dbpath);
2321
0
}
2322
2323
/* Error cleanup callback for movedb */
2324
static void
2325
movedb_failure_callback(int code, Datum arg)
2326
0
{
2327
0
  movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
2328
0
  char     *dstpath;
2329
2330
  /* Get rid of anything we managed to copy to the target directory */
2331
0
  dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
2332
2333
0
  (void) rmtree(dstpath, true);
2334
2335
0
  pfree(dstpath);
2336
0
}
2337
2338
/*
2339
 * Process options and call dropdb function.
2340
 */
2341
void
2342
DropDatabase(ParseState *pstate, DropdbStmt *stmt)
2343
0
{
2344
0
  bool    force = false;
2345
0
  ListCell   *lc;
2346
2347
0
  foreach(lc, stmt->options)
2348
0
  {
2349
0
    DefElem    *opt = (DefElem *) lfirst(lc);
2350
2351
0
    if (strcmp(opt->defname, "force") == 0)
2352
0
      force = true;
2353
0
    else
2354
0
      ereport(ERROR,
2355
0
          (errcode(ERRCODE_SYNTAX_ERROR),
2356
0
           errmsg("unrecognized DROP DATABASE option \"%s\"", opt->defname),
2357
0
           parser_errposition(pstate, opt->location)));
2358
0
  }
2359
2360
0
  dropdb(stmt->dbname, stmt->missing_ok, force);
2361
0
}
2362
2363
/*
2364
 * ALTER DATABASE name ...
2365
 */
2366
Oid
2367
AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
2368
0
{
2369
0
  Relation  rel;
2370
0
  Oid     dboid;
2371
0
  HeapTuple tuple,
2372
0
        newtuple;
2373
0
  Form_pg_database datform;
2374
0
  ScanKeyData scankey;
2375
0
  SysScanDesc scan;
2376
0
  ListCell   *option;
2377
0
  bool    dbistemplate = false;
2378
0
  bool    dballowconnections = true;
2379
0
  int     dbconnlimit = DATCONNLIMIT_UNLIMITED;
2380
0
  DefElem    *distemplate = NULL;
2381
0
  DefElem    *dallowconnections = NULL;
2382
0
  DefElem    *dconnlimit = NULL;
2383
0
  DefElem    *dtablespace = NULL;
2384
0
  Datum   new_record[Natts_pg_database] = {0};
2385
0
  bool    new_record_nulls[Natts_pg_database] = {0};
2386
0
  bool    new_record_repl[Natts_pg_database] = {0};
2387
2388
  /* Extract options from the statement node tree */
2389
0
  foreach(option, stmt->options)
2390
0
  {
2391
0
    DefElem    *defel = (DefElem *) lfirst(option);
2392
2393
0
    if (strcmp(defel->defname, "is_template") == 0)
2394
0
    {
2395
0
      if (distemplate)
2396
0
        errorConflictingDefElem(defel, pstate);
2397
0
      distemplate = defel;
2398
0
    }
2399
0
    else if (strcmp(defel->defname, "allow_connections") == 0)
2400
0
    {
2401
0
      if (dallowconnections)
2402
0
        errorConflictingDefElem(defel, pstate);
2403
0
      dallowconnections = defel;
2404
0
    }
2405
0
    else if (strcmp(defel->defname, "connection_limit") == 0)
2406
0
    {
2407
0
      if (dconnlimit)
2408
0
        errorConflictingDefElem(defel, pstate);
2409
0
      dconnlimit = defel;
2410
0
    }
2411
0
    else if (strcmp(defel->defname, "tablespace") == 0)
2412
0
    {
2413
0
      if (dtablespace)
2414
0
        errorConflictingDefElem(defel, pstate);
2415
0
      dtablespace = defel;
2416
0
    }
2417
0
    else
2418
0
      ereport(ERROR,
2419
0
          (errcode(ERRCODE_SYNTAX_ERROR),
2420
0
           errmsg("option \"%s\" not recognized", defel->defname),
2421
0
           parser_errposition(pstate, defel->location)));
2422
0
  }
2423
2424
0
  if (dtablespace)
2425
0
  {
2426
    /*
2427
     * While the SET TABLESPACE syntax doesn't allow any other options,
2428
     * somebody could write "WITH TABLESPACE ...".  Forbid any other
2429
     * options from being specified in that case.
2430
     */
2431
0
    if (list_length(stmt->options) != 1)
2432
0
      ereport(ERROR,
2433
0
          (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2434
0
           errmsg("option \"%s\" cannot be specified with other options",
2435
0
              dtablespace->defname),
2436
0
           parser_errposition(pstate, dtablespace->location)));
2437
    /* this case isn't allowed within a transaction block */
2438
0
    PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
2439
0
    movedb(stmt->dbname, defGetString(dtablespace));
2440
0
    return InvalidOid;
2441
0
  }
2442
2443
0
  if (distemplate && distemplate->arg)
2444
0
    dbistemplate = defGetBoolean(distemplate);
2445
0
  if (dallowconnections && dallowconnections->arg)
2446
0
    dballowconnections = defGetBoolean(dallowconnections);
2447
0
  if (dconnlimit && dconnlimit->arg)
2448
0
  {
2449
0
    dbconnlimit = defGetInt32(dconnlimit);
2450
0
    if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
2451
0
      ereport(ERROR,
2452
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2453
0
           errmsg("invalid connection limit: %d", dbconnlimit)));
2454
0
  }
2455
2456
  /*
2457
   * Get the old tuple.  We don't need a lock on the database per se,
2458
   * because we're not going to do anything that would mess up incoming
2459
   * connections.
2460
   */
2461
0
  rel = table_open(DatabaseRelationId, RowExclusiveLock);
2462
0
  ScanKeyInit(&scankey,
2463
0
        Anum_pg_database_datname,
2464
0
        BTEqualStrategyNumber, F_NAMEEQ,
2465
0
        CStringGetDatum(stmt->dbname));
2466
0
  scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2467
0
                NULL, 1, &scankey);
2468
0
  tuple = systable_getnext(scan);
2469
0
  if (!HeapTupleIsValid(tuple))
2470
0
    ereport(ERROR,
2471
0
        (errcode(ERRCODE_UNDEFINED_DATABASE),
2472
0
         errmsg("database \"%s\" does not exist", stmt->dbname)));
2473
0
  LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2474
2475
0
  datform = (Form_pg_database) GETSTRUCT(tuple);
2476
0
  dboid = datform->oid;
2477
2478
0
  if (database_is_invalid_form(datform))
2479
0
  {
2480
0
    ereport(FATAL,
2481
0
        errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2482
0
        errmsg("cannot alter invalid database \"%s\"", stmt->dbname),
2483
0
        errhint("Use DROP DATABASE to drop invalid databases."));
2484
0
  }
2485
2486
0
  if (!object_ownercheck(DatabaseRelationId, dboid, GetUserId()))
2487
0
    aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2488
0
             stmt->dbname);
2489
2490
  /*
2491
   * In order to avoid getting locked out and having to go through
2492
   * standalone mode, we refuse to disallow connections to the database
2493
   * we're currently connected to.  Lockout can still happen with concurrent
2494
   * sessions but the likeliness of that is not high enough to worry about.
2495
   */
2496
0
  if (!dballowconnections && dboid == MyDatabaseId)
2497
0
    ereport(ERROR,
2498
0
        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2499
0
         errmsg("cannot disallow connections for current database")));
2500
2501
  /*
2502
   * Build an updated tuple, perusing the information just obtained
2503
   */
2504
0
  if (distemplate)
2505
0
  {
2506
0
    new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
2507
0
    new_record_repl[Anum_pg_database_datistemplate - 1] = true;
2508
0
  }
2509
0
  if (dallowconnections)
2510
0
  {
2511
0
    new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
2512
0
    new_record_repl[Anum_pg_database_datallowconn - 1] = true;
2513
0
  }
2514
0
  if (dconnlimit)
2515
0
  {
2516
0
    new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
2517
0
    new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
2518
0
  }
2519
2520
0
  newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
2521
0
                 new_record_nulls, new_record_repl);
2522
0
  CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
2523
0
  UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2524
2525
0
  InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
2526
2527
0
  systable_endscan(scan);
2528
2529
  /* Close pg_database, but keep lock till commit */
2530
0
  table_close(rel, NoLock);
2531
2532
0
  return dboid;
2533
0
}
2534
2535
2536
/*
2537
 * ALTER DATABASE name REFRESH COLLATION VERSION
2538
 */
2539
ObjectAddress
2540
AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
2541
0
{
2542
0
  Relation  rel;
2543
0
  ScanKeyData scankey;
2544
0
  SysScanDesc scan;
2545
0
  Oid     db_id;
2546
0
  HeapTuple tuple;
2547
0
  Form_pg_database datForm;
2548
0
  ObjectAddress address;
2549
0
  Datum   datum;
2550
0
  bool    isnull;
2551
0
  char     *oldversion;
2552
0
  char     *newversion;
2553
2554
0
  rel = table_open(DatabaseRelationId, RowExclusiveLock);
2555
0
  ScanKeyInit(&scankey,
2556
0
        Anum_pg_database_datname,
2557
0
        BTEqualStrategyNumber, F_NAMEEQ,
2558
0
        CStringGetDatum(stmt->dbname));
2559
0
  scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2560
0
                NULL, 1, &scankey);
2561
0
  tuple = systable_getnext(scan);
2562
0
  if (!HeapTupleIsValid(tuple))
2563
0
    ereport(ERROR,
2564
0
        (errcode(ERRCODE_UNDEFINED_DATABASE),
2565
0
         errmsg("database \"%s\" does not exist", stmt->dbname)));
2566
2567
0
  datForm = (Form_pg_database) GETSTRUCT(tuple);
2568
0
  db_id = datForm->oid;
2569
2570
0
  if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2571
0
    aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2572
0
             stmt->dbname);
2573
0
  LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2574
2575
0
  datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
2576
0
  oldversion = isnull ? NULL : TextDatumGetCString(datum);
2577
2578
0
  if (datForm->datlocprovider == COLLPROVIDER_LIBC)
2579
0
  {
2580
0
    datum = heap_getattr(tuple, Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
2581
0
    if (isnull)
2582
0
      elog(ERROR, "unexpected null in pg_database");
2583
0
  }
2584
0
  else
2585
0
  {
2586
0
    datum = heap_getattr(tuple, Anum_pg_database_datlocale, RelationGetDescr(rel), &isnull);
2587
0
    if (isnull)
2588
0
      elog(ERROR, "unexpected null in pg_database");
2589
0
  }
2590
2591
0
  newversion = get_collation_actual_version(datForm->datlocprovider,
2592
0
                        TextDatumGetCString(datum));
2593
2594
  /* cannot change from NULL to non-NULL or vice versa */
2595
0
  if ((!oldversion && newversion) || (oldversion && !newversion))
2596
0
    elog(ERROR, "invalid collation version change");
2597
0
  else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
2598
0
  {
2599
0
    bool    nulls[Natts_pg_database] = {0};
2600
0
    bool    replaces[Natts_pg_database] = {0};
2601
0
    Datum   values[Natts_pg_database] = {0};
2602
0
    HeapTuple newtuple;
2603
2604
0
    ereport(NOTICE,
2605
0
        (errmsg("changing version from %s to %s",
2606
0
            oldversion, newversion)));
2607
2608
0
    values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
2609
0
    replaces[Anum_pg_database_datcollversion - 1] = true;
2610
2611
0
    newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
2612
0
                   values, nulls, replaces);
2613
0
    CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
2614
0
    heap_freetuple(newtuple);
2615
0
  }
2616
0
  else
2617
0
    ereport(NOTICE,
2618
0
        (errmsg("version has not changed")));
2619
0
  UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2620
2621
0
  InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2622
2623
0
  ObjectAddressSet(address, DatabaseRelationId, db_id);
2624
2625
0
  systable_endscan(scan);
2626
2627
0
  table_close(rel, NoLock);
2628
2629
0
  return address;
2630
0
}
2631
2632
2633
/*
2634
 * ALTER DATABASE name SET ...
2635
 */
2636
Oid
2637
AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
2638
0
{
2639
0
  Oid     datid = get_database_oid(stmt->dbname, false);
2640
2641
  /*
2642
   * Obtain a lock on the database and make sure it didn't go away in the
2643
   * meantime.
2644
   */
2645
0
  shdepLockAndCheckObject(DatabaseRelationId, datid);
2646
2647
0
  if (!object_ownercheck(DatabaseRelationId, datid, GetUserId()))
2648
0
    aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2649
0
             stmt->dbname);
2650
2651
0
  AlterSetting(datid, InvalidOid, stmt->setstmt);
2652
2653
0
  UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
2654
2655
0
  return datid;
2656
0
}
2657
2658
2659
/*
2660
 * ALTER DATABASE name OWNER TO newowner
2661
 */
2662
ObjectAddress
2663
AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
2664
0
{
2665
0
  Oid     db_id;
2666
0
  HeapTuple tuple;
2667
0
  Relation  rel;
2668
0
  ScanKeyData scankey;
2669
0
  SysScanDesc scan;
2670
0
  Form_pg_database datForm;
2671
0
  ObjectAddress address;
2672
2673
  /*
2674
   * Get the old tuple.  We don't need a lock on the database per se,
2675
   * because we're not going to do anything that would mess up incoming
2676
   * connections.
2677
   */
2678
0
  rel = table_open(DatabaseRelationId, RowExclusiveLock);
2679
0
  ScanKeyInit(&scankey,
2680
0
        Anum_pg_database_datname,
2681
0
        BTEqualStrategyNumber, F_NAMEEQ,
2682
0
        CStringGetDatum(dbname));
2683
0
  scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2684
0
                NULL, 1, &scankey);
2685
0
  tuple = systable_getnext(scan);
2686
0
  if (!HeapTupleIsValid(tuple))
2687
0
    ereport(ERROR,
2688
0
        (errcode(ERRCODE_UNDEFINED_DATABASE),
2689
0
         errmsg("database \"%s\" does not exist", dbname)));
2690
2691
0
  datForm = (Form_pg_database) GETSTRUCT(tuple);
2692
0
  db_id = datForm->oid;
2693
2694
  /*
2695
   * If the new owner is the same as the existing owner, consider the
2696
   * command to have succeeded.  This is to be consistent with other
2697
   * objects.
2698
   */
2699
0
  if (datForm->datdba != newOwnerId)
2700
0
  {
2701
0
    Datum   repl_val[Natts_pg_database];
2702
0
    bool    repl_null[Natts_pg_database] = {0};
2703
0
    bool    repl_repl[Natts_pg_database] = {0};
2704
0
    Acl      *newAcl;
2705
0
    Datum   aclDatum;
2706
0
    bool    isNull;
2707
0
    HeapTuple newtuple;
2708
2709
    /* Otherwise, must be owner of the existing object */
2710
0
    if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2711
0
      aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2712
0
               dbname);
2713
2714
    /* Must be able to become new owner */
2715
0
    check_can_set_role(GetUserId(), newOwnerId);
2716
2717
    /*
2718
     * must have createdb rights
2719
     *
2720
     * NOTE: This is different from other alter-owner checks in that the
2721
     * current user is checked for createdb privileges instead of the
2722
     * destination owner.  This is consistent with the CREATE case for
2723
     * databases.  Because superusers will always have this right, we need
2724
     * no special case for them.
2725
     */
2726
0
    if (!have_createdb_privilege())
2727
0
      ereport(ERROR,
2728
0
          (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2729
0
           errmsg("permission denied to change owner of database")));
2730
2731
0
    LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2732
2733
0
    repl_repl[Anum_pg_database_datdba - 1] = true;
2734
0
    repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
2735
2736
    /*
2737
     * Determine the modified ACL for the new owner.  This is only
2738
     * necessary when the ACL is non-null.
2739
     */
2740
0
    aclDatum = heap_getattr(tuple,
2741
0
                Anum_pg_database_datacl,
2742
0
                RelationGetDescr(rel),
2743
0
                &isNull);
2744
0
    if (!isNull)
2745
0
    {
2746
0
      newAcl = aclnewowner(DatumGetAclP(aclDatum),
2747
0
                 datForm->datdba, newOwnerId);
2748
0
      repl_repl[Anum_pg_database_datacl - 1] = true;
2749
0
      repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
2750
0
    }
2751
2752
0
    newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
2753
0
    CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
2754
0
    UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
2755
2756
0
    heap_freetuple(newtuple);
2757
2758
    /* Update owner dependency reference */
2759
0
    changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
2760
0
  }
2761
2762
0
  InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2763
2764
0
  ObjectAddressSet(address, DatabaseRelationId, db_id);
2765
2766
0
  systable_endscan(scan);
2767
2768
  /* Close pg_database, but keep lock till commit */
2769
0
  table_close(rel, NoLock);
2770
2771
0
  return address;
2772
0
}
2773
2774
2775
Datum
2776
pg_database_collation_actual_version(PG_FUNCTION_ARGS)
2777
0
{
2778
0
  Oid     dbid = PG_GETARG_OID(0);
2779
0
  HeapTuple tp;
2780
0
  char    datlocprovider;
2781
0
  Datum   datum;
2782
0
  char     *version;
2783
2784
0
  tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
2785
0
  if (!HeapTupleIsValid(tp))
2786
0
    ereport(ERROR,
2787
0
        (errcode(ERRCODE_UNDEFINED_OBJECT),
2788
0
         errmsg("database with OID %u does not exist", dbid)));
2789
2790
0
  datlocprovider = ((Form_pg_database) GETSTRUCT(tp))->datlocprovider;
2791
2792
0
  if (datlocprovider == COLLPROVIDER_LIBC)
2793
0
    datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datcollate);
2794
0
  else
2795
0
    datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datlocale);
2796
2797
0
  version = get_collation_actual_version(datlocprovider,
2798
0
                       TextDatumGetCString(datum));
2799
2800
0
  ReleaseSysCache(tp);
2801
2802
0
  if (version)
2803
0
    PG_RETURN_TEXT_P(cstring_to_text(version));
2804
0
  else
2805
0
    PG_RETURN_NULL();
2806
0
}
2807
2808
2809
/*
2810
 * Helper functions
2811
 */
2812
2813
/*
2814
 * Look up info about the database named "name".  If the database exists,
2815
 * obtain the specified lock type on it, fill in any of the remaining
2816
 * parameters that aren't NULL, and return true.  If no such database,
2817
 * return false.
2818
 */
2819
static bool
2820
get_db_info(const char *name, LOCKMODE lockmode,
2821
      Oid *dbIdP, Oid *ownerIdP,
2822
      int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
2823
      TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
2824
      Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
2825
      char **dbIcurules,
2826
      char *dbLocProvider,
2827
      char **dbCollversion)
2828
0
{
2829
0
  bool    result = false;
2830
0
  Relation  relation;
2831
2832
0
  Assert(name);
2833
2834
  /* Caller may wish to grab a better lock on pg_database beforehand... */
2835
0
  relation = table_open(DatabaseRelationId, AccessShareLock);
2836
2837
  /*
2838
   * Loop covers the rare case where the database is renamed before we can
2839
   * lock it.  We try again just in case we can find a new one of the same
2840
   * name.
2841
   */
2842
0
  for (;;)
2843
0
  {
2844
0
    ScanKeyData scanKey;
2845
0
    SysScanDesc scan;
2846
0
    HeapTuple tuple;
2847
0
    Oid     dbOid;
2848
2849
    /*
2850
     * there's no syscache for database-indexed-by-name, so must do it the
2851
     * hard way
2852
     */
2853
0
    ScanKeyInit(&scanKey,
2854
0
          Anum_pg_database_datname,
2855
0
          BTEqualStrategyNumber, F_NAMEEQ,
2856
0
          CStringGetDatum(name));
2857
2858
0
    scan = systable_beginscan(relation, DatabaseNameIndexId, true,
2859
0
                  NULL, 1, &scanKey);
2860
2861
0
    tuple = systable_getnext(scan);
2862
2863
0
    if (!HeapTupleIsValid(tuple))
2864
0
    {
2865
      /* definitely no database of that name */
2866
0
      systable_endscan(scan);
2867
0
      break;
2868
0
    }
2869
2870
0
    dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
2871
2872
0
    systable_endscan(scan);
2873
2874
    /*
2875
     * Now that we have a database OID, we can try to lock the DB.
2876
     */
2877
0
    if (lockmode != NoLock)
2878
0
      LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2879
2880
    /*
2881
     * And now, re-fetch the tuple by OID.  If it's still there and still
2882
     * the same name, we win; else, drop the lock and loop back to try
2883
     * again.
2884
     */
2885
0
    tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
2886
0
    if (HeapTupleIsValid(tuple))
2887
0
    {
2888
0
      Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
2889
2890
0
      if (strcmp(name, NameStr(dbform->datname)) == 0)
2891
0
      {
2892
0
        Datum   datum;
2893
0
        bool    isnull;
2894
2895
        /* oid of the database */
2896
0
        if (dbIdP)
2897
0
          *dbIdP = dbOid;
2898
        /* oid of the owner */
2899
0
        if (ownerIdP)
2900
0
          *ownerIdP = dbform->datdba;
2901
        /* character encoding */
2902
0
        if (encodingP)
2903
0
          *encodingP = dbform->encoding;
2904
        /* allowed as template? */
2905
0
        if (dbIsTemplateP)
2906
0
          *dbIsTemplateP = dbform->datistemplate;
2907
        /* Has on login event trigger? */
2908
0
        if (dbHasLoginEvtP)
2909
0
          *dbHasLoginEvtP = dbform->dathasloginevt;
2910
        /* allowing connections? */
2911
0
        if (dbAllowConnP)
2912
0
          *dbAllowConnP = dbform->datallowconn;
2913
        /* limit of frozen XIDs */
2914
0
        if (dbFrozenXidP)
2915
0
          *dbFrozenXidP = dbform->datfrozenxid;
2916
        /* minimum MultiXactId */
2917
0
        if (dbMinMultiP)
2918
0
          *dbMinMultiP = dbform->datminmxid;
2919
        /* default tablespace for this database */
2920
0
        if (dbTablespace)
2921
0
          *dbTablespace = dbform->dattablespace;
2922
        /* default locale settings for this database */
2923
0
        if (dbLocProvider)
2924
0
          *dbLocProvider = dbform->datlocprovider;
2925
0
        if (dbCollate)
2926
0
        {
2927
0
          datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datcollate);
2928
0
          *dbCollate = TextDatumGetCString(datum);
2929
0
        }
2930
0
        if (dbCtype)
2931
0
        {
2932
0
          datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datctype);
2933
0
          *dbCtype = TextDatumGetCString(datum);
2934
0
        }
2935
0
        if (dbLocale)
2936
0
        {
2937
0
          datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datlocale, &isnull);
2938
0
          if (isnull)
2939
0
            *dbLocale = NULL;
2940
0
          else
2941
0
            *dbLocale = TextDatumGetCString(datum);
2942
0
        }
2943
0
        if (dbIcurules)
2944
0
        {
2945
0
          datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticurules, &isnull);
2946
0
          if (isnull)
2947
0
            *dbIcurules = NULL;
2948
0
          else
2949
0
            *dbIcurules = TextDatumGetCString(datum);
2950
0
        }
2951
0
        if (dbCollversion)
2952
0
        {
2953
0
          datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
2954
0
          if (isnull)
2955
0
            *dbCollversion = NULL;
2956
0
          else
2957
0
            *dbCollversion = TextDatumGetCString(datum);
2958
0
        }
2959
0
        ReleaseSysCache(tuple);
2960
0
        result = true;
2961
0
        break;
2962
0
      }
2963
      /* can only get here if it was just renamed */
2964
0
      ReleaseSysCache(tuple);
2965
0
    }
2966
2967
0
    if (lockmode != NoLock)
2968
0
      UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2969
0
  }
2970
2971
0
  table_close(relation, AccessShareLock);
2972
2973
0
  return result;
2974
0
}
2975
2976
/* Check if current user has createdb privileges */
2977
bool
2978
have_createdb_privilege(void)
2979
0
{
2980
0
  bool    result = false;
2981
0
  HeapTuple utup;
2982
2983
  /* Superusers can always do everything */
2984
0
  if (superuser())
2985
0
    return true;
2986
2987
0
  utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
2988
0
  if (HeapTupleIsValid(utup))
2989
0
  {
2990
0
    result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
2991
0
    ReleaseSysCache(utup);
2992
0
  }
2993
0
  return result;
2994
0
}
2995
2996
/*
2997
 * Remove tablespace directories
2998
 *
2999
 * We don't know what tablespaces db_id is using, so iterate through all
3000
 * tablespaces removing <tablespace>/db_id
3001
 */
3002
static void
3003
remove_dbtablespaces(Oid db_id)
3004
0
{
3005
0
  Relation  rel;
3006
0
  TableScanDesc scan;
3007
0
  HeapTuple tuple;
3008
0
  List     *ltblspc = NIL;
3009
0
  ListCell   *cell;
3010
0
  int     ntblspc;
3011
0
  int     i;
3012
0
  Oid      *tablespace_ids;
3013
3014
0
  rel = table_open(TableSpaceRelationId, AccessShareLock);
3015
0
  scan = table_beginscan_catalog(rel, 0, NULL);
3016
0
  while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
3017
0
  {
3018
0
    Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
3019
0
    Oid     dsttablespace = spcform->oid;
3020
0
    char     *dstpath;
3021
0
    struct stat st;
3022
3023
    /* Don't mess with the global tablespace */
3024
0
    if (dsttablespace == GLOBALTABLESPACE_OID)
3025
0
      continue;
3026
3027
0
    dstpath = GetDatabasePath(db_id, dsttablespace);
3028
3029
0
    if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
3030
0
    {
3031
      /* Assume we can ignore it */
3032
0
      pfree(dstpath);
3033
0
      continue;
3034
0
    }
3035
3036
0
    if (!rmtree(dstpath, true))
3037
0
      ereport(WARNING,
3038
0
          (errmsg("some useless files may be left behind in old database directory \"%s\"",
3039
0
              dstpath)));
3040
3041
0
    ltblspc = lappend_oid(ltblspc, dsttablespace);
3042
0
    pfree(dstpath);
3043
0
  }
3044
3045
0
  ntblspc = list_length(ltblspc);
3046
0
  if (ntblspc == 0)
3047
0
  {
3048
0
    table_endscan(scan);
3049
0
    table_close(rel, AccessShareLock);
3050
0
    return;
3051
0
  }
3052
3053
0
  tablespace_ids = (Oid *) palloc(ntblspc * sizeof(Oid));
3054
0
  i = 0;
3055
0
  foreach(cell, ltblspc)
3056
0
    tablespace_ids[i++] = lfirst_oid(cell);
3057
3058
  /* Record the filesystem change in XLOG */
3059
0
  {
3060
0
    xl_dbase_drop_rec xlrec;
3061
3062
0
    xlrec.db_id = db_id;
3063
0
    xlrec.ntablespaces = ntblspc;
3064
3065
0
    XLogBeginInsert();
3066
0
    XLogRegisterData(&xlrec, MinSizeOfDbaseDropRec);
3067
0
    XLogRegisterData(tablespace_ids, ntblspc * sizeof(Oid));
3068
3069
0
    (void) XLogInsert(RM_DBASE_ID,
3070
0
              XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
3071
0
  }
3072
3073
0
  list_free(ltblspc);
3074
0
  pfree(tablespace_ids);
3075
3076
0
  table_endscan(scan);
3077
0
  table_close(rel, AccessShareLock);
3078
0
}
3079
3080
/*
3081
 * Check for existing files that conflict with a proposed new DB OID;
3082
 * return true if there are any
3083
 *
3084
 * If there were a subdirectory in any tablespace matching the proposed new
3085
 * OID, we'd get a create failure due to the duplicate name ... and then we'd
3086
 * try to remove that already-existing subdirectory during the cleanup in
3087
 * remove_dbtablespaces.  Nuking existing files seems like a bad idea, so
3088
 * instead we make this extra check before settling on the OID of the new
3089
 * database.  This exactly parallels what GetNewRelFileNumber() does for table
3090
 * relfilenumber values.
3091
 */
3092
static bool
3093
check_db_file_conflict(Oid db_id)
3094
0
{
3095
0
  bool    result = false;
3096
0
  Relation  rel;
3097
0
  TableScanDesc scan;
3098
0
  HeapTuple tuple;
3099
3100
0
  rel = table_open(TableSpaceRelationId, AccessShareLock);
3101
0
  scan = table_beginscan_catalog(rel, 0, NULL);
3102
0
  while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
3103
0
  {
3104
0
    Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
3105
0
    Oid     dsttablespace = spcform->oid;
3106
0
    char     *dstpath;
3107
0
    struct stat st;
3108
3109
    /* Don't mess with the global tablespace */
3110
0
    if (dsttablespace == GLOBALTABLESPACE_OID)
3111
0
      continue;
3112
3113
0
    dstpath = GetDatabasePath(db_id, dsttablespace);
3114
3115
0
    if (lstat(dstpath, &st) == 0)
3116
0
    {
3117
      /* Found a conflicting file (or directory, whatever) */
3118
0
      pfree(dstpath);
3119
0
      result = true;
3120
0
      break;
3121
0
    }
3122
3123
0
    pfree(dstpath);
3124
0
  }
3125
3126
0
  table_endscan(scan);
3127
0
  table_close(rel, AccessShareLock);
3128
3129
0
  return result;
3130
0
}
3131
3132
/*
 * Emit an errdetail describing what is keeping a database busy: other
 * sessions, prepared transactions, or both.
 */
static int
errdetail_busy_db(int notherbackends, int npreparedxacts)
{
	if (notherbackends <= 0)
	{
		/* only prepared transactions are in the way */
		errdetail_plural("There is %d prepared transaction using the database.",
						 "There are %d prepared transactions using the database.",
						 npreparedxacts,
						 npreparedxacts);
	}
	else if (npreparedxacts <= 0)
	{
		/* only other sessions are in the way */
		errdetail_plural("There is %d other session using the database.",
						 "There are %d other sessions using the database.",
						 notherbackends,
						 notherbackends);
	}
	else
	{
		/*
		 * Both kinds are present.  Singular versus plural is not handled
		 * here, since gettext doesn't support multiple plurals in one
		 * string.
		 */
		errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
				  notherbackends, npreparedxacts);
	}

	return 0;					/* just to keep ereport macro happy */
}
3158
3159
/*
3160
 * get_database_oid - given a database name, look up the OID
3161
 *
3162
 * If missing_ok is false, throw an error if database name not found.  If
3163
 * true, just return InvalidOid.
3164
 */
3165
Oid
3166
get_database_oid(const char *dbname, bool missing_ok)
3167
0
{
3168
0
  Relation  pg_database;
3169
0
  ScanKeyData entry[1];
3170
0
  SysScanDesc scan;
3171
0
  HeapTuple dbtuple;
3172
0
  Oid     oid;
3173
3174
  /*
3175
   * There's no syscache for pg_database indexed by name, so we must look
3176
   * the hard way.
3177
   */
3178
0
  pg_database = table_open(DatabaseRelationId, AccessShareLock);
3179
0
  ScanKeyInit(&entry[0],
3180
0
        Anum_pg_database_datname,
3181
0
        BTEqualStrategyNumber, F_NAMEEQ,
3182
0
        CStringGetDatum(dbname));
3183
0
  scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
3184
0
                NULL, 1, entry);
3185
3186
0
  dbtuple = systable_getnext(scan);
3187
3188
  /* We assume that there can be at most one matching tuple */
3189
0
  if (HeapTupleIsValid(dbtuple))
3190
0
    oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
3191
0
  else
3192
0
    oid = InvalidOid;
3193
3194
0
  systable_endscan(scan);
3195
0
  table_close(pg_database, AccessShareLock);
3196
3197
0
  if (!OidIsValid(oid) && !missing_ok)
3198
0
    ereport(ERROR,
3199
0
        (errcode(ERRCODE_UNDEFINED_DATABASE),
3200
0
         errmsg("database \"%s\" does not exist",
3201
0
            dbname)));
3202
3203
0
  return oid;
3204
0
}
3205
3206
3207
/*
3208
 * get_database_name - given a database OID, look up the name
3209
 *
3210
 * Returns a palloc'd string, or NULL if no such database.
3211
 */
3212
char *
3213
get_database_name(Oid dbid)
3214
0
{
3215
0
  HeapTuple dbtuple;
3216
0
  char     *result;
3217
3218
0
  dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
3219
0
  if (HeapTupleIsValid(dbtuple))
3220
0
  {
3221
0
    result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
3222
0
    ReleaseSysCache(dbtuple);
3223
0
  }
3224
0
  else
3225
0
    result = NULL;
3226
3227
0
  return result;
3228
0
}
3229
3230
3231
/*
3232
 * While dropping a database the pg_database row is marked invalid, but the
3233
 * catalog contents still exist. Connections to such a database are not
3234
 * allowed.
3235
 */
3236
bool
3237
database_is_invalid_form(Form_pg_database datform)
3238
0
{
3239
0
  return datform->datconnlimit == DATCONNLIMIT_INVALID_DB;
3240
0
}
3241
3242
3243
/*
3244
 * Convenience wrapper around database_is_invalid_form()
3245
 */
3246
bool
3247
database_is_invalid_oid(Oid dboid)
3248
0
{
3249
0
  HeapTuple dbtup;
3250
0
  Form_pg_database dbform;
3251
0
  bool    invalid;
3252
3253
0
  dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid));
3254
0
  if (!HeapTupleIsValid(dbtup))
3255
0
    elog(ERROR, "cache lookup failed for database %u", dboid);
3256
0
  dbform = (Form_pg_database) GETSTRUCT(dbtup);
3257
3258
0
  invalid = database_is_invalid_form(dbform);
3259
3260
0
  ReleaseSysCache(dbtup);
3261
3262
0
  return invalid;
3263
0
}
3264
3265
3266
/*
3267
 * recovery_create_dbdir()
3268
 *
3269
 * During recovery, there's a case where we validly need to recover a missing
3270
 * tablespace directory so that recovery can continue.  This happens when
3271
 * recovery wants to create a database but the holding tablespace has been
3272
 * removed before the server stopped.  Since we expect that the directory will
3273
 * be gone before reaching recovery consistency, and we have no knowledge about
3274
 * the tablespace other than its OID here, we create a real directory under
3275
 * pg_tblspc here instead of restoring the symlink.
3276
 *
3277
 * If only_tblspc is true, then the requested directory must be in pg_tblspc/
3278
 */
3279
static void
3280
recovery_create_dbdir(char *path, bool only_tblspc)
3281
0
{
3282
0
  struct stat st;
3283
3284
0
  Assert(RecoveryInProgress());
3285
3286
0
  if (stat(path, &st) == 0)
3287
0
    return;
3288
3289
0
  if (only_tblspc && strstr(path, PG_TBLSPC_DIR_SLASH) == NULL)
3290
0
    elog(PANIC, "requested to created invalid directory: %s", path);
3291
3292
0
  if (reachedConsistency && !allow_in_place_tablespaces)
3293
0
    ereport(PANIC,
3294
0
        errmsg("missing directory \"%s\"", path));
3295
3296
0
  elog(reachedConsistency ? WARNING : DEBUG1,
3297
0
     "creating missing directory: %s", path);
3298
3299
0
  if (pg_mkdir_p(path, pg_dir_create_mode) != 0)
3300
0
    ereport(PANIC,
3301
0
        errmsg("could not create missing directory \"%s\": %m", path));
3302
0
}
3303
3304
3305
/*
3306
 * DATABASE resource manager's routines
3307
 */
3308
void
3309
dbase_redo(XLogReaderState *record)
3310
0
{
3311
0
  uint8   info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
3312
3313
  /* Backup blocks are not used in dbase records */
3314
0
  Assert(!XLogRecHasAnyBlockRefs(record));
3315
3316
0
  if (info == XLOG_DBASE_CREATE_FILE_COPY)
3317
0
  {
3318
0
    xl_dbase_create_file_copy_rec *xlrec =
3319
0
      (xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
3320
0
    char     *src_path;
3321
0
    char     *dst_path;
3322
0
    char     *parent_path;
3323
0
    struct stat st;
3324
3325
0
    src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
3326
0
    dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3327
3328
    /*
3329
     * Our theory for replaying a CREATE is to forcibly drop the target
3330
     * subdirectory if present, then re-copy the source data. This may be
3331
     * more work than needed, but it is simple to implement.
3332
     */
3333
0
    if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
3334
0
    {
3335
0
      if (!rmtree(dst_path, true))
3336
        /* If this failed, copydir() below is going to error. */
3337
0
        ereport(WARNING,
3338
0
            (errmsg("some useless files may be left behind in old database directory \"%s\"",
3339
0
                dst_path)));
3340
0
    }
3341
3342
    /*
3343
     * If the parent of the target path doesn't exist, create it now. This
3344
     * enables us to create the target underneath later.
3345
     */
3346
0
    parent_path = pstrdup(dst_path);
3347
0
    get_parent_directory(parent_path);
3348
0
    if (stat(parent_path, &st) < 0)
3349
0
    {
3350
0
      if (errno != ENOENT)
3351
0
        ereport(FATAL,
3352
0
            errmsg("could not stat directory \"%s\": %m",
3353
0
                 dst_path));
3354
3355
      /* create the parent directory if needed and valid */
3356
0
      recovery_create_dbdir(parent_path, true);
3357
0
    }
3358
0
    pfree(parent_path);
3359
3360
    /*
3361
     * There's a case where the copy source directory is missing for the
3362
     * same reason above.  Create the empty source directory so that
3363
     * copydir below doesn't fail.  The directory will be dropped soon by
3364
     * recovery.
3365
     */
3366
0
    if (stat(src_path, &st) < 0 && errno == ENOENT)
3367
0
      recovery_create_dbdir(src_path, false);
3368
3369
    /*
3370
     * Force dirty buffers out to disk, to ensure source database is
3371
     * up-to-date for the copy.
3372
     */
3373
0
    FlushDatabaseBuffers(xlrec->src_db_id);
3374
3375
    /* Close all smgr fds in all backends. */
3376
0
    WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3377
3378
    /*
3379
     * Copy this subdirectory to the new location
3380
     *
3381
     * We don't need to copy subdirectories
3382
     */
3383
0
    copydir(src_path, dst_path, false);
3384
3385
0
    pfree(src_path);
3386
0
    pfree(dst_path);
3387
0
  }
3388
0
  else if (info == XLOG_DBASE_CREATE_WAL_LOG)
3389
0
  {
3390
0
    xl_dbase_create_wal_log_rec *xlrec =
3391
0
      (xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
3392
0
    char     *dbpath;
3393
0
    char     *parent_path;
3394
3395
0
    dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3396
3397
    /* create the parent directory if needed and valid */
3398
0
    parent_path = pstrdup(dbpath);
3399
0
    get_parent_directory(parent_path);
3400
0
    recovery_create_dbdir(parent_path, true);
3401
3402
    /* Create the database directory with the version file. */
3403
0
    CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
3404
0
                true);
3405
0
    pfree(dbpath);
3406
0
  }
3407
0
  else if (info == XLOG_DBASE_DROP)
3408
0
  {
3409
0
    xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
3410
0
    char     *dst_path;
3411
0
    int     i;
3412
3413
0
    if (InHotStandby)
3414
0
    {
3415
      /*
3416
       * Lock database while we resolve conflicts to ensure that
3417
       * InitPostgres() cannot fully re-execute concurrently. This
3418
       * avoids backends re-connecting automatically to same database,
3419
       * which can happen in some cases.
3420
       *
3421
       * This will lock out walsenders trying to connect to db-specific
3422
       * slots for logical decoding too, so it's safe for us to drop
3423
       * slots.
3424
       */
3425
0
      LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3426
0
      ResolveRecoveryConflictWithDatabase(xlrec->db_id);
3427
0
    }
3428
3429
    /* Drop any database-specific replication slots */
3430
0
    ReplicationSlotsDropDBSlots(xlrec->db_id);
3431
3432
    /* Drop pages for this database that are in the shared buffer cache */
3433
0
    DropDatabaseBuffers(xlrec->db_id);
3434
3435
    /* Also, clean out any fsync requests that might be pending in md.c */
3436
0
    ForgetDatabaseSyncRequests(xlrec->db_id);
3437
3438
    /* Clean out the xlog relcache too */
3439
0
    XLogDropDatabase(xlrec->db_id);
3440
3441
    /* Close all smgr fds in all backends. */
3442
0
    WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3443
3444
0
    for (i = 0; i < xlrec->ntablespaces; i++)
3445
0
    {
3446
0
      dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_ids[i]);
3447
3448
      /* And remove the physical files */
3449
0
      if (!rmtree(dst_path, true))
3450
0
        ereport(WARNING,
3451
0
            (errmsg("some useless files may be left behind in old database directory \"%s\"",
3452
0
                dst_path)));
3453
0
      pfree(dst_path);
3454
0
    }
3455
3456
0
    if (InHotStandby)
3457
0
    {
3458
      /*
3459
       * Release locks prior to commit. XXX There is a race condition
3460
       * here that may allow backends to reconnect, but the window for
3461
       * this is small because the gap between here and commit is mostly
3462
       * fairly small and it is unlikely that people will be dropping
3463
       * databases that we are trying to connect to anyway.
3464
       */
3465
0
      UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3466
0
    }
3467
0
  }
3468
0
  else
3469
0
    elog(PANIC, "dbase_redo: unknown op code %u", info);
3470
0
}