Coverage Report

Created: 2025-12-14 06:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/dovecot/src/lib/istream-seekable.c
Line
Count
Source
1
/* Copyright (c) 2005-2018 Dovecot authors, see the included COPYING file */
2
3
#include "lib.h"
4
#include "buffer.h"
5
#include "str.h"
6
#include "memarea.h"
7
#include "read-full.h"
8
#include "write-full.h"
9
#include "safe-mkstemp.h"
10
#include "istream-private.h"
11
#include "istream-concat.h"
12
#include "istream-seekable.h"
13
14
#include <unistd.h>
15
16
0
/* 10 MiB: buffer limit applied when falling back to memory after a failed
   temp file write; larger streams fail instead of falling back. */
#define MAX_MEMORY_FALLBACK_SIZE (10*1024*1024)
17
18
struct seekable_istream {
19
  struct istream_private istream;
20
21
  char *temp_path, *write_failed_temp_path;
22
  uoff_t write_peak;
23
  int write_failed_errno;
24
  uoff_t size;
25
  size_t buffer_peak;
26
27
  int (*fd_callback)(const char **path_r, void *context);
28
  void *context;
29
30
  struct istream **input, *cur_input;
31
  struct istream *fd_input;
32
  unsigned int cur_idx;
33
  int fd;
34
  bool free_context;
35
};
36
37
static void i_stream_seekable_close(struct iostream_private *stream,
38
            bool close_parent ATTR_UNUSED)
39
0
{
40
0
  struct seekable_istream *sstream =
41
0
    container_of(stream, struct seekable_istream, istream.iostream);
42
43
0
  sstream->fd = -1;
44
0
  i_stream_close(sstream->fd_input);
45
0
}
46
47
static void unref_streams(struct seekable_istream *sstream)
48
0
{
49
0
  unsigned int i;
50
51
0
  for (i = 0; sstream->input[i] != NULL; i++)
52
0
    i_stream_unref(&sstream->input[i]);
53
0
}
54
55
static void i_stream_seekable_destroy(struct iostream_private *stream)
56
0
{
57
0
  struct seekable_istream *sstream =
58
0
    container_of(stream, struct seekable_istream, istream.iostream);
59
60
0
  i_stream_free_buffer(&sstream->istream);
61
0
  i_stream_unref(&sstream->fd_input);
62
0
  unref_streams(sstream);
63
64
0
  if (sstream->free_context)
65
0
    i_free(sstream->context);
66
0
  i_free(sstream->temp_path);
67
0
  i_free(sstream->write_failed_temp_path);
68
0
  i_free(sstream->input);
69
0
}
70
71
static void
72
i_stream_seekable_set_max_buffer_size(struct iostream_private *stream,
73
              size_t max_size)
74
0
{
75
0
  struct seekable_istream *sstream =
76
0
    container_of(stream, struct seekable_istream, istream.iostream);
77
0
  unsigned int i;
78
79
0
  sstream->istream.max_buffer_size = max_size;
80
0
  if (sstream->fd_input != NULL)
81
0
    i_stream_set_max_buffer_size(sstream->fd_input, max_size);
82
0
  for (i = 0; sstream->input[i] != NULL; i++)
83
0
    i_stream_set_max_buffer_size(sstream->input[i], max_size);
84
0
}
85
86
static int copy_to_temp_file(struct seekable_istream *sstream)
87
0
{
88
0
  struct istream_private *stream = &sstream->istream;
89
0
  const char *path;
90
0
  const unsigned char *buffer;
91
0
  size_t size;
92
0
  int fd;
93
94
0
  fd = sstream->fd_callback(&path, sstream->context);
95
0
  if (fd == -1)
96
0
    return -1;
97
98
  /* copy our currently read buffer to it */
99
0
  i_assert(stream->pos <= sstream->buffer_peak);
100
0
  if (write_full(fd, stream->buffer, sstream->buffer_peak) < 0) {
101
0
    if (!ENOSPACE(errno))
102
0
      i_error("istream-seekable: write_full(%s) failed: %m", path);
103
0
    i_close_fd(&fd);
104
0
    return -1;
105
0
  }
106
0
  sstream->temp_path = i_strdup(path);
107
0
  sstream->write_peak = sstream->buffer_peak;
108
109
0
  sstream->fd = fd;
110
0
  sstream->fd_input = i_stream_create_fd_autoclose(&fd,
111
0
    I_MAX(stream->pos, sstream->istream.max_buffer_size));
112
0
  i_stream_set_name(sstream->fd_input, t_strdup_printf(
113
0
    "(seekable temp-istream for: %s)", i_stream_get_name(&stream->istream)));
114
115
  /* read back the data we just had in our buffer */
116
0
  for (;;) {
117
0
    buffer = i_stream_get_data(sstream->fd_input, &size);
118
0
    if (size >= stream->pos)
119
0
      break;
120
121
0
    ssize_t ret;
122
0
    if ((ret = i_stream_read_memarea(sstream->fd_input)) <= 0) {
123
0
      i_assert(ret != 0);
124
0
      i_assert(ret != -2);
125
0
      i_error("istream-seekable: Couldn't read back "
126
0
        "in-memory input %s: %s",
127
0
        i_stream_get_name(&stream->istream),
128
0
        i_stream_get_error(sstream->fd_input));
129
0
      i_stream_destroy(&sstream->fd_input);
130
0
      sstream->fd = -1; /* autoclosed by fd_input */
131
0
      return -1;
132
0
    }
133
0
  }
134
  /* Set the max buffer size only after we've already read everything
135
     into memory. For example with istream-data it's possible that
136
     more data exists in buffer than max_buffer_size. */
137
0
  i_stream_set_max_buffer_size(sstream->fd_input,
138
0
             sstream->istream.max_buffer_size);
139
0
  stream->buffer = buffer;
140
0
  i_stream_free_buffer(&sstream->istream);
141
0
  return 0;
142
0
}
143
144
static ssize_t read_more(struct seekable_istream *sstream)
145
0
{
146
0
  size_t size;
147
0
  ssize_t ret;
148
149
0
  if (sstream->cur_input == NULL) {
150
0
    sstream->istream.istream.eof = TRUE;
151
0
    return -1;
152
0
  }
153
154
0
  while ((ret = i_stream_read_memarea(sstream->cur_input)) == -1) {
155
0
    if (sstream->cur_input->stream_errno != 0) {
156
0
      io_stream_set_error(&sstream->istream.iostream,
157
0
        "read(%s) failed: %s",
158
0
        i_stream_get_name(sstream->cur_input),
159
0
        i_stream_get_error(sstream->cur_input));
160
0
      sstream->istream.istream.eof = TRUE;
161
0
      sstream->istream.istream.stream_errno =
162
0
        sstream->cur_input->stream_errno;
163
0
      return -1;
164
0
    }
165
166
    /* go to next stream */
167
0
    sstream->cur_input = sstream->input[sstream->cur_idx++];
168
0
    if (sstream->cur_input == NULL) {
169
      /* last one, EOF */
170
0
      sstream->size = sstream->istream.istream.v_offset +
171
0
        (sstream->istream.pos - sstream->istream.skip);
172
0
      sstream->istream.istream.eof = TRUE;
173
      /* Now that EOF is reached, the stream can't return 0
174
         anymore. Callers can now use this stream in places
175
         that assert that blocking==TRUE. */
176
0
      sstream->istream.istream.blocking = TRUE;
177
0
      unref_streams(sstream);
178
0
      return -1;
179
0
    }
180
181
    /* see if stream has pending data */
182
0
    size = i_stream_get_data_size(sstream->cur_input);
183
0
    if (size != 0)
184
0
      return size;
185
0
  }
186
0
  return ret;
187
0
}
188
189
static bool read_from_buffer(struct seekable_istream *sstream, ssize_t *ret_r)
190
0
{
191
0
  struct istream_private *stream = &sstream->istream;
192
0
  const unsigned char *data;
193
0
  size_t size, avail_size;
194
195
0
  if (stream->pos < sstream->buffer_peak) {
196
    /* This could be the first read() or we could have already
197
       seeked backwards. */
198
0
    i_assert(stream->pos == 0 && stream->skip == 0);
199
0
    i_assert(sstream->buffer_peak >= stream->istream.v_offset);
200
0
    stream->skip = stream->istream.v_offset;
201
0
    stream->pos = sstream->buffer_peak;
202
0
    size = stream->pos - stream->skip;
203
0
    if (stream->istream.v_offset == sstream->buffer_peak) {
204
      /* this could happen after write to temp file failed */
205
0
      return read_from_buffer(sstream, ret_r);
206
0
    }
207
0
  } else {
208
    /* need to read more */
209
0
    i_assert(stream->pos == sstream->buffer_peak);
210
0
    size = sstream->cur_input == NULL ? 0 :
211
0
      i_stream_get_data_size(sstream->cur_input);
212
0
    if (size == 0) {
213
      /* read more to buffer */
214
0
      *ret_r = read_more(sstream);
215
0
      if (*ret_r == 0 || *ret_r == -1)
216
0
        return TRUE;
217
0
    }
218
219
    /* we should have more now. */
220
0
    data = i_stream_get_data(sstream->cur_input, &size);
221
0
    i_assert(size > 0);
222
223
    /* change skip to 0 temporarily so i_stream_try_alloc() won't try to
224
       compress the buffer. */
225
0
    size_t old_skip = stream->skip;
226
0
    stream->skip = 0;
227
0
    bool have_space = i_stream_try_alloc(stream, size, &avail_size);
228
0
    stream->skip = old_skip;
229
0
    if (!have_space)
230
0
      return FALSE;
231
232
0
    if (size > avail_size)
233
0
      size = avail_size;
234
0
    memcpy(stream->w_buffer + stream->pos, data, size);
235
0
    stream->pos += size;
236
0
    sstream->buffer_peak += size;
237
0
    i_stream_skip(sstream->cur_input, size);
238
0
  }
239
240
0
  *ret_r = size;
241
0
  i_assert(*ret_r > 0);
242
0
  return TRUE;
243
0
}
244
245
static int i_stream_seekable_write_failed(struct seekable_istream *sstream)
246
0
{
247
0
  struct istream_private *stream = &sstream->istream;
248
0
  void *data;
249
0
  size_t old_pos = stream->pos;
250
0
  int write_errno = errno;
251
252
0
  i_assert(sstream->fd != -1);
253
0
  i_assert(stream->skip == 0);
254
255
0
  stream->max_buffer_size = MAX_MEMORY_FALLBACK_SIZE;
256
0
  stream->pos = 0;
257
0
  data = i_stream_alloc(stream, sstream->write_peak);
258
0
  stream->pos = old_pos;
259
260
0
  if (pread_full(sstream->fd, data, sstream->write_peak, 0) < 0) {
261
0
    sstream->istream.istream.stream_errno = errno;
262
0
    sstream->istream.istream.eof = TRUE;
263
0
    io_stream_set_error(&sstream->istream.iostream,
264
0
            "istream-seekable: write(%s) failed: %s, and attempt to read() it back failed: %m",
265
0
            sstream->temp_path, strerror(write_errno));
266
0
    return -1;
267
0
  }
268
0
  sstream->buffer_peak = sstream->write_peak;
269
0
  i_stream_destroy(&sstream->fd_input);
270
0
  sstream->fd = -1; /* autoclosed by fd_input */
271
272
0
  i_free_and_null(sstream->temp_path);
273
0
  return 0;
274
0
}
275
276
static ssize_t i_stream_seekable_read(struct istream_private *stream)
277
0
{
278
0
  struct seekable_istream *sstream =
279
0
    container_of(stream, struct seekable_istream, istream);
280
0
  const unsigned char *data;
281
0
  size_t size, pos;
282
0
  ssize_t ret;
283
284
0
  if (sstream->fd == -1) {
285
0
    if (read_from_buffer(sstream, &ret))
286
0
      return ret;
287
288
0
    if (sstream->write_failed_errno != 0) {
289
0
      errno = sstream->write_failed_errno;
290
0
      sstream->istream.istream.stream_errno = errno;
291
0
      sstream->istream.istream.eof = TRUE;
292
0
      io_stream_set_error(&sstream->istream.iostream,
293
0
        "istream-seekable: write(%s) failed: %m - "
294
0
        "stream is too large for memory",
295
0
        sstream->write_failed_temp_path);
296
0
      return -1;
297
0
    }
298
299
    /* copy everything to temp file and use it as the stream */
300
0
    if (copy_to_temp_file(sstream) < 0) {
301
0
      stream->max_buffer_size = SIZE_MAX;
302
0
      if (!read_from_buffer(sstream, &ret))
303
0
        i_unreached();
304
0
      return ret;
305
0
    }
306
0
    i_assert(sstream->fd != -1);
307
0
  }
308
309
0
  stream->buffer = CONST_PTR_OFFSET(stream->buffer, stream->skip);
310
0
  stream->pos -= stream->skip;
311
0
  stream->skip = 0;
312
313
0
  i_assert(stream->istream.v_offset + stream->pos <= sstream->write_peak);
314
0
  if (stream->istream.v_offset + stream->pos == sstream->write_peak) {
315
    /* need to read more */
316
0
    if (sstream->cur_input == NULL ||
317
0
        i_stream_get_data_size(sstream->cur_input) == 0) {
318
0
      ret = read_more(sstream);
319
0
      if (ret == -1 || ret == 0)
320
0
        return ret;
321
0
    }
322
323
    /* save to our file */
324
0
    data = i_stream_get_data(sstream->cur_input, &size);
325
0
    i_assert(size > 0);
326
0
    ret = write(sstream->fd, data, size);
327
0
    i_assert(ret != 0);
328
0
    if (ret < 0) {
329
0
      if (sstream->write_peak + size >= MAX_MEMORY_FALLBACK_SIZE) {
330
0
        sstream->istream.istream.stream_errno = errno;
331
0
        sstream->istream.istream.eof = TRUE;
332
0
        io_stream_set_error(&sstream->istream.iostream,
333
0
          "istream-seekable: write(%s) failed: %m",
334
0
          sstream->temp_path);
335
0
        return -1;
336
0
      }
337
338
0
      sstream->write_failed_errno = errno;
339
0
      sstream->write_failed_temp_path =
340
0
        i_strdup(sstream->temp_path);
341
0
      if (i_stream_seekable_write_failed(sstream) < 0)
342
0
        return -1;
343
0
      if (!read_from_buffer(sstream, &ret))
344
0
        i_unreached();
345
346
0
      errno = sstream->write_failed_errno;
347
0
      i_warning("istream-seekable: write_full(%s) failed: %m - "
348
0
          "fallback to using only memory",
349
0
          sstream->write_failed_temp_path);
350
0
      return ret;
351
0
    }
352
0
    i_stream_sync(sstream->fd_input);
353
0
    i_stream_skip(sstream->cur_input, ret);
354
0
    sstream->write_peak += ret;
355
0
  }
356
357
0
  i_stream_seek(sstream->fd_input, stream->istream.v_offset);
358
0
  ret = i_stream_read_memarea(sstream->fd_input);
359
0
  if (ret <= 0) {
360
0
    stream->istream.eof = sstream->fd_input->eof;
361
0
    stream->istream.stream_errno =
362
0
      sstream->fd_input->stream_errno;
363
0
  } else {
364
0
    ret = -2;
365
0
  }
366
367
0
  stream->buffer = i_stream_get_data(sstream->fd_input, &pos);
368
0
  stream->pos -= stream->skip;
369
0
  stream->skip = 0;
370
371
0
  ret = pos > stream->pos ? (ssize_t)(pos - stream->pos) : ret;
372
0
  stream->pos = pos;
373
0
  return ret;
374
0
}
375
376
static int
377
i_stream_seekable_stat(struct istream_private *stream, bool exact)
378
0
{
379
0
  struct seekable_istream *sstream =
380
0
    container_of(stream, struct seekable_istream, istream);
381
0
  const struct stat *st;
382
0
  uoff_t old_offset, len;
383
0
  ssize_t ret;
384
385
0
  if (sstream->size != UOFF_T_MAX) {
386
    /* we've already reached EOF and know the size */
387
0
    stream->statbuf.st_size = sstream->size;
388
0
    return 0;
389
0
  }
390
391
  /* we want to know the full size of the file, so read until
392
     we're finished */
393
0
  old_offset = stream->istream.v_offset;
394
0
  do {
395
0
    i_stream_skip(&stream->istream,
396
0
            stream->pos - stream->skip);
397
0
  } while ((ret = i_stream_seekable_read(stream)) > 0);
398
399
0
  if (ret == 0) {
400
0
    i_panic("i_stream_stat() used for non-blocking "
401
0
      "seekable stream %s offset %"PRIuUOFF_T,
402
0
      i_stream_get_name(sstream->cur_input),
403
0
      sstream->cur_input->v_offset);
404
0
  }
405
0
  i_stream_skip(&stream->istream, stream->pos - stream->skip);
406
0
  len = stream->pos;
407
0
  i_stream_seek(&stream->istream, old_offset);
408
0
  unref_streams(sstream);
409
410
0
  if (stream->istream.stream_errno != 0)
411
0
    return -1;
412
413
0
  if (sstream->fd_input != NULL) {
414
    /* using a file backed buffer, we can use real fstat() */
415
0
    if (i_stream_stat(sstream->fd_input, exact, &st) < 0)
416
0
      return -1;
417
0
    stream->statbuf = *st;
418
0
  } else {
419
    /* buffer is completely in memory */
420
0
    i_assert(sstream->fd == -1);
421
422
0
    stream->statbuf.st_size = len;
423
0
  }
424
0
  return 0;
425
0
}
426
427
static void i_stream_seekable_seek(struct istream_private *stream,
428
           uoff_t v_offset, bool mark)
429
0
{
430
0
  if (v_offset <= stream->istream.v_offset) {
431
    /* seeking backwards */
432
0
    stream->istream.v_offset = v_offset;
433
0
    stream->skip = stream->pos = 0;
434
0
  } else {
435
    /* we can't skip over data we haven't yet read and written to
436
       our buffer/temp file */
437
0
    i_stream_default_seek_nonseekable(stream, v_offset, mark);
438
0
  }
439
0
}
440
441
static struct istream_snapshot *
442
i_stream_seekable_snapshot(struct istream_private *stream,
443
         struct istream_snapshot *prev_snapshot)
444
0
{
445
0
  struct seekable_istream *sstream =
446
0
    container_of(stream, struct seekable_istream, istream);
447
448
0
  if (sstream->fd == -1) {
449
    /* still in memory */
450
0
    if (stream->memarea == NULL)
451
0
      return prev_snapshot;
452
0
    return i_stream_default_snapshot(stream, prev_snapshot);
453
0
  } else {
454
    /* using the fd_input stream */
455
0
    return sstream->fd_input->real_stream->
456
0
      snapshot(sstream->fd_input->real_stream, prev_snapshot);
457
0
  }
458
0
}
459
460
struct istream *
461
i_streams_merge(struct istream *input[], size_t max_buffer_size,
462
    int (*fd_callback)(const char **path_r, void *context),
463
    void *context) ATTR_NULL(4)
464
0
{
465
0
  struct seekable_istream *sstream;
466
0
  const unsigned char *data;
467
0
  unsigned int count;
468
0
  size_t size;
469
0
  bool blocking = TRUE;
470
471
0
  i_assert(max_buffer_size > 0);
472
473
  /* if any of the streams isn't blocking, set ourself also nonblocking */
474
0
  for (count = 0; input[count] != NULL; count++) {
475
0
    if (!input[count]->blocking)
476
0
      blocking = FALSE;
477
0
    i_stream_ref(input[count]);
478
0
  }
479
0
  i_assert(count != 0);
480
481
0
  sstream = i_new(struct seekable_istream, 1);
482
0
  sstream->fd_callback = fd_callback;
483
0
  sstream->context = context;
484
0
        sstream->istream.max_buffer_size = max_buffer_size;
485
0
  sstream->fd = -1;
486
0
  sstream->size = UOFF_T_MAX;
487
488
0
  sstream->input = i_new(struct istream *, count + 1);
489
0
  memcpy(sstream->input, input, sizeof(*input) * count);
490
0
  sstream->cur_input = sstream->input[0];
491
492
0
  sstream->istream.iostream.close = i_stream_seekable_close;
493
0
  sstream->istream.iostream.destroy = i_stream_seekable_destroy;
494
0
  sstream->istream.iostream.set_max_buffer_size =
495
0
    i_stream_seekable_set_max_buffer_size;
496
497
0
  sstream->istream.read = i_stream_seekable_read;
498
0
  sstream->istream.stat = i_stream_seekable_stat;
499
0
  sstream->istream.seek = i_stream_seekable_seek;
500
0
  sstream->istream.snapshot = i_stream_seekable_snapshot;
501
502
0
  sstream->istream.istream.readable_fd = FALSE;
503
0
  sstream->istream.istream.blocking = blocking;
504
0
  sstream->istream.istream.seekable = TRUE;
505
0
  (void)i_stream_create(&sstream->istream, NULL, -1, 0);
506
507
  /* initialize our buffer from first stream's pending data */
508
0
  data = i_stream_get_data(sstream->cur_input, &size);
509
0
  if (size > 0) {
510
0
    memcpy(i_stream_alloc(&sstream->istream, size), data, size);
511
0
    sstream->buffer_peak = size;
512
0
    i_stream_skip(sstream->cur_input, size);
513
0
  }
514
0
  return &sstream->istream.istream;
515
0
}
516
517
static bool inputs_are_seekable(struct istream *input[])
518
0
{
519
0
  unsigned int count;
520
521
0
  for (count = 0; input[count] != NULL; count++) {
522
0
    if (!input[count]->seekable)
523
0
      return FALSE;
524
0
  }
525
0
  return TRUE;
526
0
}
527
528
/* Create a seekable stream from input[]. When every input is already
   seekable a plain concat stream is returned; otherwise the inputs are
   merged into a buffering seekable stream. */
struct istream *
i_stream_create_seekable(struct istream *input[],
                         size_t max_buffer_size,
                         int (*fd_callback)(const char **path_r, void *context),
                         void *context)
{
  i_assert(max_buffer_size > 0);

  /* nothing needs buffering when all inputs can seek by themselves */
  return inputs_are_seekable(input) ?
    i_stream_create_concat(input) :
    i_streams_merge(input, max_buffer_size, fd_callback, context);
}
542
543
static int seekable_fd_callback(const char **path_r, void *context)
544
0
{
545
0
  char *temp_path_prefix = context;
546
0
  string_t *path;
547
0
  int fd;
548
549
0
  path = t_str_new(128);
550
0
  str_append(path, temp_path_prefix);
551
0
  fd = safe_mkstemp(path, 0600, (uid_t)-1, (gid_t)-1);
552
0
  if (fd == -1) {
553
0
    i_error("istream-seekable: safe_mkstemp(%s) failed: %m", str_c(path));
554
0
    return -1;
555
0
  }
556
557
  /* we just want the fd, unlink it */
558
0
  if (i_unlink(str_c(path)) < 0) {
559
    /* shouldn't happen.. */
560
0
    i_close_fd(&fd);
561
0
    return -1;
562
0
  }
563
564
0
  *path_r = str_c(path);
565
0
  return fd;
566
0
}
567
568
struct istream *
569
i_stream_create_seekable_path(struct istream *input[],
570
            size_t max_buffer_size,
571
            const char *temp_path_prefix)
572
0
{
573
0
  struct seekable_istream *sstream;
574
0
  struct istream *stream;
575
576
0
  i_assert(temp_path_prefix != NULL);
577
0
  i_assert(max_buffer_size > 0);
578
579
0
  if (inputs_are_seekable(input))
580
0
    return i_stream_create_concat(input);
581
582
0
  stream = i_stream_create_seekable(input, max_buffer_size,
583
0
            seekable_fd_callback,
584
0
            i_strdup(temp_path_prefix));
585
0
  sstream = container_of(stream->real_stream,
586
0
             struct seekable_istream, istream);
587
0
  sstream->free_context = TRUE;
588
0
  return stream;
589
0
}