Coverage Report

Created: 2024-02-11 07:30

/src/fluent-bit/src/flb_fstore.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_info.h>
21
#include <fluent-bit/flb_fstore.h>
22
#include <fluent-bit/flb_log.h>
23
#include <fluent-bit/flb_mem.h>
24
#include <fluent-bit/flb_sds.h>
25
#include <chunkio/chunkio.h>
26
27
static int log_cb(struct cio_ctx *ctx, int level, const char *file, int line,
28
                  char *str)
29
131
{
30
131
    if (level == CIO_LOG_ERROR) {
31
0
        flb_error("[fstore] %s", str);
32
0
    }
33
131
    else if (level == CIO_LOG_WARN) {
34
0
        flb_warn("[fstore] %s", str);
35
0
    }
36
131
    else if (level == CIO_LOG_INFO) {
37
131
        flb_info("[fstore] %s", str);
38
131
    }
39
0
    else if (level == CIO_LOG_DEBUG) {
40
0
        flb_debug("[fstore] %s", str);
41
0
    }
42
43
131
    return 0;
44
131
}
45
46
/*
47
 * this function sets metadata into a fstore_file structure, note that it makes
48
 * it own copy of the data to set a NULL byte at the end.
49
 */
50
static int meta_set(struct flb_fstore_file *fsf, void *meta, size_t size)
51
0
{
52
53
0
    char *p;
54
55
0
    p = flb_calloc(1, size + 1);
56
0
    if (!p) {
57
0
        flb_errno();
58
0
        flb_error("[fstore] could not cache metadata in file: %s:%s",
59
0
                  fsf->stream->name, fsf->chunk->name);
60
0
        return -1;
61
0
    }
62
63
0
    if (fsf->meta_buf) {
64
0
        flb_free(fsf->meta_buf);
65
0
    }
66
0
    fsf->meta_buf = p;
67
0
    memcpy(fsf->meta_buf, meta, size);
68
0
    fsf->meta_size = size;
69
70
0
    return 0;
71
0
}
72
73
/* Set a file metadata */
74
int flb_fstore_file_meta_set(struct flb_fstore *fs,
75
                             struct flb_fstore_file *fsf,
76
                             void *meta, size_t size)
77
0
{
78
0
    int ret;
79
0
    int set_down = FLB_FALSE;
80
81
    /* Check if the chunk is up */
82
0
    if (cio_chunk_is_up(fsf->chunk) == CIO_FALSE) {
83
0
        ret = cio_chunk_up_force(fsf->chunk);
84
0
        if (ret != CIO_OK) {
85
0
            flb_error("[fstore] error loading up file chunk");
86
0
            return -1;
87
0
        }
88
0
        set_down = FLB_TRUE;
89
0
    }
90
91
0
    ret = cio_meta_write(fsf->chunk, meta, size);
92
0
    if (ret == -1) {
93
0
        flb_error("[fstore] could not write metadata to file: %s:%s",
94
0
                  fsf->stream->name, fsf->chunk->name);
95
96
0
        if (set_down == FLB_TRUE) {
97
0
            cio_chunk_down(fsf->chunk);
98
0
        }
99
100
0
        return -1;
101
0
    }
102
103
0
    if (set_down == FLB_TRUE) {
104
0
        cio_chunk_down(fsf->chunk);
105
0
    }
106
107
0
    return meta_set(fsf, meta, size);
108
0
}
109
110
/* Re-read Chunk I/O metadata into fstore file */
111
int flb_fstore_file_meta_get(struct flb_fstore *fs,
112
                             struct flb_fstore_file *fsf)
113
0
{
114
0
    int ret;
115
0
    int set_down = FLB_FALSE;
116
0
    char *meta_buf = NULL;
117
0
    int meta_size = 0;
118
119
    /* Check if the chunk is up */
120
0
    if (cio_chunk_is_up(fsf->chunk) == CIO_FALSE) {
121
0
        ret = cio_chunk_up_force(fsf->chunk);
122
0
        if (ret != CIO_OK) {
123
0
            flb_error("[fstore] error loading up file chunk");
124
0
            return -1;
125
0
        }
126
0
        set_down = FLB_TRUE;
127
0
    }
128
129
0
    ret = cio_meta_read(fsf->chunk, &meta_buf, &meta_size);
130
0
    if (ret == -1) {
131
0
        flb_error("[fstore] error reading file chunk metadata");
132
0
        if (set_down == FLB_TRUE) {
133
0
            cio_chunk_down(fsf->chunk);
134
0
        }
135
0
    }
136
137
0
    ret = meta_set(fsf, meta_buf, meta_size);
138
0
    if (ret == -1) {
139
0
        flb_free(meta_buf);
140
0
        if (set_down == FLB_TRUE) {
141
0
            cio_chunk_down(fsf->chunk);
142
0
        }
143
0
        return -1;
144
0
    }
145
146
0
    if (set_down == FLB_TRUE) {
147
0
        cio_chunk_down(fsf->chunk);
148
0
    }
149
0
    return 0;
150
0
}
151
152
/* Create a new file */
153
struct flb_fstore_file *flb_fstore_file_create(struct flb_fstore *fs,
154
                                               struct flb_fstore_stream *fs_stream,
155
                                               char *name, size_t size)
156
125
{
157
125
    int err;
158
125
    struct cio_chunk *chunk;
159
125
    struct flb_fstore_file *fsf;
160
161
125
    fsf = flb_calloc(1, sizeof(struct flb_fstore_file));
162
125
    if (!fsf) {
163
1
        flb_errno();
164
1
        return NULL;
165
1
    }
166
124
    fsf->stream = fs_stream->stream;
167
168
124
    fsf->name = flb_sds_create(name);
169
124
    if (!fsf->name) {
170
1
        flb_error("[fstore] could not create file: %s:%s",
171
1
                  fsf->stream->name, name);
172
1
        flb_free(fsf);
173
1
        return NULL;
174
1
    }
175
176
123
    chunk = cio_chunk_open(fs->cio, fs_stream->stream, name,
177
123
                           CIO_OPEN, size, &err);
178
123
    if (!chunk) {
179
0
        flb_error("[fstore] could not create file: %s:%s",
180
0
                  fsf->stream->name, name);
181
0
        flb_sds_destroy(fsf->name);
182
0
        flb_free(fsf);
183
0
        return NULL;
184
0
    }
185
186
123
    fsf->chunk = chunk;
187
123
    mk_list_add(&fsf->_head, &fs_stream->files);
188
189
123
    return fsf;
190
123
}
191
192
/* Lookup file on stream by using it name */
193
struct flb_fstore_file *flb_fstore_file_get(struct flb_fstore *fs,
194
                                            struct flb_fstore_stream *fs_stream,
195
                                            char *name, size_t size)
196
0
{
197
0
    struct mk_list *head;
198
0
    struct flb_fstore_file *fsf;
199
200
0
    mk_list_foreach(head, &fs_stream->files) {
201
0
        fsf = mk_list_entry(head, struct flb_fstore_file, _head);
202
0
        if (flb_sds_len(fsf->name) != size) {
203
0
            continue;
204
0
        }
205
206
0
        if (strncmp(fsf->name, name, size) == 0) {
207
0
            return fsf;
208
0
        }
209
0
    }
210
211
0
    return NULL;
212
0
}
213
214
/*
215
 * Set a file to inactive mode. Inactive means just to remove the reference
216
 * from the list.
217
 */
218
int flb_fstore_file_inactive(struct flb_fstore *fs,
219
                             struct flb_fstore_file *fsf)
220
123
{
221
    /* close the Chunk I/O reference, but don't delete the real file */
222
123
    if (fsf->chunk) {
223
123
        cio_chunk_close(fsf->chunk, CIO_FALSE);
224
123
    }
225
226
    /* release */
227
123
    mk_list_del(&fsf->_head);
228
123
    flb_sds_destroy(fsf->name);
229
123
    if (fsf->meta_buf) {
230
0
        flb_free(fsf->meta_buf);
231
0
    }
232
123
    flb_free(fsf);
233
234
123
    return 0;
235
123
}
236
237
/* Delete a file (permantent deletion) */
238
int flb_fstore_file_delete(struct flb_fstore *fs,
239
                           struct flb_fstore_file *fsf)
240
0
{
241
    /* close the Chunk I/O reference, but don't delete it the real file */
242
0
    cio_chunk_close(fsf->chunk, CIO_TRUE);
243
244
    /* release */
245
0
    mk_list_del(&fsf->_head);
246
0
    if (fsf->meta_buf) {
247
0
        flb_free(fsf->meta_buf);
248
0
    }
249
0
    flb_sds_destroy(fsf->name);
250
0
    flb_free(fsf);
251
252
0
    return 0;
253
0
}
254
255
/*
256
 * Set an output buffer that contains a copy of the file. Note that this buffer
257
 * needs to be freed by the caller (heap memory).
258
 */
259
int flb_fstore_file_content_copy(struct flb_fstore *fs,
260
                                 struct flb_fstore_file *fsf,
261
                                 void **out_buf, size_t *out_size)
262
123
{
263
123
    int ret;
264
265
123
    ret = cio_chunk_get_content_copy(fsf->chunk, out_buf, out_size);
266
123
    if (ret == CIO_OK) {
267
123
        return 0;
268
123
    }
269
270
0
    return -1;
271
123
}
272
273
/* Append data to an existing file */
274
int flb_fstore_file_append(struct flb_fstore_file *fsf, void *data, size_t size)
275
123
{
276
123
    int ret;
277
123
    int set_down = FLB_FALSE;
278
279
    /* Check if the chunk is up */
280
123
    if (cio_chunk_is_up(fsf->chunk) == CIO_FALSE) {
281
0
        ret = cio_chunk_up_force(fsf->chunk);
282
0
        if (ret != CIO_OK) {
283
0
            flb_error("[fstore] error loading up file chunk");
284
0
            return -1;
285
0
        }
286
0
        set_down = FLB_TRUE;
287
0
    }
288
289
123
    ret = cio_chunk_write(fsf->chunk, data, size);
290
123
    if (ret != CIO_OK) {
291
0
        flb_error("[fstore] could not write data to file %s", fsf->name);
292
293
0
        if (set_down == FLB_TRUE) {
294
0
            cio_chunk_down(fsf->chunk);
295
0
        }
296
297
0
        return -1;
298
0
    }
299
300
123
    if (set_down == FLB_TRUE) {
301
0
        cio_chunk_down(fsf->chunk);
302
0
    }
303
304
123
    return 0;
305
123
}
306
307
/*
308
 * Create a new stream, if it already exists, it returns the stream
309
 * reference.
310
 */
311
struct flb_fstore_stream *flb_fstore_stream_create(struct flb_fstore *fs,
312
                                                   char *stream_name)
313
128
{
314
128
    flb_sds_t path = NULL;
315
128
    struct mk_list *head;
316
128
    struct cio_ctx *ctx = NULL;
317
128
    struct cio_stream *stream = NULL;
318
128
    struct flb_fstore_stream *fs_stream = NULL;
319
320
128
    ctx = fs->cio;
321
322
    /* Check if the stream already exists in Chunk I/O */
323
128
    mk_list_foreach(head, &ctx->streams) {
324
0
        stream = mk_list_entry(head, struct cio_stream, _head);
325
0
        if (strcmp(stream->name, stream_name) == 0) {
326
0
            break;
327
0
        }
328
0
        stream = NULL;
329
0
    }
330
331
    /* If the stream exists, check if we have a fstore_stream reference */
332
128
    if (stream) {
333
0
        mk_list_foreach(head, &fs->streams) {
334
0
            fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head);
335
0
            if (fs_stream->stream == stream) {
336
0
                break;
337
0
            }
338
0
            fs_stream = NULL;
339
0
        }
340
341
        /* The stream was found, just return the reference */
342
0
        if (fs_stream) {
343
0
            return fs_stream;
344
0
        }
345
0
    }
346
347
128
    if (!stream) {
348
        /* create file-system based stream */
349
128
        stream = cio_stream_create(fs->cio, stream_name, fs->store_type);
350
128
        if (!stream) {
351
0
            flb_error("[fstore] cannot create stream %s", stream_name);
352
0
            return NULL;
353
0
        }
354
128
    }
355
356
128
    fs_stream = flb_calloc(1, sizeof(struct flb_fstore_stream));
357
128
    if (!fs_stream) {
358
2
        flb_errno();
359
2
        cio_stream_destroy(stream);
360
2
        return NULL;
361
2
    }
362
126
    fs_stream->stream = stream;
363
364
126
    path = flb_sds_create_size(256);
365
126
    if (!path) {
366
1
        cio_stream_destroy(stream);
367
1
        flb_free(fs_stream);
368
1
        return NULL;
369
1
    }
370
125
    path = flb_sds_printf(&path, "%s/%s", fs->root_path, stream->name);
371
125
    fs_stream->path = path;
372
125
    fs_stream->name = stream->name;
373
374
125
    mk_list_init(&fs_stream->files);
375
125
    mk_list_add(&fs_stream->_head, &fs->streams);
376
377
125
    return fs_stream;
378
126
}
379
380
void flb_fstore_stream_destroy(struct flb_fstore_stream *stream, int delete)
381
125
{
382
125
    if (delete == FLB_TRUE) {
383
2
        cio_stream_delete(stream->stream);
384
2
    }
385
386
    /*
387
     * FYI: in this function we just release the fstore_stream context, the
388
     * underlaying cio_stream is closed when the main Chunk I/O is destroyed.
389
     */
390
125
    mk_list_del(&stream->_head);
391
125
    flb_sds_destroy(stream->path);
392
125
    flb_free(stream);
393
125
}
394
395
static int map_chunks(struct flb_fstore *ctx, struct flb_fstore_stream *fs_stream,
396
                      struct cio_stream *stream)
397
0
{
398
0
    struct mk_list *head;
399
0
    struct cio_chunk *chunk;
400
0
    struct flb_fstore_file *fsf;
401
402
0
    mk_list_foreach(head, &stream->chunks) {
403
0
        chunk = mk_list_entry(head, struct cio_chunk, _head);
404
405
0
        fsf = flb_calloc(1, sizeof(struct flb_fstore_file));
406
0
        if (!fsf) {
407
0
            flb_errno();
408
0
            return -1;
409
0
        }
410
0
        fsf->name = flb_sds_create(chunk->name);
411
0
        if (!fsf->name) {
412
0
            flb_free(fsf);
413
0
            flb_error("[fstore] could not create file: %s:%s",
414
0
                      stream->name, chunk->name);
415
0
            return -1;
416
0
        }
417
418
0
        fsf->chunk = chunk;
419
420
        /* load metadata */
421
0
        flb_fstore_file_meta_get(ctx, fsf);
422
0
        mk_list_add(&fsf->_head, &fs_stream->files);
423
0
    }
424
425
0
    return 0;
426
0
}
427
428
static int load_references(struct flb_fstore *fs)
429
128
{
430
128
    int ret;
431
128
    struct mk_list *head;
432
128
    struct cio_stream *stream;
433
128
    struct flb_fstore_stream *fs_stream;
434
435
128
    mk_list_foreach(head, &fs->cio->streams) {
436
0
        stream = mk_list_entry(head, struct cio_stream, _head);
437
0
        fs_stream = flb_fstore_stream_create(fs, stream->name);
438
0
        if (!fs_stream) {
439
0
            flb_error("[fstore] error loading stream reference: %s",
440
0
                      stream->name);
441
0
            return -1;
442
0
        }
443
444
        /* Map chunks */
445
0
        ret = map_chunks(fs, fs_stream, stream);
446
0
        if (ret == -1) {
447
0
            return -1;
448
0
        }
449
0
    }
450
451
128
    return 0;
452
128
}
453
454
struct flb_fstore *flb_fstore_create(char *path, int store_type)
455
131
{
456
131
    int ret;
457
131
    int flags;
458
131
    struct cio_ctx *cio;
459
131
    struct flb_fstore *fs;
460
131
    struct cio_options opts = {0};
461
131
    flags = CIO_OPEN;
462
463
    /* Create Chunk I/O context */
464
131
    cio_options_init(&opts);
465
466
131
    opts.root_path = path;
467
131
    opts.log_cb = log_cb;
468
131
    opts.flags = flags;
469
131
    opts.log_level = CIO_LOG_INFO;
470
471
131
    cio = cio_create(&opts);
472
131
    if (!cio) {
473
0
        flb_error("[fstore] error initializing on path '%s'", path);
474
0
        return NULL;
475
0
    }
476
477
    /* Load content from the file system if any */
478
131
    ret = cio_load(cio, NULL);
479
131
    if (ret == -1) {
480
0
        flb_error("[fstore] error scanning root path content: %s", path);
481
0
        cio_destroy(cio);
482
0
        return NULL;
483
0
    }
484
485
131
    fs = flb_calloc(1, sizeof(struct flb_fstore));
486
131
    if (!fs) {
487
3
        flb_errno();
488
3
        cio_destroy(cio);
489
3
        return NULL;
490
3
    }
491
128
    fs->cio = cio;
492
128
    fs->root_path = cio->options.root_path;
493
128
    fs->store_type = store_type;
494
128
    mk_list_init(&fs->streams);
495
496
    /* Map Chunk I/O streams and chunks into fstore context */
497
128
    load_references(fs);
498
499
128
    return fs;
500
131
}
501
502
int flb_fstore_destroy(struct flb_fstore *fs)
503
128
{
504
128
    int files = 0;
505
128
    int delete;
506
128
    struct mk_list *head;
507
128
    struct mk_list *f_head;
508
128
    struct mk_list *tmp;
509
128
    struct mk_list *f_tmp;
510
128
    struct flb_fstore_stream *fs_stream;
511
128
    struct flb_fstore_file *fsf;
512
513
128
    mk_list_foreach_safe(head, tmp, &fs->streams) {
514
125
        fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head);
515
516
        /* delete file references */
517
125
        files = 0;
518
125
        mk_list_foreach_safe(f_head, f_tmp, &fs_stream->files) {
519
123
            fsf = mk_list_entry(f_head, struct flb_fstore_file, _head);
520
123
            flb_fstore_file_inactive(fs, fsf);
521
123
            files++;
522
123
        }
523
524
125
        if (files == 0) {
525
2
            delete = FLB_TRUE;
526
2
        }
527
123
        else {
528
123
            delete = FLB_FALSE;
529
123
        }
530
531
125
        flb_fstore_stream_destroy(fs_stream, delete);
532
125
    }
533
534
128
    if (fs->cio) {
535
128
        cio_destroy(fs->cio);
536
128
    }
537
128
    flb_free(fs);
538
128
    return 0;
539
128
}
540
541
void flb_fstore_dump(struct flb_fstore *fs)
542
128
{
543
128
    struct mk_list *head;
544
128
    struct mk_list *f_head;
545
128
    struct flb_fstore_stream *fs_stream;
546
128
    struct flb_fstore_file *fsf;
547
548
128
    printf("===== FSTORE DUMP =====\n");
549
128
    mk_list_foreach(head, &fs->streams) {
550
125
        fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head);
551
125
        printf("- stream: %s\n", fs_stream->name);
552
125
        mk_list_foreach(f_head, &fs_stream->files) {
553
123
            fsf = mk_list_entry(f_head, struct flb_fstore_file, _head);
554
123
            printf("          %s/%s\n", fsf->stream->name, fsf->name);
555
123
        }
556
125
    }
557
128
    printf("\n");
558
128
}