Coverage Report

Created: 2025-01-28 06:34

/src/fluent-bit/src/flb_mp.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_info.h>
21
#include <fluent-bit/flb_utils.h>
22
#include <fluent-bit/flb_mem.h>
23
#include <fluent-bit/flb_log.h>
24
#include <fluent-bit/flb_mp.h>
25
#include <fluent-bit/flb_mp_chunk.h>
26
27
#include <fluent-bit/flb_slist.h>
28
#include <fluent-bit/flb_record_accessor.h>
29
#include <fluent-bit/flb_metrics.h>
30
31
#include <fluent-bit/flb_log_event_encoder.h>
32
#include <fluent-bit/flb_log_event_decoder.h>
33
34
#include <msgpack.h>
35
#include <mpack/mpack.h>
36
37
/* don't do this at home */
38
0
/*
 * Store 'd' as a 16-bit value at 'buf' through msgpack-c's internal
 * _msgpack_store16 helper. Arguments are parenthesized so that expression
 * arguments are cast as a whole.
 */
#define pack_uint16(buf, d) _msgpack_store16((buf), (uint16_t) (d))
39
0
/*
 * Store 'd' as a 32-bit value at 'buf' through msgpack-c's internal
 * _msgpack_store32 helper. Arguments are parenthesized so that expression
 * arguments are cast as a whole.
 */
#define pack_uint32(buf, d) _msgpack_store32((buf), (uint32_t) (d))
40
41
/* Return the number of msgpack serialized events in the buffer */
42
int flb_mp_count(const void *data, size_t bytes)
{
    int total;

    /* The caller is not interested in leftover bytes, hence the NULL */
    total = flb_mp_count_remaining(data, bytes, NULL);

    return total;
}
46
47
int flb_mp_count_remaining(const void *data, size_t bytes, size_t *remaining_bytes)
48
0
{
49
0
    size_t remaining;
50
0
    int count = 0;
51
0
    mpack_reader_t reader;
52
53
0
    mpack_reader_init_data(&reader, (const char *) data, bytes);
54
0
    for (;;) {
55
0
        remaining = mpack_reader_remaining(&reader, NULL);
56
0
        if (!remaining) {
57
0
            break;
58
0
        }
59
0
        mpack_discard(&reader);
60
0
        if (mpack_reader_error(&reader)) {
61
0
            break;
62
0
        }
63
0
        count++;
64
0
    }
65
66
0
    if (remaining_bytes) {
67
0
        *remaining_bytes = remaining;
68
0
    }
69
0
    mpack_reader_destroy(&reader);
70
0
    return count;
71
0
}
72
73
int flb_mp_validate_metric_chunk(const void *data, size_t bytes,
74
                                 int *out_series, size_t *processed_bytes)
75
0
{
76
0
    int ret;
77
0
    int ok = CMT_DECODE_MSGPACK_SUCCESS;
78
0
    int count = 0;
79
0
    size_t off = 0;
80
0
    size_t pre_off = 0;
81
0
    struct cmt *cmt;
82
83
0
    while ((ret = cmt_decode_msgpack_create(&cmt,
84
0
                                            (char *) data, bytes, &off)) == ok) {
85
0
        cmt_destroy(cmt);
86
0
        count++;
87
0
        pre_off = off;
88
0
    }
89
90
0
    switch (ret) {
91
0
        case CMT_DECODE_MSGPACK_INVALID_ARGUMENT_ERROR:
92
0
        case CMT_DECODE_MSGPACK_CORRUPT_INPUT_DATA_ERROR:
93
0
        case CMT_DECODE_MSGPACK_CONSUME_ERROR:
94
0
        case CMT_DECODE_MSGPACK_ENGINE_ERROR:
95
0
        case CMT_DECODE_MSGPACK_PENDING_MAP_ENTRIES:
96
0
        case CMT_DECODE_MSGPACK_PENDING_ARRAY_ENTRIES:
97
0
        case CMT_DECODE_MSGPACK_UNEXPECTED_KEY_ERROR:
98
0
        case CMT_DECODE_MSGPACK_UNEXPECTED_DATA_TYPE_ERROR:
99
0
        case CMT_DECODE_MSGPACK_DICTIONARY_LOOKUP_ERROR:
100
0
        case CMT_DECODE_MSGPACK_VERSION_ERROR:
101
0
            goto error;
102
0
    }
103
104
0
    if (ret == CMT_DECODE_MSGPACK_INSUFFICIENT_DATA && off == bytes) {
105
0
        *out_series = count;
106
0
        *processed_bytes = pre_off;
107
0
        return 0;
108
0
    }
109
110
0
error:
111
0
    *out_series = count;
112
0
    *processed_bytes = pre_off;
113
114
0
    return -1;
115
0
}
116
117
int flb_mp_validate_log_chunk(const void *data, size_t bytes,
118
                              int *out_records, size_t *processed_bytes)
119
0
{
120
0
    int ret;
121
0
    int count = 0;
122
0
    size_t off = 0;
123
0
    size_t pre_off = 0;
124
0
    size_t ptr_size;
125
0
    unsigned char *ptr;
126
0
    msgpack_object array;
127
0
    msgpack_object ts;
128
0
    msgpack_object header;
129
0
    msgpack_object record;
130
0
    msgpack_object metadata;
131
0
    msgpack_unpacked result;
132
133
0
    msgpack_unpacked_init(&result);
134
0
    while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
135
0
        array = result.data;
136
137
0
        if (array.type != MSGPACK_OBJECT_ARRAY) {
138
            /*
139
             * Sometimes there is a special case: Chunks might have extra zero
140
             * bytes at the end of a record, meaning: no more records. This is not
141
             * an error and actually it happens if a previous run of Fluent Bit
142
             * was stopped/killed before to adjust the file size.
143
             *
144
             * Just validate if all bytes are zero, if so, adjust counters
145
             * and return zero.
146
             */
147
0
            ptr = (unsigned char *) (data);
148
0
            ptr += pre_off;
149
0
            if (ptr[0] != 0) {
150
0
                goto error;
151
0
            }
152
153
0
            ptr_size = bytes - pre_off;
154
0
            ret = memcmp(ptr, ptr + 1, ptr_size - 1);
155
0
            if (ret == 0) {
156
                /*
157
                 * The chunk is valid, just let the caller know the last processed
158
                 * valid byte.
159
                 */
160
0
                msgpack_unpacked_destroy(&result);
161
0
                *out_records = count;
162
0
                *processed_bytes = pre_off;
163
0
                return 0;
164
0
            }
165
0
            goto error;
166
0
        }
167
168
0
        if (array.via.array.size != 2) {
169
0
            goto error;
170
0
        }
171
172
0
        header = array.via.array.ptr[0];
173
0
        record = array.via.array.ptr[1];
174
175
0
        if (header.type == MSGPACK_OBJECT_ARRAY) {
176
0
            if (header.via.array.size != 2) {
177
0
                goto error;
178
0
            }
179
180
0
            ts = header.via.array.ptr[0];
181
0
            metadata = header.via.array.ptr[1];
182
183
0
            if (metadata.type != MSGPACK_OBJECT_MAP) {
184
0
                goto error;
185
0
            }
186
0
        }
187
0
        else {
188
0
            ts = header;
189
0
        }
190
191
0
        if (ts.type != MSGPACK_OBJECT_POSITIVE_INTEGER &&
192
0
            ts.type != MSGPACK_OBJECT_FLOAT &&
193
0
            ts.type != MSGPACK_OBJECT_EXT) {
194
0
            goto error;
195
0
        }
196
197
0
        if (record.type != MSGPACK_OBJECT_MAP) {
198
0
            goto error;
199
0
        }
200
201
0
        count++;
202
0
        pre_off = off;
203
0
    }
204
205
0
    msgpack_unpacked_destroy(&result);
206
0
    *out_records = count;
207
0
    *processed_bytes = pre_off;
208
0
    return 0;
209
210
0
 error:
211
0
    msgpack_unpacked_destroy(&result);
212
0
    *out_records = count;
213
0
    *processed_bytes = pre_off;
214
215
0
    return -1;
216
0
}
217
218
/* Adjust a msgpack header buffer size */
219
/*
 * Overwrite the entry count of an already-serialized msgpack map header.
 *
 * buf  : points at the map marker byte.
 * size : new number of entries.
 *
 * Markers other than fixmap (1000xxxx), 0xde and 0xdf are left untouched.
 * No range check is done on 'size' against the width of the existing header.
 */
void flb_mp_set_map_header_size(char *buf, int size)
{
    uint8_t marker = buf[0];

    if ((marker & 0xf0) == 0x80) {
        /* fixmap: the count shares the marker byte */
        buf[0] = (uint8_t) 0x80 | ((uint8_t) size);
    }
    else if (marker == 0xde) {
        /* 16-bit count stored right after the marker */
        pack_uint16(buf + 1, size);
    }
    else if (marker == 0xdf) {
        /* 32-bit count stored right after the marker */
        pack_uint32(buf + 1, size);
    }
}
237
238
/*
 * Overwrite the entry count of an already-serialized msgpack array header.
 *
 * buf  : points at the array marker byte.
 * size : new number of entries.
 *
 * Markers other than fixarray (1001xxxx), 0xdc and 0xdd are left untouched.
 * No range check is done on 'size' against the width of the existing header.
 */
void flb_mp_set_array_header_size(char *buf, int size)
{
    uint8_t marker = buf[0];

    if ((marker & 0xf0) == 0x90) {
        /* fixarray: the count shares the marker byte */
        buf[0] = (uint8_t) 0x90 | ((uint8_t) size);
    }
    else if (marker == 0xdc) {
        /* 16-bit count stored right after the marker */
        pack_uint16(buf + 1, size);
    }
    else if (marker == 0xdd) {
        /* 32-bit count stored right after the marker */
        pack_uint32(buf + 1, size);
    }
}
256
257
/*
258
 * msgpack-c requires to set the number of the entries in a map beforehand. For our
259
 * use case this adds some complexity, having developers to count all possible
260
 * entries that might be added.
261
 *
262
 * As a workaround and to avoid map's recomposition over and over, this simple API
263
 * allows to initialize the array header, 'register' new entries (as counters) and
264
 * finalize, upon finalization the proper array header size is adjusted.
265
 *
266
 * To make things easier, we make sure msgpack-c always register an array type of
267
 * 32 bits (identified by 0xdf, for number of entries >= 65536). Yes, for every
268
 * array using this API it will use 2 more bytes, not a big deal. So whoever
269
 * uses this API, use it only if you don't know the exact number of entries to add.
270
 *
271
 * MANDATORY: make sure to always initialize, register every entry and finalize,
272
 * otherwise you will get a corrupted or incomplete msgpack buffer.
273
 *
274
 * Usage example
275
 * =============
276
 *
277
 *  struct flb_mp_map_header mh;
278
 *
279
 *  flb_mp_map_header_init(&mh, mp_pck);
280
 *
281
 *  -- First key/value entry --
282
 *  flb_mp_map_header_append(&mh);
283
 *  msgpack_pack_str(mp_pck, 4);
284
 *  msgpack_pack_str_body(mp_pck, "cool", 4);
285
 *  msgpack_pack_true(mp_pck);
286
 *
287
 *  -- Second key/value entry --
288
 *  flb_mp_map_header_append(&mh);
289
 *  msgpack_pack_str(mp_pck, 4);
290
 *  msgpack_pack_str_body(mp_pck, "slow", 4);
291
 *  msgpack_pack_false(mp_pck);
292
 *
293
 *  -- Finalize Map --
294
 *  flb_mp_map_header_end(&mh);
295
 */
296
297
static inline void mp_header_type_init(struct flb_mp_map_header *mh,
298
                                       msgpack_packer *mp_pck,
299
                                       int type)
300
0
{
301
0
    msgpack_sbuffer *mp_sbuf;
302
303
0
    mp_sbuf = (msgpack_sbuffer *) mp_pck->data;
304
305
    /* map sbuffer */
306
0
    mh->data = mp_pck->data;
307
308
    /* Reset entries */
309
0
    mh->entries = 0;
310
311
    /* Store the next byte available */
312
0
    mh->offset = mp_sbuf->size;
313
0
}
314
315
/*
 * Start a map whose number of entries is not known yet.
 *
 * Returns the result of msgpack_pack_map().
 */
int flb_mp_map_header_init(struct flb_mp_map_header *mh, msgpack_packer *mp_pck)
{
    int ret;

    /* Record where the header starts and reset the entry counter */
    mp_header_type_init(mh, mp_pck, FLB_MP_MAP);

    /*
     * Pack a placeholder map header with size = 65536 to force msgpack-c to
     * emit the 32 bit variant (0xdf), so that any final count fits when the
     * header is patched later. Reference:
     *
     * - https://github.com/msgpack/msgpack/blob/master/spec.md#map-format-family
     */
    ret = msgpack_pack_map(mp_pck, 65536);

    return ret;
}
328
329
/*
 * Start an array whose number of entries is not known yet.
 *
 * Returns the result of msgpack_pack_array().
 */
int flb_mp_array_header_init(struct flb_mp_map_header *mh, msgpack_packer *mp_pck)
{
    int ret;

    /* Record where the header starts and reset the entry counter */
    mp_header_type_init(mh, mp_pck, FLB_MP_ARRAY);

    /*
     * Pack a placeholder array header with size = 65536 to force msgpack-c to
     * emit the 32 bit variant (0xdd), so that any final count fits when the
     * header is patched later. Reference:
     *
     * - https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
     */
    ret = msgpack_pack_array(mp_pck, 65536);

    return ret;
}
342
343
344
int flb_mp_map_header_append(struct flb_mp_map_header *mh)
345
0
{
346
0
    mh->entries++;
347
0
    return mh->entries;
348
0
}
349
350
int flb_mp_array_header_append(struct flb_mp_map_header *mh)
351
0
{
352
0
    mh->entries++;
353
0
    return mh->entries;
354
0
}
355
356
void flb_mp_map_header_end(struct flb_mp_map_header *mh)
357
0
{
358
0
    char *ptr;
359
0
    msgpack_sbuffer *mp_sbuf;
360
361
0
    mp_sbuf = mh->data;
362
0
    ptr = (char *) mp_sbuf->data + mh->offset;
363
0
    flb_mp_set_map_header_size(ptr, mh->entries);
364
0
}
365
366
void flb_mp_array_header_end(struct flb_mp_map_header *mh)
367
0
{
368
0
    char *ptr;
369
0
    msgpack_sbuffer *mp_sbuf;
370
371
0
    mp_sbuf = mh->data;
372
0
    ptr = (char *) mp_sbuf->data + mh->offset;
373
0
    flb_mp_set_array_header_size(ptr, mh->entries);
374
0
}
375
376
static int insert_by_subkey_count(struct flb_record_accessor *ra, struct flb_mp_accessor *mpa)
377
0
{
378
0
    int subkey_count;
379
0
    int count;
380
0
    struct mk_list *head;
381
0
    struct flb_mp_accessor_ra *val_ra;
382
0
    struct flb_mp_accessor_ra *mp_ra;
383
384
0
    mp_ra = flb_calloc(1, sizeof(struct flb_mp_accessor_ra));
385
0
    if (!mp_ra) {
386
0
        flb_errno();
387
0
        return -1;
388
0
    }
389
0
    mp_ra->is_active = FLB_TRUE;
390
0
    mp_ra->ra = ra;
391
392
    /*
393
     * sort flb_record_accessor by number of subkey
394
     *
395
     *  e.g.
396
     *    $kubernetes
397
     *    $kubernetes[2]['a']
398
     *    $kubernetes[2]['annotations']['fluentbit.io/tag']
399
     */
400
0
    subkey_count = flb_ra_subkey_count(ra);
401
0
    mk_list_foreach(head, &mpa->ra_list) {
402
0
        val_ra = mk_list_entry(head, struct flb_mp_accessor_ra, _head);
403
0
        count = flb_ra_subkey_count(val_ra->ra);
404
0
        if (count >=  subkey_count) {
405
0
            mk_list_add_before(&mp_ra->_head, &val_ra->_head, &mpa->ra_list);
406
0
            return 0;
407
0
        }
408
0
    }
409
410
    /* add to tail of list */
411
0
    mk_list_add(&mp_ra->_head, &mpa->ra_list);
412
0
    return 0;
413
0
}
414
415
/* Set the active status for all record accessor patterns */
416
void flb_mp_accessor_set_active(struct flb_mp_accessor *mpa, int status)
417
0
{
418
0
    struct mk_list *head;
419
0
    struct flb_mp_accessor_ra *mp_ra;
420
421
0
    mk_list_foreach(head, &mpa->ra_list) {
422
0
        mp_ra = mk_list_entry(head, struct flb_mp_accessor_ra, _head);
423
0
        mp_ra->is_active = status;
424
0
    }
425
0
}
426
427
/* Set the active status for a specific record accessor pattern */
428
int flb_mp_accessor_set_active_by_pattern(struct flb_mp_accessor *mpa,
429
                                          const char *pattern, int status)
430
0
{
431
0
    int len;
432
0
    struct mk_list *head;
433
0
    struct flb_mp_accessor_ra *mp_ra;
434
435
0
    len = strlen(pattern);
436
437
0
    mk_list_foreach(head, &mpa->ra_list) {
438
0
        mp_ra = mk_list_entry(head, struct flb_mp_accessor_ra, _head);
439
440
0
        if (len != flb_sds_len(mp_ra->ra->pattern)) {
441
0
            continue;
442
0
        }
443
444
0
        if (strcmp(mp_ra->ra->pattern, pattern) == 0) {
445
0
            mp_ra->is_active = status;
446
0
            return 0;
447
0
        }
448
0
    }
449
450
0
    return -1;
451
0
}
452
453
/*
454
 * Create an 'mp accessor' context: this context allows to create a list of
455
 * record accessor patterns based on a 'slist' context, where every slist string
456
 * buffer represents a key accessor.
457
 */
458
struct flb_mp_accessor *flb_mp_accessor_create(struct mk_list *slist_patterns)
459
0
{
460
0
    size_t size;
461
0
    struct mk_list *head;
462
0
    struct flb_slist_entry *entry;
463
0
    struct flb_record_accessor *ra;
464
0
    struct flb_mp_accessor *mpa;
465
466
    /* Allocate context */
467
0
    mpa = flb_calloc(1, sizeof(struct flb_mp_accessor));
468
0
    if (!mpa) {
469
0
        flb_errno();
470
0
        return NULL;
471
0
    }
472
0
    mk_list_init(&mpa->ra_list);
473
474
0
    mk_list_foreach(head, slist_patterns) {
475
0
        entry = mk_list_entry(head, struct flb_slist_entry, _head);
476
477
        /* Create the record accessor context */
478
0
        ra = flb_ra_create(entry->str, FLB_TRUE);
479
0
        if (!ra) {
480
0
            flb_error("[mp accessor] could not create entry for pattern '%s'",
481
0
                      entry->str);
482
0
            flb_mp_accessor_destroy(mpa);
483
0
            return NULL;
484
0
        }
485
0
        insert_by_subkey_count(ra, mpa);
486
0
    }
487
488
0
    if (mk_list_size(&mpa->ra_list) == 0) {
489
0
        return mpa;
490
0
    }
491
492
0
    size = sizeof(struct flb_mp_accessor_match) * mk_list_size(&mpa->ra_list);
493
0
    mpa->matches_size = size;
494
0
    mpa->matches = flb_calloc(1, size);
495
0
    if (!mpa->matches) {
496
0
        flb_errno();
497
0
        flb_mp_accessor_destroy(mpa);
498
0
        return NULL;
499
0
    }
500
501
0
    return mpa;
502
0
}
503
504
static inline int accessor_key_find_match(struct flb_mp_accessor *mpa,
505
                                          msgpack_object *key)
506
0
{
507
0
    int i;
508
0
    int count;
509
0
    struct flb_mp_accessor_match *match;
510
511
0
    count = mk_list_size(&mpa->ra_list);
512
0
    for (i = 0; i < count; i++) {
513
0
        match = &mpa->matches[i];
514
0
        if (match->matched == FLB_FALSE) {
515
0
            continue;
516
0
        }
517
518
0
        if (match->start_key == key) {
519
0
            return i;
520
0
        }
521
0
    }
522
523
0
    return -1;
524
0
}
525
526
static inline int accessor_sub_pack(struct flb_mp_accessor_match *match,
527
                                    msgpack_packer *mp_pck,
528
                                    msgpack_object *key,
529
                                    msgpack_object *val)
530
0
{
531
0
    int i;
532
0
    int ret;
533
0
    msgpack_object *k;
534
0
    msgpack_object *v;
535
0
    struct flb_mp_map_header mh;
536
537
0
    if (match->key == key || match->key == val) {
538
0
        return FLB_FALSE;
539
0
    }
540
541
0
    if (key) {
542
0
        msgpack_pack_object(mp_pck, *key);
543
0
    }
544
545
0
    if (val->type == MSGPACK_OBJECT_MAP) {
546
0
        flb_mp_map_header_init(&mh, mp_pck);
547
0
        for (i = 0; i < val->via.map.size; i++) {
548
0
            k = &val->via.map.ptr[i].key;
549
0
            v = &val->via.map.ptr[i].val;
550
551
0
            ret = accessor_sub_pack(match, mp_pck, k, v);
552
0
            if (ret == FLB_TRUE) {
553
0
                flb_mp_map_header_append(&mh);
554
0
            }
555
0
        }
556
0
        flb_mp_map_header_end(&mh);
557
0
    }
558
0
    else if (val->type == MSGPACK_OBJECT_ARRAY) {
559
0
        flb_mp_array_header_init(&mh, mp_pck);
560
0
        for (i = 0; i < val->via.array.size; i++) {
561
0
            v = &val->via.array.ptr[i];
562
0
            ret = accessor_sub_pack(match, mp_pck, NULL, v);
563
0
            if (ret == FLB_TRUE) {
564
0
                flb_mp_array_header_append(&mh);
565
0
            }
566
0
        }
567
0
        flb_mp_array_header_end(&mh);
568
0
    }
569
0
    else {
570
0
        msgpack_pack_object(mp_pck, *val);
571
0
    }
572
573
0
    return FLB_TRUE;
574
0
}
575
576
/*
577
 * Remove keys or nested keys from a map. It composes the final result in a
578
 * new buffer. On error, it returns -1, if the map was modified it returns FLB_TRUE,
579
 * if no modification was required it returns FLB_FALSE.
580
 */
581
int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa,
582
                                msgpack_object *map,
583
                                void **out_buf, size_t *out_size)
584
0
{
585
0
    int i;
586
0
    int ret;
587
0
    int rule_id = 0;
588
0
    int matches = 0;
589
0
    msgpack_object *key;
590
0
    msgpack_object *val;
591
0
    msgpack_object *s_key;
592
0
    msgpack_object *o_key;
593
0
    msgpack_object *o_val;
594
0
    struct mk_list *head;
595
0
    struct flb_mp_accessor_match *match;
596
0
    struct flb_mp_accessor_ra *mp_ra;
597
0
    struct flb_mp_map_header mh;
598
0
    msgpack_sbuffer mp_sbuf;
599
0
    msgpack_packer mp_pck;
600
601
0
    if (map->via.map.size == 0) {
602
0
        return FLB_FALSE;
603
0
    }
604
605
    /* Reset matches cache */
606
0
    memset(mpa->matches, '\0', mpa->matches_size);
607
608
0
    mk_list_foreach(head, &mpa->ra_list) {
609
0
        mp_ra = mk_list_entry(head, struct flb_mp_accessor_ra, _head);
610
611
0
        if (mp_ra->is_active == FLB_FALSE) {
612
0
            rule_id++;
613
0
            continue;
614
0
        }
615
616
        /* Apply the record accessor rule against the map */
617
0
        ret = flb_ra_get_kv_pair(mp_ra->ra, *map, &s_key, &o_key, &o_val);
618
0
        if (ret == 0) {
619
            /* There is a match, register in the matches table */
620
0
            match = &mpa->matches[rule_id];
621
0
            match->matched = FLB_TRUE;
622
0
            match->start_key = s_key;        /* Initial key path that matched */
623
0
            match->key = o_key;              /* Final key that matched */
624
0
            match->val = o_val;              /* Final value */
625
0
            match->ra = mp_ra->ra;           /* Record accessor context */
626
0
            matches++;
627
0
        }
628
0
        rule_id++;
629
0
    }
630
631
    /* If no matches, no modifications were made */
632
0
    if (matches == 0) {
633
0
        return FLB_FALSE;
634
0
    }
635
636
    /* Some rules matched, compose a new outgoing buffer */
637
0
    msgpack_sbuffer_init(&mp_sbuf);
638
0
    msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
639
640
    /* Initialize map */
641
0
    flb_mp_map_header_init(&mh, &mp_pck);
642
643
0
    for (i = 0; i < map->via.map.size; i++) {
644
0
        key = &map->via.map.ptr[i].key;
645
0
        val = &map->via.map.ptr[i].val;
646
647
        /*
648
         * For every entry on the path, check if we should do a step-by-step
649
         * repackaging or just pack the whole object.
650
         *
651
         * Just check: does this 'key' exists on any path of the record
652
         * accessor patterns ?
653
         *
654
         * Find if the active key in the map, matches an accessor rule, if
655
         * if match we get the match id as return value, otherwise -1.
656
         */
657
0
        ret = accessor_key_find_match(mpa, key);
658
0
        if (ret == -1) {
659
            /* No matches, it's ok to pack the kv pair */
660
0
            flb_mp_map_header_append(&mh);
661
0
            msgpack_pack_object(&mp_pck, *key);
662
0
            msgpack_pack_object(&mp_pck, *val);
663
0
        }
664
0
        else {
665
            /* The key has a match. Now we do a step-by-step packaging */
666
0
            match = &mpa->matches[ret];
667
0
            ret = accessor_sub_pack(match, &mp_pck, key, val);
668
0
            if (ret == FLB_TRUE) {
669
0
                flb_mp_map_header_append(&mh);
670
0
            }
671
0
        }
672
0
    }
673
0
    flb_mp_map_header_end(&mh);
674
675
0
    *out_buf = mp_sbuf.data;
676
0
    *out_size = mp_sbuf.size;
677
678
0
    return FLB_TRUE;
679
0
}
680
681
void flb_mp_accessor_destroy(struct flb_mp_accessor *mpa)
682
0
{
683
0
    struct mk_list *tmp;
684
0
    struct mk_list *head;
685
0
    struct flb_mp_accessor_ra *mp_ra;
686
687
0
    if (!mpa) {
688
0
        return;
689
0
    }
690
691
0
    mk_list_foreach_safe(head, tmp, &mpa->ra_list) {
692
0
        mp_ra = mk_list_entry(head, struct flb_mp_accessor_ra, _head);
693
0
        mk_list_del(&mp_ra->_head);
694
0
        flb_ra_destroy(mp_ra->ra);
695
0
        flb_free(mp_ra);
696
0
    }
697
698
0
    if (mpa->matches) {
699
0
        flb_free(mpa->matches);
700
0
    }
701
702
0
    flb_free(mpa);
703
0
}
704
705
706
/*
 * Convert a msgpack object into its CFL counterpart.
 *
 * ptr : receives the new object: a cfl_variant for scalar types, a cfl_array
 *       for arrays, a cfl_kvlist for maps, or NULL for ext objects.
 *
 * Returns the kind of object stored in '*ptr' (CFL_OBJECT_VARIANT,
 * CFL_OBJECT_ARRAY, CFL_OBJECT_KVLIST, or CFL_OBJECT_NONE for ext objects
 * whose content is not converted), or -1 on error.
 *
 * Containers are converted recursively. A container fails as a whole if any
 * child cannot be converted or attached, if a child is an ext object, or
 * (maps only) if a key is not a string.
 */
static int mp_object_to_cfl(void **ptr, msgpack_object *o)
{
    uint32_t i;
    int ret;
    void *var = NULL;
    void *child;
    struct cfl_array *array;
    struct cfl_kvlist *kvlist;
    msgpack_object key;
    msgpack_object val;

    switch (o->type) {
    case MSGPACK_OBJECT_NIL:
        var = cfl_variant_create_from_null();
        break;
    case MSGPACK_OBJECT_BOOLEAN:
        var = cfl_variant_create_from_bool(o->via.boolean);
        break;
    case MSGPACK_OBJECT_POSITIVE_INTEGER:
        var = cfl_variant_create_from_uint64(o->via.u64);
        break;
    case MSGPACK_OBJECT_NEGATIVE_INTEGER:
        var = cfl_variant_create_from_int64(o->via.i64);
        break;
    case MSGPACK_OBJECT_FLOAT32:
    case MSGPACK_OBJECT_FLOAT64:
        var = cfl_variant_create_from_double(o->via.f64);
        break;
    case MSGPACK_OBJECT_STR:
        var = cfl_variant_create_from_string_s((char *) o->via.str.ptr,
                                               o->via.str.size, CFL_TRUE);
        break;
    case MSGPACK_OBJECT_BIN:
        /* read the 'bin' member of the union, not 'str' */
        var = cfl_variant_create_from_bytes((char *) o->via.bin.ptr,
                                            o->via.bin.size, CFL_TRUE);
        break;
    case MSGPACK_OBJECT_EXT:
        /* we do not pack extension type content */
        *ptr = NULL;
        return CFL_OBJECT_NONE;
    case MSGPACK_OBJECT_ARRAY:
        /* read the 'array' member of the union, not 'map' */
        array = cfl_array_create(o->via.array.size);
        if (!array) {
            return -1;
        }

        for (i = 0; i < o->via.array.size; i++) {
            ret = mp_object_to_cfl(&child, &o->via.array.ptr[i]);
            if (ret == CFL_OBJECT_KVLIST) {
                ret = cfl_array_append_kvlist(array, child);
            }
            else if (ret == CFL_OBJECT_VARIANT) {
                ret = cfl_array_append(array, child);
            }
            else if (ret == CFL_OBJECT_ARRAY) {
                ret = cfl_array_append_array(array, child);
            }
            else {
                ret = -1;
            }

            /*
             * Stop at the first failure so it is not overwritten by the
             * next iteration; any negative value is treated as an error.
             *
             * NOTE(review): when an append call fails, ownership of 'child'
             * is not visible from here, so it is not released — confirm
             * against the CFL API whether it must be destroyed.
             */
            if (ret < 0) {
                cfl_array_destroy(array);
                return -1;
            }
        }

        *ptr = array;
        return CFL_OBJECT_ARRAY;
    case MSGPACK_OBJECT_MAP:
        kvlist = cfl_kvlist_create();
        if (!kvlist) {
            return -1;
        }

        for (i = 0; i < o->via.map.size; i++) {
            key = o->via.map.ptr[i].key;
            val = o->via.map.ptr[i].val;

            /* force key type to be string, otherwise just abort */
            if (key.type != MSGPACK_OBJECT_STR) {
                cfl_kvlist_destroy(kvlist);
                return -1;
            }

            /* the key is usable, now we need the value object */
            ret = mp_object_to_cfl(&child, &val);
            if (ret == CFL_OBJECT_KVLIST) {
                ret = cfl_kvlist_insert_kvlist_s(kvlist,
                                                 (char *) key.via.str.ptr, key.via.str.size,
                                                 child);
            }
            else if (ret == CFL_OBJECT_VARIANT) {
                ret = cfl_kvlist_insert_s(kvlist,
                                          (char *) key.via.str.ptr, key.via.str.size,
                                          child);
            }
            else if (ret == CFL_OBJECT_ARRAY) {
                ret = cfl_kvlist_insert_array_s(kvlist,
                                                (char *) key.via.str.ptr, key.via.str.size,
                                                child);
            }
            else {
                ret = -1;
            }

            /*
             * Same policy as for arrays: the first failure aborts the whole
             * map (see the ownership note in the array case).
             */
            if (ret < 0) {
                cfl_kvlist_destroy(kvlist);
                return -1;
            }
        }

        *ptr = kvlist;
        return CFL_OBJECT_KVLIST;
    default:
        return -1;
    }

    /* Only the scalar cases reach this point */
    if (!var) {
        return -1;
    }
    *ptr = var;

    return CFL_OBJECT_VARIANT;
}
872
873
874
/* Convert a msgpack object to a cfl_object */
875
/*
 * Convert a msgpack map or array into a newly created cfl_object.
 *
 * Returns the new object, or NULL if 'o' is neither a map nor an array, if
 * the conversion fails or if the converted content cannot be attached to the
 * object.
 */
struct cfl_object *flb_mp_object_to_cfl(msgpack_object *o)
{
    int ret;
    int type;
    void *out = NULL;
    struct cfl_object *obj;

    /* For now, only allow to convert to map (kvlist) or array */
    if (o->type != MSGPACK_OBJECT_MAP && o->type != MSGPACK_OBJECT_ARRAY) {
        return NULL;
    }

    obj = cfl_object_create();
    if (!obj) {
        return NULL;
    }

    type = mp_object_to_cfl(&out, o);
    if (type < 0) {
        cfl_object_destroy(obj);
        return NULL;
    }

    /*
     * The converted type is kept in its own variable: the cleanup below has
     * to know what 'out' is, and the status returned by cfl_object_set()
     * must not overwrite that information.
     */
    ret = cfl_object_set(obj, type, out);
    if (ret == -1) {
        /*
         * NOTE(review): assumes a failed cfl_object_set() leaves 'out'
         * unattached — confirm against the CFL implementation.
         */
        if (type == CFL_OBJECT_KVLIST) {
            cfl_kvlist_destroy(out);
        }
        else if (type == CFL_OBJECT_ARRAY) {
            cfl_array_destroy(out);
        }
        cfl_object_destroy(obj);
        return NULL;
    }

    return obj;
}
911
912
static int mp_cfl_to_msgpack(struct cfl_variant *var,
913
                             msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck)
914
0
{
915
0
    int i;
916
0
    int ret;
917
0
    struct cfl_list *head;
918
0
    struct cfl_kvpair *kv;
919
0
    struct cfl_kvlist *kvlist;
920
0
    struct cfl_variant *variant;
921
0
    struct flb_mp_map_header mh;
922
923
0
    switch (var->type) {
924
0
        case CFL_VARIANT_BOOL:
925
0
            if (var->data.as_bool) {
926
0
                msgpack_pack_true(mp_pck);
927
0
            }
928
0
            else {
929
0
                msgpack_pack_false(mp_pck);
930
0
            }
931
0
            break;
932
0
        case CFL_VARIANT_INT:
933
0
            msgpack_pack_int64(mp_pck, var->data.as_int64);
934
0
            break;
935
0
        case CFL_VARIANT_UINT:
936
0
            msgpack_pack_uint64(mp_pck, var->data.as_uint64);
937
0
            break;
938
0
        case CFL_VARIANT_DOUBLE:
939
0
            msgpack_pack_double(mp_pck, var->data.as_double);
940
0
            break;
941
0
        case CFL_VARIANT_NULL:
942
0
            msgpack_pack_nil(mp_pck);
943
0
            break;
944
0
        case CFL_VARIANT_REFERENCE:
945
            /* we don't save references */
946
0
            break;
947
0
        case CFL_VARIANT_STRING:
948
0
            msgpack_pack_str(mp_pck, cfl_variant_size_get(var));
949
0
            msgpack_pack_str_body(mp_pck,
950
0
                                  var->data.as_string, cfl_variant_size_get(var));
951
0
            break;
952
0
        case CFL_VARIANT_BYTES:
953
0
            msgpack_pack_bin(mp_pck, cfl_variant_size_get(var));
954
0
            msgpack_pack_bin_body(mp_pck,
955
0
                                  var->data.as_bytes, cfl_variant_size_get(var));
956
0
            break;
957
0
        case CFL_VARIANT_ARRAY:
958
0
        msgpack_pack_array(mp_pck, var->data.as_array->entry_count);
959
0
            for (i = 0; i < var->data.as_array->entry_count; i++) {
960
0
                variant = var->data.as_array->entries[i];
961
0
                ret = mp_cfl_to_msgpack(variant, mp_sbuf, mp_pck);
962
0
                if (ret == -1) {
963
0
                    return -1;
964
0
                }
965
0
            }
966
0
            break;
967
0
        case CFL_VARIANT_KVLIST:
968
0
            kvlist = var->data.as_kvlist;
969
0
            flb_mp_map_header_init(&mh, mp_pck);
970
0
            cfl_list_foreach(head, &kvlist->list) {
971
0
                kv = cfl_list_entry(head, struct cfl_kvpair, _head);
972
973
0
                flb_mp_map_header_append(&mh);
974
975
                /* key */
976
0
                msgpack_pack_str(mp_pck, cfl_sds_len(kv->key));
977
0
                msgpack_pack_str_body(mp_pck, kv->key, cfl_sds_len(kv->key));
978
979
                /* value */
980
0
                ret = mp_cfl_to_msgpack(kv->val, mp_sbuf,  mp_pck);
981
0
                if (ret == -1) {
982
0
                    return -1;
983
0
                }
984
0
            }
985
0
            flb_mp_map_header_end(&mh);
986
0
            break;
987
0
    }
988
989
0
    return 0;
990
0
}
991
992
/* Convert a CFL Object and serialize it content in a msgpack buffer */
993
int flb_mp_cfl_to_msgpack(struct cfl_object *obj, char **out_buf, size_t *out_size)
994
0
{
995
0
    int ret;
996
0
    msgpack_sbuffer mp_sbuf;
997
0
    msgpack_packer mp_pck;
998
999
0
    if (!obj) {
1000
0
        return -1;
1001
0
    }
1002
1003
    /* unitialized CFL object ? */
1004
0
    if (obj->type == CFL_OBJECT_NONE) {
1005
0
        return -1;
1006
0
    }
1007
1008
    /* initialize msgpack buffer */
1009
0
    msgpack_sbuffer_init(&mp_sbuf);
1010
0
    msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
1011
1012
0
    ret = mp_cfl_to_msgpack(obj->variant, &mp_sbuf, &mp_pck);
1013
0
    if (ret == -1) {
1014
0
        return -1;
1015
0
    }
1016
1017
0
    *out_buf = mp_sbuf.data;
1018
0
    *out_size = mp_sbuf.size;
1019
1020
0
    return 0;
1021
0
}
1022
1023
struct flb_mp_chunk_record *flb_mp_chunk_record_create(struct flb_mp_chunk_cobj *chunk_cobj)
1024
0
{
1025
0
    struct flb_mp_chunk_record *record;
1026
1027
0
    record = flb_calloc(1, sizeof(struct flb_mp_chunk_record));
1028
0
    if (!record) {
1029
0
        flb_errno();
1030
0
        return NULL;
1031
0
    }
1032
0
    record->modified = FLB_FALSE;
1033
1034
0
    return record;
1035
0
}
1036
1037
struct flb_mp_chunk_cobj *flb_mp_chunk_cobj_create(struct flb_log_event_encoder *log_encoder, struct flb_log_event_decoder *log_decoder)
1038
0
{
1039
0
    struct flb_mp_chunk_cobj *chunk_cobj;
1040
1041
0
    if (!log_encoder || !log_decoder) {
1042
0
        return NULL;
1043
0
    }
1044
1045
0
    chunk_cobj = flb_calloc(1, sizeof(struct flb_mp_chunk_cobj));
1046
0
    if (!chunk_cobj) {
1047
0
        flb_errno();
1048
0
        return NULL;
1049
0
    }
1050
0
    cfl_list_init(&chunk_cobj->records);
1051
0
    chunk_cobj->record_pos  = NULL;
1052
0
    chunk_cobj->log_encoder = log_encoder;
1053
0
    chunk_cobj->log_decoder = log_decoder;
1054
1055
0
    return chunk_cobj;
1056
0
}
1057
1058
static int generate_empty_msgpack_map(char **out_buf, size_t *out_size)
1059
0
{
1060
0
    msgpack_sbuffer mp_sbuf;
1061
0
    msgpack_packer mp_pck;
1062
1063
    /* initialize msgpack buffer */
1064
0
    msgpack_sbuffer_init(&mp_sbuf);
1065
0
    msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
1066
1067
0
    msgpack_pack_map(&mp_pck, 0);
1068
1069
0
    *out_buf = mp_sbuf.data;
1070
0
    *out_size = mp_sbuf.size;
1071
1072
0
    return 0;
1073
0
}
1074
1075
int flb_mp_chunk_cobj_encode(struct flb_mp_chunk_cobj *chunk_cobj, char **out_buf, size_t *out_size)
1076
0
{
1077
0
    int ret;
1078
0
    char *mp_buf;
1079
0
    size_t mp_size;
1080
0
    struct cfl_list *head;
1081
0
    struct flb_mp_chunk_record *record;
1082
1083
0
    if (!chunk_cobj) {
1084
0
        return -1;
1085
0
    }
1086
1087
    /* Iterate all records */
1088
0
    cfl_list_foreach(head, &chunk_cobj->records) {
1089
0
        record = cfl_list_entry(head, struct flb_mp_chunk_record, _head);
1090
1091
0
        ret = flb_log_event_encoder_begin_record(chunk_cobj->log_encoder);
1092
0
        if (ret == -1) {
1093
0
            return -1;
1094
0
        }
1095
1096
0
        ret = flb_log_event_encoder_set_timestamp(chunk_cobj->log_encoder, &record->event.timestamp);
1097
0
        if (ret == -1) {
1098
0
            return -1;
1099
0
        }
1100
1101
0
        if (record->cobj_metadata) {
1102
0
            ret = flb_mp_cfl_to_msgpack(record->cobj_metadata, &mp_buf, &mp_size);
1103
0
            if (ret == -1) {
1104
0
                return -1;
1105
0
            }
1106
0
        }
1107
0
        else {
1108
0
            ret = generate_empty_msgpack_map(&mp_buf, &mp_size);
1109
0
            if (ret == -1) {
1110
0
                return -1;
1111
0
            }
1112
0
        }
1113
1114
0
        ret = flb_log_event_encoder_set_metadata_from_raw_msgpack(chunk_cobj->log_encoder, mp_buf, mp_size);
1115
0
        if (ret != FLB_EVENT_ENCODER_SUCCESS) {
1116
0
            flb_free(mp_buf);
1117
0
            return -1;
1118
0
        }
1119
0
        flb_free(mp_buf);
1120
1121
0
        if (record->cobj_record) {
1122
0
            ret = flb_mp_cfl_to_msgpack(record->cobj_record, &mp_buf, &mp_size);
1123
0
            if (ret == -1) {
1124
0
                return -1;
1125
0
            }
1126
0
        }
1127
0
        else {
1128
0
            ret = generate_empty_msgpack_map(&mp_buf, &mp_size);
1129
0
            if (ret == -1) {
1130
0
                return -1;
1131
0
            }
1132
0
        }
1133
1134
0
        ret = flb_log_event_encoder_set_body_from_raw_msgpack(chunk_cobj->log_encoder, mp_buf, mp_size);
1135
0
        if (ret != FLB_EVENT_ENCODER_SUCCESS) {
1136
0
            flb_free(mp_buf);
1137
0
            return -1;
1138
0
        }
1139
0
        flb_free(mp_buf);
1140
1141
0
        ret = flb_log_event_encoder_commit_record(chunk_cobj->log_encoder);
1142
0
        if (ret == -1) {
1143
0
            return -1;
1144
0
        }
1145
0
    }
1146
1147
    /* set new output buffer */
1148
0
    *out_buf = chunk_cobj->log_encoder->output_buffer;
1149
0
    *out_size = chunk_cobj->log_encoder->output_length;
1150
1151
0
    flb_log_event_encoder_claim_internal_buffer_ownership(chunk_cobj->log_encoder);
1152
0
    return 0;
1153
0
}
1154
1155
int flb_mp_chunk_cobj_destroy(struct flb_mp_chunk_cobj *chunk_cobj)
1156
0
{
1157
0
    struct cfl_list *tmp;
1158
0
    struct cfl_list *head;
1159
0
    struct flb_mp_chunk_record *record;
1160
1161
0
    if (!chunk_cobj) {
1162
0
        return -1;
1163
0
    }
1164
1165
0
    cfl_list_foreach_safe(head, tmp, &chunk_cobj->records) {
1166
0
        record = cfl_list_entry(head, struct flb_mp_chunk_record, _head);
1167
0
        if (record->cobj_metadata) {
1168
0
            cfl_object_destroy(record->cobj_metadata);
1169
0
        }
1170
0
        if (record->cobj_record) {
1171
0
            cfl_object_destroy(record->cobj_record);
1172
0
        }
1173
0
        cfl_list_del(&record->_head);
1174
0
        flb_free(record);
1175
0
    }
1176
1177
0
    flb_free(chunk_cobj);
1178
0
    return 0;
1179
0
}
1180
1181
int flb_mp_chunk_cobj_record_next(struct flb_mp_chunk_cobj *chunk_cobj,
1182
                                  struct flb_mp_chunk_record **out_record)
1183
0
{
1184
0
    int ret = FLB_MP_CHUNK_RECORD_EOF;
1185
0
    size_t bytes;
1186
0
    struct flb_mp_chunk_record *record = NULL;
1187
1188
0
    *out_record = NULL;
1189
0
    bytes = chunk_cobj->log_decoder->length - chunk_cobj->log_decoder->offset;
1190
1191
    /*
1192
     * if there are remaining decoder bytes, keep iterating msgpack and populate
1193
     * the cobj list. Otherwise it means all the content is ready as a chunk_cobj_record.
1194
     */
1195
0
    if (bytes > 0) {
1196
0
        record = flb_mp_chunk_record_create(chunk_cobj);
1197
0
        if (!record) {
1198
0
            return FLB_MP_CHUNK_RECORD_ERROR;
1199
0
        }
1200
1201
0
        ret = flb_log_event_decoder_next(chunk_cobj->log_decoder, &record->event);
1202
0
        if (ret != FLB_EVENT_DECODER_SUCCESS) {
1203
0
            flb_free(record);
1204
0
            return -1;
1205
0
        }
1206
1207
0
        record->cobj_metadata = flb_mp_object_to_cfl(record->event.metadata);
1208
0
        if (!record->cobj_metadata) {
1209
0
            flb_free(record);
1210
0
            return FLB_MP_CHUNK_RECORD_ERROR;
1211
0
        }
1212
1213
0
        record->cobj_record = flb_mp_object_to_cfl(record->event.body);
1214
0
        if (!record->cobj_record) {
1215
0
            cfl_object_destroy(record->cobj_metadata);
1216
0
            flb_free(record);
1217
0
            return -1;
1218
0
        }
1219
1220
0
        cfl_list_add(&record->_head, &chunk_cobj->records);
1221
0
        ret = FLB_MP_CHUNK_RECORD_OK;
1222
0
    }
1223
0
    else if (chunk_cobj->record_pos != NULL) {
1224
        /* is the actual record the last one ? */
1225
0
        if (chunk_cobj->record_pos == cfl_list_entry_last(&chunk_cobj->records, struct flb_mp_chunk_record, _head)) {
1226
0
            chunk_cobj->record_pos = NULL;
1227
0
            return FLB_MP_CHUNK_RECORD_EOF;
1228
0
        }
1229
1230
0
         record = cfl_list_entry_next(&chunk_cobj->record_pos->_head, struct flb_mp_chunk_record,
1231
0
                                      _head, &chunk_cobj->records);
1232
0
         ret = FLB_MP_CHUNK_RECORD_OK;
1233
0
    }
1234
0
    else {
1235
0
        if (cfl_list_size(&chunk_cobj->records) == 0) {
1236
0
            return FLB_MP_CHUNK_RECORD_EOF;
1237
0
        }
1238
1239
        /* check if we are the last in the list */
1240
0
        record = cfl_list_entry_first(&chunk_cobj->records, struct flb_mp_chunk_record, _head);
1241
0
        ret = FLB_MP_CHUNK_RECORD_OK;
1242
0
    }
1243
1244
0
    chunk_cobj->record_pos = record;
1245
0
    *out_record = chunk_cobj->record_pos;
1246
1247
0
    return ret;
1248
0
}
1249
1250
int flb_mp_chunk_cobj_record_destroy(struct flb_mp_chunk_cobj *chunk_cobj,
1251
                                     struct flb_mp_chunk_record *record)
1252
0
{
1253
0
    struct flb_mp_chunk_record *first;
1254
0
    struct flb_mp_chunk_record *last;
1255
1256
0
    if (!record) {
1257
0
        return -1;
1258
0
    }
1259
1260
0
    if (chunk_cobj && chunk_cobj->record_pos) {
1261
0
        first = cfl_list_entry_first(&chunk_cobj->records, struct flb_mp_chunk_record, _head);
1262
0
        last = cfl_list_entry_last(&chunk_cobj->records, struct flb_mp_chunk_record, _head);
1263
1264
0
        if (record == first || record == last) {
1265
0
            chunk_cobj->record_pos = NULL;
1266
0
        }
1267
0
    }
1268
1269
0
    if (record->cobj_metadata) {
1270
0
        cfl_object_destroy(record->cobj_metadata);
1271
0
    }
1272
0
    if (record->cobj_record) {
1273
0
        cfl_object_destroy(record->cobj_record);
1274
0
    }
1275
1276
0
    cfl_list_del(&record->_head);
1277
0
    flb_free(record);
1278
1279
0
    return 0;
1280
0
}