Coverage Report

Created: 2025-09-27 06:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/postgres/src/backend/replication/logical/proto.c
Line
Count
Source
1
/*-------------------------------------------------------------------------
2
 *
3
 * proto.c
4
 *    logical replication protocol functions
5
 *
6
 * Copyright (c) 2015-2025, PostgreSQL Global Development Group
7
 *
8
 * IDENTIFICATION
9
 *    src/backend/replication/logical/proto.c
10
 *
11
 *-------------------------------------------------------------------------
12
 */
13
#include "postgres.h"
14
15
#include "access/sysattr.h"
16
#include "catalog/pg_namespace.h"
17
#include "catalog/pg_type.h"
18
#include "libpq/pqformat.h"
19
#include "replication/logicalproto.h"
20
#include "utils/lsyscache.h"
21
#include "utils/syscache.h"
22
23
/*
 * Flag bits carried inside protocol messages.
 *
 * LOGICALREP_IS_REPLICA_IDENTITY is a per-attribute flag in the relation
 * description.  MESSAGE_TRANSACTIONAL is the flag of the MESSAGE message,
 * and the TRUNCATE_* bits are the flags of the TRUNCATE message.
 */
#define LOGICALREP_IS_REPLICA_IDENTITY 0x01

#define MESSAGE_TRANSACTIONAL   (1 << 0)
#define TRUNCATE_CASCADE        (1 << 0)
#define TRUNCATE_RESTART_SEQS   (1 << 1)
31
32
static void logicalrep_write_attrs(StringInfo out, Relation rel,
33
                   Bitmapset *columns,
34
                   PublishGencolsType include_gencols_type);
35
static void logicalrep_write_tuple(StringInfo out, Relation rel,
36
                   TupleTableSlot *slot,
37
                   bool binary, Bitmapset *columns,
38
                   PublishGencolsType include_gencols_type);
39
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
40
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
41
42
static void logicalrep_write_namespace(StringInfo out, Oid nspid);
43
static const char *logicalrep_read_namespace(StringInfo in);
44
45
/*
46
 * Write BEGIN to the output stream.
47
 */
48
void
49
logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
50
0
{
51
0
  pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
52
53
  /* fixed fields */
54
0
  pq_sendint64(out, txn->final_lsn);
55
0
  pq_sendint64(out, txn->xact_time.commit_time);
56
0
  pq_sendint32(out, txn->xid);
57
0
}
58
59
/*
60
 * Read transaction BEGIN from the stream.
61
 */
62
void
63
logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
64
0
{
65
  /* read fields */
66
0
  begin_data->final_lsn = pq_getmsgint64(in);
67
0
  if (begin_data->final_lsn == InvalidXLogRecPtr)
68
0
    elog(ERROR, "final_lsn not set in begin message");
69
0
  begin_data->committime = pq_getmsgint64(in);
70
0
  begin_data->xid = pq_getmsgint(in, 4);
71
0
}
72
73
74
/*
75
 * Write COMMIT to the output stream.
76
 */
77
void
78
logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
79
            XLogRecPtr commit_lsn)
80
0
{
81
0
  uint8   flags = 0;
82
83
0
  pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
84
85
  /* send the flags field (unused for now) */
86
0
  pq_sendbyte(out, flags);
87
88
  /* send fields */
89
0
  pq_sendint64(out, commit_lsn);
90
0
  pq_sendint64(out, txn->end_lsn);
91
0
  pq_sendint64(out, txn->xact_time.commit_time);
92
0
}
93
94
/*
95
 * Read transaction COMMIT from the stream.
96
 */
97
void
98
logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
99
0
{
100
  /* read flags (unused for now) */
101
0
  uint8   flags = pq_getmsgbyte(in);
102
103
0
  if (flags != 0)
104
0
    elog(ERROR, "unrecognized flags %u in commit message", flags);
105
106
  /* read fields */
107
0
  commit_data->commit_lsn = pq_getmsgint64(in);
108
0
  commit_data->end_lsn = pq_getmsgint64(in);
109
0
  commit_data->committime = pq_getmsgint64(in);
110
0
}
111
112
/*
113
 * Write BEGIN PREPARE to the output stream.
114
 */
115
void
116
logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
117
0
{
118
0
  pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
119
120
  /* fixed fields */
121
0
  pq_sendint64(out, txn->final_lsn);
122
0
  pq_sendint64(out, txn->end_lsn);
123
0
  pq_sendint64(out, txn->xact_time.prepare_time);
124
0
  pq_sendint32(out, txn->xid);
125
126
  /* send gid */
127
0
  pq_sendstring(out, txn->gid);
128
0
}
129
130
/*
131
 * Read transaction BEGIN PREPARE from the stream.
132
 */
133
void
134
logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
135
0
{
136
  /* read fields */
137
0
  begin_data->prepare_lsn = pq_getmsgint64(in);
138
0
  if (begin_data->prepare_lsn == InvalidXLogRecPtr)
139
0
    elog(ERROR, "prepare_lsn not set in begin prepare message");
140
0
  begin_data->end_lsn = pq_getmsgint64(in);
141
0
  if (begin_data->end_lsn == InvalidXLogRecPtr)
142
0
    elog(ERROR, "end_lsn not set in begin prepare message");
143
0
  begin_data->prepare_time = pq_getmsgint64(in);
144
0
  begin_data->xid = pq_getmsgint(in, 4);
145
146
  /* read gid (copy it into a pre-allocated buffer) */
147
0
  strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
148
0
}
149
150
/*
151
 * The core functionality for logicalrep_write_prepare and
152
 * logicalrep_write_stream_prepare.
153
 */
154
static void
155
logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
156
                ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
157
0
{
158
0
  uint8   flags = 0;
159
160
0
  pq_sendbyte(out, type);
161
162
  /*
163
   * This should only ever happen for two-phase commit transactions, in
164
   * which case we expect to have a valid GID.
165
   */
166
0
  Assert(txn->gid != NULL);
167
0
  Assert(rbtxn_is_prepared(txn));
168
0
  Assert(TransactionIdIsValid(txn->xid));
169
170
  /* send the flags field */
171
0
  pq_sendbyte(out, flags);
172
173
  /* send fields */
174
0
  pq_sendint64(out, prepare_lsn);
175
0
  pq_sendint64(out, txn->end_lsn);
176
0
  pq_sendint64(out, txn->xact_time.prepare_time);
177
0
  pq_sendint32(out, txn->xid);
178
179
  /* send gid */
180
0
  pq_sendstring(out, txn->gid);
181
0
}
182
183
/*
184
 * Write PREPARE to the output stream.
185
 */
186
void
187
logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
188
             XLogRecPtr prepare_lsn)
189
0
{
190
0
  logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
191
0
                  txn, prepare_lsn);
192
0
}
193
194
/*
195
 * The core functionality for logicalrep_read_prepare and
196
 * logicalrep_read_stream_prepare.
197
 */
198
static void
199
logicalrep_read_prepare_common(StringInfo in, char *msgtype,
200
                 LogicalRepPreparedTxnData *prepare_data)
201
0
{
202
  /* read flags */
203
0
  uint8   flags = pq_getmsgbyte(in);
204
205
0
  if (flags != 0)
206
0
    elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
207
208
  /* read fields */
209
0
  prepare_data->prepare_lsn = pq_getmsgint64(in);
210
0
  if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
211
0
    elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
212
0
  prepare_data->end_lsn = pq_getmsgint64(in);
213
0
  if (prepare_data->end_lsn == InvalidXLogRecPtr)
214
0
    elog(ERROR, "end_lsn is not set in %s message", msgtype);
215
0
  prepare_data->prepare_time = pq_getmsgint64(in);
216
0
  prepare_data->xid = pq_getmsgint(in, 4);
217
0
  if (prepare_data->xid == InvalidTransactionId)
218
0
    elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
219
220
  /* read gid (copy it into a pre-allocated buffer) */
221
0
  strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
222
0
}
223
224
/*
225
 * Read transaction PREPARE from the stream.
226
 */
227
void
228
logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
229
0
{
230
0
  logicalrep_read_prepare_common(in, "prepare", prepare_data);
231
0
}
232
233
/*
234
 * Write COMMIT PREPARED to the output stream.
235
 */
236
void
237
logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
238
                 XLogRecPtr commit_lsn)
239
0
{
240
0
  uint8   flags = 0;
241
242
0
  pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
243
244
  /*
245
   * This should only ever happen for two-phase commit transactions, in
246
   * which case we expect to have a valid GID.
247
   */
248
0
  Assert(txn->gid != NULL);
249
250
  /* send the flags field */
251
0
  pq_sendbyte(out, flags);
252
253
  /* send fields */
254
0
  pq_sendint64(out, commit_lsn);
255
0
  pq_sendint64(out, txn->end_lsn);
256
0
  pq_sendint64(out, txn->xact_time.commit_time);
257
0
  pq_sendint32(out, txn->xid);
258
259
  /* send gid */
260
0
  pq_sendstring(out, txn->gid);
261
0
}
262
263
/*
264
 * Read transaction COMMIT PREPARED from the stream.
265
 */
266
void
267
logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
268
0
{
269
  /* read flags */
270
0
  uint8   flags = pq_getmsgbyte(in);
271
272
0
  if (flags != 0)
273
0
    elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
274
275
  /* read fields */
276
0
  prepare_data->commit_lsn = pq_getmsgint64(in);
277
0
  if (prepare_data->commit_lsn == InvalidXLogRecPtr)
278
0
    elog(ERROR, "commit_lsn is not set in commit prepared message");
279
0
  prepare_data->end_lsn = pq_getmsgint64(in);
280
0
  if (prepare_data->end_lsn == InvalidXLogRecPtr)
281
0
    elog(ERROR, "end_lsn is not set in commit prepared message");
282
0
  prepare_data->commit_time = pq_getmsgint64(in);
283
0
  prepare_data->xid = pq_getmsgint(in, 4);
284
285
  /* read gid (copy it into a pre-allocated buffer) */
286
0
  strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
287
0
}
288
289
/*
290
 * Write ROLLBACK PREPARED to the output stream.
291
 */
292
void
293
logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
294
                   XLogRecPtr prepare_end_lsn,
295
                   TimestampTz prepare_time)
296
0
{
297
0
  uint8   flags = 0;
298
299
0
  pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
300
301
  /*
302
   * This should only ever happen for two-phase commit transactions, in
303
   * which case we expect to have a valid GID.
304
   */
305
0
  Assert(txn->gid != NULL);
306
307
  /* send the flags field */
308
0
  pq_sendbyte(out, flags);
309
310
  /* send fields */
311
0
  pq_sendint64(out, prepare_end_lsn);
312
0
  pq_sendint64(out, txn->end_lsn);
313
0
  pq_sendint64(out, prepare_time);
314
0
  pq_sendint64(out, txn->xact_time.commit_time);
315
0
  pq_sendint32(out, txn->xid);
316
317
  /* send gid */
318
0
  pq_sendstring(out, txn->gid);
319
0
}
320
321
/*
322
 * Read transaction ROLLBACK PREPARED from the stream.
323
 */
324
void
325
logicalrep_read_rollback_prepared(StringInfo in,
326
                  LogicalRepRollbackPreparedTxnData *rollback_data)
327
0
{
328
  /* read flags */
329
0
  uint8   flags = pq_getmsgbyte(in);
330
331
0
  if (flags != 0)
332
0
    elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
333
334
  /* read fields */
335
0
  rollback_data->prepare_end_lsn = pq_getmsgint64(in);
336
0
  if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
337
0
    elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
338
0
  rollback_data->rollback_end_lsn = pq_getmsgint64(in);
339
0
  if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
340
0
    elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
341
0
  rollback_data->prepare_time = pq_getmsgint64(in);
342
0
  rollback_data->rollback_time = pq_getmsgint64(in);
343
0
  rollback_data->xid = pq_getmsgint(in, 4);
344
345
  /* read gid (copy it into a pre-allocated buffer) */
346
0
  strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
347
0
}
348
349
/*
350
 * Write STREAM PREPARE to the output stream.
351
 */
352
void
353
logicalrep_write_stream_prepare(StringInfo out,
354
                ReorderBufferTXN *txn,
355
                XLogRecPtr prepare_lsn)
356
0
{
357
0
  logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
358
0
                  txn, prepare_lsn);
359
0
}
360
361
/*
362
 * Read STREAM PREPARE from the stream.
363
 */
364
void
365
logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
366
0
{
367
0
  logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
368
0
}
369
370
/*
371
 * Write ORIGIN to the output stream.
372
 */
373
void
374
logicalrep_write_origin(StringInfo out, const char *origin,
375
            XLogRecPtr origin_lsn)
376
0
{
377
0
  pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
378
379
  /* fixed fields */
380
0
  pq_sendint64(out, origin_lsn);
381
382
  /* origin string */
383
0
  pq_sendstring(out, origin);
384
0
}
385
386
/*
387
 * Read ORIGIN from the output stream.
388
 */
389
char *
390
logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
391
0
{
392
  /* fixed fields */
393
0
  *origin_lsn = pq_getmsgint64(in);
394
395
  /* return origin */
396
0
  return pstrdup(pq_getmsgstring(in));
397
0
}
398
399
/*
400
 * Write INSERT to the output stream.
401
 */
402
void
403
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
404
            TupleTableSlot *newslot, bool binary,
405
            Bitmapset *columns,
406
            PublishGencolsType include_gencols_type)
407
0
{
408
0
  pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
409
410
  /* transaction ID (if not valid, we're not streaming) */
411
0
  if (TransactionIdIsValid(xid))
412
0
    pq_sendint32(out, xid);
413
414
  /* use Oid as relation identifier */
415
0
  pq_sendint32(out, RelationGetRelid(rel));
416
417
0
  pq_sendbyte(out, 'N');    /* new tuple follows */
418
0
  logicalrep_write_tuple(out, rel, newslot, binary, columns,
419
0
               include_gencols_type);
420
0
}
421
422
/*
423
 * Read INSERT from stream.
424
 *
425
 * Fills the new tuple.
426
 */
427
LogicalRepRelId
428
logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
429
0
{
430
0
  char    action;
431
0
  LogicalRepRelId relid;
432
433
  /* read the relation id */
434
0
  relid = pq_getmsgint(in, 4);
435
436
0
  action = pq_getmsgbyte(in);
437
0
  if (action != 'N')
438
0
    elog(ERROR, "expected new tuple but got %d",
439
0
       action);
440
441
0
  logicalrep_read_tuple(in, newtup);
442
443
0
  return relid;
444
0
}
445
446
/*
447
 * Write UPDATE to the output stream.
448
 */
449
void
450
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
451
            TupleTableSlot *oldslot, TupleTableSlot *newslot,
452
            bool binary, Bitmapset *columns,
453
            PublishGencolsType include_gencols_type)
454
0
{
455
0
  pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
456
457
0
  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
458
0
       rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
459
0
       rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
460
461
  /* transaction ID (if not valid, we're not streaming) */
462
0
  if (TransactionIdIsValid(xid))
463
0
    pq_sendint32(out, xid);
464
465
  /* use Oid as relation identifier */
466
0
  pq_sendint32(out, RelationGetRelid(rel));
467
468
0
  if (oldslot != NULL)
469
0
  {
470
0
    if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
471
0
      pq_sendbyte(out, 'O'); /* old tuple follows */
472
0
    else
473
0
      pq_sendbyte(out, 'K'); /* old key follows */
474
0
    logicalrep_write_tuple(out, rel, oldslot, binary, columns,
475
0
                 include_gencols_type);
476
0
  }
477
478
0
  pq_sendbyte(out, 'N');    /* new tuple follows */
479
0
  logicalrep_write_tuple(out, rel, newslot, binary, columns,
480
0
               include_gencols_type);
481
0
}
482
483
/*
484
 * Read UPDATE from stream.
485
 */
486
LogicalRepRelId
487
logicalrep_read_update(StringInfo in, bool *has_oldtuple,
488
             LogicalRepTupleData *oldtup,
489
             LogicalRepTupleData *newtup)
490
0
{
491
0
  char    action;
492
0
  LogicalRepRelId relid;
493
494
  /* read the relation id */
495
0
  relid = pq_getmsgint(in, 4);
496
497
  /* read and verify action */
498
0
  action = pq_getmsgbyte(in);
499
0
  if (action != 'K' && action != 'O' && action != 'N')
500
0
    elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
501
0
       action);
502
503
  /* check for old tuple */
504
0
  if (action == 'K' || action == 'O')
505
0
  {
506
0
    logicalrep_read_tuple(in, oldtup);
507
0
    *has_oldtuple = true;
508
509
0
    action = pq_getmsgbyte(in);
510
0
  }
511
0
  else
512
0
    *has_oldtuple = false;
513
514
  /* check for new  tuple */
515
0
  if (action != 'N')
516
0
    elog(ERROR, "expected action 'N', got %c",
517
0
       action);
518
519
0
  logicalrep_read_tuple(in, newtup);
520
521
0
  return relid;
522
0
}
523
524
/*
525
 * Write DELETE to the output stream.
526
 */
527
void
528
logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
529
            TupleTableSlot *oldslot, bool binary,
530
            Bitmapset *columns,
531
            PublishGencolsType include_gencols_type)
532
0
{
533
0
  Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
534
0
       rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
535
0
       rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
536
537
0
  pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
538
539
  /* transaction ID (if not valid, we're not streaming) */
540
0
  if (TransactionIdIsValid(xid))
541
0
    pq_sendint32(out, xid);
542
543
  /* use Oid as relation identifier */
544
0
  pq_sendint32(out, RelationGetRelid(rel));
545
546
0
  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
547
0
    pq_sendbyte(out, 'O'); /* old tuple follows */
548
0
  else
549
0
    pq_sendbyte(out, 'K'); /* old key follows */
550
551
0
  logicalrep_write_tuple(out, rel, oldslot, binary, columns,
552
0
               include_gencols_type);
553
0
}
554
555
/*
556
 * Read DELETE from stream.
557
 *
558
 * Fills the old tuple.
559
 */
560
LogicalRepRelId
561
logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
562
0
{
563
0
  char    action;
564
0
  LogicalRepRelId relid;
565
566
  /* read the relation id */
567
0
  relid = pq_getmsgint(in, 4);
568
569
  /* read and verify action */
570
0
  action = pq_getmsgbyte(in);
571
0
  if (action != 'K' && action != 'O')
572
0
    elog(ERROR, "expected action 'O' or 'K', got %c", action);
573
574
0
  logicalrep_read_tuple(in, oldtup);
575
576
0
  return relid;
577
0
}
578
579
/*
580
 * Write TRUNCATE to the output stream.
581
 */
582
void
583
logicalrep_write_truncate(StringInfo out,
584
              TransactionId xid,
585
              int nrelids,
586
              Oid relids[],
587
              bool cascade, bool restart_seqs)
588
0
{
589
0
  int     i;
590
0
  uint8   flags = 0;
591
592
0
  pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
593
594
  /* transaction ID (if not valid, we're not streaming) */
595
0
  if (TransactionIdIsValid(xid))
596
0
    pq_sendint32(out, xid);
597
598
0
  pq_sendint32(out, nrelids);
599
600
  /* encode and send truncate flags */
601
0
  if (cascade)
602
0
    flags |= TRUNCATE_CASCADE;
603
0
  if (restart_seqs)
604
0
    flags |= TRUNCATE_RESTART_SEQS;
605
0
  pq_sendint8(out, flags);
606
607
0
  for (i = 0; i < nrelids; i++)
608
0
    pq_sendint32(out, relids[i]);
609
0
}
610
611
/*
612
 * Read TRUNCATE from stream.
613
 */
614
List *
615
logicalrep_read_truncate(StringInfo in,
616
             bool *cascade, bool *restart_seqs)
617
0
{
618
0
  int     i;
619
0
  int     nrelids;
620
0
  List     *relids = NIL;
621
0
  uint8   flags;
622
623
0
  nrelids = pq_getmsgint(in, 4);
624
625
  /* read and decode truncate flags */
626
0
  flags = pq_getmsgint(in, 1);
627
0
  *cascade = (flags & TRUNCATE_CASCADE) > 0;
628
0
  *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
629
630
0
  for (i = 0; i < nrelids; i++)
631
0
    relids = lappend_oid(relids, pq_getmsgint(in, 4));
632
633
0
  return relids;
634
0
}
635
636
/*
637
 * Write MESSAGE to stream
638
 */
639
void
640
logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
641
             bool transactional, const char *prefix, Size sz,
642
             const char *message)
643
0
{
644
0
  uint8   flags = 0;
645
646
0
  pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
647
648
  /* encode and send message flags */
649
0
  if (transactional)
650
0
    flags |= MESSAGE_TRANSACTIONAL;
651
652
  /* transaction ID (if not valid, we're not streaming) */
653
0
  if (TransactionIdIsValid(xid))
654
0
    pq_sendint32(out, xid);
655
656
0
  pq_sendint8(out, flags);
657
0
  pq_sendint64(out, lsn);
658
0
  pq_sendstring(out, prefix);
659
0
  pq_sendint32(out, sz);
660
0
  pq_sendbytes(out, message, sz);
661
0
}
662
663
/*
664
 * Write relation description to the output stream.
665
 */
666
void
667
logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
668
           Bitmapset *columns,
669
           PublishGencolsType include_gencols_type)
670
0
{
671
0
  char     *relname;
672
673
0
  pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
674
675
  /* transaction ID (if not valid, we're not streaming) */
676
0
  if (TransactionIdIsValid(xid))
677
0
    pq_sendint32(out, xid);
678
679
  /* use Oid as relation identifier */
680
0
  pq_sendint32(out, RelationGetRelid(rel));
681
682
  /* send qualified relation name */
683
0
  logicalrep_write_namespace(out, RelationGetNamespace(rel));
684
0
  relname = RelationGetRelationName(rel);
685
0
  pq_sendstring(out, relname);
686
687
  /* send replica identity */
688
0
  pq_sendbyte(out, rel->rd_rel->relreplident);
689
690
  /* send the attribute info */
691
0
  logicalrep_write_attrs(out, rel, columns, include_gencols_type);
692
0
}
693
694
/*
695
 * Read the relation info from stream and return as LogicalRepRelation.
696
 */
697
LogicalRepRelation *
698
logicalrep_read_rel(StringInfo in)
699
0
{
700
0
  LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
701
702
0
  rel->remoteid = pq_getmsgint(in, 4);
703
704
  /* Read relation name from stream */
705
0
  rel->nspname = pstrdup(logicalrep_read_namespace(in));
706
0
  rel->relname = pstrdup(pq_getmsgstring(in));
707
708
  /* Read the replica identity. */
709
0
  rel->replident = pq_getmsgbyte(in);
710
711
  /* Get attribute description */
712
0
  logicalrep_read_attrs(in, rel);
713
714
0
  return rel;
715
0
}
716
717
/*
718
 * Write type info to the output stream.
719
 *
720
 * This function will always write base type info.
721
 */
722
void
723
logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
724
0
{
725
0
  Oid     basetypoid = getBaseType(typoid);
726
0
  HeapTuple tup;
727
0
  Form_pg_type typtup;
728
729
0
  pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
730
731
  /* transaction ID (if not valid, we're not streaming) */
732
0
  if (TransactionIdIsValid(xid))
733
0
    pq_sendint32(out, xid);
734
735
0
  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
736
0
  if (!HeapTupleIsValid(tup))
737
0
    elog(ERROR, "cache lookup failed for type %u", basetypoid);
738
0
  typtup = (Form_pg_type) GETSTRUCT(tup);
739
740
  /* use Oid as type identifier */
741
0
  pq_sendint32(out, typoid);
742
743
  /* send qualified type name */
744
0
  logicalrep_write_namespace(out, typtup->typnamespace);
745
0
  pq_sendstring(out, NameStr(typtup->typname));
746
747
0
  ReleaseSysCache(tup);
748
0
}
749
750
/*
751
 * Read type info from the output stream.
752
 */
753
void
754
logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
755
0
{
756
0
  ltyp->remoteid = pq_getmsgint(in, 4);
757
758
  /* Read type name from stream */
759
0
  ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
760
0
  ltyp->typname = pstrdup(pq_getmsgstring(in));
761
0
}
762
763
/*
764
 * Write a tuple to the outputstream, in the most efficient format possible.
765
 */
766
static void
767
logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
768
             bool binary, Bitmapset *columns,
769
             PublishGencolsType include_gencols_type)
770
0
{
771
0
  TupleDesc desc;
772
0
  Datum    *values;
773
0
  bool     *isnull;
774
0
  int     i;
775
0
  uint16    nliveatts = 0;
776
777
0
  desc = RelationGetDescr(rel);
778
779
0
  for (i = 0; i < desc->natts; i++)
780
0
  {
781
0
    Form_pg_attribute att = TupleDescAttr(desc, i);
782
783
0
    if (!logicalrep_should_publish_column(att, columns,
784
0
                        include_gencols_type))
785
0
      continue;
786
787
0
    nliveatts++;
788
0
  }
789
0
  pq_sendint16(out, nliveatts);
790
791
0
  slot_getallattrs(slot);
792
0
  values = slot->tts_values;
793
0
  isnull = slot->tts_isnull;
794
795
  /* Write the values */
796
0
  for (i = 0; i < desc->natts; i++)
797
0
  {
798
0
    HeapTuple typtup;
799
0
    Form_pg_type typclass;
800
0
    Form_pg_attribute att = TupleDescAttr(desc, i);
801
802
0
    if (!logicalrep_should_publish_column(att, columns,
803
0
                        include_gencols_type))
804
0
      continue;
805
806
0
    if (isnull[i])
807
0
    {
808
0
      pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
809
0
      continue;
810
0
    }
811
812
0
    if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(values[i])))
813
0
    {
814
      /*
815
       * Unchanged toasted datum.  (Note that we don't promise to detect
816
       * unchanged data in general; this is just a cheap check to avoid
817
       * sending large values unnecessarily.)
818
       */
819
0
      pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
820
0
      continue;
821
0
    }
822
823
0
    typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
824
0
    if (!HeapTupleIsValid(typtup))
825
0
      elog(ERROR, "cache lookup failed for type %u", att->atttypid);
826
0
    typclass = (Form_pg_type) GETSTRUCT(typtup);
827
828
    /*
829
     * Send in binary if requested and type has suitable send function.
830
     */
831
0
    if (binary && OidIsValid(typclass->typsend))
832
0
    {
833
0
      bytea    *outputbytes;
834
0
      int     len;
835
836
0
      pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
837
0
      outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
838
0
      len = VARSIZE(outputbytes) - VARHDRSZ;
839
0
      pq_sendint(out, len, 4);  /* length */
840
0
      pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
841
0
      pfree(outputbytes);
842
0
    }
843
0
    else
844
0
    {
845
0
      char     *outputstr;
846
847
0
      pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
848
0
      outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
849
0
      pq_sendcountedtext(out, outputstr, strlen(outputstr));
850
0
      pfree(outputstr);
851
0
    }
852
853
0
    ReleaseSysCache(typtup);
854
0
  }
855
0
}
856
857
/*
858
 * Read tuple in logical replication format from stream.
859
 */
860
static void
861
logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
862
0
{
863
0
  int     i;
864
0
  int     natts;
865
866
  /* Get number of attributes */
867
0
  natts = pq_getmsgint(in, 2);
868
869
  /* Allocate space for per-column values; zero out unused StringInfoDatas */
870
0
  tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
871
0
  tuple->colstatus = (char *) palloc(natts * sizeof(char));
872
0
  tuple->ncols = natts;
873
874
  /* Read the data */
875
0
  for (i = 0; i < natts; i++)
876
0
  {
877
0
    char     *buff;
878
0
    char    kind;
879
0
    int     len;
880
0
    StringInfo  value = &tuple->colvalues[i];
881
882
0
    kind = pq_getmsgbyte(in);
883
0
    tuple->colstatus[i] = kind;
884
885
0
    switch (kind)
886
0
    {
887
0
      case LOGICALREP_COLUMN_NULL:
888
        /* nothing more to do */
889
0
        break;
890
0
      case LOGICALREP_COLUMN_UNCHANGED:
891
        /* we don't receive the value of an unchanged column */
892
0
        break;
893
0
      case LOGICALREP_COLUMN_TEXT:
894
0
      case LOGICALREP_COLUMN_BINARY:
895
0
        len = pq_getmsgint(in, 4);  /* read length */
896
897
        /* and data */
898
0
        buff = palloc(len + 1);
899
0
        pq_copymsgbytes(in, buff, len);
900
901
        /*
902
         * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
903
         * as input functions require that.  For
904
         * LOGICALREP_COLUMN_BINARY it's not technically required, but
905
         * it's harmless.
906
         */
907
0
        buff[len] = '\0';
908
909
0
        initStringInfoFromString(value, buff, len);
910
0
        break;
911
0
      default:
912
0
        elog(ERROR, "unrecognized data representation type '%c'", kind);
913
0
    }
914
0
  }
915
0
}
916
917
/*
918
 * Write relation attribute metadata to the stream.
919
 */
920
static void
921
logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns,
922
             PublishGencolsType include_gencols_type)
923
0
{
924
0
  TupleDesc desc;
925
0
  int     i;
926
0
  uint16    nliveatts = 0;
927
0
  Bitmapset  *idattrs = NULL;
928
0
  bool    replidentfull;
929
930
0
  desc = RelationGetDescr(rel);
931
932
  /* send number of live attributes */
933
0
  for (i = 0; i < desc->natts; i++)
934
0
  {
935
0
    Form_pg_attribute att = TupleDescAttr(desc, i);
936
937
0
    if (!logicalrep_should_publish_column(att, columns,
938
0
                        include_gencols_type))
939
0
      continue;
940
941
0
    nliveatts++;
942
0
  }
943
0
  pq_sendint16(out, nliveatts);
944
945
  /* fetch bitmap of REPLICATION IDENTITY attributes */
946
0
  replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
947
0
  if (!replidentfull)
948
0
    idattrs = RelationGetIdentityKeyBitmap(rel);
949
950
  /* send the attributes */
951
0
  for (i = 0; i < desc->natts; i++)
952
0
  {
953
0
    Form_pg_attribute att = TupleDescAttr(desc, i);
954
0
    uint8   flags = 0;
955
956
0
    if (!logicalrep_should_publish_column(att, columns,
957
0
                        include_gencols_type))
958
0
      continue;
959
960
    /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
961
0
    if (replidentfull ||
962
0
      bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
963
0
              idattrs))
964
0
      flags |= LOGICALREP_IS_REPLICA_IDENTITY;
965
966
0
    pq_sendbyte(out, flags);
967
968
    /* attribute name */
969
0
    pq_sendstring(out, NameStr(att->attname));
970
971
    /* attribute type id */
972
0
    pq_sendint32(out, (int) att->atttypid);
973
974
    /* attribute mode */
975
0
    pq_sendint32(out, att->atttypmod);
976
0
  }
977
978
0
  bms_free(idattrs);
979
0
}
980
981
/*
982
 * Read relation attribute metadata from the stream.
983
 */
984
static void
985
logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
986
0
{
987
0
  int     i;
988
0
  int     natts;
989
0
  char    **attnames;
990
0
  Oid      *atttyps;
991
0
  Bitmapset  *attkeys = NULL;
992
993
0
  natts = pq_getmsgint(in, 2);
994
0
  attnames = palloc(natts * sizeof(char *));
995
0
  atttyps = palloc(natts * sizeof(Oid));
996
997
  /* read the attributes */
998
0
  for (i = 0; i < natts; i++)
999
0
  {
1000
0
    uint8   flags;
1001
1002
    /* Check for replica identity column */
1003
0
    flags = pq_getmsgbyte(in);
1004
0
    if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
1005
0
      attkeys = bms_add_member(attkeys, i);
1006
1007
    /* attribute name */
1008
0
    attnames[i] = pstrdup(pq_getmsgstring(in));
1009
1010
    /* attribute type id */
1011
0
    atttyps[i] = (Oid) pq_getmsgint(in, 4);
1012
1013
    /* we ignore attribute mode for now */
1014
0
    (void) pq_getmsgint(in, 4);
1015
0
  }
1016
1017
0
  rel->attnames = attnames;
1018
0
  rel->atttyps = atttyps;
1019
0
  rel->attkeys = attkeys;
1020
0
  rel->natts = natts;
1021
0
}
1022
1023
/*
1024
 * Write the namespace name or empty string for pg_catalog (to save space).
1025
 */
1026
static void
1027
logicalrep_write_namespace(StringInfo out, Oid nspid)
1028
0
{
1029
0
  if (nspid == PG_CATALOG_NAMESPACE)
1030
0
    pq_sendbyte(out, '\0');
1031
0
  else
1032
0
  {
1033
0
    char     *nspname = get_namespace_name(nspid);
1034
1035
0
    if (nspname == NULL)
1036
0
      elog(ERROR, "cache lookup failed for namespace %u",
1037
0
         nspid);
1038
1039
0
    pq_sendstring(out, nspname);
1040
0
  }
1041
0
}
1042
1043
/*
1044
 * Read the namespace name while treating empty string as pg_catalog.
1045
 */
1046
static const char *
1047
logicalrep_read_namespace(StringInfo in)
1048
0
{
1049
0
  const char *nspname = pq_getmsgstring(in);
1050
1051
0
  if (nspname[0] == '\0')
1052
0
    nspname = "pg_catalog";
1053
1054
0
  return nspname;
1055
0
}
1056
1057
/*
1058
 * Write the information for the start stream message to the output stream.
1059
 */
1060
void
1061
logicalrep_write_stream_start(StringInfo out,
1062
                TransactionId xid, bool first_segment)
1063
0
{
1064
0
  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
1065
1066
0
  Assert(TransactionIdIsValid(xid));
1067
1068
  /* transaction ID (we're starting to stream, so must be valid) */
1069
0
  pq_sendint32(out, xid);
1070
1071
  /* 1 if this is the first streaming segment for this xid */
1072
0
  pq_sendbyte(out, first_segment ? 1 : 0);
1073
0
}
1074
1075
/*
1076
 * Read the information about the start stream message from output stream.
1077
 */
1078
TransactionId
1079
logicalrep_read_stream_start(StringInfo in, bool *first_segment)
1080
0
{
1081
0
  TransactionId xid;
1082
1083
0
  Assert(first_segment);
1084
1085
0
  xid = pq_getmsgint(in, 4);
1086
0
  *first_segment = (pq_getmsgbyte(in) == 1);
1087
1088
0
  return xid;
1089
0
}
1090
1091
/*
1092
 * Write the stop stream message to the output stream.
1093
 */
1094
void
1095
logicalrep_write_stream_stop(StringInfo out)
1096
0
{
1097
0
  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
1098
0
}
1099
1100
/*
1101
 * Write STREAM COMMIT to the output stream.
1102
 */
1103
void
1104
logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
1105
                 XLogRecPtr commit_lsn)
1106
0
{
1107
0
  uint8   flags = 0;
1108
1109
0
  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
1110
1111
0
  Assert(TransactionIdIsValid(txn->xid));
1112
1113
  /* transaction ID */
1114
0
  pq_sendint32(out, txn->xid);
1115
1116
  /* send the flags field (unused for now) */
1117
0
  pq_sendbyte(out, flags);
1118
1119
  /* send fields */
1120
0
  pq_sendint64(out, commit_lsn);
1121
0
  pq_sendint64(out, txn->end_lsn);
1122
0
  pq_sendint64(out, txn->xact_time.commit_time);
1123
0
}
1124
1125
/*
1126
 * Read STREAM COMMIT from the output stream.
1127
 */
1128
TransactionId
1129
logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
1130
0
{
1131
0
  TransactionId xid;
1132
0
  uint8   flags;
1133
1134
0
  xid = pq_getmsgint(in, 4);
1135
1136
  /* read flags (unused for now) */
1137
0
  flags = pq_getmsgbyte(in);
1138
1139
0
  if (flags != 0)
1140
0
    elog(ERROR, "unrecognized flags %u in commit message", flags);
1141
1142
  /* read fields */
1143
0
  commit_data->commit_lsn = pq_getmsgint64(in);
1144
0
  commit_data->end_lsn = pq_getmsgint64(in);
1145
0
  commit_data->committime = pq_getmsgint64(in);
1146
1147
0
  return xid;
1148
0
}
1149
1150
/*
1151
 * Write STREAM ABORT to the output stream. Note that xid and subxid will be
1152
 * same for the top-level transaction abort.
1153
 *
1154
 * If write_abort_info is true, send the abort_lsn and abort_time fields,
1155
 * otherwise don't.
1156
 */
1157
void
1158
logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
1159
                TransactionId subxid, XLogRecPtr abort_lsn,
1160
                TimestampTz abort_time, bool write_abort_info)
1161
0
{
1162
0
  pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
1163
1164
0
  Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
1165
1166
  /* transaction ID */
1167
0
  pq_sendint32(out, xid);
1168
0
  pq_sendint32(out, subxid);
1169
1170
0
  if (write_abort_info)
1171
0
  {
1172
0
    pq_sendint64(out, abort_lsn);
1173
0
    pq_sendint64(out, abort_time);
1174
0
  }
1175
0
}
1176
1177
/*
1178
 * Read STREAM ABORT from the output stream.
1179
 *
1180
 * If read_abort_info is true, read the abort_lsn and abort_time fields,
1181
 * otherwise don't.
1182
 */
1183
void
1184
logicalrep_read_stream_abort(StringInfo in,
1185
               LogicalRepStreamAbortData *abort_data,
1186
               bool read_abort_info)
1187
0
{
1188
0
  Assert(abort_data);
1189
1190
0
  abort_data->xid = pq_getmsgint(in, 4);
1191
0
  abort_data->subxid = pq_getmsgint(in, 4);
1192
1193
0
  if (read_abort_info)
1194
0
  {
1195
0
    abort_data->abort_lsn = pq_getmsgint64(in);
1196
0
    abort_data->abort_time = pq_getmsgint64(in);
1197
0
  }
1198
0
  else
1199
0
  {
1200
0
    abort_data->abort_lsn = InvalidXLogRecPtr;
1201
0
    abort_data->abort_time = 0;
1202
0
  }
1203
0
}
1204
1205
/*
1206
 * Get string representing LogicalRepMsgType.
1207
 */
1208
const char *
1209
logicalrep_message_type(LogicalRepMsgType action)
1210
0
{
1211
0
  static char err_unknown[20];
1212
1213
0
  switch (action)
1214
0
  {
1215
0
    case LOGICAL_REP_MSG_BEGIN:
1216
0
      return "BEGIN";
1217
0
    case LOGICAL_REP_MSG_COMMIT:
1218
0
      return "COMMIT";
1219
0
    case LOGICAL_REP_MSG_ORIGIN:
1220
0
      return "ORIGIN";
1221
0
    case LOGICAL_REP_MSG_INSERT:
1222
0
      return "INSERT";
1223
0
    case LOGICAL_REP_MSG_UPDATE:
1224
0
      return "UPDATE";
1225
0
    case LOGICAL_REP_MSG_DELETE:
1226
0
      return "DELETE";
1227
0
    case LOGICAL_REP_MSG_TRUNCATE:
1228
0
      return "TRUNCATE";
1229
0
    case LOGICAL_REP_MSG_RELATION:
1230
0
      return "RELATION";
1231
0
    case LOGICAL_REP_MSG_TYPE:
1232
0
      return "TYPE";
1233
0
    case LOGICAL_REP_MSG_MESSAGE:
1234
0
      return "MESSAGE";
1235
0
    case LOGICAL_REP_MSG_BEGIN_PREPARE:
1236
0
      return "BEGIN PREPARE";
1237
0
    case LOGICAL_REP_MSG_PREPARE:
1238
0
      return "PREPARE";
1239
0
    case LOGICAL_REP_MSG_COMMIT_PREPARED:
1240
0
      return "COMMIT PREPARED";
1241
0
    case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
1242
0
      return "ROLLBACK PREPARED";
1243
0
    case LOGICAL_REP_MSG_STREAM_START:
1244
0
      return "STREAM START";
1245
0
    case LOGICAL_REP_MSG_STREAM_STOP:
1246
0
      return "STREAM STOP";
1247
0
    case LOGICAL_REP_MSG_STREAM_COMMIT:
1248
0
      return "STREAM COMMIT";
1249
0
    case LOGICAL_REP_MSG_STREAM_ABORT:
1250
0
      return "STREAM ABORT";
1251
0
    case LOGICAL_REP_MSG_STREAM_PREPARE:
1252
0
      return "STREAM PREPARE";
1253
0
  }
1254
1255
  /*
1256
   * This message provides context in the error raised when applying a
1257
   * logical message. So we can't throw an error here. Return an unknown
1258
   * indicator value so that the original error is still reported.
1259
   */
1260
0
  snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1261
1262
0
  return err_unknown;
1263
0
}
1264
1265
/*
1266
 * Check if the column 'att' of a table should be published.
1267
 *
1268
 * 'columns' represents the publication column list (if any) for that table.
1269
 *
1270
 * 'include_gencols_type' value indicates whether generated columns should be
1271
 * published when there is no column list. Typically, this will have the same
1272
 * value as the 'publish_generated_columns' publication parameter.
1273
 *
1274
 * Note that generated columns can be published only when present in a
1275
 * publication column list, or when include_gencols_type is
1276
 * PUBLISH_GENCOLS_STORED.
1277
 */
1278
bool
1279
logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns,
1280
                 PublishGencolsType include_gencols_type)
1281
0
{
1282
0
  if (att->attisdropped)
1283
0
    return false;
1284
1285
  /* If a column list is provided, publish only the cols in that list. */
1286
0
  if (columns)
1287
0
    return bms_is_member(att->attnum, columns);
1288
1289
  /* All non-generated columns are always published. */
1290
0
  if (!att->attgenerated)
1291
0
    return true;
1292
1293
  /*
1294
   * Stored generated columns are only published when the user sets
1295
   * publish_generated_columns as stored.
1296
   */
1297
0
  if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)
1298
0
    return include_gencols_type == PUBLISH_GENCOLS_STORED;
1299
1300
0
  return false;
1301
0
}