Coverage Report

Created: 2025-10-14 08:14

/src/fluent-bit/plugins/in_emitter/emitter.c
Execution counts: every instrumented line in this file reports a count of 0, i.e. the file is completely uncovered. The per-line Line/Count columns of the original report are collapsed below; only the source listing follows.

/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/*  Fluent Bit
 *  ==========
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

#include <fluent-bit/flb_compat.h>
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_scheduler.h>
#include <fluent-bit/flb_ring_buffer.h>

#include <sys/types.h>
#include <sys/stat.h>

#define DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY 2000

/* return values */
#define FLB_EMITTER_BUSY     -2

struct em_chunk {
    flb_sds_t tag;
    struct msgpack_sbuffer mp_sbuf;  /* msgpack sbuffer        */
    struct msgpack_packer mp_pck;    /* msgpack packer         */
    struct mk_list _head;
};

struct input_ref {
    struct flb_input_instance *i_ins;
    struct mk_list _head;
};

struct flb_emitter {
    int coll_fd;                        /* collector id */
    struct mk_list chunks;              /* list of all pending chunks */
    struct flb_input_instance *ins;     /* input instance */
    struct flb_ring_buffer *msgs;       /* ring buffer for cross-thread messages */
    int ring_buffer_size;               /* size of the ring buffer */
    struct mk_list i_ins_list;          /* instance list of linked/sending inputs */
};

struct em_chunk *em_chunk_create(const char *tag, int tag_len,
                                 struct flb_emitter *ctx)
{
    struct em_chunk *ec;

    ec = flb_calloc(1, sizeof(struct em_chunk));
    if (!ec) {
        flb_errno();
        return NULL;
    }

    ec->tag = flb_sds_create_len(tag, tag_len);
    if (!ec->tag) {
        flb_errno();
        flb_free(ec);
        return NULL;
    }

    msgpack_sbuffer_init(&ec->mp_sbuf);
    msgpack_packer_init(&ec->mp_pck, &ec->mp_sbuf, msgpack_sbuffer_write);

    mk_list_add(&ec->_head, &ctx->chunks);

    return ec;
}

static void em_chunk_destroy(struct em_chunk *ec)
{
    mk_list_del(&ec->_head);
    flb_sds_destroy(ec->tag);
    msgpack_sbuffer_destroy(&ec->mp_sbuf);
    flb_free(ec);
}

static int do_in_emitter_add_record(struct em_chunk *ec,
                                    struct flb_input_instance *in)
{
    struct flb_emitter *ctx = (struct flb_emitter *) in->context;
    int ret;

    if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) {
        flb_plg_debug(ctx->ins, "_emitter %s paused. Not processing records.",
                      ctx->ins->name);
        return FLB_EMITTER_BUSY;
    }

    /* Associate this backlog chunk to this instance into the engine */
    ret = flb_input_log_append(in,
                               ec->tag, flb_sds_len(ec->tag),
                               ec->mp_sbuf.data,
                               ec->mp_sbuf.size);
    if (ret == -1) {
        flb_plg_error(ctx->ins, "error registering chunk with tag: %s", ec->tag);
        /* Release the chunk */
        em_chunk_destroy(ec);
        return -1;
    }
    em_chunk_destroy(ec);
    return 0;
}

/*
 * Function used by filters to ingest custom records with custom tags; at the
 * moment it is only used by the rewrite_tag filter.
 */
int in_emitter_add_record(const char *tag, int tag_len,
                          const char *buf_data, size_t buf_size,
                          struct flb_input_instance *in,
                          struct flb_input_instance *i_ins)
{
    struct em_chunk temporary_chunk;
    struct mk_list *head;
    struct input_ref *i_ref;
    bool ref_found;
    struct mk_list *tmp;

    struct em_chunk *ec;
    struct flb_emitter *ctx;

    ctx = (struct flb_emitter *) in->context;
    ec = NULL;

    /* Iterate over the list of already known (source) inputs */
    /* If this one is new, add it to the list so it can be paused later on */
    ref_found = false;
    mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) {
        i_ref = mk_list_entry(head, struct input_ref, _head);
        if (i_ref->i_ins == i_ins) {
            ref_found = true;
            break;
        }
    }
    if (!ref_found) {
        i_ref = flb_malloc(sizeof(struct input_ref));
        if (!i_ref) {
            flb_errno();
            return FLB_FILTER_NOTOUCH;
        }
        i_ref->i_ins = i_ins;
        mk_list_add(&i_ref->_head, &ctx->i_ins_list);
        /* If in_emitter is paused but the new input plugin is not, pause it */
        if (flb_input_buf_paused(ctx->ins) == FLB_TRUE &&
            flb_input_buf_paused(i_ins) == FLB_FALSE) {
            flb_input_pause(i_ins);
        }
    }

    /* Restricted by mem_buf_limit */
    if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) {
        flb_plg_debug(ctx->ins, "emitter memory buffer limit reached. Not accepting record.");
        return FLB_EMITTER_BUSY;
    }

    /* Use the ring buffer first if it exists */
    if (ctx->msgs) {
        memset(&temporary_chunk, 0, sizeof(struct em_chunk));

        temporary_chunk.tag = flb_sds_create_len(tag, tag_len);

        if (temporary_chunk.tag == NULL) {
            flb_plg_error(ctx->ins,
                          "cannot allocate memory for tag: %s",
                          tag);
            return -1;
        }

        msgpack_sbuffer_init(&temporary_chunk.mp_sbuf);
        msgpack_sbuffer_write(&temporary_chunk.mp_sbuf, buf_data, buf_size);

        return flb_ring_buffer_write(ctx->msgs,
                                     (void *) &temporary_chunk,
                                     sizeof(struct em_chunk));
    }

    /* Check if any target chunk already exists */
    mk_list_foreach(head, &ctx->chunks) {
        ec = mk_list_entry(head, struct em_chunk, _head);
        if (flb_sds_cmp(ec->tag, tag, tag_len) != 0) {
            ec = NULL;
            continue;
        }
        break;
    }

    /* No candidate chunk found, so create a new one */
    if (!ec) {
        ec = em_chunk_create(tag, tag_len, ctx);
        if (!ec) {
            flb_plg_error(ctx->ins, "cannot create new chunk for tag: %s",
                          tag);
            return -1;
        }
    }

    /* Append raw msgpack data */
    msgpack_sbuffer_write(&ec->mp_sbuf, buf_data, buf_size);
    return 0;
}

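/*
 * Illustration only, not part of emitter.c: a minimal sketch of how a filter
 * (e.g. rewrite_tag, the only current caller) might hand a re-tagged,
 * msgpack-encoded record to its private emitter instance through
 * in_emitter_add_record(). The helper name, its parameters and the retry
 * policy are hypothetical; only the in_emitter_add_record() prototype and
 * the FLB_EMITTER_BUSY / FLB_FILTER_NOTOUCH values are taken from this file.
 *
 *   static int reemit_record(const char *new_tag, int new_tag_len,
 *                            const char *mp_data, size_t mp_size,
 *                            struct flb_input_instance *emitter,
 *                            struct flb_input_instance *source)
 *   {
 *       int ret;
 *
 *       ret = in_emitter_add_record(new_tag, new_tag_len,
 *                                   mp_data, mp_size, emitter, source);
 *       if (ret == FLB_EMITTER_BUSY) {
 *           // emitter is paused (buffer limit reached): keep the original
 *           // record untouched so the caller can retry on a later flush
 *           return FLB_FILTER_NOTOUCH;
 *       }
 *       return ret;
 *   }
 */
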
/*
 * Collector callback used when the ring buffer is enabled: drain pending
 * messages from the ring buffer and append them to the engine.
 */
static int in_emitter_ingest_ring_buffer(struct flb_input_instance *in,
                                         struct flb_config *config, void *context)
{
    int ret;
    struct flb_emitter *ctx = (struct flb_emitter *) context;
    struct em_chunk ec;
    (void) config;
    (void) in;

    while ((ret = flb_ring_buffer_read(ctx->msgs, (void *) &ec,
                                       sizeof(struct em_chunk))) == 0) {
        ret = flb_input_log_append(in,
                                   ec.tag, flb_sds_len(ec.tag),
                                   ec.mp_sbuf.data,
                                   ec.mp_sbuf.size);
        flb_sds_destroy(ec.tag);
        msgpack_sbuffer_destroy(&ec.mp_sbuf);
    }
    return ret;
}

static int cb_queue_chunks(struct flb_input_instance *in,
                           struct flb_config *config, void *data)
{
    int ret;
    struct mk_list *tmp;
    struct mk_list *head;
    struct em_chunk *echunk;
    struct flb_emitter *ctx;

    /* Get context */
    ctx = (struct flb_emitter *) data;

    /* Try to enqueue chunks under our limits */
    mk_list_foreach_safe(head, tmp, &ctx->chunks) {
        echunk = mk_list_entry(head, struct em_chunk, _head);

        /* Associate this backlog chunk to this instance into the engine */
        ret = do_in_emitter_add_record(echunk, in);
        if (ret == -1) {
            continue;
        }
    }

    return 0;
}

static int in_emitter_start_ring_buffer(struct flb_input_instance *in,
                                        struct flb_emitter *ctx)
{
    if (ctx->ring_buffer_size <= 0) {
        return 0;
    }

    if (ctx->msgs != NULL) {
        flb_warn("emitter %s already has a ring buffer",
                 flb_input_name(in));
        return 0;
    }

    ctx->msgs = flb_ring_buffer_create(sizeof(void *) * ctx->ring_buffer_size);
    if (!ctx->msgs) {
        flb_error("emitter %s could not initialize ring buffer",
                  flb_input_name(in));
        return -1;
    }

    ctx->coll_fd = flb_input_set_collector_time(in,
                                                in_emitter_ingest_ring_buffer,
                                                1, 0, in->config);
    return (ctx->coll_fd < 0) ? -1 : 0;
}

/* Initialize plugin */
static int cb_emitter_init(struct flb_input_instance *in,
                           struct flb_config *config, void *data)
{
    struct flb_sched *scheduler;
    struct flb_emitter *ctx;
    int ret;

    scheduler = flb_sched_ctx_get();

    ctx = flb_calloc(1, sizeof(struct flb_emitter));
    if (!ctx) {
        flb_errno();
        return -1;
    }
    ctx->ins = in;
    mk_list_init(&ctx->chunks);

    mk_list_init(&ctx->i_ins_list);

    ret = flb_input_config_map_set(in, (void *) ctx);
    if (ret == -1) {
        flb_free(ctx);
        return -1;
    }

    if (in->is_threaded == FLB_TRUE && ctx->ring_buffer_size == 0) {
        ctx->ring_buffer_size = DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY;
        flb_plg_debug(in, "threaded: enable emitter ring buffer (size=%u)",
                      ctx->ring_buffer_size);
    }

    if (ctx->ring_buffer_size > 0) {
        ret = in_emitter_start_ring_buffer(in, ctx);
        if (ret == -1) {
            flb_free(ctx);
            return -1;
        }
    }
    else {
        ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 25000000, config);
        if (ret < 0) {
            flb_error("[in_emitter] could not create collector");
            flb_free(ctx);
            return -1;
        }
        ctx->coll_fd = ret;
    }

    /* export plugin context */
    flb_input_set_context(in, ctx);

    return 0;
}

static void cb_emitter_pause(void *data, struct flb_config *config)
{
    struct flb_emitter *ctx = data;
    struct mk_list *tmp;
    struct mk_list *head;
    struct input_ref *i_ref;

    /* Pause all known senders */
    flb_input_collector_pause(ctx->coll_fd, ctx->ins);
    mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) {
        i_ref = mk_list_entry(head, struct input_ref, _head);
        flb_input_pause(i_ref->i_ins);
    }
}

static void cb_emitter_resume(void *data, struct flb_config *config)
{
    struct flb_emitter *ctx = data;
    struct mk_list *tmp;
    struct mk_list *head;
    struct input_ref *i_ref;

    /* Resume all known senders */
    flb_input_collector_resume(ctx->coll_fd, ctx->ins);
    mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) {
        i_ref = mk_list_entry(head, struct input_ref, _head);
        flb_input_resume(i_ref->i_ins);
    }
}

static int cb_emitter_exit(void *data, struct flb_config *config)
{
    struct mk_list *tmp;
    struct mk_list *head;
    struct flb_emitter *ctx = data;
    struct em_chunk *echunk;
    struct em_chunk ec;
    struct input_ref *i_ref;
    int ret;

    mk_list_foreach_safe(head, tmp, &ctx->chunks) {
        echunk = mk_list_entry(head, struct em_chunk, _head);
        mk_list_del(&echunk->_head);
        flb_free(echunk);
    }

    if (ctx->msgs) {
        while ((ret = flb_ring_buffer_read(ctx->msgs, (void *) &ec,
                                           sizeof(struct em_chunk))) == 0) {
            flb_sds_destroy(ec.tag);
            msgpack_sbuffer_destroy(&ec.mp_sbuf);
        }
        flb_ring_buffer_destroy(ctx->msgs);
    }

    mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) {
        i_ref = mk_list_entry(head, struct input_ref, _head);
        mk_list_del(&i_ref->_head);
        flb_free(i_ref);
    }

    flb_free(ctx);
    return 0;
}

static struct flb_config_map config_map[] = {
   {
    FLB_CONFIG_MAP_INT, "ring_buffer_size", "0",
    0, FLB_TRUE, offsetof(struct flb_emitter, ring_buffer_size),
    "use a ring buffer to ingest messages for the emitter (required across threads)."
   },
   {0}
};

/* Plugin reference */
struct flb_input_plugin in_emitter_plugin = {
    .name         = "emitter",
    .description  = "Record Emitter",
    .cb_init      = cb_emitter_init,
    .cb_pre_run   = NULL,
    .cb_collect   = NULL,
    .cb_ingest    = NULL,
    .cb_flush_buf = NULL,
    .config_map   = config_map,
    .cb_pause     = cb_emitter_pause,
    .cb_resume    = cb_emitter_resume,
    .cb_exit      = cb_emitter_exit,

    /* This plugin can only be configured and invoked by the Engine */
    .flags        = FLB_INPUT_PRIVATE
};
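
Because in_emitter_plugin is flagged FLB_INPUT_PRIVATE, an emitter instance is never declared directly in a pipeline configuration; the engine creates it on behalf of a filter. As a rough sketch of how this plugin ends up being exercised, a rewrite_tag setup along the lines below would make the engine spawn an emitter that re-ingests the re-tagged records. The rewrite_tag option names shown (Rule, Emitter_Name, Emitter_Mem_Buf_Limit) are assumed from that filter's documentation, not from this file:

    [INPUT]
        Name  dummy
        Tag   app.log

    [FILTER]
        Name                  rewrite_tag
        Match                 app.log
        Rule                  $message .* re.emitted false
        Emitter_Name          re_emitted
        Emitter_Mem_Buf_Limit 10M

    [OUTPUT]
        Name   stdout
        Match  re.emitted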