/src/postgres/src/include/replication/worker_internal.h
/*-------------------------------------------------------------------------
 *
 * worker_internal.h
 *    Internal headers shared by logical replication workers.
 *
 * Portions Copyright (c) 2016-2025, PostgreSQL Global Development Group
 *
 * src/include/replication/worker_internal.h
 *
 *-------------------------------------------------------------------------
 */
#ifndef WORKER_INTERNAL_H
#define WORKER_INTERNAL_H

#include "access/xlogdefs.h"
#include "catalog/pg_subscription.h"
#include "datatype/timestamp.h"
#include "miscadmin.h"
#include "replication/logicalrelation.h"
#include "replication/walreceiver.h"
#include "storage/buffile.h"
#include "storage/fileset.h"
#include "storage/lock.h"
#include "storage/shm_mq.h"
#include "storage/shm_toc.h"
#include "storage/spin.h"

/* Different types of worker */
typedef enum LogicalRepWorkerType
{
    WORKERTYPE_UNKNOWN = 0,
    WORKERTYPE_TABLESYNC,
    WORKERTYPE_APPLY,
    WORKERTYPE_PARALLEL_APPLY,
} LogicalRepWorkerType;

typedef struct LogicalRepWorker
{
    /* What type of worker is this? */
    LogicalRepWorkerType type;

    /* Time at which this worker was launched. */
    TimestampTz launch_time;

    /* Indicates if this slot is used or free. */
    bool        in_use;

    /* Increased every time the slot is taken by a new worker. */
    uint16      generation;

    /* Pointer to proc array. NULL if not running. */
    PGPROC     *proc;

    /* Database id to connect to. */
    Oid         dbid;

    /* User to use for connection (will be same as owner of subscription). */
    Oid         userid;

    /* Subscription id for the worker. */
    Oid         subid;

    /* Used for initial table synchronization. */
    Oid         relid;
    char        relstate;
    XLogRecPtr  relstate_lsn;
    slock_t     relmutex;

    /*
     * Used to create the changes and subxact files for streaming
     * transactions.  The fileset is initialized when the first streaming
     * transaction arrives, or when the leader apply worker first times out
     * while sending changes to a parallel apply worker, and it is deleted
     * when the worker exits.  Within this fileset, a separate buffile is
     * created for each transaction and is deleted once that transaction
     * finishes.
     */
    FileSet    *stream_fileset;

    /*
     * PID of leader apply worker if this slot is used for a parallel apply
     * worker, InvalidPid otherwise.
     */
    pid_t       leader_pid;

    /* Indicates whether apply can be performed in parallel. */
    bool        parallel_apply;

    /*
     * Changes made by the transaction with this XID, and by all later
     * transactions, must be preserved.  This ensures that update_deleted
     * conflicts can be accurately detected by this worker during the apply
     * phase of logical replication.
     *
     * The logical replication launcher manages an internal replication slot
     * named "pg_conflict_detection".  It asynchronously collects this ID to
     * decide when to advance the xmin value of the slot.
     *
     * This ID is set to InvalidTransactionId when the apply worker stops
     * retaining the information needed for conflict detection.
     */
    TransactionId oldest_nonremovable_xid;

    /* Stats. */
    XLogRecPtr  last_lsn;
    TimestampTz last_send_time;
    TimestampTz last_recv_time;
    XLogRecPtr  reply_lsn;
    TimestampTz reply_time;
} LogicalRepWorker;
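
/*
 * Illustrative sketch (not part of PostgreSQL): worker slots live in shared
 * memory, so a reader normally checks in_use and remembers the generation
 * before relying on a slot; if the generation changes, the slot has been
 * recycled for a different worker.  The helper below is hypothetical and
 * only shows the intended access pattern for the relstate fields, which are
 * protected by relmutex.
 *
 *    static char
 *    peek_relstate(LogicalRepWorker *worker)
 *    {
 *        char        state;
 *
 *        SpinLockAcquire(&worker->relmutex);
 *        state = worker->relstate;
 *        SpinLockRelease(&worker->relmutex);
 *
 *        return state;
 *    }
 */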

/*
 * State of the transaction in parallel apply worker.
 *
 * The enum values must have the same order as the transaction state
 * transitions.
 */
typedef enum ParallelTransState
{
    PARALLEL_TRANS_UNKNOWN,
    PARALLEL_TRANS_STARTED,
    PARALLEL_TRANS_FINISHED,
} ParallelTransState;

/*
 * State of fileset used to communicate changes from leader to parallel
 * apply worker.
 *
 * FS_EMPTY indicates an initial state where the leader doesn't need to use
 * the file to communicate with the parallel apply worker.
 *
 * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
 * to the file.
 *
 * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
 * the file.
 *
 * FS_READY indicates that it is now ok for a parallel apply worker to
 * read the file.
 */
typedef enum PartialFileSetState
{
    FS_EMPTY,
    FS_SERIALIZE_IN_PROGRESS,
    FS_SERIALIZE_DONE,
    FS_READY,
} PartialFileSetState;
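
/*
 * Illustrative sketch (not part of PostgreSQL): pa_set_fileset_state(),
 * declared later in this header, moves the fileset through these states.
 * The lifecycle is roughly:
 *
 *    FS_EMPTY
 *      -> FS_SERIALIZE_IN_PROGRESS   leader starts spilling changes to file
 *      -> FS_SERIALIZE_DONE          leader has written everything
 *      -> FS_READY                   parallel apply worker may read the file
 *
 * For example, switching to partial serialization could be announced as
 * (winfo here is an assumed ParallelApplyWorkerInfo pointer):
 *
 *    pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);
 */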

/*
 * Struct for sharing information between leader apply worker and parallel
 * apply workers.
 */
typedef struct ParallelApplyWorkerShared
{
    slock_t     mutex;

    TransactionId xid;

    /*
     * State used to ensure commit ordering.
     *
     * The parallel apply worker sets this to PARALLEL_TRANS_FINISHED after
     * handling a transaction finish command, while the leader apply worker,
     * when processing a transaction finish command (e.g. STREAM_COMMIT/
     * STREAM_PREPARE/STREAM_ABORT), waits for it to become
     * PARALLEL_TRANS_FINISHED before proceeding.
     */
    ParallelTransState xact_state;

    /* Information from the corresponding LogicalRepWorker slot. */
    uint16      logicalrep_worker_generation;
    int         logicalrep_worker_slot_no;

    /*
     * Indicates whether there are pending streaming blocks in the queue. The
     * parallel apply worker will check it before starting to wait.
     */
    pg_atomic_uint32 pending_stream_count;

    /*
     * XactLastCommitEnd from the parallel apply worker. This is required by
     * the leader worker so it can update the lsn_mappings.
     */
    XLogRecPtr  last_commit_end;

    /*
     * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
     * serialize changes to the file, and share the fileset with the parallel
     * apply worker when processing the transaction finish command. Then the
     * parallel apply worker will apply all the spooled messages.
     *
     * FileSet is used here instead of SharedFileSet because we need it to
     * survive after releasing the shared memory so that the leader apply
     * worker can re-use the same fileset for the next streaming transaction.
     */
    PartialFileSetState fileset_state;
    FileSet     fileset;
} ParallelApplyWorkerShared;
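
/*
 * Illustrative sketch (not part of PostgreSQL): xact_state is read and
 * written under the spinlock.  A leader-side check for commit ordering could
 * look roughly like the hypothetical helper below; the real code also relies
 * on locks and latches rather than polling.
 *
 *    static bool
 *    pa_xact_is_finished(ParallelApplyWorkerShared *wshared)
 *    {
 *        ParallelTransState state;
 *
 *        SpinLockAcquire(&wshared->mutex);
 *        state = wshared->xact_state;
 *        SpinLockRelease(&wshared->mutex);
 *
 *        return state == PARALLEL_TRANS_FINISHED;
 *    }
 *
 * The parallel apply worker side would report completion through
 * pa_set_xact_state(), declared later in this header.
 */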

/*
 * Information which is used to manage the parallel apply worker.
 */
typedef struct ParallelApplyWorkerInfo
{
    /*
     * This queue is used to send changes from the leader apply worker to the
     * parallel apply worker.
     */
    shm_mq_handle *mq_handle;

    /*
     * This queue is used to transfer error messages from the parallel apply
     * worker to the leader apply worker.
     */
    shm_mq_handle *error_mq_handle;

    dsm_segment *dsm_seg;

    /*
     * Indicates whether the leader apply worker needs to serialize the
     * remaining changes to a file due to timeout when attempting to send data
     * to the parallel apply worker via shared memory.
     */
    bool        serialize_changes;

    /*
     * True if the worker is being used to process a parallel apply
     * transaction. False indicates this worker is available for re-use.
     */
    bool        in_use;

    ParallelApplyWorkerShared *shared;
} ParallelApplyWorkerInfo;
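
/*
 * Illustrative sketch (not part of PostgreSQL): pa_send_data(), declared
 * later in this header, returns false when the message cannot be sent in
 * time, at which point the leader switches the transaction to
 * PARTIAL_SERIALIZE mode and spools the remaining changes to a file instead.
 * The control flow is roughly (s is an assumed StringInfo holding the
 * change):
 *
 *    if (!pa_send_data(winfo, s->len, s->data))
 *    {
 *        pa_switch_to_partial_serialize(winfo, false);
 *        ... write this and all following changes to the fileset ...
 *    }
 *
 * The stream_locked argument shown as false here is purely for illustration.
 */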

/* Main memory context for apply worker. Permanent during worker lifetime. */
extern PGDLLIMPORT MemoryContext ApplyContext;

extern PGDLLIMPORT MemoryContext ApplyMessageContext;

extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;

extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;

/* libpqreceiver connection */
extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;

/* Worker and subscription objects. */
extern PGDLLIMPORT Subscription *MySubscription;
extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;

extern PGDLLIMPORT bool in_remote_transaction;

extern PGDLLIMPORT bool InitializingApplyWorker;

extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
                                                bool only_running);
extern List *logicalrep_workers_find(Oid subid, bool only_running,
                                     bool acquire_lock);
extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
                                     Oid dbid, Oid subid, const char *subname,
                                     Oid userid, Oid relid,
                                     dsm_handle subworker_dsm,
                                     bool retain_dead_tuples);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
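
/*
 * Illustrative sketch (not part of PostgreSQL): based purely on the
 * signatures above, launching a tablesync worker for a relation and then
 * looking it up might look like the following; the argument values are
 * assumptions for illustration.
 *
 *    logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
 *                             MySubscription->dbid,
 *                             MySubscription->oid,
 *                             MySubscription->name,
 *                             MySubscription->owner,
 *                             relid,
 *                             DSM_HANDLE_INVALID,
 *                             false);
 *
 *    worker = logicalrep_worker_find(MySubscription->oid, relid, true);
 */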

extern int  logicalrep_sync_worker_count(Oid subid);

extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
                                               char *originname, Size szoriginname);

extern bool AllTablesyncsReady(void);
extern bool HasSubscriptionRelationsCached(void);
extern void UpdateTwoPhaseState(Oid suboid, char new_state);

extern void process_syncing_tables(XLogRecPtr current_lsn);
extern void invalidate_syncing_table_states(Datum arg, int cacheid,
                                            uint32 hashvalue);

extern void stream_start_internal(TransactionId xid, bool first_segment);
extern void stream_stop_internal(TransactionId xid);

/* Common streaming function to apply all the spooled messages */
extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
                                   XLogRecPtr lsn);
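
/*
 * Illustrative sketch (not part of PostgreSQL): once a streamed transaction
 * has been spooled to the worker's fileset, replaying it on the finish
 * command amounts to a call like the one below; the xid and commit_data
 * names are assumptions for illustration.
 *
 *    apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
 *                           xid, commit_data.commit_lsn);
 */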

extern void apply_dispatch(StringInfo s);

extern void maybe_reread_subscription(void);

extern void stream_cleanup_files(Oid subid, TransactionId xid);

extern void set_stream_options(WalRcvStreamOptions *options,
                               char *slotname,
                               XLogRecPtr *origin_startpos);

extern void start_apply(XLogRecPtr origin_startpos);

extern void InitializeLogRepWorker(void);

extern void SetupApplyOrSyncWorker(int worker_slot);

extern void DisableSubscriptionAndExit(void);

extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);

/* Function for apply error callback */
extern void apply_error_callback(void *arg);
extern void set_apply_error_context_origin(char *originname);

/* Parallel apply worker setup and interactions */
extern void pa_allocate_worker(TransactionId xid);
extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
extern void pa_detach_all_error_mq(void);

extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
                         const void *data);
extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
                                           bool stream_locked);

extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
                              ParallelTransState xact_state);
extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);

extern void pa_start_subtrans(TransactionId current_xid,
                              TransactionId top_xid);
extern void pa_reset_subtrans(void);
extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
                                 PartialFileSetState fileset_state);

extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);

extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);

extern void pa_decr_and_wait_stream_block(void);

extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
                           XLogRecPtr remote_lsn);

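/*
 * Illustrative sketch (not part of PostgreSQL): the counters and locks above
 * coordinate streamed chunks between the leader and a parallel apply worker.
 * Under the assumption that the leader bumps pending_stream_count for every
 * chunk it forwards, the parallel apply worker acknowledges a consumed chunk
 * roughly like this (the dispatch site shown is hypothetical):
 *
 *    case LOGICAL_REP_MSG_STREAM_STOP:
 *        pa_decr_and_wait_stream_block();
 *        break;
 */
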
#define isParallelApplyWorker(worker) ((worker)->in_use && \
                                       (worker)->type == WORKERTYPE_PARALLEL_APPLY)
#define isTablesyncWorker(worker) ((worker)->in_use && \
                                   (worker)->type == WORKERTYPE_TABLESYNC)

static inline bool
am_tablesync_worker(void)
{
    return isTablesyncWorker(MyLogicalRepWorker);
}

static inline bool
am_leader_apply_worker(void)
{
    Assert(MyLogicalRepWorker->in_use);
    return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
}

static inline bool
am_parallel_apply_worker(void)
{
    Assert(MyLogicalRepWorker->in_use);
    return isParallelApplyWorker(MyLogicalRepWorker);
}
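
/*
 * Illustrative sketch (not part of PostgreSQL): code shared by several
 * worker types can branch on the helpers above, e.g.:
 *
 *    if (am_tablesync_worker())
 *        ... copy the initial table data ...
 *    else if (am_parallel_apply_worker())
 *        ... apply changes forwarded by the leader ...
 *    else
 *        ... normal leader apply path ...
 */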

#endif                          /* WORKER_INTERNAL_H */