Coverage Report

Created: 2025-01-28 07:34

/src/fluent-bit/src/flb_storage.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_info.h>
21
#include <fluent-bit/flb_input.h>
22
#include <fluent-bit/flb_log.h>
23
#include <fluent-bit/flb_storage.h>
24
#include <fluent-bit/flb_scheduler.h>
25
#include <fluent-bit/flb_utils.h>
26
#include <fluent-bit/flb_http_server.h>
27
28
static struct cmt *metrics_context_create(struct flb_storage_metrics *sm)
29
2.76k
{
30
2.76k
    struct cmt *cmt;
31
32
2.76k
    cmt = cmt_create();
33
2.76k
    if (!cmt) {
34
0
        return NULL;
35
0
    }
36
37
2.76k
    sm->cmt_chunks = cmt_gauge_create(cmt,
38
2.76k
                                      "fluentbit", "storage", "chunks",
39
2.76k
                                      "Total number of chunks in the storage layer.",
40
2.76k
                                      0, (char *[]) { NULL });
41
42
2.76k
    sm->cmt_mem_chunks = cmt_gauge_create(cmt,
43
2.76k
                                          "fluentbit", "storage", "mem_chunks",
44
2.76k
                                          "Total number of memory chunks.",
45
2.76k
                                          0, (char *[]) { NULL });
46
47
2.76k
    sm->cmt_fs_chunks = cmt_gauge_create(cmt,
48
2.76k
                                         "fluentbit", "storage", "fs_chunks",
49
2.76k
                                         "Total number of filesystem chunks.",
50
2.76k
                                         0, (char *[]) { NULL });
51
52
2.76k
    sm->cmt_fs_chunks_up = cmt_gauge_create(cmt,
53
2.76k
                                            "fluentbit", "storage", "fs_chunks_up",
54
2.76k
                                            "Total number of filesystem chunks up in memory.",
55
2.76k
                                            0, (char *[]) { NULL });
56
57
2.76k
    sm->cmt_fs_chunks_down = cmt_gauge_create(cmt,
58
2.76k
                                              "fluentbit", "storage", "fs_chunks_down",
59
2.76k
                                              "Total number of filesystem chunks down.",
60
2.76k
                                              0, (char *[]) { NULL });
61
62
2.76k
    return cmt;
63
2.76k
}
64
65
66
/* This function collect the 'global' metrics of the storage layer (cmetrics) */
67
int flb_storage_metrics_update(struct flb_config *ctx, struct flb_storage_metrics *sm)
68
0
{
69
0
    uint64_t ts;
70
0
    struct cio_stats st;
71
72
    /* Retrieve general stats from the storage layer */
73
0
    cio_stats_get(ctx->cio, &st);
74
75
0
    ts = cfl_time_now();
76
77
0
    cmt_gauge_set(sm->cmt_chunks, ts, st.chunks_total, 0, NULL);
78
0
    cmt_gauge_set(sm->cmt_mem_chunks, ts, st.chunks_mem, 0, NULL);
79
0
    cmt_gauge_set(sm->cmt_fs_chunks, ts, st.chunks_fs, 0, NULL);
80
0
    cmt_gauge_set(sm->cmt_fs_chunks_up, ts, st.chunks_fs_up, 0, NULL);
81
0
    cmt_gauge_set(sm->cmt_fs_chunks_down, ts, st.chunks_fs_down, 0, NULL);
82
83
0
    return 0;
84
0
}
85
86
/*
 * Append the 'storage_layer' entry to the msgpack map under construction.
 *
 * The entry is a map with a single 'chunks' key whose value is a map with
 * the five global chunk counters reported by Chunk I/O. The 'sm' argument
 * is not used by this function.
 */
static void metrics_append_general(msgpack_packer *mp_pck,
                                   struct flb_config *ctx,
                                   struct flb_storage_metrics *sm)
{
    static const char *const keys[] = {
        "total_chunks",
        "mem_chunks",
        "fs_chunks",
        "fs_chunks_up",
        "fs_chunks_down"
    };
    uint64_t values[sizeof(keys) / sizeof(keys[0])];
    size_t idx;
    size_t key_len;
    struct cio_stats stats;

    /* Snapshot of the storage layer counters */
    cio_stats_get(ctx->cio, &stats);

    /* Same order as the 'keys' table above */
    values[0] = stats.chunks_total;
    values[1] = stats.chunks_mem;
    values[2] = stats.chunks_fs;
    values[3] = stats.chunks_fs_up;
    values[4] = stats.chunks_fs_down;

    /* "storage_layer": { "chunks": { ... } } */
    msgpack_pack_str(mp_pck, 13);
    msgpack_pack_str_body(mp_pck, "storage_layer", 13);
    msgpack_pack_map(mp_pck, 1);

    msgpack_pack_str(mp_pck, 6);
    msgpack_pack_str_body(mp_pck, "chunks", 6);
    msgpack_pack_map(mp_pck, 5);

    /* One key/value pair per counter */
    for (idx = 0; idx < sizeof(keys) / sizeof(keys[0]); idx++) {
        key_len = strlen(keys[idx]);

        msgpack_pack_str(mp_pck, key_len);
        msgpack_pack_str_body(mp_pck, keys[idx], key_len);
        msgpack_pack_uint64(mp_pck, values[idx]);
    }
}
129
130
/*
 * Append the 'input_chunks' entry to the msgpack map under construction
 * and, in the same pass, refresh the per-input CMetrics storage gauges.
 *
 * For every input instance the packed entry looks like:
 *
 *   "<input name>": {
 *       "status": { "overlimit", "mem_size", "mem_limit" },
 *       "chunks": { "total", "up", "down", "busy", "busy_size" }
 *   }
 *
 * DISCLAIMER: This interface will be deprecated once Chunk I/O stats are
 * extended per stream. Until then, and to avoid iterating the chunks twice,
 * the CMetrics counting lives inside the same logic as the old code.
 *
 * The 'sm' argument is not used by this function.
 */
static void metrics_append_input(msgpack_packer *mp_pck,
                                 struct flb_config *ctx,
                                 struct flb_storage_metrics *sm)
{
    int len;
    int overlimit;
    int chunks_up;
    int chunks_down;
    int chunks_busy;
    uint64_t now;
    char hr_size[32];
    char *label;
    ssize_t content_size;
    ssize_t busy_bytes;
    size_t chunk_count;
    struct mk_list *in_head;
    struct mk_list *chunk_head;
    struct flb_input_instance *in;
    struct flb_input_chunk *ic;

    msgpack_pack_str(mp_pck, 12);
    msgpack_pack_str_body(mp_pck, "input_chunks", 12);
    msgpack_pack_map(mp_pck, mk_list_size(&ctx->inputs));

    /* Single timestamp shared by every gauge updated in this pass */
    now = cfl_time_now();

    mk_list_foreach(in_head, &ctx->inputs) {
        in = mk_list_entry(in_head, struct flb_input_instance, _head);

        /* The instance name is both the map key and the gauge label */
        label = (char *) flb_input_name(in);
        chunk_count = mk_list_size(&in->chunks);

        len = strlen(label);
        msgpack_pack_str(mp_pck, len);
        msgpack_pack_str_body(mp_pck, label, len);

        /* Two keys per input: 'status' and 'chunks' */
        msgpack_pack_map(mp_pck, 2);

        /*
         * Status
         * ======
         * Map with 3 keys: overlimit, mem_size and mem_limit.
         */
        msgpack_pack_str(mp_pck, 6);
        msgpack_pack_str_body(mp_pck, "status", 6);
        msgpack_pack_map(mp_pck, 3);

        /* status['overlimit']: only meaningful when a limit is configured */
        msgpack_pack_str(mp_pck, 9);
        msgpack_pack_str_body(mp_pck, "overlimit", 9);

        overlimit = (in->mem_buf_limit > 0 &&
                     in->mem_chunks_size >= in->mem_buf_limit);

        /* cmetrics first, then the old msgpack representation */
        cmt_gauge_set(in->cmt_storage_overlimit, now, overlimit ? 1 : 0,
                      1, (char *[]) {label});
        if (overlimit) {
            msgpack_pack_true(mp_pck);
        }
        else {
            msgpack_pack_false(mp_pck);
        }

        /* fluentbit_storage_memory_bytes */
        cmt_gauge_set(in->cmt_storage_memory_bytes, now, in->mem_chunks_size,
                      1, (char *[]) {label});

        /* status['mem_size']: current memory used, in human readable form */
        msgpack_pack_str(mp_pck, 8);
        msgpack_pack_str_body(mp_pck, "mem_size", 8);

        flb_utils_bytes_to_human_readable_size(in->mem_chunks_size,
                                               hr_size, sizeof(hr_size) - 1);
        len = strlen(hr_size);
        msgpack_pack_str(mp_pck, len);
        msgpack_pack_str_body(mp_pck, hr_size, len);

        /* status['mem_limit'] */
        msgpack_pack_str(mp_pck, 9);
        msgpack_pack_str_body(mp_pck, "mem_limit", 9);

        flb_utils_bytes_to_human_readable_size(in->mem_buf_limit,
                                               hr_size, sizeof(hr_size) - 1);
        len = strlen(hr_size);
        msgpack_pack_str(mp_pck, len);
        msgpack_pack_str_body(mp_pck, hr_size, len);

        /*
         * Chunks
         * ======
         * Map with 5 keys: total, up, down, busy and busy_size.
         */

        /* fluentbit_storage_chunks */
        cmt_gauge_set(in->cmt_storage_chunks, now, chunk_count,
                      1, (char *[]) {label});

        msgpack_pack_str(mp_pck, 6);
        msgpack_pack_str_body(mp_pck, "chunks", 6);
        msgpack_pack_map(mp_pck, 5);

        /* chunks['total'] */
        msgpack_pack_str(mp_pck, 5);
        msgpack_pack_str_body(mp_pck, "total", 5);
        msgpack_pack_uint64(mp_pck, chunk_count);

        /*
         * Chunk details: chunks marked as 'busy' are 'locked' since they
         * are in a 'flush' state. No more data can be appended to a busy
         * chunk.
         */
        chunks_up = 0;
        chunks_down = 0;
        chunks_busy = 0;
        busy_bytes = 0;

        mk_list_foreach(chunk_head, &in->chunks) {
            ic = mk_list_entry(chunk_head, struct flb_input_chunk, _head);

            if (ic->busy == FLB_TRUE) {
                chunks_busy++;

                /* Negative sizes are skipped so they don't shrink the sum */
                content_size = cio_chunk_get_content_size(ic->chunk);
                if (content_size >= 0) {
                    busy_bytes += content_size;
                }
            }

            if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
                chunks_up++;
            }
            else {
                chunks_down++;
            }
        }

        /* fluentbit_storage_chunks_up + chunks['up'] */
        cmt_gauge_set(in->cmt_storage_chunks_up, now, chunks_up,
                      1, (char *[]) {label});

        msgpack_pack_str(mp_pck, 2);
        msgpack_pack_str_body(mp_pck, "up", 2);
        msgpack_pack_uint64(mp_pck, chunks_up);

        /* fluentbit_storage_chunks_down + chunks['down'] */
        cmt_gauge_set(in->cmt_storage_chunks_down, now, chunks_down,
                      1, (char *[]) {label});

        msgpack_pack_str(mp_pck, 4);
        msgpack_pack_str_body(mp_pck, "down", 4);
        msgpack_pack_uint64(mp_pck, chunks_down);

        /* fluentbit_storage_chunks_busy + chunks['busy'] */
        cmt_gauge_set(in->cmt_storage_chunks_busy, now, chunks_busy,
                      1, (char *[]) {label});

        msgpack_pack_str(mp_pck, 4);
        msgpack_pack_str_body(mp_pck, "busy", 4);
        msgpack_pack_uint64(mp_pck, chunks_busy);

        /* fluentbit_storage_chunks_busy_bytes + chunks['busy_size'] */
        cmt_gauge_set(in->cmt_storage_chunks_busy_bytes, now, busy_bytes,
                      1, (char *[]) {label});

        msgpack_pack_str(mp_pck, 9);
        msgpack_pack_str_body(mp_pck, "busy_size", 9);

        flb_utils_bytes_to_human_readable_size(busy_bytes,
                                               hr_size, sizeof(hr_size) - 1);
        len = strlen(hr_size);
        msgpack_pack_str(mp_pck, len);
        msgpack_pack_str_body(mp_pck, hr_size, len);
    }
}
342
343
static void cb_storage_metrics_collect(struct flb_config *ctx, void *data)
344
3
{
345
3
    msgpack_sbuffer mp_sbuf;
346
3
    msgpack_packer mp_pck;
347
348
    /* Prepare new outgoing buffer */
349
3
    msgpack_sbuffer_init(&mp_sbuf);
350
3
    msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
351
352
    /* Pack main map and append relevant data */
353
3
    msgpack_pack_map(&mp_pck, 2);
354
3
    metrics_append_general(&mp_pck, ctx, data);
355
3
    metrics_append_input(&mp_pck, ctx, data);
356
357
3
#ifdef FLB_HAVE_HTTP_SERVER
358
3
    if (ctx->http_server == FLB_TRUE && ctx->storage_metrics == FLB_TRUE) {
359
0
        flb_hs_push_storage_metrics(ctx->http_ctx, mp_sbuf.data, mp_sbuf.size);
360
0
    }
361
3
#endif
362
3
    msgpack_sbuffer_destroy(&mp_sbuf);
363
3
}
364
365
struct flb_storage_metrics *flb_storage_metrics_create(struct flb_config *ctx)
366
2.76k
{
367
2.76k
    int ret;
368
2.76k
    struct flb_storage_metrics *sm;
369
370
2.76k
    sm = flb_calloc(1, sizeof(struct flb_storage_metrics));
371
2.76k
    if (!sm) {
372
0
        flb_errno();
373
0
        return NULL;
374
0
    }
375
2.76k
    sm->cmt = metrics_context_create(sm);
376
2.76k
    if(!sm->cmt) {
377
0
        flb_free(sm);
378
0
        return NULL;
379
0
    }
380
381
2.76k
    ret = flb_sched_timer_cb_create(ctx->sched, FLB_SCHED_TIMER_CB_PERM, 5000,
382
2.76k
                                    cb_storage_metrics_collect,
383
2.76k
                                    ctx->storage_metrics_ctx, NULL);
384
2.76k
    if (ret == -1) {
385
0
        flb_error("[storage metrics] cannot create timer to collect metrics");
386
0
        flb_free(sm);
387
0
        return NULL;
388
0
    }
389
390
2.76k
    return sm;
391
2.76k
}
392
393
static int sort_chunk_cmp(const void *a_arg, const void *b_arg)
394
0
{
395
0
    char *p;
396
0
    struct cio_chunk *chunk_a = *(struct cio_chunk **) a_arg;
397
0
    struct cio_chunk *chunk_b = *(struct cio_chunk **) b_arg;
398
0
    struct timespec tm_a;
399
0
    struct timespec tm_b;
400
401
    /* Scan Chunk A */
402
0
    p = strchr(chunk_a->name, '-');
403
0
    if (!p) {
404
0
        return -1;
405
0
    }
406
0
    p++;
407
408
0
    sscanf(p, "%lu.%lu.flb", &tm_a.tv_sec, &tm_a.tv_nsec);
409
410
    /* Scan Chunk B */
411
0
    p = strchr(chunk_b->name, '-');
412
0
    if (!p) {
413
0
        return -1;
414
0
    }
415
0
    p++;
416
0
    sscanf(p, "%lu.%lu.flb", &tm_b.tv_sec, &tm_b.tv_nsec);
417
418
    /* Compare */
419
0
    if (tm_a.tv_sec != tm_b.tv_sec) {
420
0
        if (tm_a.tv_sec > tm_b.tv_sec) {
421
0
            return 1;
422
0
        }
423
0
        else {
424
0
            return -1;
425
0
        }
426
0
    }
427
0
    else {
428
0
        if (tm_a.tv_nsec > tm_b.tv_nsec) {
429
0
            return 1;
430
0
        }
431
0
        else if (tm_a.tv_nsec < tm_b.tv_nsec) {
432
0
            return -1;
433
0
        }
434
0
    }
435
436
0
    return 0;
437
0
}
438
439
static void print_storage_info(struct flb_config *ctx, struct cio_ctx *cio)
440
2.76k
{
441
2.76k
    char *type;
442
2.76k
    char *sync;
443
2.76k
    char *checksum;
444
2.76k
    struct flb_input_instance *in;
445
446
2.76k
    if (cio->options.root_path) {
447
1.01k
        type = "memory+filesystem";
448
1.01k
    }
449
1.75k
    else {
450
1.75k
        type = "memory";
451
1.75k
    }
452
453
2.76k
    if (cio->options.flags & CIO_FULL_SYNC) {
454
0
        sync = "full";
455
0
    }
456
2.76k
    else {
457
2.76k
        sync = "normal";
458
2.76k
    }
459
460
2.76k
    if (cio->options.flags & CIO_CHECKSUM) {
461
0
        checksum = "on";
462
0
    }
463
2.76k
    else {
464
2.76k
        checksum = "off";
465
2.76k
    }
466
467
2.76k
    flb_info("[storage] ver=%s, type=%s, sync=%s, checksum=%s, max_chunks_up=%i",
468
2.76k
             cio_version(), type, sync, checksum, ctx->storage_max_chunks_up);
469
470
    /* Storage input plugin */
471
2.76k
    if (ctx->storage_input_plugin) {
472
1.01k
        in = (struct flb_input_instance *) ctx->storage_input_plugin;
473
1.01k
        flb_info("[storage] backlog input plugin: %s", in->name);
474
1.01k
    }
475
2.76k
}
476
477
static int log_cb(struct cio_ctx *ctx, int level, const char *file, int line,
478
                  char *str)
479
1
{
480
1
    if (level == CIO_LOG_ERROR) {
481
0
        flb_error("[storage] %s", str);
482
0
    }
483
1
    else if (level == CIO_LOG_WARN) {
484
0
        flb_warn("[storage] %s", str);
485
0
    }
486
1
    else if (level == CIO_LOG_INFO) {
487
1
        flb_info("[storage] %s", str);
488
1
    }
489
0
    else if (level == CIO_LOG_DEBUG) {
490
0
        flb_debug("[storage] %s", str);
491
0
    }
492
493
1
    return 0;
494
1
}
495
496
int flb_storage_input_create(struct cio_ctx *cio,
497
                             struct flb_input_instance *in)
498
3.78k
{
499
3.78k
    int cio_storage_type;
500
3.78k
    struct flb_storage_input *si;
501
3.78k
    struct cio_stream *stream;
502
503
    /* storage config: get stream type */
504
3.78k
    if (in->storage_type == -1) {
505
2.76k
        in->storage_type = FLB_STORAGE_MEM;
506
2.76k
    }
507
508
3.78k
    if (in->storage_type == FLB_STORAGE_FS && cio->options.root_path == NULL) {
509
0
        flb_error("[storage] instance '%s' requested filesystem storage "
510
0
                  "but no filesystem path was defined.",
511
0
                  flb_input_name(in));
512
0
        return -1;
513
0
    }
514
515
    /*
516
     * The input instance can define it owns storage type which is based on some
517
     * specific Chunk I/O storage type. We handle the proper initialization here.
518
     */
519
3.78k
    cio_storage_type = in->storage_type;
520
3.78k
    if (in->storage_type == FLB_STORAGE_MEMRB) {
521
0
        cio_storage_type = FLB_STORAGE_MEM;
522
0
    }
523
524
    /* Check for duplicates */
525
3.78k
    stream = cio_stream_get(cio, in->name);
526
3.78k
    if (!stream) {
527
        /* create stream for input instance */
528
2.76k
        stream = cio_stream_create(cio, in->name, cio_storage_type);
529
2.76k
        if (!stream) {
530
0
            flb_error("[storage] cannot create stream for instance %s",
531
0
                      in->name);
532
0
            return -1;
533
0
        }
534
2.76k
    }
535
1.01k
    else if (stream->type != cio_storage_type) {
536
0
        flb_debug("[storage] storage type mismatch. input type=%s",
537
0
                  flb_storage_get_type(in->storage_type));
538
0
        if (stream->type == FLB_STORAGE_FS) {
539
0
            flb_warn("[storage] Need to remove '%s/%s' if it is empty", cio->options.root_path, in->name);
540
0
        }
541
542
0
        cio_stream_destroy(stream);
543
0
        stream = cio_stream_create(cio, in->name, cio_storage_type);
544
0
        if (!stream) {
545
0
            flb_error("[storage] cannot create stream for instance %s",
546
0
                      in->name);
547
0
            return -1;
548
0
        }
549
0
        flb_info("[storage] re-create stream type=%s", flb_storage_get_type(in->storage_type));
550
0
    }
551
552
    /* allocate storage context for the input instance */
553
3.78k
    si = flb_malloc(sizeof(struct flb_storage_input));
554
3.78k
    if (!si) {
555
0
        flb_errno();
556
0
        return -1;
557
0
    }
558
559
3.78k
    si->stream = stream;
560
3.78k
    si->cio = cio;
561
3.78k
    si->type = in->storage_type;
562
3.78k
    in->storage = si;
563
564
3.78k
    return 0;
565
3.78k
}
566
567
void flb_storage_input_destroy(struct flb_input_instance *in)
568
7.56k
{
569
7.56k
    struct mk_list *tmp;
570
7.56k
    struct mk_list *head;
571
7.56k
    struct flb_input_chunk *ic;
572
573
    /* Save current temporary data and destroy chunk references */
574
7.56k
    mk_list_foreach_safe(head, tmp, &in->chunks) {
575
0
        ic = mk_list_entry(head, struct flb_input_chunk, _head);
576
0
        flb_input_chunk_destroy(ic, FLB_FALSE);
577
0
    }
578
579
7.56k
    flb_free(in->storage);
580
7.56k
    in->storage = NULL;
581
7.56k
}
582
583
static int storage_contexts_create(struct flb_config *config)
584
2.76k
{
585
2.76k
    int c = 0;
586
2.76k
    int ret;
587
2.76k
    struct mk_list *head;
588
2.76k
    struct flb_input_instance *in;
589
590
    /* Iterate each input instance and create a stream for it */
591
3.78k
    mk_list_foreach(head, &config->inputs) {
592
3.78k
        in = mk_list_entry(head, struct flb_input_instance, _head);
593
3.78k
        ret = flb_storage_input_create(config->cio, in);
594
3.78k
        if (ret == -1) {
595
0
            flb_error("[storage] could not create storage for instance: %s",
596
0
                      in->name);
597
0
            return -1;
598
0
        }
599
3.78k
        c++;
600
3.78k
    }
601
602
2.76k
    return c;
603
2.76k
}
604
605
int flb_storage_create(struct flb_config *ctx)
606
2.76k
{
607
2.76k
    int ret;
608
2.76k
    int flags;
609
2.76k
    struct flb_input_instance *in = NULL;
610
2.76k
    struct cio_ctx *cio;
611
2.76k
    struct cio_options opts = {0};
612
613
    /* always use read/write mode */
614
2.76k
    flags = CIO_OPEN;
615
616
    /* if explicitly stated any irrecoverably corrupted
617
     * chunks will be deleted */
618
2.76k
    if (ctx->storage_del_bad_chunks) {
619
0
        flags |= CIO_DELETE_IRRECOVERABLE;
620
0
    }
621
622
    /* synchronization mode */
623
2.76k
    if (ctx->storage_sync) {
624
0
        if (strcasecmp(ctx->storage_sync, "normal") == 0) {
625
            /* do nothing, keep the default */
626
0
        }
627
0
        else if (strcasecmp(ctx->storage_sync, "full") == 0) {
628
0
            flags |= CIO_FULL_SYNC;
629
0
        }
630
0
        else {
631
0
            flb_error("[storage] invalid synchronization mode");
632
0
            return -1;
633
0
        }
634
0
    }
635
636
    /* checksum */
637
2.76k
    if (ctx->storage_checksum == FLB_TRUE) {
638
0
        flags |= CIO_CHECKSUM;
639
0
    }
640
641
    /* file trimming */
642
2.76k
    if (ctx->storage_trim_files == FLB_TRUE) {
643
0
        flags |= CIO_TRIM_FILES;
644
0
    }
645
646
    /* chunkio options */
647
2.76k
    cio_options_init(&opts);
648
649
2.76k
    opts.root_path = ctx->storage_path;
650
2.76k
    opts.flags = flags;
651
2.76k
    opts.log_cb = log_cb;
652
2.76k
    opts.log_level = CIO_LOG_INFO;
653
654
    /* Create chunkio context */
655
2.76k
    cio = cio_create(&opts);
656
2.76k
    if (!cio) {
657
0
        flb_error("[storage] error initializing storage engine");
658
0
        return -1;
659
0
    }
660
2.76k
    ctx->cio = cio;
661
662
    /* Set Chunk I/O maximum number of chunks up */
663
2.76k
    if (ctx->storage_max_chunks_up == 0) {
664
2.76k
        ctx->storage_max_chunks_up = FLB_STORAGE_MAX_CHUNKS_UP;
665
2.76k
    }
666
2.76k
    cio_set_max_chunks_up(ctx->cio, ctx->storage_max_chunks_up);
667
668
    /* Load content from the file system if any */
669
2.76k
    ret = cio_load(ctx->cio, NULL);
670
2.76k
    if (ret == -1) {
671
0
        flb_error("[storage] error scanning root path content: %s",
672
0
                  ctx->storage_path);
673
0
        cio_destroy(ctx->cio);
674
0
        return -1;
675
0
    }
676
677
    /* Sort chunks */
678
2.76k
    cio_qsort(ctx->cio, sort_chunk_cmp);
679
680
    /*
681
     * If we have a filesystem storage path, create an instance of the
682
     * storage_backlog input plugin to consume any possible pending
683
     * chunks.
684
     */
685
2.76k
    if (ctx->storage_path) {
686
1.01k
        in = flb_input_new(ctx, "storage_backlog", cio, FLB_FALSE);
687
1.01k
        if (!in) {
688
0
            flb_error("[storage] cannot init storage backlog input plugin");
689
0
            cio_destroy(cio);
690
0
            ctx->cio = NULL;
691
0
            return -1;
692
0
        }
693
1.01k
        ctx->storage_input_plugin = in;
694
695
        /* Set a queue memory limit */
696
1.01k
        if (!ctx->storage_bl_mem_limit) {
697
1.01k
            ctx->storage_bl_mem_limit = flb_strdup(FLB_STORAGE_BL_MEM_LIMIT);
698
1.01k
        }
699
1.01k
    }
700
701
    /* Create streams for input instances */
702
2.76k
    ret = storage_contexts_create(ctx);
703
2.76k
    if (ret == -1) {
704
0
        return -1;
705
0
    }
706
707
    /* print storage info */
708
2.76k
    print_storage_info(ctx, cio);
709
710
2.76k
    return 0;
711
2.76k
}
712
713
void flb_storage_destroy(struct flb_config *ctx)
714
2.76k
{
715
2.76k
    struct cio_ctx *cio;
716
2.76k
    struct flb_storage_metrics *sm;
717
718
    /* Destroy Chunk I/O context */
719
2.76k
    cio = (struct cio_ctx *) ctx->cio;
720
721
2.76k
    if (!cio) {
722
0
        return;
723
0
    }
724
725
2.76k
    sm = ctx->storage_metrics_ctx;
726
2.76k
    if (ctx->storage_metrics == FLB_TRUE && sm != NULL) {
727
2.76k
        cmt_destroy(sm->cmt);
728
2.76k
        flb_free(sm);
729
2.76k
        ctx->storage_metrics_ctx = NULL;
730
2.76k
    }
731
732
2.76k
    cio_destroy(cio);
733
2.76k
    ctx->cio = NULL;
734
2.76k
}