Coverage Report

Created: 2025-07-04 07:08

/src/fluent-bit/lib/chunkio/src/cio_chunk.c
Line | Count | Source
   1 |       | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
   2 |       |
   3 |       | /*  Chunk I/O
   4 |       |  *  =========
   5 |       |  *  Copyright 2018 Eduardo Silva <eduardo@monkey.io>
   6 |       |  *
   7 |       |  *  Licensed under the Apache License, Version 2.0 (the "License");
   8 |       |  *  you may not use this file except in compliance with the License.
   9 |       |  *  You may obtain a copy of the License at
  10 |       |  *
  11 |       |  *      http://www.apache.org/licenses/LICENSE-2.0
  12 |       |  *
  13 |       |  *  Unless required by applicable law or agreed to in writing, software
  14 |       |  *  distributed under the License is distributed on an "AS IS" BASIS,
  15 |       |  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  16 |       |  *  See the License for the specific language governing permissions and
  17 |       |  *  limitations under the License.
  18 |       |  */
  19 |       |
  20 |       | #include <chunkio/chunkio_compat.h>
  21 |       | #include <chunkio/chunkio.h>
  22 |       | #include <chunkio/cio_version.h>
  23 |       | #include <chunkio/cio_file.h>
  24 |       | #include <chunkio/cio_memfs.h>
  25 |       | #include <chunkio/cio_log.h>
  26 |       | #include <chunkio/cio_error.h>
  27 |       |
  28 |       | #include <string.h>
  29 |       |
  30 |       | struct cio_chunk *cio_chunk_open(struct cio_ctx *ctx, struct cio_stream *st,
  31 |       |                                  const char *name, int flags, size_t size,
  32 |       |                                  int *err)
  33 |   328 | {
  34 |   328 |     int len;
  35 |   328 |     void *backend = NULL;
  36 |   328 |     struct cio_chunk *ch;
  37 |       |
  38 |   328 |     if (!st) {
  39 |     0 |         cio_log_error(ctx, "[cio chunk] invalid stream");
  40 |     0 |         return NULL;
  41 |     0 |     }
  42 |       |
  43 |   328 |     if (!name) {
  44 |     0 |         cio_log_error(ctx, "[cio chunk] invalid file name");
  45 |     0 |         return NULL;
  46 |     0 |     }
  47 |       |
  48 |   328 |     len = strlen(name);
  49 |   328 |     if (len == 0) {
  50 |     0 |         cio_log_error(ctx, "[cio chunk] invalid file name");
  51 |     0 |         return NULL;
  52 |     0 |     }
  53 |       | #ifndef CIO_HAVE_BACKEND_FILESYSTEM
  54 |       |     if (st->type == CIO_STORE_FS) {
  55 |       |         cio_log_error(ctx, "[cio chunk] file system backend not supported");
  56 |       |         return NULL;
  57 |       |     }
  58 |       | #endif
  59 |       |
  60 |       |     /* allocate chunk context */
  61 |   328 |     ch = malloc(sizeof(struct cio_chunk));
  62 |   328 |     if (!ch) {
  63 |     0 |         cio_errno();
  64 |     0 |         return NULL;
  65 |     0 |     }
  66 |   328 |     ch->name = strdup(name);
  67 |   328 |     ch->ctx = ctx;
  68 |   328 |     ch->st = st;
  69 |   328 |     ch->lock = CIO_FALSE;
  70 |   328 |     ch->tx_active = CIO_FALSE;
  71 |   328 |     ch->tx_crc = 0;
  72 |   328 |     ch->tx_content_length = 0;
  73 |   328 |     ch->backend = NULL;
  74 |       |
  75 |   328 |     mk_list_add(&ch->_head, &st->chunks);
  76 |       |
  77 |   328 |     cio_error_reset(ch);
  78 |       |
  79 |       |     /* create backend context */
  80 |   328 |     if (st->type == CIO_STORE_FS) {
  81 |   328 |         backend = cio_file_open(ctx, st, ch, flags, size, err);
  82 |   328 |     }
  83 |     0 |     else if (st->type == CIO_STORE_MEM) {
  84 |     0 |         *err = CIO_OK;
  85 |     0 |         backend = cio_memfs_open(ctx, st, ch, flags, size);
  86 |     0 |     }
  87 |       |
  88 |   328 |     if (!backend) {
  89 |     0 |         mk_list_del(&ch->_head);
  90 |     0 |         free(ch->name);
  91 |     0 |         free(ch);
  92 |     0 |         return NULL;
  93 |     0 |     }
  94 |       |
  95 |   328 |     ch->backend = backend;
  96 |       |
  97 |       |     /* Adjust counter */
  98 |   328 |     cio_chunk_counter_total_add(ctx);
  99 |       |
 100 |       |     /* Link the chunk state to the proper stream list */
 101 |   328 |     if (cio_chunk_is_up(ch) == CIO_TRUE) {
 102 |   328 |         mk_list_add(&ch->_state_head, &st->chunks_up);
 103 |   328 |     }
 104 |     0 |     else {
 105 |     0 |         mk_list_add(&ch->_state_head, &st->chunks_down);
 106 |     0 |     }
 107 |       |
 108 |   328 |     return ch;
 109 |   328 | }
 110 |       |
 111 |       | void cio_chunk_close(struct cio_chunk *ch, int delete)
 112 |   328 | {
 113 |   328 |     int type;
 114 |   328 |     struct cio_ctx *ctx;
 115 |       |
 116 |   328 |     if (!ch) {
 117 |     0 |         return;
 118 |     0 |     }
 119 |       |
 120 |   328 |     cio_error_reset(ch);
 121 |       |
 122 |   328 |     ctx = ch->ctx;
 123 |   328 |     type = ch->st->type;
 124 |   328 |     if (type == CIO_STORE_MEM) {
 125 |     0 |         cio_memfs_close(ch);
 126 |     0 |     }
 127 |   328 |     else if (type == CIO_STORE_FS) {
 128 |   328 |         cio_file_close(ch, delete);
 129 |   328 |     }
 130 |       |
 131 |   328 |     mk_list_del(&ch->_head);
 132 |   328 |     mk_list_del(&ch->_state_head);
 133 |   328 |     free(ch->name);
 134 |   328 |     free(ch);
 135 |       |
 136 |       |     /* Adjust counter */
 137 |   328 |     cio_chunk_counter_total_sub(ctx);
 138 |   328 | }
 139 |       |
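Note on the block above: cio_chunk_open() and cio_chunk_close() bracket the basic chunk
lifecycle exercised 328 times in this run. The following is a minimal usage sketch, not
taken from the report: it assumes the caller has already created the cio_ctx and
cio_stream, and that CIO_OPEN and the CIO_OK/CIO_ERROR return conventions behave as
commonly defined in chunkio.h.

    #include <chunkio/chunkio.h>

    /* Hypothetical helper: write one payload into a new chunk and close it. */
    static int store_record(struct cio_ctx *ctx, struct cio_stream *st,
                            const char *name, const void *data, size_t len)
    {
        int err;
        int ret;
        struct cio_chunk *chunk;

        /* create/open the chunk on the stream's backend (fs or mem) */
        chunk = cio_chunk_open(ctx, st, name, CIO_OPEN, len, &err);
        if (!chunk) {
            return -1;
        }

        /* append the payload to the content area */
        ret = cio_chunk_write(chunk, data, len);
        if (ret != CIO_OK) {
            cio_chunk_close(chunk, CIO_TRUE);   /* drop the broken chunk */
            return -1;
        }

        /* flush checksum/metadata for filesystem-backed chunks */
        cio_chunk_sync(chunk);

        /* release the context; CIO_FALSE keeps the data on disk */
        cio_chunk_close(chunk, CIO_FALSE);
        return 0;
    }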
 140 |       | int cio_chunk_delete(struct cio_ctx *ctx, struct cio_stream *st, const char *name)
 141 |     0 | {
 142 |     0 |     int result;
 143 |       |
 144 |     0 |     if (st == NULL) {
 145 |     0 |         cio_log_error(ctx, "[cio chunk] invalid stream");
 146 |       |
 147 |     0 |         return CIO_ERROR;
 148 |     0 |     }
 149 |       |
 150 |     0 |     if (name == NULL) {
 151 |     0 |         cio_log_error(ctx, "[cio chunk] invalid file name");
 152 |       |
 153 |     0 |         return CIO_ERROR;
 154 |     0 |     }
 155 |       |
 156 |     0 |     if (strlen(name) == 0) {
 157 |     0 |         cio_log_error(ctx, "[cio chunk] invalid file name");
 158 |       |
 159 |     0 |         return CIO_ERROR;
 160 |     0 |     }
 161 |       |
 162 |       | #ifndef CIO_HAVE_BACKEND_FILESYSTEM
 163 |       |     if (st->type == CIO_STORE_FS) {
 164 |       |         cio_log_error(ctx, "[cio chunk] file system backend not supported");
 165 |       |
 166 |       |         return CIO_ERROR;
 167 |       |     }
 168 |       | #endif
 169 |       |
 170 |     0 |     if (st->type == CIO_STORE_FS) {
 171 |     0 |         result = cio_file_delete(ctx, st, name);
 172 |     0 |     }
 173 |     0 |     else {
 174 |     0 |         result = CIO_ERROR;
 175 |     0 |     }
 176 |       |
 177 |     0 |     return result;
 178 |     0 | }
 179 |       |
 180 |       | /*
 181 |       |  * Write at a specific offset of the content area. Offset must be >= 0 and
 182 |       |  * less than current data length.
 183 |       |  */
 184 |       | int cio_chunk_write_at(struct cio_chunk *ch, off_t offset,
 185 |       |                        const void *buf, size_t count)
 186 |     0 | {
 187 |     0 |     int type;
 188 |     0 |     struct cio_memfs *mf;
 189 |     0 |     struct cio_file *cf;
 190 |       |
 191 |     0 |     cio_error_reset(ch);
 192 |       |
 193 |     0 |     type = ch->st->type;
 194 |     0 |     if (type == CIO_STORE_MEM) {
 195 |     0 |         mf = ch->backend;
 196 |     0 |         mf->buf_len = offset;
 197 |     0 |     }
 198 |     0 |     else if (type == CIO_STORE_FS) {
 199 |     0 |         cf = ch->backend;
 200 |     0 |         cf->data_size = offset;
 201 |     0 |         cf->crc_reset = CIO_TRUE;
 202 |     0 |     }
 203 |       |
 204 |       |     /*
 205 |       |      * By default, backends (fs, mem) append data after their last position,
 206 |       |      * so we just adjust the content size to the given offset.
 207 |       |      */
 208 |     0 |     return cio_chunk_write(ch, buf, count);
 209 |     0 | }
 210 |       |
 211 |       | int cio_chunk_write(struct cio_chunk *ch, const void *buf, size_t count)
 212 |   328 | {
 213 |   328 |     int ret = 0;
 214 |   328 |     int type;
 215 |       |
 216 |   328 |     cio_error_reset(ch);
 217 |       |
 218 |   328 |     type = ch->st->type;
 219 |   328 |     if (type == CIO_STORE_MEM) {
 220 |     0 |         ret = cio_memfs_write(ch, buf, count);
 221 |     0 |     }
 222 |   328 |     else if (type == CIO_STORE_FS) {
 223 |   328 |         ret = cio_file_write(ch, buf, count);
 224 |   328 |     }
 225 |       |
 226 |   328 |     return ret;
 227 |   328 | }
 228 |       |
 229 |       | int cio_chunk_sync(struct cio_chunk *ch)
 230 |   202 | {
 231 |   202 |     int ret = 0;
 232 |   202 |     int type;
 233 |       |
 234 |   202 |     cio_error_reset(ch);
 235 |       |
 236 |   202 |     type = ch->st->type;
 237 |   202 |     if (type == CIO_STORE_FS) {
 238 |   202 |         ret = cio_file_sync(ch);
 239 |   202 |     }
 240 |       |
 241 |   202 |     return ret;
 242 |   202 | }
 243 |       |
 244 |       | int cio_chunk_get_content(struct cio_chunk *ch, char **buf, size_t *size)
 245 |   202 | {
 246 |   202 |     int ret = 0;
 247 |   202 |     int type;
 248 |   202 |     struct cio_memfs *mf;
 249 |   202 |     struct cio_file *cf;
 250 |       |
 251 |   202 |     cio_error_reset(ch);
 252 |       |
 253 |   202 |     type = ch->st->type;
 254 |   202 |     if (type == CIO_STORE_MEM) {
 255 |     0 |         mf = ch->backend;
 256 |     0 |         *size = mf->buf_len;
 257 |     0 |         *buf = mf->buf_data;
 258 |     0 |         return ret;
 259 |     0 |     }
 260 |   202 |     else if (type == CIO_STORE_FS) {
 261 |   202 |         cf = ch->backend;
 262 |   202 |         ret = cio_file_read_prepare(ch->ctx, ch);
 263 |   202 |         if (ret != CIO_OK) {
 264 |     0 |             return ret;
 265 |     0 |         }
 266 |   202 |         *size = cf->data_size;
 267 |   202 |         *buf = cio_file_st_get_content(cf->map);
 268 |   202 |         return ret;
 269 |   202 |     }
 270 |       |
 271 |     0 |     return CIO_ERROR;
 272 |   202 | }
 273 |       |
 274 |       | /* Using the content of the chunk, generate a copy using the heap */
 275 |       | int cio_chunk_get_content_copy(struct cio_chunk *ch,
 276 |       |                                void **out_buf, size_t *out_size)
 277 |   124 | {
 278 |   124 |     int type;
 279 |       |
 280 |   124 |     cio_error_reset(ch);
 281 |       |
 282 |   124 |     type = ch->st->type;
 283 |   124 |     if (type == CIO_STORE_MEM) {
 284 |     0 |         return cio_memfs_content_copy(ch, out_buf, out_size);
 285 |     0 |     }
 286 |   124 |     else if (type == CIO_STORE_FS) {
 287 |   124 |         return cio_file_content_copy(ch, out_buf, out_size);
 288 |   124 |     }
 289 |       |
 290 |     0 |     return CIO_ERROR;
 291 |   124 | }
 292 |       |
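Note on the two getters above: cio_chunk_get_content() returns a pointer into the
backend's own buffer (the memory map for filesystem chunks), while
cio_chunk_get_content_copy() allocates a heap copy. A minimal sketch of the read path
follows; it assumes CIO_OK is the success code and that the heap copy is owned by the
caller and released with free().

    #include <stdio.h>
    #include <stdlib.h>
    #include <chunkio/chunkio.h>

    /* Hypothetical helper: inspect the content of an already opened chunk. */
    static int dump_chunk(struct cio_chunk *chunk)
    {
        int ret;
        char *buf;
        size_t size;
        void *copy;
        size_t copy_size;

        /* borrowed view: only valid while the chunk is up/mapped */
        ret = cio_chunk_get_content(chunk, &buf, &size);
        if (ret != CIO_OK) {
            return -1;
        }
        printf("chunk content: %zu bytes\n", size);

        /* heap copy: survives the chunk being brought down or closed */
        ret = cio_chunk_get_content_copy(chunk, &copy, &copy_size);
        if (ret != CIO_OK) {
            return -1;
        }
        /* ... process the copy ... */
        free(copy);   /* assumed: the caller owns the copy */
        return 0;
    }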
 293 |       | size_t cio_chunk_get_content_end_pos(struct cio_chunk *ch)
 294 |     0 | {
 295 |     0 |     int type;
 296 |     0 |     off_t pos = 0;
 297 |     0 |     struct cio_memfs *mf;
 298 |     0 |     struct cio_file *cf;
 299 |       |
 300 |     0 |     cio_error_reset(ch);
 301 |       |
 302 |     0 |     type = ch->st->type;
 303 |     0 |     if (type == CIO_STORE_MEM) {
 304 |     0 |         mf = ch->backend;
 305 |     0 |         pos = (off_t) (mf->buf_data + mf->buf_len);
 306 |     0 |     }
 307 |     0 |     else if (type == CIO_STORE_FS) {
 308 |     0 |         cf = ch->backend;
 309 |     0 |         pos = (off_t) (cio_file_st_get_content(cf->map) + cf->data_size);
 310 |     0 |     }
 311 |       |
 312 |     0 |     return pos;
 313 |     0 | }
 314 |       |
 315 |       | ssize_t cio_chunk_get_content_size(struct cio_chunk *ch)
 316 | 1.13k | {
 317 | 1.13k |     int type;
 318 | 1.13k |     struct cio_memfs *mf;
 319 | 1.13k |     struct cio_file *cf;
 320 |       |
 321 | 1.13k |     cio_error_reset(ch);
 322 |       |
 323 | 1.13k |     type = ch->st->type;
 324 | 1.13k |     if (type == CIO_STORE_MEM) {
 325 |     0 |         mf = ch->backend;
 326 |     0 |         return mf->buf_len;
 327 |     0 |     }
 328 | 1.13k |     else if (type == CIO_STORE_FS) {
 329 | 1.13k |         cf = ch->backend;
 330 | 1.13k |         return cf->data_size;
 331 | 1.13k |     }
 332 |       |
 333 |     0 |     return -1;
 334 | 1.13k | }
 335 |       |
 336 |       | ssize_t cio_chunk_get_real_size(struct cio_chunk *ch)
 337 |   923 | {
 338 |   923 |     int type;
 339 |   923 |     struct cio_memfs *mf;
 340 |   923 |     struct cio_file *cf;
 341 |       |
 342 |   923 |     cio_error_reset(ch);
 343 |       |
 344 |   923 |     type = ch->st->type;
 345 |   923 |     if (type == CIO_STORE_MEM) {
 346 |     0 |         mf = ch->backend;
 347 |     0 |         return mf->buf_len;
 348 |     0 |     }
 349 |   923 |     else if (type == CIO_STORE_FS) {
 350 |   923 |         cf = ch->backend;
 351 |       |
 352 |       |         /* If the file is not open, we need to get its size explicitly */
 353 |   923 |         if (cf->fs_size == 0) {
 354 |   519 |             return cio_file_real_size(cf);
 355 |   519 |         }
 356 |       |
 357 |   404 |         return cf->fs_size;
 358 |   923 |     }
 359 |       |
 360 |     0 |     return -1;
 361 |   923 | }
 362 |       |
 363 |       | void cio_chunk_close_stream(struct cio_stream *st)
 364 | 2.40k | {
 365 | 2.40k |     struct mk_list *tmp;
 366 | 2.40k |     struct mk_list *head;
 367 | 2.40k |     struct cio_chunk *ch;
 368 |       |
 369 | 2.40k |     mk_list_foreach_safe(head, tmp, &st->chunks) {
 370 |     0 |         ch = mk_list_entry(head, struct cio_chunk, _head);
 371 |     0 |         cio_chunk_close(ch, CIO_FALSE);
 372 |     0 |     }
 373 | 2.40k | }
 374 |       |
 375 |       | char *cio_chunk_hash(struct cio_chunk *ch)
 376 |     0 | {
 377 |     0 |     if (ch->st->type == CIO_STORE_FS) {
 378 |     0 |         return cio_file_hash(ch->backend);
 379 |     0 |     }
 380 |       |
 381 |     0 |     return NULL;
 382 |     0 | }
 383 |       |
 384 |       | int cio_chunk_lock(struct cio_chunk *ch)
 385 |   202 | {
 386 |   202 |     cio_error_reset(ch);
 387 |       |
 388 |   202 |     if (ch->lock == CIO_TRUE) {
 389 |     0 |         return CIO_ERROR;
 390 |     0 |     }
 391 |       |
 392 |   202 |     ch->lock = CIO_TRUE;
 393 |       |
 394 |   202 |     if (cio_chunk_is_up(ch) == CIO_TRUE) {
 395 |   202 |         return cio_chunk_sync(ch);
 396 |   202 |     }
 397 |       |
 398 |     0 |     return CIO_OK;
 399 |   202 | }
 400 |       |
 401 |       | int cio_chunk_unlock(struct cio_chunk *ch)
 402 |     0 | {
 403 |     0 |     cio_error_reset(ch);
 404 |       |
 405 |     0 |     if (ch->lock == CIO_FALSE) {
 406 |     0 |         return CIO_ERROR;
 407 |     0 |     }
 408 |       |
 409 |     0 |     ch->lock = CIO_FALSE;
 410 |     0 |     return CIO_OK;
 411 |     0 | }
 412 |       |
 413 |       | int cio_chunk_is_locked(struct cio_chunk *ch)
 414 |   113 | {
 415 |   113 |     return ch->lock;
 416 |   113 | }
 417 |       |
 418 |       | /*
 419 |       |  * Start a transaction context: it keeps the state of the currently calculated
 420 |       |  * CRC32 (if enabled) and the current number of bytes in the content
 421 |       |  * area.
 422 |       |  */
 423 |       | int cio_chunk_tx_begin(struct cio_chunk *ch)
 424 |     0 | {
 425 |     0 |     int type;
 426 |     0 |     struct cio_memfs *mf;
 427 |     0 |     struct cio_file *cf;
 428 |       |
 429 |     0 |     cio_error_reset(ch);
 430 |       |
 431 |     0 |     if (cio_chunk_is_locked(ch)) {
 432 |     0 |         return CIO_RETRY;
 433 |     0 |     }
 434 |       |
 435 |     0 |     if (ch->tx_active == CIO_TRUE) {
 436 |     0 |         return CIO_OK;
 437 |     0 |     }
 438 |       |
 439 |     0 |     ch->tx_active = CIO_TRUE;
 440 |     0 |     type = ch->st->type;
 441 |     0 |     if (type == CIO_STORE_MEM) {
 442 |     0 |         mf = ch->backend;
 443 |     0 |         ch->tx_crc = mf->crc_cur;
 444 |     0 |         ch->tx_content_length = mf->buf_len;
 445 |     0 |     }
 446 |     0 |     else if (type == CIO_STORE_FS) {
 447 |     0 |         cf = ch->backend;
 448 |     0 |         ch->tx_crc = cf->crc_cur;
 449 |     0 |         ch->tx_content_length = cf->data_size;
 450 |     0 |     }
 451 |       |
 452 |     0 |     return CIO_OK;
 453 |     0 | }
 454 |       |
 455 |       | /*
 456 |       |  * Commit the transaction changes, reset the transaction context and leave
 457 |       |  * the new changes in place.
 458 |       |  */
 459 |       | int cio_chunk_tx_commit(struct cio_chunk *ch)
 460 |     0 | {
 461 |     0 |     int ret;
 462 |       |
 463 |     0 |     cio_error_reset(ch);
 464 |       |
 465 |     0 |     ret = cio_chunk_sync(ch);
 466 |     0 |     if (ret == -1) {
 467 |     0 |         return CIO_ERROR;
 468 |     0 |     }
 469 |       |
 470 |     0 |     ch->tx_active = CIO_FALSE;
 471 |     0 |     return CIO_OK;
 472 |     0 | }
 473 |       |
 474 |       | /*
 475 |       |  * Drop the changes made since the transaction was initiated */
 476 |       | int cio_chunk_tx_rollback(struct cio_chunk *ch)
 477 |     0 | {
 478 |     0 |     int type;
 479 |     0 |     struct cio_memfs *mf;
 480 |     0 |     struct cio_file *cf;
 481 |       |
 482 |     0 |     cio_error_reset(ch);
 483 |       |
 484 |     0 |     if (ch->tx_active == CIO_FALSE) {
 485 |     0 |         return -1;
 486 |     0 |     }
 487 |       |
 488 |     0 |     type = ch->st->type;
 489 |     0 |     if (type == CIO_STORE_MEM) {
 490 |     0 |         mf = ch->backend;
 491 |     0 |         mf->crc_cur = ch->tx_crc;
 492 |     0 |         mf->buf_len = ch->tx_content_length;
 493 |     0 |     }
 494 |     0 |     else if (type == CIO_STORE_FS) {
 495 |     0 |         cf = ch->backend;
 496 |     0 |         cf->crc_cur = ch->tx_crc;
 497 |     0 |         cf->data_size = ch->tx_content_length;
 498 |     0 |     }
 499 |       |
 500 |     0 |     ch->tx_active = CIO_FALSE;
 501 |     0 |     return CIO_OK;
 502 |     0 | }
 503 |       |
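Note on the transaction API above: none of cio_chunk_tx_begin(), cio_chunk_tx_commit()
or cio_chunk_tx_rollback() were exercised in this run (all counts are 0). As a rough
illustration of the intended protocol only, assuming the CIO_* return conventions used
earlier, a guarded write could look like this:

    #include <chunkio/chunkio.h>

    /* Hypothetical helper: write a record, rolling back the chunk's CRC and
     * content length to the tx_begin() snapshot if the write fails. */
    static int tx_write(struct cio_chunk *chunk, const void *data, size_t len)
    {
        int ret;

        /* snapshot the current CRC and content length; CIO_RETRY if locked */
        ret = cio_chunk_tx_begin(chunk);
        if (ret != CIO_OK) {
            return ret;
        }

        ret = cio_chunk_write(chunk, data, len);
        if (ret != CIO_OK) {
            cio_chunk_tx_rollback(chunk);   /* restore the snapshot */
            return CIO_ERROR;
        }

        /* sync and clear the transaction state, keeping the new bytes */
        return cio_chunk_tx_commit(chunk);
    }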
 504 |       | /*
 505 |       |  * Determine whether the chunk content is available in memory for I/O operations. For
 506 |       |  * the memory backend this is always true; for the filesystem backend it checks if the
 507 |       |  * memory map exists and the file descriptor is open.
 508 |       |  */
 509 |       | int cio_chunk_is_up(struct cio_chunk *ch)
 510 | 3.01k | {
 511 | 3.01k |     int type;
 512 | 3.01k |     struct cio_file *cf;
 513 |       |
 514 | 3.01k |     type = ch->st->type;
 515 | 3.01k |     if (type == CIO_STORE_MEM) {
 516 |     0 |         return CIO_TRUE;
 517 |     0 |     }
 518 | 3.01k |     else if (type == CIO_STORE_FS) {
 519 | 3.01k |         cf = ch->backend;
 520 | 3.01k |         return cio_file_is_up(ch, cf);
 521 | 3.01k |     }
 522 |       |
 523 |     0 |     return CIO_FALSE;
 524 | 3.01k | }
 525 |       |
 526 |       | int cio_chunk_is_file(struct cio_chunk *ch)
 527 |     0 | {
 528 |     0 |     int type;
 529 |       |
 530 |     0 |     type = ch->st->type;
 531 |     0 |     if (type == CIO_STORE_FS) {
 532 |     0 |         return CIO_TRUE;
 533 |     0 |     }
 534 |       |
 535 |     0 |     return CIO_FALSE;
 536 |     0 | }
 537 |       |
 538 |       | static inline void chunk_state_sync(struct cio_chunk *ch)
 539 |   404 | {
 540 |   404 |     struct cio_stream *st;
 541 |       |
 542 |   404 |     if (!ch) {
 543 |     0 |         return;
 544 |     0 |     }
 545 |       |
 546 |   404 |     mk_list_del(&ch->_state_head);
 547 |   404 |     st = ch->st;
 548 |   404 |     if (cio_chunk_is_up(ch) == CIO_TRUE) {
 549 |   202 |         mk_list_add(&ch->_state_head, &st->chunks_up);
 550 |   202 |     }
 551 |   202 |     else {
 552 |   202 |         mk_list_add(&ch->_state_head, &st->chunks_down);
 553 |   202 |     }
 554 |   404 | }
 555 |       |
 556 |       | int cio_chunk_down(struct cio_chunk *ch)
 557 |   202 | {
 558 |   202 |     int ret;
 559 |   202 |     int type;
 560 |       |
 561 |   202 |     cio_error_reset(ch);
 562 |       |
 563 |   202 |     type = ch->st->type;
 564 |   202 |     if (type == CIO_STORE_FS) {
 565 |   202 |         ret = cio_file_down(ch);
 566 |   202 |         chunk_state_sync(ch);
 567 |   202 |         return ret;
 568 |   202 |     }
 569 |       |
 570 |     0 |     return CIO_OK;
 571 |   202 | }
 572 |       |
 573 |       | int cio_chunk_up(struct cio_chunk *ch)
 574 |   202 | {
 575 |   202 |     int ret;
 576 |   202 |     int type;
 577 |       |
 578 |   202 |     cio_error_reset(ch);
 579 |       |
 580 |   202 |     type = ch->st->type;
 581 |   202 |     if (type == CIO_STORE_FS) {
 582 |   202 |         ret = cio_file_up(ch);
 583 |   202 |         chunk_state_sync(ch);
 584 |   202 |         return ret;
 585 |   202 |     }
 586 |       |
 587 |     0 |     return CIO_OK;
 588 |   202 | }
 589 |       |
 590 |       | int cio_chunk_up_force(struct cio_chunk *ch)
 591 |     0 | {
 592 |     0 |     int ret;
 593 |     0 |     int type;
 594 |       |
 595 |     0 |     cio_error_reset(ch);
 596 |       |
 597 |     0 |     type = ch->st->type;
 598 |     0 |     if (type == CIO_STORE_FS) {
 599 |     0 |         ret = cio_file_up_force(ch);
 600 |     0 |         chunk_state_sync(ch);
 601 |     0 |         return ret;
 602 |     0 |     }
 603 |       |
 604 |     0 |     return CIO_OK;
 605 |     0 | }
 606 |       |
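Note on the state-handling functions above: cio_chunk_down() and cio_chunk_up() move a
filesystem chunk out of and back into memory, and chunk_state_sync() re-links it on the
stream's chunks_up/chunks_down lists. The sketch below is illustrative only; it assumes
the CIO_OK convention used earlier and that cio_chunk_up_force() bypasses limits that
cio_chunk_up() may enforce in the file backend (not visible in this file).

    #include <chunkio/chunkio.h>

    /* Hypothetical helper: release a chunk's memory map under memory pressure
     * and map it again before the next access. */
    static int park_and_restore(struct cio_chunk *chunk)
    {
        int ret;

        if (cio_chunk_is_up(chunk) == CIO_TRUE) {
            /* unmap/close the backing file; the content stays on disk */
            ret = cio_chunk_down(chunk);
            if (ret != CIO_OK) {
                return ret;
            }
        }

        /* ... later, before reading or writing again ... */
        ret = cio_chunk_up(chunk);
        if (ret != CIO_OK) {
            /* fall back to the forced variant (assumed to skip backend limits) */
            ret = cio_chunk_up_force(chunk);
        }
        return ret;
    }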
 607 |       | char *cio_version()
 608 |     2 | {
 609 |     2 |     return CIO_VERSION_STR;
 610 |     2 | }
 611 |       |
 612 |       | /*
 613 |       |  * Counters API
 614 |       |  */
 615 |       |
 616 |       | /* Increase the number of total chunks registered (+1) */
 617 |       | size_t cio_chunk_counter_total_add(struct cio_ctx *ctx)
 618 |   328 | {
 619 |   328 |     ctx->total_chunks++;
 620 |   328 |     return ctx->total_chunks;
 621 |   328 | }
 622 |       |
 623 |       | /* Decrease the total number of chunks (-1) */
 624 |       | size_t cio_chunk_counter_total_sub(struct cio_ctx *ctx)
 625 |   328 | {
 626 |   328 |     ctx->total_chunks--;
 627 |   328 |     return ctx->total_chunks;
 628 |   328 | }
 629 |       |
 630 |       | /* Increase the number of total chunks up in memory (+1) */
 631 |       | size_t cio_chunk_counter_total_up_add(struct cio_ctx *ctx)
 632 |   530 | {
 633 |   530 |     ctx->total_chunks_up++;
 634 |   530 |     return ctx->total_chunks_up;
 635 |   530 | }
 636 |       |
 637 |       | /* Decrease the total number of chunks up in memory (-1) */
 638 |       | size_t cio_chunk_counter_total_up_sub(struct cio_ctx *ctx)
 639 |   530 | {
 640 |   530 |     ctx->total_chunks_up--;
 641 |   530 |     return ctx->total_chunks_up;
 642 |   530 | }