Coverage Report

Created: 2025-08-12 06:43

/src/postgres/src/backend/backup/basebackup_lz4.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * basebackup_lz4.c
4
 *    Basebackup sink implementing lz4 compression.
5
 *
6
 * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
7
 *
8
 * IDENTIFICATION
9
 *    src/backend/backup/basebackup_lz4.c
10
 *
11
 *-------------------------------------------------------------------------
12
 */
13
#include "postgres.h"
14
15
#ifdef USE_LZ4
16
#include <lz4frame.h>
17
#endif
18
19
#include "backup/basebackup_sink.h"
20
21
#ifdef USE_LZ4
22
23
typedef struct bbsink_lz4
24
{
25
  /* Common information for all types of sink. */
26
  bbsink    base;
27
28
  /* Compression level. */
29
  int     compresslevel;
30
31
  LZ4F_compressionContext_t ctx;
32
  LZ4F_preferences_t prefs;
33
34
  /* Number of bytes staged in output buffer. */
35
  size_t    bytes_written;
36
} bbsink_lz4;
37
38
/* Sink callbacks implemented in this file. */
static void bbsink_lz4_begin_backup(bbsink *sink);
static void bbsink_lz4_cleanup(bbsink *sink);

static void bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name);
static void bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in);
static void bbsink_lz4_end_archive(bbsink *sink);

static void bbsink_lz4_manifest_contents(bbsink *sink, size_t len);
44
45
static const bbsink_ops bbsink_lz4_ops = {
46
  .begin_backup = bbsink_lz4_begin_backup,
47
  .begin_archive = bbsink_lz4_begin_archive,
48
  .archive_contents = bbsink_lz4_archive_contents,
49
  .end_archive = bbsink_lz4_end_archive,
50
  .begin_manifest = bbsink_forward_begin_manifest,
51
  .manifest_contents = bbsink_lz4_manifest_contents,
52
  .end_manifest = bbsink_forward_end_manifest,
53
  .end_backup = bbsink_forward_end_backup,
54
  .cleanup = bbsink_lz4_cleanup
55
};
56
#endif
57
58
/*
59
 * Create a new basebackup sink that performs lz4 compression.
60
 */
61
bbsink *
62
bbsink_lz4_new(bbsink *next, pg_compress_specification *compress)
63
0
{
64
0
#ifndef USE_LZ4
65
0
  ereport(ERROR,
66
0
      (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
67
0
       errmsg("lz4 compression is not supported by this build")));
68
0
  return NULL;       /* keep compiler quiet */
69
#else
70
  bbsink_lz4 *sink;
71
  int     compresslevel;
72
73
  Assert(next != NULL);
74
75
  compresslevel = compress->level;
76
  Assert(compresslevel >= 0 && compresslevel <= 12);
77
78
  sink = palloc0(sizeof(bbsink_lz4));
79
  *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_lz4_ops;
80
  sink->base.bbs_next = next;
81
  sink->compresslevel = compresslevel;
82
83
  return &sink->base;
84
#endif
85
0
}
86
87
#ifdef USE_LZ4
88
89
/*
90
 * Begin backup.
91
 */
92
static void
93
bbsink_lz4_begin_backup(bbsink *sink)
94
{
95
  bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
96
  size_t    output_buffer_bound;
97
  LZ4F_preferences_t *prefs = &mysink->prefs;
98
99
  /* Initialize compressor object. */
100
  memset(prefs, 0, sizeof(LZ4F_preferences_t));
101
  prefs->frameInfo.blockSizeID = LZ4F_max256KB;
102
  prefs->compressionLevel = mysink->compresslevel;
103
104
  /*
105
   * We need our own buffer, because we're going to pass different data to
106
   * the next sink than what gets passed to us.
107
   */
108
  mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
109
110
  /*
111
   * Since LZ4F_compressUpdate() requires the output buffer of size equal or
112
   * greater than that of LZ4F_compressBound(), make sure we have the next
113
   * sink's bbs_buffer of length that can accommodate the compressed input
114
   * buffer.
115
   */
116
  output_buffer_bound = LZ4F_compressBound(mysink->base.bbs_buffer_length,
117
                       &mysink->prefs);
118
119
  /*
120
   * The buffer length is expected to be a multiple of BLCKSZ, so round up.
121
   */
122
  output_buffer_bound = output_buffer_bound + BLCKSZ -
123
    (output_buffer_bound % BLCKSZ);
124
125
  bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
126
}
127
128
/*
129
 * Prepare to compress the next archive.
130
 */
131
static void
132
bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name)
133
{
134
  bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
135
  char     *lz4_archive_name;
136
  LZ4F_errorCode_t ctxError;
137
  size_t    headerSize;
138
139
  ctxError = LZ4F_createCompressionContext(&mysink->ctx, LZ4F_VERSION);
140
  if (LZ4F_isError(ctxError))
141
    elog(ERROR, "could not create lz4 compression context: %s",
142
       LZ4F_getErrorName(ctxError));
143
144
  /* First of all write the frame header to destination buffer. */
145
  headerSize = LZ4F_compressBegin(mysink->ctx,
146
                  mysink->base.bbs_next->bbs_buffer,
147
                  mysink->base.bbs_next->bbs_buffer_length,
148
                  &mysink->prefs);
149
150
  if (LZ4F_isError(headerSize))
151
    elog(ERROR, "could not write lz4 header: %s",
152
       LZ4F_getErrorName(headerSize));
153
154
  /*
155
   * We need to write the compressed data after the header in the output
156
   * buffer. So, make sure to update the notion of bytes written to output
157
   * buffer.
158
   */
159
  mysink->bytes_written += headerSize;
160
161
  /* Add ".lz4" to the archive name. */
162
  lz4_archive_name = psprintf("%s.lz4", archive_name);
163
  Assert(sink->bbs_next != NULL);
164
  bbsink_begin_archive(sink->bbs_next, lz4_archive_name);
165
  pfree(lz4_archive_name);
166
}
167
168
/*
169
 * Compress the input data to the output buffer until we run out of input
170
 * data. Each time the output buffer falls below the compression bound for
171
 * the input buffer, invoke the archive_contents() method for then next sink.
172
 *
173
 * Note that since we're compressing the input, it may very commonly happen
174
 * that we consume all the input data without filling the output buffer. In
175
 * that case, the compressed representation of the current input data won't
176
 * actually be sent to the next bbsink until a later call to this function,
177
 * or perhaps even not until bbsink_lz4_end_archive() is invoked.
178
 */
179
static void
180
bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in)
181
{
182
  bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
183
  size_t    compressedSize;
184
  size_t    avail_in_bound;
185
186
  avail_in_bound = LZ4F_compressBound(avail_in, &mysink->prefs);
187
188
  /*
189
   * If the number of available bytes has fallen below the value computed by
190
   * LZ4F_compressBound(), ask the next sink to process the data so that we
191
   * can empty the buffer.
192
   */
193
  if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
194
    avail_in_bound)
195
  {
196
    bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
197
    mysink->bytes_written = 0;
198
  }
199
200
  /*
201
   * Compress the input buffer and write it into the output buffer.
202
   */
203
  compressedSize = LZ4F_compressUpdate(mysink->ctx,
204
                     mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
205
                     mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
206
                     (uint8 *) mysink->base.bbs_buffer,
207
                     avail_in,
208
                     NULL);
209
210
  if (LZ4F_isError(compressedSize))
211
    elog(ERROR, "could not compress data: %s",
212
       LZ4F_getErrorName(compressedSize));
213
214
  /*
215
   * Update our notion of how many bytes we've written into output buffer.
216
   */
217
  mysink->bytes_written += compressedSize;
218
}
219
220
/*
221
 * There might be some data inside lz4's internal buffers; we need to get
222
 * that flushed out and also finalize the lz4 frame and then get that forwarded
223
 * to the successor sink as archive content.
224
 *
225
 * Then we can end processing for this archive.
226
 */
227
static void
228
bbsink_lz4_end_archive(bbsink *sink)
229
{
230
  bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
231
  size_t    compressedSize;
232
  size_t    lz4_footer_bound;
233
234
  lz4_footer_bound = LZ4F_compressBound(0, &mysink->prefs);
235
236
  Assert(mysink->base.bbs_next->bbs_buffer_length >= lz4_footer_bound);
237
238
  if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
239
    lz4_footer_bound)
240
  {
241
    bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
242
    mysink->bytes_written = 0;
243
  }
244
245
  compressedSize = LZ4F_compressEnd(mysink->ctx,
246
                    mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
247
                    mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
248
                    NULL);
249
250
  if (LZ4F_isError(compressedSize))
251
    elog(ERROR, "could not end lz4 compression: %s",
252
       LZ4F_getErrorName(compressedSize));
253
254
  /* Update our notion of how many bytes we've written. */
255
  mysink->bytes_written += compressedSize;
256
257
  /* Send whatever accumulated output bytes we have. */
258
  bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
259
  mysink->bytes_written = 0;
260
261
  /* Release the resources. */
262
  LZ4F_freeCompressionContext(mysink->ctx);
263
  mysink->ctx = NULL;
264
265
  /* Pass on the information that this archive has ended. */
266
  bbsink_forward_end_archive(sink);
267
}
268
269
/*
270
 * Manifest contents are not compressed, but we do need to copy them into
271
 * the successor sink's buffer, because we have our own.
272
 */
273
static void
274
bbsink_lz4_manifest_contents(bbsink *sink, size_t len)
275
{
276
  memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
277
  bbsink_manifest_contents(sink->bbs_next, len);
278
}
279
280
/*
281
 * In case the backup fails, make sure we free the compression context by
282
 * calling LZ4F_freeCompressionContext() if needed to avoid memory leak.
283
 */
284
static void
285
bbsink_lz4_cleanup(bbsink *sink)
286
{
287
  bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
288
289
  if (mysink->ctx)
290
  {
291
    LZ4F_freeCompressionContext(mysink->ctx);
292
    mysink->ctx = NULL;
293
  }
294
}
295
296
#endif