Coverage Report

Created: 2026-02-09 07:38

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/src/flb_lib.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit Demo
4
 *  ===============
5
 *  Copyright (C) 2015-2026 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
21
#include <fluent-bit/flb_lib.h>
22
#include <fluent-bit/flb_mem.h>
23
#include <fluent-bit/flb_compat.h>
24
#include <fluent-bit/flb_pipe.h>
25
#include <fluent-bit/flb_engine.h>
26
#include <fluent-bit/flb_input.h>
27
#include <fluent-bit/flb_output.h>
28
#include <fluent-bit/flb_filter.h>
29
#include <fluent-bit/flb_utils.h>
30
#include <fluent-bit/flb_time.h>
31
#include <fluent-bit/flb_coro.h>
32
#include <fluent-bit/flb_callback.h>
33
#include <fluent-bit/flb_kv.h>
34
#include <fluent-bit/flb_metrics.h>
35
#include <fluent-bit/flb_upstream.h>
36
#include <fluent-bit/flb_downstream.h>
37
#include <fluent-bit/tls/flb_tls.h>
38
#include <fluent-bit/config_format/flb_cf.h>
39
40
#include <signal.h>
41
#include <stdarg.h>
42
#include <sys/stat.h>
43
#include <errno.h>
44
#include <stdlib.h>
45
46
#ifdef FLB_HAVE_MTRACE
47
#include <mcheck.h>
48
#endif
49
50
#ifdef FLB_HAVE_AWS_ERROR_REPORTER
51
#include <fluent-bit/aws/flb_aws_error_reporter.h>
52
53
struct flb_aws_error_reporter *error_reporter;
54
#endif
55
56
/* thread initializator */
57
static pthread_once_t flb_lib_once = PTHREAD_ONCE_INIT;
58
59
/* reference to the last 'flb_lib_ctx' context started through flb_start() */
60
FLB_TLS_DEFINE(flb_ctx_t, flb_lib_active_context);
61
62
/* reference to the last 'flb_cf' context started through flb_start() */
63
FLB_TLS_DEFINE(struct flb_cf, flb_lib_active_cf_context);
64
65
#ifdef FLB_SYSTEM_WINDOWS
66
static inline int flb_socket_init_win32(void)
67
{
68
    WSADATA wsaData;
69
    int err;
70
71
    err = WSAStartup(MAKEWORD(2, 2), &wsaData);
72
    if (err != 0) {
73
        fprintf(stderr, "WSAStartup failed with error: %d\n", err);
74
        return err;
75
    }
76
    return 0;
77
}
78
#endif
79
80
static inline struct flb_input_instance *in_instance_get(flb_ctx_t *ctx,
81
                                                         int ffd)
82
22.5M
{
83
22.5M
    struct mk_list *head;
84
22.5M
    struct flb_input_instance *i_ins;
85
86
22.5M
    mk_list_foreach(head, &ctx->config->inputs) {
87
22.5M
        i_ins = mk_list_entry(head, struct flb_input_instance, _head);
88
22.5M
        if (i_ins->id == ffd) {
89
22.5M
            return i_ins;
90
22.5M
        }
91
22.5M
    }
92
93
0
    return NULL;
94
22.5M
}
95
96
static inline struct flb_output_instance *out_instance_get(flb_ctx_t *ctx,
97
                                                           int ffd)
98
123
{
99
123
    struct mk_list *head;
100
123
    struct flb_output_instance *o_ins;
101
102
123
    mk_list_foreach(head, &ctx->config->outputs) {
103
123
        o_ins = mk_list_entry(head, struct flb_output_instance, _head);
104
123
        if (o_ins->id == ffd) {
105
123
            return o_ins;
106
123
        }
107
123
    }
108
109
0
    return NULL;
110
123
}
111
112
static inline struct flb_filter_instance *filter_instance_get(flb_ctx_t *ctx,
113
                                                              int ffd)
114
2
{
115
2
    struct mk_list *head;
116
2
    struct flb_filter_instance *f_ins;
117
118
2
    mk_list_foreach(head, &ctx->config->filters) {
119
2
        f_ins = mk_list_entry(head, struct flb_filter_instance, _head);
120
2
        if (f_ins->id == ffd) {
121
2
            return f_ins;
122
2
        }
123
2
    }
124
125
0
    return NULL;
126
2
}
127
128
void flb_init_env()
129
3
{
130
3
    flb_tls_init();
131
3
    flb_coro_init();
132
3
    flb_upstream_init();
133
3
    flb_downstream_init();
134
3
    flb_output_prepare();
135
136
3
    FLB_TLS_INIT(flb_lib_active_context);
137
3
    FLB_TLS_INIT(flb_lib_active_cf_context);
138
139
    /* libraries */
140
3
    cmt_initialize();
141
3
}
142
143
flb_ctx_t *flb_create()
144
109
{
145
109
    int ret;
146
109
    flb_ctx_t *ctx;
147
109
    struct flb_config *config;
148
149
#ifdef FLB_HAVE_MTRACE
150
    /* Start tracing malloc and free */
151
    mtrace();
152
#endif
153
154
#ifdef FLB_SYSTEM_WINDOWS
155
    /* Ensure we initialized Windows Sockets */
156
    if (flb_socket_init_win32()) {
157
        return NULL;
158
    }
159
#endif
160
161
109
    ctx = flb_calloc(1, sizeof(flb_ctx_t));
162
109
    if (!ctx) {
163
0
        perror("malloc");
164
0
        return NULL;
165
0
    }
166
167
109
    config = flb_config_init();
168
109
    if (!config) {
169
0
        flb_free(ctx);
170
0
        return NULL;
171
0
    }
172
109
    ctx->config = config;
173
109
    ctx->status = FLB_LIB_NONE;
174
175
    /*
176
     * Initialize our pipe to send data to our worker, used
177
     * by 'lib' input plugin.
178
     */
179
109
    ret = flb_pipe_create(config->ch_data);
180
109
    if (ret == -1) {
181
0
        perror("pipe");
182
0
        flb_config_exit(ctx->config);
183
0
        flb_free(ctx);
184
0
        return NULL;
185
0
    }
186
187
    /* Create the event loop to receive notifications */
188
109
    ctx->event_loop = mk_event_loop_create(256);
189
109
    if (!ctx->event_loop) {
190
0
        flb_config_exit(ctx->config);
191
0
        flb_free(ctx);
192
0
        return NULL;
193
0
    }
194
109
    config->ch_evl = ctx->event_loop;
195
196
    /* Prepare the notification channels */
197
109
    ctx->event_channel = flb_calloc(1, sizeof(struct mk_event));
198
109
    if (!ctx->event_channel) {
199
0
        perror("calloc");
200
0
        flb_config_exit(ctx->config);
201
0
        flb_free(ctx);
202
0
        return NULL;
203
0
    }
204
205
109
    MK_EVENT_ZERO(ctx->event_channel);
206
207
109
    ret = mk_event_channel_create(config->ch_evl,
208
109
                                  &config->ch_notif[0],
209
109
                                  &config->ch_notif[1],
210
109
                                  ctx->event_channel);
211
109
    if (ret != 0) {
212
0
        flb_error("[lib] could not create notification channels");
213
0
        flb_stop(ctx);
214
0
        flb_destroy(ctx);
215
0
        return NULL;
216
0
    }
217
218
    #ifdef FLB_HAVE_AWS_ERROR_REPORTER
219
    if (is_error_reporting_enabled()) {
220
        error_reporter = flb_aws_error_reporter_create();
221
    }
222
    #endif
223
224
109
    return ctx;
225
109
}
226
227
/* Release resources associated to the library context */
228
void flb_destroy(flb_ctx_t *ctx)
229
107
{
230
107
    if (!ctx) {
231
0
        return;
232
0
    }
233
234
107
    if (ctx->event_channel) {
235
107
        mk_event_del(ctx->event_loop, ctx->event_channel);
236
107
        flb_free(ctx->event_channel);
237
107
    }
238
239
    /* Remove resources from the event loop */
240
107
    mk_event_loop_destroy(ctx->event_loop);
241
242
    /* cfg->is_running is set to false when flb_engine_shutdown has been invoked (event loop) */
243
107
    if (ctx->config) {
244
107
        if (ctx->config->is_running == FLB_TRUE) {
245
0
            flb_engine_shutdown(ctx->config);
246
0
        }
247
107
        flb_config_exit(ctx->config);
248
107
    }
249
250
    #ifdef FLB_HAVE_AWS_ERROR_REPORTER
251
    if (is_error_reporting_enabled()) {
252
        flb_aws_error_reporter_destroy(error_reporter);
253
    }
254
    #endif
255
256
107
    flb_free(ctx);
257
107
    ctx = NULL;
258
259
#ifdef FLB_HAVE_MTRACE
260
    /* Stop tracing malloc and free */
261
    muntrace();
262
#endif
263
107
}
264
265
/* Defines a new input instance */
266
int flb_input(flb_ctx_t *ctx, const char *input, void *data)
267
242
{
268
242
    struct flb_input_instance *i_ins;
269
270
242
    i_ins = flb_input_new(ctx->config, input, data, FLB_TRUE);
271
242
    if (!i_ins) {
272
123
        return -1;
273
123
    }
274
275
119
    return i_ins->id;
276
242
}
277
278
/* Defines a new output instance */
279
int flb_output(flb_ctx_t *ctx, const char *output, struct flb_lib_out_cb *cb)
280
290
{
281
290
    struct flb_output_instance *o_ins;
282
283
290
    o_ins = flb_output_new(ctx->config, output, cb, FLB_TRUE);
284
290
    if (!o_ins) {
285
135
        return -1;
286
135
    }
287
288
155
    return o_ins->id;
289
290
}
290
291
/* Defines a new filter instance */
292
int flb_filter(flb_ctx_t *ctx, const char *filter, void *data)
293
132
{
294
132
    struct flb_filter_instance *f_ins;
295
296
132
    f_ins = flb_filter_new(ctx->config, filter, data);
297
132
    if (!f_ins) {
298
121
        return -1;
299
121
    }
300
301
11
    return f_ins->id;
302
132
}
303
304
/* Set an input interface property */
305
int flb_input_set(flb_ctx_t *ctx, int ffd, ...)
306
119
{
307
119
    int ret;
308
119
    char *key;
309
119
    char *value;
310
119
    va_list va;
311
119
    struct flb_input_instance *i_ins;
312
313
119
    i_ins = in_instance_get(ctx, ffd);
314
119
    if (!i_ins) {
315
0
        return -1;
316
0
    }
317
318
119
    va_start(va, ffd);
319
226
    while ((key = va_arg(va, char *))) {
320
119
        value = va_arg(va, char *);
321
119
        if (!value) {
322
            /* Wrong parameter */
323
12
            va_end(va);
324
12
            return -1;
325
12
        }
326
107
        ret = flb_input_set_property(i_ins, key, value);
327
107
        if (ret != 0) {
328
0
            va_end(va);
329
0
            return -1;
330
0
        }
331
107
    }
332
333
119
    va_end(va);
334
107
    return 0;
335
119
}
336
337
int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc)
338
0
{
339
0
    struct flb_input_instance *i_ins;
340
341
0
    i_ins = in_instance_get(ctx, ffd);
342
0
    if (!i_ins) {
343
0
        return -1;
344
0
    }
345
346
0
    if (i_ins->processor) {
347
0
        flb_processor_destroy(i_ins->processor);
348
0
    }
349
350
0
    i_ins->processor = proc;
351
352
0
    return 0;
353
0
}
354
355
int flb_input_set_test(flb_ctx_t *ctx, int ffd, char *test_name,
356
                       void (*in_callback) (void *, int, int, void *, size_t, void *),
357
                       void *in_callback_data)
358
0
{
359
0
    struct flb_input_instance *i_ins;
360
361
0
    i_ins = in_instance_get(ctx, ffd);
362
0
    if (!i_ins) {
363
0
        return -1;
364
0
    }
365
366
    /*
367
     * Enabling a test, set the output instance in 'test' mode, so no real
368
     * flush callback is invoked, only the desired implemented test.
369
     */
370
371
    /* Formatter test */
372
0
    if (strcmp(test_name, "formatter") == 0) {
373
0
        i_ins->test_mode = FLB_TRUE;
374
0
        i_ins->test_formatter.rt_ctx = ctx;
375
0
        i_ins->test_formatter.rt_ffd = ffd;
376
0
        i_ins->test_formatter.rt_in_callback = in_callback;
377
0
        i_ins->test_formatter.rt_data = in_callback_data;
378
0
    }
379
0
    else {
380
0
        return -1;
381
0
    }
382
383
0
    return 0;
384
0
}
385
386
int flb_output_set_http_test(flb_ctx_t *ctx, int ffd, char *test_name,
387
                             void (*out_response) (void *, int, int, void *, size_t, void *),
388
                             void *out_callback_data)
389
0
{
390
0
    struct flb_output_instance *o_ins;
391
392
0
    o_ins = out_instance_get(ctx, ffd);
393
0
    if (!o_ins) {
394
0
        return -1;
395
0
    }
396
397
    /*
398
     * Enabling a test, set the output instance in 'test' mode, so no real
399
     * flush callback is invoked, only the desired implemented test.
400
     */
401
402
    /* Response test */
403
0
    if (strcmp(test_name, "response") == 0) {
404
0
        o_ins->test_mode = FLB_TRUE;
405
0
        o_ins->test_response.rt_ctx = ctx;
406
0
        o_ins->test_response.rt_ffd = ffd;
407
0
        o_ins->test_response.rt_out_response = out_response;
408
0
        o_ins->test_response.rt_data = out_callback_data;
409
0
    }
410
0
    else {
411
0
        return -1;
412
0
    }
413
414
0
    return 0;
415
0
}
416
417
static inline int flb_config_map_property_check(char *plugin_name, struct mk_list *config_map, char *key, char *val)
418
0
{
419
0
    struct flb_kv *kv;
420
0
    struct mk_list properties;
421
0
    int r;
422
423
0
    mk_list_init(&properties);
424
425
0
    kv = flb_kv_item_create(&properties, (char *) key, (char *) val);
426
0
    if (!kv) {
427
0
        return FLB_LIB_ERROR;
428
0
    }
429
430
0
    r = flb_config_map_properties_check(plugin_name, &properties, config_map);
431
0
    flb_kv_item_destroy(kv);
432
0
    return r;
433
0
}
434
435
/* Check if a given k, v is a valid config directive for the given output plugin */
436
int flb_output_property_check(flb_ctx_t *ctx, int ffd, char *key, char *val)
437
0
{
438
0
    struct flb_output_instance *o_ins;
439
0
    struct mk_list *config_map;
440
0
    struct flb_output_plugin *p;
441
0
    int r;
442
443
0
    o_ins = out_instance_get(ctx, ffd);
444
0
    if (!o_ins) {
445
0
      return FLB_LIB_ERROR;
446
0
    }
447
448
0
    p = o_ins->p;
449
0
    if (!p->config_map) {
450
0
        return FLB_LIB_NO_CONFIG_MAP;
451
0
    }
452
453
0
    config_map = flb_config_map_create(ctx->config, p->config_map);
454
0
    if (!config_map) {
455
0
        return FLB_LIB_ERROR;
456
0
    }
457
458
0
    r = flb_config_map_property_check(p->name, config_map, key, val);
459
0
    flb_config_map_destroy(config_map);
460
0
    return r;
461
0
}
462
463
/* Check if a given k, v is a valid config directive for the given input plugin */
464
int flb_input_property_check(flb_ctx_t *ctx, int ffd, char *key, char *val)
465
0
{
466
0
    struct flb_input_instance *i_ins;
467
0
    struct flb_input_plugin *p;
468
0
    struct mk_list *config_map;
469
0
    int r;
470
471
0
    i_ins = in_instance_get(ctx, ffd);
472
0
    if (!i_ins) {
473
0
      return FLB_LIB_ERROR;
474
0
    }
475
476
0
    p = i_ins->p;
477
0
    if (!p->config_map) {
478
0
        return FLB_LIB_NO_CONFIG_MAP;
479
0
    }
480
481
0
    config_map = flb_config_map_create(ctx->config, p->config_map);
482
0
    if (!config_map) {
483
0
        return FLB_LIB_ERROR;
484
0
    }
485
486
0
    r = flb_config_map_property_check(p->name, config_map, key, val);
487
0
    flb_config_map_destroy(config_map);
488
0
    return r;
489
0
}
490
491
/* Check if a given k, v is a valid config directive for the given filter plugin */
492
int flb_filter_property_check(flb_ctx_t *ctx, int ffd, char *key, char *val)
493
0
{
494
0
    struct flb_filter_instance *f_ins;
495
0
    struct flb_filter_plugin *p;
496
0
    struct mk_list *config_map;
497
0
    int r;
498
499
0
    f_ins = filter_instance_get(ctx, ffd);
500
0
    if (!f_ins) {
501
0
      return FLB_LIB_ERROR;
502
0
    }
503
504
0
    p = f_ins->p;
505
0
    if (!p->config_map) {
506
0
        return FLB_LIB_NO_CONFIG_MAP;
507
0
    }
508
509
0
    config_map = flb_config_map_create(ctx->config, p->config_map);
510
0
    if (!config_map) {
511
0
        return FLB_LIB_ERROR;
512
0
    }
513
514
0
    r = flb_config_map_property_check(p->name, config_map, key, val);
515
0
    flb_config_map_destroy(config_map);
516
0
    return r;
517
0
}
518
519
/* Set an output interface property */
520
int flb_output_set(flb_ctx_t *ctx, int ffd, ...)
521
123
{
522
123
    int ret;
523
123
    char *key;
524
123
    char *value;
525
123
    va_list va;
526
123
    struct flb_output_instance *o_ins;
527
528
123
    o_ins = out_instance_get(ctx, ffd);
529
123
    if (!o_ins) {
530
0
        return -1;
531
0
    }
532
533
123
    va_start(va, ffd);
534
248
    while ((key = va_arg(va, char *))) {
535
125
        value = va_arg(va, char *);
536
125
        if (!value) {
537
            /* Wrong parameter */
538
0
            va_end(va);
539
0
            return -1;
540
0
        }
541
542
125
        ret = flb_output_set_property(o_ins, key, value);
543
125
        if (ret != 0) {
544
0
            va_end(va);
545
0
            return -1;
546
0
        }
547
125
    }
548
549
123
    va_end(va);
550
123
    return 0;
551
123
}
552
553
int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc)
554
0
{
555
0
    struct flb_output_instance *o_ins;
556
557
0
    o_ins = out_instance_get(ctx, ffd);
558
0
    if (!o_ins) {
559
0
        return -1;
560
0
    }
561
562
0
    if (o_ins->processor) {
563
0
        flb_processor_destroy(o_ins->processor);
564
0
    }
565
566
0
    o_ins->processor = proc;
567
568
0
    return 0;
569
0
}
570
571
int flb_output_set_callback(flb_ctx_t *ctx, int ffd, char *name,
572
                            void (*cb)(char *, void *, void *))
573
0
{
574
0
    struct flb_output_instance *o_ins;
575
576
0
    o_ins = out_instance_get(ctx, ffd);
577
0
    if (!o_ins) {
578
0
        return -1;
579
0
    }
580
581
0
    return flb_callback_set(o_ins->callback, name, cb);
582
0
}
583
584
int flb_output_set_test(flb_ctx_t *ctx, int ffd, char *test_name,
585
                        void (*out_callback) (void *, int, int, void *, size_t, void *),
586
                        void *out_callback_data,
587
                        void *test_ctx)
588
0
{
589
0
    struct flb_output_instance *o_ins;
590
591
0
    o_ins = out_instance_get(ctx, ffd);
592
0
    if (!o_ins) {
593
0
        return -1;
594
0
    }
595
596
    /*
597
     * Enabling a test, set the output instance in 'test' mode, so no real
598
     * flush callback is invoked, only the desired implemented test.
599
     */
600
601
    /* Formatter test */
602
0
    if (strcmp(test_name, "formatter") == 0) {
603
0
        o_ins->test_mode = FLB_TRUE;
604
0
        o_ins->test_formatter.rt_ctx = ctx;
605
0
        o_ins->test_formatter.rt_ffd = ffd;
606
0
        o_ins->test_formatter.rt_out_callback = out_callback;
607
0
        o_ins->test_formatter.rt_data = out_callback_data;
608
0
        o_ins->test_formatter.flush_ctx = test_ctx;
609
0
    }
610
0
    else {
611
0
        return -1;
612
0
    }
613
614
0
    return 0;
615
0
}
616
617
/* Set an filter interface property */
618
int flb_filter_set(flb_ctx_t *ctx, int ffd, ...)
619
2
{
620
2
    int ret;
621
2
    char *key;
622
2
    char *value;
623
2
    va_list va;
624
2
    struct flb_filter_instance *f_ins;
625
626
2
    f_ins = filter_instance_get(ctx, ffd);
627
2
    if (!f_ins) {
628
0
        return -1;
629
0
    }
630
631
2
    va_start(va, ffd);
632
10
    while ((key = va_arg(va, char *))) {
633
8
        value = va_arg(va, char *);
634
8
        if (!value) {
635
            /* Wrong parameter */
636
0
            va_end(va);
637
0
            return -1;
638
0
        }
639
640
8
        ret = flb_filter_set_property(f_ins, key, value);
641
8
        if (ret != 0) {
642
0
            va_end(va);
643
0
            return -1;
644
0
        }
645
8
    }
646
647
2
    va_end(va);
648
2
    return 0;
649
2
}
650
651
/* Set a service property */
652
int flb_service_set(flb_ctx_t *ctx, ...)
653
109
{
654
109
    int ret;
655
109
    char *key;
656
109
    char *value;
657
109
    va_list va;
658
659
109
    va_start(va, ctx);
660
661
436
    while ((key = va_arg(va, char *))) {
662
327
        value = va_arg(va, char *);
663
327
        if (!value) {
664
            /* Wrong parameter */
665
0
            va_end(va);
666
0
            return -1;
667
0
        }
668
669
327
        ret = flb_config_set_property(ctx->config, key, value);
670
327
        if (ret != 0) {
671
0
            va_end(va);
672
0
            return -1;
673
0
        }
674
327
    }
675
676
109
    va_end(va);
677
109
    return 0;
678
109
}
679
680
/* Load a configuration file that may be used by the input or output plugin */
681
int flb_lib_config_file(struct flb_lib_ctx *ctx, const char *path)
682
0
{
683
0
    struct flb_cf *cf;
684
0
    int ret;
685
0
    char tmp[PATH_MAX + 1];
686
0
    char *cfg = NULL;
687
0
    char *end;
688
0
    char *real_path;
689
0
    struct stat st;
690
691
    /* Check if file exists and resolve path */
692
0
    ret = stat(path, &st);
693
0
    if (ret == -1 && errno == ENOENT) {
694
        /* Try to resolve the real path (if exists) */
695
0
        if (path[0] == '/') {
696
0
            fprintf(stderr, "Error: configuration file not found: %s\n", path);
697
0
            return -1;
698
0
        }
699
700
0
        if (ctx->config->conf_path) {
701
0
            snprintf(tmp, PATH_MAX, "%s%s", ctx->config->conf_path, path);
702
0
            cfg = tmp;
703
0
        }
704
0
        else {
705
0
            cfg = (char *) path;
706
0
        }
707
0
    }
708
0
    else {
709
0
        cfg = (char *) path;
710
0
    }
711
712
0
    if (access(cfg, R_OK) != 0) {
713
0
        perror("access");
714
0
        fprintf(stderr, "Error: cannot read configuration file: %s\n", cfg);
715
0
        return -1;
716
0
    }
717
718
    /* Use modern config format API that supports both .conf and .yaml/.yml */
719
0
    cf = flb_cf_create_from_file(NULL, cfg);
720
0
    if (!cf) {
721
0
        fprintf(stderr, "Error reading configuration file: %s\n", cfg);
722
0
        return -1;
723
0
    }
724
725
    /* Set configuration root path */
726
0
    if (cfg) {
727
0
        real_path = realpath(cfg, NULL);
728
0
        if (real_path) {
729
0
            end = strrchr(real_path, FLB_DIRCHAR);
730
0
            if (end) {
731
0
                end++;
732
0
                *end = '\0';
733
0
                if (ctx->config->conf_path) {
734
0
                    flb_free(ctx->config->conf_path);
735
0
                }
736
0
                ctx->config->conf_path = flb_strdup(real_path);
737
0
            }
738
0
            free(real_path);
739
0
        }
740
0
    }
741
742
    /* Load the configuration format into the config */
743
0
    ret = flb_config_load_config_format(ctx->config, cf);
744
0
    if (ret != 0) {
745
0
        flb_cf_destroy(cf);
746
0
        fprintf(stderr, "Error loading configuration from file: %s\n", cfg);
747
0
        return -1;
748
0
    }
749
750
    /* Destroy old cf_main if it exists (created by flb_config_init) */
751
0
    if (ctx->config->cf_main) {
752
0
        flb_cf_destroy(ctx->config->cf_main);
753
0
    }
754
755
    /* Store the config format object */
756
0
    ctx->config->cf_main = cf;
757
758
0
    return 0;
759
0
}
760
761
/* This is a wrapper to release a buffer which comes from out_lib_flush() */
762
int flb_lib_free(void* data)
763
0
{
764
0
    if (data == NULL) {
765
0
        return -1;
766
0
    }
767
0
    flb_free(data);
768
0
    return 0;
769
0
}
770
771
static int flb_input_run_formatter(flb_ctx_t *ctx, struct flb_input_instance *i_ins,
772
                                   const void *data, size_t len)
773
0
{
774
0
    int ret;
775
0
    void *out_buf = NULL;
776
0
    size_t out_size = 0;
777
0
    struct flb_test_in_formatter *itf;
778
779
0
    if (!i_ins) {
780
0
        return -1;
781
0
    }
782
783
0
    itf = &i_ins->test_formatter;
784
785
    /* Invoke the input plugin formatter test callback */
786
0
    ret = itf->callback(ctx->config,
787
0
                        i_ins,
788
0
                        i_ins->context,
789
0
                        data, len,
790
0
                        &out_buf, &out_size);
791
792
    /* Call the runtime test callback checker */
793
0
    if (itf->rt_in_callback) {
794
0
        itf->rt_in_callback(itf->rt_ctx,
795
0
                            itf->rt_ffd,
796
0
                            ret,
797
0
                            out_buf, out_size,
798
0
                            itf->rt_data);
799
0
    }
800
0
    else {
801
0
        flb_free(out_buf);
802
0
    }
803
804
0
    return 0;
805
0
}
806
807
static int flb_output_run_response(flb_ctx_t *ctx, struct flb_output_instance *o_ins,
808
                                   int status, const void *data, size_t len)
809
0
{
810
0
    int ret;
811
0
    void *out_buf = NULL;
812
0
    size_t out_size = 0;
813
0
    struct flb_test_out_response *resp;
814
815
0
    if (!o_ins) {
816
0
        return -1;
817
0
    }
818
819
0
    resp = &o_ins->test_response;
820
821
    /* Invoke the input plugin formatter test callback */
822
0
    ret = resp->callback(ctx->config,
823
0
                         o_ins->context,
824
0
                         status, data, len,
825
0
                         &out_buf, &out_size);
826
827
    /* Call the runtime test callback checker */
828
0
    if (resp->rt_out_response) {
829
0
        resp->rt_out_response(resp->rt_ctx,
830
0
                              resp->rt_ffd,
831
0
                              ret,
832
0
                              out_buf, out_size,
833
0
                              resp->rt_data);
834
0
    }
835
0
    else {
836
0
        flb_free(out_buf);
837
0
    }
838
839
0
    return 0;
840
0
}
841
842
/* Push some data into the Engine */
843
int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len)
844
22.5M
{
845
22.5M
    int ret;
846
22.5M
    struct flb_input_instance *i_ins;
847
848
22.5M
    if (ctx->status == FLB_LIB_NONE || ctx->status == FLB_LIB_ERROR) {
849
6
        flb_error("[lib] cannot push data, engine is not running");
850
6
        return -1;
851
6
    }
852
853
22.5M
    i_ins = in_instance_get(ctx, ffd);
854
22.5M
    if (!i_ins) {
855
0
        return -1;
856
0
    }
857
858
    /* If input's test_formatter is registered, priorize to run it. */
859
22.5M
    if (i_ins->test_formatter.callback != NULL) {
860
0
        ret = flb_input_run_formatter(ctx, i_ins, data, len);
861
0
    }
862
22.5M
    else {
863
22.5M
        ret = flb_pipe_w(i_ins->channel[1], data, len);
864
22.5M
        if (ret == -1) {
865
0
            flb_pipe_error();
866
0
            return -1;
867
0
        }
868
22.5M
    }
869
22.5M
    return ret;
870
22.5M
}
871
872
/* Emulate some data from the response */
873
int flb_lib_response(flb_ctx_t *ctx, int ffd, int status, const void *data, size_t len)
874
0
{
875
0
    int ret = -1;
876
0
    struct flb_output_instance *o_ins;
877
878
0
    if (ctx->status == FLB_LIB_NONE || ctx->status == FLB_LIB_ERROR) {
879
0
        flb_error("[lib] cannot push data, engine is not running");
880
0
        return -1;
881
0
    }
882
883
0
    o_ins = out_instance_get(ctx, ffd);
884
0
    if (!o_ins) {
885
0
        return -1;
886
0
    }
887
888
    /* If output's test_response callback is registered, prioritize to run it. */
889
0
    if (o_ins->test_response.callback != NULL) {
890
0
        ret = flb_output_run_response(ctx, o_ins, status, data, len);
891
0
    }
892
0
    return ret;
893
0
}
894
895
static void flb_lib_worker(void *data)
896
109
{
897
109
    int ret;
898
109
    flb_ctx_t *ctx = data;
899
109
    struct flb_config *config;
900
901
109
    config = ctx->config;
902
109
    flb_context_set(ctx);
903
109
    mk_utils_worker_rename("flb-pipeline");
904
109
    ret = flb_engine_start(config);
905
109
    if (ret == -1) {
906
2
        flb_engine_failed(config);
907
2
        flb_engine_shutdown(config);
908
2
    }
909
109
    config->exit_status_code = ret;
910
109
    ctx->status = FLB_LIB_NONE;
911
109
}
912
913
/* Return the current time to be used by lib callers */
914
double flb_time_now()
915
4
{
916
4
    struct flb_time t;
917
918
4
    flb_time_get(&t);
919
4
    return flb_time_to_double(&t);
920
4
}
921
922
int static do_start(flb_ctx_t *ctx)
923
109
{
924
109
    int fd;
925
109
    int bytes;
926
109
    int ret;
927
109
    uint64_t val;
928
109
    pthread_t tid;
929
109
    struct mk_event *event;
930
109
    struct flb_config *config;
931
932
109
    pthread_once(&flb_lib_once, flb_init_env);
933
934
109
    flb_debug("[lib] context set: %p", ctx);
935
936
    /* set context as the last active one */
937
938
    /* spawn worker thread */
939
109
    config = ctx->config;
940
109
    ret = mk_utils_worker_spawn(flb_lib_worker, ctx, &tid);
941
109
    if (ret == -1) {
942
0
        return -1;
943
0
    }
944
109
    config->worker = tid;
945
946
    /* Wait for the started signal so we can return to the caller */
947
109
    mk_event_wait(config->ch_evl);
948
109
    mk_event_foreach(event, config->ch_evl) {
949
109
        fd = event->fd;
950
109
        bytes = flb_pipe_r(fd, &val, sizeof(uint64_t));
951
109
        if (bytes <= 0) {
952
#if defined(FLB_SYSTEM_MACOS)
953
            pthread_cancel(tid);
954
#endif
955
0
            pthread_join(tid, NULL);
956
0
            ctx->status = FLB_LIB_ERROR;
957
0
            return -1;
958
0
        }
959
960
109
        if (val == FLB_ENGINE_STARTED) {
961
107
            flb_debug("[lib] backend started");
962
107
            ctx->status = FLB_LIB_OK;
963
107
            break;
964
107
        }
965
2
        else if (val == FLB_ENGINE_FAILED) {
966
2
            flb_debug("[lib] backend failed");
967
#if defined(FLB_SYSTEM_MACOS)
968
            pthread_cancel(tid);
969
#endif
970
2
            pthread_join(tid, NULL);
971
2
            ctx->status = FLB_LIB_ERROR;
972
2
            return -1;
973
2
        }
974
0
        else {
975
0
            flb_error("[lib] other error");
976
0
        }
977
109
    }
978
979
107
    return 0;
980
109
}
981
982
/* Start the engine */
983
int flb_start(flb_ctx_t *ctx)
984
109
{
985
109
    int ret;
986
987
109
    ret = do_start(ctx);
988
109
    if (ret == 0) {
989
        /* set context as the last active one */
990
107
        flb_context_set(ctx);
991
107
    }
992
993
109
    return ret;
994
109
}
995
996
/* Start the engine without setting the global context */
997
int flb_start_trace(flb_ctx_t *ctx)
998
0
{
999
0
    return do_start(ctx);
1000
0
}
1001
1002
int flb_loop(flb_ctx_t *ctx)
1003
0
{
1004
0
    while (ctx->status == FLB_LIB_OK) {
1005
0
        sleep(1);
1006
0
    }
1007
0
    return 0;
1008
0
}
1009
1010
/* Stop the engine */
1011
int flb_stop(flb_ctx_t *ctx)
1012
107
{
1013
107
    int ret;
1014
107
    pthread_t tid;
1015
1016
107
    flb_debug("[lib] ctx stop address: %p, config context=%p\n", ctx, ctx->config);
1017
1018
107
    tid = ctx->config->worker;
1019
1020
107
    if (ctx->status == FLB_LIB_NONE || ctx->status == FLB_LIB_ERROR) {
1021
        /*
1022
         * There is a chance the worker thread is still active while
1023
         * the service exited for some reason (plugin action). Always
1024
         * wait and double check that the child thread is not running.
1025
         */
1026
#if defined(FLB_SYSTEM_MACOS)
1027
        pthread_cancel(tid);
1028
#endif
1029
0
        pthread_join(tid, NULL);
1030
0
        return 0;
1031
0
    }
1032
1033
107
    if (!ctx->config) {
1034
0
        return 0;
1035
0
    }
1036
1037
107
    if (ctx->config->cf_main) {
1038
107
        flb_cf_destroy(ctx->config->cf_main);
1039
107
        ctx->config->cf_main = NULL;
1040
107
    }
1041
1042
107
    flb_debug("[lib] sending STOP signal to the engine");
1043
1044
107
    flb_engine_exit(ctx->config);
1045
#if defined(FLB_SYSTEM_MACOS)
1046
    pthread_cancel(tid);
1047
#endif
1048
107
    ret = pthread_join(tid, NULL);
1049
107
    if (ret != 0) {
1050
0
        flb_errno();
1051
0
    }
1052
107
    flb_debug("[lib] Fluent Bit engine stopped");
1053
1054
107
    return ret;
1055
107
}
1056
1057
1058
void flb_context_set(flb_ctx_t *ctx)
1059
216
{
1060
216
    FLB_TLS_SET(flb_lib_active_context, ctx);
1061
216
}
1062
1063
flb_ctx_t *flb_context_get()
1064
0
{
1065
0
    flb_ctx_t *ctx;
1066
1067
0
    ctx = FLB_TLS_GET(flb_lib_active_context);
1068
0
    return ctx;
1069
0
}
1070
1071
void flb_cf_context_set(struct flb_cf *cf)
1072
0
{
1073
0
    FLB_TLS_SET(flb_lib_active_cf_context, cf);
1074
0
}
1075
1076
struct flb_cf *flb_cf_context_get()
1077
0
{
1078
0
    struct flb_cf *cf;
1079
1080
0
    cf = FLB_TLS_GET(flb_lib_active_cf_context);
1081
0
    return cf;
1082
0
}