Coverage Report

Created: 2026-06-20 07:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/plugins/out_udp/udp.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2026 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_output_plugin.h>
21
#include <fluent-bit/flb_pack.h>
22
#include <fluent-bit/flb_str.h>
23
#include <fluent-bit/flb_time.h>
24
#include <fluent-bit/flb_utils.h>
25
#include <fluent-bit/flb_pack.h>
26
#include <fluent-bit/flb_sds.h>
27
#include <fluent-bit/flb_config_map.h>
28
#include <fluent-bit/flb_log_event_decoder.h>
29
#include <msgpack.h>
30
31
#include <stdio.h>
32
#include <stdlib.h>
33
#include <assert.h>
34
35
#include "udp.h"
36
#include "udp_conf.h"
37
38
/*
 * Plugin initialization callback.
 *
 * Builds the plugin context with flb_udp_conf_create() and registers it on
 * the output instance.
 *
 * Returns 0 on success, or -1 when the context could not be created.
 * The 'data' argument is not used.
 */
static int cb_udp_init(struct flb_output_instance *ins,
                       struct flb_config *config, void *data)
{
    struct flb_out_udp *udp_ctx;

    (void) data;

    udp_ctx = flb_udp_conf_create(ins, config);
    if (udp_ctx == NULL) {
        return -1;
    }

    /* Attach the freshly created context to this output instance */
    flb_output_set_context(ins, udp_ctx);

    return 0;
}
54
55
static int deliver_chunks_raw(struct flb_out_udp *ctx,
56
                              const char *tag, int tag_len,
57
                              const void *in_data, size_t in_size)
58
0
{
59
0
    int ret;
60
0
    flb_sds_t buf = NULL;
61
0
    flb_sds_t str;
62
0
    msgpack_object map;
63
0
    ssize_t send_result;
64
0
    struct flb_log_event_decoder log_decoder;
65
0
    struct flb_log_event log_event;
66
67
0
    buf = flb_sds_create_size(in_size);
68
0
    if (!buf) {
69
0
        return FLB_ERROR;
70
0
    }
71
72
0
    ret = flb_log_event_decoder_init(&log_decoder, (char *) in_data, in_size);
73
74
0
    if (ret != FLB_EVENT_DECODER_SUCCESS) {
75
0
        flb_plg_error(ctx->ins,
76
0
                      "Log event decoder initialization error : %d", ret);
77
78
0
        flb_sds_destroy(buf);
79
80
0
        return -1;
81
0
    }
82
83
0
    while ((ret = flb_log_event_decoder_next(
84
0
                    &log_decoder,
85
0
                    &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
86
0
        map = *log_event.body;
87
88
0
        str = flb_ra_translate(ctx->ra_raw_message_key, (char *) tag, tag_len, map, NULL);
89
0
        if (!str) {
90
0
            continue;
91
0
        }
92
93
0
        ret = flb_sds_cat_safe(&buf, str, flb_sds_len(str));
94
0
        if (ret != 0) {
95
0
            flb_plg_error(ctx->ins, "failed to compose payload from '%s'", str);
96
0
        }
97
0
        flb_sds_destroy(str);
98
99
        /* append a new line */
100
0
        flb_sds_cat_safe(&buf, "\n", 1);
101
102
0
        if (flb_sds_len(buf) > 65535) {
103
0
            flb_plg_debug(ctx->ins, "record size exceeds maximum datagram size : %zu", flb_sds_len(buf));
104
0
        }
105
106
0
        send_result = send(ctx->endpoint_descriptor,
107
0
                           buf,
108
0
                           flb_sds_len(buf),
109
0
                           0);
110
111
0
        if (send_result == -1) {
112
0
            flb_log_event_decoder_destroy(&log_decoder);
113
0
            flb_sds_destroy(buf);
114
115
0
            return FLB_RETRY;
116
0
        }
117
118
0
        flb_sds_len_set(buf, 0);
119
0
        buf[0] = '\0';
120
0
    }
121
122
0
    flb_log_event_decoder_destroy(&log_decoder);
123
0
    flb_sds_destroy(buf);
124
125
0
    return FLB_OK;
126
0
}
127
128
static int deliver_chunks_json(struct flb_out_udp *ctx,
129
                               const char *tag, int tag_len,
130
                               const void *in_data, size_t in_size,
131
                               struct flb_config *config)
132
0
{
133
0
    int ret;
134
0
    size_t off = 0;
135
0
    flb_sds_t json = NULL;
136
0
    ssize_t send_result;
137
0
    size_t previous_offset;
138
0
    int append_new_line;
139
0
    struct flb_log_event_decoder log_decoder;
140
0
    struct flb_log_event log_event;
141
142
0
    ret = flb_log_event_decoder_init(&log_decoder, (char *) in_data, in_size);
143
144
0
    if (ret != FLB_EVENT_DECODER_SUCCESS) {
145
0
        flb_plg_error(ctx->ins,
146
0
                      "Log event decoder initialization error : %d", ret);
147
148
0
        return FLB_ERROR;
149
0
    }
150
151
0
    previous_offset = 0;
152
153
0
    while ((ret = flb_log_event_decoder_next(
154
0
                    &log_decoder,
155
0
                    &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
156
0
        off = log_decoder.offset;
157
158
0
        json = flb_pack_msgpack_to_json_format(&((char *) in_data)[previous_offset],
159
0
                                               off - previous_offset,
160
0
                                               ctx->out_format,
161
0
                                               ctx->json_date_format,
162
0
                                               ctx->date_key,
163
0
                                               config->json_escape_unicode);
164
0
        if (!json) {
165
0
            flb_plg_error(ctx->ins, "error formatting JSON payload");
166
167
0
            flb_log_event_decoder_destroy(&log_decoder);
168
169
0
            return FLB_ERROR;
170
0
        }
171
172
0
        previous_offset = off;
173
0
        append_new_line = FLB_FALSE;
174
175
0
        if (flb_sds_len(json) > 0) {
176
0
            if (json[flb_sds_len(json) - 1] != '\n') {
177
0
                append_new_line = FLB_TRUE;
178
0
            }
179
180
0
            if (append_new_line) {
181
0
                ret = flb_sds_cat_safe(&json, "\n", 1);
182
183
0
                if (ret != 0) {
184
0
                    flb_log_event_decoder_destroy(&log_decoder);
185
0
                    flb_sds_destroy(json);
186
187
0
                    return FLB_RETRY;
188
0
                }
189
0
            }
190
191
0
            if (flb_sds_len(json) > 65535) {
192
0
                flb_plg_debug(ctx->ins, "record size exceeds maximum datagram size : %zu", flb_sds_len(json));
193
0
            }
194
195
0
            send_result = send(ctx->endpoint_descriptor,
196
0
                               json,
197
0
                               flb_sds_len(json),
198
0
                               0);
199
200
0
            if (send_result == -1) {
201
0
                flb_log_event_decoder_destroy(&log_decoder);
202
0
                flb_sds_destroy(json);
203
204
0
                return FLB_RETRY;
205
0
            }
206
0
        }
207
208
0
        flb_sds_destroy(json);
209
0
    }
210
211
0
    flb_log_event_decoder_destroy(&log_decoder);
212
213
0
    return FLB_OK;
214
0
}
215
216
static int deliver_chunks_msgpack(struct flb_out_udp *ctx,
217
                                  const char *tag, int tag_len,
218
                                  const void *in_data, size_t in_size)
219
0
{
220
0
    size_t off = 0;
221
0
    ssize_t send_result;
222
0
    size_t previous_offset;
223
0
    struct flb_log_event_decoder log_decoder;
224
0
    struct flb_log_event log_event;
225
0
    int ret;
226
227
0
    ret = flb_log_event_decoder_init(&log_decoder, (char *) in_data, in_size);
228
229
0
    if (ret != FLB_EVENT_DECODER_SUCCESS) {
230
0
        flb_plg_error(ctx->ins,
231
0
                      "Log event decoder initialization error : %d", ret);
232
233
0
        return FLB_RETRY;
234
0
    }
235
236
0
    previous_offset = 0;
237
238
0
    while ((ret = flb_log_event_decoder_next(
239
0
                    &log_decoder,
240
0
                    &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
241
0
        off = log_decoder.offset;
242
243
0
        if ((off - previous_offset) > 65535) {
244
0
            flb_plg_debug(ctx->ins, "record size exceeds maximum datagram size : %zu", (off - previous_offset));
245
0
        }
246
247
0
        send_result = send(ctx->endpoint_descriptor,
248
0
                           &((char *) in_data)[previous_offset],
249
0
                           off - previous_offset,
250
0
                           0);
251
252
0
        if (send_result == -1) {
253
0
            flb_log_event_decoder_destroy(&log_decoder);
254
255
0
            return FLB_RETRY;
256
0
        }
257
258
0
        previous_offset = off;
259
0
    }
260
261
0
    flb_log_event_decoder_destroy(&log_decoder);
262
263
0
    return FLB_OK;
264
0
}
265
266
static void cb_udp_flush(struct flb_event_chunk *event_chunk,
267
                         struct flb_output_flush *out_flush,
268
                         struct flb_input_instance *i_ins,
269
                         void *out_context,
270
                         struct flb_config *config)
271
0
{
272
0
    int ret = FLB_ERROR;
273
0
    struct flb_out_udp *ctx = out_context;
274
275
0
    (void) i_ins;
276
277
0
    if (ctx->ra_raw_message_key != NULL) {
278
0
        ret = deliver_chunks_raw(ctx,
279
0
                                 event_chunk->tag,
280
0
                                 flb_sds_len(event_chunk->tag),
281
0
                                 event_chunk->data,
282
0
                                 event_chunk->size);
283
0
    }
284
0
    else if (ctx->out_format == FLB_PACK_JSON_FORMAT_NONE) {
285
0
        ret = deliver_chunks_msgpack(ctx,
286
0
                                     event_chunk->tag,
287
0
                                     flb_sds_len(event_chunk->tag),
288
0
                                     event_chunk->data,
289
0
                                     event_chunk->size);
290
0
    }
291
0
    else {
292
0
        ret = deliver_chunks_json(ctx,
293
0
                                  event_chunk->tag,
294
0
                                  flb_sds_len(event_chunk->tag),
295
0
                                  event_chunk->data,
296
0
                                  event_chunk->size,
297
0
                                  config);
298
0
    }
299
300
0
    return FLB_OUTPUT_RETURN(ret);
301
0
}
302
303
/*
 * Exit callback: release the plugin context.
 *
 * 'data' is the context pointer handed to flb_udp_conf_destroy();
 * 'config' is not used. Always returns 0.
 */
static int cb_udp_exit(void *data, struct flb_config *config)
{
    (void) config;

    flb_udp_conf_destroy((struct flb_out_udp *) data);

    return 0;
}
311
312
/* Configuration properties map */
313
static struct flb_config_map config_map[] = {
314
    {
315
     FLB_CONFIG_MAP_STR, "format", "json_lines",
316
     0, FLB_FALSE, 0,
317
     "Specify the payload format, supported formats: msgpack, json, "
318
     "json_lines or json_stream."
319
    },
320
321
    {
322
     FLB_CONFIG_MAP_STR, "json_date_format", "double",
323
     0, FLB_FALSE, 0,
324
     FBL_PACK_JSON_DATE_FORMAT_DESCRIPTION
325
    },
326
327
    {
328
     FLB_CONFIG_MAP_STR, "json_date_key", "date",
329
     0, FLB_TRUE, offsetof(struct flb_out_udp, json_date_key),
330
     "Specify the name of the date field in output."
331
    },
332
333
    {
334
     FLB_CONFIG_MAP_STR, "raw_message_key", NULL,
335
     0, FLB_TRUE, offsetof(struct flb_out_udp, raw_message_key),
336
     "use a raw message key for the message."
337
    },
338
339
    /* EOF */
340
    {0}
341
};
342
343
/* Plugin reference */
344
struct flb_output_plugin out_udp_plugin = {
345
    .name           = "udp",
346
    .description    = "UDP Output",
347
    .cb_init        = cb_udp_init,
348
    .cb_flush       = cb_udp_flush,
349
    .cb_exit        = cb_udp_exit,
350
    .config_map     = config_map,
351
352
    .workers        = 2,
353
    .flags          = FLB_OUTPUT_NET,
354
};