Coverage Report

Created: 2025-06-15 06:31

/src/postgres/src/backend/libpq/pqcomm.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * pqcomm.c
4
 *    Communication functions between the Frontend and the Backend
5
 *
6
 * These routines handle the low-level details of communication between
7
 * frontend and backend.  They just shove data across the communication
8
 * channel, and are ignorant of the semantics of the data.
9
 *
10
 * To emit an outgoing message, use the routines in pqformat.c to construct
11
 * the message in a buffer and then emit it in one call to pq_putmessage.
12
 * There are no functions to send raw bytes or partial messages; this
13
 * ensures that the channel will not be clogged by an incomplete message if
14
 * execution is aborted by ereport(ERROR) partway through the message.
15
 *
16
 * At one time, libpq was shared between frontend and backend, but now
17
 * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
18
 * All that remains is similarities of names to trap the unwary...
19
 *
20
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
21
 * Portions Copyright (c) 1994, Regents of the University of California
22
 *
23
 *  src/backend/libpq/pqcomm.c
24
 *
25
 *-------------------------------------------------------------------------
26
 */
27
28
/*------------------------
29
 * INTERFACE ROUTINES
30
 *
31
 * setup/teardown:
32
 *    ListenServerPort  - Open postmaster's server port
33
 *    AcceptConnection  - Accept new connection with client
34
 *    TouchSocketFiles  - Protect socket files against /tmp cleaners
35
 *    pq_init       - initialize libpq at backend startup
36
 *    socket_comm_reset - reset libpq during error recovery
37
 *    socket_close    - shutdown libpq at backend exit
38
 *
39
 * low-level I/O:
40
 *    pq_getbytes   - get a known number of bytes from connection
41
 *    pq_getmessage - get a message with length word from connection
42
 *    pq_getbyte    - get next byte from connection
43
 *    pq_peekbyte   - peek at next byte from connection
44
 *    pq_flush    - flush pending output
45
 *    pq_flush_if_writable - flush pending output if writable without blocking
46
 *    pq_getbyte_if_available - get a byte if available without blocking
47
 *
48
 * message-level I/O
49
 *    pq_putmessage - send a normal message (suppressed in COPY OUT mode)
50
 *    pq_putmessage_noblock - buffer a normal message (suppressed in COPY OUT)
51
 *
52
 *------------------------
53
 */
54
#include "postgres.h"
55
56
#ifdef HAVE_POLL_H
57
#include <poll.h>
58
#endif
59
#include <signal.h>
60
#include <fcntl.h>
61
#include <grp.h>
62
#include <unistd.h>
63
#include <sys/file.h>
64
#include <sys/socket.h>
65
#include <sys/stat.h>
66
#include <sys/time.h>
67
#include <netdb.h>
68
#include <netinet/in.h>
69
#include <netinet/tcp.h>
70
#include <utime.h>
71
#ifdef WIN32
72
#include <mstcpip.h>
73
#endif
74
75
#include "common/ip.h"
76
#include "libpq/libpq.h"
77
#include "miscadmin.h"
78
#include "port/pg_bswap.h"
79
#include "postmaster/postmaster.h"
80
#include "storage/ipc.h"
81
#include "utils/guc_hooks.h"
82
#include "utils/memutils.h"
83
84
/*
85
 * Cope with the various platform-specific ways to spell TCP keepalive socket
86
 * options.  This doesn't cover Windows, which as usual does its own thing.
87
 */
88
/*
 * Pick the platform's spelling of the "keepalive idle time" TCP option.
 * PG_TCP_KEEPALIVE_IDLE is the option constant itself and
 * PG_TCP_KEEPALIVE_IDLE_STR is the same name as a string literal.
 * If none of the branches below matches, neither macro is defined.
 */
#if defined(TCP_KEEPIDLE)
89
/* TCP_KEEPIDLE is the name of this option on Linux and *BSD */
90
0
#define PG_TCP_KEEPALIVE_IDLE TCP_KEEPIDLE
91
#define PG_TCP_KEEPALIVE_IDLE_STR "TCP_KEEPIDLE"
92
#elif defined(TCP_KEEPALIVE_THRESHOLD)
93
/* TCP_KEEPALIVE_THRESHOLD is the name of this option on Solaris >= 11 */
94
#define PG_TCP_KEEPALIVE_IDLE TCP_KEEPALIVE_THRESHOLD
95
#define PG_TCP_KEEPALIVE_IDLE_STR "TCP_KEEPALIVE_THRESHOLD"
96
#elif defined(TCP_KEEPALIVE) && defined(__darwin__)
97
/* TCP_KEEPALIVE is the name of this option on macOS */
98
/* Caution: Solaris has this symbol but it means something different */
99
#define PG_TCP_KEEPALIVE_IDLE TCP_KEEPALIVE
100
#define PG_TCP_KEEPALIVE_IDLE_STR "TCP_KEEPALIVE"
101
#endif
102
103
/*
104
 * Configuration options
105
 */
106
/* File mode applied with chmod() to each Unix socket file by Setup_AF_UNIX() */
int     Unix_socket_permissions;
107
/*
 * Group to own the Unix socket files: either a group name or a decimal
 * group id.  An empty string means Setup_AF_UNIX() leaves the group alone.
 */
char     *Unix_socket_group;
108
109
/*
 * Where the Unix socket files are (list of palloc'd strings).  Entries are
 * appended by Lock_AF_UNIX() and consumed by TouchSocketFiles() and
 * RemoveSocketFiles().
 */
110
static List *sock_paths = NIL;
111
112
/*
113
 * Buffers for low-level I/O.
114
 *
115
 * The receive buffer is fixed size. Send buffer is usually 8k, but can be
116
 * enlarged by pq_putmessage_noblock() if the message doesn't fit otherwise.
117
 */
118
119
0
/* Initial size, in bytes, of the send buffer allocated by pq_init() */
#define PQ_SEND_BUFFER_SIZE 8192
120
0
/* Fixed size, in bytes, of the statically allocated receive buffer */
#define PQ_RECV_BUFFER_SIZE 8192
121
122
/* Send buffer; allocated in TopMemoryContext by pq_init() */
static char *PqSendBuffer;
123
static int  PqSendBufferSize; /* Current allocated size of PqSendBuffer, in bytes */
124
static size_t PqSendPointer;  /* Next index to store a byte in PqSendBuffer */
125
static size_t PqSendStart;    /* Next index to send a byte in PqSendBuffer */
126
127
static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
128
static int  PqRecvPointer;    /* Next index to read a byte from PqRecvBuffer */
129
static int  PqRecvLength;   /* Index just past the last valid byte in PqRecvBuffer */
130
131
/*
132
 * Message status
133
 */
134
/* Both flags are set to false by pq_init(). */
static bool PqCommBusy;     /* busy sending data to the client; cleared by socket_comm_reset() */
135
static bool PqCommReadingMsg; /* in the middle of reading a message */
136
137
138
/* Internal functions */
139
/*
 * socket_comm_reset, socket_flush, socket_flush_if_writable,
 * socket_is_send_pending, socket_putmessage and socket_putmessage_noblock
 * are the callbacks installed in PqCommSocketMethods.  socket_close is
 * registered as a process-exit callback by pq_init().
 */
static void socket_comm_reset(void);
140
static void socket_close(int code, Datum arg);
141
static void socket_set_nonblocking(bool nonblocking);
142
static int  socket_flush(void);
143
static int  socket_flush_if_writable(void);
144
static bool socket_is_send_pending(void);
145
static int  socket_putmessage(char msgtype, const char *s, size_t len);
146
static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
147
static inline int internal_putbytes(const void *b, size_t len);
148
static inline int internal_flush(void);
149
static pg_noinline int internal_flush_buffer(const char *buf, size_t *start,
150
                       size_t *end);
151
152
/* Helpers used by ListenServerPort() for Unix-domain sockets */
static int  Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath);
153
static int  Setup_AF_UNIX(const char *sock_path);
154
155
/* Method table that routes the PQcommMethods callbacks to the socket_* functions */
static const PQcommMethods PqCommSocketMethods = {
156
  .comm_reset = socket_comm_reset,
157
  .flush = socket_flush,
158
  .flush_if_writable = socket_flush_if_writable,
159
  .is_send_pending = socket_is_send_pending,
160
  .putmessage = socket_putmessage,
161
  .putmessage_noblock = socket_putmessage_noblock
162
};
163
164
/* Active method table; initially the socket-based implementation above */
const PQcommMethods *PqCommMethods = &PqCommSocketMethods;
165
166
/*
 * Wait event set created by pq_init(); it holds the client socket, the
 * process latch and a postmaster-death event, added in that order.
 */
WaitEventSet *FeBeWaitSet;
167
168
169
/* --------------------------------
170
 *    pq_init - initialize libpq at backend startup
171
 * --------------------------------
172
 */
173
Port *
174
pq_init(ClientSocket *client_sock)
175
0
{
176
0
  Port     *port;
177
0
  int     socket_pos PG_USED_FOR_ASSERTS_ONLY;
178
0
  int     latch_pos PG_USED_FOR_ASSERTS_ONLY;
179
180
  /* allocate the Port struct and copy the ClientSocket contents to it */
181
0
  port = palloc0(sizeof(Port));
182
0
  port->sock = client_sock->sock;
183
0
  memcpy(&port->raddr.addr, &client_sock->raddr.addr, client_sock->raddr.salen);
184
0
  port->raddr.salen = client_sock->raddr.salen;
185
186
  /* fill in the server (local) address */
187
0
  port->laddr.salen = sizeof(port->laddr.addr);
188
0
  if (getsockname(port->sock,
189
0
          (struct sockaddr *) &port->laddr.addr,
190
0
          &port->laddr.salen) < 0)
191
0
  {
192
0
    ereport(FATAL,
193
0
        (errmsg("%s() failed: %m", "getsockname")));
194
0
  }
195
196
  /* select NODELAY and KEEPALIVE options if it's a TCP connection */
197
0
  if (port->laddr.addr.ss_family != AF_UNIX)
198
0
  {
199
0
    int     on;
200
#ifdef WIN32
201
    int     oldopt;
202
    int     optlen;
203
    int     newopt;
204
#endif
205
206
0
#ifdef  TCP_NODELAY
207
0
    on = 1;
208
0
    if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
209
0
             (char *) &on, sizeof(on)) < 0)
210
0
    {
211
0
      ereport(FATAL,
212
0
          (errmsg("%s(%s) failed: %m", "setsockopt", "TCP_NODELAY")));
213
0
    }
214
0
#endif
215
0
    on = 1;
216
0
    if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
217
0
             (char *) &on, sizeof(on)) < 0)
218
0
    {
219
0
      ereport(FATAL,
220
0
          (errmsg("%s(%s) failed: %m", "setsockopt", "SO_KEEPALIVE")));
221
0
    }
222
223
#ifdef WIN32
224
225
    /*
226
     * This is a Win32 socket optimization.  The OS send buffer should be
227
     * large enough to send the whole Postgres send buffer in one go, or
228
     * performance suffers.  The Postgres send buffer can be enlarged if a
229
     * very large message needs to be sent, but we won't attempt to
230
     * enlarge the OS buffer if that happens, so somewhat arbitrarily
231
     * ensure that the OS buffer is at least PQ_SEND_BUFFER_SIZE * 4.
232
     * (That's 32kB with the current default).
233
     *
234
     * The default OS buffer size used to be 8kB in earlier Windows
235
     * versions, but was raised to 64kB in Windows 2012.  So it shouldn't
236
     * be necessary to change it in later versions anymore.  Changing it
237
     * unnecessarily can even reduce performance, because setting
238
     * SO_SNDBUF in the application disables the "dynamic send buffering"
239
     * feature that was introduced in Windows 7.  So before fiddling with
240
     * SO_SNDBUF, check if the current buffer size is already large enough
241
     * and only increase it if necessary.
242
     *
243
     * See https://support.microsoft.com/kb/823764/EN-US/ and
244
     * https://msdn.microsoft.com/en-us/library/bb736549%28v=vs.85%29.aspx
245
     */
246
    optlen = sizeof(oldopt);
247
    if (getsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &oldopt,
248
             &optlen) < 0)
249
    {
250
      ereport(FATAL,
251
          (errmsg("%s(%s) failed: %m", "getsockopt", "SO_SNDBUF")));
252
    }
253
    newopt = PQ_SEND_BUFFER_SIZE * 4;
254
    if (oldopt < newopt)
255
    {
256
      if (setsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &newopt,
257
               sizeof(newopt)) < 0)
258
      {
259
        ereport(FATAL,
260
            (errmsg("%s(%s) failed: %m", "setsockopt", "SO_SNDBUF")));
261
      }
262
    }
263
#endif
264
265
    /*
266
     * Also apply the current keepalive parameters.  If we fail to set a
267
     * parameter, don't error out, because these aren't universally
268
     * supported.  (Note: you might think we need to reset the GUC
269
     * variables to 0 in such a case, but it's not necessary because the
270
     * show hooks for these variables report the truth anyway.)
271
     */
272
0
    (void) pq_setkeepalivesidle(tcp_keepalives_idle, port);
273
0
    (void) pq_setkeepalivesinterval(tcp_keepalives_interval, port);
274
0
    (void) pq_setkeepalivescount(tcp_keepalives_count, port);
275
0
    (void) pq_settcpusertimeout(tcp_user_timeout, port);
276
0
  }
277
278
  /* initialize state variables */
279
0
  PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
280
0
  PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
281
0
  PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
282
0
  PqCommBusy = false;
283
0
  PqCommReadingMsg = false;
284
285
  /* set up process-exit hook to close the socket */
286
0
  on_proc_exit(socket_close, 0);
287
288
  /*
289
   * In backends (as soon as forked) we operate the underlying socket in
290
   * nonblocking mode and use latches to implement blocking semantics if
291
   * needed. That allows us to provide safely interruptible reads and
292
   * writes.
293
   */
294
0
#ifndef WIN32
295
0
  if (!pg_set_noblock(port->sock))
296
0
    ereport(FATAL,
297
0
        (errmsg("could not set socket to nonblocking mode: %m")));
298
0
#endif
299
300
0
#ifndef WIN32
301
302
  /* Don't give the socket to any subprograms we execute. */
303
0
  if (fcntl(port->sock, F_SETFD, FD_CLOEXEC) < 0)
304
0
    elog(FATAL, "fcntl(F_SETFD) failed on socket: %m");
305
0
#endif
306
307
0
  FeBeWaitSet = CreateWaitEventSet(NULL, FeBeWaitSetNEvents);
308
0
  socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE,
309
0
                   port->sock, NULL, NULL);
310
0
  latch_pos = AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
311
0
                  MyLatch, NULL);
312
0
  AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
313
0
            NULL, NULL);
314
315
  /*
316
   * The event positions match the order we added them, but let's sanity
317
   * check them to be sure.
318
   */
319
0
  Assert(socket_pos == FeBeWaitSetSocketPos);
320
0
  Assert(latch_pos == FeBeWaitSetLatchPos);
321
322
0
  return port;
323
0
}
324
325
/* --------------------------------
326
 *    socket_comm_reset - reset libpq during error recovery
327
 *
328
 * This is called from error recovery at the outer idle loop.  It's
329
 * just to get us out of trouble if we somehow manage to elog() from
330
 * inside a pqcomm.c routine (which ideally will never happen, but...)
331
 * --------------------------------
332
 */
333
static void
334
socket_comm_reset(void)
335
0
{
336
  /* Do not throw away pending data, but do reset the busy flag */
337
0
  PqCommBusy = false;
338
0
}
339
340
/* --------------------------------
341
 *    socket_close - shutdown libpq at backend exit
342
 *
343
 * This is the one pg_on_exit_callback in place during BackendInitialize().
344
 * That function's unusual signal handling constrains that this callback be
345
 * safe to run at any instant.
346
 * --------------------------------
347
 */
348
static void
349
socket_close(int code, Datum arg)
350
0
{
351
  /* Nothing to do in a standalone backend, where MyProcPort is NULL. */
352
0
  if (MyProcPort != NULL)
353
0
  {
354
#ifdef ENABLE_GSS
355
    /*
356
     * Shutdown GSSAPI layer.  This section does nothing when interrupting
357
     * BackendInitialize(), because pg_GSS_recvauth() makes first use of
358
     * "ctx" and "cred".
359
     *
360
     * Note that we don't bother to free MyProcPort->gss, since we're
361
     * about to exit anyway.
362
     */
363
    if (MyProcPort->gss)
364
    {
365
      OM_uint32 min_s;
366
367
      if (MyProcPort->gss->ctx != GSS_C_NO_CONTEXT)
368
        gss_delete_sec_context(&min_s, &MyProcPort->gss->ctx, NULL);
369
370
      if (MyProcPort->gss->cred != GSS_C_NO_CREDENTIAL)
371
        gss_release_cred(&min_s, &MyProcPort->gss->cred);
372
    }
373
#endif              /* ENABLE_GSS */
374
375
    /*
376
     * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
377
     * call this, so this is safe when interrupting BackendInitialize().
378
     */
379
0
    secure_close(MyProcPort);
380
381
    /*
382
     * Formerly we did an explicit close() here, but it seems better to
383
     * leave the socket open until the process dies.  This allows clients
384
     * to perform a "synchronous close" if they care --- wait till the
385
     * transport layer reports connection closure, and you can be sure the
386
     * backend has exited.
387
     *
388
     * We do set sock to PGINVALID_SOCKET to prevent any further I/O,
389
     * though.
390
     */
391
0
    MyProcPort->sock = PGINVALID_SOCKET;
392
0
  }
393
0
}
394
395
396
397
/* --------------------------------
398
 * Postmaster functions to handle sockets.
399
 * --------------------------------
400
 */
401
402
/*
403
 * ListenServerPort -- open a "listening" port to accept connections.
404
 *
405
 * family should be AF_UNIX or AF_UNSPEC; portNumber is the port number.
406
 * For AF_UNIX ports, hostName should be NULL and unixSocketDir must be
407
 * specified.  For TCP ports, hostName is either NULL for all interfaces or
408
 * the interface to listen on, and unixSocketDir is ignored (can be NULL).
409
 *
410
 * Successfully opened sockets are appended to the ListenSockets[] array.  On
411
 * entry, *NumListenSockets holds the number of elements currently in the
412
 * array, and it is updated to reflect the opened sockets.  MaxListen is the
413
 * allocated size of the array.
414
 *
415
 * RETURNS: STATUS_OK or STATUS_ERROR
416
 */
417
int
418
ListenServerPort(int family, const char *hostName, unsigned short portNumber,
419
         const char *unixSocketDir,
420
         pgsocket ListenSockets[], int *NumListenSockets, int MaxListen)
421
0
{
422
0
  pgsocket  fd;
423
0
  int     err;
424
0
  int     maxconn;
425
0
  int     ret;
426
0
  char    portNumberStr[32];
427
0
  const char *familyDesc;
428
0
  char    familyDescBuf[64];
429
0
  const char *addrDesc;
430
0
  char    addrBuf[NI_MAXHOST];
431
0
  char     *service;
432
0
  struct addrinfo *addrs = NULL,
433
0
         *addr;
434
0
  struct addrinfo hint;
435
0
  int     added = 0;
436
0
  char    unixSocketPath[MAXPGPATH];
437
0
#if !defined(WIN32) || defined(IPV6_V6ONLY)
438
0
  int     one = 1;
439
0
#endif
440
441
  /* Initialize hint structure */
442
0
  MemSet(&hint, 0, sizeof(hint));
443
0
  hint.ai_family = family;
444
0
  hint.ai_flags = AI_PASSIVE;
445
0
  hint.ai_socktype = SOCK_STREAM;
446
447
0
  if (family == AF_UNIX)
448
0
  {
449
    /*
450
     * Create unixSocketPath from portNumber and unixSocketDir and lock
451
     * that file path
452
     */
453
0
    UNIXSOCK_PATH(unixSocketPath, portNumber, unixSocketDir);
454
0
    if (strlen(unixSocketPath) >= UNIXSOCK_PATH_BUFLEN)
455
0
    {
456
0
      ereport(LOG,
457
0
          (errmsg("Unix-domain socket path \"%s\" is too long (maximum %d bytes)",
458
0
              unixSocketPath,
459
0
              (int) (UNIXSOCK_PATH_BUFLEN - 1))));
460
0
      return STATUS_ERROR;
461
0
    }
462
0
    if (Lock_AF_UNIX(unixSocketDir, unixSocketPath) != STATUS_OK)
463
0
      return STATUS_ERROR;
464
0
    service = unixSocketPath;
465
0
  }
466
0
  else
467
0
  {
468
0
    snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
469
0
    service = portNumberStr;
470
0
  }
471
472
0
  ret = pg_getaddrinfo_all(hostName, service, &hint, &addrs);
473
0
  if (ret || !addrs)
474
0
  {
475
0
    if (hostName)
476
0
      ereport(LOG,
477
0
          (errmsg("could not translate host name \"%s\", service \"%s\" to address: %s",
478
0
              hostName, service, gai_strerror(ret))));
479
0
    else
480
0
      ereport(LOG,
481
0
          (errmsg("could not translate service \"%s\" to address: %s",
482
0
              service, gai_strerror(ret))));
483
0
    if (addrs)
484
0
      pg_freeaddrinfo_all(hint.ai_family, addrs);
485
0
    return STATUS_ERROR;
486
0
  }
487
488
0
  for (addr = addrs; addr; addr = addr->ai_next)
489
0
  {
490
0
    if (family != AF_UNIX && addr->ai_family == AF_UNIX)
491
0
    {
492
      /*
493
       * Only set up a unix domain socket when they really asked for it.
494
       * The service/port is different in that case.
495
       */
496
0
      continue;
497
0
    }
498
499
    /* See if there is still room to add 1 more socket. */
500
0
    if (*NumListenSockets == MaxListen)
501
0
    {
502
0
      ereport(LOG,
503
0
          (errmsg("could not bind to all requested addresses: MAXLISTEN (%d) exceeded",
504
0
              MaxListen)));
505
0
      break;
506
0
    }
507
508
    /* set up address family name for log messages */
509
0
    switch (addr->ai_family)
510
0
    {
511
0
      case AF_INET:
512
0
        familyDesc = _("IPv4");
513
0
        break;
514
0
      case AF_INET6:
515
0
        familyDesc = _("IPv6");
516
0
        break;
517
0
      case AF_UNIX:
518
0
        familyDesc = _("Unix");
519
0
        break;
520
0
      default:
521
0
        snprintf(familyDescBuf, sizeof(familyDescBuf),
522
0
             _("unrecognized address family %d"),
523
0
             addr->ai_family);
524
0
        familyDesc = familyDescBuf;
525
0
        break;
526
0
    }
527
528
    /* set up text form of address for log messages */
529
0
    if (addr->ai_family == AF_UNIX)
530
0
      addrDesc = unixSocketPath;
531
0
    else
532
0
    {
533
0
      pg_getnameinfo_all((const struct sockaddr_storage *) addr->ai_addr,
534
0
                 addr->ai_addrlen,
535
0
                 addrBuf, sizeof(addrBuf),
536
0
                 NULL, 0,
537
0
                 NI_NUMERICHOST);
538
0
      addrDesc = addrBuf;
539
0
    }
540
541
0
    if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
542
0
    {
543
0
      ereport(LOG,
544
0
          (errcode_for_socket_access(),
545
      /* translator: first %s is IPv4, IPv6, or Unix */
546
0
           errmsg("could not create %s socket for address \"%s\": %m",
547
0
              familyDesc, addrDesc)));
548
0
      continue;
549
0
    }
550
551
0
#ifndef WIN32
552
    /* Don't give the listen socket to any subprograms we execute. */
553
0
    if (fcntl(fd, F_SETFD, FD_CLOEXEC) < 0)
554
0
      elog(FATAL, "fcntl(F_SETFD) failed on socket: %m");
555
556
    /*
557
     * Without the SO_REUSEADDR flag, a new postmaster can't be started
558
     * right away after a stop or crash, giving "address already in use"
559
     * error on TCP ports.
560
     *
561
     * On win32, however, this behavior only happens if the
562
     * SO_EXCLUSIVEADDRUSE is set. With SO_REUSEADDR, win32 allows
563
     * multiple servers to listen on the same address, resulting in
564
     * unpredictable behavior. With no flags at all, win32 behaves as Unix
565
     * with SO_REUSEADDR.
566
     */
567
0
    if (addr->ai_family != AF_UNIX)
568
0
    {
569
0
      if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
570
0
              (char *) &one, sizeof(one))) == -1)
571
0
      {
572
0
        ereport(LOG,
573
0
            (errcode_for_socket_access(),
574
        /* translator: third %s is IPv4 or IPv6 */
575
0
             errmsg("%s(%s) failed for %s address \"%s\": %m",
576
0
                "setsockopt", "SO_REUSEADDR",
577
0
                familyDesc, addrDesc)));
578
0
        closesocket(fd);
579
0
        continue;
580
0
      }
581
0
    }
582
0
#endif
583
584
0
#ifdef IPV6_V6ONLY
585
0
    if (addr->ai_family == AF_INET6)
586
0
    {
587
0
      if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,
588
0
               (char *) &one, sizeof(one)) == -1)
589
0
      {
590
0
        ereport(LOG,
591
0
            (errcode_for_socket_access(),
592
        /* translator: third %s is IPv6 */
593
0
             errmsg("%s(%s) failed for %s address \"%s\": %m",
594
0
                "setsockopt", "IPV6_V6ONLY",
595
0
                familyDesc, addrDesc)));
596
0
        closesocket(fd);
597
0
        continue;
598
0
      }
599
0
    }
600
0
#endif
601
602
    /*
603
     * Note: This might fail on some OS's, like Linux older than
604
     * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and map
605
     * ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all ipv4
606
     * connections.
607
     */
608
0
    err = bind(fd, addr->ai_addr, addr->ai_addrlen);
609
0
    if (err < 0)
610
0
    {
611
0
      int     saved_errno = errno;
612
613
0
      ereport(LOG,
614
0
          (errcode_for_socket_access(),
615
      /* translator: first %s is IPv4, IPv6, or Unix */
616
0
           errmsg("could not bind %s address \"%s\": %m",
617
0
              familyDesc, addrDesc),
618
0
           saved_errno == EADDRINUSE ?
619
0
           (addr->ai_family == AF_UNIX ?
620
0
            errhint("Is another postmaster already running on port %d?",
621
0
                (int) portNumber) :
622
0
            errhint("Is another postmaster already running on port %d?"
623
0
                " If not, wait a few seconds and retry.",
624
0
                (int) portNumber)) : 0));
625
0
      closesocket(fd);
626
0
      continue;
627
0
    }
628
629
0
    if (addr->ai_family == AF_UNIX)
630
0
    {
631
0
      if (Setup_AF_UNIX(service) != STATUS_OK)
632
0
      {
633
0
        closesocket(fd);
634
0
        break;
635
0
      }
636
0
    }
637
638
    /*
639
     * Select appropriate accept-queue length limit.  It seems reasonable
640
     * to use a value similar to the maximum number of child processes
641
     * that the postmaster will permit.
642
     */
643
0
    maxconn = MaxConnections * 2;
644
645
0
    err = listen(fd, maxconn);
646
0
    if (err < 0)
647
0
    {
648
0
      ereport(LOG,
649
0
          (errcode_for_socket_access(),
650
      /* translator: first %s is IPv4, IPv6, or Unix */
651
0
           errmsg("could not listen on %s address \"%s\": %m",
652
0
              familyDesc, addrDesc)));
653
0
      closesocket(fd);
654
0
      continue;
655
0
    }
656
657
0
    if (addr->ai_family == AF_UNIX)
658
0
      ereport(LOG,
659
0
          (errmsg("listening on Unix socket \"%s\"",
660
0
              addrDesc)));
661
0
    else
662
0
      ereport(LOG,
663
      /* translator: first %s is IPv4 or IPv6 */
664
0
          (errmsg("listening on %s address \"%s\", port %d",
665
0
              familyDesc, addrDesc, (int) portNumber)));
666
667
0
    ListenSockets[*NumListenSockets] = fd;
668
0
    (*NumListenSockets)++;
669
0
    added++;
670
0
  }
671
672
0
  pg_freeaddrinfo_all(hint.ai_family, addrs);
673
674
0
  if (!added)
675
0
    return STATUS_ERROR;
676
677
0
  return STATUS_OK;
678
0
}
679
680
681
/*
682
 * Lock_AF_UNIX -- configure unix socket file path
683
 */
684
static int
685
Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath)
686
0
{
687
  /* no lock file for abstract sockets */
688
0
  if (unixSocketPath[0] == '@')
689
0
    return STATUS_OK;
690
691
  /*
692
   * Grab an interlock file associated with the socket file.
693
   *
694
   * Note: there are two reasons for using a socket lock file, rather than
695
   * trying to interlock directly on the socket itself.  First, it's a lot
696
   * more portable, and second, it lets us remove any pre-existing socket
697
   * file without race conditions.
698
   */
699
0
  CreateSocketLockFile(unixSocketPath, true, unixSocketDir);
700
701
  /*
702
   * Once we have the interlock, we can safely delete any pre-existing
703
   * socket file to avoid failure at bind() time.
704
   */
705
0
  (void) unlink(unixSocketPath);
706
707
  /*
708
   * Remember socket file pathnames for later maintenance.
709
   */
710
0
  sock_paths = lappend(sock_paths, pstrdup(unixSocketPath));
711
712
0
  return STATUS_OK;
713
0
}
714
715
716
/*
717
 * Setup_AF_UNIX -- configure unix socket permissions
718
 */
719
static int
720
Setup_AF_UNIX(const char *sock_path)
721
0
{
722
  /* no file system permissions for abstract sockets */
723
0
  if (sock_path[0] == '@')
724
0
    return STATUS_OK;
725
726
  /*
727
   * Fix socket ownership/permission if requested.  Note we must do this
728
   * before we listen() to avoid a window where unwanted connections could
729
   * get accepted.
730
   */
731
0
  Assert(Unix_socket_group);
732
0
  if (Unix_socket_group[0] != '\0')
733
0
  {
734
#ifdef WIN32
735
    elog(WARNING, "configuration item \"unix_socket_group\" is not supported on this platform");
736
#else
737
0
    char     *endptr;
738
0
    unsigned long val;
739
0
    gid_t   gid;
740
741
0
    val = strtoul(Unix_socket_group, &endptr, 10);
742
0
    if (*endptr == '\0')
743
0
    {           /* numeric group id */
744
0
      gid = val;
745
0
    }
746
0
    else
747
0
    {           /* convert group name to id */
748
0
      struct group *gr;
749
750
0
      gr = getgrnam(Unix_socket_group);
751
0
      if (!gr)
752
0
      {
753
0
        ereport(LOG,
754
0
            (errmsg("group \"%s\" does not exist",
755
0
                Unix_socket_group)));
756
0
        return STATUS_ERROR;
757
0
      }
758
0
      gid = gr->gr_gid;
759
0
    }
760
0
    if (chown(sock_path, -1, gid) == -1)
761
0
    {
762
0
      ereport(LOG,
763
0
          (errcode_for_file_access(),
764
0
           errmsg("could not set group of file \"%s\": %m",
765
0
              sock_path)));
766
0
      return STATUS_ERROR;
767
0
    }
768
0
#endif
769
0
  }
770
771
0
  if (chmod(sock_path, Unix_socket_permissions) == -1)
772
0
  {
773
0
    ereport(LOG,
774
0
        (errcode_for_file_access(),
775
0
         errmsg("could not set permissions of file \"%s\": %m",
776
0
            sock_path)));
777
0
    return STATUS_ERROR;
778
0
  }
779
0
  return STATUS_OK;
780
0
}
781
782
783
/*
784
 * AcceptConnection -- accept a new connection with client using
785
 *    server port.  Fills *client_sock with the FD and endpoint info
786
 *    of the new connection.
787
 *
788
 * ASSUME: that this doesn't need to be non-blocking because
789
 *    the Postmaster waits for the socket to be ready to accept().
790
 *
791
 * RETURNS: STATUS_OK or STATUS_ERROR
792
 */
793
int
794
AcceptConnection(pgsocket server_fd, ClientSocket *client_sock)
795
0
{
796
  /* accept connection and fill in the client (remote) address */
797
0
  client_sock->raddr.salen = sizeof(client_sock->raddr.addr);
798
0
  if ((client_sock->sock = accept(server_fd,
799
0
                  (struct sockaddr *) &client_sock->raddr.addr,
800
0
                  &client_sock->raddr.salen)) == PGINVALID_SOCKET)
801
0
  {
802
0
    ereport(LOG,
803
0
        (errcode_for_socket_access(),
804
0
         errmsg("could not accept new connection: %m")));
805
806
    /*
807
     * If accept() fails then postmaster.c will still see the server
808
     * socket as read-ready, and will immediately try again.  To avoid
809
     * uselessly sucking lots of CPU, delay a bit before trying again.
810
     * (The most likely reason for failure is being out of kernel file
811
     * table slots; we can do little except hope some will get freed up.)
812
     */
813
0
    pg_usleep(100000L);   /* wait 0.1 sec */
814
0
    return STATUS_ERROR;
815
0
  }
816
817
0
  return STATUS_OK;
818
0
}
819
820
/*
821
 * TouchSocketFiles -- mark socket files as recently accessed
822
 *
823
 * This routine should be called every so often to ensure that the socket
824
 * files have a recent mod date (ordinary operations on sockets usually won't
825
 * change the mod date).  That saves them from being removed by
826
 * overenthusiastic /tmp-directory-cleaner daemons.  (Another reason we should
827
 * never have put the socket file in /tmp...)
828
 */
829
void
830
TouchSocketFiles(void)
831
0
{
832
0
  ListCell   *l;
833
834
  /* Loop through all created sockets... */
835
0
  foreach(l, sock_paths)
836
0
  {
837
0
    char     *sock_path = (char *) lfirst(l);
838
839
    /* Ignore errors; there's no point in complaining */
840
0
    (void) utime(sock_path, NULL);
841
0
  }
842
0
}
843
844
/*
845
 * RemoveSocketFiles -- unlink socket files at postmaster shutdown
846
 */
847
void
848
RemoveSocketFiles(void)
849
0
{
850
0
  ListCell   *l;
851
852
  /* Loop through all created sockets... */
853
0
  foreach(l, sock_paths)
854
0
  {
855
0
    char     *sock_path = (char *) lfirst(l);
856
857
    /* Ignore any error. */
858
0
    (void) unlink(sock_path);
859
0
  }
860
  /* Since we're about to exit, no need to reclaim storage */
861
0
  sock_paths = NIL;
862
0
}
863
864
865
/* --------------------------------
866
 * Low-level I/O routines begin here.
867
 *
868
 * These routines communicate with a frontend client across a connection
869
 * already established by the preceding routines.
870
 * --------------------------------
871
 */
872
873
/* --------------------------------
874
 *        socket_set_nonblocking - set socket blocking/non-blocking
875
 *
876
 * Sets the socket non-blocking if nonblocking is true, or sets it
877
 * blocking otherwise.
878
 * --------------------------------
879
 */
880
static void
881
socket_set_nonblocking(bool nonblocking)
882
0
{
883
0
  if (MyProcPort == NULL)
884
0
    ereport(ERROR,
885
0
        (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
886
0
         errmsg("there is no client connection")));
887
888
0
  MyProcPort->noblock = nonblocking;
889
0
}
890
891
/* --------------------------------
892
 *    pq_recvbuf - load some bytes into the input buffer
893
 *
894
 *    returns 0 if OK, EOF if trouble
895
 * --------------------------------
896
 */
897
static int
898
pq_recvbuf(void)
899
0
{
900
0
  if (PqRecvPointer > 0)
901
0
  {
902
0
    if (PqRecvLength > PqRecvPointer)
903
0
    {
904
      /* still some unread data, left-justify it in the buffer */
905
0
      memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
906
0
          PqRecvLength - PqRecvPointer);
907
0
      PqRecvLength -= PqRecvPointer;
908
0
      PqRecvPointer = 0;
909
0
    }
910
0
    else
911
0
      PqRecvLength = PqRecvPointer = 0;
912
0
  }
913
914
  /* Ensure that we're in blocking mode */
915
0
  socket_set_nonblocking(false);
916
917
  /* Can fill buffer from PqRecvLength and upwards */
918
0
  for (;;)
919
0
  {
920
0
    int     r;
921
922
0
    errno = 0;
923
924
0
    r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
925
0
            PQ_RECV_BUFFER_SIZE - PqRecvLength);
926
927
0
    if (r < 0)
928
0
    {
929
0
      if (errno == EINTR)
930
0
        continue;   /* Ok if interrupted */
931
932
      /*
933
       * Careful: an ereport() that tries to write to the client would
934
       * cause recursion to here, leading to stack overflow and core
935
       * dump!  This message must go *only* to the postmaster log.
936
       *
937
       * If errno is zero, assume it's EOF and let the caller complain.
938
       */
939
0
      if (errno != 0)
940
0
        ereport(COMMERROR,
941
0
            (errcode_for_socket_access(),
942
0
             errmsg("could not receive data from client: %m")));
943
0
      return EOF;
944
0
    }
945
0
    if (r == 0)
946
0
    {
947
      /*
948
       * EOF detected.  We used to write a log message here, but it's
949
       * better to expect the ultimate caller to do that.
950
       */
951
0
      return EOF;
952
0
    }
953
    /* r contains number of bytes read, so just incr length */
954
0
    PqRecvLength += r;
955
0
    return 0;
956
0
  }
957
0
}
958
959
/* --------------------------------
960
 *    pq_getbyte  - get a single byte from connection, or return EOF
961
 * --------------------------------
962
 */
963
int
964
pq_getbyte(void)
965
0
{
966
0
  Assert(PqCommReadingMsg);
967
968
0
  while (PqRecvPointer >= PqRecvLength)
969
0
  {
970
0
    if (pq_recvbuf())   /* If nothing in buffer, then recv some */
971
0
      return EOF;     /* Failed to recv data */
972
0
  }
973
0
  return (unsigned char) PqRecvBuffer[PqRecvPointer++];
974
0
}
975
976
/* --------------------------------
977
 *    pq_peekbyte   - peek at next byte from connection
978
 *
979
 *   Same as pq_getbyte() except we don't advance the pointer.
980
 * --------------------------------
981
 */
982
int
983
pq_peekbyte(void)
984
0
{
985
0
  Assert(PqCommReadingMsg);
986
987
0
  while (PqRecvPointer >= PqRecvLength)
988
0
  {
989
0
    if (pq_recvbuf())   /* If nothing in buffer, then recv some */
990
0
      return EOF;     /* Failed to recv data */
991
0
  }
992
0
  return (unsigned char) PqRecvBuffer[PqRecvPointer];
993
0
}
994
995
/* --------------------------------
996
 *    pq_getbyte_if_available - get a single byte from connection,
997
 *      if available
998
 *
999
 * The received byte is stored in *c. Returns 1 if a byte was read,
1000
 * 0 if no data was available, or EOF if trouble.
1001
 * --------------------------------
1002
 */
1003
int
1004
pq_getbyte_if_available(unsigned char *c)
1005
0
{
1006
0
  int     r;
1007
1008
0
  Assert(PqCommReadingMsg);
1009
1010
0
  if (PqRecvPointer < PqRecvLength)
1011
0
  {
1012
0
    *c = PqRecvBuffer[PqRecvPointer++];
1013
0
    return 1;
1014
0
  }
1015
1016
  /* Put the socket into non-blocking mode */
1017
0
  socket_set_nonblocking(true);
1018
1019
0
  errno = 0;
1020
1021
0
  r = secure_read(MyProcPort, c, 1);
1022
0
  if (r < 0)
1023
0
  {
1024
    /*
1025
     * Ok if no data available without blocking or interrupted (though
1026
     * EINTR really shouldn't happen with a non-blocking socket). Report
1027
     * other errors.
1028
     */
1029
0
    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
1030
0
      r = 0;
1031
0
    else
1032
0
    {
1033
      /*
1034
       * Careful: an ereport() that tries to write to the client would
1035
       * cause recursion to here, leading to stack overflow and core
1036
       * dump!  This message must go *only* to the postmaster log.
1037
       *
1038
       * If errno is zero, assume it's EOF and let the caller complain.
1039
       */
1040
0
      if (errno != 0)
1041
0
        ereport(COMMERROR,
1042
0
            (errcode_for_socket_access(),
1043
0
             errmsg("could not receive data from client: %m")));
1044
0
      r = EOF;
1045
0
    }
1046
0
  }
1047
0
  else if (r == 0)
1048
0
  {
1049
    /* EOF detected */
1050
0
    r = EOF;
1051
0
  }
1052
1053
0
  return r;
1054
0
}
1055
1056
/* --------------------------------
1057
 *    pq_getbytes   - get a known number of bytes from connection
1058
 *
1059
 *    returns 0 if OK, EOF if trouble
1060
 * --------------------------------
1061
 */
1062
int
1063
pq_getbytes(void *b, size_t len)
1064
0
{
1065
0
  char     *s = b;
1066
0
  size_t    amount;
1067
1068
0
  Assert(PqCommReadingMsg);
1069
1070
0
  while (len > 0)
1071
0
  {
1072
0
    while (PqRecvPointer >= PqRecvLength)
1073
0
    {
1074
0
      if (pq_recvbuf()) /* If nothing in buffer, then recv some */
1075
0
        return EOF;   /* Failed to recv data */
1076
0
    }
1077
0
    amount = PqRecvLength - PqRecvPointer;
1078
0
    if (amount > len)
1079
0
      amount = len;
1080
0
    memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
1081
0
    PqRecvPointer += amount;
1082
0
    s += amount;
1083
0
    len -= amount;
1084
0
  }
1085
0
  return 0;
1086
0
}
1087
1088
/* --------------------------------
1089
 *    pq_discardbytes   - throw away a known number of bytes
1090
 *
1091
 *    same as pq_getbytes except we do not copy the data to anyplace.
1092
 *    this is used for resynchronizing after read errors.
1093
 *
1094
 *    returns 0 if OK, EOF if trouble
1095
 * --------------------------------
1096
 */
1097
static int
1098
pq_discardbytes(size_t len)
1099
0
{
1100
0
  size_t    amount;
1101
1102
0
  Assert(PqCommReadingMsg);
1103
1104
0
  while (len > 0)
1105
0
  {
1106
0
    while (PqRecvPointer >= PqRecvLength)
1107
0
    {
1108
0
      if (pq_recvbuf()) /* If nothing in buffer, then recv some */
1109
0
        return EOF;   /* Failed to recv data */
1110
0
    }
1111
0
    amount = PqRecvLength - PqRecvPointer;
1112
0
    if (amount > len)
1113
0
      amount = len;
1114
0
    PqRecvPointer += amount;
1115
0
    len -= amount;
1116
0
  }
1117
0
  return 0;
1118
0
}
1119
1120
/* --------------------------------
1121
 *    pq_buffer_remaining_data  - return number of bytes in receive buffer
1122
 *
1123
 * This will *not* attempt to read more data. And reading up to that number of
1124
 * bytes should not cause reading any more data either.
1125
 * --------------------------------
1126
 */
1127
ssize_t
1128
pq_buffer_remaining_data(void)
1129
0
{
1130
0
  Assert(PqRecvLength >= PqRecvPointer);
1131
0
  return (PqRecvLength - PqRecvPointer);
1132
0
}
1133
1134
1135
/* --------------------------------
1136
 *    pq_startmsgread - begin reading a message from the client.
1137
 *
1138
 *    This must be called before any of the pq_get* functions.
1139
 * --------------------------------
1140
 */
1141
void
1142
pq_startmsgread(void)
1143
0
{
1144
  /*
1145
   * There shouldn't be a read active already, but let's check just to be
1146
   * sure.
1147
   */
1148
0
  if (PqCommReadingMsg)
1149
0
    ereport(FATAL,
1150
0
        (errcode(ERRCODE_PROTOCOL_VIOLATION),
1151
0
         errmsg("terminating connection because protocol synchronization was lost")));
1152
1153
0
  PqCommReadingMsg = true;
1154
0
}
1155
1156
1157
/* --------------------------------
1158
 *    pq_endmsgread - finish reading message.
1159
 *
1160
 *    This must be called after reading a message with pq_getbytes()
1161
 *    and friends, to indicate that we have read the whole message.
1162
 *    pq_getmessage() does this implicitly.
1163
 * --------------------------------
1164
 */
1165
void
1166
pq_endmsgread(void)
1167
0
{
1168
0
  Assert(PqCommReadingMsg);
1169
1170
0
  PqCommReadingMsg = false;
1171
0
}
1172
1173
/* --------------------------------
1174
 *    pq_is_reading_msg - are we currently reading a message?
1175
 *
1176
 * This is used in error recovery at the outer idle loop to detect if we have
1177
 * lost protocol sync, and need to terminate the connection. pq_startmsgread()
1178
 * will check for that too, but it's nicer to detect it earlier.
1179
 * --------------------------------
1180
 */
1181
bool
1182
pq_is_reading_msg(void)
1183
0
{
1184
0
  return PqCommReadingMsg;
1185
0
}
1186
1187
/* --------------------------------
1188
 *    pq_getmessage - get a message with length word from connection
1189
 *
1190
 *    The return value is placed in an expansible StringInfo, which has
1191
 *    already been initialized by the caller.
1192
 *    Only the message body is placed in the StringInfo; the length word
1193
 *    is removed.  Also, s->cursor is initialized to zero for convenience
1194
 *    in scanning the message contents.
1195
 *
1196
 *    maxlen is the upper limit on the length of the
1197
 *    message we are willing to accept.  We abort the connection (by
1198
 *    returning EOF) if client tries to send more than that.
1199
 *
1200
 *    returns 0 if OK, EOF if trouble
1201
 * --------------------------------
1202
 */
1203
int
1204
pq_getmessage(StringInfo s, int maxlen)
1205
0
{
1206
0
  int32   len;
1207
1208
0
  Assert(PqCommReadingMsg);
1209
1210
0
  resetStringInfo(s);
1211
1212
  /* Read message length word */
1213
0
  if (pq_getbytes(&len, 4) == EOF)
1214
0
  {
1215
0
    ereport(COMMERROR,
1216
0
        (errcode(ERRCODE_PROTOCOL_VIOLATION),
1217
0
         errmsg("unexpected EOF within message length word")));
1218
0
    return EOF;
1219
0
  }
1220
1221
0
  len = pg_ntoh32(len);
1222
1223
0
  if (len < 4 || len > maxlen)
1224
0
  {
1225
0
    ereport(COMMERROR,
1226
0
        (errcode(ERRCODE_PROTOCOL_VIOLATION),
1227
0
         errmsg("invalid message length")));
1228
0
    return EOF;
1229
0
  }
1230
1231
0
  len -= 4;         /* discount length itself */
1232
1233
0
  if (len > 0)
1234
0
  {
1235
    /*
1236
     * Allocate space for message.  If we run out of room (ridiculously
1237
     * large message), we will elog(ERROR), but we want to discard the
1238
     * message body so as not to lose communication sync.
1239
     */
1240
0
    PG_TRY();
1241
0
    {
1242
0
      enlargeStringInfo(s, len);
1243
0
    }
1244
0
    PG_CATCH();
1245
0
    {
1246
0
      if (pq_discardbytes(len) == EOF)
1247
0
        ereport(COMMERROR,
1248
0
            (errcode(ERRCODE_PROTOCOL_VIOLATION),
1249
0
             errmsg("incomplete message from client")));
1250
1251
      /* we discarded the rest of the message so we're back in sync. */
1252
0
      PqCommReadingMsg = false;
1253
0
      PG_RE_THROW();
1254
0
    }
1255
0
    PG_END_TRY();
1256
1257
    /* And grab the message */
1258
0
    if (pq_getbytes(s->data, len) == EOF)
1259
0
    {
1260
0
      ereport(COMMERROR,
1261
0
          (errcode(ERRCODE_PROTOCOL_VIOLATION),
1262
0
           errmsg("incomplete message from client")));
1263
0
      return EOF;
1264
0
    }
1265
0
    s->len = len;
1266
    /* Place a trailing null per StringInfo convention */
1267
0
    s->data[len] = '\0';
1268
0
  }
1269
1270
  /* finished reading the message. */
1271
0
  PqCommReadingMsg = false;
1272
1273
0
  return 0;
1274
0
}
1275
1276
1277
static inline int
1278
internal_putbytes(const void *b, size_t len)
1279
0
{
1280
0
  const char *s = b;
1281
1282
0
  while (len > 0)
1283
0
  {
1284
    /* If buffer is full, then flush it out */
1285
0
    if (PqSendPointer >= PqSendBufferSize)
1286
0
    {
1287
0
      socket_set_nonblocking(false);
1288
0
      if (internal_flush())
1289
0
        return EOF;
1290
0
    }
1291
1292
    /*
1293
     * If the buffer is empty and data length is larger than the buffer
1294
     * size, send it without buffering.  Otherwise, copy as much data as
1295
     * possible into the buffer.
1296
     */
1297
0
    if (len >= PqSendBufferSize && PqSendStart == PqSendPointer)
1298
0
    {
1299
0
      size_t    start = 0;
1300
1301
0
      socket_set_nonblocking(false);
1302
0
      if (internal_flush_buffer(s, &start, &len))
1303
0
        return EOF;
1304
0
    }
1305
0
    else
1306
0
    {
1307
0
      size_t    amount = PqSendBufferSize - PqSendPointer;
1308
1309
0
      if (amount > len)
1310
0
        amount = len;
1311
0
      memcpy(PqSendBuffer + PqSendPointer, s, amount);
1312
0
      PqSendPointer += amount;
1313
0
      s += amount;
1314
0
      len -= amount;
1315
0
    }
1316
0
  }
1317
1318
0
  return 0;
1319
0
}
1320
1321
/* --------------------------------
1322
 *    socket_flush    - flush pending output
1323
 *
1324
 *    returns 0 if OK, EOF if trouble
1325
 * --------------------------------
1326
 */
1327
static int
1328
socket_flush(void)
1329
0
{
1330
0
  int     res;
1331
1332
  /* No-op if reentrant call */
1333
0
  if (PqCommBusy)
1334
0
    return 0;
1335
0
  PqCommBusy = true;
1336
0
  socket_set_nonblocking(false);
1337
0
  res = internal_flush();
1338
0
  PqCommBusy = false;
1339
0
  return res;
1340
0
}
1341
1342
/* --------------------------------
1343
 *    internal_flush - flush pending output
1344
 *
1345
 * Returns 0 if OK (meaning everything was sent, or operation would block
1346
 * and the socket is in non-blocking mode), or EOF if trouble.
1347
 * --------------------------------
1348
 */
1349
static inline int
1350
internal_flush(void)
1351
0
{
1352
0
  return internal_flush_buffer(PqSendBuffer, &PqSendStart, &PqSendPointer);
1353
0
}
1354
1355
/* --------------------------------
1356
 *    internal_flush_buffer - flush the given buffer content
1357
 *
1358
 * Returns 0 if OK (meaning everything was sent, or operation would block
1359
 * and the socket is in non-blocking mode), or EOF if trouble.
1360
 * --------------------------------
1361
 */
1362
static pg_noinline int
1363
internal_flush_buffer(const char *buf, size_t *start, size_t *end)
1364
0
{
1365
0
  static int  last_reported_send_errno = 0;
1366
1367
0
  const char *bufptr = buf + *start;
1368
0
  const char *bufend = buf + *end;
1369
1370
0
  while (bufptr < bufend)
1371
0
  {
1372
0
    int     r;
1373
1374
0
    r = secure_write(MyProcPort, bufptr, bufend - bufptr);
1375
1376
0
    if (r <= 0)
1377
0
    {
1378
0
      if (errno == EINTR)
1379
0
        continue;   /* Ok if we were interrupted */
1380
1381
      /*
1382
       * Ok if no data writable without blocking, and the socket is in
1383
       * non-blocking mode.
1384
       */
1385
0
      if (errno == EAGAIN ||
1386
0
        errno == EWOULDBLOCK)
1387
0
      {
1388
0
        return 0;
1389
0
      }
1390
1391
      /*
1392
       * Careful: an ereport() that tries to write to the client would
1393
       * cause recursion to here, leading to stack overflow and core
1394
       * dump!  This message must go *only* to the postmaster log.
1395
       *
1396
       * If a client disconnects while we're in the midst of output, we
1397
       * might write quite a bit of data before we get to a safe query
1398
       * abort point.  So, suppress duplicate log messages.
1399
       */
1400
0
      if (errno != last_reported_send_errno)
1401
0
      {
1402
0
        last_reported_send_errno = errno;
1403
0
        ereport(COMMERROR,
1404
0
            (errcode_for_socket_access(),
1405
0
             errmsg("could not send data to client: %m")));
1406
0
      }
1407
1408
      /*
1409
       * We drop the buffered data anyway so that processing can
1410
       * continue, even though we'll probably quit soon. We also set a
1411
       * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
1412
       * the connection.
1413
       */
1414
0
      *start = *end = 0;
1415
0
      ClientConnectionLost = 1;
1416
0
      InterruptPending = 1;
1417
0
      return EOF;
1418
0
    }
1419
1420
0
    last_reported_send_errno = 0; /* reset after any successful send */
1421
0
    bufptr += r;
1422
0
    *start += r;
1423
0
  }
1424
1425
0
  *start = *end = 0;
1426
0
  return 0;
1427
0
}
1428
1429
/* --------------------------------
1430
 *    pq_flush_if_writable - flush pending output if writable without blocking
1431
 *
1432
 * Returns 0 if OK, or EOF if trouble.
1433
 * --------------------------------
1434
 */
1435
static int
1436
socket_flush_if_writable(void)
1437
0
{
1438
0
  int     res;
1439
1440
  /* Quick exit if nothing to do */
1441
0
  if (PqSendPointer == PqSendStart)
1442
0
    return 0;
1443
1444
  /* No-op if reentrant call */
1445
0
  if (PqCommBusy)
1446
0
    return 0;
1447
1448
  /* Temporarily put the socket into non-blocking mode */
1449
0
  socket_set_nonblocking(true);
1450
1451
0
  PqCommBusy = true;
1452
0
  res = internal_flush();
1453
0
  PqCommBusy = false;
1454
0
  return res;
1455
0
}
1456
1457
/* --------------------------------
1458
 *  socket_is_send_pending  - is there any pending data in the output buffer?
1459
 * --------------------------------
1460
 */
1461
static bool
1462
socket_is_send_pending(void)
1463
0
{
1464
0
  return (PqSendStart < PqSendPointer);
1465
0
}
1466
1467
/* --------------------------------
1468
 * Message-level I/O routines begin here.
1469
 * --------------------------------
1470
 */
1471
1472
1473
/* --------------------------------
1474
 *    socket_putmessage - send a normal message (suppressed in COPY OUT mode)
1475
 *
1476
 *    msgtype is a message type code to place before the message body.
1477
 *
1478
 *    len is the length of the message body data at *s.  A message length
1479
 *    word (equal to len+4 because it counts itself too) is inserted by this
1480
 *    routine.
1481
 *
1482
 *    We suppress messages generated while pqcomm.c is busy.  This
1483
 *    avoids any possibility of messages being inserted within other
1484
 *    messages.  The only known trouble case arises if SIGQUIT occurs
1485
 *    during a pqcomm.c routine --- quickdie() will try to send a warning
1486
 *    message, and the most reasonable approach seems to be to drop it.
1487
 *
1488
 *    returns 0 if OK, EOF if trouble
1489
 * --------------------------------
1490
 */
1491
static int
1492
socket_putmessage(char msgtype, const char *s, size_t len)
1493
0
{
1494
0
  uint32    n32;
1495
1496
0
  Assert(msgtype != 0);
1497
1498
0
  if (PqCommBusy)
1499
0
    return 0;
1500
0
  PqCommBusy = true;
1501
0
  if (internal_putbytes(&msgtype, 1))
1502
0
    goto fail;
1503
1504
0
  n32 = pg_hton32((uint32) (len + 4));
1505
0
  if (internal_putbytes(&n32, 4))
1506
0
    goto fail;
1507
1508
0
  if (internal_putbytes(s, len))
1509
0
    goto fail;
1510
0
  PqCommBusy = false;
1511
0
  return 0;
1512
1513
0
fail:
1514
0
  PqCommBusy = false;
1515
0
  return EOF;
1516
0
}
1517
1518
/* --------------------------------
1519
 *    pq_putmessage_noblock - like pq_putmessage, but never blocks
1520
 *
1521
 *    If the output buffer is too small to hold the message, the buffer
1522
 *    is enlarged.
1523
 */
1524
static void
1525
socket_putmessage_noblock(char msgtype, const char *s, size_t len)
1526
0
{
1527
0
  int     res PG_USED_FOR_ASSERTS_ONLY;
1528
0
  int     required;
1529
1530
  /*
1531
   * Ensure we have enough space in the output buffer for the message header
1532
   * as well as the message itself.
1533
   */
1534
0
  required = PqSendPointer + 1 + 4 + len;
1535
0
  if (required > PqSendBufferSize)
1536
0
  {
1537
0
    PqSendBuffer = repalloc(PqSendBuffer, required);
1538
0
    PqSendBufferSize = required;
1539
0
  }
1540
0
  res = pq_putmessage(msgtype, s, len);
1541
0
  Assert(res == 0);     /* should not fail when the message fits in
1542
                 * buffer */
1543
0
}
1544
1545
/* --------------------------------
1546
 *    pq_putmessage_v2 - send a message in protocol version 2
1547
 *
1548
 *    msgtype is a message type code to place before the message body.
1549
 *
1550
 *    We no longer support protocol version 2, but we have kept this
1551
 *    function so that if a client tries to connect with protocol version 2,
1552
 *    as a courtesy we can still send the "unsupported protocol version"
1553
 *    error to the client in the old format.
1554
 *
1555
 *    Like in pq_putmessage(), we suppress messages generated while
1556
 *    pqcomm.c is busy.
1557
 *
1558
 *    returns 0 if OK, EOF if trouble
1559
 * --------------------------------
1560
 */
1561
int
1562
pq_putmessage_v2(char msgtype, const char *s, size_t len)
1563
0
{
1564
0
  Assert(msgtype != 0);
1565
1566
0
  if (PqCommBusy)
1567
0
    return 0;
1568
0
  PqCommBusy = true;
1569
0
  if (internal_putbytes(&msgtype, 1))
1570
0
    goto fail;
1571
1572
0
  if (internal_putbytes(s, len))
1573
0
    goto fail;
1574
0
  PqCommBusy = false;
1575
0
  return 0;
1576
1577
0
fail:
1578
0
  PqCommBusy = false;
1579
0
  return EOF;
1580
0
}
1581
1582
/*
1583
 * Support for TCP Keepalive parameters
1584
 */
1585
1586
/*
 * On Windows, we need to set both idle and interval at the same time.
 * We also cannot reset them to the default (setting to zero will
 * actually set them to zero, not default), therefore we fallback to
 * the out-of-the-box default instead.
 *
 * idle and interval are given in seconds; values <= 0 select the
 * fallback defaults.  On success the values actually applied are stored
 * in port->keepalives_idle and port->keepalives_interval.
 *
 * Returns STATUS_OK, or STATUS_ERROR (after logging) if WSAIoctl fails.
 */
#if defined(WIN32) && defined(SIO_KEEPALIVE_VALS)
static int
pq_setkeepaliveswin32(Port *port, int idle, int interval)
{
  struct tcp_keepalive ka;
  DWORD   retsize;

  if (idle <= 0)
    idle = 2 * 60 * 60;   /* default = 2 hours */
  if (interval <= 0)
    interval = 1;     /* default = 1 second */

  ka.onoff = 1;

  /*
   * Convert seconds to milliseconds in unsigned arithmetic.  Both values
   * are positive here, but the product can exceed INT_MAX for large
   * settings, and signed overflow is undefined behavior.
   */
  ka.keepalivetime = (unsigned long) idle * 1000;
  ka.keepaliveinterval = (unsigned long) interval * 1000;

  if (WSAIoctl(port->sock,
         SIO_KEEPALIVE_VALS,
         (LPVOID) &ka,
         sizeof(ka),
         NULL,
         0,
         &retsize,
         NULL,
         NULL)
    != 0)
  {
    ereport(LOG,
        (errmsg("%s(%s) failed: error code %d",
            "WSAIoctl", "SIO_KEEPALIVE_VALS", WSAGetLastError())));
    return STATUS_ERROR;
  }

  /* Remember what is now in effect */
  port->keepalives_idle = idle;
  port->keepalives_interval = interval;
  return STATUS_OK;
}
#endif
1631
1632
int
1633
pq_getkeepalivesidle(Port *port)
1634
0
{
1635
0
#if defined(PG_TCP_KEEPALIVE_IDLE) || defined(SIO_KEEPALIVE_VALS)
1636
0
  if (port == NULL || port->laddr.addr.ss_family == AF_UNIX)
1637
0
    return 0;
1638
1639
0
  if (port->keepalives_idle != 0)
1640
0
    return port->keepalives_idle;
1641
1642
0
  if (port->default_keepalives_idle == 0)
1643
0
  {
1644
0
#ifndef WIN32
1645
0
    socklen_t size = sizeof(port->default_keepalives_idle);
1646
1647
0
    if (getsockopt(port->sock, IPPROTO_TCP, PG_TCP_KEEPALIVE_IDLE,
1648
0
             (char *) &port->default_keepalives_idle,
1649
0
             &size) < 0)
1650
0
    {
1651
0
      ereport(LOG,
1652
0
          (errmsg("%s(%s) failed: %m", "getsockopt", PG_TCP_KEEPALIVE_IDLE_STR)));
1653
0
      port->default_keepalives_idle = -1; /* don't know */
1654
0
    }
1655
#else             /* WIN32 */
1656
    /* We can't get the defaults on Windows, so return "don't know" */
1657
    port->default_keepalives_idle = -1;
1658
#endif              /* WIN32 */
1659
0
  }
1660
1661
0
  return port->default_keepalives_idle;
1662
#else
1663
  return 0;
1664
#endif
1665
0
}
1666
1667
int
1668
pq_setkeepalivesidle(int idle, Port *port)
1669
0
{
1670
0
  if (port == NULL || port->laddr.addr.ss_family == AF_UNIX)
1671
0
    return STATUS_OK;
1672
1673
/* check SIO_KEEPALIVE_VALS here, not just WIN32, as some toolchains lack it */
1674
0
#if defined(PG_TCP_KEEPALIVE_IDLE) || defined(SIO_KEEPALIVE_VALS)
1675
0
  if (idle == port->keepalives_idle)
1676
0
    return STATUS_OK;
1677
1678
0
#ifndef WIN32
1679
0
  if (port->default_keepalives_idle <= 0)
1680
0
  {
1681
0
    if (pq_getkeepalivesidle(port) < 0)
1682
0
    {
1683
0
      if (idle == 0)
1684
0
        return STATUS_OK; /* default is set but unknown */
1685
0
      else
1686
0
        return STATUS_ERROR;
1687
0
    }
1688
0
  }
1689
1690
0
  if (idle == 0)
1691
0
    idle = port->default_keepalives_idle;
1692
1693
0
  if (setsockopt(port->sock, IPPROTO_TCP, PG_TCP_KEEPALIVE_IDLE,
1694
0
           (char *) &idle, sizeof(idle)) < 0)
1695
0
  {
1696
0
    ereport(LOG,
1697
0
        (errmsg("%s(%s) failed: %m", "setsockopt", PG_TCP_KEEPALIVE_IDLE_STR)));
1698
0
    return STATUS_ERROR;
1699
0
  }
1700
1701
0
  port->keepalives_idle = idle;
1702
#else             /* WIN32 */
1703
  return pq_setkeepaliveswin32(port, idle, port->keepalives_interval);
1704
#endif
1705
#else
1706
  if (idle != 0)
1707
  {
1708
    ereport(LOG,
1709
        (errmsg("setting the keepalive idle time is not supported")));
1710
    return STATUS_ERROR;
1711
  }
1712
#endif
1713
1714
0
  return STATUS_OK;
1715
0
}
1716
1717
int
1718
pq_getkeepalivesinterval(Port *port)
1719
0
{
1720
0
#if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
1721
0
  if (port == NULL || port->laddr.addr.ss_family == AF_UNIX)
1722
0
    return 0;
1723
1724
0
  if (port->keepalives_interval != 0)
1725
0
    return port->keepalives_interval;
1726
1727
0
  if (port->default_keepalives_interval == 0)
1728
0
  {
1729
0
#ifndef WIN32
1730
0
    socklen_t size = sizeof(port->default_keepalives_interval);
1731
1732
0
    if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
1733
0
             (char *) &port->default_keepalives_interval,
1734
0
             &size) < 0)
1735
0
    {
1736
0
      ereport(LOG,
1737
0
          (errmsg("%s(%s) failed: %m", "getsockopt", "TCP_KEEPINTVL")));
1738
0
      port->default_keepalives_interval = -1; /* don't know */
1739
0
    }
1740
#else
1741
    /* We can't get the defaults on Windows, so return "don't know" */
1742
    port->default_keepalives_interval = -1;
1743
#endif              /* WIN32 */
1744
0
  }
1745
1746
0
  return port->default_keepalives_interval;
1747
#else
1748
  return 0;
1749
#endif
1750
0
}
1751
1752
int
1753
pq_setkeepalivesinterval(int interval, Port *port)
1754
0
{
1755
0
  if (port == NULL || port->laddr.addr.ss_family == AF_UNIX)
1756
0
    return STATUS_OK;
1757
1758
0
#if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
1759
0
  if (interval == port->keepalives_interval)
1760
0
    return STATUS_OK;
1761
1762
0
#ifndef WIN32
1763
0
  if (port->default_keepalives_interval <= 0)
1764
0
  {
1765
0
    if (pq_getkeepalivesinterval(port) < 0)
1766
0
    {
1767
0
      if (interval == 0)
1768
0
        return STATUS_OK; /* default is set but unknown */
1769
0
      else
1770
0
        return STATUS_ERROR;
1771
0
    }
1772
0
  }
1773
1774
0
  if (interval == 0)
1775
0
    interval = port->default_keepalives_interval;
1776
1777
0
  if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
1778
0
           (char *) &interval, sizeof(interval)) < 0)
1779
0
  {
1780
0
    ereport(LOG,
1781
0
        (errmsg("%s(%s) failed: %m", "setsockopt", "TCP_KEEPINTVL")));
1782
0
    return STATUS_ERROR;
1783
0
  }
1784
1785
0
  port->keepalives_interval = interval;
1786
#else             /* WIN32 */
1787
  return pq_setkeepaliveswin32(port, port->keepalives_idle, interval);
1788
#endif
1789
#else
1790
  if (interval != 0)
1791
  {
1792
    ereport(LOG,
1793
        (errmsg("%s(%s) not supported", "setsockopt", "TCP_KEEPINTVL")));
1794
    return STATUS_ERROR;
1795
  }
1796
#endif
1797
1798
0
  return STATUS_OK;
1799
0
}
1800
1801
int
1802
pq_getkeepalivescount(Port *port)
1803
0
{
1804
0
#ifdef TCP_KEEPCNT
1805
0
  if (port == NULL || port->laddr.addr.ss_family == AF_UNIX)
1806
0
    return 0;
1807
1808
0
  if (port->keepalives_count != 0)
1809
0
    return port->keepalives_count;
1810
1811
0
  if (port->default_keepalives_count == 0)
1812
0
  {
1813
0
    socklen_t size = sizeof(port->default_keepalives_count);
1814
1815
0
    if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1816
0
             (char *) &port->default_keepalives_count,
1817
0
             &size) < 0)
1818
0
    {
1819
0
      ereport(LOG,
1820
0
          (errmsg("%s(%s) failed: %m", "getsockopt", "TCP_KEEPCNT")));
1821
0
      port->default_keepalives_count = -1;  /* don't know */
1822
0
    }
1823
0
  }
1824
1825
0
  return port->default_keepalives_count;
1826
#else
1827
  return 0;
1828
#endif
1829
0
}
1830
1831
int
1832
pq_setkeepalivescount(int count, Port *port)
1833
0
{
1834
0
  if (port == NULL || port->laddr.addr.ss_family == AF_UNIX)
1835
0
    return STATUS_OK;
1836
1837
0
#ifdef TCP_KEEPCNT
1838
0
  if (count == port->keepalives_count)
1839
0
    return STATUS_OK;
1840
1841
0
  if (port->default_keepalives_count <= 0)
1842
0
  {
1843
0
    if (pq_getkeepalivescount(port) < 0)
1844
0
    {
1845
0
      if (count == 0)
1846
0
        return STATUS_OK; /* default is set but unknown */
1847
0
      else
1848
0
        return STATUS_ERROR;
1849
0
    }
1850
0
  }
1851
1852
0
  if (count == 0)
1853
0
    count = port->default_keepalives_count;
1854
1855
0
  if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1856
0
           (char *) &count, sizeof(count)) < 0)
1857
0
  {
1858
0
    ereport(LOG,
1859
0
        (errmsg("%s(%s) failed: %m", "setsockopt", "TCP_KEEPCNT")));
1860
0
    return STATUS_ERROR;
1861
0
  }
1862
1863
0
  port->keepalives_count = count;
1864
#else
1865
  if (count != 0)
1866
  {
1867
    ereport(LOG,
1868
        (errmsg("%s(%s) not supported", "setsockopt", "TCP_KEEPCNT")));
1869
    return STATUS_ERROR;
1870
  }
1871
#endif
1872
1873
0
  return STATUS_OK;
1874
0
}
1875
1876
int
1877
pq_gettcpusertimeout(Port *port)
1878
0
{
1879
0
#ifdef TCP_USER_TIMEOUT
1880
0
  if (port == NULL || port->laddr.addr.ss_family == AF_UNIX)
1881
0
    return 0;
1882
1883
0
  if (port->tcp_user_timeout != 0)
1884
0
    return port->tcp_user_timeout;
1885
1886
0
  if (port->default_tcp_user_timeout == 0)
1887
0
  {
1888
0
    socklen_t size = sizeof(port->default_tcp_user_timeout);
1889
1890
0
    if (getsockopt(port->sock, IPPROTO_TCP, TCP_USER_TIMEOUT,
1891
0
             (char *) &port->default_tcp_user_timeout,
1892
0
             &size) < 0)
1893
0
    {
1894
0
      ereport(LOG,
1895
0
          (errmsg("%s(%s) failed: %m", "getsockopt", "TCP_USER_TIMEOUT")));
1896
0
      port->default_tcp_user_timeout = -1;  /* don't know */
1897
0
    }
1898
0
  }
1899
1900
0
  return port->default_tcp_user_timeout;
1901
#else
1902
  return 0;
1903
#endif
1904
0
}
1905
1906
int
1907
pq_settcpusertimeout(int timeout, Port *port)
1908
0
{
1909
0
  if (port == NULL || port->laddr.addr.ss_family == AF_UNIX)
1910
0
    return STATUS_OK;
1911
1912
0
#ifdef TCP_USER_TIMEOUT
1913
0
  if (timeout == port->tcp_user_timeout)
1914
0
    return STATUS_OK;
1915
1916
0
  if (port->default_tcp_user_timeout <= 0)
1917
0
  {
1918
0
    if (pq_gettcpusertimeout(port) < 0)
1919
0
    {
1920
0
      if (timeout == 0)
1921
0
        return STATUS_OK; /* default is set but unknown */
1922
0
      else
1923
0
        return STATUS_ERROR;
1924
0
    }
1925
0
  }
1926
1927
0
  if (timeout == 0)
1928
0
    timeout = port->default_tcp_user_timeout;
1929
1930
0
  if (setsockopt(port->sock, IPPROTO_TCP, TCP_USER_TIMEOUT,
1931
0
           (char *) &timeout, sizeof(timeout)) < 0)
1932
0
  {
1933
0
    ereport(LOG,
1934
0
        (errmsg("%s(%s) failed: %m", "setsockopt", "TCP_USER_TIMEOUT")));
1935
0
    return STATUS_ERROR;
1936
0
  }
1937
1938
0
  port->tcp_user_timeout = timeout;
1939
#else
1940
  if (timeout != 0)
1941
  {
1942
    ereport(LOG,
1943
        (errmsg("%s(%s) not supported", "setsockopt", "TCP_USER_TIMEOUT")));
1944
    return STATUS_ERROR;
1945
  }
1946
#endif
1947
1948
0
  return STATUS_OK;
1949
0
}
1950
1951
/*
1952
 * GUC assign_hook for tcp_keepalives_idle
1953
 */
1954
void
1955
assign_tcp_keepalives_idle(int newval, void *extra)
1956
0
{
1957
  /*
1958
   * The kernel API provides no way to test a value without setting it; and
1959
   * once we set it we might fail to unset it.  So there seems little point
1960
   * in fully implementing the check-then-assign GUC API for these
1961
   * variables.  Instead we just do the assignment on demand.
1962
   * pq_setkeepalivesidle reports any problems via ereport(LOG).
1963
   *
1964
   * This approach means that the GUC value might have little to do with the
1965
   * actual kernel value, so we use a show_hook that retrieves the kernel
1966
   * value rather than trusting GUC's copy.
1967
   */
1968
0
  (void) pq_setkeepalivesidle(newval, MyProcPort);
1969
0
}
1970
1971
/*
1972
 * GUC show_hook for tcp_keepalives_idle
1973
 */
1974
const char *
1975
show_tcp_keepalives_idle(void)
1976
0
{
1977
  /* See comments in assign_tcp_keepalives_idle */
1978
0
  static char nbuf[16];
1979
1980
0
  snprintf(nbuf, sizeof(nbuf), "%d", pq_getkeepalivesidle(MyProcPort));
1981
0
  return nbuf;
1982
0
}
1983
1984
/*
1985
 * GUC assign_hook for tcp_keepalives_interval
1986
 */
1987
void
1988
assign_tcp_keepalives_interval(int newval, void *extra)
1989
0
{
1990
  /* See comments in assign_tcp_keepalives_idle */
1991
0
  (void) pq_setkeepalivesinterval(newval, MyProcPort);
1992
0
}
1993
1994
/*
1995
 * GUC show_hook for tcp_keepalives_interval
1996
 */
1997
const char *
1998
show_tcp_keepalives_interval(void)
1999
0
{
2000
  /* See comments in assign_tcp_keepalives_idle */
2001
0
  static char nbuf[16];
2002
2003
0
  snprintf(nbuf, sizeof(nbuf), "%d", pq_getkeepalivesinterval(MyProcPort));
2004
0
  return nbuf;
2005
0
}
2006
2007
/*
2008
 * GUC assign_hook for tcp_keepalives_count
2009
 */
2010
void
2011
assign_tcp_keepalives_count(int newval, void *extra)
2012
0
{
2013
  /* See comments in assign_tcp_keepalives_idle */
2014
0
  (void) pq_setkeepalivescount(newval, MyProcPort);
2015
0
}
2016
2017
/*
2018
 * GUC show_hook for tcp_keepalives_count
2019
 */
2020
const char *
2021
show_tcp_keepalives_count(void)
2022
0
{
2023
  /* See comments in assign_tcp_keepalives_idle */
2024
0
  static char nbuf[16];
2025
2026
0
  snprintf(nbuf, sizeof(nbuf), "%d", pq_getkeepalivescount(MyProcPort));
2027
0
  return nbuf;
2028
0
}
2029
2030
/*
2031
 * GUC assign_hook for tcp_user_timeout
2032
 */
2033
void
2034
assign_tcp_user_timeout(int newval, void *extra)
2035
0
{
2036
  /* See comments in assign_tcp_keepalives_idle */
2037
0
  (void) pq_settcpusertimeout(newval, MyProcPort);
2038
0
}
2039
2040
/*
2041
 * GUC show_hook for tcp_user_timeout
2042
 */
2043
const char *
2044
show_tcp_user_timeout(void)
2045
0
{
2046
  /* See comments in assign_tcp_keepalives_idle */
2047
0
  static char nbuf[16];
2048
2049
0
  snprintf(nbuf, sizeof(nbuf), "%d", pq_gettcpusertimeout(MyProcPort));
2050
0
  return nbuf;
2051
0
}
2052
2053
/*
2054
 * Check if the client is still connected.
2055
 */
2056
bool
2057
pq_check_connection(void)
2058
0
{
2059
0
  WaitEvent events[FeBeWaitSetNEvents];
2060
0
  int     rc;
2061
2062
  /*
2063
   * It's OK to modify the socket event filter without restoring, because
2064
   * all FeBeWaitSet socket wait sites do the same.
2065
   */
2066
0
  ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL);
2067
2068
0
retry:
2069
0
  rc = WaitEventSetWait(FeBeWaitSet, 0, events, lengthof(events), 0);
2070
0
  for (int i = 0; i < rc; ++i)
2071
0
  {
2072
0
    if (events[i].events & WL_SOCKET_CLOSED)
2073
0
      return false;
2074
0
    if (events[i].events & WL_LATCH_SET)
2075
0
    {
2076
      /*
2077
       * A latch event might be preventing other events from being
2078
       * reported.  Reset it and poll again.  No need to restore it
2079
       * because no code should expect latches to survive across
2080
       * CHECK_FOR_INTERRUPTS().
2081
       */
2082
0
      ResetLatch(MyLatch);
2083
0
      goto retry;
2084
0
    }
2085
0
  }
2086
2087
0
  return true;
2088
0
}