Coverage Report

Created: 2023-01-10 06:17

/src/fluent-bit/src/flb_mp.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2022 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_info.h>
21
#include <fluent-bit/flb_utils.h>
22
#include <fluent-bit/flb_mem.h>
23
#include <fluent-bit/flb_log.h>
24
#include <fluent-bit/flb_mp.h>
25
#include <fluent-bit/flb_slist.h>
26
#include <fluent-bit/flb_record_accessor.h>
27
#include <fluent-bit/flb_metrics.h>
28
29
#include <msgpack.h>
30
#include <mpack/mpack.h>
31
32
/*
 * Store helpers used to patch the length field of an already written
 * msgpack map/array header in place (msgpack length fields are big-endian).
 *
 * NOTE: don't do this at home -- these wrap msgpack-c's underscore-prefixed
 * _msgpack_store16/_msgpack_store32 helpers, which are internal rather than
 * public API. Arguments are parenthesized so an expression argument is
 * converted as a whole before being stored.
 */
#define pack_uint16(buf, d) _msgpack_store16((buf), (uint16_t) (d))
#define pack_uint32(buf, d) _msgpack_store32((buf), (uint32_t) (d))
35
36
/*
 * Count the msgpack serialized events (top-level objects) in 'data'.
 * Same as flb_mp_count_remaining() without reporting the leftover bytes.
 */
int flb_mp_count(const void *data, size_t bytes)
{
    int total;

    total = flb_mp_count_remaining(data, bytes, NULL);
    return total;
}
41
42
int flb_mp_count_remaining(const void *data, size_t bytes, size_t *remaining_bytes)
43
0
{
44
0
    size_t remaining;
45
0
    int count = 0;
46
0
    mpack_reader_t reader;
47
48
0
    mpack_reader_init_data(&reader, (const char *) data, bytes);
49
0
    for (;;) {
50
0
        remaining = mpack_reader_remaining(&reader, NULL);
51
0
        if (!remaining) {
52
0
            break;
53
0
        }
54
0
        mpack_discard(&reader);
55
0
        if (mpack_reader_error(&reader)) {
56
0
            break;
57
0
        }
58
0
        count++;
59
0
    }
60
61
0
    if (remaining_bytes) {
62
0
        *remaining_bytes = remaining;
63
0
    }
64
0
    mpack_reader_destroy(&reader);
65
0
    return count;
66
0
}
67
68
int flb_mp_validate_metric_chunk(const void *data, size_t bytes,
69
                                 int *out_series, size_t *processed_bytes)
70
0
{
71
0
    int ret;
72
0
    int ok = CMT_DECODE_MSGPACK_SUCCESS;
73
0
    int count = 0;
74
0
    size_t off = 0;
75
0
    size_t pre_off = 0;
76
0
    struct cmt *cmt;
77
78
0
    while ((ret = cmt_decode_msgpack_create(&cmt,
79
0
                                            (char *) data, bytes, &off)) == ok) {
80
0
        cmt_destroy(cmt);
81
0
        count++;
82
0
        pre_off = off;
83
0
    }
84
85
0
    switch (ret) {
86
0
        case CMT_DECODE_MSGPACK_INVALID_ARGUMENT_ERROR:
87
0
        case CMT_DECODE_MSGPACK_CORRUPT_INPUT_DATA_ERROR:
88
0
        case CMT_DECODE_MSGPACK_CONSUME_ERROR:
89
0
        case CMT_DECODE_MSGPACK_ENGINE_ERROR:
90
0
        case CMT_DECODE_MSGPACK_PENDING_MAP_ENTRIES:
91
0
        case CMT_DECODE_MSGPACK_PENDING_ARRAY_ENTRIES:
92
0
        case CMT_DECODE_MSGPACK_UNEXPECTED_KEY_ERROR:
93
0
        case CMT_DECODE_MSGPACK_UNEXPECTED_DATA_TYPE_ERROR:
94
0
        case CMT_DECODE_MSGPACK_DICTIONARY_LOOKUP_ERROR:
95
0
        case CMT_DECODE_MSGPACK_VERSION_ERROR:
96
0
            goto error;
97
0
    }
98
99
0
    if (ret == CMT_DECODE_MSGPACK_INSUFFICIENT_DATA && off == bytes) {
100
0
        *out_series = count;
101
0
        *processed_bytes = pre_off;
102
0
        return 0;
103
0
    }
104
105
0
error:
106
0
    *out_series = count;
107
0
    *processed_bytes = pre_off;
108
109
0
    return -1;
110
0
}
111
112
int flb_mp_validate_log_chunk(const void *data, size_t bytes,
113
                              int *out_records, size_t *processed_bytes)
114
0
{
115
0
    int ret;
116
0
    int count = 0;
117
0
    size_t off = 0;
118
0
    size_t pre_off = 0;
119
0
    size_t ptr_size;
120
0
    unsigned char *ptr;
121
0
    msgpack_object array;
122
0
    msgpack_object ts;
123
0
    msgpack_object record;
124
0
    msgpack_unpacked result;
125
126
0
    msgpack_unpacked_init(&result);
127
0
    while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
128
0
        array = result.data;
129
130
0
        if (array.type != MSGPACK_OBJECT_ARRAY) {
131
            /*
132
             * Sometimes there is a special case: Chunks might have extra zero
133
             * bytes at the end of a record, meaning: no more records. This is not
134
             * an error and actually it happens if a previous run of Fluent Bit
135
             * was stopped/killed before to adjust the file size.
136
             *
137
             * Just validate if all bytes are zero, if so, adjust counters
138
             * and return zero.
139
             */
140
0
            ptr = (unsigned char *) (data);
141
0
            ptr += pre_off;
142
0
            if (ptr[0] != 0) {
143
0
                goto error;
144
0
            }
145
146
0
            ptr_size = bytes - pre_off;
147
0
            ret = memcmp(ptr, ptr + 1, ptr_size - 1);
148
0
            if (ret == 0) {
149
                /*
150
                 * The chunk is valid, just let the caller know the last processed
151
                 * valid byte.
152
                 */
153
0
                msgpack_unpacked_destroy(&result);
154
0
                *out_records = count;
155
0
                *processed_bytes = pre_off;
156
0
                return 0;
157
0
            }
158
0
            goto error;
159
0
        }
160
161
0
        if (array.via.array.size != 2) {
162
0
            goto error;
163
0
        }
164
165
0
        ts = array.via.array.ptr[0];
166
0
        record = array.via.array.ptr[1];
167
168
0
        if (ts.type != MSGPACK_OBJECT_POSITIVE_INTEGER &&
169
0
            ts.type != MSGPACK_OBJECT_FLOAT &&
170
0
            ts.type != MSGPACK_OBJECT_EXT) {
171
0
            goto error;
172
0
        }
173
174
0
        if (record.type != MSGPACK_OBJECT_MAP) {
175
0
            goto error;
176
0
        }
177
178
0
        count++;
179
0
        pre_off = off;
180
0
    }
181
182
0
    msgpack_unpacked_destroy(&result);
183
0
    *out_records = count;
184
0
    *processed_bytes = pre_off;
185
0
    return 0;
186
187
0
 error:
188
0
    msgpack_unpacked_destroy(&result);
189
0
    *out_records = count;
190
0
    *processed_bytes = pre_off;
191
192
0
    return -1;
193
0
}
194
195
/*
 * Adjust, in place, the entry count of the msgpack map header at 'buf'.
 *
 * Supported header forms:
 * - fixmap (first byte 0x80-0x8f): the count goes in the low bits;
 * - map 16 (0xde): 16-bit count follows the marker;
 * - map 32 (0xdf): 32-bit count follows the marker.
 *
 * For a fixmap the new count is OR-ed into the marker, so the caller must
 * keep 'size' within what the existing header form can hold. Any other
 * leading byte is left untouched.
 */
void flb_mp_set_map_header_size(char *buf, int size)
{
    char *len_field = buf + 1;
    uint8_t marker = (uint8_t) buf[0];

    if ((marker & 0xf0) == 0x80) {          /* fixmap: 1000xxxx */
        buf[0] = (char) (0x80 | (uint8_t) size);
    }
    else if (marker == 0xde) {              /* map 16 */
        pack_uint16(len_field, size);
    }
    else if (marker == 0xdf) {              /* map 32 */
        pack_uint32(len_field, size);
    }
}
214
215
/*
 * Adjust, in place, the element count of the msgpack array header at 'buf'.
 *
 * Supported header forms:
 * - fixarray (first byte 0x90-0x9f): the count goes in the low bits;
 * - array 16 (0xdc): 16-bit count follows the marker;
 * - array 32 (0xdd): 32-bit count follows the marker.
 *
 * For a fixarray the new count is OR-ed into the marker, so the caller must
 * keep 'size' within what the existing header form can hold. Any other
 * leading byte is left untouched.
 */
void flb_mp_set_array_header_size(char *buf, int size)
{
    char *len_field = buf + 1;
    uint8_t marker = (uint8_t) buf[0];

    if ((marker & 0xf0) == 0x90) {          /* fixarray: 1001xxxx */
        buf[0] = (char) (0x90 | (uint8_t) size);
    }
    else if (marker == 0xdc) {              /* array 16 */
        pack_uint16(len_field, size);
    }
    else if (marker == 0xdd) {              /* array 32 */
        pack_uint32(len_field, size);
    }
}
233
234
/*
 * msgpack-c requires the number of entries in a map to be set beforehand. For
 * our use case this adds some complexity, since developers would have to count
 * every entry that might be added.
 *
 * As a workaround, and to avoid recomposing the map over and over, this simple
 * API lets callers initialize the map header, 'register' new entries (as
 * counters) and finalize. Upon finalization the map header is patched with the
 * real number of entries.
 *
 * To make things easier, we make sure msgpack-c always writes a 32-bit map
 * header (identified by 0xdf, normally used for >= 65536 entries; the array
 * variant of this API uses 0xdd). Yes, every map built with this API uses 2
 * more bytes than a 16-bit header would, which is not a big deal. Still, use
 * this API only if you don't know the exact number of entries to add.
 *
 * MANDATORY: make sure to always initialize, register every entry and finalize,
 * otherwise you will get a corrupted or incomplete msgpack buffer.
 *
 * Usage example
 * =============
 *
 *  struct flb_mp_map_header mh;
 *
 *  flb_mp_map_header_init(&mh, mp_pck);
 *
 *  -- First key/value entry --
 *  flb_mp_map_header_append(&mh);
 *  msgpack_pack_str(mp_pck, 4);
 *  msgpack_pack_str_body(mp_pck, "cool", 4);
 *  msgpack_pack_true(mp_pck);
 *
 *  -- Second key/value entry --
 *  flb_mp_map_header_append(&mh);
 *  msgpack_pack_str(mp_pck, 4);
 *  msgpack_pack_str_body(mp_pck, "slow", 4);
 *  msgpack_pack_false(mp_pck);
 *
 *  -- Finalize Map --
 *  flb_mp_map_header_end(&mh);
 */
273
274
static inline void mp_header_type_init(struct flb_mp_map_header *mh,
275
                                       msgpack_packer *mp_pck,
276
                                       int type)
277
0
{
278
0
    msgpack_sbuffer *mp_sbuf;
279
280
0
    mp_sbuf = (msgpack_sbuffer *) mp_pck->data;
281
282
    /* map sbuffer */
283
0
    mh->data = mp_pck->data;
284
285
    /* Reset entries */
286
0
    mh->entries = 0;
287
288
    /* Store the next byte available */
289
0
    mh->offset = mp_sbuf->size;
290
0
}
291
292
/*
 * Start a map whose entry count is patched later by flb_mp_map_header_end().
 *
 * The placeholder count 65536 is the smallest value that forces msgpack-c to
 * emit a fixed-size 32-bit map header (0xdf), so the final count can be
 * written in place regardless of its size. Reference:
 *
 * - https://github.com/msgpack/msgpack/blob/master/spec.md#map-format-family
 *
 * Returns the result of msgpack_pack_map().
 */
int flb_mp_map_header_init(struct flb_mp_map_header *mh, msgpack_packer *mp_pck)
{
    int ret;

    mp_header_type_init(mh, mp_pck, FLB_MP_MAP);
    ret = msgpack_pack_map(mp_pck, 65536);

    return ret;
}
305
306
/*
 * Start an array whose element count is patched later by
 * flb_mp_array_header_end().
 *
 * The placeholder count 65536 is the smallest value that forces msgpack-c to
 * emit a fixed-size 32-bit array header (0xdd), so the final count can be
 * written in place regardless of its size. Reference:
 *
 * - https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
 *
 * Returns the result of msgpack_pack_array().
 */
int flb_mp_array_header_init(struct flb_mp_map_header *mh, msgpack_packer *mp_pck)
{
    /* Initialize context for an array */
    mp_header_type_init(mh, mp_pck, FLB_MP_ARRAY);

    return msgpack_pack_array(mp_pck, 65536);
}
319
320
321
int flb_mp_map_header_append(struct flb_mp_map_header *mh)
322
0
{
323
0
    mh->entries++;
324
0
    return mh->entries;
325
0
}
326
327
int flb_mp_array_header_append(struct flb_mp_map_header *mh)
328
0
{
329
0
    mh->entries++;
330
0
    return mh->entries;
331
0
}
332
333
void flb_mp_map_header_end(struct flb_mp_map_header *mh)
334
0
{
335
0
    char *ptr;
336
0
    msgpack_sbuffer *mp_sbuf;
337
338
0
    mp_sbuf = mh->data;
339
0
    ptr = (char *) mp_sbuf->data + mh->offset;
340
0
    flb_mp_set_map_header_size(ptr, mh->entries);
341
0
}
342
343
void flb_mp_array_header_end(struct flb_mp_map_header *mh)
344
0
{
345
0
    char *ptr;
346
0
    msgpack_sbuffer *mp_sbuf;
347
348
0
    mp_sbuf = mh->data;
349
0
    ptr = (char *) mp_sbuf->data + mh->offset;
350
0
    flb_mp_set_array_header_size(ptr, mh->entries);
351
0
}
352
353
/*
354
 * Create an 'mp accessor' context: this context allows to create a list of
355
 * record accessor patterns based on a 'slist' context, where every slist string
356
 * buffer represents a key accessor.
357
 */
358
struct flb_mp_accessor *flb_mp_accessor_create(struct mk_list *slist_patterns)
359
0
{
360
0
    size_t size;
361
0
    struct mk_list *head;
362
0
    struct flb_slist_entry *entry;
363
0
    struct flb_record_accessor *ra;
364
0
    struct flb_mp_accessor *mpa;
365
366
    /* Allocate context */
367
0
    mpa = flb_calloc(1, sizeof(struct flb_mp_accessor));
368
0
    if (!mpa) {
369
0
        flb_errno();
370
0
        return NULL;
371
0
    }
372
0
    mk_list_init(&mpa->ra_list);
373
374
0
    mk_list_foreach(head, slist_patterns) {
375
0
        entry = mk_list_entry(head, struct flb_slist_entry, _head);
376
377
        /* Create the record accessor context */
378
0
        ra = flb_ra_create(entry->str, FLB_TRUE);
379
0
        if (!ra) {
380
0
            flb_error("[mp accessor] could not create entry for pattern '%s'",
381
0
                      entry->str);
382
0
            flb_mp_accessor_destroy(mpa);
383
0
            return NULL;
384
0
        }
385
0
        mk_list_add(&ra->_head, &mpa->ra_list);
386
0
    }
387
388
0
    if (mk_list_size(&mpa->ra_list) == 0) {
389
0
        return mpa;
390
0
    }
391
392
0
    size = sizeof(struct flb_mp_accessor_match) * mk_list_size(&mpa->ra_list);
393
0
    mpa->matches_size = size;
394
0
    mpa->matches = flb_calloc(1, size);
395
0
    if (!mpa->matches) {
396
0
        flb_errno();
397
0
        flb_mp_accessor_destroy(mpa);
398
0
        return NULL;
399
0
    }
400
401
0
    return mpa;
402
0
}
403
404
static inline int accessor_key_find_match(struct flb_mp_accessor *mpa,
405
                                          msgpack_object *key)
406
0
{
407
0
    int i;
408
0
    int count;
409
0
    struct flb_mp_accessor_match *match;
410
411
0
    count = mk_list_size(&mpa->ra_list);
412
0
    for (i = 0; i < count; i++) {
413
0
        match = &mpa->matches[i];
414
0
        if (match->matched == FLB_FALSE) {
415
0
            continue;
416
0
        }
417
418
0
        if (match->start_key == key) {
419
0
            return i;
420
0
        }
421
0
    }
422
423
0
    return -1;
424
0
}
425
426
static inline int accessor_sub_pack(struct flb_mp_accessor_match *match,
427
                                    msgpack_packer *mp_pck,
428
                                    msgpack_object *key,
429
                                    msgpack_object *val)
430
0
{
431
0
    int i;
432
0
    int ret;
433
0
    msgpack_object *k;
434
0
    msgpack_object *v;
435
0
    struct flb_mp_map_header mh;
436
437
0
    if (match->key == key || match->key == val) {
438
0
        return FLB_FALSE;
439
0
    }
440
441
0
    if (key) {
442
0
        msgpack_pack_object(mp_pck, *key);
443
0
    }
444
445
0
    if (val->type == MSGPACK_OBJECT_MAP) {
446
0
        flb_mp_map_header_init(&mh, mp_pck);
447
0
        for (i = 0; i < val->via.map.size; i++) {
448
0
            k = &val->via.map.ptr[i].key;
449
0
            v = &val->via.map.ptr[i].val;
450
451
0
            ret = accessor_sub_pack(match, mp_pck, k, v);
452
0
            if (ret == FLB_TRUE) {
453
0
                flb_mp_map_header_append(&mh);
454
0
            }
455
0
        }
456
0
        flb_mp_map_header_end(&mh);
457
0
    }
458
0
    else if (val->type == MSGPACK_OBJECT_ARRAY) {
459
0
        flb_mp_array_header_init(&mh, mp_pck);
460
0
        for (i = 0; i < val->via.array.size; i++) {
461
0
            v = &val->via.array.ptr[i];
462
0
            ret = accessor_sub_pack(match, mp_pck, NULL, v);
463
0
            if (ret == FLB_TRUE) {
464
0
                flb_mp_array_header_append(&mh);
465
0
            }
466
0
        }
467
0
        flb_mp_array_header_end(&mh);
468
0
    }
469
0
    else {
470
0
        msgpack_pack_object(mp_pck, *val);
471
0
    }
472
473
0
    return FLB_TRUE;
474
0
}
475
476
/*
477
 * Remove keys or nested keys from a map. It compose the final result in a
478
 * new buffer. On error, it returns -1, if the map was modified it returns FLB_TRUE,
479
 * if no modification was required it returns FLB_FALSE.
480
 */
481
int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa,
482
                                msgpack_object *map,
483
                                void **out_buf, size_t *out_size)
484
0
{
485
0
    int i;
486
0
    int ret;
487
0
    int rule_id = 0;
488
0
    int matches = 0;
489
0
    msgpack_object *key;
490
0
    msgpack_object *val;
491
0
    msgpack_object *s_key;
492
0
    msgpack_object *o_key;
493
0
    msgpack_object *o_val;
494
0
    struct mk_list *head;
495
0
    struct flb_record_accessor *ra;
496
0
    struct flb_mp_accessor_match *match;
497
0
    struct flb_mp_map_header mh;
498
0
    msgpack_sbuffer mp_sbuf;
499
0
    msgpack_packer mp_pck;
500
501
0
    if (map->via.map.size == 0) {
502
0
        return FLB_FALSE;
503
0
    }
504
505
    /* Reset matches cache */
506
0
    memset(mpa->matches, '\0', mpa->matches_size);
507
508
0
    mk_list_foreach(head, &mpa->ra_list) {
509
0
        ra = mk_list_entry(head, struct flb_record_accessor, _head);
510
511
        /* Apply the record accessor rule against the map */
512
0
        ret = flb_ra_get_kv_pair(ra, *map, &s_key, &o_key, &o_val);
513
0
        if (ret == 0) {
514
            /* There is a match, register in the matches table */
515
0
            match = &mpa->matches[rule_id];
516
0
            match->matched = FLB_TRUE;
517
0
            match->start_key = s_key;        /* Initial key path that matched */
518
0
            match->key = o_key;              /* Final key that matched */
519
0
            match->val = o_val;              /* Final value */
520
0
            match->ra = ra;                  /* Record accessor context */
521
0
            matches++;
522
0
        }
523
0
        rule_id++;
524
0
    }
525
526
    /* If no matches, no modifications were made */
527
0
    if (matches == 0) {
528
0
        return FLB_FALSE;
529
0
    }
530
531
    /* Some rules matched, compose a new outgoing buffer */
532
0
    msgpack_sbuffer_init(&mp_sbuf);
533
0
    msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
534
535
    /* Initialize map */
536
0
    flb_mp_map_header_init(&mh, &mp_pck);
537
538
0
    for (i = 0; i < map->via.map.size; i++) {
539
0
        key = &map->via.map.ptr[i].key;
540
0
        val = &map->via.map.ptr[i].val;
541
542
        /*
543
         * For every entry on the path, check if we should do a step-by-step
544
         * repackaging or just pack the whole object.
545
         *
546
         * Just check: does this 'key' exists on any path of the record
547
         * accessor patterns ?
548
         *
549
         * Find if the active key in the map, matches an accessor rule, if
550
         * if match we get the match id as return value, otherwise -1.
551
         */
552
0
        ret = accessor_key_find_match(mpa, key);
553
0
        if (ret == -1) {
554
            /* No matches, it's ok to pack the kv pair */
555
0
            flb_mp_map_header_append(&mh);
556
0
            msgpack_pack_object(&mp_pck, *key);
557
0
            msgpack_pack_object(&mp_pck, *val);
558
0
        }
559
0
        else {
560
            /* The key has a match. Now we do a step-by-step packaging */
561
0
            match = &mpa->matches[ret];
562
0
            ret = accessor_sub_pack(match, &mp_pck, key, val);
563
0
            if (ret == FLB_TRUE) {
564
0
                flb_mp_map_header_append(&mh);
565
0
            }
566
0
        }
567
0
    }
568
0
    flb_mp_map_header_end(&mh);
569
570
0
    *out_buf = mp_sbuf.data;
571
0
    *out_size = mp_sbuf.size;
572
573
0
    return FLB_TRUE;
574
0
}
575
576
void flb_mp_accessor_destroy(struct flb_mp_accessor *mpa)
577
0
{
578
0
    struct mk_list *tmp;
579
0
    struct mk_list *head;
580
0
    struct flb_record_accessor *ra;
581
582
0
    if (!mpa) {
583
0
        return;
584
0
    }
585
586
0
    mk_list_foreach_safe(head, tmp, &mpa->ra_list) {
587
0
        ra = mk_list_entry(head, struct flb_record_accessor, _head);
588
0
        mk_list_del(&ra->_head);
589
0
        flb_ra_destroy(ra);
590
0
    }
591
592
0
    if (mpa->matches) {
593
0
        flb_free(mpa->matches);
594
0
    }
595
0
    flb_free(mpa);
596
0
}