Coverage Report

Created: 2025-07-03 06:49

/src/postgres/src/backend/postmaster/pgarch.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * pgarch.c
4
 *
5
 *  PostgreSQL WAL archiver
6
 *
7
 *  All functions relating to archiver are included here
8
 *
9
 *  - All functions executed by archiver process
10
 *
11
 *  - archiver is forked from postmaster, and the two
12
 *  processes then communicate using signals. All functions
13
 *  executed by postmaster are included in this file.
14
 *
15
 *  Initial author: Simon Riggs   simon@2ndquadrant.com
16
 *
17
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
18
 * Portions Copyright (c) 1994, Regents of the University of California
19
 *
20
 *
21
 * IDENTIFICATION
22
 *    src/backend/postmaster/pgarch.c
23
 *
24
 *-------------------------------------------------------------------------
25
 */
26
#include "postgres.h"
27
28
#include <time.h>
29
#include <sys/stat.h>
30
#include <unistd.h>
31
32
#include "access/xlog.h"
33
#include "access/xlog_internal.h"
34
#include "archive/archive_module.h"
35
#include "archive/shell_archive.h"
36
#include "lib/binaryheap.h"
37
#include "libpq/pqsignal.h"
38
#include "pgstat.h"
39
#include "postmaster/auxprocess.h"
40
#include "postmaster/interrupt.h"
41
#include "postmaster/pgarch.h"
42
#include "storage/condition_variable.h"
43
#include "storage/aio_subsys.h"
44
#include "storage/fd.h"
45
#include "storage/ipc.h"
46
#include "storage/latch.h"
47
#include "storage/pmsignal.h"
48
#include "storage/proc.h"
49
#include "storage/procsignal.h"
50
#include "storage/shmem.h"
51
#include "utils/guc.h"
52
#include "utils/memutils.h"
53
#include "utils/ps_status.h"
54
#include "utils/resowner.h"
55
#include "utils/timeout.h"
56
57
58
/* ----------
 * Timer definitions (both values are expressed in seconds).
 * ----------
 */

/* How often a poll of the archive status directory is forced. */
#define PGARCH_AUTOWAKE_INTERVAL 60

/* How often an attempt is made to restart a failed archiver. */
#define PGARCH_RESTART_INTERVAL 10

/*
 * Upper bound on the number of consecutive attempts made to archive one WAL
 * file before giving up for the current cycle.
 */
#define NUM_ARCHIVE_RETRIES 3

/*
 * Upper bound on the number of consecutive attempts made to remove one
 * orphan archive status file before giving up for the current cycle.
 */
#define NUM_ORPHAN_CLEANUP_RETRIES 3

/*
 * Upper bound on the number of .ready files gathered by a single scan of
 * the archive status directory.
 */
#define NUM_FILES_PER_DIRECTORY_SCAN 64
83
84
/* Shared memory area for archiver process */
85
typedef struct PgArchData
86
{
87
  int     pgprocno;   /* proc number of archiver process */
88
89
  /*
90
   * Forces a directory scan in pgarch_readyXlog().
91
   */
92
  pg_atomic_uint32 force_dir_scan;
93
} PgArchData;
94
95
char     *XLogArchiveLibrary = "";
96
char     *arch_module_check_errdetail_string;
97
98
99
/* ----------
100
 * Local data
101
 * ----------
102
 */
103
static time_t last_sigterm_time = 0;
104
static PgArchData *PgArch = NULL;
105
static const ArchiveModuleCallbacks *ArchiveCallbacks;
106
static ArchiveModuleState *archive_module_state;
107
static MemoryContext archive_context;
108
109
110
/*
111
 * Stuff for tracking multiple files to archive from each scan of
112
 * archive_status.  Minimizing the number of directory scans when there are
113
 * many files to archive can significantly improve archival rate.
114
 *
115
 * arch_heap is a max-heap that is used during the directory scan to track
116
 * the highest-priority files to archive.  After the directory scan
117
 * completes, the file names are stored in ascending order of priority in
118
 * arch_files.  pgarch_readyXlog() returns files from arch_files until it
119
 * is empty, at which point another directory scan must be performed.
120
 *
121
 * We only need this data in the archiver process, so make it a palloc'd
122
 * struct rather than a bunch of static arrays.
123
 */
124
struct arch_files_state
125
{
126
  binaryheap *arch_heap;
127
  int     arch_files_size;  /* number of live entries in arch_files[] */
128
  char     *arch_files[NUM_FILES_PER_DIRECTORY_SCAN];
129
  /* buffers underlying heap, and later arch_files[], entries: */
130
  char    arch_filenames[NUM_FILES_PER_DIRECTORY_SCAN][MAX_XFN_CHARS + 1];
131
};
132
133
static struct arch_files_state *arch_files = NULL;
134
135
/*
136
 * Flags set by interrupt handlers for later service in the main loop.
137
 */
138
static volatile sig_atomic_t ready_to_stop = false;
139
140
/* ----------
141
 * Local function forward declarations
142
 * ----------
143
 */
144
static void pgarch_waken_stop(SIGNAL_ARGS);
145
static void pgarch_MainLoop(void);
146
static void pgarch_ArchiverCopyLoop(void);
147
static bool pgarch_archiveXlog(char *xlog);
148
static bool pgarch_readyXlog(char *xlog);
149
static void pgarch_archiveDone(char *xlog);
150
static void pgarch_die(int code, Datum arg);
151
static void ProcessPgArchInterrupts(void);
152
static int  ready_file_comparator(Datum a, Datum b, void *arg);
153
static void LoadArchiveLibrary(void);
154
static void pgarch_call_module_shutdown_cb(int code, Datum arg);
155
156
/* Report shared memory space needed by PgArchShmemInit */
157
Size
158
PgArchShmemSize(void)
159
0
{
160
0
  Size    size = 0;
161
162
0
  size = add_size(size, sizeof(PgArchData));
163
164
0
  return size;
165
0
}
166
167
/* Allocate and initialize archiver-related shared memory */
168
void
169
PgArchShmemInit(void)
170
0
{
171
0
  bool    found;
172
173
0
  PgArch = (PgArchData *)
174
0
    ShmemInitStruct("Archiver Data", PgArchShmemSize(), &found);
175
176
0
  if (!found)
177
0
  {
178
    /* First time through, so initialize */
179
0
    MemSet(PgArch, 0, PgArchShmemSize());
180
0
    PgArch->pgprocno = INVALID_PROC_NUMBER;
181
0
    pg_atomic_init_u32(&PgArch->force_dir_scan, 0);
182
0
  }
183
0
}
184
185
/*
186
 * PgArchCanRestart
187
 *
188
 * Return true and archiver is allowed to restart if enough time has
189
 * passed since it was launched last to reach PGARCH_RESTART_INTERVAL.
190
 * Otherwise return false.
191
 *
192
 * This is a safety valve to protect against continuous respawn attempts if the
193
 * archiver is dying immediately at launch. Note that since we will retry to
194
 * launch the archiver from the postmaster main loop, we will get another
195
 * chance later.
196
 */
197
bool
198
PgArchCanRestart(void)
199
0
{
200
0
  static time_t last_pgarch_start_time = 0;
201
0
  time_t    curtime = time(NULL);
202
203
  /*
204
   * Return false and don't restart archiver if too soon since last archiver
205
   * start.
206
   */
207
0
  if ((unsigned int) (curtime - last_pgarch_start_time) <
208
0
    (unsigned int) PGARCH_RESTART_INTERVAL)
209
0
    return false;
210
211
0
  last_pgarch_start_time = curtime;
212
0
  return true;
213
0
}
214
215
216
/* Main entry point for archiver process */
217
void
218
PgArchiverMain(const void *startup_data, size_t startup_data_len)
219
0
{
220
0
  Assert(startup_data_len == 0);
221
222
0
  MyBackendType = B_ARCHIVER;
223
0
  AuxiliaryProcessMainCommon();
224
225
  /*
226
   * Ignore all signals usually bound to some action in the postmaster,
227
   * except for SIGHUP, SIGTERM, SIGUSR1, SIGUSR2, and SIGQUIT.
228
   */
229
0
  pqsignal(SIGHUP, SignalHandlerForConfigReload);
230
0
  pqsignal(SIGINT, SIG_IGN);
231
0
  pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
232
  /* SIGQUIT handler was already set up by InitPostmasterChild */
233
0
  pqsignal(SIGALRM, SIG_IGN);
234
0
  pqsignal(SIGPIPE, SIG_IGN);
235
0
  pqsignal(SIGUSR1, procsignal_sigusr1_handler);
236
0
  pqsignal(SIGUSR2, pgarch_waken_stop);
237
238
  /* Reset some signals that are accepted by postmaster but not here */
239
0
  pqsignal(SIGCHLD, SIG_DFL);
240
241
  /* Unblock signals (they were blocked when the postmaster forked us) */
242
0
  sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
243
244
  /* We shouldn't be launched unnecessarily. */
245
0
  Assert(XLogArchivingActive());
246
247
  /* Arrange to clean up at archiver exit */
248
0
  on_shmem_exit(pgarch_die, 0);
249
250
  /*
251
   * Advertise our proc number so that backends can use our latch to wake us
252
   * up while we're sleeping.
253
   */
254
0
  PgArch->pgprocno = MyProcNumber;
255
256
  /* Create workspace for pgarch_readyXlog() */
257
0
  arch_files = palloc(sizeof(struct arch_files_state));
258
0
  arch_files->arch_files_size = 0;
259
260
  /* Initialize our max-heap for prioritizing files to archive. */
261
0
  arch_files->arch_heap = binaryheap_allocate(NUM_FILES_PER_DIRECTORY_SCAN,
262
0
                        ready_file_comparator, NULL);
263
264
  /* Initialize our memory context. */
265
0
  archive_context = AllocSetContextCreate(TopMemoryContext,
266
0
                      "archiver",
267
0
                      ALLOCSET_DEFAULT_SIZES);
268
269
  /* Load the archive_library. */
270
0
  LoadArchiveLibrary();
271
272
0
  pgarch_MainLoop();
273
274
0
  proc_exit(0);
275
0
}
276
277
/*
278
 * Wake up the archiver
279
 */
280
void
281
PgArchWakeup(void)
282
0
{
283
0
  int     arch_pgprocno = PgArch->pgprocno;
284
285
  /*
286
   * We don't acquire ProcArrayLock here.  It's actually fine because
287
   * procLatch isn't ever freed, so we just can potentially set the wrong
288
   * process' (or no process') latch.  Even in that case the archiver will
289
   * be relaunched shortly and will start archiving.
290
   */
291
0
  if (arch_pgprocno != INVALID_PROC_NUMBER)
292
0
    SetLatch(&ProcGlobal->allProcs[arch_pgprocno].procLatch);
293
0
}
294
295
296
/* SIGUSR2 signal handler for archiver process */
297
static void
298
pgarch_waken_stop(SIGNAL_ARGS)
299
0
{
300
  /* set flag to do a final cycle and shut down afterwards */
301
0
  ready_to_stop = true;
302
0
  SetLatch(MyLatch);
303
0
}
304
305
/*
306
 * pgarch_MainLoop
307
 *
308
 * Main loop for archiver
309
 */
310
static void
311
pgarch_MainLoop(void)
312
0
{
313
0
  bool    time_to_stop;
314
315
  /*
316
   * There shouldn't be anything for the archiver to do except to wait for a
317
   * signal ... however, the archiver exists to protect our data, so it
318
   * wakes up occasionally to allow itself to be proactive.
319
   */
320
0
  do
321
0
  {
322
0
    ResetLatch(MyLatch);
323
324
    /* When we get SIGUSR2, we do one more archive cycle, then exit */
325
0
    time_to_stop = ready_to_stop;
326
327
    /* Check for barrier events and config update */
328
0
    ProcessPgArchInterrupts();
329
330
    /*
331
     * If we've gotten SIGTERM, we normally just sit and do nothing until
332
     * SIGUSR2 arrives.  However, that means a random SIGTERM would
333
     * disable archiving indefinitely, which doesn't seem like a good
334
     * idea.  If more than 60 seconds pass since SIGTERM, exit anyway, so
335
     * that the postmaster can start a new archiver if needed.
336
     */
337
0
    if (ShutdownRequestPending)
338
0
    {
339
0
      time_t    curtime = time(NULL);
340
341
0
      if (last_sigterm_time == 0)
342
0
        last_sigterm_time = curtime;
343
0
      else if ((unsigned int) (curtime - last_sigterm_time) >=
344
0
           (unsigned int) 60)
345
0
        break;
346
0
    }
347
348
    /* Do what we're here for */
349
0
    pgarch_ArchiverCopyLoop();
350
351
    /*
352
     * Sleep until a signal is received, or until a poll is forced by
353
     * PGARCH_AUTOWAKE_INTERVAL, or until postmaster dies.
354
     */
355
0
    if (!time_to_stop)   /* Don't wait during last iteration */
356
0
    {
357
0
      int     rc;
358
359
0
      rc = WaitLatch(MyLatch,
360
0
               WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
361
0
               PGARCH_AUTOWAKE_INTERVAL * 1000L,
362
0
               WAIT_EVENT_ARCHIVER_MAIN);
363
0
      if (rc & WL_POSTMASTER_DEATH)
364
0
        time_to_stop = true;
365
0
    }
366
367
    /*
368
     * The archiver quits either when the postmaster dies (not expected)
369
     * or after completing one more archiving cycle after receiving
370
     * SIGUSR2.
371
     */
372
0
  } while (!time_to_stop);
373
0
}
374
375
/*
376
 * pgarch_ArchiverCopyLoop
377
 *
378
 * Archives all outstanding xlogs then returns
379
 */
380
static void
381
pgarch_ArchiverCopyLoop(void)
382
0
{
383
0
  char    xlog[MAX_XFN_CHARS + 1];
384
385
  /* force directory scan in the first call to pgarch_readyXlog() */
386
0
  arch_files->arch_files_size = 0;
387
388
  /*
389
   * loop through all xlogs with archive_status of .ready and archive
390
   * them...mostly we expect this to be a single file, though it is possible
391
   * some backend will add files onto the list of those that need archiving
392
   * while we are still copying earlier archives
393
   */
394
0
  while (pgarch_readyXlog(xlog))
395
0
  {
396
0
    int     failures = 0;
397
0
    int     failures_orphan = 0;
398
399
0
    for (;;)
400
0
    {
401
0
      struct stat stat_buf;
402
0
      char    pathname[MAXPGPATH];
403
404
      /*
405
       * Do not initiate any more archive commands after receiving
406
       * SIGTERM, nor after the postmaster has died unexpectedly. The
407
       * first condition is to try to keep from having init SIGKILL the
408
       * command, and the second is to avoid conflicts with another
409
       * archiver spawned by a newer postmaster.
410
       */
411
0
      if (ShutdownRequestPending || !PostmasterIsAlive())
412
0
        return;
413
414
      /*
415
       * Check for barrier events and config update.  This is so that
416
       * we'll adopt a new setting for archive_command as soon as
417
       * possible, even if there is a backlog of files to be archived.
418
       */
419
0
      ProcessPgArchInterrupts();
420
421
      /* Reset variables that might be set by the callback */
422
0
      arch_module_check_errdetail_string = NULL;
423
424
      /* can't do anything if not configured ... */
425
0
      if (ArchiveCallbacks->check_configured_cb != NULL &&
426
0
        !ArchiveCallbacks->check_configured_cb(archive_module_state))
427
0
      {
428
0
        ereport(WARNING,
429
0
            (errmsg("\"archive_mode\" enabled, yet archiving is not configured"),
430
0
             arch_module_check_errdetail_string ?
431
0
             errdetail_internal("%s", arch_module_check_errdetail_string) : 0));
432
0
        return;
433
0
      }
434
435
      /*
436
       * Since archive status files are not removed in a durable manner,
437
       * a system crash could leave behind .ready files for WAL segments
438
       * that have already been recycled or removed.  In this case,
439
       * simply remove the orphan status file and move on.  unlink() is
440
       * used here as even on subsequent crashes the same orphan files
441
       * would get removed, so there is no need to worry about
442
       * durability.
443
       */
444
0
      snprintf(pathname, MAXPGPATH, XLOGDIR "/%s", xlog);
445
0
      if (stat(pathname, &stat_buf) != 0 && errno == ENOENT)
446
0
      {
447
0
        char    xlogready[MAXPGPATH];
448
449
0
        StatusFilePath(xlogready, xlog, ".ready");
450
0
        if (unlink(xlogready) == 0)
451
0
        {
452
0
          ereport(WARNING,
453
0
              (errmsg("removed orphan archive status file \"%s\"",
454
0
                  xlogready)));
455
456
          /* leave loop and move to the next status file */
457
0
          break;
458
0
        }
459
460
0
        if (++failures_orphan >= NUM_ORPHAN_CLEANUP_RETRIES)
461
0
        {
462
0
          ereport(WARNING,
463
0
              (errmsg("removal of orphan archive status file \"%s\" failed too many times, will try again later",
464
0
                  xlogready)));
465
466
          /* give up cleanup of orphan status files */
467
0
          return;
468
0
        }
469
470
        /* wait a bit before retrying */
471
0
        pg_usleep(1000000L);
472
0
        continue;
473
0
      }
474
475
0
      if (pgarch_archiveXlog(xlog))
476
0
      {
477
        /* successful */
478
0
        pgarch_archiveDone(xlog);
479
480
        /*
481
         * Tell the cumulative stats system about the WAL file that we
482
         * successfully archived
483
         */
484
0
        pgstat_report_archiver(xlog, false);
485
486
0
        break;      /* out of inner retry loop */
487
0
      }
488
0
      else
489
0
      {
490
        /*
491
         * Tell the cumulative stats system about the WAL file that we
492
         * failed to archive
493
         */
494
0
        pgstat_report_archiver(xlog, true);
495
496
0
        if (++failures >= NUM_ARCHIVE_RETRIES)
497
0
        {
498
0
          ereport(WARNING,
499
0
              (errmsg("archiving write-ahead log file \"%s\" failed too many times, will try again later",
500
0
                  xlog)));
501
0
          return;   /* give up archiving for now */
502
0
        }
503
0
        pg_usleep(1000000L);  /* wait a bit before retrying */
504
0
      }
505
0
    }
506
0
  }
507
0
}
508
509
/*
510
 * pgarch_archiveXlog
511
 *
512
 * Invokes archive_file_cb to copy one archive file to wherever it should go
513
 *
514
 * Returns true if successful
515
 */
516
static bool
517
pgarch_archiveXlog(char *xlog)
518
0
{
519
0
  sigjmp_buf  local_sigjmp_buf;
520
0
  MemoryContext oldcontext;
521
0
  char    pathname[MAXPGPATH];
522
0
  char    activitymsg[MAXFNAMELEN + 16];
523
0
  bool    ret;
524
525
0
  snprintf(pathname, MAXPGPATH, XLOGDIR "/%s", xlog);
526
527
  /* Report archive activity in PS display */
528
0
  snprintf(activitymsg, sizeof(activitymsg), "archiving %s", xlog);
529
0
  set_ps_display(activitymsg);
530
531
0
  oldcontext = MemoryContextSwitchTo(archive_context);
532
533
  /*
534
   * Since the archiver operates at the bottom of the exception stack,
535
   * ERRORs turn into FATALs and cause the archiver process to restart.
536
   * However, using ereport(ERROR, ...) when there are problems is easy to
537
   * code and maintain.  Therefore, we create our own exception handler to
538
   * catch ERRORs and return false instead of restarting the archiver
539
   * whenever there is a failure.
540
   *
541
   * We assume ERRORs from the archiving callback are the most common
542
   * exceptions experienced by the archiver, so we opt to handle exceptions
543
   * here instead of PgArchiverMain() to avoid reinitializing the archiver
544
   * too frequently.  We could instead add a sigsetjmp() block to
545
   * PgArchiverMain() and use PG_TRY/PG_CATCH here, but the extra code to
546
   * avoid the odd archiver restart doesn't seem worth it.
547
   */
548
0
  if (sigsetjmp(local_sigjmp_buf, 1) != 0)
549
0
  {
550
    /* Since not using PG_TRY, must reset error stack by hand */
551
0
    error_context_stack = NULL;
552
553
    /* Prevent interrupts while cleaning up */
554
0
    HOLD_INTERRUPTS();
555
556
    /* Report the error to the server log. */
557
0
    EmitErrorReport();
558
559
    /*
560
     * Try to clean up anything the archive module left behind.  We try to
561
     * cover anything that an archive module could conceivably have left
562
     * behind, but it is of course possible that modules could be doing
563
     * unexpected things that require additional cleanup.  Module authors
564
     * should be sure to do any extra required cleanup in a PG_CATCH block
565
     * within the archiving callback, and they are encouraged to notify
566
     * the pgsql-hackers mailing list so that we can add it here.
567
     */
568
0
    disable_all_timeouts(false);
569
0
    LWLockReleaseAll();
570
0
    ConditionVariableCancelSleep();
571
0
    pgstat_report_wait_end();
572
0
    pgaio_error_cleanup();
573
0
    ReleaseAuxProcessResources(false);
574
0
    AtEOXact_Files(false);
575
0
    AtEOXact_HashTables(false);
576
577
    /*
578
     * Return to the original memory context and clear ErrorContext for
579
     * next time.
580
     */
581
0
    MemoryContextSwitchTo(oldcontext);
582
0
    FlushErrorState();
583
584
    /* Flush any leaked data */
585
0
    MemoryContextReset(archive_context);
586
587
    /* Remove our exception handler */
588
0
    PG_exception_stack = NULL;
589
590
    /* Now we can allow interrupts again */
591
0
    RESUME_INTERRUPTS();
592
593
    /* Report failure so that the archiver retries this file */
594
0
    ret = false;
595
0
  }
596
0
  else
597
0
  {
598
    /* Enable our exception handler */
599
0
    PG_exception_stack = &local_sigjmp_buf;
600
601
    /* Archive the file! */
602
0
    ret = ArchiveCallbacks->archive_file_cb(archive_module_state,
603
0
                        xlog, pathname);
604
605
    /* Remove our exception handler */
606
0
    PG_exception_stack = NULL;
607
608
    /* Reset our memory context and switch back to the original one */
609
0
    MemoryContextSwitchTo(oldcontext);
610
0
    MemoryContextReset(archive_context);
611
0
  }
612
613
0
  if (ret)
614
0
    snprintf(activitymsg, sizeof(activitymsg), "last was %s", xlog);
615
0
  else
616
0
    snprintf(activitymsg, sizeof(activitymsg), "failed on %s", xlog);
617
0
  set_ps_display(activitymsg);
618
619
0
  return ret;
620
0
}
621
622
/*
623
 * pgarch_readyXlog
624
 *
625
 * Return name of the oldest xlog file that has not yet been archived.
626
 * No notification is set that file archiving is now in progress, so
627
 * this would need to be extended if multiple concurrent archival
628
 * tasks were created. If a failure occurs, we will completely
629
 * re-copy the file at the next available opportunity.
630
 *
631
 * It is important that we return the oldest, so that we archive xlogs
632
 * in order that they were written, for two reasons:
633
 * 1) to maintain the sequential chain of xlogs required for recovery
634
 * 2) because the oldest ones will sooner become candidates for
635
 * recycling at time of checkpoint
636
 *
637
 * NOTE: the "oldest" comparison will consider any .history file to be older
638
 * than any other file except another .history file.  Segments on a timeline
639
 * with a smaller ID will be older than all segments on a timeline with a
640
 * larger ID; the net result being that past timelines are given higher
641
 * priority for archiving.  This seems okay, or at least not obviously worth
642
 * changing.
643
 */
644
static bool
645
pgarch_readyXlog(char *xlog)
646
0
{
647
0
  char    XLogArchiveStatusDir[MAXPGPATH];
648
0
  DIR      *rldir;
649
0
  struct dirent *rlde;
650
651
  /*
652
   * If a directory scan was requested, clear the stored file names and
653
   * proceed.
654
   */
655
0
  if (pg_atomic_exchange_u32(&PgArch->force_dir_scan, 0) == 1)
656
0
    arch_files->arch_files_size = 0;
657
658
  /*
659
   * If we still have stored file names from the previous directory scan,
660
   * try to return one of those.  We check to make sure the status file is
661
   * still present, as the archive_command for a previous file may have
662
   * already marked it done.
663
   */
664
0
  while (arch_files->arch_files_size > 0)
665
0
  {
666
0
    struct stat st;
667
0
    char    status_file[MAXPGPATH];
668
0
    char     *arch_file;
669
670
0
    arch_files->arch_files_size--;
671
0
    arch_file = arch_files->arch_files[arch_files->arch_files_size];
672
0
    StatusFilePath(status_file, arch_file, ".ready");
673
674
0
    if (stat(status_file, &st) == 0)
675
0
    {
676
0
      strcpy(xlog, arch_file);
677
0
      return true;
678
0
    }
679
0
    else if (errno != ENOENT)
680
0
      ereport(ERROR,
681
0
          (errcode_for_file_access(),
682
0
           errmsg("could not stat file \"%s\": %m", status_file)));
683
0
  }
684
685
  /* arch_heap is probably empty, but let's make sure */
686
0
  binaryheap_reset(arch_files->arch_heap);
687
688
  /*
689
   * Open the archive status directory and read through the list of files
690
   * with the .ready suffix, looking for the earliest files.
691
   */
692
0
  snprintf(XLogArchiveStatusDir, MAXPGPATH, XLOGDIR "/archive_status");
693
0
  rldir = AllocateDir(XLogArchiveStatusDir);
694
695
0
  while ((rlde = ReadDir(rldir, XLogArchiveStatusDir)) != NULL)
696
0
  {
697
0
    int     basenamelen = (int) strlen(rlde->d_name) - 6;
698
0
    char    basename[MAX_XFN_CHARS + 1];
699
0
    char     *arch_file;
700
701
    /* Ignore entries with unexpected number of characters */
702
0
    if (basenamelen < MIN_XFN_CHARS ||
703
0
      basenamelen > MAX_XFN_CHARS)
704
0
      continue;
705
706
    /* Ignore entries with unexpected characters */
707
0
    if (strspn(rlde->d_name, VALID_XFN_CHARS) < basenamelen)
708
0
      continue;
709
710
    /* Ignore anything not suffixed with .ready */
711
0
    if (strcmp(rlde->d_name + basenamelen, ".ready") != 0)
712
0
      continue;
713
714
    /* Truncate off the .ready */
715
0
    memcpy(basename, rlde->d_name, basenamelen);
716
0
    basename[basenamelen] = '\0';
717
718
    /*
719
     * Store the file in our max-heap if it has a high enough priority.
720
     */
721
0
    if (binaryheap_size(arch_files->arch_heap) < NUM_FILES_PER_DIRECTORY_SCAN)
722
0
    {
723
      /* If the heap isn't full yet, quickly add it. */
724
0
      arch_file = arch_files->arch_filenames[binaryheap_size(arch_files->arch_heap)];
725
0
      strcpy(arch_file, basename);
726
0
      binaryheap_add_unordered(arch_files->arch_heap, CStringGetDatum(arch_file));
727
728
      /* If we just filled the heap, make it a valid one. */
729
0
      if (binaryheap_size(arch_files->arch_heap) == NUM_FILES_PER_DIRECTORY_SCAN)
730
0
        binaryheap_build(arch_files->arch_heap);
731
0
    }
732
0
    else if (ready_file_comparator(binaryheap_first(arch_files->arch_heap),
733
0
                     CStringGetDatum(basename), NULL) > 0)
734
0
    {
735
      /*
736
       * Remove the lowest priority file and add the current one to the
737
       * heap.
738
       */
739
0
      arch_file = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap));
740
0
      strcpy(arch_file, basename);
741
0
      binaryheap_add(arch_files->arch_heap, CStringGetDatum(arch_file));
742
0
    }
743
0
  }
744
0
  FreeDir(rldir);
745
746
  /* If no files were found, simply return. */
747
0
  if (binaryheap_empty(arch_files->arch_heap))
748
0
    return false;
749
750
  /*
751
   * If we didn't fill the heap, we didn't make it a valid one.  Do that
752
   * now.
753
   */
754
0
  if (binaryheap_size(arch_files->arch_heap) < NUM_FILES_PER_DIRECTORY_SCAN)
755
0
    binaryheap_build(arch_files->arch_heap);
756
757
  /*
758
   * Fill arch_files array with the files to archive in ascending order of
759
   * priority.
760
   */
761
0
  arch_files->arch_files_size = binaryheap_size(arch_files->arch_heap);
762
0
  for (int i = 0; i < arch_files->arch_files_size; i++)
763
0
    arch_files->arch_files[i] = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap));
764
765
  /* Return the highest priority file. */
766
0
  arch_files->arch_files_size--;
767
0
  strcpy(xlog, arch_files->arch_files[arch_files->arch_files_size]);
768
769
0
  return true;
770
0
}
771
772
/*
773
 * ready_file_comparator
774
 *
775
 * Compares the archival priority of the given files to archive.  If "a"
776
 * has a higher priority than "b", a negative value will be returned.  If
777
 * "b" has a higher priority than "a", a positive value will be returned.
778
 * If "a" and "b" have equivalent values, 0 will be returned.
779
 */
780
static int
781
ready_file_comparator(Datum a, Datum b, void *arg)
782
0
{
783
0
  char     *a_str = DatumGetCString(a);
784
0
  char     *b_str = DatumGetCString(b);
785
0
  bool    a_history = IsTLHistoryFileName(a_str);
786
0
  bool    b_history = IsTLHistoryFileName(b_str);
787
788
  /* Timeline history files always have the highest priority. */
789
0
  if (a_history != b_history)
790
0
    return a_history ? -1 : 1;
791
792
  /* Priority is given to older files. */
793
0
  return strcmp(a_str, b_str);
794
0
}
795
796
/*
797
 * PgArchForceDirScan
798
 *
799
 * When called, the next call to pgarch_readyXlog() will perform a
800
 * directory scan.  This is useful for ensuring that important files such
801
 * as timeline history files are archived as quickly as possible.
802
 */
803
void
804
PgArchForceDirScan(void)
805
0
{
806
0
  pg_atomic_write_membarrier_u32(&PgArch->force_dir_scan, 1);
807
0
}
808
809
/*
810
 * pgarch_archiveDone
811
 *
812
 * Emit notification that an xlog file has been successfully archived.
813
 * We do this by renaming the status file from NNN.ready to NNN.done.
814
 * Eventually, a checkpoint process will notice this and delete both the
815
 * NNN.done file and the xlog file itself.
816
 */
817
static void
818
pgarch_archiveDone(char *xlog)
819
0
{
820
0
  char    rlogready[MAXPGPATH];
821
0
  char    rlogdone[MAXPGPATH];
822
823
0
  StatusFilePath(rlogready, xlog, ".ready");
824
0
  StatusFilePath(rlogdone, xlog, ".done");
825
826
  /*
827
   * To avoid extra overhead, we don't durably rename the .ready file to
828
   * .done.  Archive commands and libraries must gracefully handle attempts
829
   * to re-archive files (e.g., if the server crashes just before this
830
   * function is called), so it should be okay if the .ready file reappears
831
   * after a crash.
832
   */
833
0
  if (rename(rlogready, rlogdone) < 0)
834
0
    ereport(WARNING,
835
0
        (errcode_for_file_access(),
836
0
         errmsg("could not rename file \"%s\" to \"%s\": %m",
837
0
            rlogready, rlogdone)));
838
0
}
839
840
841
/*
842
 * pgarch_die
843
 *
844
 * Exit-time cleanup handler
845
 */
846
static void
847
pgarch_die(int code, Datum arg)
848
0
{
849
0
  PgArch->pgprocno = INVALID_PROC_NUMBER;
850
0
}
851
852
/*
853
 * Interrupt handler for WAL archiver process.
854
 *
855
 * This is called in the loops pgarch_MainLoop and pgarch_ArchiverCopyLoop.
856
 * It checks for barrier events, config update and request for logging of
857
 * memory contexts, but not shutdown request because how to handle
858
 * shutdown request is different between those loops.
859
 */
860
static void
861
ProcessPgArchInterrupts(void)
862
0
{
863
0
  if (ProcSignalBarrierPending)
864
0
    ProcessProcSignalBarrier();
865
866
  /* Perform logging of memory contexts of this process */
867
0
  if (LogMemoryContextPending)
868
0
    ProcessLogMemoryContextInterrupt();
869
870
0
  if (ConfigReloadPending)
871
0
  {
872
0
    char     *archiveLib = pstrdup(XLogArchiveLibrary);
873
0
    bool    archiveLibChanged;
874
875
0
    ConfigReloadPending = false;
876
0
    ProcessConfigFile(PGC_SIGHUP);
877
878
0
    if (XLogArchiveLibrary[0] != '\0' && XLogArchiveCommand[0] != '\0')
879
0
      ereport(ERROR,
880
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
881
0
           errmsg("both \"archive_command\" and \"archive_library\" set"),
882
0
           errdetail("Only one of \"archive_command\", \"archive_library\" may be set.")));
883
884
0
    archiveLibChanged = strcmp(XLogArchiveLibrary, archiveLib) != 0;
885
0
    pfree(archiveLib);
886
887
0
    if (archiveLibChanged)
888
0
    {
889
      /*
890
       * Ideally, we would simply unload the previous archive module and
891
       * load the new one, but there is presently no mechanism for
892
       * unloading a library (see the comment above
893
       * internal_load_library()).  To deal with this, we simply restart
894
       * the archiver.  The new archive module will be loaded when the
895
       * new archiver process starts up.  Note that this triggers the
896
       * module's shutdown callback, if defined.
897
       */
898
0
      ereport(LOG,
899
0
          (errmsg("restarting archiver process because value of "
900
0
              "\"archive_library\" was changed")));
901
902
0
      proc_exit(0);
903
0
    }
904
0
  }
905
0
}
906
907
/*
908
 * LoadArchiveLibrary
909
 *
910
 * Loads the archiving callbacks into our local ArchiveCallbacks.
911
 */
912
static void
913
LoadArchiveLibrary(void)
914
0
{
915
0
  ArchiveModuleInit archive_init;
916
917
0
  if (XLogArchiveLibrary[0] != '\0' && XLogArchiveCommand[0] != '\0')
918
0
    ereport(ERROR,
919
0
        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
920
0
         errmsg("both \"archive_command\" and \"archive_library\" set"),
921
0
         errdetail("Only one of \"archive_command\", \"archive_library\" may be set.")));
922
923
  /*
924
   * If shell archiving is enabled, use our special initialization function.
925
   * Otherwise, load the library and call its _PG_archive_module_init().
926
   */
927
0
  if (XLogArchiveLibrary[0] == '\0')
928
0
    archive_init = shell_archive_init;
929
0
  else
930
0
    archive_init = (ArchiveModuleInit)
931
0
      load_external_function(XLogArchiveLibrary,
932
0
                   "_PG_archive_module_init", false, NULL);
933
934
0
  if (archive_init == NULL)
935
0
    ereport(ERROR,
936
0
        (errmsg("archive modules have to define the symbol %s", "_PG_archive_module_init")));
937
938
0
  ArchiveCallbacks = (*archive_init) ();
939
940
0
  if (ArchiveCallbacks->archive_file_cb == NULL)
941
0
    ereport(ERROR,
942
0
        (errmsg("archive modules must register an archive callback")));
943
944
0
  archive_module_state = (ArchiveModuleState *) palloc0(sizeof(ArchiveModuleState));
945
0
  if (ArchiveCallbacks->startup_cb != NULL)
946
0
    ArchiveCallbacks->startup_cb(archive_module_state);
947
948
0
  before_shmem_exit(pgarch_call_module_shutdown_cb, 0);
949
0
}
950
951
/*
952
 * Call the shutdown callback of the loaded archive module, if defined.
953
 */
954
static void
955
pgarch_call_module_shutdown_cb(int code, Datum arg)
956
0
{
957
0
  if (ArchiveCallbacks->shutdown_cb != NULL)
958
0
    ArchiveCallbacks->shutdown_cb(archive_module_state);
959
0
}