/src/postgres/src/backend/replication/logical/proto.c
Line | Count | Source |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * proto.c |
4 | | * logical replication protocol functions |
5 | | * |
6 | | * Copyright (c) 2015-2025, PostgreSQL Global Development Group |
7 | | * |
8 | | * IDENTIFICATION |
9 | | * src/backend/replication/logical/proto.c |
10 | | * |
11 | | *------------------------------------------------------------------------- |
12 | | */ |
13 | | #include "postgres.h" |
14 | | |
15 | | #include "access/sysattr.h" |
16 | | #include "catalog/pg_namespace.h" |
17 | | #include "catalog/pg_type.h" |
18 | | #include "libpq/pqformat.h" |
19 | | #include "replication/logicalproto.h" |
20 | | #include "utils/lsyscache.h" |
21 | | #include "utils/syscache.h" |
22 | | |
/*
 * Protocol message flags.
 *
 * Each group below belongs to a different flags byte on the wire, which is
 * why bit values may repeat between groups.
 */

/* Per-attribute flag written by logicalrep_write_attrs(). */
#define LOGICALREP_IS_REPLICA_IDENTITY 1

/* Flag bit written by logicalrep_write_message(). */
#define MESSAGE_TRANSACTIONAL (1<<0)

/* Flag bits used by logicalrep_write_truncate()/logicalrep_read_truncate(). */
#define TRUNCATE_CASCADE  (1<<0)
#define TRUNCATE_RESTART_SEQS (1<<1)

/* Helpers for relation attribute metadata and tuple data. */
static void logicalrep_write_attrs(StringInfo out, Relation rel,
								   Bitmapset *columns,
								   PublishGencolsType include_gencols_type);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
								   TupleTableSlot *slot,
								   bool binary, Bitmapset *columns,
								   PublishGencolsType include_gencols_type);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);

/* Helpers for the namespace part of qualified relation and type names. */
static void logicalrep_write_namespace(StringInfo out, Oid nspid);
static const char *logicalrep_read_namespace(StringInfo in);
44 | | |
45 | | /* |
46 | | * Write BEGIN to the output stream. |
47 | | */ |
48 | | void |
49 | | logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn) |
50 | 0 | { |
51 | 0 | pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN); |
52 | | |
53 | | /* fixed fields */ |
54 | 0 | pq_sendint64(out, txn->final_lsn); |
55 | 0 | pq_sendint64(out, txn->xact_time.commit_time); |
56 | 0 | pq_sendint32(out, txn->xid); |
57 | 0 | } |
58 | | |
59 | | /* |
60 | | * Read transaction BEGIN from the stream. |
61 | | */ |
62 | | void |
63 | | logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data) |
64 | 0 | { |
65 | | /* read fields */ |
66 | 0 | begin_data->final_lsn = pq_getmsgint64(in); |
67 | 0 | if (begin_data->final_lsn == InvalidXLogRecPtr) |
68 | 0 | elog(ERROR, "final_lsn not set in begin message"); |
69 | 0 | begin_data->committime = pq_getmsgint64(in); |
70 | 0 | begin_data->xid = pq_getmsgint(in, 4); |
71 | 0 | } |
72 | | |
73 | | |
74 | | /* |
75 | | * Write COMMIT to the output stream. |
76 | | */ |
77 | | void |
78 | | logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, |
79 | | XLogRecPtr commit_lsn) |
80 | 0 | { |
81 | 0 | uint8 flags = 0; |
82 | |
|
83 | 0 | pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT); |
84 | | |
85 | | /* send the flags field (unused for now) */ |
86 | 0 | pq_sendbyte(out, flags); |
87 | | |
88 | | /* send fields */ |
89 | 0 | pq_sendint64(out, commit_lsn); |
90 | 0 | pq_sendint64(out, txn->end_lsn); |
91 | 0 | pq_sendint64(out, txn->xact_time.commit_time); |
92 | 0 | } |
93 | | |
94 | | /* |
95 | | * Read transaction COMMIT from the stream. |
96 | | */ |
97 | | void |
98 | | logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data) |
99 | 0 | { |
100 | | /* read flags (unused for now) */ |
101 | 0 | uint8 flags = pq_getmsgbyte(in); |
102 | |
|
103 | 0 | if (flags != 0) |
104 | 0 | elog(ERROR, "unrecognized flags %u in commit message", flags); |
105 | | |
106 | | /* read fields */ |
107 | 0 | commit_data->commit_lsn = pq_getmsgint64(in); |
108 | 0 | commit_data->end_lsn = pq_getmsgint64(in); |
109 | 0 | commit_data->committime = pq_getmsgint64(in); |
110 | 0 | } |
111 | | |
112 | | /* |
113 | | * Write BEGIN PREPARE to the output stream. |
114 | | */ |
115 | | void |
116 | | logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn) |
117 | 0 | { |
118 | 0 | pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE); |
119 | | |
120 | | /* fixed fields */ |
121 | 0 | pq_sendint64(out, txn->final_lsn); |
122 | 0 | pq_sendint64(out, txn->end_lsn); |
123 | 0 | pq_sendint64(out, txn->xact_time.prepare_time); |
124 | 0 | pq_sendint32(out, txn->xid); |
125 | | |
126 | | /* send gid */ |
127 | 0 | pq_sendstring(out, txn->gid); |
128 | 0 | } |
129 | | |
130 | | /* |
131 | | * Read transaction BEGIN PREPARE from the stream. |
132 | | */ |
133 | | void |
134 | | logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data) |
135 | 0 | { |
136 | | /* read fields */ |
137 | 0 | begin_data->prepare_lsn = pq_getmsgint64(in); |
138 | 0 | if (begin_data->prepare_lsn == InvalidXLogRecPtr) |
139 | 0 | elog(ERROR, "prepare_lsn not set in begin prepare message"); |
140 | 0 | begin_data->end_lsn = pq_getmsgint64(in); |
141 | 0 | if (begin_data->end_lsn == InvalidXLogRecPtr) |
142 | 0 | elog(ERROR, "end_lsn not set in begin prepare message"); |
143 | 0 | begin_data->prepare_time = pq_getmsgint64(in); |
144 | 0 | begin_data->xid = pq_getmsgint(in, 4); |
145 | | |
146 | | /* read gid (copy it into a pre-allocated buffer) */ |
147 | 0 | strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid)); |
148 | 0 | } |
149 | | |
150 | | /* |
151 | | * The core functionality for logicalrep_write_prepare and |
152 | | * logicalrep_write_stream_prepare. |
153 | | */ |
154 | | static void |
155 | | logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, |
156 | | ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) |
157 | 0 | { |
158 | 0 | uint8 flags = 0; |
159 | |
|
160 | 0 | pq_sendbyte(out, type); |
161 | | |
162 | | /* |
163 | | * This should only ever happen for two-phase commit transactions, in |
164 | | * which case we expect to have a valid GID. |
165 | | */ |
166 | 0 | Assert(txn->gid != NULL); |
167 | 0 | Assert(rbtxn_is_prepared(txn)); |
168 | 0 | Assert(TransactionIdIsValid(txn->xid)); |
169 | | |
170 | | /* send the flags field */ |
171 | 0 | pq_sendbyte(out, flags); |
172 | | |
173 | | /* send fields */ |
174 | 0 | pq_sendint64(out, prepare_lsn); |
175 | 0 | pq_sendint64(out, txn->end_lsn); |
176 | 0 | pq_sendint64(out, txn->xact_time.prepare_time); |
177 | 0 | pq_sendint32(out, txn->xid); |
178 | | |
179 | | /* send gid */ |
180 | 0 | pq_sendstring(out, txn->gid); |
181 | 0 | } |
182 | | |
183 | | /* |
184 | | * Write PREPARE to the output stream. |
185 | | */ |
186 | | void |
187 | | logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, |
188 | | XLogRecPtr prepare_lsn) |
189 | 0 | { |
190 | 0 | logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE, |
191 | 0 | txn, prepare_lsn); |
192 | 0 | } |
193 | | |
194 | | /* |
195 | | * The core functionality for logicalrep_read_prepare and |
196 | | * logicalrep_read_stream_prepare. |
197 | | */ |
198 | | static void |
199 | | logicalrep_read_prepare_common(StringInfo in, char *msgtype, |
200 | | LogicalRepPreparedTxnData *prepare_data) |
201 | 0 | { |
202 | | /* read flags */ |
203 | 0 | uint8 flags = pq_getmsgbyte(in); |
204 | |
|
205 | 0 | if (flags != 0) |
206 | 0 | elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype); |
207 | | |
208 | | /* read fields */ |
209 | 0 | prepare_data->prepare_lsn = pq_getmsgint64(in); |
210 | 0 | if (prepare_data->prepare_lsn == InvalidXLogRecPtr) |
211 | 0 | elog(ERROR, "prepare_lsn is not set in %s message", msgtype); |
212 | 0 | prepare_data->end_lsn = pq_getmsgint64(in); |
213 | 0 | if (prepare_data->end_lsn == InvalidXLogRecPtr) |
214 | 0 | elog(ERROR, "end_lsn is not set in %s message", msgtype); |
215 | 0 | prepare_data->prepare_time = pq_getmsgint64(in); |
216 | 0 | prepare_data->xid = pq_getmsgint(in, 4); |
217 | 0 | if (prepare_data->xid == InvalidTransactionId) |
218 | 0 | elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype); |
219 | | |
220 | | /* read gid (copy it into a pre-allocated buffer) */ |
221 | 0 | strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid)); |
222 | 0 | } |
223 | | |
224 | | /* |
225 | | * Read transaction PREPARE from the stream. |
226 | | */ |
227 | | void |
228 | | logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) |
229 | 0 | { |
230 | 0 | logicalrep_read_prepare_common(in, "prepare", prepare_data); |
231 | 0 | } |
232 | | |
233 | | /* |
234 | | * Write COMMIT PREPARED to the output stream. |
235 | | */ |
236 | | void |
237 | | logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, |
238 | | XLogRecPtr commit_lsn) |
239 | 0 | { |
240 | 0 | uint8 flags = 0; |
241 | |
|
242 | 0 | pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED); |
243 | | |
244 | | /* |
245 | | * This should only ever happen for two-phase commit transactions, in |
246 | | * which case we expect to have a valid GID. |
247 | | */ |
248 | 0 | Assert(txn->gid != NULL); |
249 | | |
250 | | /* send the flags field */ |
251 | 0 | pq_sendbyte(out, flags); |
252 | | |
253 | | /* send fields */ |
254 | 0 | pq_sendint64(out, commit_lsn); |
255 | 0 | pq_sendint64(out, txn->end_lsn); |
256 | 0 | pq_sendint64(out, txn->xact_time.commit_time); |
257 | 0 | pq_sendint32(out, txn->xid); |
258 | | |
259 | | /* send gid */ |
260 | 0 | pq_sendstring(out, txn->gid); |
261 | 0 | } |
262 | | |
263 | | /* |
264 | | * Read transaction COMMIT PREPARED from the stream. |
265 | | */ |
266 | | void |
267 | | logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data) |
268 | 0 | { |
269 | | /* read flags */ |
270 | 0 | uint8 flags = pq_getmsgbyte(in); |
271 | |
|
272 | 0 | if (flags != 0) |
273 | 0 | elog(ERROR, "unrecognized flags %u in commit prepared message", flags); |
274 | | |
275 | | /* read fields */ |
276 | 0 | prepare_data->commit_lsn = pq_getmsgint64(in); |
277 | 0 | if (prepare_data->commit_lsn == InvalidXLogRecPtr) |
278 | 0 | elog(ERROR, "commit_lsn is not set in commit prepared message"); |
279 | 0 | prepare_data->end_lsn = pq_getmsgint64(in); |
280 | 0 | if (prepare_data->end_lsn == InvalidXLogRecPtr) |
281 | 0 | elog(ERROR, "end_lsn is not set in commit prepared message"); |
282 | 0 | prepare_data->commit_time = pq_getmsgint64(in); |
283 | 0 | prepare_data->xid = pq_getmsgint(in, 4); |
284 | | |
285 | | /* read gid (copy it into a pre-allocated buffer) */ |
286 | 0 | strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid)); |
287 | 0 | } |
288 | | |
289 | | /* |
290 | | * Write ROLLBACK PREPARED to the output stream. |
291 | | */ |
292 | | void |
293 | | logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, |
294 | | XLogRecPtr prepare_end_lsn, |
295 | | TimestampTz prepare_time) |
296 | 0 | { |
297 | 0 | uint8 flags = 0; |
298 | |
|
299 | 0 | pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED); |
300 | | |
301 | | /* |
302 | | * This should only ever happen for two-phase commit transactions, in |
303 | | * which case we expect to have a valid GID. |
304 | | */ |
305 | 0 | Assert(txn->gid != NULL); |
306 | | |
307 | | /* send the flags field */ |
308 | 0 | pq_sendbyte(out, flags); |
309 | | |
310 | | /* send fields */ |
311 | 0 | pq_sendint64(out, prepare_end_lsn); |
312 | 0 | pq_sendint64(out, txn->end_lsn); |
313 | 0 | pq_sendint64(out, prepare_time); |
314 | 0 | pq_sendint64(out, txn->xact_time.commit_time); |
315 | 0 | pq_sendint32(out, txn->xid); |
316 | | |
317 | | /* send gid */ |
318 | 0 | pq_sendstring(out, txn->gid); |
319 | 0 | } |
320 | | |
321 | | /* |
322 | | * Read transaction ROLLBACK PREPARED from the stream. |
323 | | */ |
324 | | void |
325 | | logicalrep_read_rollback_prepared(StringInfo in, |
326 | | LogicalRepRollbackPreparedTxnData *rollback_data) |
327 | 0 | { |
328 | | /* read flags */ |
329 | 0 | uint8 flags = pq_getmsgbyte(in); |
330 | |
|
331 | 0 | if (flags != 0) |
332 | 0 | elog(ERROR, "unrecognized flags %u in rollback prepared message", flags); |
333 | | |
334 | | /* read fields */ |
335 | 0 | rollback_data->prepare_end_lsn = pq_getmsgint64(in); |
336 | 0 | if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr) |
337 | 0 | elog(ERROR, "prepare_end_lsn is not set in rollback prepared message"); |
338 | 0 | rollback_data->rollback_end_lsn = pq_getmsgint64(in); |
339 | 0 | if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr) |
340 | 0 | elog(ERROR, "rollback_end_lsn is not set in rollback prepared message"); |
341 | 0 | rollback_data->prepare_time = pq_getmsgint64(in); |
342 | 0 | rollback_data->rollback_time = pq_getmsgint64(in); |
343 | 0 | rollback_data->xid = pq_getmsgint(in, 4); |
344 | | |
345 | | /* read gid (copy it into a pre-allocated buffer) */ |
346 | 0 | strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid)); |
347 | 0 | } |
348 | | |
349 | | /* |
350 | | * Write STREAM PREPARE to the output stream. |
351 | | */ |
352 | | void |
353 | | logicalrep_write_stream_prepare(StringInfo out, |
354 | | ReorderBufferTXN *txn, |
355 | | XLogRecPtr prepare_lsn) |
356 | 0 | { |
357 | 0 | logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE, |
358 | 0 | txn, prepare_lsn); |
359 | 0 | } |
360 | | |
361 | | /* |
362 | | * Read STREAM PREPARE from the stream. |
363 | | */ |
364 | | void |
365 | | logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) |
366 | 0 | { |
367 | 0 | logicalrep_read_prepare_common(in, "stream prepare", prepare_data); |
368 | 0 | } |
369 | | |
370 | | /* |
371 | | * Write ORIGIN to the output stream. |
372 | | */ |
373 | | void |
374 | | logicalrep_write_origin(StringInfo out, const char *origin, |
375 | | XLogRecPtr origin_lsn) |
376 | 0 | { |
377 | 0 | pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN); |
378 | | |
379 | | /* fixed fields */ |
380 | 0 | pq_sendint64(out, origin_lsn); |
381 | | |
382 | | /* origin string */ |
383 | 0 | pq_sendstring(out, origin); |
384 | 0 | } |
385 | | |
386 | | /* |
387 | | * Read ORIGIN from the output stream. |
388 | | */ |
389 | | char * |
390 | | logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) |
391 | 0 | { |
392 | | /* fixed fields */ |
393 | 0 | *origin_lsn = pq_getmsgint64(in); |
394 | | |
395 | | /* return origin */ |
396 | 0 | return pstrdup(pq_getmsgstring(in)); |
397 | 0 | } |
398 | | |
399 | | /* |
400 | | * Write INSERT to the output stream. |
401 | | */ |
402 | | void |
403 | | logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, |
404 | | TupleTableSlot *newslot, bool binary, |
405 | | Bitmapset *columns, |
406 | | PublishGencolsType include_gencols_type) |
407 | 0 | { |
408 | 0 | pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); |
409 | | |
410 | | /* transaction ID (if not valid, we're not streaming) */ |
411 | 0 | if (TransactionIdIsValid(xid)) |
412 | 0 | pq_sendint32(out, xid); |
413 | | |
414 | | /* use Oid as relation identifier */ |
415 | 0 | pq_sendint32(out, RelationGetRelid(rel)); |
416 | |
|
417 | 0 | pq_sendbyte(out, 'N'); /* new tuple follows */ |
418 | 0 | logicalrep_write_tuple(out, rel, newslot, binary, columns, |
419 | 0 | include_gencols_type); |
420 | 0 | } |
421 | | |
422 | | /* |
423 | | * Read INSERT from stream. |
424 | | * |
425 | | * Fills the new tuple. |
426 | | */ |
427 | | LogicalRepRelId |
428 | | logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) |
429 | 0 | { |
430 | 0 | char action; |
431 | 0 | LogicalRepRelId relid; |
432 | | |
433 | | /* read the relation id */ |
434 | 0 | relid = pq_getmsgint(in, 4); |
435 | |
|
436 | 0 | action = pq_getmsgbyte(in); |
437 | 0 | if (action != 'N') |
438 | 0 | elog(ERROR, "expected new tuple but got %d", |
439 | 0 | action); |
440 | | |
441 | 0 | logicalrep_read_tuple(in, newtup); |
442 | |
|
443 | 0 | return relid; |
444 | 0 | } |
445 | | |
446 | | /* |
447 | | * Write UPDATE to the output stream. |
448 | | */ |
449 | | void |
450 | | logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, |
451 | | TupleTableSlot *oldslot, TupleTableSlot *newslot, |
452 | | bool binary, Bitmapset *columns, |
453 | | PublishGencolsType include_gencols_type) |
454 | 0 | { |
455 | 0 | pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); |
456 | |
|
457 | 0 | Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || |
458 | 0 | rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || |
459 | 0 | rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); |
460 | | |
461 | | /* transaction ID (if not valid, we're not streaming) */ |
462 | 0 | if (TransactionIdIsValid(xid)) |
463 | 0 | pq_sendint32(out, xid); |
464 | | |
465 | | /* use Oid as relation identifier */ |
466 | 0 | pq_sendint32(out, RelationGetRelid(rel)); |
467 | |
|
468 | 0 | if (oldslot != NULL) |
469 | 0 | { |
470 | 0 | if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) |
471 | 0 | pq_sendbyte(out, 'O'); /* old tuple follows */ |
472 | 0 | else |
473 | 0 | pq_sendbyte(out, 'K'); /* old key follows */ |
474 | 0 | logicalrep_write_tuple(out, rel, oldslot, binary, columns, |
475 | 0 | include_gencols_type); |
476 | 0 | } |
477 | |
|
478 | 0 | pq_sendbyte(out, 'N'); /* new tuple follows */ |
479 | 0 | logicalrep_write_tuple(out, rel, newslot, binary, columns, |
480 | 0 | include_gencols_type); |
481 | 0 | } |
482 | | |
483 | | /* |
484 | | * Read UPDATE from stream. |
485 | | */ |
486 | | LogicalRepRelId |
487 | | logicalrep_read_update(StringInfo in, bool *has_oldtuple, |
488 | | LogicalRepTupleData *oldtup, |
489 | | LogicalRepTupleData *newtup) |
490 | 0 | { |
491 | 0 | char action; |
492 | 0 | LogicalRepRelId relid; |
493 | | |
494 | | /* read the relation id */ |
495 | 0 | relid = pq_getmsgint(in, 4); |
496 | | |
497 | | /* read and verify action */ |
498 | 0 | action = pq_getmsgbyte(in); |
499 | 0 | if (action != 'K' && action != 'O' && action != 'N') |
500 | 0 | elog(ERROR, "expected action 'N', 'O' or 'K', got %c", |
501 | 0 | action); |
502 | | |
503 | | /* check for old tuple */ |
504 | 0 | if (action == 'K' || action == 'O') |
505 | 0 | { |
506 | 0 | logicalrep_read_tuple(in, oldtup); |
507 | 0 | *has_oldtuple = true; |
508 | |
|
509 | 0 | action = pq_getmsgbyte(in); |
510 | 0 | } |
511 | 0 | else |
512 | 0 | *has_oldtuple = false; |
513 | | |
514 | | /* check for new tuple */ |
515 | 0 | if (action != 'N') |
516 | 0 | elog(ERROR, "expected action 'N', got %c", |
517 | 0 | action); |
518 | | |
519 | 0 | logicalrep_read_tuple(in, newtup); |
520 | |
|
521 | 0 | return relid; |
522 | 0 | } |
523 | | |
524 | | /* |
525 | | * Write DELETE to the output stream. |
526 | | */ |
527 | | void |
528 | | logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, |
529 | | TupleTableSlot *oldslot, bool binary, |
530 | | Bitmapset *columns, |
531 | | PublishGencolsType include_gencols_type) |
532 | 0 | { |
533 | 0 | Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || |
534 | 0 | rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || |
535 | 0 | rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); |
536 | |
|
537 | 0 | pq_sendbyte(out, LOGICAL_REP_MSG_DELETE); |
538 | | |
539 | | /* transaction ID (if not valid, we're not streaming) */ |
540 | 0 | if (TransactionIdIsValid(xid)) |
541 | 0 | pq_sendint32(out, xid); |
542 | | |
543 | | /* use Oid as relation identifier */ |
544 | 0 | pq_sendint32(out, RelationGetRelid(rel)); |
545 | |
|
546 | 0 | if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) |
547 | 0 | pq_sendbyte(out, 'O'); /* old tuple follows */ |
548 | 0 | else |
549 | 0 | pq_sendbyte(out, 'K'); /* old key follows */ |
550 | |
|
551 | 0 | logicalrep_write_tuple(out, rel, oldslot, binary, columns, |
552 | 0 | include_gencols_type); |
553 | 0 | } |
554 | | |
555 | | /* |
556 | | * Read DELETE from stream. |
557 | | * |
558 | | * Fills the old tuple. |
559 | | */ |
560 | | LogicalRepRelId |
561 | | logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) |
562 | 0 | { |
563 | 0 | char action; |
564 | 0 | LogicalRepRelId relid; |
565 | | |
566 | | /* read the relation id */ |
567 | 0 | relid = pq_getmsgint(in, 4); |
568 | | |
569 | | /* read and verify action */ |
570 | 0 | action = pq_getmsgbyte(in); |
571 | 0 | if (action != 'K' && action != 'O') |
572 | 0 | elog(ERROR, "expected action 'O' or 'K', got %c", action); |
573 | | |
574 | 0 | logicalrep_read_tuple(in, oldtup); |
575 | |
|
576 | 0 | return relid; |
577 | 0 | } |
578 | | |
579 | | /* |
580 | | * Write TRUNCATE to the output stream. |
581 | | */ |
582 | | void |
583 | | logicalrep_write_truncate(StringInfo out, |
584 | | TransactionId xid, |
585 | | int nrelids, |
586 | | Oid relids[], |
587 | | bool cascade, bool restart_seqs) |
588 | 0 | { |
589 | 0 | int i; |
590 | 0 | uint8 flags = 0; |
591 | |
|
592 | 0 | pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE); |
593 | | |
594 | | /* transaction ID (if not valid, we're not streaming) */ |
595 | 0 | if (TransactionIdIsValid(xid)) |
596 | 0 | pq_sendint32(out, xid); |
597 | |
|
598 | 0 | pq_sendint32(out, nrelids); |
599 | | |
600 | | /* encode and send truncate flags */ |
601 | 0 | if (cascade) |
602 | 0 | flags |= TRUNCATE_CASCADE; |
603 | 0 | if (restart_seqs) |
604 | 0 | flags |= TRUNCATE_RESTART_SEQS; |
605 | 0 | pq_sendint8(out, flags); |
606 | |
|
607 | 0 | for (i = 0; i < nrelids; i++) |
608 | 0 | pq_sendint32(out, relids[i]); |
609 | 0 | } |
610 | | |
611 | | /* |
612 | | * Read TRUNCATE from stream. |
613 | | */ |
614 | | List * |
615 | | logicalrep_read_truncate(StringInfo in, |
616 | | bool *cascade, bool *restart_seqs) |
617 | 0 | { |
618 | 0 | int i; |
619 | 0 | int nrelids; |
620 | 0 | List *relids = NIL; |
621 | 0 | uint8 flags; |
622 | |
|
623 | 0 | nrelids = pq_getmsgint(in, 4); |
624 | | |
625 | | /* read and decode truncate flags */ |
626 | 0 | flags = pq_getmsgint(in, 1); |
627 | 0 | *cascade = (flags & TRUNCATE_CASCADE) > 0; |
628 | 0 | *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0; |
629 | |
|
630 | 0 | for (i = 0; i < nrelids; i++) |
631 | 0 | relids = lappend_oid(relids, pq_getmsgint(in, 4)); |
632 | |
|
633 | 0 | return relids; |
634 | 0 | } |
635 | | |
636 | | /* |
637 | | * Write MESSAGE to stream |
638 | | */ |
639 | | void |
640 | | logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, |
641 | | bool transactional, const char *prefix, Size sz, |
642 | | const char *message) |
643 | 0 | { |
644 | 0 | uint8 flags = 0; |
645 | |
|
646 | 0 | pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE); |
647 | | |
648 | | /* encode and send message flags */ |
649 | 0 | if (transactional) |
650 | 0 | flags |= MESSAGE_TRANSACTIONAL; |
651 | | |
652 | | /* transaction ID (if not valid, we're not streaming) */ |
653 | 0 | if (TransactionIdIsValid(xid)) |
654 | 0 | pq_sendint32(out, xid); |
655 | |
|
656 | 0 | pq_sendint8(out, flags); |
657 | 0 | pq_sendint64(out, lsn); |
658 | 0 | pq_sendstring(out, prefix); |
659 | 0 | pq_sendint32(out, sz); |
660 | 0 | pq_sendbytes(out, message, sz); |
661 | 0 | } |
662 | | |
663 | | /* |
664 | | * Write relation description to the output stream. |
665 | | */ |
666 | | void |
667 | | logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, |
668 | | Bitmapset *columns, |
669 | | PublishGencolsType include_gencols_type) |
670 | 0 | { |
671 | 0 | char *relname; |
672 | |
|
673 | 0 | pq_sendbyte(out, LOGICAL_REP_MSG_RELATION); |
674 | | |
675 | | /* transaction ID (if not valid, we're not streaming) */ |
676 | 0 | if (TransactionIdIsValid(xid)) |
677 | 0 | pq_sendint32(out, xid); |
678 | | |
679 | | /* use Oid as relation identifier */ |
680 | 0 | pq_sendint32(out, RelationGetRelid(rel)); |
681 | | |
682 | | /* send qualified relation name */ |
683 | 0 | logicalrep_write_namespace(out, RelationGetNamespace(rel)); |
684 | 0 | relname = RelationGetRelationName(rel); |
685 | 0 | pq_sendstring(out, relname); |
686 | | |
687 | | /* send replica identity */ |
688 | 0 | pq_sendbyte(out, rel->rd_rel->relreplident); |
689 | | |
690 | | /* send the attribute info */ |
691 | 0 | logicalrep_write_attrs(out, rel, columns, include_gencols_type); |
692 | 0 | } |
693 | | |
694 | | /* |
695 | | * Read the relation info from stream and return as LogicalRepRelation. |
696 | | */ |
697 | | LogicalRepRelation * |
698 | | logicalrep_read_rel(StringInfo in) |
699 | 0 | { |
700 | 0 | LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation)); |
701 | |
|
702 | 0 | rel->remoteid = pq_getmsgint(in, 4); |
703 | | |
704 | | /* Read relation name from stream */ |
705 | 0 | rel->nspname = pstrdup(logicalrep_read_namespace(in)); |
706 | 0 | rel->relname = pstrdup(pq_getmsgstring(in)); |
707 | | |
708 | | /* Read the replica identity. */ |
709 | 0 | rel->replident = pq_getmsgbyte(in); |
710 | | |
711 | | /* Get attribute description */ |
712 | 0 | logicalrep_read_attrs(in, rel); |
713 | |
|
714 | 0 | return rel; |
715 | 0 | } |
716 | | |
717 | | /* |
718 | | * Write type info to the output stream. |
719 | | * |
720 | | * This function will always write base type info. |
721 | | */ |
722 | | void |
723 | | logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid) |
724 | 0 | { |
725 | 0 | Oid basetypoid = getBaseType(typoid); |
726 | 0 | HeapTuple tup; |
727 | 0 | Form_pg_type typtup; |
728 | |
|
729 | 0 | pq_sendbyte(out, LOGICAL_REP_MSG_TYPE); |
730 | | |
731 | | /* transaction ID (if not valid, we're not streaming) */ |
732 | 0 | if (TransactionIdIsValid(xid)) |
733 | 0 | pq_sendint32(out, xid); |
734 | |
|
735 | 0 | tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid)); |
736 | 0 | if (!HeapTupleIsValid(tup)) |
737 | 0 | elog(ERROR, "cache lookup failed for type %u", basetypoid); |
738 | 0 | typtup = (Form_pg_type) GETSTRUCT(tup); |
739 | | |
740 | | /* use Oid as type identifier */ |
741 | 0 | pq_sendint32(out, typoid); |
742 | | |
743 | | /* send qualified type name */ |
744 | 0 | logicalrep_write_namespace(out, typtup->typnamespace); |
745 | 0 | pq_sendstring(out, NameStr(typtup->typname)); |
746 | |
|
747 | 0 | ReleaseSysCache(tup); |
748 | 0 | } |
749 | | |
750 | | /* |
751 | | * Read type info from the output stream. |
752 | | */ |
753 | | void |
754 | | logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) |
755 | 0 | { |
756 | 0 | ltyp->remoteid = pq_getmsgint(in, 4); |
757 | | |
758 | | /* Read type name from stream */ |
759 | 0 | ltyp->nspname = pstrdup(logicalrep_read_namespace(in)); |
760 | 0 | ltyp->typname = pstrdup(pq_getmsgstring(in)); |
761 | 0 | } |
762 | | |
763 | | /* |
764 | | * Write a tuple to the outputstream, in the most efficient format possible. |
765 | | */ |
766 | | static void |
767 | | logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, |
768 | | bool binary, Bitmapset *columns, |
769 | | PublishGencolsType include_gencols_type) |
770 | 0 | { |
771 | 0 | TupleDesc desc; |
772 | 0 | Datum *values; |
773 | 0 | bool *isnull; |
774 | 0 | int i; |
775 | 0 | uint16 nliveatts = 0; |
776 | |
|
777 | 0 | desc = RelationGetDescr(rel); |
778 | |
|
779 | 0 | for (i = 0; i < desc->natts; i++) |
780 | 0 | { |
781 | 0 | Form_pg_attribute att = TupleDescAttr(desc, i); |
782 | |
|
783 | 0 | if (!logicalrep_should_publish_column(att, columns, |
784 | 0 | include_gencols_type)) |
785 | 0 | continue; |
786 | | |
787 | 0 | nliveatts++; |
788 | 0 | } |
789 | 0 | pq_sendint16(out, nliveatts); |
790 | |
|
791 | 0 | slot_getallattrs(slot); |
792 | 0 | values = slot->tts_values; |
793 | 0 | isnull = slot->tts_isnull; |
794 | | |
795 | | /* Write the values */ |
796 | 0 | for (i = 0; i < desc->natts; i++) |
797 | 0 | { |
798 | 0 | HeapTuple typtup; |
799 | 0 | Form_pg_type typclass; |
800 | 0 | Form_pg_attribute att = TupleDescAttr(desc, i); |
801 | |
|
802 | 0 | if (!logicalrep_should_publish_column(att, columns, |
803 | 0 | include_gencols_type)) |
804 | 0 | continue; |
805 | | |
806 | 0 | if (isnull[i]) |
807 | 0 | { |
808 | 0 | pq_sendbyte(out, LOGICALREP_COLUMN_NULL); |
809 | 0 | continue; |
810 | 0 | } |
811 | | |
812 | 0 | if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(values[i]))) |
813 | 0 | { |
814 | | /* |
815 | | * Unchanged toasted datum. (Note that we don't promise to detect |
816 | | * unchanged data in general; this is just a cheap check to avoid |
817 | | * sending large values unnecessarily.) |
818 | | */ |
819 | 0 | pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED); |
820 | 0 | continue; |
821 | 0 | } |
822 | | |
823 | 0 | typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid)); |
824 | 0 | if (!HeapTupleIsValid(typtup)) |
825 | 0 | elog(ERROR, "cache lookup failed for type %u", att->atttypid); |
826 | 0 | typclass = (Form_pg_type) GETSTRUCT(typtup); |
827 | | |
828 | | /* |
829 | | * Send in binary if requested and type has suitable send function. |
830 | | */ |
831 | 0 | if (binary && OidIsValid(typclass->typsend)) |
832 | 0 | { |
833 | 0 | bytea *outputbytes; |
834 | 0 | int len; |
835 | |
|
836 | 0 | pq_sendbyte(out, LOGICALREP_COLUMN_BINARY); |
837 | 0 | outputbytes = OidSendFunctionCall(typclass->typsend, values[i]); |
838 | 0 | len = VARSIZE(outputbytes) - VARHDRSZ; |
839 | 0 | pq_sendint(out, len, 4); /* length */ |
840 | 0 | pq_sendbytes(out, VARDATA(outputbytes), len); /* data */ |
841 | 0 | pfree(outputbytes); |
842 | 0 | } |
843 | 0 | else |
844 | 0 | { |
845 | 0 | char *outputstr; |
846 | |
|
847 | 0 | pq_sendbyte(out, LOGICALREP_COLUMN_TEXT); |
848 | 0 | outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]); |
849 | 0 | pq_sendcountedtext(out, outputstr, strlen(outputstr)); |
850 | 0 | pfree(outputstr); |
851 | 0 | } |
852 | |
|
853 | 0 | ReleaseSysCache(typtup); |
854 | 0 | } |
855 | 0 | } |
856 | | |
857 | | /* |
858 | | * Read tuple in logical replication format from stream. |
859 | | */ |
860 | | static void |
861 | | logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) |
862 | 0 | { |
863 | 0 | int i; |
864 | 0 | int natts; |
865 | | |
866 | | /* Get number of attributes */ |
867 | 0 | natts = pq_getmsgint(in, 2); |
868 | | |
869 | | /* Allocate space for per-column values; zero out unused StringInfoDatas */ |
870 | 0 | tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData)); |
871 | 0 | tuple->colstatus = (char *) palloc(natts * sizeof(char)); |
872 | 0 | tuple->ncols = natts; |
873 | | |
874 | | /* Read the data */ |
875 | 0 | for (i = 0; i < natts; i++) |
876 | 0 | { |
877 | 0 | char *buff; |
878 | 0 | char kind; |
879 | 0 | int len; |
880 | 0 | StringInfo value = &tuple->colvalues[i]; |
881 | |
|
882 | 0 | kind = pq_getmsgbyte(in); |
883 | 0 | tuple->colstatus[i] = kind; |
884 | |
|
885 | 0 | switch (kind) |
886 | 0 | { |
887 | 0 | case LOGICALREP_COLUMN_NULL: |
888 | | /* nothing more to do */ |
889 | 0 | break; |
890 | 0 | case LOGICALREP_COLUMN_UNCHANGED: |
891 | | /* we don't receive the value of an unchanged column */ |
892 | 0 | break; |
893 | 0 | case LOGICALREP_COLUMN_TEXT: |
894 | 0 | case LOGICALREP_COLUMN_BINARY: |
895 | 0 | len = pq_getmsgint(in, 4); /* read length */ |
896 | | |
897 | | /* and data */ |
898 | 0 | buff = palloc(len + 1); |
899 | 0 | pq_copymsgbytes(in, buff, len); |
900 | | |
901 | | /* |
902 | | * NUL termination is required for LOGICALREP_COLUMN_TEXT mode |
903 | | * as input functions require that. For |
904 | | * LOGICALREP_COLUMN_BINARY it's not technically required, but |
905 | | * it's harmless. |
906 | | */ |
907 | 0 | buff[len] = '\0'; |
908 | |
|
909 | 0 | initStringInfoFromString(value, buff, len); |
910 | 0 | break; |
911 | 0 | default: |
912 | 0 | elog(ERROR, "unrecognized data representation type '%c'", kind); |
913 | 0 | } |
914 | 0 | } |
915 | 0 | } |
916 | | |
917 | | /* |
918 | | * Write relation attribute metadata to the stream. |
919 | | */ |
920 | | static void |
921 | | logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, |
922 | | PublishGencolsType include_gencols_type) |
923 | 0 | { |
924 | 0 | TupleDesc desc; |
925 | 0 | int i; |
926 | 0 | uint16 nliveatts = 0; |
927 | 0 | Bitmapset *idattrs = NULL; |
928 | 0 | bool replidentfull; |
929 | |
|
930 | 0 | desc = RelationGetDescr(rel); |
931 | | |
932 | | /* send number of live attributes */ |
933 | 0 | for (i = 0; i < desc->natts; i++) |
934 | 0 | { |
935 | 0 | Form_pg_attribute att = TupleDescAttr(desc, i); |
936 | |
|
937 | 0 | if (!logicalrep_should_publish_column(att, columns, |
938 | 0 | include_gencols_type)) |
939 | 0 | continue; |
940 | | |
941 | 0 | nliveatts++; |
942 | 0 | } |
943 | 0 | pq_sendint16(out, nliveatts); |
944 | | |
945 | | /* fetch bitmap of REPLICATION IDENTITY attributes */ |
946 | 0 | replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); |
947 | 0 | if (!replidentfull) |
948 | 0 | idattrs = RelationGetIdentityKeyBitmap(rel); |
949 | | |
950 | | /* send the attributes */ |
951 | 0 | for (i = 0; i < desc->natts; i++) |
952 | 0 | { |
953 | 0 | Form_pg_attribute att = TupleDescAttr(desc, i); |
954 | 0 | uint8 flags = 0; |
955 | |
|
956 | 0 | if (!logicalrep_should_publish_column(att, columns, |
957 | 0 | include_gencols_type)) |
958 | 0 | continue; |
959 | | |
960 | | /* REPLICA IDENTITY FULL means all columns are sent as part of key. */ |
961 | 0 | if (replidentfull || |
962 | 0 | bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, |
963 | 0 | idattrs)) |
964 | 0 | flags |= LOGICALREP_IS_REPLICA_IDENTITY; |
965 | |
|
966 | 0 | pq_sendbyte(out, flags); |
967 | | |
968 | | /* attribute name */ |
969 | 0 | pq_sendstring(out, NameStr(att->attname)); |
970 | | |
971 | | /* attribute type id */ |
972 | 0 | pq_sendint32(out, (int) att->atttypid); |
973 | | |
974 | | /* attribute mode */ |
975 | 0 | pq_sendint32(out, att->atttypmod); |
976 | 0 | } |
977 | |
|
978 | 0 | bms_free(idattrs); |
979 | 0 | } |
980 | | |
981 | | /* |
982 | | * Read relation attribute metadata from the stream. |
983 | | */ |
984 | | static void |
985 | | logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel) |
986 | 0 | { |
987 | 0 | int i; |
988 | 0 | int natts; |
989 | 0 | char **attnames; |
990 | 0 | Oid *atttyps; |
991 | 0 | Bitmapset *attkeys = NULL; |
992 | |
|
993 | 0 | natts = pq_getmsgint(in, 2); |
994 | 0 | attnames = palloc(natts * sizeof(char *)); |
995 | 0 | atttyps = palloc(natts * sizeof(Oid)); |
996 | | |
997 | | /* read the attributes */ |
998 | 0 | for (i = 0; i < natts; i++) |
999 | 0 | { |
1000 | 0 | uint8 flags; |
1001 | | |
1002 | | /* Check for replica identity column */ |
1003 | 0 | flags = pq_getmsgbyte(in); |
1004 | 0 | if (flags & LOGICALREP_IS_REPLICA_IDENTITY) |
1005 | 0 | attkeys = bms_add_member(attkeys, i); |
1006 | | |
1007 | | /* attribute name */ |
1008 | 0 | attnames[i] = pstrdup(pq_getmsgstring(in)); |
1009 | | |
1010 | | /* attribute type id */ |
1011 | 0 | atttyps[i] = (Oid) pq_getmsgint(in, 4); |
1012 | | |
1013 | | /* we ignore attribute mode for now */ |
1014 | 0 | (void) pq_getmsgint(in, 4); |
1015 | 0 | } |
1016 | |
|
1017 | 0 | rel->attnames = attnames; |
1018 | 0 | rel->atttyps = atttyps; |
1019 | 0 | rel->attkeys = attkeys; |
1020 | 0 | rel->natts = natts; |
1021 | 0 | } |
1022 | | |
1023 | | /* |
1024 | | * Write the namespace name or empty string for pg_catalog (to save space). |
1025 | | */ |
1026 | | static void |
1027 | | logicalrep_write_namespace(StringInfo out, Oid nspid) |
1028 | 0 | { |
1029 | 0 | if (nspid == PG_CATALOG_NAMESPACE) |
1030 | 0 | pq_sendbyte(out, '\0'); |
1031 | 0 | else |
1032 | 0 | { |
1033 | 0 | char *nspname = get_namespace_name(nspid); |
1034 | |
|
1035 | 0 | if (nspname == NULL) |
1036 | 0 | elog(ERROR, "cache lookup failed for namespace %u", |
1037 | 0 | nspid); |
1038 | | |
1039 | 0 | pq_sendstring(out, nspname); |
1040 | 0 | } |
1041 | 0 | } |
1042 | | |
1043 | | /* |
1044 | | * Read the namespace name while treating empty string as pg_catalog. |
1045 | | */ |
1046 | | static const char * |
1047 | | logicalrep_read_namespace(StringInfo in) |
1048 | 0 | { |
1049 | 0 | const char *nspname = pq_getmsgstring(in); |
1050 | |
|
1051 | 0 | if (nspname[0] == '\0') |
1052 | 0 | nspname = "pg_catalog"; |
1053 | |
|
1054 | 0 | return nspname; |
1055 | 0 | } |
1056 | | |
1057 | | /* |
1058 | | * Write the information for the start stream message to the output stream. |
1059 | | */ |
1060 | | void |
1061 | | logicalrep_write_stream_start(StringInfo out, |
1062 | | TransactionId xid, bool first_segment) |
1063 | 0 | { |
1064 | 0 | pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START); |
1065 | |
|
1066 | 0 | Assert(TransactionIdIsValid(xid)); |
1067 | | |
1068 | | /* transaction ID (we're starting to stream, so must be valid) */ |
1069 | 0 | pq_sendint32(out, xid); |
1070 | | |
1071 | | /* 1 if this is the first streaming segment for this xid */ |
1072 | 0 | pq_sendbyte(out, first_segment ? 1 : 0); |
1073 | 0 | } |
1074 | | |
1075 | | /* |
1076 | | * Read the information about the start stream message from output stream. |
1077 | | */ |
1078 | | TransactionId |
1079 | | logicalrep_read_stream_start(StringInfo in, bool *first_segment) |
1080 | 0 | { |
1081 | 0 | TransactionId xid; |
1082 | |
|
1083 | 0 | Assert(first_segment); |
1084 | |
|
1085 | 0 | xid = pq_getmsgint(in, 4); |
1086 | 0 | *first_segment = (pq_getmsgbyte(in) == 1); |
1087 | |
|
1088 | 0 | return xid; |
1089 | 0 | } |
1090 | | |
1091 | | /* |
1092 | | * Write the stop stream message to the output stream. |
1093 | | */ |
1094 | | void |
1095 | | logicalrep_write_stream_stop(StringInfo out) |
1096 | 0 | { |
1097 | 0 | pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP); |
1098 | 0 | } |
1099 | | |
1100 | | /* |
1101 | | * Write STREAM COMMIT to the output stream. |
1102 | | */ |
1103 | | void |
1104 | | logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, |
1105 | | XLogRecPtr commit_lsn) |
1106 | 0 | { |
1107 | 0 | uint8 flags = 0; |
1108 | |
|
1109 | 0 | pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT); |
1110 | |
|
1111 | 0 | Assert(TransactionIdIsValid(txn->xid)); |
1112 | | |
1113 | | /* transaction ID */ |
1114 | 0 | pq_sendint32(out, txn->xid); |
1115 | | |
1116 | | /* send the flags field (unused for now) */ |
1117 | 0 | pq_sendbyte(out, flags); |
1118 | | |
1119 | | /* send fields */ |
1120 | 0 | pq_sendint64(out, commit_lsn); |
1121 | 0 | pq_sendint64(out, txn->end_lsn); |
1122 | 0 | pq_sendint64(out, txn->xact_time.commit_time); |
1123 | 0 | } |
1124 | | |
1125 | | /* |
1126 | | * Read STREAM COMMIT from the output stream. |
1127 | | */ |
1128 | | TransactionId |
1129 | | logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data) |
1130 | 0 | { |
1131 | 0 | TransactionId xid; |
1132 | 0 | uint8 flags; |
1133 | |
|
1134 | 0 | xid = pq_getmsgint(in, 4); |
1135 | | |
1136 | | /* read flags (unused for now) */ |
1137 | 0 | flags = pq_getmsgbyte(in); |
1138 | |
|
1139 | 0 | if (flags != 0) |
1140 | 0 | elog(ERROR, "unrecognized flags %u in commit message", flags); |
1141 | | |
1142 | | /* read fields */ |
1143 | 0 | commit_data->commit_lsn = pq_getmsgint64(in); |
1144 | 0 | commit_data->end_lsn = pq_getmsgint64(in); |
1145 | 0 | commit_data->committime = pq_getmsgint64(in); |
1146 | |
|
1147 | 0 | return xid; |
1148 | 0 | } |
1149 | | |
1150 | | /* |
1151 | | * Write STREAM ABORT to the output stream. Note that xid and subxid will be |
1152 | | * same for the top-level transaction abort. |
1153 | | * |
1154 | | * If write_abort_info is true, send the abort_lsn and abort_time fields, |
1155 | | * otherwise don't. |
1156 | | */ |
1157 | | void |
1158 | | logicalrep_write_stream_abort(StringInfo out, TransactionId xid, |
1159 | | TransactionId subxid, XLogRecPtr abort_lsn, |
1160 | | TimestampTz abort_time, bool write_abort_info) |
1161 | 0 | { |
1162 | 0 | pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT); |
1163 | |
|
1164 | 0 | Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid)); |
1165 | | |
1166 | | /* transaction ID */ |
1167 | 0 | pq_sendint32(out, xid); |
1168 | 0 | pq_sendint32(out, subxid); |
1169 | |
|
1170 | 0 | if (write_abort_info) |
1171 | 0 | { |
1172 | 0 | pq_sendint64(out, abort_lsn); |
1173 | 0 | pq_sendint64(out, abort_time); |
1174 | 0 | } |
1175 | 0 | } |
1176 | | |
1177 | | /* |
1178 | | * Read STREAM ABORT from the output stream. |
1179 | | * |
1180 | | * If read_abort_info is true, read the abort_lsn and abort_time fields, |
1181 | | * otherwise don't. |
1182 | | */ |
1183 | | void |
1184 | | logicalrep_read_stream_abort(StringInfo in, |
1185 | | LogicalRepStreamAbortData *abort_data, |
1186 | | bool read_abort_info) |
1187 | 0 | { |
1188 | 0 | Assert(abort_data); |
1189 | |
|
1190 | 0 | abort_data->xid = pq_getmsgint(in, 4); |
1191 | 0 | abort_data->subxid = pq_getmsgint(in, 4); |
1192 | |
|
1193 | 0 | if (read_abort_info) |
1194 | 0 | { |
1195 | 0 | abort_data->abort_lsn = pq_getmsgint64(in); |
1196 | 0 | abort_data->abort_time = pq_getmsgint64(in); |
1197 | 0 | } |
1198 | 0 | else |
1199 | 0 | { |
1200 | 0 | abort_data->abort_lsn = InvalidXLogRecPtr; |
1201 | 0 | abort_data->abort_time = 0; |
1202 | 0 | } |
1203 | 0 | } |
1204 | | |
1205 | | /* |
1206 | | * Get string representing LogicalRepMsgType. |
1207 | | */ |
1208 | | const char * |
1209 | | logicalrep_message_type(LogicalRepMsgType action) |
1210 | 0 | { |
1211 | 0 | static char err_unknown[20]; |
1212 | |
|
1213 | 0 | switch (action) |
1214 | 0 | { |
1215 | 0 | case LOGICAL_REP_MSG_BEGIN: |
1216 | 0 | return "BEGIN"; |
1217 | 0 | case LOGICAL_REP_MSG_COMMIT: |
1218 | 0 | return "COMMIT"; |
1219 | 0 | case LOGICAL_REP_MSG_ORIGIN: |
1220 | 0 | return "ORIGIN"; |
1221 | 0 | case LOGICAL_REP_MSG_INSERT: |
1222 | 0 | return "INSERT"; |
1223 | 0 | case LOGICAL_REP_MSG_UPDATE: |
1224 | 0 | return "UPDATE"; |
1225 | 0 | case LOGICAL_REP_MSG_DELETE: |
1226 | 0 | return "DELETE"; |
1227 | 0 | case LOGICAL_REP_MSG_TRUNCATE: |
1228 | 0 | return "TRUNCATE"; |
1229 | 0 | case LOGICAL_REP_MSG_RELATION: |
1230 | 0 | return "RELATION"; |
1231 | 0 | case LOGICAL_REP_MSG_TYPE: |
1232 | 0 | return "TYPE"; |
1233 | 0 | case LOGICAL_REP_MSG_MESSAGE: |
1234 | 0 | return "MESSAGE"; |
1235 | 0 | case LOGICAL_REP_MSG_BEGIN_PREPARE: |
1236 | 0 | return "BEGIN PREPARE"; |
1237 | 0 | case LOGICAL_REP_MSG_PREPARE: |
1238 | 0 | return "PREPARE"; |
1239 | 0 | case LOGICAL_REP_MSG_COMMIT_PREPARED: |
1240 | 0 | return "COMMIT PREPARED"; |
1241 | 0 | case LOGICAL_REP_MSG_ROLLBACK_PREPARED: |
1242 | 0 | return "ROLLBACK PREPARED"; |
1243 | 0 | case LOGICAL_REP_MSG_STREAM_START: |
1244 | 0 | return "STREAM START"; |
1245 | 0 | case LOGICAL_REP_MSG_STREAM_STOP: |
1246 | 0 | return "STREAM STOP"; |
1247 | 0 | case LOGICAL_REP_MSG_STREAM_COMMIT: |
1248 | 0 | return "STREAM COMMIT"; |
1249 | 0 | case LOGICAL_REP_MSG_STREAM_ABORT: |
1250 | 0 | return "STREAM ABORT"; |
1251 | 0 | case LOGICAL_REP_MSG_STREAM_PREPARE: |
1252 | 0 | return "STREAM PREPARE"; |
1253 | 0 | } |
1254 | | |
1255 | | /* |
1256 | | * This message provides context in the error raised when applying a |
1257 | | * logical message. So we can't throw an error here. Return an unknown |
1258 | | * indicator value so that the original error is still reported. |
1259 | | */ |
1260 | 0 | snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action); |
1261 | |
|
1262 | 0 | return err_unknown; |
1263 | 0 | } |
1264 | | |
1265 | | /* |
1266 | | * Check if the column 'att' of a table should be published. |
1267 | | * |
1268 | | * 'columns' represents the publication column list (if any) for that table. |
1269 | | * |
1270 | | * 'include_gencols_type' value indicates whether generated columns should be |
1271 | | * published when there is no column list. Typically, this will have the same |
1272 | | * value as the 'publish_generated_columns' publication parameter. |
1273 | | * |
1274 | | * Note that generated columns can be published only when present in a |
1275 | | * publication column list, or when include_gencols_type is |
1276 | | * PUBLISH_GENCOLS_STORED. |
1277 | | */ |
1278 | | bool |
1279 | | logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, |
1280 | | PublishGencolsType include_gencols_type) |
1281 | 0 | { |
1282 | 0 | if (att->attisdropped) |
1283 | 0 | return false; |
1284 | | |
1285 | | /* If a column list is provided, publish only the cols in that list. */ |
1286 | 0 | if (columns) |
1287 | 0 | return bms_is_member(att->attnum, columns); |
1288 | | |
1289 | | /* All non-generated columns are always published. */ |
1290 | 0 | if (!att->attgenerated) |
1291 | 0 | return true; |
1292 | | |
1293 | | /* |
1294 | | * Stored generated columns are only published when the user sets |
1295 | | * publish_generated_columns as stored. |
1296 | | */ |
1297 | 0 | if (att->attgenerated == ATTRIBUTE_GENERATED_STORED) |
1298 | 0 | return include_gencols_type == PUBLISH_GENCOLS_STORED; |
1299 | | |
1300 | 0 | return false; |
1301 | 0 | } |