Coverage Report

Created: 2023-11-19 07:36

/src/fluent-bit/plugins/in_udp/udp_conn.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2022 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_input_plugin.h>
21
#include <fluent-bit/flb_utils.h>
22
#include <fluent-bit/flb_engine.h>
23
#include <fluent-bit/flb_network.h>
24
#include <fluent-bit/flb_pack.h>
25
#include <fluent-bit/flb_error.h>
26
27
#include "udp.h"
28
#include "udp_conn.h"
29
30
static inline void consume_bytes(char *buf, int bytes, int length)
31
0
{
32
0
    memmove(buf, buf + bytes, length - bytes);
33
0
}
34
35
static int append_message_to_record_data(char **result_buffer,
36
                                         size_t *result_size,
37
                                         flb_sds_t message_key_name,
38
                                         char *base_object_buffer,
39
                                         size_t base_object_size,
40
                                         char *message_buffer,
41
                                         size_t message_size,
42
                                         int message_type)
43
0
{
44
0
    int                result = FLB_MAP_NOT_MODIFIED;
45
0
    char              *modified_data_buffer;
46
0
    int                modified_data_size;
47
0
    msgpack_object_kv *new_map_entries[1];
48
0
    msgpack_object_kv  message_entry;
49
0
    *result_buffer = NULL;
50
0
    *result_size = 0;
51
0
    modified_data_buffer = NULL;
52
53
0
    if (message_key_name != NULL) {
54
0
        new_map_entries[0] = &message_entry;
55
56
0
        message_entry.key.type = MSGPACK_OBJECT_STR;
57
0
        message_entry.key.via.str.size = flb_sds_len(message_key_name);
58
0
        message_entry.key.via.str.ptr  = message_key_name;
59
60
0
        if (message_type == MSGPACK_OBJECT_BIN) {
61
0
            message_entry.val.type = MSGPACK_OBJECT_BIN;
62
0
            message_entry.val.via.bin.size = message_size;
63
0
            message_entry.val.via.bin.ptr  = message_buffer;
64
0
        }
65
0
        else if (message_type == MSGPACK_OBJECT_STR) {
66
0
            message_entry.val.type = MSGPACK_OBJECT_STR;
67
0
            message_entry.val.via.str.size = message_size;
68
0
            message_entry.val.via.str.ptr  = message_buffer;
69
0
        }
70
0
        else {
71
0
            result = FLB_MAP_EXPANSION_INVALID_VALUE_TYPE;
72
0
        }
73
74
0
        if (result == FLB_MAP_NOT_MODIFIED) {
75
0
            result = flb_msgpack_expand_map(base_object_buffer,
76
0
                                            base_object_size,
77
0
                                            new_map_entries, 1,
78
0
                                            &modified_data_buffer,
79
0
                                            &modified_data_size);
80
0
            if (result == 0) {
81
0
                result = FLB_MAP_EXPAND_SUCCESS;
82
0
            }
83
0
            else {
84
0
                result = FLB_MAP_EXPANSION_ERROR;
85
0
            }
86
0
        }
87
0
    }
88
89
0
    if (result == FLB_MAP_EXPAND_SUCCESS) {
90
0
        *result_buffer = modified_data_buffer;
91
0
        *result_size = modified_data_size;
92
0
    }
93
94
0
    return result;
95
0
}
96
97
static inline int process_pack(struct udp_conn *conn,
98
                               char *pack, size_t size)
99
0
{
100
0
    int ret;
101
0
    size_t off = 0;
102
0
    msgpack_unpacked result;
103
0
    msgpack_object entry;
104
0
    msgpack_sbuffer sbuf;
105
0
    msgpack_packer  pck;
106
0
    struct flb_in_udp_config *ctx;
107
0
    char   *appended_address_buffer;
108
0
    size_t  appended_address_size;
109
0
    char   *source_address;
110
0
    int i;
111
0
    int len;
112
113
0
    ctx = conn->ctx;
114
115
0
    flb_log_event_encoder_reset(ctx->log_encoder);
116
117
    /* First pack the results, iterate concatenated messages */
118
0
    msgpack_unpacked_init(&result);
119
0
    while (msgpack_unpack_next(&result, pack, size, &off) == MSGPACK_UNPACK_SUCCESS) {
120
0
        entry = result.data;
121
122
0
        appended_address_buffer = NULL;
123
0
        source_address = NULL;
124
125
0
        ret = flb_log_event_encoder_begin_record(ctx->log_encoder);
126
127
0
        if (ret == FLB_EVENT_ENCODER_SUCCESS) {
128
0
            ret = flb_log_event_encoder_set_current_timestamp(ctx->log_encoder);
129
0
        }
130
131
0
        if (ctx->source_address_key != NULL) {
132
0
            source_address = flb_connection_get_remote_address(conn->connection);
133
0
        }
134
135
0
        if (ret == FLB_EVENT_ENCODER_SUCCESS) {
136
0
            if (entry.type == MSGPACK_OBJECT_MAP) {
137
0
                if (source_address != NULL) {
138
0
                    msgpack_sbuffer_init(&sbuf);
139
0
                    msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);
140
141
0
                    len = entry.via.map.size;
142
0
                    msgpack_pack_map(&pck, len);
143
144
0
                    for (i=0; i<len; i++) {
145
0
                        msgpack_pack_object(&pck, entry.via.map.ptr[i].key);
146
0
                        msgpack_pack_object(&pck, entry.via.map.ptr[i].val);
147
0
                    }
148
149
0
                    ret = append_message_to_record_data(&appended_address_buffer,
150
0
                                                        &appended_address_size,
151
0
                                                        ctx->source_address_key,
152
0
                                                        sbuf.data,
153
0
                                                        sbuf.size,
154
0
                                                        source_address,
155
0
                                                        strlen(source_address),
156
0
                                                        MSGPACK_OBJECT_STR);
157
0
                    msgpack_sbuffer_destroy(&sbuf);
158
0
                }
159
160
0
                if (ret == FLB_MAP_EXPANSION_ERROR) {
161
0
                    flb_plg_debug(ctx->ins, "error expanding source_address : %d", ret);
162
0
                }
163
164
0
                if (appended_address_buffer != NULL) {
165
0
                    ret = flb_log_event_encoder_set_body_from_raw_msgpack(
166
0
                            ctx->log_encoder, appended_address_buffer, appended_address_size);
167
0
                }
168
0
                else {
169
0
                    ret = flb_log_event_encoder_set_body_from_msgpack_object(
170
0
                            ctx->log_encoder, &entry);
171
0
                }
172
0
            }
173
0
            else if (entry.type == MSGPACK_OBJECT_ARRAY) {
174
0
                if (source_address != NULL) {
175
0
                    ret = flb_log_event_encoder_append_body_values(
176
0
                        ctx->log_encoder,
177
0
                        FLB_LOG_EVENT_CSTRING_VALUE("msg"),
178
0
                        FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&entry),
179
0
                        FLB_LOG_EVENT_CSTRING_VALUE(ctx->source_address_key),
180
0
                        FLB_LOG_EVENT_CSTRING_VALUE(source_address));
181
0
                }
182
0
                else {
183
0
                    ret = flb_log_event_encoder_append_body_values(
184
0
                        ctx->log_encoder,
185
0
                        FLB_LOG_EVENT_CSTRING_VALUE("msg"),
186
0
                        FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&entry));
187
0
                }
188
0
            }
189
0
            else {
190
0
                ret = FLB_EVENT_ENCODER_ERROR_INVALID_VALUE_TYPE;
191
0
            }
192
193
0
            if (ret == FLB_EVENT_ENCODER_SUCCESS) {
194
0
                ret = flb_log_event_encoder_commit_record(ctx->log_encoder);
195
0
            }
196
197
0
            if (appended_address_buffer != NULL) {
198
0
                flb_free(appended_address_buffer);
199
0
            }
200
201
0
            if (ret != FLB_EVENT_ENCODER_SUCCESS) {
202
0
                break;
203
0
            }
204
0
        }
205
0
    }
206
207
0
    msgpack_unpacked_destroy(&result);
208
209
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
210
0
        flb_input_log_append(conn->ins, NULL, 0,
211
0
                             ctx->log_encoder->output_buffer,
212
0
                             ctx->log_encoder->output_length);
213
0
        ret = 0;
214
0
    }
215
0
    else {
216
0
        flb_plg_error(ctx->ins, "log event encoding error : %d", ret);
217
218
0
        ret = -1;
219
0
    }
220
221
0
    return ret;
222
0
}
223
224
/* Process a JSON payload, return the number of processed bytes */
225
static ssize_t parse_payload_json(struct udp_conn *conn)
226
0
{
227
0
    int ret;
228
0
    int out_size;
229
0
    char *pack;
230
231
0
    ret = flb_pack_json_state(conn->buf_data, conn->buf_len,
232
0
                              &pack, &out_size, &conn->pack_state);
233
0
    if (ret == FLB_ERR_JSON_PART) {
234
0
        flb_plg_debug(conn->ins, "JSON incomplete, waiting for more data...");
235
0
        return 0;
236
0
    }
237
0
    else if (ret == FLB_ERR_JSON_INVAL) {
238
0
        flb_plg_warn(conn->ins, "invalid JSON message, skipping");
239
0
        conn->buf_len = 0;
240
0
        conn->pack_state.multiple = FLB_TRUE;
241
0
        return -1;
242
0
    }
243
0
    else if (ret == -1) {
244
0
        return -1;
245
0
    }
246
247
    /* Process the packaged JSON and return the last byte used */
248
0
    process_pack(conn, pack, out_size);
249
0
    flb_free(pack);
250
251
0
    return conn->pack_state.last_byte;
252
0
}
253
254
/*
255
 * Process a raw text payload, uses the delimited character to split records,
256
 * return the number of processed bytes
257
 */
258
static ssize_t parse_payload_none(struct udp_conn *conn)
259
0
{
260
0
    int ret;
261
0
    int len;
262
0
    int sep_len;
263
0
    size_t consumed = 0;
264
0
    char *buf;
265
0
    char *s;
266
0
    char *separator;
267
0
    struct flb_in_udp_config *ctx;
268
269
0
    ctx = conn->ctx;
270
271
0
    separator = conn->ctx->separator;
272
0
    sep_len = flb_sds_len(conn->ctx->separator);
273
274
0
    buf = conn->buf_data;
275
0
    ret = FLB_EVENT_ENCODER_SUCCESS;
276
277
0
    flb_log_event_encoder_reset(ctx->log_encoder);
278
279
0
    while ((s = strstr(buf, separator))) {
280
0
        len = (s - buf);
281
0
        if (len == 0) {
282
0
            break;
283
0
        }
284
0
        else if (len > 0) {
285
0
            ret = flb_log_event_encoder_begin_record(ctx->log_encoder);
286
287
0
            if (ret == FLB_EVENT_ENCODER_SUCCESS) {
288
0
                ret = flb_log_event_encoder_set_current_timestamp(ctx->log_encoder);
289
0
            }
290
291
0
            if (ret == FLB_EVENT_ENCODER_SUCCESS) {
292
0
                ret = flb_log_event_encoder_append_body_values(
293
0
                        ctx->log_encoder,
294
0
                        FLB_LOG_EVENT_CSTRING_VALUE("log"),
295
0
                        FLB_LOG_EVENT_STRING_VALUE(buf, len));
296
0
            }
297
298
0
            if (ret == FLB_EVENT_ENCODER_SUCCESS) {
299
0
                ret = flb_log_event_encoder_commit_record(ctx->log_encoder);
300
0
            }
301
302
0
            if (ret != FLB_EVENT_ENCODER_SUCCESS) {
303
0
                break;
304
0
            }
305
306
0
            consumed += len + 1;
307
0
            buf += len + sep_len;
308
0
        }
309
0
        else {
310
0
            break;
311
0
        }
312
0
    }
313
314
0
    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
315
0
        flb_input_log_append(conn->ins, NULL, 0,
316
0
                             ctx->log_encoder->output_buffer,
317
0
                             ctx->log_encoder->output_length);
318
0
    }
319
0
    else {
320
0
        flb_plg_error(ctx->ins, "log event encoding error : %d", ret);
321
0
    }
322
323
0
    return consumed;
324
0
}
325
326
/* Callback invoked every time an event is triggered for a connection */
327
int udp_conn_event(void *data)
328
0
{
329
0
    int bytes;
330
0
    int available;
331
0
    int size;
332
0
    ssize_t ret_payload = -1;
333
0
    char *tmp;
334
0
    struct udp_conn *conn;
335
0
    struct flb_connection *connection;
336
0
    struct flb_in_udp_config *ctx;
337
338
0
    connection = (struct flb_connection *) data;
339
340
0
    conn = connection->user_data;
341
342
0
    ctx = conn->ctx;
343
344
0
    if (ctx->format == FLB_UDP_FMT_JSON &&
345
0
        conn->buf_len > 0) {
346
0
        flb_pack_state_reset(&conn->pack_state);
347
0
        flb_pack_state_init(&conn->pack_state);
348
349
0
        conn->pack_state.multiple = FLB_TRUE;
350
0
    }
351
352
0
    conn->buf_len = 0;
353
354
0
    available = (conn->buf_size - conn->buf_len) - 1;
355
0
    if (available < 1) {
356
0
        if (conn->buf_size + ctx->chunk_size > ctx->buffer_size) {
357
0
            flb_plg_trace(ctx->ins,
358
0
                          "fd=%i incoming data exceed limit (%zu KB)",
359
0
                          connection->fd, (ctx->buffer_size / 1024));
360
0
            return -1;
361
0
        }
362
363
0
        size = conn->buf_size + ctx->chunk_size;
364
0
        tmp = flb_realloc(conn->buf_data, size);
365
0
        if (!tmp) {
366
0
            flb_errno();
367
0
            return -1;
368
0
        }
369
0
        flb_plg_trace(ctx->ins, "fd=%i buffer realloc %i -> %i",
370
0
                      connection->fd, conn->buf_size, size);
371
372
0
        conn->buf_data = tmp;
373
0
        conn->buf_size = size;
374
0
        available = (conn->buf_size - conn->buf_len) - 1;
375
0
    }
376
377
    /* Read data */
378
0
    bytes = flb_io_net_read(connection,
379
0
                            (void *) &conn->buf_data[conn->buf_len],
380
0
                            available);
381
382
0
    if (bytes <= 0) {
383
0
        return -1;
384
0
    }
385
386
0
    flb_plg_trace(ctx->ins, "read()=%i pre_len=%i now_len=%i",
387
0
                  bytes, conn->buf_len, conn->buf_len + bytes);
388
0
    conn->buf_len += bytes;
389
0
    conn->buf_data[conn->buf_len] = '\0';
390
391
    /* Strip CR or LF if found at first byte */
392
0
    if (conn->buf_data[0] == '\r' || conn->buf_data[0] == '\n') {
393
        /* Skip message with one byte with CR or LF */
394
0
        flb_plg_trace(ctx->ins, "skip one byte message with ASCII code=%i",
395
0
                  conn->buf_data[0]);
396
0
        consume_bytes(conn->buf_data, 1, conn->buf_len);
397
0
        conn->buf_len--;
398
0
        conn->buf_data[conn->buf_len] = '\0';
399
0
    }
400
401
    /* JSON Format handler */
402
0
    if (ctx->format == FLB_UDP_FMT_JSON) {
403
0
        ret_payload = parse_payload_json(conn);
404
0
        if (ret_payload == 0) {
405
            /* Incomplete JSON message, we need more data */
406
0
            return -1;
407
0
        }
408
0
        else if (ret_payload == -1) {
409
0
            flb_pack_state_reset(&conn->pack_state);
410
0
            flb_pack_state_init(&conn->pack_state);
411
0
            conn->pack_state.multiple = FLB_TRUE;
412
0
            return -1;
413
0
        }
414
0
    }
415
0
    else if (ctx->format == FLB_UDP_FMT_NONE) {
416
0
        ret_payload = parse_payload_none(conn);
417
0
        if (ret_payload == 0) {
418
0
            return -1;
419
0
        }
420
0
        else if (ret_payload == -1) {
421
0
            conn->buf_len = 0;
422
0
            return -1;
423
0
        }
424
0
    }
425
426
0
    consume_bytes(conn->buf_data, ret_payload, conn->buf_len);
427
0
    conn->buf_len -= ret_payload;
428
0
    conn->buf_data[conn->buf_len] = '\0';
429
430
0
    if (ctx->format == FLB_UDP_FMT_JSON) {
431
0
        jsmn_init(&conn->pack_state.parser);
432
0
        conn->pack_state.tokens_count = 0;
433
0
        conn->pack_state.last_byte = 0;
434
0
        conn->pack_state.buf_len = 0;
435
0
    }
436
437
0
    return bytes;
438
0
}
439
440
struct udp_conn *udp_conn_add(struct flb_connection *connection,
441
                              struct flb_in_udp_config *ctx)
442
0
{
443
0
    struct udp_conn *conn;
444
445
0
    conn = flb_malloc(sizeof(struct udp_conn));
446
0
    if (!conn) {
447
0
        flb_errno();
448
0
        return NULL;
449
0
    }
450
451
0
    conn->connection = connection;
452
453
    /* Set data for the event-loop */
454
455
0
    MK_EVENT_NEW(&connection->event);
456
457
0
    connection->user_data     = conn;
458
0
    connection->event.type    = FLB_ENGINE_EV_CUSTOM;
459
0
    connection->event.handler = udp_conn_event;
460
461
    /* Connection info */
462
0
    conn->ctx     = ctx;
463
0
    conn->buf_len = 0;
464
465
0
    conn->buf_data = flb_malloc(ctx->chunk_size);
466
0
    if (!conn->buf_data) {
467
0
        flb_errno();
468
469
0
        flb_plg_error(ctx->ins, "could not allocate new connection");
470
0
        flb_free(conn);
471
472
0
        return NULL;
473
0
    }
474
0
    conn->buf_size = ctx->chunk_size;
475
0
    conn->ins      = ctx->ins;
476
477
    /* Initialize JSON parser */
478
0
    if (ctx->format == FLB_UDP_FMT_JSON) {
479
0
        flb_pack_state_init(&conn->pack_state);
480
0
        conn->pack_state.multiple = FLB_TRUE;
481
0
    }
482
483
0
    return conn;
484
0
}
485
486
int udp_conn_del(struct udp_conn *conn)
487
0
{
488
0
    struct flb_in_udp_config *ctx;
489
490
0
    ctx = conn->ctx;
491
492
0
    if (ctx->format == FLB_UDP_FMT_JSON) {
493
0
        flb_pack_state_reset(&conn->pack_state);
494
0
    }
495
496
0
    flb_free(conn->buf_data);
497
0
    flb_free(conn);
498
499
0
    return 0;
500
0
}