/src/dovecot/src/lib/ostream.c
Line | Count | Source |
1 | | /* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */ |
2 | | |
3 | | #include "lib.h" |
4 | | #include "istream.h" |
5 | | #include "ostream-private.h" |
6 | | |
7 | | void o_stream_set_name(struct ostream *stream, const char *name) |
8 | 0 | { |
9 | 0 | i_free(stream->real_stream->iostream.name); |
10 | 0 | stream->real_stream->iostream.name = i_strdup(name); |
11 | 0 | } |
12 | | |
13 | | const char *o_stream_get_name(struct ostream *stream) |
14 | 0 | { |
15 | 0 | while (stream->real_stream->iostream.name == NULL) { |
16 | 0 | stream = stream->real_stream->parent; |
17 | 0 | if (stream == NULL) |
18 | 0 | return ""; |
19 | 0 | } |
20 | 0 | return stream->real_stream->iostream.name; |
21 | 0 | } |
22 | | |
23 | | int o_stream_get_fd(struct ostream *stream) |
24 | 0 | { |
25 | 0 | return stream->real_stream->fd; |
26 | 0 | } |
27 | | |
28 | | const char *o_stream_get_error(struct ostream *stream) |
29 | 0 | { |
30 | 0 | struct ostream *s; |
31 | | |
32 | | /* we'll only return errors for streams that have stream_errno set. |
33 | | we might be returning unintended error otherwise. */ |
34 | 0 | if (stream->stream_errno == 0) |
35 | 0 | return "<no error>"; |
36 | | |
37 | 0 | for (s = stream; s != NULL; s = s->real_stream->parent) { |
38 | 0 | if (s->stream_errno == 0) |
39 | 0 | break; |
40 | 0 | if (s->real_stream->iostream.error != NULL) |
41 | 0 | return s->real_stream->iostream.error; |
42 | 0 | } |
43 | 0 | return strerror(stream->stream_errno); |
44 | 0 | } |
45 | | |
46 | | const char *o_stream_get_disconnect_reason(struct ostream *stream) |
47 | 0 | { |
48 | 0 | return io_stream_get_disconnect_reason(NULL, stream); |
49 | 0 | } |
50 | | |
51 | | static void o_stream_close_full(struct ostream *stream, bool close_parents) |
52 | 0 | { |
53 | | /* Ideally o_stream_finish() would be called for all non-failed |
54 | | ostreams, but strictly requiring it would cause unnecessary |
55 | | complexity for many callers. Just require that at this point |
56 | | after flushing there isn't anything in the output buffer or that |
57 | | we're ignoring all errors. */ |
58 | 0 | bool last_errors_not_checked = |
59 | 0 | stream->real_stream->last_errors_not_checked; |
60 | |
|
61 | 0 | if (o_stream_flush(stream) == 0) |
62 | 0 | i_assert(stream->real_stream->error_handling_disabled); |
63 | | |
64 | | /* We don't want this auto-flushing to remove the need for |
65 | | proper error checking. */ |
66 | 0 | if (last_errors_not_checked) |
67 | 0 | stream->real_stream->last_errors_not_checked = TRUE; |
68 | |
|
69 | 0 | if (!stream->closed && !stream->real_stream->closing) { |
70 | | /* first mark the stream as being closed so the |
71 | | o_stream_copy_error_from_parent() won't recurse us back |
72 | | here. but don't immediately mark the stream closed, because |
73 | | we may still want to write something to it. */ |
74 | 0 | stream->real_stream->closing = TRUE; |
75 | 0 | io_stream_close(&stream->real_stream->iostream, close_parents); |
76 | 0 | stream->closed = TRUE; |
77 | 0 | } |
78 | |
|
79 | 0 | if (stream->stream_errno == 0) |
80 | 0 | stream->stream_errno = EPIPE; |
81 | 0 | } |
82 | | |
83 | | void o_stream_destroy(struct ostream **_stream) |
84 | 0 | { |
85 | 0 | struct ostream *stream = *_stream; |
86 | |
|
87 | 0 | if (stream == NULL) |
88 | 0 | return; |
89 | | |
90 | 0 | *_stream = NULL; |
91 | 0 | o_stream_close_full(stream, FALSE); |
92 | 0 | o_stream_unref(&stream); |
93 | 0 | } |
94 | | |
95 | | void o_stream_ref(struct ostream *stream) |
96 | 0 | { |
97 | 0 | io_stream_ref(&stream->real_stream->iostream); |
98 | 0 | } |
99 | | |
100 | | void o_stream_unref(struct ostream **_stream) |
101 | 0 | { |
102 | 0 | struct ostream *stream; |
103 | |
|
104 | 0 | if (*_stream == NULL) |
105 | 0 | return; |
106 | | |
107 | 0 | stream = *_stream; |
108 | |
|
109 | 0 | if (stream->real_stream->last_errors_not_checked && |
110 | 0 | !stream->real_stream->error_handling_disabled && |
111 | 0 | stream->real_stream->iostream.refcount == 1) { |
112 | 0 | i_panic("output stream %s is missing error handling", |
113 | 0 | o_stream_get_name(stream)); |
114 | 0 | } |
115 | | |
116 | 0 | if (!io_stream_unref(&stream->real_stream->iostream)) |
117 | 0 | io_stream_free(&stream->real_stream->iostream); |
118 | 0 | *_stream = NULL; |
119 | 0 | } |
120 | | |
121 | | #undef o_stream_add_destroy_callback |
122 | | void o_stream_add_destroy_callback(struct ostream *stream, |
123 | | ostream_callback_t *callback, void *context) |
124 | 0 | { |
125 | 0 | io_stream_add_destroy_callback(&stream->real_stream->iostream, |
126 | 0 | callback, context); |
127 | 0 | } |
128 | | |
129 | | void o_stream_remove_destroy_callback(struct ostream *stream, |
130 | | void (*callback)()) |
131 | 0 | { |
132 | 0 | io_stream_remove_destroy_callback(&stream->real_stream->iostream, |
133 | 0 | callback); |
134 | 0 | } |
135 | | |
136 | | void o_stream_close(struct ostream *stream) |
137 | 0 | { |
138 | 0 | if (stream != NULL) |
139 | 0 | o_stream_close_full(stream, TRUE); |
140 | 0 | } |
141 | | |
142 | | static int o_stream_default_buffering_flush(struct ostream_private *_stream) |
143 | 0 | { |
144 | 0 | struct ostream *ostream = &_stream->ostream; |
145 | 0 | int ret, ret2; |
146 | | |
147 | | /* try to actually flush the pending data */ |
148 | 0 | if ((ret = o_stream_flush(_stream->buffering_parent)) < 0) |
149 | 0 | return -1; |
150 | | |
151 | | /* we may be able to copy more data, try it */ |
152 | 0 | o_stream_ref(ostream); |
153 | 0 | if (_stream->callback != NULL) |
154 | 0 | ret2 = _stream->callback(_stream->context); |
155 | 0 | else |
156 | 0 | ret2 = o_stream_flush(ostream); |
157 | 0 | if (ret2 == 0) |
158 | 0 | o_stream_set_flush_pending(_stream->buffering_parent, TRUE); |
159 | 0 | o_stream_unref(&ostream); |
160 | 0 | if (ret2 < 0) |
161 | 0 | return -1; |
162 | 0 | return ret > 0 && ret2 > 0 ? 1 : 0; |
163 | 0 | } |
164 | | |
165 | | void o_stream_init_buffering_flush(struct ostream_private *_stream, |
166 | | struct ostream *parent) |
167 | 0 | { |
168 | 0 | _stream->buffering_parent = parent; |
169 | 0 | _stream->callback = parent->real_stream->callback; |
170 | 0 | _stream->context = parent->real_stream->context; |
171 | |
|
172 | 0 | o_stream_set_flush_callback(parent, o_stream_default_buffering_flush, |
173 | 0 | _stream); |
174 | 0 | } |
175 | | |
176 | | #undef o_stream_set_flush_callback |
177 | | void o_stream_set_flush_callback(struct ostream *stream, |
178 | | stream_flush_callback_t *callback, |
179 | | void *context) |
180 | 0 | { |
181 | 0 | struct ostream_private *_stream = stream->real_stream; |
182 | |
|
183 | 0 | _stream->set_flush_callback(_stream, callback, context); |
184 | 0 | } |
185 | | |
186 | | void o_stream_unset_flush_callback(struct ostream *stream) |
187 | 0 | { |
188 | 0 | struct ostream_private *_stream = stream->real_stream; |
189 | |
|
190 | 0 | _stream->set_flush_callback(_stream, NULL, NULL); |
191 | 0 | } |
192 | | |
193 | | stream_flush_callback_t * |
194 | | o_stream_get_flush_callback(struct ostream *stream, void **context_r) |
195 | 0 | { |
196 | 0 | struct ostream_private *_stream = stream->real_stream; |
197 | 0 | *context_r = _stream->context; |
198 | 0 | return _stream->callback; |
199 | 0 | } |
200 | | |
201 | | void o_stream_set_max_buffer_size(struct ostream *stream, size_t max_size) |
202 | 0 | { |
203 | 0 | io_stream_set_max_buffer_size(&stream->real_stream->iostream, max_size); |
204 | 0 | } |
205 | | |
206 | | size_t o_stream_get_max_buffer_size(struct ostream *stream) |
207 | 0 | { |
208 | 0 | return stream->real_stream->max_buffer_size; |
209 | 0 | } |
210 | | |
211 | | void o_stream_cork(struct ostream *stream) |
212 | 0 | { |
213 | 0 | struct ostream_private *_stream = stream->real_stream; |
214 | |
|
215 | 0 | if (unlikely(stream->closed || stream->stream_errno != 0)) |
216 | 0 | return; |
217 | | |
218 | 0 | _stream->cork(_stream, TRUE); |
219 | 0 | } |
220 | | |
221 | | void o_stream_uncork(struct ostream *stream) |
222 | 0 | { |
223 | 0 | struct ostream_private *_stream = stream->real_stream; |
224 | |
|
225 | 0 | if (unlikely(stream->closed || stream->stream_errno != 0)) |
226 | 0 | return; |
227 | | |
228 | 0 | _stream->cork(_stream, FALSE); |
229 | 0 | } |
230 | | |
231 | | bool o_stream_is_corked(struct ostream *stream) |
232 | 0 | { |
233 | 0 | struct ostream_private *_stream = stream->real_stream; |
234 | |
|
235 | 0 | return _stream->corked; |
236 | 0 | } |
237 | | |
238 | | int o_stream_flush(struct ostream *stream) |
239 | 0 | { |
240 | 0 | struct ostream_private *_stream = stream->real_stream; |
241 | 0 | int ret = 1; |
242 | |
|
243 | 0 | o_stream_ignore_last_errors(stream); |
244 | |
|
245 | 0 | if (unlikely(stream->closed || stream->stream_errno != 0)) { |
246 | 0 | errno = stream->stream_errno; |
247 | 0 | return -1; |
248 | 0 | } |
249 | | |
250 | 0 | if (unlikely(_stream->noverflow)) { |
251 | 0 | io_stream_set_error(&_stream->iostream, |
252 | 0 | "Output stream buffer was full (%zu bytes)", |
253 | 0 | o_stream_get_max_buffer_size(stream)); |
254 | 0 | errno = stream->stream_errno = ENOBUFS; |
255 | 0 | return -1; |
256 | 0 | } |
257 | | |
258 | 0 | if (unlikely((ret = _stream->flush(_stream)) < 0)) { |
259 | 0 | i_assert(stream->stream_errno != 0); |
260 | 0 | errno = stream->stream_errno; |
261 | 0 | } |
262 | 0 | return ret; |
263 | 0 | } |
264 | | |
265 | | void o_stream_set_flush_pending(struct ostream *stream, bool set) |
266 | 0 | { |
267 | 0 | struct ostream_private *_stream = stream->real_stream; |
268 | |
|
269 | 0 | if (unlikely(stream->closed || stream->stream_errno != 0)) |
270 | 0 | return; |
271 | | |
272 | 0 | _stream->flush_pending(_stream, set); |
273 | 0 | } |
274 | | |
275 | | size_t o_stream_get_buffer_used_size(const struct ostream *stream) |
276 | 0 | { |
277 | 0 | const struct ostream_private *_stream = stream->real_stream; |
278 | |
|
279 | 0 | return _stream->get_buffer_used_size(_stream); |
280 | 0 | } |
281 | | |
282 | | size_t o_stream_get_buffer_avail_size(const struct ostream *stream) |
283 | 0 | { |
284 | 0 | const struct ostream_private *_stream = stream->real_stream; |
285 | |
|
286 | 0 | return _stream->get_buffer_avail_size(_stream); |
287 | 0 | } |
288 | | |
289 | | int o_stream_seek(struct ostream *stream, uoff_t offset) |
290 | 0 | { |
291 | 0 | struct ostream_private *_stream = stream->real_stream; |
292 | |
|
293 | 0 | if (unlikely(stream->closed || stream->stream_errno != 0)) { |
294 | 0 | errno = stream->stream_errno; |
295 | 0 | return -1; |
296 | 0 | } |
297 | | |
298 | 0 | if (unlikely(_stream->seek(_stream, offset) < 0)) { |
299 | 0 | i_assert(stream->stream_errno != 0); |
300 | 0 | errno = stream->stream_errno; |
301 | 0 | return -1; |
302 | 0 | } |
303 | 0 | return 1; |
304 | 0 | } |
305 | | |
306 | | ssize_t o_stream_send(struct ostream *stream, const void *data, size_t size) |
307 | 0 | { |
308 | 0 | struct const_iovec iov; |
309 | |
|
310 | 0 | i_zero(&iov); |
311 | 0 | iov.iov_base = data; |
312 | 0 | iov.iov_len = size; |
313 | |
|
314 | 0 | return o_stream_sendv(stream, &iov, 1); |
315 | 0 | } |
316 | | |
317 | | static ssize_t |
318 | | o_stream_sendv_int(struct ostream *stream, const struct const_iovec *iov, |
319 | | unsigned int iov_count, bool *overflow_r) |
320 | 0 | { |
321 | 0 | struct ostream_private *_stream = stream->real_stream; |
322 | 0 | unsigned int i; |
323 | 0 | size_t total_size; |
324 | 0 | ssize_t ret; |
325 | |
|
326 | 0 | *overflow_r = FALSE; |
327 | |
|
328 | 0 | for (i = 0, total_size = 0; i < iov_count; i++) |
329 | 0 | total_size += iov[i].iov_len; |
330 | 0 | if (total_size == 0) |
331 | 0 | return 0; |
332 | | |
333 | 0 | i_assert(!_stream->finished); |
334 | 0 | ret = _stream->sendv(_stream, iov, iov_count); |
335 | 0 | if (ret > 0) |
336 | 0 | stream->real_stream->last_write_timeval = ioloop_timeval; |
337 | 0 | if (unlikely(ret != (ssize_t)total_size)) { |
338 | 0 | if (ret < 0) { |
339 | 0 | i_assert(stream->stream_errno != 0); |
340 | 0 | errno = stream->stream_errno; |
341 | 0 | } else { |
342 | 0 | i_assert(!stream->blocking); |
343 | 0 | stream->overflow = TRUE; |
344 | 0 | *overflow_r = TRUE; |
345 | 0 | } |
346 | 0 | } |
347 | 0 | return ret; |
348 | 0 | } |
349 | | |
350 | | ssize_t o_stream_sendv(struct ostream *stream, const struct const_iovec *iov, |
351 | | unsigned int iov_count) |
352 | 0 | { |
353 | 0 | bool overflow; |
354 | |
|
355 | 0 | if (unlikely(stream->closed || stream->stream_errno != 0)) { |
356 | 0 | errno = stream->stream_errno; |
357 | 0 | return -1; |
358 | 0 | } |
359 | 0 | return o_stream_sendv_int(stream, iov, iov_count, &overflow); |
360 | 0 | } |
361 | | |
362 | | ssize_t o_stream_send_str(struct ostream *stream, const char *str) |
363 | 0 | { |
364 | 0 | return o_stream_send(stream, str, strlen(str)); |
365 | 0 | } |
366 | | |
367 | | void o_stream_nsend(struct ostream *stream, const void *data, size_t size) |
368 | 0 | { |
369 | 0 | struct const_iovec iov; |
370 | |
|
371 | 0 | i_zero(&iov); |
372 | 0 | iov.iov_base = data; |
373 | 0 | iov.iov_len = size; |
374 | |
|
375 | 0 | o_stream_nsendv(stream, &iov, 1); |
376 | 0 | } |
377 | | |
378 | | void o_stream_nsendv(struct ostream *stream, const struct const_iovec *iov, |
379 | | unsigned int iov_count) |
380 | 0 | { |
381 | 0 | bool overflow; |
382 | |
|
383 | 0 | if (unlikely(stream->closed || stream->stream_errno != 0 || |
384 | 0 | stream->real_stream->noverflow)) |
385 | 0 | return; |
386 | 0 | (void)o_stream_sendv_int(stream, iov, iov_count, &overflow); |
387 | 0 | if (overflow) |
388 | 0 | stream->real_stream->noverflow = TRUE; |
389 | 0 | stream->real_stream->last_errors_not_checked = TRUE; |
390 | 0 | } |
391 | | |
392 | | void o_stream_nsend_str(struct ostream *stream, const char *str) |
393 | 0 | { |
394 | 0 | o_stream_nsend(stream, str, strlen(str)); |
395 | 0 | } |
396 | | |
397 | | int o_stream_finish(struct ostream *stream) |
398 | 0 | { |
399 | 0 | stream->real_stream->finished = TRUE; |
400 | 0 | return o_stream_flush(stream); |
401 | 0 | } |
402 | | |
403 | | void o_stream_set_finish_also_parent(struct ostream *stream, bool set) |
404 | 0 | { |
405 | 0 | stream->real_stream->finish_also_parent = set; |
406 | 0 | } |
407 | | |
408 | | void o_stream_set_finish_via_child(struct ostream *stream, bool set) |
409 | 0 | { |
410 | 0 | stream->real_stream->finish_via_child = set; |
411 | 0 | } |
412 | | |
413 | | void o_stream_ignore_last_errors(struct ostream *stream) |
414 | 0 | { |
415 | 0 | while (stream != NULL) { |
416 | 0 | stream->real_stream->last_errors_not_checked = FALSE; |
417 | 0 | stream = stream->real_stream->parent; |
418 | 0 | } |
419 | 0 | } |
420 | | |
421 | | void o_stream_abort(struct ostream *stream) |
422 | 0 | { |
423 | 0 | o_stream_ignore_last_errors(stream); |
424 | 0 | if (stream->stream_errno != 0) |
425 | 0 | return; |
426 | 0 | io_stream_set_error(&stream->real_stream->iostream, "aborted writing"); |
427 | 0 | stream->stream_errno = EPIPE; |
428 | 0 | } |
429 | | |
430 | | void o_stream_set_no_error_handling(struct ostream *stream, bool set) |
431 | 0 | { |
432 | 0 | stream->real_stream->error_handling_disabled = set; |
433 | 0 | } |
434 | | |
435 | | enum ostream_send_istream_result |
436 | | o_stream_send_istream(struct ostream *outstream, struct istream *instream) |
437 | 0 | { |
438 | 0 | struct ostream_private *_outstream = outstream->real_stream; |
439 | 0 | uoff_t old_outstream_offset = outstream->offset; |
440 | 0 | uoff_t old_instream_offset = instream->v_offset; |
441 | 0 | enum ostream_send_istream_result res; |
442 | |
|
443 | 0 | if (unlikely(instream->closed || instream->stream_errno != 0)) { |
444 | 0 | errno = instream->stream_errno; |
445 | 0 | return OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT; |
446 | 0 | } |
447 | 0 | if (unlikely(outstream->closed || outstream->stream_errno != 0)) { |
448 | 0 | errno = outstream->stream_errno; |
449 | 0 | return OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT; |
450 | 0 | } |
451 | | |
452 | 0 | i_assert(!_outstream->finished); |
453 | 0 | res = _outstream->send_istream(_outstream, instream); |
454 | 0 | switch (res) { |
455 | 0 | case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: |
456 | 0 | i_assert(instream->stream_errno == 0); |
457 | 0 | i_assert(outstream->stream_errno == 0); |
458 | 0 | i_assert(!i_stream_have_bytes_left(instream)); |
459 | 0 | break; |
460 | 0 | case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: |
461 | 0 | i_assert(!instream->blocking); |
462 | 0 | break; |
463 | 0 | case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: |
464 | 0 | i_assert(!outstream->blocking); |
465 | 0 | o_stream_set_flush_pending(outstream, TRUE); |
466 | 0 | break; |
467 | 0 | case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: |
468 | 0 | i_assert(instream->stream_errno != 0); |
469 | 0 | return res; |
470 | 0 | case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: |
471 | 0 | i_assert(outstream->stream_errno != 0); |
472 | 0 | return res; |
473 | 0 | } |
474 | | /* non-failure - make sure stream offsets match */ |
475 | 0 | i_assert((outstream->offset - old_outstream_offset) == |
476 | 0 | (instream->v_offset - old_instream_offset)); |
477 | | |
478 | 0 | if (outstream->offset != old_outstream_offset) |
479 | 0 | outstream->real_stream->last_write_timeval = ioloop_timeval; |
480 | 0 | return res; |
481 | 0 | } |
482 | | |
483 | | void o_stream_nsend_istream(struct ostream *outstream, struct istream *instream) |
484 | 0 | { |
485 | 0 | i_assert(instream->blocking); |
486 | | |
487 | 0 | switch (o_stream_send_istream(outstream, instream)) { |
488 | 0 | case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: |
489 | 0 | break; |
490 | 0 | case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: |
491 | 0 | i_unreached(); |
492 | 0 | case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: |
493 | 0 | outstream->real_stream->noverflow = TRUE; |
494 | 0 | break; |
495 | 0 | case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: |
496 | 0 | outstream->stream_errno = instream->stream_errno; |
497 | 0 | io_stream_set_error(&outstream->real_stream->iostream, |
498 | 0 | "nsend-istream: read(%s) failed: %s", |
499 | 0 | i_stream_get_name(instream), |
500 | 0 | i_stream_get_error(instream)); |
501 | 0 | break; |
502 | 0 | case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: |
503 | 0 | break; |
504 | 0 | } |
505 | 0 | outstream->real_stream->last_errors_not_checked = TRUE; |
506 | 0 | } |
507 | | |
508 | | int o_stream_pwrite(struct ostream *stream, const void *data, size_t size, |
509 | | uoff_t offset) |
510 | 0 | { |
511 | 0 | int ret; |
512 | |
|
513 | 0 | if (unlikely(stream->closed || stream->stream_errno != 0)) { |
514 | 0 | errno = stream->stream_errno; |
515 | 0 | return -1; |
516 | 0 | } |
517 | | |
518 | 0 | i_assert(!stream->real_stream->finished); |
519 | 0 | ret = stream->real_stream->write_at(stream->real_stream, |
520 | 0 | data, size, offset); |
521 | 0 | if (ret > 0) |
522 | 0 | stream->real_stream->last_write_timeval = ioloop_timeval; |
523 | 0 | else if (unlikely(ret < 0)) { |
524 | 0 | i_assert(stream->stream_errno != 0); |
525 | 0 | errno = stream->stream_errno; |
526 | 0 | } |
527 | | |
528 | 0 | return ret; |
529 | 0 | } |
530 | | |
531 | | void o_stream_get_last_write_time(struct ostream *stream, struct timeval *tv_r) |
532 | 0 | { |
533 | 0 | *tv_r = stream->real_stream->last_write_timeval; |
534 | 0 | } |
535 | | |
536 | | enum ostream_send_istream_result |
537 | | io_stream_copy(struct ostream *outstream, struct istream *instream) |
538 | 0 | { |
539 | 0 | struct const_iovec iov; |
540 | 0 | const unsigned char *data; |
541 | 0 | ssize_t ret; |
542 | |
|
543 | 0 | while (i_stream_read_more(instream, &data, &iov.iov_len) > 0) { |
544 | 0 | iov.iov_base = data; |
545 | 0 | if ((ret = o_stream_sendv(outstream, &iov, 1)) < 0) |
546 | 0 | return OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT; |
547 | 0 | else if (ret == 0) |
548 | 0 | return OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT; |
549 | 0 | i_stream_skip(instream, ret); |
550 | 0 | } |
551 | | |
552 | 0 | if (instream->stream_errno != 0) |
553 | 0 | return OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT; |
554 | 0 | if (i_stream_have_bytes_left(instream)) |
555 | 0 | return OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT; |
556 | 0 | return OSTREAM_SEND_ISTREAM_RESULT_FINISHED; |
557 | 0 | } |
558 | | |
559 | | void o_stream_switch_ioloop_to(struct ostream *stream, struct ioloop *ioloop) |
560 | 0 | { |
561 | 0 | struct ostream_private *_stream = stream->real_stream; |
562 | |
|
563 | 0 | io_stream_switch_ioloop_to(&_stream->iostream, ioloop); |
564 | |
|
565 | 0 | _stream->switch_ioloop_to(_stream, ioloop); |
566 | 0 | } |
567 | | |
568 | | void o_stream_switch_ioloop(struct ostream *stream) |
569 | 0 | { |
570 | 0 | o_stream_switch_ioloop_to(stream, current_ioloop); |
571 | 0 | } |
572 | | |
573 | | static void o_stream_default_close(struct iostream_private *stream, |
574 | | bool close_parent) |
575 | 0 | { |
576 | 0 | struct ostream_private *_stream = |
577 | 0 | container_of(stream, struct ostream_private, iostream); |
578 | |
|
579 | 0 | (void)o_stream_flush(&_stream->ostream); |
580 | 0 | if (close_parent) |
581 | 0 | o_stream_close(_stream->parent); |
582 | 0 | } |
583 | | |
584 | | static void o_stream_default_destroy(struct iostream_private *stream) |
585 | 0 | { |
586 | 0 | struct ostream_private *_stream = |
587 | 0 | container_of(stream, struct ostream_private, iostream); |
588 | |
|
589 | 0 | o_stream_unref(&_stream->parent); |
590 | 0 | } |
591 | | |
592 | | static void |
593 | | o_stream_default_set_max_buffer_size(struct iostream_private *stream, |
594 | | size_t max_size) |
595 | 0 | { |
596 | 0 | struct ostream_private *_stream = |
597 | 0 | container_of(stream, struct ostream_private, iostream); |
598 | |
|
599 | 0 | if (_stream->parent != NULL) |
600 | 0 | o_stream_set_max_buffer_size(_stream->parent, max_size); |
601 | 0 | _stream->max_buffer_size = max_size; |
602 | 0 | } |
603 | | |
604 | | static void o_stream_default_cork(struct ostream_private *_stream, bool set) |
605 | 0 | { |
606 | 0 | _stream->corked = set; |
607 | 0 | if (set) { |
608 | 0 | if (_stream->parent != NULL) |
609 | 0 | o_stream_cork(_stream->parent); |
610 | 0 | } else { |
611 | 0 | (void)o_stream_flush(&_stream->ostream); |
612 | 0 | _stream->last_errors_not_checked = TRUE; |
613 | |
|
614 | 0 | if (_stream->parent != NULL) |
615 | 0 | o_stream_uncork(_stream->parent); |
616 | 0 | } |
617 | 0 | } |
618 | | |
619 | | void o_stream_copy_error_from_parent(struct ostream_private *_stream) |
620 | 0 | { |
621 | 0 | struct ostream *src = _stream->parent; |
622 | 0 | struct ostream *dest = &_stream->ostream; |
623 | |
|
624 | 0 | i_assert(src->stream_errno != 0); |
625 | | |
626 | 0 | dest->stream_errno = src->stream_errno; |
627 | 0 | dest->overflow = src->overflow; |
628 | 0 | if (src->closed) |
629 | 0 | o_stream_close(dest); |
630 | 0 | } |
631 | | |
632 | | int o_stream_flush_parent_if_needed(struct ostream_private *_stream) |
633 | 0 | { |
634 | 0 | if (o_stream_get_buffer_used_size(_stream->parent) >= IO_BLOCK_SIZE) { |
635 | | /* we already have quite a lot of data in parent stream. |
636 | | unless we can flush it, don't add any more to it or we |
637 | | could keep wasting memory by just increasing the buffer |
638 | | size all the time. */ |
639 | 0 | if (o_stream_flush(_stream->parent) < 0) { |
640 | 0 | o_stream_copy_error_from_parent(_stream); |
641 | 0 | return -1; |
642 | 0 | } |
643 | 0 | if (o_stream_get_buffer_used_size(_stream->parent) >= IO_BLOCK_SIZE) |
644 | 0 | return 0; |
645 | 0 | } |
646 | 0 | return 1; |
647 | 0 | } |
648 | | |
649 | | int o_stream_flush_parent(struct ostream_private *_stream) |
650 | 0 | { |
651 | 0 | int ret; |
652 | |
|
653 | 0 | i_assert(_stream->parent != NULL); |
654 | | |
655 | 0 | if (!_stream->finished || !_stream->finish_also_parent || |
656 | 0 | !_stream->parent->real_stream->finish_via_child) |
657 | 0 | ret = o_stream_flush(_stream->parent); |
658 | 0 | else |
659 | 0 | ret = o_stream_finish(_stream->parent); |
660 | 0 | if (ret < 0) |
661 | 0 | o_stream_copy_error_from_parent(_stream); |
662 | 0 | return ret; |
663 | 0 | } |
664 | | |
665 | | static int o_stream_default_flush(struct ostream_private *_stream) |
666 | 0 | { |
667 | 0 | if (_stream->parent == NULL) |
668 | 0 | return 1; |
669 | | |
670 | 0 | return o_stream_flush_parent(_stream); |
671 | 0 | } |
672 | | |
673 | | static void |
674 | | o_stream_default_set_flush_callback(struct ostream_private *_stream, |
675 | | stream_flush_callback_t *callback, |
676 | | void *context) |
677 | 0 | { |
678 | 0 | if (_stream->parent != NULL && _stream->buffering_parent == NULL) |
679 | 0 | o_stream_set_flush_callback(_stream->parent, callback, context); |
680 | |
|
681 | 0 | _stream->callback = callback; |
682 | 0 | _stream->context = context; |
683 | 0 | } |
684 | | |
685 | | static void |
686 | | o_stream_default_set_flush_pending(struct ostream_private *_stream, bool set) |
687 | 0 | { |
688 | 0 | if (_stream->parent != NULL) |
689 | 0 | o_stream_set_flush_pending(_stream->parent, set); |
690 | 0 | } |
691 | | |
692 | | static size_t |
693 | | o_stream_default_get_buffer_used_size(const struct ostream_private *_stream) |
694 | 0 | { |
695 | 0 | if (_stream->parent == NULL) |
696 | 0 | return 0; |
697 | 0 | else |
698 | 0 | return o_stream_get_buffer_used_size(_stream->parent); |
699 | 0 | } |
700 | | |
701 | | static size_t |
702 | | o_stream_default_get_buffer_avail_size(const struct ostream_private *_stream) |
703 | 0 | { |
704 | | /* This default implementation assumes that the returned buffer size is |
705 | | between 0..max_buffer_size. There's no assert though, in case the |
706 | | max_buffer_size changes. */ |
707 | 0 | size_t used = o_stream_get_buffer_used_size(&_stream->ostream); |
708 | |
|
709 | 0 | return _stream->max_buffer_size <= used ? 0 : |
710 | 0 | _stream->max_buffer_size - used; |
711 | 0 | } |
712 | | |
713 | | static int |
714 | | o_stream_default_seek(struct ostream_private *_stream, |
715 | | uoff_t offset ATTR_UNUSED) |
716 | 0 | { |
717 | 0 | _stream->ostream.stream_errno = ESPIPE; |
718 | 0 | return -1; |
719 | 0 | } |
720 | | |
721 | | static ssize_t |
722 | | o_stream_default_sendv(struct ostream_private *stream, |
723 | | const struct const_iovec *iov, unsigned int iov_count) |
724 | 0 | { |
725 | 0 | ssize_t ret; |
726 | |
|
727 | 0 | if ((ret = o_stream_sendv(stream->parent, iov, iov_count)) < 0) { |
728 | 0 | o_stream_copy_error_from_parent(stream); |
729 | 0 | return -1; |
730 | 0 | } |
731 | 0 | stream->ostream.offset += ret; |
732 | 0 | return ret; |
733 | 0 | } |
734 | | |
735 | | static int |
736 | | o_stream_default_write_at(struct ostream_private *_stream, |
737 | | const void *data ATTR_UNUSED, |
738 | | size_t size ATTR_UNUSED, uoff_t offset ATTR_UNUSED) |
739 | 0 | { |
740 | 0 | _stream->ostream.stream_errno = ESPIPE; |
741 | 0 | return -1; |
742 | 0 | } |
743 | | |
744 | | static enum ostream_send_istream_result |
745 | | o_stream_default_send_istream(struct ostream_private *outstream, |
746 | | struct istream *instream) |
747 | 0 | { |
748 | 0 | return io_stream_copy(&outstream->ostream, instream); |
749 | 0 | } |
750 | | |
751 | | static void |
752 | | o_stream_default_switch_ioloop_to(struct ostream_private *_stream, |
753 | | struct ioloop *ioloop) |
754 | 0 | { |
755 | 0 | if (_stream->parent != NULL) |
756 | 0 | o_stream_switch_ioloop_to(_stream->parent, ioloop); |
757 | 0 | } |
758 | | |
759 | | struct ostream * |
760 | | o_stream_create(struct ostream_private *_stream, struct ostream *parent, int fd) |
761 | 0 | { |
762 | 0 | _stream->finish_also_parent = TRUE; |
763 | 0 | _stream->finish_via_child = TRUE; |
764 | 0 | _stream->fd = fd; |
765 | 0 | _stream->ostream.real_stream = _stream; |
766 | 0 | if (parent != NULL) { |
767 | 0 | _stream->ostream.blocking = parent->blocking; |
768 | 0 | _stream->parent = parent; |
769 | 0 | o_stream_ref(parent); |
770 | |
|
771 | 0 | if (_stream->buffering_parent == NULL) { |
772 | 0 | _stream->callback = parent->real_stream->callback; |
773 | 0 | _stream->context = parent->real_stream->context; |
774 | 0 | } |
775 | 0 | _stream->max_buffer_size = parent->real_stream->max_buffer_size; |
776 | 0 | _stream->error_handling_disabled = |
777 | 0 | parent->real_stream->error_handling_disabled; |
778 | 0 | } |
779 | |
|
780 | 0 | if (_stream->iostream.close == NULL) |
781 | 0 | _stream->iostream.close = o_stream_default_close; |
782 | 0 | if (_stream->iostream.destroy == NULL) |
783 | 0 | _stream->iostream.destroy = o_stream_default_destroy; |
784 | 0 | if (_stream->iostream.set_max_buffer_size == NULL) { |
785 | 0 | _stream->iostream.set_max_buffer_size = |
786 | 0 | o_stream_default_set_max_buffer_size; |
787 | 0 | } |
788 | |
|
789 | 0 | if (_stream->cork == NULL) |
790 | 0 | _stream->cork = o_stream_default_cork; |
791 | 0 | if (_stream->flush == NULL) |
792 | 0 | _stream->flush = o_stream_default_flush; |
793 | 0 | if (_stream->set_flush_callback == NULL) { |
794 | 0 | _stream->set_flush_callback = |
795 | 0 | o_stream_default_set_flush_callback; |
796 | 0 | } |
797 | 0 | if (_stream->flush_pending == NULL) |
798 | 0 | _stream->flush_pending = o_stream_default_set_flush_pending; |
799 | 0 | if (_stream->get_buffer_used_size == NULL) |
800 | 0 | _stream->get_buffer_used_size = |
801 | 0 | o_stream_default_get_buffer_used_size; |
802 | 0 | if (_stream->get_buffer_avail_size == NULL) { |
803 | 0 | _stream->get_buffer_avail_size = |
804 | 0 | o_stream_default_get_buffer_avail_size; |
805 | 0 | } |
806 | 0 | if (_stream->seek == NULL) |
807 | 0 | _stream->seek = o_stream_default_seek; |
808 | 0 | if (_stream->sendv == NULL) |
809 | 0 | _stream->sendv = o_stream_default_sendv; |
810 | 0 | if (_stream->write_at == NULL) |
811 | 0 | _stream->write_at = o_stream_default_write_at; |
812 | 0 | if (_stream->send_istream == NULL) |
813 | 0 | _stream->send_istream = o_stream_default_send_istream; |
814 | 0 | if (_stream->switch_ioloop_to == NULL) |
815 | 0 | _stream->switch_ioloop_to = o_stream_default_switch_ioloop_to; |
816 | |
|
817 | 0 | io_stream_init(&_stream->iostream); |
818 | 0 | return &_stream->ostream; |
819 | 0 | } |
820 | | |
821 | | struct ostream *o_stream_create_error(int stream_errno) |
822 | 0 | { |
823 | 0 | struct ostream_private *stream; |
824 | 0 | struct ostream *output; |
825 | |
|
826 | 0 | stream = i_new(struct ostream_private, 1); |
827 | 0 | stream->ostream.blocking = TRUE; |
828 | 0 | stream->ostream.closed = TRUE; |
829 | 0 | stream->ostream.stream_errno = stream_errno; |
830 | |
|
831 | 0 | output = o_stream_create(stream, NULL, -1); |
832 | 0 | o_stream_set_no_error_handling(output, TRUE); |
833 | 0 | o_stream_set_name(output, "(error)"); |
834 | 0 | return output; |
835 | 0 | } |
836 | | |
837 | | struct ostream * |
838 | | o_stream_create_error_str(int stream_errno, const char *fmt, ...) |
839 | 0 | { |
840 | 0 | struct ostream *output; |
841 | 0 | va_list args; |
842 | |
|
843 | 0 | va_start(args, fmt); |
844 | 0 | output = o_stream_create_error(stream_errno); |
845 | 0 | io_stream_set_verror(&output->real_stream->iostream, fmt, args); |
846 | 0 | va_end(args); |
847 | 0 | return output; |
848 | 0 | } |
849 | | |
850 | | struct ostream *o_stream_create_passthrough(struct ostream *output) |
851 | 0 | { |
852 | 0 | struct ostream_private *stream; |
853 | |
|
854 | 0 | stream = i_new(struct ostream_private, 1); |
855 | 0 | return o_stream_create(stream, output, o_stream_get_fd(output)); |
856 | 0 | } |