Coverage Report

Created: 2023-03-10 07:33

/src/fluent-bit/plugins/in_storage_backlog/sb.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2022 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_output.h>
21
#include <fluent-bit/flb_input_plugin.h>
22
#include <fluent-bit/flb_input_chunk.h>
23
#include <fluent-bit/flb_storage.h>
24
#include <fluent-bit/flb_utils.h>
25
#include <chunkio/chunkio.h>
26
#include <chunkio/cio_error.h>
27
28
#include <sys/types.h>
29
#include <sys/stat.h>
30
31
#ifndef FLB_SYSTEM_WINDOWS
32
#include <unistd.h>
33
#endif
34
35
struct sb_out_chunk {
36
    struct cio_chunk  *chunk;
37
    struct cio_stream *stream;
38
    size_t             size;
39
    struct mk_list    _head;
40
};
41
42
struct sb_out_queue {
43
    struct flb_output_instance *ins;
44
    struct mk_list              chunks; /* head for every sb_out_chunk */
45
    struct mk_list              _head;
46
};
47
48
struct flb_sb {
49
    int coll_fd;                    /* collector id */
50
    size_t mem_limit;               /* memory limit */
51
    struct flb_input_instance *ins; /* input instance */
52
    struct cio_ctx *cio;            /* chunk i/o instance */
53
    struct mk_list backlogs;        /* list of all pending chunks segregated by output plugin */
54
};
55
56
57
static inline struct flb_sb *sb_get_context(struct flb_config *config);
58
59
static struct sb_out_chunk *sb_allocate_chunk(struct cio_chunk *chunk,
60
                                              struct cio_stream *stream,
61
                                              size_t size);
62
63
static void sb_destroy_chunk(struct sb_out_chunk *chunk);
64
65
static void sb_destroy_backlog(struct sb_out_queue *backlog, struct flb_sb *context);
66
67
static int  sb_allocate_backlogs(struct flb_sb *ctx);
68
69
static void sb_destroy_backlogs(struct flb_sb *ctx);
70
71
static struct sb_out_queue *sb_find_segregated_backlog_by_output_plugin_instance(
72
                                struct flb_output_instance *output_plugin,
73
                                struct flb_sb              *context);
74
75
static void sb_remove_chunk_from_segregated_backlog(struct cio_chunk    *target_chunk,
76
                                                    struct sb_out_queue *backlog,
77
                                                    int                  destroy);
78
79
static void sb_remove_chunk_from_segregated_backlogs(struct cio_chunk *chunk,
80
                                                     struct flb_sb    *context);
81
82
static int sb_append_chunk_to_segregated_backlog(struct cio_chunk    *target_chunk,
83
                                                 struct cio_stream   *stream,
84
                                                 size_t               target_chunk_size,
85
                                                 struct sb_out_queue *backlog);
86
87
static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk  *target_chunk,
88
                                                  struct cio_stream *stream,
89
                                                  struct flb_sb     *context);
90
91
int sb_segregate_chunks(struct flb_config *config);
92
93
int sb_release_output_queue_space(struct flb_output_instance *output_plugin,
94
                                  size_t                      required_space);
95
96
ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin,
97
                                             size_t                      required_space);
98
99
100
static inline struct flb_sb *sb_get_context(struct flb_config *config)
101
1.68k
{
102
1.68k
    if (config == NULL) {
103
0
        return NULL;
104
0
    }
105
106
1.68k
    if (config->storage_input_plugin == NULL) {
107
1.68k
        return NULL;
108
1.68k
    }
109
110
0
    return ((struct flb_input_instance *) config->storage_input_plugin)->context;
111
1.68k
}
112
113
static struct sb_out_chunk *sb_allocate_chunk(struct cio_chunk *chunk,
114
                                              struct cio_stream *stream,
115
                                              size_t size)
116
0
{
117
0
    struct sb_out_chunk *result;
118
119
0
    result = (struct sb_out_chunk *) flb_calloc(1, sizeof(struct sb_out_chunk));
120
121
0
    if (result != NULL) {
122
0
        result->chunk  = chunk;
123
0
        result->stream = stream;
124
0
        result->size   = size;
125
0
    }
126
127
0
    return result;
128
0
}
129
130
static void sb_destroy_chunk(struct sb_out_chunk *chunk)
131
0
{
132
0
    flb_free(chunk);
133
0
}
134
135
static void sb_destroy_backlog(struct sb_out_queue *backlog, struct flb_sb *context)
136
0
{
137
0
    struct mk_list      *chunk_iterator_tmp;
138
0
    struct mk_list      *chunk_iterator;
139
0
    struct sb_out_chunk *chunk;
140
141
0
    mk_list_foreach_safe(chunk_iterator, chunk_iterator_tmp, &backlog->chunks) {
142
0
        chunk = mk_list_entry(chunk_iterator, struct sb_out_chunk, _head);
143
144
0
        sb_remove_chunk_from_segregated_backlogs(chunk->chunk, context);
145
0
    }
146
0
}
147
148
static int sb_allocate_backlogs(struct flb_sb *context)
149
0
{
150
0
    struct mk_list             *output_plugin_iterator;
151
0
    struct flb_output_instance *output_plugin;
152
0
    struct sb_out_queue        *backlog;
153
154
0
    mk_list_foreach(output_plugin_iterator, &context->ins->config->outputs) {
155
0
        output_plugin = mk_list_entry(output_plugin_iterator,
156
0
                                      struct flb_output_instance,
157
0
                                      _head);
158
159
0
        backlog = (struct sb_out_queue *) \
160
0
                        flb_calloc(1, sizeof(struct sb_out_queue));
161
162
0
        if (backlog == NULL) {
163
0
            sb_destroy_backlogs(context);
164
165
0
            return -1;
166
0
        }
167
168
0
        backlog->ins = output_plugin;
169
170
0
        mk_list_init(&backlog->chunks);
171
172
0
        mk_list_add(&backlog->_head, &context->backlogs);
173
0
    }
174
175
0
    return 0;
176
0
}
177
178
static void sb_destroy_backlogs(struct flb_sb *context)
179
0
{
180
0
    struct mk_list      *backlog_iterator_tmp;
181
0
    struct mk_list      *backlog_iterator;
182
0
    struct sb_out_queue *backlog;
183
184
0
    mk_list_foreach_safe(backlog_iterator, backlog_iterator_tmp, &context->backlogs) {
185
0
        backlog = mk_list_entry(backlog_iterator, struct sb_out_queue, _head);
186
187
0
        mk_list_del(&backlog->_head);
188
189
0
        sb_destroy_backlog(backlog, context);
190
191
0
        flb_free(backlog);
192
0
    }
193
0
}
194
195
static struct sb_out_queue *sb_find_segregated_backlog_by_output_plugin_instance(
196
                                struct flb_output_instance *output_plugin,
197
                                struct flb_sb              *context)
198
0
{
199
0
    struct mk_list      *backlog_iterator;
200
0
    struct sb_out_queue *backlog;
201
202
0
    mk_list_foreach(backlog_iterator, &context->backlogs) {
203
0
        backlog = mk_list_entry(backlog_iterator, struct sb_out_queue, _head);
204
205
0
        if (output_plugin == backlog->ins) {
206
0
            return backlog;
207
0
        }
208
0
    }
209
210
0
    return NULL;
211
0
}
212
213
static void sb_remove_chunk_from_segregated_backlog(struct cio_chunk    *target_chunk,
214
                                                    struct sb_out_queue *backlog,
215
                                                    int                  destroy)
216
0
{
217
0
    struct mk_list      *chunk_iterator_tmp;
218
0
    struct mk_list      *chunk_iterator;
219
0
    struct sb_out_chunk *chunk;
220
221
0
    mk_list_foreach_safe(chunk_iterator, chunk_iterator_tmp, &backlog->chunks) {
222
0
        chunk = mk_list_entry(chunk_iterator, struct sb_out_chunk, _head);
223
224
0
        if (chunk->chunk == target_chunk) {
225
0
            mk_list_del(&chunk->_head);
226
227
0
            backlog->ins->fs_backlog_chunks_size -= cio_chunk_get_real_size(target_chunk);
228
229
0
            if (destroy) {
230
0
                sb_destroy_chunk(chunk);
231
0
            }
232
233
0
            break;
234
0
        }
235
0
    }
236
0
}
237
238
static void sb_remove_chunk_from_segregated_backlogs(struct cio_chunk *target_chunk,
239
                                                     struct flb_sb    *context)
240
0
{
241
0
    struct mk_list      *backlog_iterator;
242
0
    struct sb_out_queue *backlog;
243
244
0
    mk_list_foreach(backlog_iterator, &context->backlogs) {
245
0
        backlog = mk_list_entry(backlog_iterator, struct sb_out_queue, _head);
246
247
0
        sb_remove_chunk_from_segregated_backlog(target_chunk, backlog, FLB_TRUE);
248
0
    }
249
0
}
250
251
static int sb_append_chunk_to_segregated_backlog(struct cio_chunk    *target_chunk,
252
                                                 struct cio_stream   *stream,
253
                                                 size_t               target_chunk_size,
254
                                                 struct sb_out_queue *backlog)
255
0
{
256
0
    struct sb_out_chunk *chunk;
257
258
0
    chunk = sb_allocate_chunk(target_chunk, stream, target_chunk_size);
259
260
0
    if (chunk == NULL) {
261
0
        flb_errno();
262
0
        return -1;
263
0
    }
264
265
0
    mk_list_add(&chunk->_head, &backlog->chunks);
266
267
0
    backlog->ins->fs_backlog_chunks_size += target_chunk_size;
268
269
0
    return 0;
270
0
}
271
272
static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk  *target_chunk,
273
                                                  struct cio_stream *stream,
274
                                                  struct flb_sb     *context)
275
0
{
276
0
    struct flb_input_chunk  dummy_input_chunk;
277
0
    struct mk_list         *tmp;
278
0
    struct mk_list         *head;
279
0
    size_t                  chunk_size;
280
0
    struct sb_out_queue    *backlog;
281
0
    int                     tag_len;
282
0
    const char *            tag_buf;
283
0
    int                     result;
284
285
0
    memset(&dummy_input_chunk, 0, sizeof(struct flb_input_chunk));
286
287
0
    dummy_input_chunk.in    = context->ins;
288
0
    dummy_input_chunk.chunk = target_chunk;
289
290
0
    chunk_size = cio_chunk_get_real_size(target_chunk);
291
292
0
    if (chunk_size < 0) {
293
0
        flb_warn("[storage backlog] could not get real size of chunk %s/%s",
294
0
                  stream->name, target_chunk->name);
295
0
        return -1;
296
0
    }
297
298
0
    result = flb_input_chunk_get_tag(&dummy_input_chunk, &tag_buf, &tag_len);
299
0
    if (result == -1) {
300
0
        flb_error("[storage backlog] could not retrieve chunk tag from %s/%s, "
301
0
                  "removing it from the queue",
302
0
                  stream->name, target_chunk->name);
303
0
        return -2;
304
0
    }
305
306
0
    flb_routes_mask_set_by_tag(dummy_input_chunk.routes_mask, tag_buf, tag_len,
307
0
                               context->ins);
308
309
0
    mk_list_foreach_safe(head, tmp, &context->backlogs) {
310
0
        backlog = mk_list_entry(head, struct sb_out_queue, _head);
311
0
        if (flb_routes_mask_get_bit(dummy_input_chunk.routes_mask,
312
0
                                    backlog->ins->id)) {
313
0
            result = sb_append_chunk_to_segregated_backlog(target_chunk, stream,
314
0
                                                           chunk_size, backlog);
315
0
            if (result) {
316
0
                return -3;
317
0
            }
318
0
        }
319
0
    }
320
321
0
    return 0;
322
0
}
323
324
int sb_segregate_chunks(struct flb_config *config)
325
1.68k
{
326
1.68k
    int                ret;
327
1.68k
    size_t             size;
328
1.68k
    struct mk_list    *tmp;
329
1.68k
    struct mk_list    *stream_iterator;
330
1.68k
    struct mk_list    *chunk_iterator;
331
1.68k
    int                chunk_error;
332
1.68k
    struct flb_sb     *context;
333
1.68k
    struct cio_stream *stream;
334
1.68k
    struct cio_chunk  *chunk;
335
336
1.68k
    context = sb_get_context(config);
337
338
1.68k
    if (context == NULL) {
339
1.68k
        return 0;
340
1.68k
    }
341
342
0
    ret = sb_allocate_backlogs(context);
343
0
    if (ret) {
344
0
        return -2;
345
0
    }
346
347
0
    mk_list_foreach(stream_iterator, &context->cio->streams) {
348
0
        stream = mk_list_entry(stream_iterator, struct cio_stream, _head);
349
350
0
        mk_list_foreach_safe(chunk_iterator, tmp, &stream->chunks) {
351
0
            chunk = mk_list_entry(chunk_iterator, struct cio_chunk, _head);
352
353
0
            if (!cio_chunk_is_up(chunk)) {
354
0
                ret = cio_chunk_up_force(chunk);
355
0
                if (ret == CIO_CORRUPTED) {
356
0
                    if (config->storage_del_bad_chunks) {
357
0
                        chunk_error = cio_error_get(chunk);
358
359
0
                        if (chunk_error == CIO_ERR_BAD_FILE_SIZE ||
360
0
                            chunk_error == CIO_ERR_BAD_LAYOUT)
361
0
                        {
362
0
                            flb_plg_error(context->ins, "discarding irrecoverable chunk %s/%s", stream->name, chunk->name);
363
364
0
                            cio_chunk_close(chunk, CIO_TRUE);
365
0
                        }
366
0
                    }
367
368
0
                    continue;
369
0
                }
370
0
            }
371
372
0
            if (!cio_chunk_is_up(chunk)) {
373
0
                return -3;
374
0
            }
375
376
            /* try to segregate a chunk */
377
0
            ret = sb_append_chunk_to_segregated_backlogs(chunk, stream, context);
378
0
            if (ret) {
379
                /*
380
                 * if the chunk could not be segregated, just remove it from the
381
                 * queue and continue.
382
                 *
383
                 * if content size is zero, it's safe to 'delete it'.
384
                 */
385
0
                size = cio_chunk_get_content_size(chunk);
386
0
                if (size <= 0) {
387
0
                    cio_chunk_close(chunk, CIO_TRUE);
388
0
                }
389
0
                else {
390
0
                    cio_chunk_close(chunk, CIO_FALSE);
391
0
                }
392
0
                continue;
393
0
            }
394
395
            /* lock the chunk */
396
0
            flb_plg_info(context->ins, "register %s/%s", stream->name, chunk->name);
397
398
0
            cio_chunk_lock(chunk);
399
0
            cio_chunk_down(chunk);
400
0
        }
401
0
    }
402
403
0
    return 0;
404
0
}
405
406
ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin,
407
                                             size_t                      required_space)
408
0
{
409
0
    ssize_t              releasable_space;
410
0
    struct mk_list      *chunk_iterator;
411
0
    struct flb_sb       *context;
412
0
    struct sb_out_queue *backlog;
413
0
    struct sb_out_chunk *chunk;
414
415
0
    context = sb_get_context(output_plugin->config);
416
417
0
    if (context == NULL) {
418
0
        return 0;
419
0
    }
420
421
0
    backlog = sb_find_segregated_backlog_by_output_plugin_instance(
422
0
                                output_plugin, context);
423
424
0
    if (backlog == NULL) {
425
0
        return 0;
426
0
    }
427
428
0
    releasable_space = 0;
429
430
0
    mk_list_foreach(chunk_iterator, &backlog->chunks) {
431
0
        chunk = mk_list_entry(chunk_iterator, struct sb_out_chunk, _head);
432
433
0
        releasable_space += chunk->size;
434
435
0
        if (releasable_space >= required_space) {
436
0
            break;
437
0
        }
438
0
    }
439
440
0
    return releasable_space;
441
0
}
442
443
int sb_release_output_queue_space(struct flb_output_instance *output_plugin,
444
                                  size_t                      required_space)
445
0
{
446
0
    struct mk_list      *chunk_iterator_tmp;
447
0
    struct mk_list      *chunk_iterator;
448
0
    size_t               released_space;
449
0
    struct flb_sb       *context;
450
0
    struct sb_out_queue *backlog;
451
0
    struct sb_out_chunk *chunk;
452
453
0
    context = sb_get_context(output_plugin->config);
454
455
0
    if (context == NULL) {
456
0
        return -1;
457
0
    }
458
459
0
    backlog = sb_find_segregated_backlog_by_output_plugin_instance(
460
0
                                                        output_plugin, context);
461
462
0
    if (backlog == NULL) {
463
0
        return -2;
464
0
    }
465
466
0
    released_space = 0;
467
468
0
    mk_list_foreach_safe(chunk_iterator, chunk_iterator_tmp, &backlog->chunks) {
469
0
        chunk = mk_list_entry(chunk_iterator, struct sb_out_chunk, _head);
470
471
0
        released_space += chunk->size;
472
473
0
        cio_chunk_close(chunk->chunk, FLB_TRUE);
474
0
        sb_remove_chunk_from_segregated_backlogs(chunk->chunk, context);
475
476
0
        if (released_space >= required_space) {
477
0
            break;
478
0
        }
479
0
    }
480
481
0
    if (released_space < required_space) {
482
0
        return -3;
483
0
    }
484
485
0
    return 0;
486
0
}
487
488
/* Collection callback */
489
static int cb_queue_chunks(struct flb_input_instance *in,
490
                           struct flb_config *config, void *data)
491
0
{
492
0
    size_t                  empty_output_queue_count;
493
0
    struct mk_list         *output_queue_iterator;
494
0
    struct sb_out_queue    *output_queue_instance;
495
0
    struct sb_out_chunk    *chunk_instance;
496
0
    struct flb_sb          *ctx;
497
0
    struct flb_input_chunk *ic;
498
0
    struct flb_input_chunk  tmp_ic;
499
0
    void                   *ch;
500
0
    size_t                  total = 0;
501
0
    ssize_t                 size;
502
0
    int                     ret;
503
0
    int                     event_type;
504
505
    /* Get context */
506
0
    ctx = (struct flb_sb *) data;
507
508
    /* Get the total number of bytes already enqueued */
509
0
    total = flb_input_chunk_total_size(in);
510
511
    /* If we already hitted our limit, just wait and re-check later */
512
0
    if (total >= ctx->mem_limit) {
513
0
        return 0;
514
0
    }
515
516
0
    empty_output_queue_count = 0;
517
518
0
    while (total < ctx->mem_limit &&
519
0
           empty_output_queue_count < mk_list_size(&ctx->backlogs)) {
520
0
        empty_output_queue_count = 0;
521
522
0
        mk_list_foreach(output_queue_iterator, &ctx->backlogs) {
523
0
            output_queue_instance = mk_list_entry(output_queue_iterator,
524
0
                                                  struct sb_out_queue,
525
0
                                                  _head);
526
527
0
            if (mk_list_is_empty(&output_queue_instance->chunks) != 0) {
528
0
                chunk_instance = mk_list_entry_first(&output_queue_instance->chunks,
529
0
                                                     struct sb_out_chunk,
530
0
                                                     _head);
531
532
                /* Try to enqueue one chunk */
533
                /*
534
                 * All chunks on this backlog are 'file' based, always try to set
535
                 * them up. We validate the status.
536
                 */
537
0
                ret = cio_chunk_is_up(chunk_instance->chunk);
538
539
0
                if (ret == CIO_FALSE) {
540
0
                    ret = cio_chunk_up_force(chunk_instance->chunk);
541
542
0
                    if (ret == CIO_CORRUPTED) {
543
0
                        flb_plg_error(ctx->ins, "removing corrupted chunk from the "
544
0
                                      "queue %s:%s",
545
0
                                      chunk_instance->stream->name, chunk_instance->chunk->name);
546
0
                        cio_chunk_close(chunk_instance->chunk, FLB_FALSE);
547
0
                        sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, ctx);
548
                        /* This function will indirecly release chunk_instance so it has to be
549
                         * called last.
550
                         */
551
0
                        continue;
552
0
                    }
553
0
                    else if (ret == CIO_ERROR || ret == CIO_RETRY) {
554
0
                        continue;
555
0
                    }
556
0
                }
557
558
                /*
559
                 * Map the chunk file context into a temporary buffer since the
560
                 * flb_input_chunk_get_event_type() interface needs an
561
                 * struct fb_input_chunk argument.
562
                 */
563
0
                tmp_ic.chunk = chunk_instance->chunk;
564
565
                /* Retrieve the event type: FLB_INPUT_LOGS, FLB_INPUT_METRICS of FLB_INPUT_TRACES */
566
0
                ret = flb_input_chunk_get_event_type(&tmp_ic);
567
0
                if (ret == -1) {
568
0
                    flb_plg_error(ctx->ins, "removing chunk with wrong metadata "
569
0
                                  "from the queue %s:%s",
570
0
                                  chunk_instance->stream->name,
571
0
                                  chunk_instance->chunk->name);
572
0
                    cio_chunk_close(chunk_instance->chunk, FLB_TRUE);
573
0
                    sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk,
574
0
                                                             ctx);
575
0
                    continue;
576
0
                }
577
0
                event_type = ret;
578
579
                /* get the number of bytes being used by the chunk */
580
0
                size = cio_chunk_get_content_size(chunk_instance->chunk);
581
0
                if (size <= 0) {
582
0
                    flb_plg_error(ctx->ins, "removing empty chunk from the "
583
0
                                  "queue %s:%s",
584
0
                                  chunk_instance->stream->name, chunk_instance->chunk->name);
585
0
                    cio_chunk_close(chunk_instance->chunk, FLB_TRUE);
586
0
                    sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, ctx);
587
                    /* This function will indirecly release chunk_instance so it has to be
588
                     * called last.
589
                     */
590
0
                    continue;
591
0
                }
592
593
0
                ch = chunk_instance->chunk;
594
595
                /* Associate this backlog chunk to this instance into the engine */
596
0
                ic = flb_input_chunk_map(in, event_type, ch);
597
0
                if (!ic) {
598
0
                    flb_plg_error(ctx->ins, "removing chunk %s:%s from the queue",
599
0
                                  chunk_instance->stream->name, chunk_instance->chunk->name);
600
0
                    cio_chunk_down(chunk_instance->chunk);
601
602
                    /*
603
                     * If the file cannot be mapped, just drop it. Failures are all
604
                     * associated with data corruption.
605
                     */
606
0
                    cio_chunk_close(chunk_instance->chunk, FLB_TRUE);
607
0
                    sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, ctx);
608
                    /* This function will indirecly release chunk_instance so it has to be
609
                     * called last.
610
                     */
611
0
                    continue;
612
0
                }
613
614
0
                flb_plg_info(ctx->ins, "queueing %s:%s",
615
0
                             chunk_instance->stream->name, chunk_instance->chunk->name);
616
617
                /* We are removing this chunk reference from this specific backlog
618
                 * queue but we need to leave it in the remainder queues.
619
                 */
620
0
                sb_remove_chunk_from_segregated_backlogs(chunk_instance->chunk, ctx);
621
622
                /* check our limits */
623
0
                total += size;
624
0
            }
625
0
            else {
626
0
                empty_output_queue_count++;
627
0
            }
628
0
        }
629
0
    }
630
631
0
    return 0;
632
0
}
633
634
/* Initialize plugin */
635
static int cb_sb_init(struct flb_input_instance *in,
636
                      struct flb_config *config, void *data)
637
0
{
638
0
    int ret;
639
0
    char mem[32];
640
0
    struct flb_sb *ctx;
641
642
0
    ctx = flb_malloc(sizeof(struct flb_sb));
643
0
    if (!ctx) {
644
0
        flb_errno();
645
0
        return -1;
646
0
    }
647
648
0
    ctx->cio = data;
649
0
    ctx->ins = in;
650
0
    ctx->mem_limit = flb_utils_size_to_bytes(config->storage_bl_mem_limit);
651
652
0
    mk_list_init(&ctx->backlogs);
653
654
0
    flb_utils_bytes_to_human_readable_size(ctx->mem_limit, mem, sizeof(mem) - 1);
655
0
    flb_plg_info(ctx->ins, "queue memory limit: %s", mem);
656
657
    /* export plugin context */
658
0
    flb_input_set_context(in, ctx);
659
660
    /* Set a collector to trigger the callback to queue data every second */
661
0
    ret = flb_input_set_collector_time(in, cb_queue_chunks, 1, 0, config);
662
0
    if (ret < 0) {
663
0
        flb_plg_error(ctx->ins, "could not create collector");
664
0
        flb_free(ctx);
665
0
        return -1;
666
0
    }
667
0
    ctx->coll_fd = ret;
668
669
0
    return 0;
670
0
}
671
672
static void cb_sb_pause(void *data, struct flb_config *config)
673
0
{
674
0
    struct flb_sb *ctx = data;
675
0
    flb_input_collector_pause(ctx->coll_fd, ctx->ins);
676
0
}
677
678
static void cb_sb_resume(void *data, struct flb_config *config)
679
0
{
680
0
    struct flb_sb *ctx = data;
681
0
    flb_input_collector_resume(ctx->coll_fd, ctx->ins);
682
0
}
683
684
static int cb_sb_exit(void *data, struct flb_config *config)
685
0
{
686
0
    struct flb_sb *ctx = data;
687
688
0
    flb_input_collector_pause(ctx->coll_fd, ctx->ins);
689
690
0
    sb_destroy_backlogs(ctx);
691
692
0
    flb_free(ctx);
693
694
0
    return 0;
695
0
}
696
697
/* Plugin reference */
698
struct flb_input_plugin in_storage_backlog_plugin = {
699
    .name         = "storage_backlog",
700
    .description  = "Storage Backlog",
701
    .cb_init      = cb_sb_init,
702
    .cb_pre_run   = NULL,
703
    .cb_collect   = NULL,
704
    .cb_ingest    = NULL,
705
    .cb_flush_buf = NULL,
706
    .cb_pause     = cb_sb_pause,
707
    .cb_resume    = cb_sb_resume,
708
    .cb_exit      = cb_sb_exit,
709
710
    /* This plugin can only be configured and invoked by the Engine */
711
    .flags        = FLB_INPUT_PRIVATE
712
};