Coverage Report

Created: 2023-03-26 07:01

/src/fluent-bit/lib/monkey/mk_server/mk_fifo.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Monkey HTTP Server
4
 *  ==================
5
 *  Copyright 2001-2017 Eduardo Silva <eduardo@monkey.io>
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <monkey/mk_fifo.h>
21
#include <monkey/mk_scheduler.h>
22
23
#ifdef _WIN32
24
#include <event.h>
25
#endif
26
27
static struct mk_fifo_worker *mk_fifo_worker_create(struct mk_fifo *ctx,
28
                                                    void *data)
29
0
{
30
0
    int id;
31
0
    int ret;
32
0
    struct mk_fifo_worker *fw;
33
34
    /* Get an ID */
35
0
    id = mk_list_size(&ctx->workers);
36
37
0
    fw = mk_mem_alloc(sizeof(struct mk_fifo_worker));
38
0
    if (!fw) {
39
0
        perror("malloc");
40
0
        return NULL;
41
0
    }
42
0
    MK_EVENT_NEW(&fw->event);
43
44
0
    fw->worker_id = id;
45
0
    fw->data = data;
46
0
    fw->fifo = ctx;
47
48
0
    fw->buf_data = mk_mem_alloc(MK_FIFO_BUF_SIZE);
49
0
    if (!fw->buf_data) {
50
0
        perror("malloc");
51
0
        mk_mem_free(fw);
52
0
        return NULL;
53
0
    }
54
0
    fw->buf_len = 0;
55
0
    fw->buf_size = MK_FIFO_BUF_SIZE;
56
57
#ifdef _WIN32
58
    ret = evutil_socketpair(AF_INET, SOCK_STREAM, 0, fw->channel);
59
    if (ret == -1) {
60
        perror("socketpair");
61
        mk_mem_free(fw);
62
        return NULL;
63
    }
64
#else
65
0
    ret = pipe(fw->channel);
66
0
    if (ret == -1) {
67
0
        perror("pipe");
68
0
        mk_mem_free(fw);
69
0
        return NULL;
70
0
    }
71
0
#endif
72
73
0
    mk_list_add(&fw->_head, &ctx->workers);
74
0
    return fw;
75
0
}
76
77
/*
78
 * Function used as a callback triggered by mk_worker_callback() or
79
 * through a mk_sched_worker_cb_add(). It purpose is to prepare the
80
 * channels on the final worker thread so it can consume pushed
81
 * messages.
82
 */
83
void mk_fifo_worker_setup(void *data)
84
0
{
85
0
    struct mk_fifo_worker *mw = NULL;
86
0
    struct mk_fifo *ctx = data;
87
88
0
    pthread_mutex_lock(&ctx->mutex_init);
89
90
0
    mw = mk_fifo_worker_create(ctx, data);
91
0
    if (!mw) {
92
0
        mk_err("[msg] error configuring msg-worker context ");
93
0
        pthread_mutex_unlock(&ctx->mutex_init);
94
0
        return;
95
0
    }
96
97
    /* Make the current worker context available */
98
0
    pthread_setspecific(*ctx->key, mw);
99
0
    pthread_mutex_unlock(&ctx->mutex_init);
100
0
}
101
102
struct mk_fifo *mk_fifo_create(pthread_key_t *key, void *data)
103
0
{
104
0
    struct mk_fifo *ctx;
105
106
0
    ctx = mk_mem_alloc(sizeof(struct mk_fifo));
107
0
    if (!ctx) {
108
0
        perror("malloc");
109
0
        return NULL;
110
0
    }
111
0
    ctx->data = data;
112
113
    /* Lists */
114
0
    mk_list_init(&ctx->queues);
115
0
    mk_list_init(&ctx->workers);
116
117
118
    /* Pthread specifics */
119
120
    /* We need to isolate this because there is a key that's shared between monkey
121
     * instances by design.
122
     */
123
0
    if (key != NULL) {
124
0
        ctx->key = key;
125
0
        pthread_key_create(ctx->key, NULL);
126
0
    }
127
128
0
    pthread_mutex_init(&ctx->mutex_init, NULL);
129
130
0
    return ctx;
131
0
}
132
133
int mk_fifo_queue_create(struct mk_fifo *ctx, char *name,
134
                         void (*cb)(struct mk_fifo_queue *, void *,
135
                                    size_t, void *),
136
                         void *data)
137
138
0
{
139
0
    int id = -1;
140
0
    int len;
141
0
    struct mk_list *head;
142
0
    struct mk_fifo_queue *q;
143
144
    /* Get ID for the new queue */
145
0
    if (mk_list_is_empty(&ctx->queues) == 0) {
146
0
        id = 0;
147
0
    }
148
0
    else {
149
0
        q = mk_list_entry_last(&ctx->queues, struct mk_fifo_queue, _head);
150
0
        id = q->id + 1;
151
0
    }
152
153
    /* queue name might need to be truncated if is too long */
154
0
    len = strlen(name);
155
0
    if (len > (int) sizeof(q->name) - 1) {
156
0
        len = sizeof(q->name) - 1;
157
0
    }
158
159
    /* Validate that name is not a duplicated */
160
0
    mk_list_foreach(head, &ctx->queues) {
161
0
        q = mk_list_entry(head, struct mk_fifo_queue, _head);
162
0
        if (strlen(q->name) != (unsigned int) len) {
163
0
            continue;
164
0
        }
165
166
0
        if (strncmp(q->name, name, len) == 0) {
167
0
            return -1;
168
0
        }
169
0
    }
170
171
    /* Allocate and register queue */
172
0
    q = mk_mem_alloc(sizeof(struct mk_fifo_queue));
173
0
    if (!q) {
174
0
        perror("malloc");
175
0
        return -1;
176
0
    }
177
0
    q->id = id;
178
0
    q->cb_message = cb;
179
0
    q->data = data;
180
181
0
    strncpy(q->name, name, len);
182
0
    q->name[len] = '\0';
183
0
    mk_list_add(&q->_head, &ctx->queues);
184
185
0
    return id;
186
0
}
187
188
struct mk_fifo_queue *mk_fifo_queue_get(struct mk_fifo *ctx, int id)
189
0
{
190
0
    struct mk_list *head;
191
0
    struct mk_fifo_queue *q = NULL;
192
193
0
    mk_list_foreach(head, &ctx->queues) {
194
0
        q = mk_list_entry(head, struct mk_fifo_queue, _head);
195
0
        if (q->id == id) {
196
0
            return q;
197
0
        }
198
0
    }
199
200
0
    return NULL;
201
0
}
202
203
int mk_fifo_queue_destroy(struct mk_fifo *ctx, struct mk_fifo_queue *q)
204
0
{
205
0
    (void) ctx;
206
207
0
    mk_list_del(&q->_head);
208
0
    mk_mem_free(q);
209
0
    return 0;
210
0
}
211
212
/*
 * Destroy the queue identified by 'id'.
 *
 * Returns 0 when the queue was found and destroyed, -1 when no queue
 * with that ID exists.
 */
int mk_fifo_queue_id_destroy(struct mk_fifo *ctx, int id)
{
    struct mk_fifo_queue *target = mk_fifo_queue_get(ctx, id);

    if (target != NULL) {
        mk_fifo_queue_destroy(ctx, target);
        return 0;
    }

    return -1;
}
224
225
static int mk_fifo_queue_destroy_all(struct mk_fifo *ctx)
226
0
{
227
0
    int c = 0;
228
0
    struct mk_list *tmp;
229
0
    struct mk_list *head;
230
0
    struct mk_fifo_queue *q;
231
232
0
    mk_list_foreach_safe(head, tmp, &ctx->queues) {
233
0
        q = mk_list_entry(head, struct mk_fifo_queue, _head);
234
0
        mk_fifo_queue_destroy(ctx, q);
235
0
        c++;
236
0
    }
237
238
0
    return c;
239
0
}
240
241
static int mk_fifo_worker_destroy_all(struct mk_fifo *ctx)
242
0
{
243
0
    int c = 0;
244
0
    struct mk_list *tmp;
245
0
    struct mk_list *head;
246
0
    struct mk_fifo_worker *fw;
247
248
0
    mk_list_foreach_safe(head, tmp, &ctx->workers) {
249
0
        fw = mk_list_entry(head, struct mk_fifo_worker, _head);
250
251
#ifdef _WIN32
252
        evutil_closesocket(fw->channel[0]);
253
        evutil_closesocket(fw->channel[1]);
254
#else
255
0
        close(fw->channel[0]);
256
0
        close(fw->channel[1]);
257
0
#endif
258
0
        mk_list_del(&fw->_head);
259
0
        mk_mem_free(fw->buf_data);
260
0
        mk_mem_free(fw);
261
0
        c++;
262
0
    }
263
264
0
    return c;
265
0
}
266
267
/*
 * Write exactly 'count' bytes from 'buf' into the channel 'fd'.
 *
 * Short writes are continued from where they stopped. EINTR is retried
 * right away, EAGAIN is retried after a short pause (50 ms on POSIX,
 * 5 ms on Windows).
 *
 * Returns the number of bytes written (always 'count') on success and
 * -1 on failure. A request of zero bytes succeeds without touching the
 * channel. On an unrecoverable error errno is left as set by the failing
 * call so the caller can report it.
 *
 * NOTE(review): on Windows send() normally reports errors through
 * WSAGetLastError() rather than errno; confirm the retry checks apply.
 */
static int msg_write(int fd, void *buf, size_t count)
{
    ssize_t bytes;
    size_t total = 0;

    while (total < count) {
#ifdef _WIN32
        bytes = send(fd, (uint8_t *)buf + total, count - total, 0);
#else
        bytes = write(fd, (uint8_t *)buf + total, count - total);
#endif
        if (bytes == -1) {
            if (errno == EINTR) {
                /* interrupted before writing anything: just retry */
                continue;
            }
            if (errno == EAGAIN) {
                /*
                 * This could happen, since this function goal is not to
                 * return until all data have been written, just sleep a
                 * little bit before trying again.
                 */
#ifdef _WIN32
                Sleep(5);
#else
                usleep(50000);
#endif
                continue;
            }

            /*
             * Unrecoverable error: stop here. Adding -1 to 'total' would
             * corrupt the offset used by the next write.
             */
            return -1;
        }
        else if (bytes == 0) {
            /* Broken pipe ? */
            perror("write");
            return -1;
        }
        total += bytes;
    }

    return (int) total;
}
305
306
/*
307
 * Push a message into a queue: this function runs from the parent thread
308
 * so it needs to write the message to every thread pipe channel.
309
 */
310
int mk_fifo_send(struct mk_fifo *ctx, int id, void *data, size_t size)
311
0
{
312
0
    int ret;
313
0
    struct mk_list *head;
314
0
    struct mk_fifo_msg msg;
315
0
    struct mk_fifo_queue *q;
316
0
    struct mk_fifo_worker *fw;
317
318
    /* Validate queue ID */
319
0
    q = mk_fifo_queue_get(ctx, id);
320
0
    if (!q) {
321
0
        return -1;
322
0
    }
323
324
0
    pthread_mutex_lock(&ctx->mutex_init);
325
326
0
    mk_list_foreach(head, &ctx->workers) {
327
0
        fw = mk_list_entry(head, struct mk_fifo_worker, _head);
328
329
0
        msg.length = size;
330
0
        msg.flags = 0;
331
0
        msg.queue_id = (uint16_t) id;
332
333
0
        ret = msg_write(fw->channel[1], &msg, sizeof(struct mk_fifo_msg));
334
0
        if (ret == -1) {
335
0
            pthread_mutex_unlock(&ctx->mutex_init);
336
0
            perror("write");
337
0
            fprintf(stderr, "[msg] error writing message header\n");
338
0
            return -1;
339
0
        }
340
341
0
        ret = msg_write(fw->channel[1], data, size);
342
0
        if (ret == -1) {
343
0
            pthread_mutex_unlock(&ctx->mutex_init);
344
0
            perror("write");
345
0
            fprintf(stderr, "[msg] error writing message body\n");
346
0
            return -1;
347
0
        }
348
0
    }
349
350
0
    pthread_mutex_unlock(&ctx->mutex_init);
351
352
0
    return 0;
353
0
}
354
355
/*
 * Discard the first 'bytes' bytes of 'buf' by shifting the remaining
 * (length - bytes) bytes to the beginning of the buffer.
 */
static inline void consume_bytes(char *buf, int bytes, int length)
{
    char *unread = buf + bytes;
    int remaining = length - bytes;

    /* source and destination overlap, hence memmove() */
    memmove(buf, unread, remaining);
}
359
360
static inline int fifo_drop_msg(struct mk_fifo_worker *fw)
361
0
{
362
0
    size_t drop_bytes;
363
0
    struct mk_fifo_msg *msg;
364
365
0
    msg = (struct mk_fifo_msg *) fw->buf_data;
366
0
    drop_bytes = (sizeof(struct mk_fifo_msg) + msg->length);
367
0
    consume_bytes(fw->buf_data, drop_bytes, fw->buf_len);
368
0
    fw->buf_len -= drop_bytes;
369
370
0
    return 0;
371
0
}
372
373
static inline int fifo_is_msg_ready(struct mk_fifo_worker *fw)
374
0
{
375
0
    struct mk_fifo_msg *msg;
376
377
0
    msg = (struct mk_fifo_msg *) fw->buf_data;
378
0
    if (fw->buf_len >= (msg->length + sizeof(struct mk_fifo_msg))) {
379
0
        return MK_TRUE;
380
0
    }
381
382
0
    return MK_FALSE;
383
0
}
384
385
int mk_fifo_worker_read(void *event)
386
0
{
387
0
    int available;
388
0
    char *tmp;
389
0
    size_t size;
390
0
    ssize_t bytes;
391
0
    struct mk_fifo_msg *fm;
392
0
    struct mk_fifo_worker *fw;
393
0
    struct mk_fifo_queue *fq;
394
395
0
    fw = (struct mk_fifo_worker *) event;
396
397
    /* Check available space */
398
0
    available = fw->buf_size - fw->buf_len;
399
0
    if (available <= 1) {
400
0
        size = fw->buf_size + (MK_FIFO_BUF_SIZE / 2);
401
0
        tmp = mk_mem_realloc(fw->buf_data, size);
402
0
        if (!tmp) {
403
0
            perror("realloc");
404
0
            return -1;
405
0
        }
406
0
        fw->buf_data = tmp;
407
0
        fw->buf_size = size;
408
0
        available = fw->buf_size - fw->buf_len;
409
0
    }
410
411
    /* Read data from pipe */
412
#ifdef _WIN32
413
    bytes = recv(fw->channel[0], fw->buf_data + fw->buf_len, available, 0);
414
#else
415
0
    bytes = read(fw->channel[0], fw->buf_data + fw->buf_len, available);
416
0
#endif
417
418
0
    if (bytes == 0) {
419
0
        return -1;
420
0
    }
421
0
    else if (bytes == -1){
422
0
        perror("read");
423
0
        return -1;
424
0
    }
425
426
0
    fw->buf_len += bytes;
427
428
    /* Find messages and trigger callbacks */
429
0
    while (fw->buf_len > 0) {
430
0
        if (fifo_is_msg_ready(fw) == MK_TRUE) {
431
            /* we got a complete message */
432
0
            fm = (struct mk_fifo_msg *) fw->buf_data;
433
0
            fq = mk_fifo_queue_get(fw->fifo, fm->queue_id);
434
0
            if (!fq) {
435
                /* Invalid queue */
436
0
                fprintf(stderr, "[fifo worker read] invalid queue id %i\n",
437
0
                        fm->queue_id);
438
0
                fifo_drop_msg(fw);
439
0
                continue;
440
0
            }
441
442
            /* Trigger callback if any */
443
0
            if (fq->cb_message) {
444
0
                fq->cb_message(fq, fm->data, fm->length, fq->data);
445
0
            }
446
0
            fifo_drop_msg(fw);
447
0
        }
448
0
        else {
449
            /* msg not ready */
450
0
            break;
451
0
        }
452
0
    }
453
454
0
    return 0;
455
0
}
456
457
int mk_fifo_destroy(struct mk_fifo *ctx)
458
0
{
459
0
    mk_fifo_queue_destroy_all(ctx);
460
0
    mk_fifo_worker_destroy_all(ctx);
461
0
    mk_mem_free(ctx);
462
0
    return 0;
463
0
}