Coverage Report

Created: 2025-12-14 06:39

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/dovecot/src/lib/ostream.c
Line
Count
Source
1
/* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */
2
3
#include "lib.h"
4
#include "istream.h"
5
#include "ostream-private.h"
6
7
void o_stream_set_name(struct ostream *stream, const char *name)
8
0
{
9
0
  i_free(stream->real_stream->iostream.name);
10
0
  stream->real_stream->iostream.name = i_strdup(name);
11
0
}
12
13
const char *o_stream_get_name(struct ostream *stream)
14
0
{
15
0
  while (stream->real_stream->iostream.name == NULL) {
16
0
    stream = stream->real_stream->parent;
17
0
    if (stream == NULL)
18
0
      return "";
19
0
  }
20
0
  return stream->real_stream->iostream.name;
21
0
}
22
23
int o_stream_get_fd(struct ostream *stream)
24
0
{
25
0
  return stream->real_stream->fd;
26
0
}
27
28
const char *o_stream_get_error(struct ostream *stream)
29
0
{
30
0
  struct ostream *s;
31
32
  /* we'll only return errors for streams that have stream_errno set.
33
     we might be returning unintended error otherwise. */
34
0
  if (stream->stream_errno == 0)
35
0
    return "<no error>";
36
37
0
  for (s = stream; s != NULL; s = s->real_stream->parent) {
38
0
    if (s->stream_errno == 0)
39
0
      break;
40
0
    if (s->real_stream->iostream.error != NULL)
41
0
      return s->real_stream->iostream.error;
42
0
  }
43
0
  return strerror(stream->stream_errno);
44
0
}
45
46
/* Return the disconnect reason as computed by
   io_stream_get_disconnect_reason() with no input stream (NULL) and
   this output stream. */
const char *o_stream_get_disconnect_reason(struct ostream *stream)
{
	const char *reason = io_stream_get_disconnect_reason(NULL, stream);

	return reason;
}
50
51
static void o_stream_close_full(struct ostream *stream, bool close_parents)
52
0
{
53
  /* Ideally o_stream_finish() would be called for all non-failed
54
     ostreams, but strictly requiring it would cause unnecessary
55
     complexity for many callers. Just require that at this point
56
     after flushing there isn't anything in the output buffer or that
57
     we're ignoring all errors. */
58
0
  bool last_errors_not_checked =
59
0
    stream->real_stream->last_errors_not_checked;
60
61
0
  if (o_stream_flush(stream) == 0)
62
0
    i_assert(stream->real_stream->error_handling_disabled);
63
64
  /* We don't want this auto-flushing to remove the need for
65
     proper error checking. */
66
0
  if (last_errors_not_checked)
67
0
    stream->real_stream->last_errors_not_checked = TRUE;
68
69
0
  if (!stream->closed && !stream->real_stream->closing) {
70
    /* first mark the stream as being closed so the
71
       o_stream_copy_error_from_parent() won't recurse us back
72
       here. but don't immediately mark the stream closed, because
73
       we may still want to write something to it. */
74
0
    stream->real_stream->closing = TRUE;
75
0
    io_stream_close(&stream->real_stream->iostream, close_parents);
76
0
    stream->closed = TRUE;
77
0
  }
78
79
0
  if (stream->stream_errno == 0)
80
0
    stream->stream_errno = EPIPE;
81
0
}
82
83
void o_stream_destroy(struct ostream **_stream)
84
0
{
85
0
  struct ostream *stream = *_stream;
86
87
0
  if (stream == NULL)
88
0
    return;
89
90
0
  *_stream = NULL;
91
0
  o_stream_close_full(stream, FALSE);
92
0
  o_stream_unref(&stream);
93
0
}
94
95
void o_stream_ref(struct ostream *stream)
96
0
{
97
0
  io_stream_ref(&stream->real_stream->iostream);
98
0
}
99
100
void o_stream_unref(struct ostream **_stream)
101
0
{
102
0
  struct ostream *stream;
103
104
0
  if (*_stream == NULL)
105
0
    return;
106
107
0
  stream = *_stream;
108
109
0
  if (stream->real_stream->last_errors_not_checked &&
110
0
      !stream->real_stream->error_handling_disabled &&
111
0
      stream->real_stream->iostream.refcount == 1) {
112
0
    i_panic("output stream %s is missing error handling",
113
0
      o_stream_get_name(stream));
114
0
  }
115
116
0
  if (!io_stream_unref(&stream->real_stream->iostream))
117
0
    io_stream_free(&stream->real_stream->iostream);
118
0
  *_stream = NULL;
119
0
}
120
121
#undef o_stream_add_destroy_callback
122
void o_stream_add_destroy_callback(struct ostream *stream,
123
           ostream_callback_t *callback, void *context)
124
0
{
125
0
  io_stream_add_destroy_callback(&stream->real_stream->iostream,
126
0
               callback, context);
127
0
}
128
129
void o_stream_remove_destroy_callback(struct ostream *stream,
130
              void (*callback)())
131
0
{
132
0
  io_stream_remove_destroy_callback(&stream->real_stream->iostream,
133
0
            callback);
134
0
}
135
136
void o_stream_close(struct ostream *stream)
137
0
{
138
0
  if (stream != NULL)
139
0
    o_stream_close_full(stream, TRUE);
140
0
}
141
142
static int o_stream_default_buffering_flush(struct ostream_private *_stream)
143
0
{
144
0
  struct ostream *ostream = &_stream->ostream;
145
0
  int ret, ret2;
146
147
  /* try to actually flush the pending data */
148
0
  if ((ret = o_stream_flush(_stream->buffering_parent)) < 0)
149
0
    return -1;
150
151
  /* we may be able to copy more data, try it */
152
0
  o_stream_ref(ostream);
153
0
  if (_stream->callback != NULL)
154
0
    ret2 = _stream->callback(_stream->context);
155
0
  else
156
0
    ret2 = o_stream_flush(ostream);
157
0
  if (ret2 == 0)
158
0
    o_stream_set_flush_pending(_stream->buffering_parent, TRUE);
159
0
  o_stream_unref(&ostream);
160
0
  if (ret2 < 0)
161
0
    return -1;
162
0
  return ret > 0 && ret2 > 0 ? 1 : 0;
163
0
}
164
165
void o_stream_init_buffering_flush(struct ostream_private *_stream,
166
           struct ostream *parent)
167
0
{
168
0
  _stream->buffering_parent = parent;
169
0
  _stream->callback = parent->real_stream->callback;
170
0
  _stream->context = parent->real_stream->context;
171
172
0
  o_stream_set_flush_callback(parent, o_stream_default_buffering_flush,
173
0
            _stream);
174
0
}
175
176
#undef o_stream_set_flush_callback
177
void o_stream_set_flush_callback(struct ostream *stream,
178
         stream_flush_callback_t *callback,
179
         void *context)
180
0
{
181
0
  struct ostream_private *_stream = stream->real_stream;
182
183
0
  _stream->set_flush_callback(_stream, callback, context);
184
0
}
185
186
void o_stream_unset_flush_callback(struct ostream *stream)
187
0
{
188
0
  struct ostream_private *_stream = stream->real_stream;
189
190
0
  _stream->set_flush_callback(_stream, NULL, NULL);
191
0
}
192
193
stream_flush_callback_t *
194
o_stream_get_flush_callback(struct ostream *stream, void **context_r)
195
0
{
196
0
  struct ostream_private *_stream = stream->real_stream;
197
0
  *context_r = _stream->context;
198
0
  return _stream->callback;
199
0
}
200
201
void o_stream_set_max_buffer_size(struct ostream *stream, size_t max_size)
202
0
{
203
0
  io_stream_set_max_buffer_size(&stream->real_stream->iostream, max_size);
204
0
}
205
206
size_t o_stream_get_max_buffer_size(struct ostream *stream)
207
0
{
208
0
  return stream->real_stream->max_buffer_size;
209
0
}
210
211
void o_stream_cork(struct ostream *stream)
212
0
{
213
0
  struct ostream_private *_stream = stream->real_stream;
214
215
0
  if (unlikely(stream->closed || stream->stream_errno != 0))
216
0
    return;
217
218
0
  _stream->cork(_stream, TRUE);
219
0
}
220
221
void o_stream_uncork(struct ostream *stream)
222
0
{
223
0
  struct ostream_private *_stream = stream->real_stream;
224
225
0
  if (unlikely(stream->closed || stream->stream_errno != 0))
226
0
    return;
227
228
0
  _stream->cork(_stream, FALSE);
229
0
}
230
231
bool o_stream_is_corked(struct ostream *stream)
232
0
{
233
0
  struct ostream_private *_stream = stream->real_stream;
234
235
0
  return _stream->corked;
236
0
}
237
238
int o_stream_flush(struct ostream *stream)
239
0
{
240
0
  struct ostream_private *_stream = stream->real_stream;
241
0
  int ret = 1;
242
243
0
  o_stream_ignore_last_errors(stream);
244
245
0
  if (unlikely(stream->closed || stream->stream_errno != 0)) {
246
0
    errno = stream->stream_errno;
247
0
    return -1;
248
0
  }
249
250
0
  if (unlikely(_stream->noverflow)) {
251
0
    io_stream_set_error(&_stream->iostream,
252
0
      "Output stream buffer was full (%zu bytes)",
253
0
      o_stream_get_max_buffer_size(stream));
254
0
    errno = stream->stream_errno = ENOBUFS;
255
0
    return -1;
256
0
  }
257
258
0
  if (unlikely((ret = _stream->flush(_stream)) < 0)) {
259
0
    i_assert(stream->stream_errno != 0);
260
0
    errno = stream->stream_errno;
261
0
  }
262
0
  return ret;
263
0
}
264
265
void o_stream_set_flush_pending(struct ostream *stream, bool set)
266
0
{
267
0
  struct ostream_private *_stream = stream->real_stream;
268
269
0
  if (unlikely(stream->closed || stream->stream_errno != 0))
270
0
    return;
271
272
0
  _stream->flush_pending(_stream, set);
273
0
}
274
275
size_t o_stream_get_buffer_used_size(const struct ostream *stream)
276
0
{
277
0
  const struct ostream_private *_stream = stream->real_stream;
278
279
0
  return _stream->get_buffer_used_size(_stream);
280
0
}
281
282
size_t o_stream_get_buffer_avail_size(const struct ostream *stream)
283
0
{
284
0
  const struct ostream_private *_stream = stream->real_stream;
285
286
0
  return _stream->get_buffer_avail_size(_stream);
287
0
}
288
289
/* Seek to `offset` using the implementation's seek() method. Returns 1
   on success. On failure -1 is returned with errno set to the stream's
   stream_errno. */
int o_stream_seek(struct ostream *stream, uoff_t offset)
{
	struct ostream_private *priv = stream->real_stream;
	int ret;

	if (unlikely(stream->closed || stream->stream_errno != 0)) {
		errno = stream->stream_errno;
		return -1;
	}

	ret = priv->seek(priv, offset);
	if (unlikely(ret < 0)) {
		i_assert(stream->stream_errno != 0);
		errno = stream->stream_errno;
		return -1;
	}
	return 1;
}
305
306
ssize_t o_stream_send(struct ostream *stream, const void *data, size_t size)
307
0
{
308
0
  struct const_iovec iov;
309
310
0
  i_zero(&iov);
311
0
  iov.iov_base = data;
312
0
  iov.iov_len = size;
313
314
0
  return o_stream_sendv(stream, &iov, 1);
315
0
}
316
317
static ssize_t
318
o_stream_sendv_int(struct ostream *stream, const struct const_iovec *iov,
319
       unsigned int iov_count, bool *overflow_r)
320
0
{
321
0
  struct ostream_private *_stream = stream->real_stream;
322
0
  unsigned int i;
323
0
  size_t total_size;
324
0
  ssize_t ret;
325
326
0
  *overflow_r = FALSE;
327
328
0
  for (i = 0, total_size = 0; i < iov_count; i++)
329
0
    total_size += iov[i].iov_len;
330
0
  if (total_size == 0)
331
0
    return 0;
332
333
0
  i_assert(!_stream->finished);
334
0
  ret = _stream->sendv(_stream, iov, iov_count);
335
0
  if (ret > 0)
336
0
    stream->real_stream->last_write_timeval = ioloop_timeval;
337
0
  if (unlikely(ret != (ssize_t)total_size)) {
338
0
    if (ret < 0) {
339
0
      i_assert(stream->stream_errno != 0);
340
0
      errno = stream->stream_errno;
341
0
    } else {
342
0
      i_assert(!stream->blocking);
343
0
      stream->overflow = TRUE;
344
0
      *overflow_r = TRUE;
345
0
    }
346
0
  }
347
0
  return ret;
348
0
}
349
350
ssize_t o_stream_sendv(struct ostream *stream, const struct const_iovec *iov,
351
           unsigned int iov_count)
352
0
{
353
0
  bool overflow;
354
355
0
  if (unlikely(stream->closed || stream->stream_errno != 0)) {
356
0
    errno = stream->stream_errno;
357
0
    return -1;
358
0
  }
359
0
  return o_stream_sendv_int(stream, iov, iov_count, &overflow);
360
0
}
361
362
/* Send a NUL-terminated string (without the NUL) using
   o_stream_send(). */
ssize_t o_stream_send_str(struct ostream *stream, const char *str)
{
	size_t len = strlen(str);

	return o_stream_send(stream, str, len);
}
366
367
void o_stream_nsend(struct ostream *stream, const void *data, size_t size)
368
0
{
369
0
  struct const_iovec iov;
370
371
0
  i_zero(&iov);
372
0
  iov.iov_base = data;
373
0
  iov.iov_len = size;
374
375
0
  o_stream_nsendv(stream, &iov, 1);
376
0
}
377
378
void o_stream_nsendv(struct ostream *stream, const struct const_iovec *iov,
379
         unsigned int iov_count)
380
0
{
381
0
  bool overflow;
382
383
0
  if (unlikely(stream->closed || stream->stream_errno != 0 ||
384
0
         stream->real_stream->noverflow))
385
0
    return;
386
0
  (void)o_stream_sendv_int(stream, iov, iov_count, &overflow);
387
0
  if (overflow)
388
0
    stream->real_stream->noverflow = TRUE;
389
0
  stream->real_stream->last_errors_not_checked = TRUE;
390
0
}
391
392
/* Send a NUL-terminated string (without the NUL) using
   o_stream_nsend(). */
void o_stream_nsend_str(struct ostream *stream, const char *str)
{
	size_t len = strlen(str);

	o_stream_nsend(stream, str, len);
}
396
397
int o_stream_finish(struct ostream *stream)
398
0
{
399
0
  stream->real_stream->finished = TRUE;
400
0
  return o_stream_flush(stream);
401
0
}
402
403
void o_stream_set_finish_also_parent(struct ostream *stream, bool set)
404
0
{
405
0
  stream->real_stream->finish_also_parent = set;
406
0
}
407
408
void o_stream_set_finish_via_child(struct ostream *stream, bool set)
409
0
{
410
0
  stream->real_stream->finish_via_child = set;
411
0
}
412
413
void o_stream_ignore_last_errors(struct ostream *stream)
414
0
{
415
0
  while (stream != NULL) {
416
0
    stream->real_stream->last_errors_not_checked = FALSE;
417
0
    stream = stream->real_stream->parent;
418
0
  }
419
0
}
420
421
void o_stream_abort(struct ostream *stream)
422
0
{
423
0
  o_stream_ignore_last_errors(stream);
424
0
  if (stream->stream_errno != 0)
425
0
    return;
426
0
  io_stream_set_error(&stream->real_stream->iostream, "aborted writing");
427
0
  stream->stream_errno = EPIPE;
428
0
}
429
430
void o_stream_set_no_error_handling(struct ostream *stream, bool set)
431
0
{
432
0
  stream->real_stream->error_handling_disabled = set;
433
0
}
434
435
enum ostream_send_istream_result
436
o_stream_send_istream(struct ostream *outstream, struct istream *instream)
437
0
{
438
0
  struct ostream_private *_outstream = outstream->real_stream;
439
0
  uoff_t old_outstream_offset = outstream->offset;
440
0
  uoff_t old_instream_offset = instream->v_offset;
441
0
  enum ostream_send_istream_result res;
442
443
0
  if (unlikely(instream->closed || instream->stream_errno != 0)) {
444
0
    errno = instream->stream_errno;
445
0
    return OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT;
446
0
  }
447
0
  if (unlikely(outstream->closed || outstream->stream_errno != 0)) {
448
0
    errno = outstream->stream_errno;
449
0
    return OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
450
0
  }
451
452
0
  i_assert(!_outstream->finished);
453
0
  res = _outstream->send_istream(_outstream, instream);
454
0
  switch (res) {
455
0
  case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
456
0
    i_assert(instream->stream_errno == 0);
457
0
    i_assert(outstream->stream_errno == 0);
458
0
    i_assert(!i_stream_have_bytes_left(instream));
459
0
    break;
460
0
  case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
461
0
    i_assert(!instream->blocking);
462
0
    break;
463
0
  case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
464
0
    i_assert(!outstream->blocking);
465
0
    o_stream_set_flush_pending(outstream, TRUE);
466
0
    break;
467
0
  case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
468
0
    i_assert(instream->stream_errno != 0);
469
0
    return res;
470
0
  case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
471
0
    i_assert(outstream->stream_errno != 0);
472
0
    return res;
473
0
  }
474
  /* non-failure - make sure stream offsets match */
475
0
  i_assert((outstream->offset - old_outstream_offset) ==
476
0
     (instream->v_offset - old_instream_offset));
477
478
0
  if (outstream->offset != old_outstream_offset)
479
0
    outstream->real_stream->last_write_timeval = ioloop_timeval;
480
0
  return res;
481
0
}
482
483
void o_stream_nsend_istream(struct ostream *outstream, struct istream *instream)
484
0
{
485
0
  i_assert(instream->blocking);
486
487
0
  switch (o_stream_send_istream(outstream, instream)) {
488
0
  case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
489
0
    break;
490
0
  case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
491
0
    i_unreached();
492
0
  case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
493
0
    outstream->real_stream->noverflow = TRUE;
494
0
    break;
495
0
  case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
496
0
    outstream->stream_errno = instream->stream_errno;
497
0
    io_stream_set_error(&outstream->real_stream->iostream,
498
0
      "nsend-istream: read(%s) failed: %s",
499
0
      i_stream_get_name(instream),
500
0
      i_stream_get_error(instream));
501
0
    break;
502
0
  case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
503
0
    break;
504
0
  }
505
0
  outstream->real_stream->last_errors_not_checked = TRUE;
506
0
}
507
508
int o_stream_pwrite(struct ostream *stream, const void *data, size_t size,
509
        uoff_t offset)
510
0
{
511
0
  int ret;
512
513
0
  if (unlikely(stream->closed || stream->stream_errno != 0)) {
514
0
    errno = stream->stream_errno;
515
0
    return -1;
516
0
  }
517
518
0
  i_assert(!stream->real_stream->finished);
519
0
  ret = stream->real_stream->write_at(stream->real_stream,
520
0
              data, size, offset);
521
0
  if (ret > 0)
522
0
    stream->real_stream->last_write_timeval = ioloop_timeval;
523
0
  else if (unlikely(ret < 0)) {
524
0
    i_assert(stream->stream_errno != 0);
525
0
    errno = stream->stream_errno;
526
0
  }
527
528
0
  return ret;
529
0
}
530
531
void o_stream_get_last_write_time(struct ostream *stream, struct timeval *tv_r)
532
0
{
533
0
  *tv_r = stream->real_stream->last_write_timeval;
534
0
}
535
536
enum ostream_send_istream_result
537
io_stream_copy(struct ostream *outstream, struct istream *instream)
538
0
{
539
0
  struct const_iovec iov;
540
0
  const unsigned char *data;
541
0
  ssize_t ret;
542
543
0
  while (i_stream_read_more(instream, &data, &iov.iov_len) > 0) {
544
0
    iov.iov_base = data;
545
0
    if ((ret = o_stream_sendv(outstream, &iov, 1)) < 0)
546
0
      return OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT;
547
0
    else if (ret == 0)
548
0
      return OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT;
549
0
    i_stream_skip(instream, ret);
550
0
  }
551
552
0
  if (instream->stream_errno != 0)
553
0
    return OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT;
554
0
  if (i_stream_have_bytes_left(instream))
555
0
    return OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT;
556
0
  return OSTREAM_SEND_ISTREAM_RESULT_FINISHED;
557
0
}
558
559
void o_stream_switch_ioloop_to(struct ostream *stream, struct ioloop *ioloop)
560
0
{
561
0
  struct ostream_private *_stream = stream->real_stream;
562
563
0
  io_stream_switch_ioloop_to(&_stream->iostream, ioloop);
564
565
0
  _stream->switch_ioloop_to(_stream, ioloop);
566
0
}
567
568
void o_stream_switch_ioloop(struct ostream *stream)
569
0
{
570
0
  o_stream_switch_ioloop_to(stream, current_ioloop);
571
0
}
572
573
static void o_stream_default_close(struct iostream_private *stream,
574
           bool close_parent)
575
0
{
576
0
  struct ostream_private *_stream =
577
0
    container_of(stream, struct ostream_private, iostream);
578
579
0
  (void)o_stream_flush(&_stream->ostream);
580
0
  if (close_parent)
581
0
    o_stream_close(_stream->parent);
582
0
}
583
584
static void o_stream_default_destroy(struct iostream_private *stream)
585
0
{
586
0
  struct ostream_private *_stream =
587
0
    container_of(stream, struct ostream_private, iostream);
588
589
0
  o_stream_unref(&_stream->parent);
590
0
}
591
592
static void
593
o_stream_default_set_max_buffer_size(struct iostream_private *stream,
594
             size_t max_size)
595
0
{
596
0
  struct ostream_private *_stream =
597
0
    container_of(stream, struct ostream_private, iostream);
598
599
0
  if (_stream->parent != NULL)
600
0
    o_stream_set_max_buffer_size(_stream->parent, max_size);
601
0
  _stream->max_buffer_size = max_size;
602
0
}
603
604
static void o_stream_default_cork(struct ostream_private *_stream, bool set)
605
0
{
606
0
  _stream->corked = set;
607
0
  if (set) {
608
0
    if (_stream->parent != NULL)
609
0
      o_stream_cork(_stream->parent);
610
0
  } else {
611
0
    (void)o_stream_flush(&_stream->ostream);
612
0
    _stream->last_errors_not_checked = TRUE;
613
614
0
    if (_stream->parent != NULL)
615
0
      o_stream_uncork(_stream->parent);
616
0
  }
617
0
}
618
619
void o_stream_copy_error_from_parent(struct ostream_private *_stream)
620
0
{
621
0
  struct ostream *src = _stream->parent;
622
0
  struct ostream *dest = &_stream->ostream;
623
624
0
  i_assert(src->stream_errno != 0);
625
626
0
  dest->stream_errno = src->stream_errno;
627
0
  dest->overflow = src->overflow;
628
0
  if (src->closed)
629
0
    o_stream_close(dest);
630
0
}
631
632
int o_stream_flush_parent_if_needed(struct ostream_private *_stream)
633
0
{
634
0
  if (o_stream_get_buffer_used_size(_stream->parent) >= IO_BLOCK_SIZE) {
635
    /* we already have quite a lot of data in parent stream.
636
       unless we can flush it, don't add any more to it or we
637
       could keep wasting memory by just increasing the buffer
638
       size all the time. */
639
0
    if (o_stream_flush(_stream->parent) < 0) {
640
0
      o_stream_copy_error_from_parent(_stream);
641
0
      return -1;
642
0
    }
643
0
    if (o_stream_get_buffer_used_size(_stream->parent) >= IO_BLOCK_SIZE)
644
0
      return 0;
645
0
  }
646
0
  return 1;
647
0
}
648
649
int o_stream_flush_parent(struct ostream_private *_stream)
650
0
{
651
0
  int ret;
652
653
0
  i_assert(_stream->parent != NULL);
654
655
0
  if (!_stream->finished || !_stream->finish_also_parent ||
656
0
      !_stream->parent->real_stream->finish_via_child)
657
0
    ret = o_stream_flush(_stream->parent);
658
0
  else
659
0
    ret = o_stream_finish(_stream->parent);
660
0
  if (ret < 0)
661
0
    o_stream_copy_error_from_parent(_stream);
662
0
  return ret;
663
0
}
664
665
static int o_stream_default_flush(struct ostream_private *_stream)
666
0
{
667
0
  if (_stream->parent == NULL)
668
0
    return 1;
669
670
0
  return o_stream_flush_parent(_stream);
671
0
}
672
673
static void
674
o_stream_default_set_flush_callback(struct ostream_private *_stream,
675
            stream_flush_callback_t *callback,
676
            void *context)
677
0
{
678
0
  if (_stream->parent != NULL && _stream->buffering_parent == NULL)
679
0
    o_stream_set_flush_callback(_stream->parent, callback, context);
680
681
0
  _stream->callback = callback;
682
0
  _stream->context = context;
683
0
}
684
685
static void
686
o_stream_default_set_flush_pending(struct ostream_private *_stream, bool set)
687
0
{
688
0
  if (_stream->parent != NULL)
689
0
    o_stream_set_flush_pending(_stream->parent, set);
690
0
}
691
692
static size_t
693
o_stream_default_get_buffer_used_size(const struct ostream_private *_stream)
694
0
{
695
0
  if (_stream->parent == NULL)
696
0
    return 0;
697
0
  else
698
0
    return o_stream_get_buffer_used_size(_stream->parent);
699
0
}
700
701
static size_t
702
o_stream_default_get_buffer_avail_size(const struct ostream_private *_stream)
703
0
{
704
  /* This default implementation assumes that the returned buffer size is
705
     between 0..max_buffer_size. There's no assert though, in case the
706
     max_buffer_size changes. */
707
0
  size_t used = o_stream_get_buffer_used_size(&_stream->ostream);
708
709
0
  return _stream->max_buffer_size <= used ? 0 :
710
0
    _stream->max_buffer_size - used;
711
0
}
712
713
static int
714
o_stream_default_seek(struct ostream_private *_stream,
715
          uoff_t offset ATTR_UNUSED)
716
0
{
717
0
  _stream->ostream.stream_errno = ESPIPE;
718
0
  return -1;
719
0
}
720
721
static ssize_t
722
o_stream_default_sendv(struct ostream_private *stream,
723
           const struct const_iovec *iov, unsigned int iov_count)
724
0
{
725
0
  ssize_t ret;
726
727
0
  if ((ret = o_stream_sendv(stream->parent, iov, iov_count)) < 0) {
728
0
    o_stream_copy_error_from_parent(stream);
729
0
    return -1;
730
0
  }
731
0
  stream->ostream.offset += ret;
732
0
  return ret;
733
0
}
734
735
static int
736
o_stream_default_write_at(struct ostream_private *_stream,
737
        const void *data ATTR_UNUSED,
738
        size_t size ATTR_UNUSED, uoff_t offset ATTR_UNUSED)
739
0
{
740
0
  _stream->ostream.stream_errno = ESPIPE;
741
0
  return -1;
742
0
}
743
744
static enum ostream_send_istream_result
745
o_stream_default_send_istream(struct ostream_private *outstream,
746
            struct istream *instream)
747
0
{
748
0
  return io_stream_copy(&outstream->ostream, instream);
749
0
}
750
751
static void
752
o_stream_default_switch_ioloop_to(struct ostream_private *_stream,
753
          struct ioloop *ioloop)
754
0
{
755
0
  if (_stream->parent != NULL)
756
0
    o_stream_switch_ioloop_to(_stream->parent, ioloop);
757
0
}
758
759
struct ostream *
760
o_stream_create(struct ostream_private *_stream, struct ostream *parent, int fd)
761
0
{
762
0
  _stream->finish_also_parent = TRUE;
763
0
  _stream->finish_via_child = TRUE;
764
0
  _stream->fd = fd;
765
0
  _stream->ostream.real_stream = _stream;
766
0
  if (parent != NULL) {
767
0
    _stream->ostream.blocking = parent->blocking;
768
0
    _stream->parent = parent;
769
0
    o_stream_ref(parent);
770
771
0
    if (_stream->buffering_parent == NULL) {
772
0
      _stream->callback = parent->real_stream->callback;
773
0
      _stream->context = parent->real_stream->context;
774
0
    }
775
0
    _stream->max_buffer_size = parent->real_stream->max_buffer_size;
776
0
    _stream->error_handling_disabled =
777
0
      parent->real_stream->error_handling_disabled;
778
0
  }
779
780
0
  if (_stream->iostream.close == NULL)
781
0
    _stream->iostream.close = o_stream_default_close;
782
0
  if (_stream->iostream.destroy == NULL)
783
0
    _stream->iostream.destroy = o_stream_default_destroy;
784
0
  if (_stream->iostream.set_max_buffer_size == NULL) {
785
0
    _stream->iostream.set_max_buffer_size =
786
0
      o_stream_default_set_max_buffer_size;
787
0
  }
788
789
0
  if (_stream->cork == NULL)
790
0
    _stream->cork = o_stream_default_cork;
791
0
  if (_stream->flush == NULL)
792
0
    _stream->flush = o_stream_default_flush;
793
0
  if (_stream->set_flush_callback == NULL) {
794
0
    _stream->set_flush_callback =
795
0
      o_stream_default_set_flush_callback;
796
0
  }
797
0
  if (_stream->flush_pending == NULL)
798
0
    _stream->flush_pending = o_stream_default_set_flush_pending;
799
0
  if (_stream->get_buffer_used_size == NULL)
800
0
    _stream->get_buffer_used_size =
801
0
      o_stream_default_get_buffer_used_size;
802
0
  if (_stream->get_buffer_avail_size == NULL) {
803
0
    _stream->get_buffer_avail_size =
804
0
      o_stream_default_get_buffer_avail_size;
805
0
  }
806
0
  if (_stream->seek == NULL)
807
0
    _stream->seek = o_stream_default_seek;
808
0
  if (_stream->sendv == NULL)
809
0
    _stream->sendv = o_stream_default_sendv;
810
0
  if (_stream->write_at == NULL)
811
0
    _stream->write_at = o_stream_default_write_at;
812
0
  if (_stream->send_istream == NULL)
813
0
    _stream->send_istream = o_stream_default_send_istream;
814
0
  if (_stream->switch_ioloop_to == NULL)
815
0
    _stream->switch_ioloop_to = o_stream_default_switch_ioloop_to;
816
817
0
  io_stream_init(&_stream->iostream);
818
0
  return &_stream->ostream;
819
0
}
820
821
struct ostream *o_stream_create_error(int stream_errno)
822
0
{
823
0
  struct ostream_private *stream;
824
0
  struct ostream *output;
825
826
0
  stream = i_new(struct ostream_private, 1);
827
0
  stream->ostream.blocking = TRUE;
828
0
  stream->ostream.closed = TRUE;
829
0
  stream->ostream.stream_errno = stream_errno;
830
831
0
  output = o_stream_create(stream, NULL, -1);
832
0
  o_stream_set_no_error_handling(output, TRUE);
833
0
  o_stream_set_name(output, "(error)");
834
0
  return output;
835
0
}
836
837
struct ostream *
838
o_stream_create_error_str(int stream_errno, const char *fmt, ...)
839
0
{
840
0
  struct ostream *output;
841
0
  va_list args;
842
843
0
  va_start(args, fmt);
844
0
  output = o_stream_create_error(stream_errno);
845
0
  io_stream_set_verror(&output->real_stream->iostream, fmt, args);
846
0
  va_end(args);
847
0
  return output;
848
0
}
849
850
struct ostream *o_stream_create_passthrough(struct ostream *output)
851
0
{
852
0
  struct ostream_private *stream;
853
854
0
  stream = i_new(struct ostream_private, 1);
855
0
  return o_stream_create(stream, output, o_stream_get_fd(output));
856
0
}