/src/fluent-bit/plugins/in_emitter/emitter.c
Line | Count | Source |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2024 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <fluent-bit/flb_compat.h> |
21 | | #include <fluent-bit/flb_info.h> |
22 | | #include <fluent-bit/flb_input.h> |
23 | | #include <fluent-bit/flb_input_plugin.h> |
24 | | #include <fluent-bit/flb_utils.h> |
25 | | #include <fluent-bit/flb_sds.h> |
26 | | #include <fluent-bit/flb_scheduler.h> |
27 | | #include <fluent-bit/flb_ring_buffer.h> |
28 | | |
29 | | #include <sys/types.h> |
30 | | #include <sys/stat.h> |
31 | | |
32 | 0 | #define DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY 2000 |
33 | | |
34 | | /* return values */ |
35 | 0 | #define FLB_EMITTER_BUSY -2 |
36 | | |
/*
 * A batch of raw msgpack data waiting to be appended to the emitter input
 * under a single tag.
 */
struct em_chunk {
    flb_sds_t tag;                  /* tag passed to flb_input_log_append() */
    struct msgpack_sbuffer mp_sbuf; /* msgpack sbuffer */
    struct msgpack_packer mp_pck;   /* msgpack packer */
    struct mk_list _head;           /* link into flb_emitter.chunks */
};
43 | | |
/*
 * List node that remembers one input instance which has handed records to
 * the emitter, so it can be paused and resumed together with the emitter.
 */
struct input_ref {
    struct flb_input_instance *i_ins; /* the sending input instance */
    struct mk_list _head;             /* link into flb_emitter.i_ins_list */
};
48 | | |
/* Plugin context of one emitter input instance. */
struct flb_emitter {
    int coll_fd;                        /* collector id */
    struct mk_list chunks;              /* list of all pending chunks */
    struct flb_input_instance *ins;     /* input instance */
    struct flb_ring_buffer *msgs;       /* ring buffer for cross-thread messages */
    int ring_buffer_size;               /* size of the ring buffer */
    struct mk_list i_ins_list;          /* instance list of linked/sending inputs */
};
57 | | |
58 | | struct em_chunk *em_chunk_create(const char *tag, int tag_len, |
59 | | struct flb_emitter *ctx) |
60 | 0 | { |
61 | 0 | struct em_chunk *ec; |
62 | |
|
63 | 0 | ec = flb_calloc(1, sizeof(struct em_chunk)); |
64 | 0 | if (!ec) { |
65 | 0 | flb_errno(); |
66 | 0 | return NULL; |
67 | 0 | } |
68 | | |
69 | 0 | ec->tag = flb_sds_create_len(tag, tag_len); |
70 | 0 | if (!ec->tag) { |
71 | 0 | flb_errno(); |
72 | 0 | flb_free(ec); |
73 | 0 | return NULL; |
74 | 0 | } |
75 | | |
76 | 0 | msgpack_sbuffer_init(&ec->mp_sbuf); |
77 | 0 | msgpack_packer_init(&ec->mp_pck, &ec->mp_sbuf, msgpack_sbuffer_write); |
78 | |
|
79 | 0 | mk_list_add(&ec->_head, &ctx->chunks); |
80 | |
|
81 | 0 | return ec; |
82 | 0 | } |
83 | | |
84 | | static void em_chunk_destroy(struct em_chunk *ec) |
85 | 0 | { |
86 | 0 | mk_list_del(&ec->_head); |
87 | 0 | flb_sds_destroy(ec->tag); |
88 | 0 | msgpack_sbuffer_destroy(&ec->mp_sbuf); |
89 | 0 | flb_free(ec); |
90 | 0 | } |
91 | | |
92 | | int static do_in_emitter_add_record(struct em_chunk *ec, |
93 | | struct flb_input_instance *in) |
94 | 0 | { |
95 | 0 | struct flb_emitter *ctx = (struct flb_emitter *) in->context; |
96 | 0 | int ret; |
97 | |
|
98 | 0 | if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { |
99 | 0 | flb_plg_debug(ctx->ins, "_emitter %s paused. Not processing records.", |
100 | 0 | ctx->ins->name); |
101 | 0 | return FLB_EMITTER_BUSY; |
102 | 0 | } |
103 | | |
104 | | /* Associate this backlog chunk to this instance into the engine */ |
105 | 0 | ret = flb_input_log_append(in, |
106 | 0 | ec->tag, flb_sds_len(ec->tag), |
107 | 0 | ec->mp_sbuf.data, |
108 | 0 | ec->mp_sbuf.size); |
109 | 0 | if (ret == -1) { |
110 | 0 | flb_plg_error(ctx->ins, "error registering chunk with tag: %s", ec->tag); |
111 | | /* Release the echunk */ |
112 | 0 | em_chunk_destroy(ec); |
113 | 0 | return -1; |
114 | 0 | } |
115 | 0 | em_chunk_destroy(ec); |
116 | 0 | return 0; |
117 | 0 | } |
118 | | |
119 | | /* |
120 | | * Function used by filters to ingest custom records with custom tags, at the |
121 | | * moment it's only used by rewrite_tag filter. |
122 | | */ |
123 | | int in_emitter_add_record(const char *tag, int tag_len, |
124 | | const char *buf_data, size_t buf_size, |
125 | | struct flb_input_instance *in, |
126 | | struct flb_input_instance *i_ins) |
127 | 0 | { |
128 | 0 | struct em_chunk temporary_chunk; |
129 | 0 | struct mk_list *head; |
130 | 0 | struct input_ref *i_ref; |
131 | 0 | bool ref_found; |
132 | 0 | struct mk_list *tmp; |
133 | |
|
134 | 0 | struct em_chunk *ec; |
135 | 0 | struct flb_emitter *ctx; |
136 | |
|
137 | 0 | ctx = (struct flb_emitter *) in->context; |
138 | 0 | ec = NULL; |
139 | | /* Iterate over list of already known (source) inputs */ |
140 | | /* If new, add it to the list to be able to pause it later on */ |
141 | 0 | ref_found = false; |
142 | 0 | mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { |
143 | 0 | i_ref = mk_list_entry(head, struct input_ref, _head); |
144 | 0 | if(i_ref->i_ins == i_ins){ |
145 | 0 | ref_found = true; |
146 | 0 | break; |
147 | 0 | } |
148 | 0 | } |
149 | 0 | if (!ref_found) { |
150 | 0 | i_ref = flb_malloc(sizeof(struct input_ref)); |
151 | 0 | if (!i_ref) { |
152 | 0 | flb_errno(); |
153 | 0 | return FLB_FILTER_NOTOUCH; |
154 | 0 | } |
155 | 0 | i_ref->i_ins = i_ins; |
156 | 0 | mk_list_add(&i_ref->_head, &ctx->i_ins_list); |
157 | | /* If in_emitter is paused, but new input plugin is not paused, pause it */ |
158 | 0 | if (flb_input_buf_paused(ctx->ins) == FLB_TRUE && |
159 | 0 | flb_input_buf_paused(i_ins) == FLB_FALSE) { |
160 | 0 | flb_input_pause(i_ins); |
161 | 0 | } |
162 | 0 | } |
163 | | |
164 | | |
165 | | /* Restricted by mem_buf_limit */ |
166 | 0 | if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { |
167 | 0 | flb_plg_debug(ctx->ins, "emitter memory buffer limit reached. Not accepting record."); |
168 | 0 | return FLB_EMITTER_BUSY; |
169 | 0 | } |
170 | | |
171 | | /* Use the ring buffer first if it exists */ |
172 | 0 | if (ctx->msgs) { |
173 | 0 | memset(&temporary_chunk, 0, sizeof(struct em_chunk)); |
174 | |
|
175 | 0 | temporary_chunk.tag = flb_sds_create_len(tag, tag_len); |
176 | |
|
177 | 0 | if (temporary_chunk.tag == NULL) { |
178 | 0 | flb_plg_error(ctx->ins, |
179 | 0 | "cannot allocate memory for tag: %s", |
180 | 0 | tag); |
181 | 0 | return -1; |
182 | 0 | } |
183 | | |
184 | 0 | msgpack_sbuffer_init(&temporary_chunk.mp_sbuf); |
185 | 0 | msgpack_sbuffer_write(&temporary_chunk.mp_sbuf, buf_data, buf_size); |
186 | |
|
187 | 0 | return flb_ring_buffer_write(ctx->msgs, |
188 | 0 | (void *) &temporary_chunk, |
189 | 0 | sizeof(struct em_chunk)); |
190 | 0 | } |
191 | | |
192 | | /* Check if any target chunk already exists */ |
193 | 0 | mk_list_foreach(head, &ctx->chunks) { |
194 | 0 | ec = mk_list_entry(head, struct em_chunk, _head); |
195 | 0 | if (flb_sds_cmp(ec->tag, tag, tag_len) != 0) { |
196 | 0 | ec = NULL; |
197 | 0 | continue; |
198 | 0 | } |
199 | 0 | break; |
200 | 0 | } |
201 | | |
202 | | /* No candidate chunk found, so create a new one */ |
203 | 0 | if (!ec) { |
204 | 0 | ec = em_chunk_create(tag, tag_len, ctx); |
205 | 0 | if (!ec) { |
206 | 0 | flb_plg_error(ctx->ins, "cannot create new chunk for tag: %s", |
207 | 0 | tag); |
208 | 0 | return -1; |
209 | 0 | } |
210 | 0 | } |
211 | | |
212 | | /* Append raw msgpack data */ |
213 | 0 | msgpack_sbuffer_write(&ec->mp_sbuf, buf_data, buf_size); |
214 | 0 | return 0; |
215 | 0 | } |
216 | | |
217 | | /* |
218 | | * Triggered by refresh_interval, it re-scan the path looking for new files |
219 | | * that match the original path pattern. |
220 | | */ |
221 | | static int in_emitter_ingest_ring_buffer(struct flb_input_instance *in, |
222 | | struct flb_config *config, void *context) |
223 | 0 | { |
224 | 0 | int ret; |
225 | 0 | struct flb_emitter *ctx = (struct flb_emitter *)context; |
226 | 0 | struct em_chunk ec; |
227 | 0 | (void) config; |
228 | 0 | (void) in; |
229 | | |
230 | |
|
231 | 0 | while ((ret = flb_ring_buffer_read(ctx->msgs, (void *)&ec, |
232 | 0 | sizeof(struct em_chunk))) == 0) { |
233 | 0 | ret = flb_input_log_append(in, |
234 | 0 | ec.tag, flb_sds_len(ec.tag), |
235 | 0 | ec.mp_sbuf.data, |
236 | 0 | ec.mp_sbuf.size); |
237 | 0 | flb_sds_destroy(ec.tag); |
238 | 0 | msgpack_sbuffer_destroy(&ec.mp_sbuf); |
239 | 0 | } |
240 | 0 | return ret; |
241 | 0 | } |
242 | | |
243 | | static int cb_queue_chunks(struct flb_input_instance *in, |
244 | | struct flb_config *config, void *data) |
245 | 0 | { |
246 | 0 | int ret; |
247 | 0 | struct mk_list *tmp; |
248 | 0 | struct mk_list *head; |
249 | 0 | struct em_chunk *echunk; |
250 | 0 | struct flb_emitter *ctx; |
251 | | |
252 | | /* Get context */ |
253 | 0 | ctx = (struct flb_emitter *) data; |
254 | | |
255 | | /* Try to enqueue chunks under our limits */ |
256 | 0 | mk_list_foreach_safe(head, tmp, &ctx->chunks) { |
257 | 0 | echunk = mk_list_entry(head, struct em_chunk, _head); |
258 | | |
259 | | /* Associate this backlog chunk to this instance into the engine */ |
260 | 0 | ret = do_in_emitter_add_record(echunk, in); |
261 | 0 | if (ret == -1) { |
262 | 0 | continue; |
263 | 0 | } |
264 | 0 | } |
265 | |
|
266 | 0 | return 0; |
267 | 0 | } |
268 | | |
269 | | static int in_emitter_start_ring_buffer(struct flb_input_instance *in, struct flb_emitter *ctx) |
270 | 0 | { |
271 | 0 | if (ctx->ring_buffer_size <= 0) { |
272 | 0 | return 0; |
273 | 0 | } |
274 | | |
275 | 0 | if (ctx->msgs != NULL) { |
276 | 0 | flb_warn("emitter %s already has a ring buffer", |
277 | 0 | flb_input_name(in)); |
278 | 0 | return 0; |
279 | 0 | } |
280 | | |
281 | 0 | ctx->msgs = flb_ring_buffer_create(sizeof(void *) * ctx->ring_buffer_size); |
282 | 0 | if (!ctx->msgs) { |
283 | 0 | flb_error("emitter %s could not initialize ring buffer", |
284 | 0 | flb_input_name(in)); |
285 | 0 | return -1; |
286 | 0 | } |
287 | | |
288 | 0 | ctx->coll_fd = flb_input_set_collector_time(in, |
289 | 0 | in_emitter_ingest_ring_buffer, |
290 | 0 | 1, 0, in->config); |
291 | 0 | return (ctx->coll_fd < 0) ? -1 : 0; |
292 | 0 | } |
293 | | |
294 | | /* Initialize plugin */ |
295 | | static int cb_emitter_init(struct flb_input_instance *in, |
296 | | struct flb_config *config, void *data) |
297 | 0 | { |
298 | 0 | struct flb_sched *scheduler; |
299 | 0 | struct flb_emitter *ctx; |
300 | 0 | int ret; |
301 | |
|
302 | 0 | scheduler = flb_sched_ctx_get(); |
303 | |
|
304 | 0 | ctx = flb_calloc(1, sizeof(struct flb_emitter)); |
305 | 0 | if (!ctx) { |
306 | 0 | flb_errno(); |
307 | 0 | return -1; |
308 | 0 | } |
309 | 0 | ctx->ins = in; |
310 | 0 | mk_list_init(&ctx->chunks); |
311 | |
|
312 | 0 | mk_list_init(&ctx->i_ins_list); |
313 | | |
314 | |
|
315 | 0 | ret = flb_input_config_map_set(in, (void *) ctx); |
316 | 0 | if (ret == -1) { |
317 | 0 | flb_free(ctx); |
318 | 0 | return -1; |
319 | 0 | } |
320 | | |
321 | 0 | if (in->is_threaded == FLB_TRUE && ctx->ring_buffer_size == 0) { |
322 | 0 | ctx->ring_buffer_size = DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY; |
323 | 0 | flb_plg_debug(in, "threaded: enable emitter ring buffer (size=%u)", |
324 | 0 | ctx->ring_buffer_size); |
325 | 0 | } |
326 | |
|
327 | 0 | if (ctx->ring_buffer_size > 0) { |
328 | 0 | ret = in_emitter_start_ring_buffer(in, ctx); |
329 | 0 | if (ret == -1) { |
330 | 0 | flb_free(ctx); |
331 | 0 | return -1; |
332 | 0 | } |
333 | 0 | } |
334 | 0 | else{ |
335 | 0 | ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 25000000, config); |
336 | 0 | if (ret < 0) { |
337 | 0 | flb_error("[in_emitter] could not create collector"); |
338 | 0 | flb_free(ctx); |
339 | 0 | return -1; |
340 | 0 | } |
341 | 0 | ctx->coll_fd = ret; |
342 | 0 | } |
343 | | |
344 | | /* export plugin context */ |
345 | 0 | flb_input_set_context(in, ctx); |
346 | |
|
347 | 0 | return 0; |
348 | 0 | } |
349 | | |
350 | | static void cb_emitter_pause(void *data, struct flb_config *config) |
351 | 0 | { |
352 | 0 | struct flb_emitter *ctx = data; |
353 | 0 | struct mk_list *tmp; |
354 | 0 | struct mk_list *head; |
355 | 0 | struct input_ref *i_ref; |
356 | | |
357 | | /* Pause all known senders */ |
358 | 0 | flb_input_collector_pause(ctx->coll_fd, ctx->ins); |
359 | 0 | mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { |
360 | 0 | i_ref = mk_list_entry(head, struct input_ref, _head); |
361 | 0 | flb_input_pause(i_ref->i_ins); |
362 | 0 | } |
363 | 0 | } |
364 | | |
365 | | static void cb_emitter_resume(void *data, struct flb_config *config) |
366 | 0 | { |
367 | 0 | struct flb_emitter *ctx = data; |
368 | 0 | struct mk_list *tmp; |
369 | 0 | struct mk_list *head; |
370 | 0 | struct input_ref *i_ref; |
371 | | |
372 | | /* Resume all known senders */ |
373 | 0 | flb_input_collector_resume(ctx->coll_fd, ctx->ins); |
374 | 0 | mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { |
375 | 0 | i_ref = mk_list_entry(head, struct input_ref, _head); |
376 | 0 | flb_input_resume(i_ref->i_ins); |
377 | 0 | } |
378 | 0 | } |
379 | | |
380 | | static int cb_emitter_exit(void *data, struct flb_config *config) |
381 | 0 | { |
382 | 0 | struct mk_list *tmp; |
383 | 0 | struct mk_list *head; |
384 | 0 | struct flb_emitter *ctx = data; |
385 | 0 | struct em_chunk *echunk; |
386 | 0 | struct em_chunk ec; |
387 | 0 | struct input_ref *i_ref; |
388 | 0 | int ret; |
389 | |
|
390 | 0 | mk_list_foreach_safe(head, tmp, &ctx->chunks) { |
391 | 0 | echunk = mk_list_entry(head, struct em_chunk, _head); |
392 | 0 | mk_list_del(&echunk->_head); |
393 | 0 | flb_free(echunk); |
394 | 0 | } |
395 | |
|
396 | 0 | if (ctx->msgs) { |
397 | 0 | while ((ret = flb_ring_buffer_read(ctx->msgs, (void *)&ec, |
398 | 0 | sizeof(struct em_chunk))) == 0) { |
399 | 0 | flb_sds_destroy(ec.tag); |
400 | 0 | msgpack_sbuffer_destroy(&ec.mp_sbuf); |
401 | 0 | } |
402 | 0 | flb_ring_buffer_destroy(ctx->msgs); |
403 | 0 | } |
404 | |
|
405 | 0 | mk_list_foreach_safe(head,tmp, &ctx->i_ins_list) { |
406 | 0 | i_ref = mk_list_entry(head, struct input_ref, _head); |
407 | 0 | mk_list_del(&i_ref->_head); |
408 | 0 | flb_free(i_ref); |
409 | 0 | } |
410 | | |
411 | |
|
412 | 0 | flb_free(ctx); |
413 | 0 | return 0; |
414 | 0 | } |
415 | | |
/*
 * Configuration properties accepted by the emitter. "ring_buffer_size" is an
 * integer stored in flb_emitter.ring_buffer_size and defaults to "0".
 */
static struct flb_config_map config_map[] = {
    {
     FLB_CONFIG_MAP_INT, "ring_buffer_size", "0",
     0, FLB_TRUE, offsetof(struct flb_emitter, ring_buffer_size),
     "use a ring buffer to ingest messages for the emitter (required across threads)."
    },
    /* End-of-table marker */
    {0}
};
424 | | |
/*
 * Plugin reference.
 *
 * Only the init, pause, resume and exit callbacks are provided; collection
 * is driven by the collectors registered in cb_emitter_init().
 */
struct flb_input_plugin in_emitter_plugin = {
    .name         = "emitter",
    .description  = "Record Emitter",
    .cb_init      = cb_emitter_init,
    .cb_pre_run   = NULL,
    .cb_collect   = NULL,
    .cb_ingest    = NULL,
    .cb_flush_buf = NULL,
    .config_map   = config_map,
    .cb_pause     = cb_emitter_pause,
    .cb_resume    = cb_emitter_resume,
    .cb_exit      = cb_emitter_exit,

    /* This plugin can only be configured and invoked by the Engine only */
    .flags        = FLB_INPUT_PRIVATE
};