Coverage Report

Created: 2025-08-03 06:27

/src/dovecot/src/lib/iostream-pump.c
Line
Count
Source (jump to first uncovered line)
1
/* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */
2
3
#include "lib.h"
4
#include "buffer.h"
5
#include "str.h"
6
#include "iostream-pump.h"
7
#include "istream.h"
8
#include "ostream.h"
9
#include <unistd.h>
10
11
#undef iostream_pump_set_completion_callback
12
13
struct iostream_pump {
14
  int refcount;
15
16
  struct istream *input;
17
  struct ostream *output;
18
19
  struct io *io;
20
21
  iostream_pump_callback_t *callback;
22
  void *context;
23
24
  bool waiting_output;
25
  bool completed;
26
};
27
28
static void iostream_pump_copy(struct iostream_pump *pump)
29
6.63k
{
30
6.63k
  enum ostream_send_istream_result res;
31
6.63k
  size_t old_size;
32
33
6.63k
  o_stream_cork(pump->output);
34
6.63k
  old_size = o_stream_get_max_buffer_size(pump->output);
35
6.63k
  o_stream_set_max_buffer_size(pump->output,
36
6.63k
    I_MIN(IO_BLOCK_SIZE,
37
6.63k
          o_stream_get_max_buffer_size(pump->output)));
38
6.63k
  res = o_stream_send_istream(pump->output, pump->input);
39
6.63k
  o_stream_set_max_buffer_size(pump->output, old_size);
40
6.63k
  o_stream_uncork(pump->output);
41
42
6.63k
  switch(res) {
43
0
  case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
44
0
    io_remove(&pump->io);
45
0
    pump->callback(IOSTREAM_PUMP_STATUS_INPUT_ERROR,
46
0
             pump->context);
47
0
    return;
48
0
  case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
49
0
    io_remove(&pump->io);
50
0
    pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR,
51
0
             pump->context);
52
0
    return;
53
288
  case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
54
288
    i_assert(!pump->output->blocking);
55
288
    pump->waiting_output = TRUE;
56
288
    io_remove(&pump->io);
57
288
    return;
58
6.35k
  case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
59
6.35k
    pump->waiting_output = FALSE;
60
6.35k
    io_remove(&pump->io);
61
    /* flush it */
62
6.35k
    switch (o_stream_finish(pump->output)) {
63
0
    case -1:
64
0
      pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR,
65
0
               pump->context);
66
0
      break;
67
81
    case 0:
68
81
      pump->waiting_output = TRUE;
69
81
      pump->completed = TRUE;
70
81
      break;
71
6.27k
    default:
72
6.27k
      pump->callback(IOSTREAM_PUMP_STATUS_INPUT_EOF,
73
6.27k
               pump->context);
74
6.27k
      break;
75
6.35k
    }
76
6.35k
    return;
77
6.35k
  case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
78
0
    i_assert(!pump->input->blocking);
79
0
    pump->waiting_output = FALSE;
80
0
    return;
81
6.63k
  }
82
6.63k
  i_unreached();
83
6.63k
}
84
85
static int iostream_pump_flush(struct iostream_pump *pump)
86
6.70k
{
87
6.70k
  int ret;
88
89
6.70k
  if ((ret = o_stream_flush(pump->output)) <= 0) {
90
0
    if (ret < 0) {
91
0
      pump->callback(IOSTREAM_PUMP_STATUS_OUTPUT_ERROR,
92
0
               pump->context);
93
0
    }
94
0
    return ret;
95
0
  }
96
6.70k
  pump->waiting_output = FALSE;
97
6.70k
  if (pump->completed) {
98
70
    pump->callback(IOSTREAM_PUMP_STATUS_INPUT_EOF, pump->context);
99
70
    return 1;
100
70
  }
101
102
6.63k
  if (pump->input->blocking)
103
6.63k
    iostream_pump_copy(pump);
104
0
  else if (pump->io == NULL) {
105
0
    pump->io = io_add_istream(pump->input,
106
0
            iostream_pump_copy, pump);
107
0
    io_set_pending(pump->io);
108
0
  }
109
6.63k
  return ret;
110
6.70k
}
111
112
struct iostream_pump *
113
iostream_pump_create(struct istream *input, struct ostream *output)
114
6.37k
{
115
6.37k
  struct iostream_pump *pump;
116
117
6.37k
  i_assert(input != NULL &&
118
6.37k
     output != NULL);
119
6.37k
  i_assert(!input->blocking || !output->blocking);
120
121
  /* ref streams */
122
6.37k
  i_stream_ref(input);
123
6.37k
  o_stream_ref(output);
124
125
  /* create pump */
126
6.37k
  pump = i_new(struct iostream_pump, 1);
127
6.37k
  pump->refcount = 1;
128
6.37k
  pump->input = input;
129
6.37k
  pump->output = output;
130
131
6.37k
  return pump;
132
6.37k
}
133
134
void iostream_pump_start(struct iostream_pump *pump)
135
6.37k
{
136
6.37k
  i_assert(pump != NULL);
137
6.37k
  i_assert(pump->callback != NULL);
138
139
  /* add flush handler */
140
6.37k
  if (!pump->output->blocking) {
141
6.37k
    o_stream_set_flush_callback(pump->output,
142
6.37k
              iostream_pump_flush, pump);
143
6.37k
  }
144
145
  /* make IO objects */
146
6.37k
  if (pump->input->blocking) {
147
6.37k
    i_assert(!pump->output->blocking);
148
6.37k
    o_stream_set_flush_pending(pump->output, TRUE);
149
6.37k
  } else {
150
0
    pump->io = io_add_istream(pump->input,
151
0
            iostream_pump_copy, pump);
152
0
    io_set_pending(pump->io);
153
0
  }
154
6.37k
}
155
156
struct istream *iostream_pump_get_input(struct iostream_pump *pump)
157
6.34k
{
158
6.34k
  i_assert(pump != NULL);
159
6.34k
  return pump->input;
160
6.34k
}
161
162
struct ostream *iostream_pump_get_output(struct iostream_pump *pump)
163
6.34k
{
164
6.34k
  i_assert(pump != NULL);
165
6.34k
  return pump->output;
166
6.34k
}
167
168
void iostream_pump_set_completion_callback(struct iostream_pump *pump,
169
             iostream_pump_callback_t *callback,
170
             void *context)
171
6.37k
{
172
6.37k
  i_assert(pump != NULL);
173
6.37k
  pump->callback = callback;
174
6.37k
  pump->context = context;
175
6.37k
}
176
177
void iostream_pump_ref(struct iostream_pump *pump)
178
0
{
179
0
  i_assert(pump != NULL);
180
0
  i_assert(pump->refcount > 0);
181
0
  pump->refcount++;
182
0
}
183
184
void iostream_pump_unref(struct iostream_pump **_pump)
185
6.37k
{
186
6.37k
  i_assert(_pump != NULL);
187
6.37k
  struct iostream_pump *pump = *_pump;
188
189
6.37k
  if (pump == NULL)
190
0
    return;
191
192
6.37k
  i_assert(pump->refcount > 0);
193
194
6.37k
  *_pump = NULL;
195
196
6.37k
  if (--pump->refcount > 0)
197
0
    return;
198
199
6.37k
  iostream_pump_stop(pump);
200
201
6.37k
  o_stream_unref(&pump->output);
202
6.37k
  i_stream_unref(&pump->input);
203
6.37k
  i_free(pump);
204
6.37k
}
205
206
void iostream_pump_destroy(struct iostream_pump **_pump)
207
26.9k
{
208
26.9k
  i_assert(_pump != NULL);
209
26.9k
  struct iostream_pump *pump = *_pump;
210
211
26.9k
  if (pump == NULL)
212
20.5k
    return;
213
214
6.37k
  *_pump = NULL;
215
216
6.37k
  iostream_pump_stop(pump);
217
6.37k
  o_stream_unref(&pump->output);
218
6.37k
  i_stream_unref(&pump->input);
219
220
6.37k
  iostream_pump_unref(&pump);
221
6.37k
}
222
223
void iostream_pump_stop(struct iostream_pump *pump)
224
12.7k
{
225
12.7k
  i_assert(pump != NULL);
226
227
12.7k
  if (pump->output != NULL)
228
6.37k
    o_stream_unset_flush_callback(pump->output);
229
230
12.7k
  io_remove(&pump->io);
231
12.7k
}
232
233
bool iostream_pump_is_waiting_output(struct iostream_pump *pump)
234
0
{
235
0
  return pump->waiting_output;
236
0
}
237
238
void iostream_pump_switch_ioloop_to(struct iostream_pump *pump,
239
            struct ioloop *ioloop)
240
0
{
241
0
  i_assert(pump != NULL);
242
0
  if (pump->io != NULL)
243
0
    pump->io = io_loop_move_io_to(ioloop, &pump->io);
244
0
  o_stream_switch_ioloop_to(pump->output, ioloop);
245
0
  i_stream_switch_ioloop_to(pump->input, ioloop);
246
0
}
247
248
void iostream_pump_switch_ioloop(struct iostream_pump *pump)
249
0
{
250
0
  iostream_pump_switch_ioloop_to(pump, current_ioloop);
251
0
}