/src/postgres/src/backend/backup/basebackup_lz4.c
/*-------------------------------------------------------------------------
 *
 * basebackup_lz4.c
 *    Basebackup sink implementing lz4 compression.
 *
 * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *    src/backend/backup/basebackup_lz4.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#ifdef USE_LZ4
#include <lz4frame.h>
#endif

#include "backup/basebackup_sink.h"

#ifdef USE_LZ4

typedef struct bbsink_lz4
{
    /* Common information for all types of sink. */
    bbsink      base;

    /* Compression level. */
    int         compresslevel;

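    /*
     * lz4 frame compression context and preferences.  The preferences are
     * filled in once in bbsink_lz4_begin_backup(); the context is created
     * for each archive in bbsink_lz4_begin_archive() and released again in
     * bbsink_lz4_end_archive() (or in bbsink_lz4_cleanup() on error).
     */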
    LZ4F_compressionContext_t ctx;
    LZ4F_preferences_t prefs;

    /* Number of bytes staged in output buffer. */
    size_t      bytes_written;
} bbsink_lz4;

static void bbsink_lz4_begin_backup(bbsink *sink);
static void bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name);
static void bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in);
static void bbsink_lz4_manifest_contents(bbsink *sink, size_t len);
static void bbsink_lz4_end_archive(bbsink *sink);
static void bbsink_lz4_cleanup(bbsink *sink);

static const bbsink_ops bbsink_lz4_ops = {
    .begin_backup = bbsink_lz4_begin_backup,
    .begin_archive = bbsink_lz4_begin_archive,
    .archive_contents = bbsink_lz4_archive_contents,
    .end_archive = bbsink_lz4_end_archive,
    .begin_manifest = bbsink_forward_begin_manifest,
    .manifest_contents = bbsink_lz4_manifest_contents,
    .end_manifest = bbsink_forward_end_manifest,
    .end_backup = bbsink_forward_end_backup,
    .cleanup = bbsink_lz4_cleanup
};
#endif

/*
 * Create a new basebackup sink that performs lz4 compression.
 */
bbsink *
bbsink_lz4_new(bbsink *next, pg_compress_specification *compress)
{
#ifndef USE_LZ4
    ereport(ERROR,
            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
             errmsg("lz4 compression is not supported by this build")));
    return NULL;                /* keep compiler quiet */
#else
    bbsink_lz4 *sink;
    int         compresslevel;

    Assert(next != NULL);

    compresslevel = compress->level;
    Assert(compresslevel >= 0 && compresslevel <= 12);

    sink = palloc0(sizeof(bbsink_lz4));
    *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_lz4_ops;
    sink->base.bbs_next = next;
    sink->compresslevel = compresslevel;

    return &sink->base;
#endif
}

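/*
 * Hypothetical usage sketch (not part of the original file): a caller that
 * already has a downstream sink and a parsed compression specification
 * (the names next_sink and compress_spec below are placeholders) would
 * chain an lz4 sink in front of it roughly like this:
 *
 *     bbsink *lz4_sink = bbsink_lz4_new(next_sink, &compress_spec);
 *
 * The returned sink is then driven through the generic bbsink_* wrappers
 * (bbsink_begin_backup(), bbsink_archive_contents(), and so on), which
 * dispatch to the bbsink_lz4_* callbacks via bbsink_lz4_ops above.
 */
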
#ifdef USE_LZ4

/*
 * Begin backup.
 */
static void
bbsink_lz4_begin_backup(bbsink *sink)
{
    bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
    size_t      output_buffer_bound;
    LZ4F_preferences_t *prefs = &mysink->prefs;

    /* Initialize compressor object. */
    memset(prefs, 0, sizeof(LZ4F_preferences_t));
    prefs->frameInfo.blockSizeID = LZ4F_max256KB;
    prefs->compressionLevel = mysink->compresslevel;

    /*
     * We need our own buffer, because we're going to pass different data to
     * the next sink than what gets passed to us.
     */
    mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);

    /*
     * LZ4F_compressUpdate() requires an output buffer at least as large as
     * the value returned by LZ4F_compressBound(), so make sure the next
     * sink's bbs_buffer is long enough to accommodate the compressed
     * contents of our input buffer.
     */
    output_buffer_bound = LZ4F_compressBound(mysink->base.bbs_buffer_length,
                                             &mysink->prefs);

    /*
     * The buffer length is expected to be a multiple of BLCKSZ, so round up.
     */
    output_buffer_bound = output_buffer_bound + BLCKSZ -
        (output_buffer_bound % BLCKSZ);
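    /*
     * Worked example (illustrative numbers, not from a real run): if
     * LZ4F_compressBound() returned 140000 and BLCKSZ is 8192, the
     * expression above yields 140000 + 8192 - (140000 % 8192) = 147456,
     * i.e. 18 * BLCKSZ.  When the bound is already a multiple of BLCKSZ a
     * full extra block is added, which is harmless: a larger output buffer
     * is always acceptable here.
     */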

    bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
}

/*
 * Prepare to compress the next archive.
 */
static void
bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name)
{
    bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
    char       *lz4_archive_name;
    LZ4F_errorCode_t ctxError;
    size_t      headerSize;

    ctxError = LZ4F_createCompressionContext(&mysink->ctx, LZ4F_VERSION);
    if (LZ4F_isError(ctxError))
        elog(ERROR, "could not create lz4 compression context: %s",
             LZ4F_getErrorName(ctxError));

    /* First of all write the frame header to destination buffer. */
    headerSize = LZ4F_compressBegin(mysink->ctx,
                                    mysink->base.bbs_next->bbs_buffer,
                                    mysink->base.bbs_next->bbs_buffer_length,
                                    &mysink->prefs);

    if (LZ4F_isError(headerSize))
        elog(ERROR, "could not write lz4 header: %s",
             LZ4F_getErrorName(headerSize));

    /*
     * The compressed data must be written into the output buffer after the
     * header, so update our notion of how many bytes have been written to
     * the output buffer.
     */
    mysink->bytes_written += headerSize;

    /* Add ".lz4" to the archive name. */
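    /* For example, the main archive "base.tar" is announced downstream as "base.tar.lz4". */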
    lz4_archive_name = psprintf("%s.lz4", archive_name);
    Assert(sink->bbs_next != NULL);
    bbsink_begin_archive(sink->bbs_next, lz4_archive_name);
    pfree(lz4_archive_name);
}

/*
 * Compress the input data to the output buffer until we run out of input
 * data. Each time the output buffer falls below the compression bound for
 * the input buffer, invoke the archive_contents() method for the next sink.
 *
 * Note that since we're compressing the input, it may very commonly happen
 * that we consume all the input data without filling the output buffer. In
 * that case, the compressed representation of the current input data won't
 * actually be sent to the next bbsink until a later call to this function,
 * or perhaps even not until bbsink_lz4_end_archive() is invoked.
 */
static void
bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in)
{
    bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
    size_t      compressedSize;
    size_t      avail_in_bound;

    avail_in_bound = LZ4F_compressBound(avail_in, &mysink->prefs);

    /*
     * If the space remaining in the output buffer has fallen below the
     * bound computed by LZ4F_compressBound(), ask the next sink to process
     * the data already staged there so that we can empty the buffer.
     */
    if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
        avail_in_bound)
    {
        bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
        mysink->bytes_written = 0;
    }

    /*
     * Compress the input buffer and write it into the output buffer.
     */
    compressedSize = LZ4F_compressUpdate(mysink->ctx,
                                         mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
                                         mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
                                         (uint8 *) mysink->base.bbs_buffer,
                                         avail_in,
                                         NULL);

    if (LZ4F_isError(compressedSize))
        elog(ERROR, "could not compress data: %s",
             LZ4F_getErrorName(compressedSize));

    /*
     * Update our notion of how many bytes we've written into the output
     * buffer.
     */
    mysink->bytes_written += compressedSize;
}

/*
 * There might still be some data in lz4's internal buffers.  Flush it out,
 * finalize the lz4 frame, and forward the result to the successor sink as
 * archive content.
 *
 * Then we can end processing for this archive.
 */
static void
bbsink_lz4_end_archive(bbsink *sink)
{
    bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
    size_t      compressedSize;
    size_t      lz4_footer_bound;

    lz4_footer_bound = LZ4F_compressBound(0, &mysink->prefs);

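    /*
     * The next sink's buffer was sized in bbsink_lz4_begin_backup() from
     * LZ4F_compressBound() on a full-length input buffer (and then rounded
     * up to a BLCKSZ multiple), so it should always be at least as large as
     * the footer bound computed here from a zero-length input; the Assert
     * below documents that expectation.
     */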
    Assert(mysink->base.bbs_next->bbs_buffer_length >= lz4_footer_bound);

    if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
        lz4_footer_bound)
    {
        bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
        mysink->bytes_written = 0;
    }

    compressedSize = LZ4F_compressEnd(mysink->ctx,
                                      mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
                                      mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
                                      NULL);

    if (LZ4F_isError(compressedSize))
        elog(ERROR, "could not end lz4 compression: %s",
             LZ4F_getErrorName(compressedSize));

    /* Update our notion of how many bytes we've written. */
    mysink->bytes_written += compressedSize;

    /* Send whatever accumulated output bytes we have. */
    bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
    mysink->bytes_written = 0;

    /* Release the resources. */
    LZ4F_freeCompressionContext(mysink->ctx);
    mysink->ctx = NULL;

    /* Pass on the information that this archive has ended. */
    bbsink_forward_end_archive(sink);
}

/*
 * Manifest contents are not compressed, but we do need to copy them into
 * the successor sink's buffer, because we have our own.
 */
static void
bbsink_lz4_manifest_contents(bbsink *sink, size_t len)
{
    memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
    bbsink_manifest_contents(sink->bbs_next, len);
}

/*
 * In case the backup fails, make sure we free the compression context by
 * calling LZ4F_freeCompressionContext() if needed, to avoid a memory leak.
 */
static void
bbsink_lz4_cleanup(bbsink *sink)
{
    bbsink_lz4 *mysink = (bbsink_lz4 *) sink;

    if (mysink->ctx)
    {
        LZ4F_freeCompressionContext(mysink->ctx);
        mysink->ctx = NULL;
    }
}

#endif