/src/fluent-bit/src/flb_lib.c
Line | Count | Source |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit Demo |
4 | | * =============== |
5 | | * Copyright (C) 2015-2026 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | |
21 | | #include <fluent-bit/flb_lib.h> |
22 | | #include <fluent-bit/flb_mem.h> |
23 | | #include <fluent-bit/flb_compat.h> |
24 | | #include <fluent-bit/flb_pipe.h> |
25 | | #include <fluent-bit/flb_engine.h> |
26 | | #include <fluent-bit/flb_input.h> |
27 | | #include <fluent-bit/flb_output.h> |
28 | | #include <fluent-bit/flb_filter.h> |
29 | | #include <fluent-bit/flb_utils.h> |
30 | | #include <fluent-bit/flb_time.h> |
31 | | #include <fluent-bit/flb_coro.h> |
32 | | #include <fluent-bit/flb_callback.h> |
33 | | #include <fluent-bit/flb_kv.h> |
34 | | #include <fluent-bit/flb_metrics.h> |
35 | | #include <fluent-bit/flb_upstream.h> |
36 | | #include <fluent-bit/flb_downstream.h> |
37 | | #include <fluent-bit/tls/flb_tls.h> |
38 | | #include <fluent-bit/config_format/flb_cf.h> |
39 | | |
40 | | #include <signal.h> |
41 | | #include <stdarg.h> |
42 | | #include <sys/stat.h> |
43 | | #include <errno.h> |
44 | | #include <stdlib.h> |
45 | | |
46 | | #ifdef FLB_HAVE_MTRACE |
47 | | #include <mcheck.h> |
48 | | #endif |
49 | | |
50 | | #ifdef FLB_HAVE_AWS_ERROR_REPORTER |
51 | | #include <fluent-bit/aws/flb_aws_error_reporter.h> |
52 | | |
53 | | struct flb_aws_error_reporter *error_reporter; |
54 | | #endif |
55 | | |
56 | | /* thread initializator */ |
57 | | static pthread_once_t flb_lib_once = PTHREAD_ONCE_INIT; |
58 | | |
59 | | /* reference to the last 'flb_lib_ctx' context started through flb_start() */ |
60 | | FLB_TLS_DEFINE(flb_ctx_t, flb_lib_active_context); |
61 | | |
62 | | /* reference to the last 'flb_cf' context started through flb_start() */ |
63 | | FLB_TLS_DEFINE(struct flb_cf, flb_lib_active_cf_context); |
64 | | |
65 | | #ifdef FLB_SYSTEM_WINDOWS |
66 | | static inline int flb_socket_init_win32(void) |
67 | | { |
68 | | WSADATA wsaData; |
69 | | int err; |
70 | | |
71 | | err = WSAStartup(MAKEWORD(2, 2), &wsaData); |
72 | | if (err != 0) { |
73 | | fprintf(stderr, "WSAStartup failed with error: %d\n", err); |
74 | | return err; |
75 | | } |
76 | | return 0; |
77 | | } |
78 | | #endif |
79 | | |
80 | | static inline struct flb_input_instance *in_instance_get(flb_ctx_t *ctx, |
81 | | int ffd) |
82 | 22.5M | { |
83 | 22.5M | struct mk_list *head; |
84 | 22.5M | struct flb_input_instance *i_ins; |
85 | | |
86 | 22.5M | mk_list_foreach(head, &ctx->config->inputs) { |
87 | 22.5M | i_ins = mk_list_entry(head, struct flb_input_instance, _head); |
88 | 22.5M | if (i_ins->id == ffd) { |
89 | 22.5M | return i_ins; |
90 | 22.5M | } |
91 | 22.5M | } |
92 | | |
93 | 0 | return NULL; |
94 | 22.5M | } |
95 | | |
96 | | static inline struct flb_output_instance *out_instance_get(flb_ctx_t *ctx, |
97 | | int ffd) |
98 | 123 | { |
99 | 123 | struct mk_list *head; |
100 | 123 | struct flb_output_instance *o_ins; |
101 | | |
102 | 123 | mk_list_foreach(head, &ctx->config->outputs) { |
103 | 123 | o_ins = mk_list_entry(head, struct flb_output_instance, _head); |
104 | 123 | if (o_ins->id == ffd) { |
105 | 123 | return o_ins; |
106 | 123 | } |
107 | 123 | } |
108 | | |
109 | 0 | return NULL; |
110 | 123 | } |
111 | | |
112 | | static inline struct flb_filter_instance *filter_instance_get(flb_ctx_t *ctx, |
113 | | int ffd) |
114 | 2 | { |
115 | 2 | struct mk_list *head; |
116 | 2 | struct flb_filter_instance *f_ins; |
117 | | |
118 | 2 | mk_list_foreach(head, &ctx->config->filters) { |
119 | 2 | f_ins = mk_list_entry(head, struct flb_filter_instance, _head); |
120 | 2 | if (f_ins->id == ffd) { |
121 | 2 | return f_ins; |
122 | 2 | } |
123 | 2 | } |
124 | | |
125 | 0 | return NULL; |
126 | 2 | } |
127 | | |
128 | | void flb_init_env() |
129 | 3 | { |
130 | 3 | flb_tls_init(); |
131 | 3 | flb_coro_init(); |
132 | 3 | flb_upstream_init(); |
133 | 3 | flb_downstream_init(); |
134 | 3 | flb_output_prepare(); |
135 | | |
136 | 3 | FLB_TLS_INIT(flb_lib_active_context); |
137 | 3 | FLB_TLS_INIT(flb_lib_active_cf_context); |
138 | | |
139 | | /* libraries */ |
140 | 3 | cmt_initialize(); |
141 | 3 | } |
142 | | |
143 | | flb_ctx_t *flb_create() |
144 | 109 | { |
145 | 109 | int ret; |
146 | 109 | flb_ctx_t *ctx; |
147 | 109 | struct flb_config *config; |
148 | | |
149 | | #ifdef FLB_HAVE_MTRACE |
150 | | /* Start tracing malloc and free */ |
151 | | mtrace(); |
152 | | #endif |
153 | | |
154 | | #ifdef FLB_SYSTEM_WINDOWS |
155 | | /* Ensure we initialized Windows Sockets */ |
156 | | if (flb_socket_init_win32()) { |
157 | | return NULL; |
158 | | } |
159 | | #endif |
160 | | |
161 | 109 | ctx = flb_calloc(1, sizeof(flb_ctx_t)); |
162 | 109 | if (!ctx) { |
163 | 0 | perror("malloc"); |
164 | 0 | return NULL; |
165 | 0 | } |
166 | | |
167 | 109 | config = flb_config_init(); |
168 | 109 | if (!config) { |
169 | 0 | flb_free(ctx); |
170 | 0 | return NULL; |
171 | 0 | } |
172 | 109 | ctx->config = config; |
173 | 109 | ctx->status = FLB_LIB_NONE; |
174 | | |
175 | | /* |
176 | | * Initialize our pipe to send data to our worker, used |
177 | | * by 'lib' input plugin. |
178 | | */ |
179 | 109 | ret = flb_pipe_create(config->ch_data); |
180 | 109 | if (ret == -1) { |
181 | 0 | perror("pipe"); |
182 | 0 | flb_config_exit(ctx->config); |
183 | 0 | flb_free(ctx); |
184 | 0 | return NULL; |
185 | 0 | } |
186 | | |
187 | | /* Create the event loop to receive notifications */ |
188 | 109 | ctx->event_loop = mk_event_loop_create(256); |
189 | 109 | if (!ctx->event_loop) { |
190 | 0 | flb_config_exit(ctx->config); |
191 | 0 | flb_free(ctx); |
192 | 0 | return NULL; |
193 | 0 | } |
194 | 109 | config->ch_evl = ctx->event_loop; |
195 | | |
196 | | /* Prepare the notification channels */ |
197 | 109 | ctx->event_channel = flb_calloc(1, sizeof(struct mk_event)); |
198 | 109 | if (!ctx->event_channel) { |
199 | 0 | perror("calloc"); |
200 | 0 | flb_config_exit(ctx->config); |
201 | 0 | flb_free(ctx); |
202 | 0 | return NULL; |
203 | 0 | } |
204 | | |
205 | 109 | MK_EVENT_ZERO(ctx->event_channel); |
206 | | |
207 | 109 | ret = mk_event_channel_create(config->ch_evl, |
208 | 109 | &config->ch_notif[0], |
209 | 109 | &config->ch_notif[1], |
210 | 109 | ctx->event_channel); |
211 | 109 | if (ret != 0) { |
212 | 0 | flb_error("[lib] could not create notification channels"); |
213 | 0 | flb_stop(ctx); |
214 | 0 | flb_destroy(ctx); |
215 | 0 | return NULL; |
216 | 0 | } |
217 | | |
218 | | #ifdef FLB_HAVE_AWS_ERROR_REPORTER |
219 | | if (is_error_reporting_enabled()) { |
220 | | error_reporter = flb_aws_error_reporter_create(); |
221 | | } |
222 | | #endif |
223 | | |
224 | 109 | return ctx; |
225 | 109 | } |
226 | | |
227 | | /* Release resources associated to the library context */ |
228 | | void flb_destroy(flb_ctx_t *ctx) |
229 | 107 | { |
230 | 107 | if (!ctx) { |
231 | 0 | return; |
232 | 0 | } |
233 | | |
234 | 107 | if (ctx->event_channel) { |
235 | 107 | mk_event_del(ctx->event_loop, ctx->event_channel); |
236 | 107 | flb_free(ctx->event_channel); |
237 | 107 | } |
238 | | |
239 | | /* Remove resources from the event loop */ |
240 | 107 | mk_event_loop_destroy(ctx->event_loop); |
241 | | |
242 | | /* cfg->is_running is set to false when flb_engine_shutdown has been invoked (event loop) */ |
243 | 107 | if (ctx->config) { |
244 | 107 | if (ctx->config->is_running == FLB_TRUE) { |
245 | 0 | flb_engine_shutdown(ctx->config); |
246 | 0 | } |
247 | 107 | flb_config_exit(ctx->config); |
248 | 107 | } |
249 | | |
250 | | #ifdef FLB_HAVE_AWS_ERROR_REPORTER |
251 | | if (is_error_reporting_enabled()) { |
252 | | flb_aws_error_reporter_destroy(error_reporter); |
253 | | } |
254 | | #endif |
255 | | |
256 | 107 | flb_free(ctx); |
257 | 107 | ctx = NULL; |
258 | | |
259 | | #ifdef FLB_HAVE_MTRACE |
260 | | /* Stop tracing malloc and free */ |
261 | | muntrace(); |
262 | | #endif |
263 | 107 | } |
264 | | |
265 | | /* Defines a new input instance */ |
266 | | int flb_input(flb_ctx_t *ctx, const char *input, void *data) |
267 | 242 | { |
268 | 242 | struct flb_input_instance *i_ins; |
269 | | |
270 | 242 | i_ins = flb_input_new(ctx->config, input, data, FLB_TRUE); |
271 | 242 | if (!i_ins) { |
272 | 123 | return -1; |
273 | 123 | } |
274 | | |
275 | 119 | return i_ins->id; |
276 | 242 | } |
277 | | |
278 | | /* Defines a new output instance */ |
279 | | int flb_output(flb_ctx_t *ctx, const char *output, struct flb_lib_out_cb *cb) |
280 | 290 | { |
281 | 290 | struct flb_output_instance *o_ins; |
282 | | |
283 | 290 | o_ins = flb_output_new(ctx->config, output, cb, FLB_TRUE); |
284 | 290 | if (!o_ins) { |
285 | 135 | return -1; |
286 | 135 | } |
287 | | |
288 | 155 | return o_ins->id; |
289 | 290 | } |
290 | | |
291 | | /* Defines a new filter instance */ |
292 | | int flb_filter(flb_ctx_t *ctx, const char *filter, void *data) |
293 | 132 | { |
294 | 132 | struct flb_filter_instance *f_ins; |
295 | | |
296 | 132 | f_ins = flb_filter_new(ctx->config, filter, data); |
297 | 132 | if (!f_ins) { |
298 | 121 | return -1; |
299 | 121 | } |
300 | | |
301 | 11 | return f_ins->id; |
302 | 132 | } |
303 | | |
304 | | /* Set an input interface property */ |
305 | | int flb_input_set(flb_ctx_t *ctx, int ffd, ...) |
306 | 119 | { |
307 | 119 | int ret; |
308 | 119 | char *key; |
309 | 119 | char *value; |
310 | 119 | va_list va; |
311 | 119 | struct flb_input_instance *i_ins; |
312 | | |
313 | 119 | i_ins = in_instance_get(ctx, ffd); |
314 | 119 | if (!i_ins) { |
315 | 0 | return -1; |
316 | 0 | } |
317 | | |
318 | 119 | va_start(va, ffd); |
319 | 226 | while ((key = va_arg(va, char *))) { |
320 | 119 | value = va_arg(va, char *); |
321 | 119 | if (!value) { |
322 | | /* Wrong parameter */ |
323 | 12 | va_end(va); |
324 | 12 | return -1; |
325 | 12 | } |
326 | 107 | ret = flb_input_set_property(i_ins, key, value); |
327 | 107 | if (ret != 0) { |
328 | 0 | va_end(va); |
329 | 0 | return -1; |
330 | 0 | } |
331 | 107 | } |
332 | | |
333 | 119 | va_end(va); |
334 | 107 | return 0; |
335 | 119 | } |
336 | | |
337 | | int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc) |
338 | 0 | { |
339 | 0 | struct flb_input_instance *i_ins; |
340 | |
|
341 | 0 | i_ins = in_instance_get(ctx, ffd); |
342 | 0 | if (!i_ins) { |
343 | 0 | return -1; |
344 | 0 | } |
345 | | |
346 | 0 | if (i_ins->processor) { |
347 | 0 | flb_processor_destroy(i_ins->processor); |
348 | 0 | } |
349 | |
|
350 | 0 | i_ins->processor = proc; |
351 | |
|
352 | 0 | return 0; |
353 | 0 | } |
354 | | |
355 | | int flb_input_set_test(flb_ctx_t *ctx, int ffd, char *test_name, |
356 | | void (*in_callback) (void *, int, int, void *, size_t, void *), |
357 | | void *in_callback_data) |
358 | 0 | { |
359 | 0 | struct flb_input_instance *i_ins; |
360 | |
|
361 | 0 | i_ins = in_instance_get(ctx, ffd); |
362 | 0 | if (!i_ins) { |
363 | 0 | return -1; |
364 | 0 | } |
365 | | |
366 | | /* |
367 | | * Enabling a test, set the output instance in 'test' mode, so no real |
368 | | * flush callback is invoked, only the desired implemented test. |
369 | | */ |
370 | | |
371 | | /* Formatter test */ |
372 | 0 | if (strcmp(test_name, "formatter") == 0) { |
373 | 0 | i_ins->test_mode = FLB_TRUE; |
374 | 0 | i_ins->test_formatter.rt_ctx = ctx; |
375 | 0 | i_ins->test_formatter.rt_ffd = ffd; |
376 | 0 | i_ins->test_formatter.rt_in_callback = in_callback; |
377 | 0 | i_ins->test_formatter.rt_data = in_callback_data; |
378 | 0 | } |
379 | 0 | else { |
380 | 0 | return -1; |
381 | 0 | } |
382 | | |
383 | 0 | return 0; |
384 | 0 | } |
385 | | |
386 | | int flb_output_set_http_test(flb_ctx_t *ctx, int ffd, char *test_name, |
387 | | void (*out_response) (void *, int, int, void *, size_t, void *), |
388 | | void *out_callback_data) |
389 | 0 | { |
390 | 0 | struct flb_output_instance *o_ins; |
391 | |
|
392 | 0 | o_ins = out_instance_get(ctx, ffd); |
393 | 0 | if (!o_ins) { |
394 | 0 | return -1; |
395 | 0 | } |
396 | | |
397 | | /* |
398 | | * Enabling a test, set the output instance in 'test' mode, so no real |
399 | | * flush callback is invoked, only the desired implemented test. |
400 | | */ |
401 | | |
402 | | /* Response test */ |
403 | 0 | if (strcmp(test_name, "response") == 0) { |
404 | 0 | o_ins->test_mode = FLB_TRUE; |
405 | 0 | o_ins->test_response.rt_ctx = ctx; |
406 | 0 | o_ins->test_response.rt_ffd = ffd; |
407 | 0 | o_ins->test_response.rt_out_response = out_response; |
408 | 0 | o_ins->test_response.rt_data = out_callback_data; |
409 | 0 | } |
410 | 0 | else { |
411 | 0 | return -1; |
412 | 0 | } |
413 | | |
414 | 0 | return 0; |
415 | 0 | } |
416 | | |
417 | | static inline int flb_config_map_property_check(char *plugin_name, struct mk_list *config_map, char *key, char *val) |
418 | 0 | { |
419 | 0 | struct flb_kv *kv; |
420 | 0 | struct mk_list properties; |
421 | 0 | int r; |
422 | |
|
423 | 0 | mk_list_init(&properties); |
424 | |
|
425 | 0 | kv = flb_kv_item_create(&properties, (char *) key, (char *) val); |
426 | 0 | if (!kv) { |
427 | 0 | return FLB_LIB_ERROR; |
428 | 0 | } |
429 | | |
430 | 0 | r = flb_config_map_properties_check(plugin_name, &properties, config_map); |
431 | 0 | flb_kv_item_destroy(kv); |
432 | 0 | return r; |
433 | 0 | } |
434 | | |
435 | | /* Check if a given k, v is a valid config directive for the given output plugin */ |
436 | | int flb_output_property_check(flb_ctx_t *ctx, int ffd, char *key, char *val) |
437 | 0 | { |
438 | 0 | struct flb_output_instance *o_ins; |
439 | 0 | struct mk_list *config_map; |
440 | 0 | struct flb_output_plugin *p; |
441 | 0 | int r; |
442 | |
|
443 | 0 | o_ins = out_instance_get(ctx, ffd); |
444 | 0 | if (!o_ins) { |
445 | 0 | return FLB_LIB_ERROR; |
446 | 0 | } |
447 | | |
448 | 0 | p = o_ins->p; |
449 | 0 | if (!p->config_map) { |
450 | 0 | return FLB_LIB_NO_CONFIG_MAP; |
451 | 0 | } |
452 | | |
453 | 0 | config_map = flb_config_map_create(ctx->config, p->config_map); |
454 | 0 | if (!config_map) { |
455 | 0 | return FLB_LIB_ERROR; |
456 | 0 | } |
457 | | |
458 | 0 | r = flb_config_map_property_check(p->name, config_map, key, val); |
459 | 0 | flb_config_map_destroy(config_map); |
460 | 0 | return r; |
461 | 0 | } |
462 | | |
463 | | /* Check if a given k, v is a valid config directive for the given input plugin */ |
464 | | int flb_input_property_check(flb_ctx_t *ctx, int ffd, char *key, char *val) |
465 | 0 | { |
466 | 0 | struct flb_input_instance *i_ins; |
467 | 0 | struct flb_input_plugin *p; |
468 | 0 | struct mk_list *config_map; |
469 | 0 | int r; |
470 | |
|
471 | 0 | i_ins = in_instance_get(ctx, ffd); |
472 | 0 | if (!i_ins) { |
473 | 0 | return FLB_LIB_ERROR; |
474 | 0 | } |
475 | | |
476 | 0 | p = i_ins->p; |
477 | 0 | if (!p->config_map) { |
478 | 0 | return FLB_LIB_NO_CONFIG_MAP; |
479 | 0 | } |
480 | | |
481 | 0 | config_map = flb_config_map_create(ctx->config, p->config_map); |
482 | 0 | if (!config_map) { |
483 | 0 | return FLB_LIB_ERROR; |
484 | 0 | } |
485 | | |
486 | 0 | r = flb_config_map_property_check(p->name, config_map, key, val); |
487 | 0 | flb_config_map_destroy(config_map); |
488 | 0 | return r; |
489 | 0 | } |
490 | | |
491 | | /* Check if a given k, v is a valid config directive for the given filter plugin */ |
492 | | int flb_filter_property_check(flb_ctx_t *ctx, int ffd, char *key, char *val) |
493 | 0 | { |
494 | 0 | struct flb_filter_instance *f_ins; |
495 | 0 | struct flb_filter_plugin *p; |
496 | 0 | struct mk_list *config_map; |
497 | 0 | int r; |
498 | |
|
499 | 0 | f_ins = filter_instance_get(ctx, ffd); |
500 | 0 | if (!f_ins) { |
501 | 0 | return FLB_LIB_ERROR; |
502 | 0 | } |
503 | | |
504 | 0 | p = f_ins->p; |
505 | 0 | if (!p->config_map) { |
506 | 0 | return FLB_LIB_NO_CONFIG_MAP; |
507 | 0 | } |
508 | | |
509 | 0 | config_map = flb_config_map_create(ctx->config, p->config_map); |
510 | 0 | if (!config_map) { |
511 | 0 | return FLB_LIB_ERROR; |
512 | 0 | } |
513 | | |
514 | 0 | r = flb_config_map_property_check(p->name, config_map, key, val); |
515 | 0 | flb_config_map_destroy(config_map); |
516 | 0 | return r; |
517 | 0 | } |
518 | | |
519 | | /* Set an output interface property */ |
520 | | int flb_output_set(flb_ctx_t *ctx, int ffd, ...) |
521 | 123 | { |
522 | 123 | int ret; |
523 | 123 | char *key; |
524 | 123 | char *value; |
525 | 123 | va_list va; |
526 | 123 | struct flb_output_instance *o_ins; |
527 | | |
528 | 123 | o_ins = out_instance_get(ctx, ffd); |
529 | 123 | if (!o_ins) { |
530 | 0 | return -1; |
531 | 0 | } |
532 | | |
533 | 123 | va_start(va, ffd); |
534 | 248 | while ((key = va_arg(va, char *))) { |
535 | 125 | value = va_arg(va, char *); |
536 | 125 | if (!value) { |
537 | | /* Wrong parameter */ |
538 | 0 | va_end(va); |
539 | 0 | return -1; |
540 | 0 | } |
541 | | |
542 | 125 | ret = flb_output_set_property(o_ins, key, value); |
543 | 125 | if (ret != 0) { |
544 | 0 | va_end(va); |
545 | 0 | return -1; |
546 | 0 | } |
547 | 125 | } |
548 | | |
549 | 123 | va_end(va); |
550 | 123 | return 0; |
551 | 123 | } |
552 | | |
553 | | int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc) |
554 | 0 | { |
555 | 0 | struct flb_output_instance *o_ins; |
556 | |
|
557 | 0 | o_ins = out_instance_get(ctx, ffd); |
558 | 0 | if (!o_ins) { |
559 | 0 | return -1; |
560 | 0 | } |
561 | | |
562 | 0 | if (o_ins->processor) { |
563 | 0 | flb_processor_destroy(o_ins->processor); |
564 | 0 | } |
565 | |
|
566 | 0 | o_ins->processor = proc; |
567 | |
|
568 | 0 | return 0; |
569 | 0 | } |
570 | | |
571 | | int flb_output_set_callback(flb_ctx_t *ctx, int ffd, char *name, |
572 | | void (*cb)(char *, void *, void *)) |
573 | 0 | { |
574 | 0 | struct flb_output_instance *o_ins; |
575 | |
|
576 | 0 | o_ins = out_instance_get(ctx, ffd); |
577 | 0 | if (!o_ins) { |
578 | 0 | return -1; |
579 | 0 | } |
580 | | |
581 | 0 | return flb_callback_set(o_ins->callback, name, cb); |
582 | 0 | } |
583 | | |
584 | | int flb_output_set_test(flb_ctx_t *ctx, int ffd, char *test_name, |
585 | | void (*out_callback) (void *, int, int, void *, size_t, void *), |
586 | | void *out_callback_data, |
587 | | void *test_ctx) |
588 | 0 | { |
589 | 0 | struct flb_output_instance *o_ins; |
590 | |
|
591 | 0 | o_ins = out_instance_get(ctx, ffd); |
592 | 0 | if (!o_ins) { |
593 | 0 | return -1; |
594 | 0 | } |
595 | | |
596 | | /* |
597 | | * Enabling a test, set the output instance in 'test' mode, so no real |
598 | | * flush callback is invoked, only the desired implemented test. |
599 | | */ |
600 | | |
601 | | /* Formatter test */ |
602 | 0 | if (strcmp(test_name, "formatter") == 0) { |
603 | 0 | o_ins->test_mode = FLB_TRUE; |
604 | 0 | o_ins->test_formatter.rt_ctx = ctx; |
605 | 0 | o_ins->test_formatter.rt_ffd = ffd; |
606 | 0 | o_ins->test_formatter.rt_out_callback = out_callback; |
607 | 0 | o_ins->test_formatter.rt_data = out_callback_data; |
608 | 0 | o_ins->test_formatter.flush_ctx = test_ctx; |
609 | 0 | } |
610 | 0 | else { |
611 | 0 | return -1; |
612 | 0 | } |
613 | | |
614 | 0 | return 0; |
615 | 0 | } |
616 | | |
617 | | /* Set an filter interface property */ |
618 | | int flb_filter_set(flb_ctx_t *ctx, int ffd, ...) |
619 | 2 | { |
620 | 2 | int ret; |
621 | 2 | char *key; |
622 | 2 | char *value; |
623 | 2 | va_list va; |
624 | 2 | struct flb_filter_instance *f_ins; |
625 | | |
626 | 2 | f_ins = filter_instance_get(ctx, ffd); |
627 | 2 | if (!f_ins) { |
628 | 0 | return -1; |
629 | 0 | } |
630 | | |
631 | 2 | va_start(va, ffd); |
632 | 10 | while ((key = va_arg(va, char *))) { |
633 | 8 | value = va_arg(va, char *); |
634 | 8 | if (!value) { |
635 | | /* Wrong parameter */ |
636 | 0 | va_end(va); |
637 | 0 | return -1; |
638 | 0 | } |
639 | | |
640 | 8 | ret = flb_filter_set_property(f_ins, key, value); |
641 | 8 | if (ret != 0) { |
642 | 0 | va_end(va); |
643 | 0 | return -1; |
644 | 0 | } |
645 | 8 | } |
646 | | |
647 | 2 | va_end(va); |
648 | 2 | return 0; |
649 | 2 | } |
650 | | |
651 | | /* Set a service property */ |
652 | | int flb_service_set(flb_ctx_t *ctx, ...) |
653 | 109 | { |
654 | 109 | int ret; |
655 | 109 | char *key; |
656 | 109 | char *value; |
657 | 109 | va_list va; |
658 | | |
659 | 109 | va_start(va, ctx); |
660 | | |
661 | 436 | while ((key = va_arg(va, char *))) { |
662 | 327 | value = va_arg(va, char *); |
663 | 327 | if (!value) { |
664 | | /* Wrong parameter */ |
665 | 0 | va_end(va); |
666 | 0 | return -1; |
667 | 0 | } |
668 | | |
669 | 327 | ret = flb_config_set_property(ctx->config, key, value); |
670 | 327 | if (ret != 0) { |
671 | 0 | va_end(va); |
672 | 0 | return -1; |
673 | 0 | } |
674 | 327 | } |
675 | | |
676 | 109 | va_end(va); |
677 | 109 | return 0; |
678 | 109 | } |
679 | | |
680 | | /* Load a configuration file that may be used by the input or output plugin */ |
681 | | int flb_lib_config_file(struct flb_lib_ctx *ctx, const char *path) |
682 | 0 | { |
683 | 0 | struct flb_cf *cf; |
684 | 0 | int ret; |
685 | 0 | char tmp[PATH_MAX + 1]; |
686 | 0 | char *cfg = NULL; |
687 | 0 | char *end; |
688 | 0 | char *real_path; |
689 | 0 | struct stat st; |
690 | | |
691 | | /* Check if file exists and resolve path */ |
692 | 0 | ret = stat(path, &st); |
693 | 0 | if (ret == -1 && errno == ENOENT) { |
694 | | /* Try to resolve the real path (if exists) */ |
695 | 0 | if (path[0] == '/') { |
696 | 0 | fprintf(stderr, "Error: configuration file not found: %s\n", path); |
697 | 0 | return -1; |
698 | 0 | } |
699 | | |
700 | 0 | if (ctx->config->conf_path) { |
701 | 0 | snprintf(tmp, PATH_MAX, "%s%s", ctx->config->conf_path, path); |
702 | 0 | cfg = tmp; |
703 | 0 | } |
704 | 0 | else { |
705 | 0 | cfg = (char *) path; |
706 | 0 | } |
707 | 0 | } |
708 | 0 | else { |
709 | 0 | cfg = (char *) path; |
710 | 0 | } |
711 | | |
712 | 0 | if (access(cfg, R_OK) != 0) { |
713 | 0 | perror("access"); |
714 | 0 | fprintf(stderr, "Error: cannot read configuration file: %s\n", cfg); |
715 | 0 | return -1; |
716 | 0 | } |
717 | | |
718 | | /* Use modern config format API that supports both .conf and .yaml/.yml */ |
719 | 0 | cf = flb_cf_create_from_file(NULL, cfg); |
720 | 0 | if (!cf) { |
721 | 0 | fprintf(stderr, "Error reading configuration file: %s\n", cfg); |
722 | 0 | return -1; |
723 | 0 | } |
724 | | |
725 | | /* Set configuration root path */ |
726 | 0 | if (cfg) { |
727 | 0 | real_path = realpath(cfg, NULL); |
728 | 0 | if (real_path) { |
729 | 0 | end = strrchr(real_path, FLB_DIRCHAR); |
730 | 0 | if (end) { |
731 | 0 | end++; |
732 | 0 | *end = '\0'; |
733 | 0 | if (ctx->config->conf_path) { |
734 | 0 | flb_free(ctx->config->conf_path); |
735 | 0 | } |
736 | 0 | ctx->config->conf_path = flb_strdup(real_path); |
737 | 0 | } |
738 | 0 | free(real_path); |
739 | 0 | } |
740 | 0 | } |
741 | | |
742 | | /* Load the configuration format into the config */ |
743 | 0 | ret = flb_config_load_config_format(ctx->config, cf); |
744 | 0 | if (ret != 0) { |
745 | 0 | flb_cf_destroy(cf); |
746 | 0 | fprintf(stderr, "Error loading configuration from file: %s\n", cfg); |
747 | 0 | return -1; |
748 | 0 | } |
749 | | |
750 | | /* Destroy old cf_main if it exists (created by flb_config_init) */ |
751 | 0 | if (ctx->config->cf_main) { |
752 | 0 | flb_cf_destroy(ctx->config->cf_main); |
753 | 0 | } |
754 | | |
755 | | /* Store the config format object */ |
756 | 0 | ctx->config->cf_main = cf; |
757 | |
|
758 | 0 | return 0; |
759 | 0 | } |
760 | | |
761 | | /* This is a wrapper to release a buffer which comes from out_lib_flush() */ |
762 | | int flb_lib_free(void* data) |
763 | 0 | { |
764 | 0 | if (data == NULL) { |
765 | 0 | return -1; |
766 | 0 | } |
767 | 0 | flb_free(data); |
768 | 0 | return 0; |
769 | 0 | } |
770 | | |
771 | | static int flb_input_run_formatter(flb_ctx_t *ctx, struct flb_input_instance *i_ins, |
772 | | const void *data, size_t len) |
773 | 0 | { |
774 | 0 | int ret; |
775 | 0 | void *out_buf = NULL; |
776 | 0 | size_t out_size = 0; |
777 | 0 | struct flb_test_in_formatter *itf; |
778 | |
|
779 | 0 | if (!i_ins) { |
780 | 0 | return -1; |
781 | 0 | } |
782 | | |
783 | 0 | itf = &i_ins->test_formatter; |
784 | | |
785 | | /* Invoke the input plugin formatter test callback */ |
786 | 0 | ret = itf->callback(ctx->config, |
787 | 0 | i_ins, |
788 | 0 | i_ins->context, |
789 | 0 | data, len, |
790 | 0 | &out_buf, &out_size); |
791 | | |
792 | | /* Call the runtime test callback checker */ |
793 | 0 | if (itf->rt_in_callback) { |
794 | 0 | itf->rt_in_callback(itf->rt_ctx, |
795 | 0 | itf->rt_ffd, |
796 | 0 | ret, |
797 | 0 | out_buf, out_size, |
798 | 0 | itf->rt_data); |
799 | 0 | } |
800 | 0 | else { |
801 | 0 | flb_free(out_buf); |
802 | 0 | } |
803 | |
|
804 | 0 | return 0; |
805 | 0 | } |
806 | | |
807 | | static int flb_output_run_response(flb_ctx_t *ctx, struct flb_output_instance *o_ins, |
808 | | int status, const void *data, size_t len) |
809 | 0 | { |
810 | 0 | int ret; |
811 | 0 | void *out_buf = NULL; |
812 | 0 | size_t out_size = 0; |
813 | 0 | struct flb_test_out_response *resp; |
814 | |
|
815 | 0 | if (!o_ins) { |
816 | 0 | return -1; |
817 | 0 | } |
818 | | |
819 | 0 | resp = &o_ins->test_response; |
820 | | |
821 | | /* Invoke the input plugin formatter test callback */ |
822 | 0 | ret = resp->callback(ctx->config, |
823 | 0 | o_ins->context, |
824 | 0 | status, data, len, |
825 | 0 | &out_buf, &out_size); |
826 | | |
827 | | /* Call the runtime test callback checker */ |
828 | 0 | if (resp->rt_out_response) { |
829 | 0 | resp->rt_out_response(resp->rt_ctx, |
830 | 0 | resp->rt_ffd, |
831 | 0 | ret, |
832 | 0 | out_buf, out_size, |
833 | 0 | resp->rt_data); |
834 | 0 | } |
835 | 0 | else { |
836 | 0 | flb_free(out_buf); |
837 | 0 | } |
838 | |
|
839 | 0 | return 0; |
840 | 0 | } |
841 | | |
842 | | /* Push some data into the Engine */ |
843 | | int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len) |
844 | 22.5M | { |
845 | 22.5M | int ret; |
846 | 22.5M | struct flb_input_instance *i_ins; |
847 | | |
848 | 22.5M | if (ctx->status == FLB_LIB_NONE || ctx->status == FLB_LIB_ERROR) { |
849 | 6 | flb_error("[lib] cannot push data, engine is not running"); |
850 | 6 | return -1; |
851 | 6 | } |
852 | | |
853 | 22.5M | i_ins = in_instance_get(ctx, ffd); |
854 | 22.5M | if (!i_ins) { |
855 | 0 | return -1; |
856 | 0 | } |
857 | | |
858 | | /* If input's test_formatter is registered, priorize to run it. */ |
859 | 22.5M | if (i_ins->test_formatter.callback != NULL) { |
860 | 0 | ret = flb_input_run_formatter(ctx, i_ins, data, len); |
861 | 0 | } |
862 | 22.5M | else { |
863 | 22.5M | ret = flb_pipe_w(i_ins->channel[1], data, len); |
864 | 22.5M | if (ret == -1) { |
865 | 0 | flb_pipe_error(); |
866 | 0 | return -1; |
867 | 0 | } |
868 | 22.5M | } |
869 | 22.5M | return ret; |
870 | 22.5M | } |
871 | | |
872 | | /* Emulate some data from the response */ |
873 | | int flb_lib_response(flb_ctx_t *ctx, int ffd, int status, const void *data, size_t len) |
874 | 0 | { |
875 | 0 | int ret = -1; |
876 | 0 | struct flb_output_instance *o_ins; |
877 | |
|
878 | 0 | if (ctx->status == FLB_LIB_NONE || ctx->status == FLB_LIB_ERROR) { |
879 | 0 | flb_error("[lib] cannot push data, engine is not running"); |
880 | 0 | return -1; |
881 | 0 | } |
882 | | |
883 | 0 | o_ins = out_instance_get(ctx, ffd); |
884 | 0 | if (!o_ins) { |
885 | 0 | return -1; |
886 | 0 | } |
887 | | |
888 | | /* If output's test_response callback is registered, prioritize to run it. */ |
889 | 0 | if (o_ins->test_response.callback != NULL) { |
890 | 0 | ret = flb_output_run_response(ctx, o_ins, status, data, len); |
891 | 0 | } |
892 | 0 | return ret; |
893 | 0 | } |
894 | | |
895 | | static void flb_lib_worker(void *data) |
896 | 109 | { |
897 | 109 | int ret; |
898 | 109 | flb_ctx_t *ctx = data; |
899 | 109 | struct flb_config *config; |
900 | | |
901 | 109 | config = ctx->config; |
902 | 109 | flb_context_set(ctx); |
903 | 109 | mk_utils_worker_rename("flb-pipeline"); |
904 | 109 | ret = flb_engine_start(config); |
905 | 109 | if (ret == -1) { |
906 | 2 | flb_engine_failed(config); |
907 | 2 | flb_engine_shutdown(config); |
908 | 2 | } |
909 | 109 | config->exit_status_code = ret; |
910 | 109 | ctx->status = FLB_LIB_NONE; |
911 | 109 | } |
912 | | |
913 | | /* Return the current time to be used by lib callers */ |
914 | | double flb_time_now() |
915 | 4 | { |
916 | 4 | struct flb_time t; |
917 | | |
918 | 4 | flb_time_get(&t); |
919 | 4 | return flb_time_to_double(&t); |
920 | 4 | } |
921 | | |
922 | | int static do_start(flb_ctx_t *ctx) |
923 | 109 | { |
924 | 109 | int fd; |
925 | 109 | int bytes; |
926 | 109 | int ret; |
927 | 109 | uint64_t val; |
928 | 109 | pthread_t tid; |
929 | 109 | struct mk_event *event; |
930 | 109 | struct flb_config *config; |
931 | | |
932 | 109 | pthread_once(&flb_lib_once, flb_init_env); |
933 | | |
934 | 109 | flb_debug("[lib] context set: %p", ctx); |
935 | | |
936 | | /* set context as the last active one */ |
937 | | |
938 | | /* spawn worker thread */ |
939 | 109 | config = ctx->config; |
940 | 109 | ret = mk_utils_worker_spawn(flb_lib_worker, ctx, &tid); |
941 | 109 | if (ret == -1) { |
942 | 0 | return -1; |
943 | 0 | } |
944 | 109 | config->worker = tid; |
945 | | |
946 | | /* Wait for the started signal so we can return to the caller */ |
947 | 109 | mk_event_wait(config->ch_evl); |
948 | 109 | mk_event_foreach(event, config->ch_evl) { |
949 | 109 | fd = event->fd; |
950 | 109 | bytes = flb_pipe_r(fd, &val, sizeof(uint64_t)); |
951 | 109 | if (bytes <= 0) { |
952 | | #if defined(FLB_SYSTEM_MACOS) |
953 | | pthread_cancel(tid); |
954 | | #endif |
955 | 0 | pthread_join(tid, NULL); |
956 | 0 | ctx->status = FLB_LIB_ERROR; |
957 | 0 | return -1; |
958 | 0 | } |
959 | | |
960 | 109 | if (val == FLB_ENGINE_STARTED) { |
961 | 107 | flb_debug("[lib] backend started"); |
962 | 107 | ctx->status = FLB_LIB_OK; |
963 | 107 | break; |
964 | 107 | } |
965 | 2 | else if (val == FLB_ENGINE_FAILED) { |
966 | 2 | flb_debug("[lib] backend failed"); |
967 | | #if defined(FLB_SYSTEM_MACOS) |
968 | | pthread_cancel(tid); |
969 | | #endif |
970 | 2 | pthread_join(tid, NULL); |
971 | 2 | ctx->status = FLB_LIB_ERROR; |
972 | 2 | return -1; |
973 | 2 | } |
974 | 0 | else { |
975 | 0 | flb_error("[lib] other error"); |
976 | 0 | } |
977 | 109 | } |
978 | | |
979 | 107 | return 0; |
980 | 109 | } |
981 | | |
982 | | /* Start the engine */ |
983 | | int flb_start(flb_ctx_t *ctx) |
984 | 109 | { |
985 | 109 | int ret; |
986 | | |
987 | 109 | ret = do_start(ctx); |
988 | 109 | if (ret == 0) { |
989 | | /* set context as the last active one */ |
990 | 107 | flb_context_set(ctx); |
991 | 107 | } |
992 | | |
993 | 109 | return ret; |
994 | 109 | } |
995 | | |
996 | | /* Start the engine without setting the global context */ |
997 | | int flb_start_trace(flb_ctx_t *ctx) |
998 | 0 | { |
999 | 0 | return do_start(ctx); |
1000 | 0 | } |
1001 | | |
1002 | | int flb_loop(flb_ctx_t *ctx) |
1003 | 0 | { |
1004 | 0 | while (ctx->status == FLB_LIB_OK) { |
1005 | 0 | sleep(1); |
1006 | 0 | } |
1007 | 0 | return 0; |
1008 | 0 | } |
1009 | | |
1010 | | /* Stop the engine */ |
1011 | | int flb_stop(flb_ctx_t *ctx) |
1012 | 107 | { |
1013 | 107 | int ret; |
1014 | 107 | pthread_t tid; |
1015 | | |
1016 | 107 | flb_debug("[lib] ctx stop address: %p, config context=%p\n", ctx, ctx->config); |
1017 | | |
1018 | 107 | tid = ctx->config->worker; |
1019 | | |
1020 | 107 | if (ctx->status == FLB_LIB_NONE || ctx->status == FLB_LIB_ERROR) { |
1021 | | /* |
1022 | | * There is a chance the worker thread is still active while |
1023 | | * the service exited for some reason (plugin action). Always |
1024 | | * wait and double check that the child thread is not running. |
1025 | | */ |
1026 | | #if defined(FLB_SYSTEM_MACOS) |
1027 | | pthread_cancel(tid); |
1028 | | #endif |
1029 | 0 | pthread_join(tid, NULL); |
1030 | 0 | return 0; |
1031 | 0 | } |
1032 | | |
1033 | 107 | if (!ctx->config) { |
1034 | 0 | return 0; |
1035 | 0 | } |
1036 | | |
1037 | 107 | if (ctx->config->cf_main) { |
1038 | 107 | flb_cf_destroy(ctx->config->cf_main); |
1039 | 107 | ctx->config->cf_main = NULL; |
1040 | 107 | } |
1041 | | |
1042 | 107 | flb_debug("[lib] sending STOP signal to the engine"); |
1043 | | |
1044 | 107 | flb_engine_exit(ctx->config); |
1045 | | #if defined(FLB_SYSTEM_MACOS) |
1046 | | pthread_cancel(tid); |
1047 | | #endif |
1048 | 107 | ret = pthread_join(tid, NULL); |
1049 | 107 | if (ret != 0) { |
1050 | 0 | flb_errno(); |
1051 | 0 | } |
1052 | 107 | flb_debug("[lib] Fluent Bit engine stopped"); |
1053 | | |
1054 | 107 | return ret; |
1055 | 107 | } |
1056 | | |
1057 | | |
1058 | | void flb_context_set(flb_ctx_t *ctx) |
1059 | 216 | { |
1060 | 216 | FLB_TLS_SET(flb_lib_active_context, ctx); |
1061 | 216 | } |
1062 | | |
1063 | | flb_ctx_t *flb_context_get() |
1064 | 0 | { |
1065 | 0 | flb_ctx_t *ctx; |
1066 | |
|
1067 | 0 | ctx = FLB_TLS_GET(flb_lib_active_context); |
1068 | 0 | return ctx; |
1069 | 0 | } |
1070 | | |
1071 | | void flb_cf_context_set(struct flb_cf *cf) |
1072 | 0 | { |
1073 | 0 | FLB_TLS_SET(flb_lib_active_cf_context, cf); |
1074 | 0 | } |
1075 | | |
1076 | | struct flb_cf *flb_cf_context_get() |
1077 | 0 | { |
1078 | 0 | struct flb_cf *cf; |
1079 | |
|
1080 | 0 | cf = FLB_TLS_GET(flb_lib_active_cf_context); |
1081 | 0 | return cf; |
1082 | 0 | } |