Coverage Report

Created: 2026-01-10 07:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/dovecot/src/lib/iostream-pump.c
Line
Count
Source
1
/* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */
2
3
#include "lib.h"
4
#include "buffer.h"
5
#include "str.h"
6
#include "iostream-pump.h"
7
#include "istream.h"
8
#include "ostream.h"
9
#include <unistd.h>
10
11
#undef iostream_pump_set_completion_callback
12
13
struct iostream_pump {
14
  int refcount;
15
16
  struct istream *input;
17
  struct ostream *output;
18
19
  struct io *io;
20
21
  iostream_pump_callback_t *callback;
22
  void *context;
23
24
  bool waiting_output;
25
  bool completed;
26
};
27
28
static void iostream_pump_copy(struct iostream_pump *pump)
29
0
{
30
0
  enum ostream_send_istream_result res;
31
0
  size_t old_size;
32
33
0
  o_stream_cork(pump->output);
34
0
  old_size = o_stream_get_max_buffer_size(pump->output);
35
0
  o_stream_set_max_buffer_size(pump->output,
36
0
    I_MIN(IO_BLOCK_SIZE,
37
0
          o_stream_get_max_buffer_size(pump->output)));
38
0
  res = o_stream_send_istream(pump->output, pump->input);
39
0
  o_stream_set_max_buffer_size(pump->output, old_size);
40
0
  o_stream_uncork(pump->output);
41
42
0
  switch(res) {
43
0
  case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
44
0
    io_remove(&pump->io);
45
0
    pump->callback(IOSTREAM_PUMP_STATUS_INPUT_ERROR,
46
0
             pump->context);
47
0
    return;
48
0
  case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
49
0
    io_remove(&pump->io);
50
0
    pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR,
51
0
             pump->context);
52
0
    return;
53
0
  case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
54
0
    i_assert(!pump->output->blocking);
55
0
    pump->waiting_output = TRUE;
56
0
    io_remove(&pump->io);
57
0
    return;
58
0
  case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
59
0
    pump->waiting_output = FALSE;
60
0
    io_remove(&pump->io);
61
    /* flush it */
62
0
    switch (o_stream_finish(pump->output)) {
63
0
    case -1:
64
0
      pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR,
65
0
               pump->context);
66
0
      break;
67
0
    case 0:
68
0
      pump->waiting_output = TRUE;
69
0
      pump->completed = TRUE;
70
0
      break;
71
0
    default:
72
0
      pump->callback(IOSTREAM_PUMP_STATUS_INPUT_EOF,
73
0
               pump->context);
74
0
      break;
75
0
    }
76
0
    return;
77
0
  case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
78
0
    i_assert(!pump->input->blocking);
79
0
    pump->waiting_output = FALSE;
80
0
    return;
81
0
  }
82
0
  i_unreached();
83
0
}
84
85
static int iostream_pump_flush(struct iostream_pump *pump)
86
0
{
87
0
  int ret;
88
89
0
  if ((ret = o_stream_flush(pump->output)) <= 0) {
90
0
    if (ret < 0) {
91
0
      pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR,
92
0
               pump->context);
93
0
    }
94
0
    return ret;
95
0
  }
96
0
  pump->waiting_output = FALSE;
97
0
  if (pump->completed) {
98
0
    pump->callback(IOSTREAM_PUMP_STATUS_INPUT_EOF, pump->context);
99
0
    return 1;
100
0
  }
101
102
0
  if (pump->input->blocking)
103
0
    iostream_pump_copy(pump);
104
0
  else if (pump->io == NULL) {
105
0
    pump->io = io_add_istream(pump->input,
106
0
            iostream_pump_copy, pump);
107
0
    io_set_pending(pump->io);
108
0
  }
109
0
  return ret;
110
0
}
111
112
struct iostream_pump *
113
iostream_pump_create(struct istream *input, struct ostream *output)
114
0
{
115
0
  struct iostream_pump *pump;
116
117
0
  i_assert(input != NULL &&
118
0
     output != NULL);
119
0
  i_assert(!input->blocking || !output->blocking);
120
121
  /* ref streams */
122
0
  i_stream_ref(input);
123
0
  o_stream_ref(output);
124
125
  /* create pump */
126
0
  pump = i_new(struct iostream_pump, 1);
127
0
  pump->refcount = 1;
128
0
  pump->input = input;
129
0
  pump->output = output;
130
131
0
  return pump;
132
0
}
133
134
void iostream_pump_start(struct iostream_pump *pump)
135
0
{
136
0
  i_assert(pump != NULL);
137
0
  i_assert(pump->callback != NULL);
138
139
  /* add flush handler */
140
0
  if (!pump->output->blocking) {
141
0
    o_stream_set_flush_callback(pump->output,
142
0
              iostream_pump_flush, pump);
143
0
  }
144
145
  /* make IO objects */
146
0
  if (pump->input->blocking) {
147
0
    i_assert(!pump->output->blocking);
148
0
    o_stream_set_flush_pending(pump->output, TRUE);
149
0
  } else {
150
0
    pump->io = io_add_istream(pump->input,
151
0
            iostream_pump_copy, pump);
152
0
    io_set_pending(pump->io);
153
0
  }
154
0
}
155
156
struct istream *iostream_pump_get_input(struct iostream_pump *pump)
157
0
{
158
0
  i_assert(pump != NULL);
159
0
  return pump->input;
160
0
}
161
162
struct ostream *iostream_pump_get_output(struct iostream_pump *pump)
163
0
{
164
0
  i_assert(pump != NULL);
165
0
  return pump->output;
166
0
}
167
168
void iostream_pump_set_completion_callback(struct iostream_pump *pump,
169
             iostream_pump_callback_t *callback,
170
             void *context)
171
0
{
172
0
  i_assert(pump != NULL);
173
0
  pump->callback = callback;
174
0
  pump->context = context;
175
0
}
176
177
void iostream_pump_ref(struct iostream_pump *pump)
178
0
{
179
0
  i_assert(pump != NULL);
180
0
  i_assert(pump->refcount > 0);
181
0
  pump->refcount++;
182
0
}
183
184
void iostream_pump_unref(struct iostream_pump **_pump)
185
0
{
186
0
  i_assert(_pump != NULL);
187
0
  struct iostream_pump *pump = *_pump;
188
189
0
  if (pump == NULL)
190
0
    return;
191
192
0
  i_assert(pump->refcount > 0);
193
194
0
  *_pump = NULL;
195
196
0
  if (--pump->refcount > 0)
197
0
    return;
198
199
0
  iostream_pump_stop(pump);
200
201
0
  o_stream_unref(&pump->output);
202
0
  i_stream_unref(&pump->input);
203
0
  i_free(pump);
204
0
}
205
206
void iostream_pump_destroy(struct iostream_pump **_pump)
207
1.08k
{
208
1.08k
  i_assert(_pump != NULL);
209
1.08k
  struct iostream_pump *pump = *_pump;
210
211
1.08k
  if (pump == NULL)
212
1.08k
    return;
213
214
0
  *_pump = NULL;
215
216
0
  iostream_pump_stop(pump);
217
0
  o_stream_unref(&pump->output);
218
0
  i_stream_unref(&pump->input);
219
220
0
  iostream_pump_unref(&pump);
221
0
}
222
223
void iostream_pump_stop(struct iostream_pump *pump)
224
0
{
225
0
  i_assert(pump != NULL);
226
227
0
  if (pump->output != NULL)
228
0
    o_stream_unset_flush_callback(pump->output);
229
230
0
  io_remove(&pump->io);
231
0
}
232
233
bool iostream_pump_is_waiting_output(struct iostream_pump *pump)
234
0
{
235
0
  return pump->waiting_output;
236
0
}
237
238
void iostream_pump_switch_ioloop_to(struct iostream_pump *pump,
239
            struct ioloop *ioloop)
240
0
{
241
0
  i_assert(pump != NULL);
242
0
  if (pump->io != NULL)
243
0
    pump->io = io_loop_move_io_to(ioloop, &pump->io);
244
0
  o_stream_switch_ioloop_to(pump->output, ioloop);
245
0
  i_stream_switch_ioloop_to(pump->input, ioloop);
246
0
}
247
248
void iostream_pump_switch_ioloop(struct iostream_pump *pump)
249
0
{
250
0
  iostream_pump_switch_ioloop_to(pump, current_ioloop);
251
0
}