Coverage Report

Created: 2025-09-04 07:51

/src/fluent-bit/lib/monkey/mk_server/mk_stream.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Monkey HTTP Server
4
 *  ==================
5
 *  Copyright 2001-2017 Eduardo Silva <eduardo@monkey.io>
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <monkey/monkey.h>
21
#include <monkey/mk_stream.h>
22
#include <assert.h>
23
24
/* Create a new channel */
25
struct mk_channel *mk_channel_new(int type, int fd)
26
0
{
27
0
    struct mk_channel *channel;
28
29
0
    channel = mk_mem_alloc(sizeof(struct mk_channel));
30
0
    if (!channel) {
31
0
        return NULL;
32
0
    }
33
0
    channel->type   = type;
34
0
    channel->fd     = fd;
35
0
    channel->status = MK_CHANNEL_OK;
36
0
    mk_list_init(&channel->streams);
37
38
0
    return channel;
39
0
}
40
41
/*
 * Hand the channel's memory back through mk_mem_free().
 *
 * Nothing else is touched here: the streams list is not walked, so any
 * streams still linked must be dealt with separately. Always returns 0.
 */
int mk_channel_release(struct mk_channel *channel)
{
    mk_mem_free(channel);

    return 0;
}
46
47
static inline size_t channel_write_in_file(struct mk_channel *channel,
48
                                           struct mk_stream_input *in)
49
0
{
50
0
    ssize_t bytes = 0;
51
52
0
    MK_TRACE("[CH %i] STREAM_FILE [fd=%i], bytes=%lu",
53
0
             channel->fd, in->fd, in->bytes_total);
54
55
    /* Direct write */
56
0
    bytes = mk_sched_conn_sendfile(channel,
57
0
                                   in->fd,
58
0
                                   &in->bytes_offset,
59
0
                                   in->bytes_total
60
0
                                   );
61
0
    MK_TRACE("[CH=%d] [FD=%i] WRITE STREAM FILE: %lu bytes",
62
0
             channel->fd, in->fd, bytes);
63
64
0
    return bytes;
65
0
}
66
67
size_t mk_stream_size(struct mk_stream *stream)
68
0
{
69
0
    return (stream->bytes_total - stream->bytes_offset);
70
0
}
71
72
/*
73
 * It 'intent' to write a few streams over the channel and alter the
74
 * channel notification side if required: READ -> WRITE.
75
 */
76
int mk_channel_flush(struct mk_channel *channel)
77
0
{
78
0
    int ret = 0;
79
0
    size_t count = 0;
80
0
    size_t total = 0;
81
0
    uint32_t stop = (MK_CHANNEL_DONE | MK_CHANNEL_ERROR | MK_CHANNEL_EMPTY);
82
83
0
    do {
84
0
        ret = mk_channel_write(channel, &count);
85
0
        total += count;
86
87
#ifdef MK_HAVE_TRACE
88
        MK_TRACE("Channel flush: %d bytes", count);
89
        if (ret & MK_CHANNEL_DONE) {
90
            MK_TRACE("Channel was empty");
91
        }
92
        if (ret & MK_CHANNEL_ERROR) {
93
            MK_TRACE("Channel error");
94
        }
95
        if (ret & MK_CHANNEL_EMPTY) {
96
            MK_TRACE("Channel empty");
97
        }
98
#endif
99
0
    } while (total <= 4096 && ((ret & stop) == 0));
100
101
0
    if (ret == MK_CHANNEL_DONE) {
102
0
        MK_TRACE("Channel done");
103
0
        return ret;
104
0
    }
105
0
    else if (ret & (MK_CHANNEL_FLUSH | MK_CHANNEL_BUSY)) {
106
0
        MK_TRACE("Channel FLUSH | BUSY");
107
0
        if ((channel->event->mask & MK_EVENT_WRITE) == 0) {
108
0
            mk_event_add(mk_sched_loop(),
109
0
                         channel->fd,
110
0
                         MK_EVENT_CONNECTION,
111
0
                         MK_EVENT_WRITE,
112
0
                         channel->event);
113
0
        }
114
0
    }
115
116
0
    return ret;
117
0
}
118
119
int mk_stream_in_release(struct mk_stream_input *in)
120
0
{
121
0
    if (in->cb_finished) {
122
0
        in->cb_finished(in);
123
0
    }
124
125
0
    mk_stream_input_unlink(in);
126
0
    if (in->dynamic == MK_TRUE) {
127
0
        mk_mem_free(in);
128
0
    }
129
130
0
    return 0;
131
0
}
132
133
int mk_channel_stream_write(struct mk_stream *stream, size_t *count)
134
0
{
135
0
    ssize_t bytes = 0;
136
0
    struct mk_iov *iov;
137
0
    struct mk_list *tmp;
138
0
    struct mk_list *head;
139
0
    struct mk_channel *channel;
140
0
    struct mk_stream_input *input;
141
142
0
    channel = stream->channel;
143
144
    /* Validate channel status */
145
0
    if (channel->status != MK_CHANNEL_OK) {
146
0
        return -MK_CHANNEL_ERROR;
147
0
    }
148
149
    /* Iterate inputs and process stream */
150
0
    mk_list_foreach_safe(head, tmp, &stream->inputs) {
151
0
        input = mk_list_entry(head, struct mk_stream_input, _head);
152
0
        if (input->type == MK_STREAM_FILE) {
153
0
            bytes = channel_write_in_file(channel, input);
154
0
        }
155
0
        else if (input->type == MK_STREAM_IOV) {
156
0
            iov = input->buffer;
157
0
            if (!iov) {
158
0
                return MK_CHANNEL_EMPTY;
159
0
            }
160
161
0
            bytes = mk_sched_conn_writev(channel, iov);
162
163
0
            MK_TRACE("[CH %i] STREAM_IOV, wrote %d bytes",
164
0
                     channel->fd, bytes);
165
0
            if (bytes > 0) {
166
                /* Perform the adjustment on mk_iov */
167
0
                mk_iov_consume(iov, bytes);
168
0
            }
169
0
        }
170
0
        else if (input->type == MK_STREAM_RAW) {
171
0
            bytes = mk_sched_conn_write(channel,
172
0
                                        input->buffer, input->bytes_total);
173
0
            MK_TRACE("[CH %i] STREAM_RAW, bytes=%lu/%lu\n",
174
0
                     channel->fd, bytes, input->bytes_total);
175
0
        }
176
177
0
        if (bytes > 0) {
178
0
            *count = bytes;
179
0
            mk_stream_input_consume(input, bytes);
180
181
            /* notification callback, optional */
182
0
            if (stream->cb_bytes_consumed) {
183
0
                stream->cb_bytes_consumed(stream, bytes);
184
0
            }
185
186
0
            if (input->cb_consumed) {
187
0
                input->cb_consumed(input, bytes);
188
0
            }
189
190
0
            if (input->bytes_total == 0) {
191
0
                MK_TRACE("Input done, unlinking (channel=%p)", channel);
192
0
                mk_stream_in_release(input);
193
0
            }
194
0
            MK_TRACE("[CH %i] CHANNEL_FLUSH", channel->fd);
195
0
        }
196
0
        else if (bytes < 0) {
197
0
            mk_stream_in_release(input);
198
0
            return -MK_CHANNEL_ERROR;
199
0
        }
200
0
        else if (bytes == 0) {
201
0
            mk_stream_in_release(input);
202
0
            return -MK_CHANNEL_ERROR;
203
0
        }
204
0
    }
205
206
0
    return bytes;
207
0
}
208
209
/* It perform a direct stream I/O write through the network layer */
210
int mk_channel_write(struct mk_channel *channel, size_t *count)
211
0
{
212
0
    ssize_t bytes = -1;
213
0
    struct mk_iov *iov;
214
0
    struct mk_stream *stream = NULL;
215
0
    struct mk_stream_input *input;
216
217
0
    errno = 0;
218
219
0
    if (mk_list_is_empty(&channel->streams) == 0) {
220
0
        MK_TRACE("[CH %i] CHANNEL_EMPTY", channel->fd);
221
0
        return MK_CHANNEL_EMPTY;
222
0
    }
223
224
    /* Get the input source */
225
0
    stream = mk_list_entry_first(&channel->streams, struct mk_stream, _head);
226
0
    if (mk_list_is_empty(&stream->inputs) == 0) {
227
0
        return MK_CHANNEL_EMPTY;
228
0
    }
229
0
    input = mk_list_entry_first(&stream->inputs, struct mk_stream_input, _head);
230
231
    /*
232
     * Based on the Stream Input type we consume on that way, not all inputs
233
     * requires to read from buffer, e.g: Static File, Pipes.
234
     */
235
0
    if (channel->type == MK_CHANNEL_SOCKET) {
236
0
        if (input->type == MK_STREAM_FILE) {
237
0
            bytes = channel_write_in_file(channel, input);
238
0
        }
239
0
        else if (input->type == MK_STREAM_IOV) {
240
0
            iov   = input->buffer;
241
0
            if (!iov) {
242
0
                return MK_CHANNEL_EMPTY;
243
0
            }
244
245
0
            bytes = mk_sched_conn_writev(channel, iov);
246
247
0
            MK_TRACE("[CH %i] STREAM_IOV, wrote %d bytes",
248
0
                     channel->fd, bytes);
249
0
            if (bytes > 0) {
250
                /* Perform the adjustment on mk_iov */
251
0
                mk_iov_consume(iov, bytes);
252
0
            }
253
0
        }
254
0
        else if (input->type == MK_STREAM_RAW) {
255
0
            bytes = mk_sched_conn_write(channel,
256
0
                                        input->buffer, input->bytes_total);
257
0
            MK_TRACE("[CH %i] STREAM_RAW, bytes=%lu/%lu",
258
0
                     channel->fd, bytes, input->bytes_total);
259
0
            if (bytes > 0) {
260
                /* DEPRECATED: consume_raw(input, bytes); */
261
0
            }
262
0
        }
263
264
0
        if (bytes > 0) {
265
0
            *count = bytes;
266
0
            mk_stream_input_consume(input, bytes);
267
268
            /* notification callback, optional */
269
0
            if (stream->cb_bytes_consumed) {
270
0
                stream->cb_bytes_consumed(stream, bytes);
271
0
            }
272
273
0
            if (input->cb_consumed) {
274
0
                input->cb_consumed(input, bytes);
275
0
            }
276
277
0
            if (input->bytes_total == 0) {
278
0
                MK_TRACE("Input done, unlinking (channel=%p)", channel);
279
0
                mk_stream_in_release(input);
280
0
            }
281
282
0
            if (mk_list_is_empty(&stream->inputs) == 0) {
283
                /* Everytime the stream is empty, we notify the trigger the cb */
284
0
                if (stream->cb_finished) {
285
0
                    stream->cb_finished(stream);
286
0
                }
287
288
0
                if (mk_channel_is_empty(channel) == 0) {
289
0
                    MK_TRACE("[CH %i] CHANNEL_DONE", channel->fd);
290
0
                    return MK_CHANNEL_DONE;
291
0
                }
292
0
                else {
293
0
                    MK_TRACE("[CH %i] CHANNEL_FLUSH", channel->fd);
294
0
                    return MK_CHANNEL_FLUSH;
295
0
                }
296
0
            }
297
298
0
            MK_TRACE("[CH %i] CHANNEL_FLUSH", channel->fd);
299
0
            return MK_CHANNEL_FLUSH;
300
0
        }
301
0
        else if (bytes < 0) {
302
0
            if (errno == EAGAIN) {
303
0
                return MK_CHANNEL_BUSY;
304
0
            }
305
306
0
            mk_stream_in_release(input);
307
0
            return MK_CHANNEL_ERROR;
308
0
        }
309
0
        else if (bytes == 0) {
310
0
            mk_stream_in_release(input);
311
0
            return MK_CHANNEL_ERROR;
312
0
        }
313
0
    }
314
315
0
    return MK_CHANNEL_ERROR;
316
0
}
317
318
/* Remove any dynamic memory associated */
319
int mk_channel_clean(struct mk_channel *channel)
320
0
{
321
0
    struct mk_list *tmp;
322
0
    struct mk_list *tmp_in;
323
0
    struct mk_list *head;
324
0
    struct mk_list *head_in;
325
0
    struct mk_stream *stream;
326
0
    struct mk_stream_input *in;
327
328
0
    mk_list_foreach_safe(head, tmp, &channel->streams) {
329
0
        stream = mk_list_entry(head, struct mk_stream, _head);
330
0
        mk_list_foreach_safe(head_in, tmp_in, &stream->inputs) {
331
0
            in = mk_list_entry(head_in, struct mk_stream_input, _head);
332
0
            mk_stream_in_release(in);
333
0
        }
334
0
        mk_stream_release(stream);
335
0
    }
336
337
0
    return 0;
338
0
}