/src/fluent-bit/plugins/in_storage_backlog/sb.c
Line | Count | Source
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2022 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <fluent-bit/flb_output.h> |
21 | | #include <fluent-bit/flb_input_plugin.h> |
22 | | #include <fluent-bit/flb_input_chunk.h> |
23 | | #include <fluent-bit/flb_storage.h> |
24 | | #include <fluent-bit/flb_utils.h> |
25 | | #include <chunkio/chunkio.h> |
26 | | #include <chunkio/cio_error.h> |
27 | | |
28 | | #include <sys/types.h> |
29 | | #include <sys/stat.h> |
30 | | |
31 | | #ifndef FLB_SYSTEM_WINDOWS |
32 | | #include <unistd.h> |
33 | | #endif |
34 | | |
35 | | struct sb_out_chunk { |
36 | | struct cio_chunk *chunk; |
37 | | struct cio_stream *stream; |
38 | | size_t size; |
39 | | struct mk_list _head; |
40 | | }; |
41 | | |
42 | | struct sb_out_queue { |
43 | | struct flb_output_instance *ins; |
44 | | struct mk_list chunks; /* head for every sb_out_chunk */ |
45 | | struct mk_list _head; |
46 | | }; |
47 | | |
48 | | struct flb_sb { |
49 | | int coll_fd; /* collector id */ |
50 | | size_t mem_limit; /* memory limit */ |
51 | | struct flb_input_instance *ins; /* input instance */ |
52 | | struct cio_ctx *cio; /* chunk i/o instance */ |
53 | | struct mk_list backlogs; /* list of all pending chunks segregated by output plugin */ |
54 | | }; |
55 | | |
56 | | |
57 | | static inline struct flb_sb *sb_get_context(struct flb_config *config); |
58 | | |
59 | | static struct sb_out_chunk *sb_allocate_chunk(struct cio_chunk *chunk, |
60 | | struct cio_stream *stream, |
61 | | size_t size); |
62 | | |
63 | | static void sb_destroy_chunk(struct sb_out_chunk *chunk); |
64 | | |
65 | | static void sb_destroy_backlog(struct sb_out_queue *backlog, struct flb_sb *context); |
66 | | |
67 | | static int sb_allocate_backlogs(struct flb_sb *ctx); |
68 | | |
69 | | static void sb_destroy_backlogs(struct flb_sb *ctx); |
70 | | |
71 | | static struct sb_out_queue *sb_find_segregated_backlog_by_output_plugin_instance( |
72 | | struct flb_output_instance *output_plugin, |
73 | | struct flb_sb *context); |
74 | | |
75 | | static void sb_remove_chunk_from_segregated_backlog(struct cio_chunk *target_chunk, |
76 | | struct sb_out_queue *backlog, |
77 | | int destroy); |
78 | | |
79 | | static void sb_remove_chunk_from_segregated_backlogs(struct cio_chunk *chunk, |
80 | | struct flb_sb *context); |
81 | | |
82 | | static int sb_append_chunk_to_segregated_backlog(struct cio_chunk *target_chunk, |
83 | | struct cio_stream *stream, |
84 | | size_t target_chunk_size, |
85 | | struct sb_out_queue *backlog); |
86 | | |
87 | | static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chunk, |
88 | | struct cio_stream *stream, |
89 | | struct flb_sb *context); |
90 | | |
91 | | int sb_segregate_chunks(struct flb_config *config); |
92 | | |
93 | | int sb_release_output_queue_space(struct flb_output_instance *output_plugin, |
94 | | size_t required_space); |
95 | | |
96 | | ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin, |
97 | | size_t required_space); |
98 | | |
99 | | |
100 | | static inline struct flb_sb *sb_get_context(struct flb_config *config) |
101 | 1.68k | { |
102 | 1.68k | if (config == NULL) { |
103 | 0 | return NULL; |
104 | 0 | } |
105 | | |
106 | 1.68k | if (config->storage_input_plugin == NULL) { |
107 | 1.68k | return NULL; |
108 | 1.68k | } |
109 | | |
110 | 0 | return ((struct flb_input_instance *) config->storage_input_plugin)->context; |
111 | 1.68k | } |
112 | | |
113 | | static struct sb_out_chunk *sb_allocate_chunk(struct cio_chunk *chunk, |
114 | | struct cio_stream *stream, |
115 | | size_t size) |
116 | 0 | { |
117 | 0 | struct sb_out_chunk *result; |
118 | |
119 | 0 | result = (struct sb_out_chunk *) flb_calloc(1, sizeof(struct sb_out_chunk)); |
120 | |
121 | 0 | if (result != NULL) { |
122 | 0 | result->chunk = chunk; |
123 | 0 | result->stream = stream; |
124 | 0 | result->size = size; |
125 | 0 | } |
126 | |
127 | 0 | return result; |
128 | 0 | } |
129 | | |
130 | | static void sb_destroy_chunk(struct sb_out_chunk *chunk) |
131 | 0 | { |
132 | 0 | flb_free(chunk); |
133 | 0 | } |
134 | | |
135 | | static void sb_destroy_backlog(struct sb_out_queue *backlog, struct flb_sb *context) |
136 | 0 | { |
137 | 0 | struct mk_list *chunk_iterator_tmp; |
138 | 0 | struct mk_list *chunk_iterator; |
139 | 0 | struct sb_out_chunk *chunk; |
140 | |
141 | 0 | mk_list_foreach_safe(chunk_iterator, chunk_iterator_tmp, &backlog->chunks) { |
142 | 0 | chunk = mk_list_entry(chunk_iterator, struct sb_out_chunk, _head); |
143 | |
144 | 0 | sb_remove_chunk_from_segregated_backlogs(chunk->chunk, context); |
145 | 0 | } |
146 | 0 | } |
147 | | |
148 | | static int sb_allocate_backlogs(struct flb_sb *context) |
149 | 0 | { |
150 | 0 | struct mk_list *output_plugin_iterator; |
151 | 0 | struct flb_output_instance *output_plugin; |
152 | 0 | struct sb_out_queue *backlog; |
153 | |
154 | 0 | mk_list_foreach(output_plugin_iterator, &context->ins->config->outputs) { |
155 | 0 | output_plugin = mk_list_entry(output_plugin_iterator, |
156 | 0 | struct flb_output_instance, |
157 | 0 | _head); |
158 | |
159 | 0 | backlog = (struct sb_out_queue *) \ |
160 | 0 | flb_calloc(1, sizeof(struct sb_out_queue)); |
161 | |
162 | 0 | if (backlog == NULL) { |
163 | 0 | sb_destroy_backlogs(context); |
164 | |
165 | 0 | return -1; |
166 | 0 | } |
167 | | |
168 | 0 | backlog->ins = output_plugin; |
169 | |
170 | 0 | mk_list_init(&backlog->chunks); |
171 | |
172 | 0 | mk_list_add(&backlog->_head, &context->backlogs); |
173 | 0 | } |
174 | | |
175 | 0 | return 0; |
176 | 0 | } |
177 | | |
178 | | static void sb_destroy_backlogs(struct flb_sb *context) |
179 | 0 | { |
180 | 0 | struct mk_list *backlog_iterator_tmp; |
181 | 0 | struct mk_list *backlog_iterator; |
182 | 0 | struct sb_out_queue *backlog; |
183 | |
184 | 0 | mk_list_foreach_safe(backlog_iterator, backlog_iterator_tmp, &context->backlogs) { |
185 | 0 | backlog = mk_list_entry(backlog_iterator, struct sb_out_queue, _head); |
186 | |
187 | 0 | mk_list_del(&backlog->_head); |
188 | |
189 | 0 | sb_destroy_backlog(backlog, context); |
190 | |
191 | 0 | flb_free(backlog); |
192 | 0 | } |
193 | 0 | } |
194 | | |
195 | | static struct sb_out_queue *sb_find_segregated_backlog_by_output_plugin_instance( |
196 | | struct flb_output_instance *output_plugin, |
197 | | struct flb_sb *context) |
198 | 0 | { |
199 | 0 | struct mk_list *backlog_iterator; |
200 | 0 | struct sb_out_queue *backlog; |
201 | |
202 | 0 | mk_list_foreach(backlog_iterator, &context->backlogs) { |
203 | 0 | backlog = mk_list_entry(backlog_iterator, struct sb_out_queue, _head); |
204 | |
205 | 0 | if (output_plugin == backlog->ins) { |
206 | 0 | return backlog; |
207 | 0 | } |
208 | 0 | } |
209 | | |
210 | 0 | return NULL; |
211 | 0 | } |
212 | | |
213 | | static void sb_remove_chunk_from_segregated_backlog(struct cio_chunk *target_chunk, |
214 | | struct sb_out_queue *backlog, |
215 | | int destroy) |
216 | 0 | { |
217 | 0 | struct mk_list *chunk_iterator_tmp; |
218 | 0 | struct mk_list *chunk_iterator; |
219 | 0 | struct sb_out_chunk *chunk; |
220 | |
221 | 0 | mk_list_foreach_safe(chunk_iterator, chunk_iterator_tmp, &backlog->chunks) { |
222 | 0 | chunk = mk_list_entry(chunk_iterator, struct sb_out_chunk, _head); |
223 | |
224 | 0 | if (chunk->chunk == target_chunk) { |
225 | 0 | mk_list_del(&chunk->_head); |
226 | |
227 | 0 | backlog->ins->fs_backlog_chunks_size -= cio_chunk_get_real_size(target_chunk); |
228 | |
229 | 0 | if (destroy) { |
230 | 0 | sb_destroy_chunk(chunk); |
231 | 0 | } |
232 | |
233 | 0 | break; |
234 | 0 | } |
235 | 0 | } |
236 | 0 | } |
237 | | |
238 | | static void sb_remove_chunk_from_segregated_backlogs(struct cio_chunk *target_chunk, |
239 | | struct flb_sb *context) |
240 | 0 | { |
241 | 0 | struct mk_list *backlog_iterator; |
242 | 0 | struct sb_out_queue *backlog; |
243 | |
244 | 0 | mk_list_foreach(backlog_iterator, &context->backlogs) { |
245 | 0 | backlog = mk_list_entry(backlog_iterator, struct sb_out_queue, _head); |
246 | |
247 | 0 | sb_remove_chunk_from_segregated_backlog(target_chunk, backlog, FLB_TRUE); |
248 | 0 | } |
249 | 0 | } |
250 | | |
251 | | static int sb_append_chunk_to_segregated_backlog(struct cio_chunk *target_chunk, |
252 | | struct cio_stream *stream, |
253 | | size_t target_chunk_size, |
254 | | struct sb_out_queue *backlog) |
255 | 0 | { |
256 | 0 | struct sb_out_chunk *chunk; |
257 | |
258 | 0 | chunk = sb_allocate_chunk(target_chunk, stream, target_chunk_size); |
259 | |
260 | 0 | if (chunk == NULL) { |
261 | 0 | flb_errno(); |
262 | 0 | return -1; |
263 | 0 | } |
264 | | |
265 | 0 | mk_list_add(&chunk->_head, &backlog->chunks); |
266 | |
267 | 0 | backlog->ins->fs_backlog_chunks_size += target_chunk_size; |
268 | |
269 | 0 | return 0; |
270 | 0 | } |
271 | | |
272 | | static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chunk, |
273 | | struct cio_stream *stream, |
274 | | struct flb_sb *context) |
275 | 0 | { |
276 | 0 | struct flb_input_chunk dummy_input_chunk; |
277 | 0 | struct mk_list *tmp; |
278 | 0 | struct mk_list *head; |
279 | 0 | size_t chunk_size; |
280 | 0 | struct sb_out_queue *backlog; |
281 | 0 | int tag_len; |
282 | 0 | const char * tag_buf; |
283 | 0 | int result; |
284 | |
285 | 0 | memset(&dummy_input_chunk, 0, sizeof(struct flb_input_chunk)); |
286 | |
287 | 0 | dummy_input_chunk.in = context->ins; |
288 | 0 | dummy_input_chunk.chunk = target_chunk; |
289 | |
290 | 0 | chunk_size = cio_chunk_get_real_size(target_chunk); |
291 | |
292 | 0 | if (chunk_size < 0) { |
293 | 0 | flb_warn("[storage backlog] could not get real size of chunk %s/%s", |
294 | 0 | stream->name, target_chunk->name); |
295 | 0 | return -1; |
296 | 0 | } |
297 | | |
298 | 0 | result = flb_input_chunk_get_tag(&dummy_input_chunk, &tag_buf, &tag_len); |
299 | 0 | if (result == -1) { |
300 | 0 | flb_error("[storage backlog] could not retrieve chunk tag from %s/%s, " |
301 | 0 | "removing it from the queue", |
302 | 0 | stream->name, target_chunk->name); |
303 | 0 | return -2; |
304 | 0 | } |
305 | | |
306 | 0 | flb_routes_mask_set_by_tag(dummy_input_chunk.routes_mask, tag_buf, tag_len, |
307 | 0 | context->ins); |
308 | |
309 | 0 | mk_list_foreach_safe(head, tmp, &context->backlogs) { |
310 | 0 | backlog = mk_list_entry(head, struct sb_out_queue, _head); |
311 | 0 | if (flb_routes_mask_get_bit(dummy_input_chunk.routes_mask, |
312 | 0 | backlog->ins->id)) { |
313 | 0 | result = sb_append_chunk_to_segregated_backlog(target_chunk, stream, |
314 | 0 | chunk_size, backlog); |
315 | 0 | if (result) { |
316 | 0 | return -3; |
317 | 0 | } |
318 | 0 | } |
319 | 0 | } |
320 | | |
321 | 0 | return 0; |
322 | 0 | } |
323 | | |
324 | | int sb_segregate_chunks(struct flb_config *config) |
325 | 1.68k | { |
326 | 1.68k | int ret; |
327 | 1.68k | size_t size; |
328 | 1.68k | struct mk_list *tmp; |
329 | 1.68k | struct mk_list *stream_iterator; |
330 | 1.68k | struct mk_list *chunk_iterator; |
331 | 1.68k | int chunk_error; |
332 | 1.68k | struct flb_sb *context; |
333 | 1.68k | struct cio_stream *stream; |
334 | 1.68k | struct cio_chunk *chunk; |
335 | | |
336 | 1.68k | context = sb_get_context(config); |
337 | | |
338 | 1.68k | if (context == NULL) { |
339 | 1.68k | return 0; |
340 | 1.68k | } |
341 | | |
342 | 0 | ret = sb_allocate_backlogs(context); |
343 | 0 | if (ret) { |
344 | 0 | return -2; |
345 | 0 | } |
346 | | |
347 | 0 | mk_list_foreach(stream_iterator, &context->cio->streams) { |
348 | 0 | stream = mk_list_entry(stream_iterator, struct cio_stream, _head); |
349 | |
350 | 0 | mk_list_foreach_safe(chunk_iterator, tmp, &stream->chunks) { |
351 | 0 | chunk = mk_list_entry(chunk_iterator, struct cio_chunk, _head); |
352 | |
353 | 0 | if (!cio_chunk_is_up(chunk)) { |
354 | 0 | ret = cio_chunk_up_force(chunk); |
355 | 0 | if (ret == CIO_CORRUPTED) { |
356 | 0 | if (config->storage_del_bad_chunks) { |
357 | 0 | chunk_error = cio_error_get(chunk); |
358 | |
359 | 0 | if (chunk_error == CIO_ERR_BAD_FILE_SIZE || |
360 | 0 | chunk_error == CIO_ERR_BAD_LAYOUT) |
361 | 0 | { |
362 | 0 | flb_plg_error(context->ins, "discarding irrecoverable chunk %s/%s", stream->name, chunk->name); |
363 | |
364 | 0 | cio_chunk_close(chunk, CIO_TRUE); |
365 | 0 | } |
366 | 0 | } |
367 | |
368 | 0 | continue; |
369 | 0 | } |
370 | 0 | } |
371 | | |
372 | 0 | if (!cio_chunk_is_up(chunk)) { |
373 | 0 | return -3; |
374 | 0 | } |
375 | | |
376 | | /* try to segregate a chunk */ |
377 | 0 | ret = sb_append_chunk_to_segregated_backlogs(chunk, stream, context); |
378 | 0 | if (ret) { |
379 | | /* |
380 | | * if the chunk could not be segregated, just remove it from the |
381 | | * queue and continue. |
382 | | * |
383 | | * if content size is zero, it's safe to 'delete it'. |
384 | | */ |
385 | 0 | size = cio_chunk_get_content_size(chunk); |
386 | 0 | if (size <= 0) { |
387 | 0 | cio_chunk_close(chunk, CIO_TRUE); |
388 | 0 | } |
389 | 0 | else { |
390 | 0 | cio_chunk_close(chunk, CIO_FALSE); |
391 | 0 | } |
392 | 0 | continue; |
393 | 0 | } |
394 | | |
395 | | /* lock the chunk */ |
396 | 0 | flb_plg_info(context->ins, "register %s/%s", stream->name, chunk->name); |
397 | |
398 | 0 | cio_chunk_lock(chunk); |
399 | 0 | cio_chunk_down(chunk); |
400 | 0 | } |
401 | 0 | } |
402 | | |
403 | 0 | return 0; |
404 | 0 | } |
405 | | |
406 | | ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin, |
407 | | size_t required_space) |
408 | 0 | { |
409 | 0 | ssize_t releasable_space; |
410 | 0 | struct mk_list *chunk_iterator; |
411 | 0 | struct flb_sb *context; |
412 | 0 | struct sb_out_queue *backlog; |
413 | 0 | struct sb_out_chunk *chunk; |
414 | |
415 | 0 | context = sb_get_context(output_plugin->config); |
416 | |
417 | 0 | if (context == NULL) { |
418 | 0 | return 0; |
419 | 0 | } |
420 | | |
421 | 0 | backlog = sb_find_segregated_backlog_by_output_plugin_instance( |
422 | 0 | output_plugin, context); |
423 | |
424 | 0 | if (backlog == NULL) { |
425 | 0 | return 0; |
426 | 0 | } |
427 | | |
428 | 0 | releasable_space = 0; |
429 | |
430 | 0 | mk_list_foreach(chunk_iterator, &backlog->chunks) { |
431 | 0 | chunk = mk_list_entry(chunk_iterator, struct sb_out_chunk, _head); |
432 | |
433 | 0 | releasable_space += chunk->size; |
434 | |
435 | 0 | if (releasable_space >= required_space) { |
436 | 0 | break; |
437 | 0 | } |
438 | 0 | } |
439 | |
440 | 0 | return releasable_space; |
441 | 0 | } |
442 | | |
443 | | int sb_release_output_queue_space(struct flb_output_instance *output_plugin, |
444 | | size_t required_space) |
445 | 0 | { |
446 | 0 | struct mk_list *chunk_iterator_tmp; |
447 | 0 | struct mk_list *chunk_iterator; |
448 | 0 | size_t released_space; |
449 | 0 | struct flb_sb *context; |
450 | 0 | struct sb_out_queue *backlog; |
451 | 0 | struct sb_out_chunk *chunk; |
452 | |
453 | 0 | context = sb_get_context(output_plugin->config); |
454 | |
455 | 0 | if (context == NULL) { |
456 | 0 | return -1; |
457 | 0 | } |
458 | | |
459 | 0 | backlog = sb_find_segregated_backlog_by_output_plugin_instance( |
460 | 0 | output_plugin, context); |
461 | |
462 | 0 | if (backlog == NULL) { |
463 | 0 | return -2; |
464 | 0 | } |
465 | | |
466 | 0 | released_space = 0; |
467 | |
468 | 0 | mk_list_foreach_safe(chunk_iterator, chunk_iterator_tmp, &backlog->chunks) { |
469 | 0 | chunk = mk_list_entry(chunk_iterator, struct sb_out_chunk, _head); |
470 | |
471 | 0 | released_space += chunk->size; |
472 | |
473 | 0 | cio_chunk_close(chunk->chunk, FLB_TRUE); |
474 | 0 | sb_remove_chunk_from_segregated_backlogs(chunk->chunk, context); |
475 | |
476 | 0 | if (released_space >= required_space) { |
477 | 0 | break; |
478 | 0 | } |
479 | 0 | } |
480 | |
481 | 0 | if (released_space < required_space) { |
482 | 0 | return -3; |
483 | 0 | } |
484 | | |
485 | 0 | return 0; |
486 | 0 | } |
487 | | |
488 | | /* Collection callback */ |
489 | | static int cb_queue_chunks(struct flb_input_instance *in, |
490 | | struct flb_config *config, void *data) |
491 | 0 | { |
492 | 0 | size_t empty_output_queue_count; |
493 | 0 | struct mk_list *output_queue_iterator; |
494 | 0 | struct sb_out_queue *output_queue_instance; |
495 | 0 | struct sb_out_chunk *chunk_instance; |
496 | 0 | struct flb_sb *ctx; |
497 | 0 | struct flb_input_chunk *ic; |
498 | 0 | struct flb_input_chunk tmp_ic; |
499 | 0 | void *ch; |
500 | 0 | size_t total = 0; |
501 | 0 | ssize_t size; |
502 | 0 | int ret; |
503 | 0 | int event_type; |
504 | | |
505 | | /* Get context */ |
506 | 0 | ctx = (struct flb_sb *) data; |
507 | | |
508 | | /* Get the total number of bytes already enqueued */ |
509 | 0 | total = flb_input_chunk_total_size(in); |
510 | | |
511 | | /* If we already hit our limit, just wait and re-check later */
512 | 0 | if (total >= ctx->mem_limit) { |
513 | 0 | return 0; |
514 | 0 | } |
515 | | |
516 | 0 | empty_output_queue_count = 0; |
517 | |
518 | 0 | while (total < ctx->mem_limit && |
519 | 0 | empty_output_queue_count < mk_list_size(&ctx->backlogs)) { |
520 | 0 | empty_output_queue_count = 0; |
521 | |
522 | 0 | mk_list_foreach(output_queue_iterator, &ctx->backlogs) { |
523 | 0 | output_queue_instance = mk_list_entry(output_queue_iterator, |
524 | 0 | struct sb_out_queue, |
525 | 0 | _head); |
526 | |
527 | 0 | if (mk_list_is_empty(&output_queue_instance->chunks) != 0) { |
528 | 0 | chunk_instance = mk_list_entry_first(&output_queue_instance->chunks, |
529 | 0 | struct sb_out_chunk, |
530 | 0 | _head); |
531 | | |
532 | | /* Try to enqueue one chunk */ |
533 | | /* |
534 | | * All chunks on this backlog are 'file' based, always try to set |
535 | | * them up. We validate the status. |
536 | | */ |
537 | 0 | ret = cio_chunk_is_up(chunk_instance->chunk); |
538 | |
539 | 0 | if (ret == CIO_FALSE) { |
540 | 0 | ret = cio_chunk_up_force(chunk_instance->chunk); |
541 | |
542 | 0 | if (ret == CIO_CORRUPTED) { |
543 | 0 | flb_plg_error(ctx->ins, "removing corrupted chunk from the " |
544 | 0 | "queue %s:%s", |
545 | 0 | chunk_instance->stream->name, chunk_instance->chunk->name); |
546 | 0 | cio_chunk_close(chunk_instance->chunk, FLB_FALSE); |
547 | 0 | sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, ctx); |
548 | | /* This function will indirectly release chunk_instance so it has to be
549 | | * called last. |
550 | | */ |
551 | 0 | continue; |
552 | 0 | } |
553 | 0 | else if (ret == CIO_ERROR || ret == CIO_RETRY) { |
554 | 0 | continue; |
555 | 0 | } |
556 | 0 | } |
557 | | |
558 | | /* |
559 | | * Map the chunk file context into a temporary buffer since the |
560 | | * flb_input_chunk_get_event_type() interface needs a
561 | | * struct flb_input_chunk argument.
562 | | */ |
563 | 0 | tmp_ic.chunk = chunk_instance->chunk; |
564 | | |
565 | | /* Retrieve the event type: FLB_INPUT_LOGS, FLB_INPUT_METRICS or FLB_INPUT_TRACES */
566 | 0 | ret = flb_input_chunk_get_event_type(&tmp_ic); |
567 | 0 | if (ret == -1) { |
568 | 0 | flb_plg_error(ctx->ins, "removing chunk with wrong metadata " |
569 | 0 | "from the queue %s:%s", |
570 | 0 | chunk_instance->stream->name, |
571 | 0 | chunk_instance->chunk->name); |
572 | 0 | cio_chunk_close(chunk_instance->chunk, FLB_TRUE); |
573 | 0 | sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, |
574 | 0 | ctx); |
575 | 0 | continue; |
576 | 0 | } |
577 | 0 | event_type = ret; |
578 | | |
579 | | /* get the number of bytes being used by the chunk */ |
580 | 0 | size = cio_chunk_get_content_size(chunk_instance->chunk); |
581 | 0 | if (size <= 0) { |
582 | 0 | flb_plg_error(ctx->ins, "removing empty chunk from the " |
583 | 0 | "queue %s:%s", |
584 | 0 | chunk_instance->stream->name, chunk_instance->chunk->name); |
585 | 0 | cio_chunk_close(chunk_instance->chunk, FLB_TRUE); |
586 | 0 | sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, ctx); |
587 | | /* This function will indirectly release chunk_instance so it has to be
588 | | * called last. |
589 | | */ |
590 | 0 | continue; |
591 | 0 | } |
592 | | |
593 | 0 | ch = chunk_instance->chunk; |
594 | | |
595 | | /* Associate this backlog chunk to this instance into the engine */ |
596 | 0 | ic = flb_input_chunk_map(in, event_type, ch); |
597 | 0 | if (!ic) { |
598 | 0 | flb_plg_error(ctx->ins, "removing chunk %s:%s from the queue", |
599 | 0 | chunk_instance->stream->name, chunk_instance->chunk->name); |
600 | 0 | cio_chunk_down(chunk_instance->chunk); |
601 | | |
602 | | /* |
603 | | * If the file cannot be mapped, just drop it. Failures are all |
604 | | * associated with data corruption. |
605 | | */ |
606 | 0 | cio_chunk_close(chunk_instance->chunk, FLB_TRUE); |
607 | 0 | sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, ctx); |
608 | | /* This function will indirectly release chunk_instance so it has to be
609 | | * called last. |
610 | | */ |
611 | 0 | continue; |
612 | 0 | } |
613 | | |
614 | 0 | flb_plg_info(ctx->ins, "queueing %s:%s", |
615 | 0 | chunk_instance->stream->name, chunk_instance->chunk->name); |
616 | | |
617 | | /* We are removing this chunk reference from this specific backlog |
618 | | * queue but we need to leave it in the remaining queues.
619 | | */ |
620 | 0 | sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, ctx); |
621 | | |
622 | | /* check our limits */ |
623 | 0 | total += size; |
624 | 0 | } |
625 | 0 | else { |
626 | 0 | empty_output_queue_count++; |
627 | 0 | } |
628 | 0 | } |
629 | 0 | } |
630 | |
631 | 0 | return 0; |
632 | 0 | } |
633 | | |
634 | | /* Initialize plugin */ |
635 | | static int cb_sb_init(struct flb_input_instance *in, |
636 | | struct flb_config *config, void *data) |
637 | 0 | { |
638 | 0 | int ret; |
639 | 0 | char mem[32]; |
640 | 0 | struct flb_sb *ctx; |
641 | |
642 | 0 | ctx = flb_malloc(sizeof(struct flb_sb)); |
643 | 0 | if (!ctx) { |
644 | 0 | flb_errno(); |
645 | 0 | return -1; |
646 | 0 | } |
647 | | |
648 | 0 | ctx->cio = data; |
649 | 0 | ctx->ins = in; |
650 | 0 | ctx->mem_limit = flb_utils_size_to_bytes(config->storage_bl_mem_limit); |
651 | |
652 | 0 | mk_list_init(&ctx->backlogs); |
653 | |
654 | 0 | flb_utils_bytes_to_human_readable_size(ctx->mem_limit, mem, sizeof(mem) - 1); |
655 | 0 | flb_plg_info(ctx->ins, "queue memory limit: %s", mem); |
656 | | |
657 | | /* export plugin context */ |
658 | 0 | flb_input_set_context(in, ctx); |
659 | | |
660 | | /* Set a collector to trigger the callback to queue data every second */ |
661 | 0 | ret = flb_input_set_collector_time(in, cb_queue_chunks, 1, 0, config); |
662 | 0 | if (ret < 0) { |
663 | 0 | flb_plg_error(ctx->ins, "could not create collector"); |
664 | 0 | flb_free(ctx); |
665 | 0 | return -1; |
666 | 0 | } |
667 | 0 | ctx->coll_fd = ret; |
668 | |
669 | 0 | return 0; |
670 | 0 | } |
671 | | |
672 | | static void cb_sb_pause(void *data, struct flb_config *config) |
673 | 0 | { |
674 | 0 | struct flb_sb *ctx = data; |
675 | 0 | flb_input_collector_pause(ctx->coll_fd, ctx->ins); |
676 | 0 | } |
677 | | |
678 | | static void cb_sb_resume(void *data, struct flb_config *config) |
679 | 0 | { |
680 | 0 | struct flb_sb *ctx = data; |
681 | 0 | flb_input_collector_resume(ctx->coll_fd, ctx->ins); |
682 | 0 | } |
683 | | |
684 | | static int cb_sb_exit(void *data, struct flb_config *config) |
685 | 0 | { |
686 | 0 | struct flb_sb *ctx = data; |
687 | |
688 | 0 | flb_input_collector_pause(ctx->coll_fd, ctx->ins); |
689 | |
690 | 0 | sb_destroy_backlogs(ctx); |
691 | |
692 | 0 | flb_free(ctx); |
693 | |
694 | 0 | return 0; |
695 | 0 | } |
696 | | |
697 | | /* Plugin reference */ |
698 | | struct flb_input_plugin in_storage_backlog_plugin = { |
699 | | .name = "storage_backlog", |
700 | | .description = "Storage Backlog", |
701 | | .cb_init = cb_sb_init, |
702 | | .cb_pre_run = NULL, |
703 | | .cb_collect = NULL, |
704 | | .cb_ingest = NULL, |
705 | | .cb_flush_buf = NULL, |
706 | | .cb_pause = cb_sb_pause, |
707 | | .cb_resume = cb_sb_resume, |
708 | | .cb_exit = cb_sb_exit, |
709 | | |
710 | | /* This plugin can only be configured and invoked by the Engine */ |
711 | | .flags = FLB_INPUT_PRIVATE |
712 | | }; |