Coverage Report

Created: 2025-09-27 06:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/postgres/src/include/replication/worker_internal.h
Line
Count
Source
1
/*-------------------------------------------------------------------------
2
 *
3
 * worker_internal.h
4
 *    Internal headers shared by logical replication workers.
5
 *
6
 * Portions Copyright (c) 2016-2025, PostgreSQL Global Development Group
7
 *
8
 * src/include/replication/worker_internal.h
9
 *
10
 *-------------------------------------------------------------------------
11
 */
12
#ifndef WORKER_INTERNAL_H
13
#define WORKER_INTERNAL_H
14
15
#include "access/xlogdefs.h"
16
#include "catalog/pg_subscription.h"
17
#include "datatype/timestamp.h"
18
#include "miscadmin.h"
19
#include "replication/logicalrelation.h"
20
#include "replication/walreceiver.h"
21
#include "storage/buffile.h"
22
#include "storage/fileset.h"
23
#include "storage/lock.h"
24
#include "storage/shm_mq.h"
25
#include "storage/shm_toc.h"
26
#include "storage/spin.h"
27
28
/*
 * Different types of logical replication worker.
 *
 * WORKERTYPE_UNKNOWN is pinned to zero, so a zero-initialized value of this
 * type reads as "unknown"; the remaining values follow in declaration order.
 */
typedef enum LogicalRepWorkerType
{
    WORKERTYPE_UNKNOWN = 0,
    WORKERTYPE_TABLESYNC,
    WORKERTYPE_APPLY,
    WORKERTYPE_PARALLEL_APPLY,
} LogicalRepWorkerType;
36
37
typedef struct LogicalRepWorker
38
{
39
  /* What type of worker is this? */
40
  LogicalRepWorkerType type;
41
42
  /* Time at which this worker was launched. */
43
  TimestampTz launch_time;
44
45
  /* Indicates if this slot is used or free. */
46
  bool    in_use;
47
48
  /* Increased every time the slot is taken by new worker. */
49
  uint16    generation;
50
51
  /* Pointer to proc array. NULL if not running. */
52
  PGPROC     *proc;
53
54
  /* Database id to connect to. */
55
  Oid     dbid;
56
57
  /* User to use for connection (will be same as owner of subscription). */
58
  Oid     userid;
59
60
  /* Subscription id for the worker. */
61
  Oid     subid;
62
63
  /* Used for initial table synchronization. */
64
  Oid     relid;
65
  char    relstate;
66
  XLogRecPtr  relstate_lsn;
67
  slock_t   relmutex;
68
69
  /*
70
   * Used to create the changes and subxact files for the streaming
71
   * transactions.  Upon the arrival of the first streaming transaction or
72
   * when the first-time leader apply worker times out while sending changes
73
   * to the parallel apply worker, the fileset will be initialized, and it
74
   * will be deleted when the worker exits.  Under this, separate buffiles
75
   * would be created for each transaction which will be deleted after the
76
   * transaction is finished.
77
   */
78
  FileSet    *stream_fileset;
79
80
  /*
81
   * PID of leader apply worker if this slot is used for a parallel apply
82
   * worker, InvalidPid otherwise.
83
   */
84
  pid_t   leader_pid;
85
86
  /* Indicates whether apply can be performed in parallel. */
87
  bool    parallel_apply;
88
89
  /*
90
   * Changes made by this transaction and subsequent ones must be preserved.
91
   * This ensures that update_deleted conflicts can be accurately detected
92
   * during the apply phase of logical replication by this worker.
93
   *
94
   * The logical replication launcher manages an internal replication slot
95
   * named "pg_conflict_detection". It asynchronously collects this ID to
96
   * decide when to advance the xmin value of the slot.
97
   *
98
   * This ID is set to InvalidTransactionId when the apply worker stops
99
   * retaining information needed for conflict detection.
100
   */
101
  TransactionId oldest_nonremovable_xid;
102
103
  /* Stats. */
104
  XLogRecPtr  last_lsn;
105
  TimestampTz last_send_time;
106
  TimestampTz last_recv_time;
107
  XLogRecPtr  reply_lsn;
108
  TimestampTz reply_time;
109
} LogicalRepWorker;
110
111
/*
 * State of the transaction in parallel apply worker.
 *
 * The enum values must have the same order as the transaction state
 * transitions, i.e. UNKNOWN < STARTED < FINISHED numerically.  Do not
 * reorder the members.
 */
typedef enum ParallelTransState
{
    PARALLEL_TRANS_UNKNOWN,
    PARALLEL_TRANS_STARTED,
    PARALLEL_TRANS_FINISHED,
} ParallelTransState;
123
124
/*
 * State of fileset used to communicate changes from leader to parallel
 * apply worker.
 *
 * FS_EMPTY indicates an initial state where the leader doesn't need to use
 * the file to communicate with the parallel apply worker.
 *
 * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
 * to the file.
 *
 * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
 * the file.
 *
 * FS_READY indicates that it is now ok for a parallel apply worker to
 * read the file.
 */
typedef enum PartialFileSetState
{
    FS_EMPTY,
    FS_SERIALIZE_IN_PROGRESS,
    FS_SERIALIZE_DONE,
    FS_READY,
} PartialFileSetState;
147
148
/*
149
 * Struct for sharing information between leader apply worker and parallel
150
 * apply workers.
151
 */
152
typedef struct ParallelApplyWorkerShared
153
{
154
  slock_t   mutex;
155
156
  TransactionId xid;
157
158
  /*
159
   * State used to ensure commit ordering.
160
   *
161
   * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
162
   * handling the transaction finish commands while the apply leader will
163
   * wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
164
   * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
165
   * STREAM_ABORT).
166
   */
167
  ParallelTransState xact_state;
168
169
  /* Information from the corresponding LogicalRepWorker slot. */
170
  uint16    logicalrep_worker_generation;
171
  int     logicalrep_worker_slot_no;
172
173
  /*
174
   * Indicates whether there are pending streaming blocks in the queue. The
175
   * parallel apply worker will check it before starting to wait.
176
   */
177
  pg_atomic_uint32 pending_stream_count;
178
179
  /*
180
   * XactLastCommitEnd from the parallel apply worker. This is required by
181
   * the leader worker so it can update the lsn_mappings.
182
   */
183
  XLogRecPtr  last_commit_end;
184
185
  /*
186
   * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
187
   * serialize changes to the file, and share the fileset with the parallel
188
   * apply worker when processing the transaction finish command. Then the
189
   * parallel apply worker will apply all the spooled messages.
190
   *
191
   * FileSet is used here instead of SharedFileSet because we need it to
192
   * survive after releasing the shared memory so that the leader apply
193
   * worker can re-use the same fileset for the next streaming transaction.
194
   */
195
  PartialFileSetState fileset_state;
196
  FileSet   fileset;
197
} ParallelApplyWorkerShared;
198
199
/*
200
 * Information which is used to manage the parallel apply worker.
201
 */
202
typedef struct ParallelApplyWorkerInfo
203
{
204
  /*
205
   * This queue is used to send changes from the leader apply worker to the
206
   * parallel apply worker.
207
   */
208
  shm_mq_handle *mq_handle;
209
210
  /*
211
   * This queue is used to transfer error messages from the parallel apply
212
   * worker to the leader apply worker.
213
   */
214
  shm_mq_handle *error_mq_handle;
215
216
  dsm_segment *dsm_seg;
217
218
  /*
219
   * Indicates whether the leader apply worker needs to serialize the
220
   * remaining changes to a file due to timeout when attempting to send data
221
   * to the parallel apply worker via shared memory.
222
   */
223
  bool    serialize_changes;
224
225
  /*
226
   * True if the worker is being used to process a parallel apply
227
   * transaction. False indicates this worker is available for re-use.
228
   */
229
  bool    in_use;
230
231
  ParallelApplyWorkerShared *shared;
232
} ParallelApplyWorkerInfo;
233
234
/* Main memory context for apply worker. Permanent during worker lifetime. */
235
extern PGDLLIMPORT MemoryContext ApplyContext;
236
237
extern PGDLLIMPORT MemoryContext ApplyMessageContext;
238
239
extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;
240
241
extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;
242
243
/* libpqreceiver connection */
244
extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
245
246
/* Worker and subscription objects. */
247
extern PGDLLIMPORT Subscription *MySubscription;
248
extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
249
250
extern PGDLLIMPORT bool in_remote_transaction;
251
252
extern PGDLLIMPORT bool InitializingApplyWorker;
253
254
extern void logicalrep_worker_attach(int slot);
255
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
256
                        bool only_running);
257
extern List *logicalrep_workers_find(Oid subid, bool only_running,
258
                   bool acquire_lock);
259
extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
260
                   Oid dbid, Oid subid, const char *subname,
261
                   Oid userid, Oid relid,
262
                   dsm_handle subworker_dsm,
263
                   bool retain_dead_tuples);
264
extern void logicalrep_worker_stop(Oid subid, Oid relid);
265
extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
266
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
267
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
268
269
extern int  logicalrep_sync_worker_count(Oid subid);
270
271
extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
272
                         char *originname, Size szoriginname);
273
274
extern bool AllTablesyncsReady(void);
275
extern bool HasSubscriptionRelationsCached(void);
276
extern void UpdateTwoPhaseState(Oid suboid, char new_state);
277
278
extern void process_syncing_tables(XLogRecPtr current_lsn);
279
extern void invalidate_syncing_table_states(Datum arg, int cacheid,
280
                      uint32 hashvalue);
281
282
extern void stream_start_internal(TransactionId xid, bool first_segment);
283
extern void stream_stop_internal(TransactionId xid);
284
285
/* Common streaming function to apply all the spooled messages */
286
extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
287
                   XLogRecPtr lsn);
288
289
extern void apply_dispatch(StringInfo s);
290
291
extern void maybe_reread_subscription(void);
292
293
extern void stream_cleanup_files(Oid subid, TransactionId xid);
294
295
extern void set_stream_options(WalRcvStreamOptions *options,
296
                 char *slotname,
297
                 XLogRecPtr *origin_startpos);
298
299
extern void start_apply(XLogRecPtr origin_startpos);
300
301
extern void InitializeLogRepWorker(void);
302
303
extern void SetupApplyOrSyncWorker(int worker_slot);
304
305
extern void DisableSubscriptionAndExit(void);
306
307
extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
308
309
/* Function for apply error callback */
310
extern void apply_error_callback(void *arg);
311
extern void set_apply_error_context_origin(char *originname);
312
313
/* Parallel apply worker setup and interactions */
314
extern void pa_allocate_worker(TransactionId xid);
315
extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
316
extern void pa_detach_all_error_mq(void);
317
318
extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
319
             const void *data);
320
extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
321
                       bool stream_locked);
322
323
extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
324
                ParallelTransState xact_state);
325
extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);
326
327
extern void pa_start_subtrans(TransactionId current_xid,
328
                TransactionId top_xid);
329
extern void pa_reset_subtrans(void);
330
extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
331
extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
332
                 PartialFileSetState fileset_state);
333
334
extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
335
extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
336
337
extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
338
extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);
339
340
extern void pa_decr_and_wait_stream_block(void);
341
342
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
343
               XLogRecPtr remote_lsn);
344
345
0
/*
 * Worker-type predicates for a pointer to a worker slot.
 *
 * Each yields nonzero only when the slot is marked in_use AND its type
 * matches.  The argument is evaluated more than once, so pass an expression
 * without side effects.  The pointer is dereferenced unconditionally and
 * must not be NULL.
 */
#define isParallelApplyWorker(worker) ((worker)->in_use && \
                                       (worker)->type == WORKERTYPE_PARALLEL_APPLY)
#define isTablesyncWorker(worker) ((worker)->in_use && \
                                   (worker)->type == WORKERTYPE_TABLESYNC)
349
350
static inline bool
351
am_tablesync_worker(void)
352
0
{
353
0
  return isTablesyncWorker(MyLogicalRepWorker);
354
0
}
Unexecuted instantiation: subscriptioncmds.c:am_tablesync_worker
Unexecuted instantiation: applyparallelworker.c:am_tablesync_worker
Unexecuted instantiation: conflict.c:am_tablesync_worker
Unexecuted instantiation: launcher.c:am_tablesync_worker
Unexecuted instantiation: relation.c:am_tablesync_worker
Unexecuted instantiation: tablesync.c:am_tablesync_worker
Unexecuted instantiation: worker.c:am_tablesync_worker
Unexecuted instantiation: pg_upgrade_support.c:am_tablesync_worker
355
356
static inline bool
357
am_leader_apply_worker(void)
358
0
{
359
0
  Assert(MyLogicalRepWorker->in_use);
360
0
  return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
361
0
}
Unexecuted instantiation: subscriptioncmds.c:am_leader_apply_worker
Unexecuted instantiation: applyparallelworker.c:am_leader_apply_worker
Unexecuted instantiation: conflict.c:am_leader_apply_worker
Unexecuted instantiation: launcher.c:am_leader_apply_worker
Unexecuted instantiation: relation.c:am_leader_apply_worker
Unexecuted instantiation: tablesync.c:am_leader_apply_worker
Unexecuted instantiation: worker.c:am_leader_apply_worker
Unexecuted instantiation: pg_upgrade_support.c:am_leader_apply_worker
362
363
static inline bool
364
am_parallel_apply_worker(void)
365
0
{
366
0
  Assert(MyLogicalRepWorker->in_use);
367
0
  return isParallelApplyWorker(MyLogicalRepWorker);
368
0
}
Unexecuted instantiation: subscriptioncmds.c:am_parallel_apply_worker
Unexecuted instantiation: applyparallelworker.c:am_parallel_apply_worker
Unexecuted instantiation: conflict.c:am_parallel_apply_worker
Unexecuted instantiation: launcher.c:am_parallel_apply_worker
Unexecuted instantiation: relation.c:am_parallel_apply_worker
Unexecuted instantiation: tablesync.c:am_parallel_apply_worker
Unexecuted instantiation: worker.c:am_parallel_apply_worker
Unexecuted instantiation: pg_upgrade_support.c:am_parallel_apply_worker
369
370
#endif              /* WORKER_INTERNAL_H */