/src/dovecot/src/lib/iostream-pump.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */ |
2 | | |
3 | | #include "lib.h" |
4 | | #include "buffer.h" |
5 | | #include "str.h" |
6 | | #include "iostream-pump.h" |
7 | | #include "istream.h" |
8 | | #include "ostream.h" |
9 | | #include <unistd.h> |
10 | | |
11 | | #undef iostream_pump_set_completion_callback |
12 | | |
13 | | struct iostream_pump { |
14 | | int refcount; |
15 | | |
16 | | struct istream *input; |
17 | | struct ostream *output; |
18 | | |
19 | | struct io *io; |
20 | | |
21 | | iostream_pump_callback_t *callback; |
22 | | void *context; |
23 | | |
24 | | bool waiting_output; |
25 | | bool completed; |
26 | | }; |
27 | | |
28 | | static void iostream_pump_copy(struct iostream_pump *pump) |
29 | 6.63k | { |
30 | 6.63k | enum ostream_send_istream_result res; |
31 | 6.63k | size_t old_size; |
32 | | |
33 | 6.63k | o_stream_cork(pump->output); |
34 | 6.63k | old_size = o_stream_get_max_buffer_size(pump->output); |
35 | 6.63k | o_stream_set_max_buffer_size(pump->output, |
36 | 6.63k | I_MIN(IO_BLOCK_SIZE, |
37 | 6.63k | o_stream_get_max_buffer_size(pump->output))); |
38 | 6.63k | res = o_stream_send_istream(pump->output, pump->input); |
39 | 6.63k | o_stream_set_max_buffer_size(pump->output, old_size); |
40 | 6.63k | o_stream_uncork(pump->output); |
41 | | |
42 | 6.63k | switch(res) { |
43 | 0 | case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: |
44 | 0 | io_remove(&pump->io); |
45 | 0 | pump->callback(IOSTREAM_PUMP_STATUS_INPUT_ERROR, |
46 | 0 | pump->context); |
47 | 0 | return; |
48 | 0 | case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: |
49 | 0 | io_remove(&pump->io); |
50 | 0 | pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR, |
51 | 0 | pump->context); |
52 | 0 | return; |
53 | 288 | case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: |
54 | 288 | i_assert(!pump->output->blocking); |
55 | 288 | pump->waiting_output = TRUE; |
56 | 288 | io_remove(&pump->io); |
57 | 288 | return; |
58 | 6.35k | case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: |
59 | 6.35k | pump->waiting_output = FALSE; |
60 | 6.35k | io_remove(&pump->io); |
61 | | /* flush it */ |
62 | 6.35k | switch (o_stream_finish(pump->output)) { |
63 | 0 | case -1: |
64 | 0 | pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR, |
65 | 0 | pump->context); |
66 | 0 | break; |
67 | 81 | case 0: |
68 | 81 | pump->waiting_output = TRUE; |
69 | 81 | pump->completed = TRUE; |
70 | 81 | break; |
71 | 6.27k | default: |
72 | 6.27k | pump->callback(IOSTREAM_PUMP_STATUS_INPUT_EOF, |
73 | 6.27k | pump->context); |
74 | 6.27k | break; |
75 | 6.35k | } |
76 | 6.35k | return; |
77 | 6.35k | case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: |
78 | 0 | i_assert(!pump->input->blocking); |
79 | 0 | pump->waiting_output = FALSE; |
80 | 0 | return; |
81 | 6.63k | } |
82 | 6.63k | i_unreached(); |
83 | 6.63k | } |
84 | | |
85 | | static int iostream_pump_flush(struct iostream_pump *pump) |
86 | 6.70k | { |
87 | 6.70k | int ret; |
88 | | |
89 | 6.70k | if ((ret = o_stream_flush(pump->output)) <= 0) { |
90 | 0 | if (ret < 0) { |
91 | 0 | pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR, |
92 | 0 | pump->context); |
93 | 0 | } |
94 | 0 | return ret; |
95 | 0 | } |
96 | 6.70k | pump->waiting_output = FALSE; |
97 | 6.70k | if (pump->completed) { |
98 | 70 | pump->callback(IOSTREAM_PUMP_STATUS_INPUT_EOF, pump->context); |
99 | 70 | return 1; |
100 | 70 | } |
101 | | |
102 | 6.63k | if (pump->input->blocking) |
103 | 6.63k | iostream_pump_copy(pump); |
104 | 0 | else if (pump->io == NULL) { |
105 | 0 | pump->io = io_add_istream(pump->input, |
106 | 0 | iostream_pump_copy, pump); |
107 | 0 | io_set_pending(pump->io); |
108 | 0 | } |
109 | 6.63k | return ret; |
110 | 6.70k | } |
111 | | |
112 | | struct iostream_pump * |
113 | | iostream_pump_create(struct istream *input, struct ostream *output) |
114 | 6.37k | { |
115 | 6.37k | struct iostream_pump *pump; |
116 | | |
117 | 6.37k | i_assert(input != NULL && |
118 | 6.37k | output != NULL); |
119 | 6.37k | i_assert(!input->blocking || !output->blocking); |
120 | | |
121 | | /* ref streams */ |
122 | 6.37k | i_stream_ref(input); |
123 | 6.37k | o_stream_ref(output); |
124 | | |
125 | | /* create pump */ |
126 | 6.37k | pump = i_new(struct iostream_pump, 1); |
127 | 6.37k | pump->refcount = 1; |
128 | 6.37k | pump->input = input; |
129 | 6.37k | pump->output = output; |
130 | | |
131 | 6.37k | return pump; |
132 | 6.37k | } |
133 | | |
134 | | void iostream_pump_start(struct iostream_pump *pump) |
135 | 6.37k | { |
136 | 6.37k | i_assert(pump != NULL); |
137 | 6.37k | i_assert(pump->callback != NULL); |
138 | | |
139 | | /* add flush handler */ |
140 | 6.37k | if (!pump->output->blocking) { |
141 | 6.37k | o_stream_set_flush_callback(pump->output, |
142 | 6.37k | iostream_pump_flush, pump); |
143 | 6.37k | } |
144 | | |
145 | | /* make IO objects */ |
146 | 6.37k | if (pump->input->blocking) { |
147 | 6.37k | i_assert(!pump->output->blocking); |
148 | 6.37k | o_stream_set_flush_pending(pump->output, TRUE); |
149 | 6.37k | } else { |
150 | 0 | pump->io = io_add_istream(pump->input, |
151 | 0 | iostream_pump_copy, pump); |
152 | 0 | io_set_pending(pump->io); |
153 | 0 | } |
154 | 6.37k | } |
155 | | |
156 | | struct istream *iostream_pump_get_input(struct iostream_pump *pump) |
157 | 6.34k | { |
158 | 6.34k | i_assert(pump != NULL); |
159 | 6.34k | return pump->input; |
160 | 6.34k | } |
161 | | |
162 | | struct ostream *iostream_pump_get_output(struct iostream_pump *pump) |
163 | 6.34k | { |
164 | 6.34k | i_assert(pump != NULL); |
165 | 6.34k | return pump->output; |
166 | 6.34k | } |
167 | | |
168 | | void iostream_pump_set_completion_callback(struct iostream_pump *pump, |
169 | | iostream_pump_callback_t *callback, |
170 | | void *context) |
171 | 6.37k | { |
172 | 6.37k | i_assert(pump != NULL); |
173 | 6.37k | pump->callback = callback; |
174 | 6.37k | pump->context = context; |
175 | 6.37k | } |
176 | | |
177 | | void iostream_pump_ref(struct iostream_pump *pump) |
178 | 0 | { |
179 | 0 | i_assert(pump != NULL); |
180 | 0 | i_assert(pump->refcount > 0); |
181 | 0 | pump->refcount++; |
182 | 0 | } |
183 | | |
184 | | void iostream_pump_unref(struct iostream_pump **_pump) |
185 | 6.37k | { |
186 | 6.37k | i_assert(_pump != NULL); |
187 | 6.37k | struct iostream_pump *pump = *_pump; |
188 | | |
189 | 6.37k | if (pump == NULL) |
190 | 0 | return; |
191 | | |
192 | 6.37k | i_assert(pump->refcount > 0); |
193 | | |
194 | 6.37k | *_pump = NULL; |
195 | | |
196 | 6.37k | if (--pump->refcount > 0) |
197 | 0 | return; |
198 | | |
199 | 6.37k | iostream_pump_stop(pump); |
200 | | |
201 | 6.37k | o_stream_unref(&pump->output); |
202 | 6.37k | i_stream_unref(&pump->input); |
203 | 6.37k | i_free(pump); |
204 | 6.37k | } |
205 | | |
206 | | void iostream_pump_destroy(struct iostream_pump **_pump) |
207 | 26.9k | { |
208 | 26.9k | i_assert(_pump != NULL); |
209 | 26.9k | struct iostream_pump *pump = *_pump; |
210 | | |
211 | 26.9k | if (pump == NULL) |
212 | 20.5k | return; |
213 | | |
214 | 6.37k | *_pump = NULL; |
215 | | |
216 | 6.37k | iostream_pump_stop(pump); |
217 | 6.37k | o_stream_unref(&pump->output); |
218 | 6.37k | i_stream_unref(&pump->input); |
219 | | |
220 | 6.37k | iostream_pump_unref(&pump); |
221 | 6.37k | } |
222 | | |
223 | | void iostream_pump_stop(struct iostream_pump *pump) |
224 | 12.7k | { |
225 | 12.7k | i_assert(pump != NULL); |
226 | | |
227 | 12.7k | if (pump->output != NULL) |
228 | 6.37k | o_stream_unset_flush_callback(pump->output); |
229 | | |
230 | 12.7k | io_remove(&pump->io); |
231 | 12.7k | } |
232 | | |
233 | | bool iostream_pump_is_waiting_output(struct iostream_pump *pump) |
234 | 0 | { |
235 | 0 | return pump->waiting_output; |
236 | 0 | } |
237 | | |
238 | | void iostream_pump_switch_ioloop_to(struct iostream_pump *pump, |
239 | | struct ioloop *ioloop) |
240 | 0 | { |
241 | 0 | i_assert(pump != NULL); |
242 | 0 | if (pump->io != NULL) |
243 | 0 | pump->io = io_loop_move_io_to(ioloop, &pump->io); |
244 | 0 | o_stream_switch_ioloop_to(pump->output, ioloop); |
245 | 0 | i_stream_switch_ioloop_to(pump->input, ioloop); |
246 | 0 | } |
247 | | |
248 | | void iostream_pump_switch_ioloop(struct iostream_pump *pump) |
249 | 0 | { |
250 | 0 | iostream_pump_switch_ioloop_to(pump, current_ioloop); |
251 | 0 | } |