Coverage Report

Created: 2024-09-19 07:08

/src/fluent-bit/src/flb_log_event_decoder.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_log_event_decoder.h>
21
#include <fluent-bit/flb_byteswap.h>
22
#include <fluent-bit/flb_compat.h>
23
24
69.7k
static int create_empty_map(struct flb_log_event_decoder *context) {
25
69.7k
    msgpack_packer  packer;
26
69.7k
    msgpack_sbuffer buffer;
27
69.7k
    int             result;
28
69.7k
    size_t          offset;
29
30
69.7k
    result = FLB_EVENT_DECODER_SUCCESS;
31
32
69.7k
    context->empty_map = NULL;
33
34
69.7k
    msgpack_sbuffer_init(&buffer);
35
69.7k
    msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write);
36
37
69.7k
    result = msgpack_pack_map(&packer, 0);
38
39
69.7k
    if (result != 0) {
40
0
        result = FLB_EVENT_DECODER_ERROR_INITIALIZATION_FAILURE;
41
0
    }
42
69.7k
    else {
43
69.7k
        offset = 0;
44
45
69.7k
        msgpack_unpacked_init(&context->unpacked_empty_map);
46
47
69.7k
        result = msgpack_unpack_next(&context->unpacked_empty_map,
48
69.7k
                                     buffer.data,
49
69.7k
                                     buffer.size,
50
69.7k
                                     &offset);
51
52
69.7k
        if (result != MSGPACK_UNPACK_SUCCESS) {
53
0
            result = FLB_EVENT_DECODER_ERROR_INITIALIZATION_FAILURE;
54
0
        }
55
69.7k
        else {
56
69.7k
            context->empty_map = &context->unpacked_empty_map.data;
57
58
69.7k
            result = FLB_EVENT_DECODER_SUCCESS;
59
69.7k
        }
60
69.7k
    }
61
62
69.7k
    msgpack_sbuffer_destroy(&buffer);
63
64
69.7k
    return result;
65
69.7k
}
66
67
void flb_log_event_decoder_reset(struct flb_log_event_decoder *context,
68
                                 char *input_buffer,
69
                                 size_t input_length)
70
69.7k
{
71
69.7k
    context->offset = 0;
72
69.7k
    context->buffer = input_buffer;
73
69.7k
    context->length = input_length;
74
69.7k
    context->last_result = FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA;
75
76
69.7k
    msgpack_unpacked_destroy(&context->unpacked_event);
77
69.7k
    msgpack_unpacked_init(&context->unpacked_event);
78
69.7k
}
79
80
int flb_log_event_decoder_read_groups(struct flb_log_event_decoder *context,
81
                                      int read_groups)
82
0
{
83
0
    if (context == NULL) {
84
0
        return -1;
85
0
    }
86
87
0
    if (read_groups != FLB_TRUE && read_groups != FLB_FALSE) {
88
0
        return -1;
89
0
    }
90
0
    context->read_groups = read_groups;
91
0
    return 0;
92
0
}
93
94
int flb_log_event_decoder_init(struct flb_log_event_decoder *context,
95
                               char *input_buffer,
96
                               size_t input_length)
97
69.7k
{
98
69.7k
    if (context == NULL) {
99
0
        return FLB_EVENT_DECODER_ERROR_INVALID_CONTEXT;
100
0
    }
101
102
69.7k
    memset(context, 0, sizeof(struct flb_log_event_decoder));
103
104
69.7k
    context->dynamically_allocated = FLB_FALSE;
105
69.7k
    context->initialized = FLB_TRUE;
106
69.7k
    context->read_groups = FLB_TRUE;
107
108
69.7k
    flb_log_event_decoder_reset(context, input_buffer, input_length);
109
110
69.7k
    return create_empty_map(context);
111
69.7k
}
112
113
struct flb_log_event_decoder *flb_log_event_decoder_create(
114
    char *input_buffer,
115
    size_t input_length)
116
0
{
117
0
    struct flb_log_event_decoder *context;
118
0
    int                           result;
119
120
0
    context = (struct flb_log_event_decoder *) \
121
0
        flb_calloc(1, sizeof(struct flb_log_event_decoder));
122
123
0
    result = flb_log_event_decoder_init(context,
124
0
                                        input_buffer,
125
0
                                        input_length);
126
127
0
    if (context != NULL) {
128
0
        context->dynamically_allocated = FLB_TRUE;
129
0
        if (result != FLB_EVENT_DECODER_SUCCESS) {
130
0
            flb_log_event_decoder_destroy(context);
131
0
            context = NULL;
132
0
        }
133
0
    }
134
135
0
    return context;
136
0
}
137
138
void flb_log_event_decoder_destroy(struct flb_log_event_decoder *context)
139
69.7k
{
140
69.7k
    int dynamically_allocated;
141
142
69.7k
    if (context != NULL) {
143
69.7k
        if (context->initialized) {
144
69.7k
            msgpack_unpacked_destroy(&context->unpacked_empty_map);
145
69.7k
            msgpack_unpacked_destroy(&context->unpacked_event);
146
69.7k
        }
147
148
69.7k
        dynamically_allocated = context->dynamically_allocated;
149
150
69.7k
        memset(context, 0, sizeof(struct flb_log_event_decoder));
151
152
        /* This might look silly and with most of the codebase including
153
         * this module as context it might be but just in case we choose
154
         * to stray away from the assumption of FLB_FALSE being zero and
155
         * FLB_TRUE being one in favor of explicitly comparing variables to
156
         * the the constants I will leave this here.
157
         */
158
69.7k
        context->initialized = FLB_FALSE;
159
160
69.7k
        if (dynamically_allocated) {
161
0
            flb_free(context);
162
0
        }
163
69.7k
    }
164
69.7k
}
165
166
/*
 * Converts a msgpack timestamp object into a struct flb_time.
 *
 * input  : msgpack object holding the timestamp.
 * output : destination, always zeroed first.
 *
 * Accepted encodings:
 *   - positive integer : taken as whole seconds.
 *   - float            : integral part becomes seconds, the fraction is
 *                        scaled to nanoseconds.
 *   - ext              : must be type 0 with an 8 byte payload holding two
 *                        32 bit words (seconds, nanoseconds), each passed
 *                        through FLB_UINT32_TO_HOST_BYTE_ORDER.
 *
 * Returns FLB_EVENT_DECODER_SUCCESS, or
 * FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE for any other object type,
 * a malformed ext payload, or a float that is NaN, infinite or outside
 * the range tv_sec can hold.
 */
int flb_log_event_decoder_decode_timestamp(msgpack_object *input,
                                           struct flb_time *output)
{
    double seconds;
    double limit;

    flb_time_zero(output);

    if (input->type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
        output->tm.tv_sec  = input->via.u64;
    }
    else if (input->type == MSGPACK_OBJECT_FLOAT) {
        seconds = input->via.f64;

        /* Converting a double whose integral part does not fit the target
         * integer type is undefined behavior, so the value is validated
         * first. 2^63 and 2^31 are both exactly representable as double.
         * NOTE(review): assumes tv_sec is a signed integer type.
         */
        if (sizeof(output->tm.tv_sec) >= sizeof(int64_t)) {
            limit = 9223372036854775808.0;
        }
        else {
            limit = 2147483648.0;
        }

        /* Written as a negated conjunction so NaN is rejected as well */
        if (!(seconds >= -limit && seconds < limit)) {
            return FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE;
        }

        output->tm.tv_sec  = seconds;
        output->tm.tv_nsec = ((seconds - output->tm.tv_sec) * 1000000000);
    }
    else if (input->type == MSGPACK_OBJECT_EXT) {
        if (input->via.ext.type != 0 || input->via.ext.size != 8) {
            return FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE;
        }

        /* Both words are reinterpreted as signed 32 bit values */
        output->tm.tv_sec  =
            (int32_t) FLB_UINT32_TO_HOST_BYTE_ORDER(
                        FLB_ALIGNED_DWORD_READ(
                            (unsigned char *) &input->via.ext.ptr[0]));

        output->tm.tv_nsec  =
            (int32_t) FLB_UINT32_TO_HOST_BYTE_ORDER(
                        FLB_ALIGNED_DWORD_READ(
                            (unsigned char *) &input->via.ext.ptr[4]));
    }
    else {
        return FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE;
    }

    return FLB_EVENT_DECODER_SUCCESS;
}
199
200
int flb_event_decoder_decode_object(struct flb_log_event_decoder *context,
201
                                    struct flb_log_event *event,
202
                                    msgpack_object *input)
203
67.3k
{
204
67.3k
    msgpack_object *timestamp;
205
67.3k
    msgpack_object *metadata;
206
67.3k
    int             result;
207
67.3k
    int             format;
208
67.3k
    msgpack_object *header;
209
67.3k
    msgpack_object *body;
210
67.3k
    msgpack_object *root;
211
212
67.3k
    memset(event, 0, sizeof(struct flb_log_event));
213
214
    /* Ensure that the root element is a 2 element array*/
215
67.3k
    root = input;
216
217
67.3k
    if (root->type != MSGPACK_OBJECT_ARRAY) {
218
66.5k
        return FLB_EVENT_DECODER_ERROR_WRONG_ROOT_TYPE;
219
66.5k
    }
220
221
798
    if (root->via.array.size != \
222
798
        FLB_LOG_EVENT_EXPECTED_ROOT_ELEMENT_COUNT) {
223
545
        return FLB_EVENT_DECODER_ERROR_WRONG_ROOT_SIZE;
224
545
    }
225
226
253
    header = &root->via.array.ptr[0];
227
228
    /* Determine if the first element is the header or
229
     * a legacy timestamp (int, float or ext).
230
     */
231
253
    if (header->type == MSGPACK_OBJECT_ARRAY) {
232
52
        if (header->via.array.size != \
233
52
            FLB_LOG_EVENT_EXPECTED_HEADER_ELEMENT_COUNT) {
234
12
            return FLB_EVENT_DECODER_ERROR_WRONG_HEADER_SIZE;
235
12
        }
236
237
40
        timestamp = &header->via.array.ptr[0];
238
40
        metadata = &header->via.array.ptr[1];
239
240
40
        format = FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2;
241
40
    }
242
201
    else {
243
201
        header = NULL;
244
201
        timestamp = &root->via.array.ptr[0];
245
201
        metadata = context->empty_map;
246
247
201
        format = FLB_LOG_EVENT_FORMAT_FORWARD;
248
201
    }
249
250
241
    if (timestamp->type != MSGPACK_OBJECT_POSITIVE_INTEGER &&
251
241
        timestamp->type != MSGPACK_OBJECT_FLOAT &&
252
241
        timestamp->type != MSGPACK_OBJECT_EXT) {
253
43
        return FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE;
254
43
    }
255
256
198
    if (metadata->type != MSGPACK_OBJECT_MAP) {
257
0
        return FLB_EVENT_DECODER_ERROR_WRONG_METADATA_TYPE;
258
0
    }
259
260
198
    body = &root->via.array.ptr[1];
261
262
198
    if (body->type != MSGPACK_OBJECT_MAP) {
263
13
        return FLB_EVENT_DECODER_ERROR_WRONG_BODY_TYPE;
264
13
    }
265
266
185
    result = flb_log_event_decoder_decode_timestamp(timestamp, &event->timestamp);
267
268
185
    if (result != FLB_EVENT_DECODER_SUCCESS) {
269
0
        return result;
270
0
    }
271
272
185
    event->raw_timestamp = timestamp;
273
185
    event->metadata = metadata;
274
185
    event->format = format;
275
185
    event->body = body;
276
185
    event->root = root;
277
278
185
    context->record_base   = \
279
185
        (const char *) &context->buffer[context->previous_offset];
280
185
    context->record_length = context->offset - context->previous_offset;
281
282
185
    return FLB_EVENT_DECODER_SUCCESS;
283
185
}
284
285
int flb_log_event_decoder_get_last_result(struct flb_log_event_decoder *context)
286
67.4k
{
287
67.4k
    if (context->last_result == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA &&
288
67.4k
        context->offset == context->length) {
289
260
        context->last_result = FLB_EVENT_DECODER_SUCCESS;
290
260
    }
291
292
67.4k
    return context->last_result;
293
67.4k
}
294
295
int flb_log_event_decoder_next(struct flb_log_event_decoder *context,
296
                               struct flb_log_event *event)
297
67.6k
{
298
67.6k
    int ret;
299
67.6k
    int result;
300
67.6k
    int record_type;
301
67.6k
    size_t previous_offset;
302
303
67.6k
    if (context == NULL) {
304
0
        return FLB_EVENT_DECODER_ERROR_INVALID_CONTEXT;
305
0
    }
306
67.6k
    if (context->length == 0) {
307
0
        context->last_result = FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA;
308
0
        return context->last_result;
309
0
    }
310
311
67.6k
    context->record_base = NULL;
312
67.6k
    context->record_length = 0;
313
314
67.6k
    if (event == NULL) {
315
0
        context->last_result = FLB_EVENT_DECODER_ERROR_INVALID_ARGUMENT;
316
0
        return context->last_result;
317
0
    }
318
319
67.6k
    previous_offset = context->offset;
320
67.6k
    result = msgpack_unpack_next(&context->unpacked_event,
321
67.6k
                                 context->buffer,
322
67.6k
                                 context->length,
323
67.6k
                                 &context->offset);
324
325
67.6k
    if (result == MSGPACK_UNPACK_CONTINUE) {
326
260
        context->last_result = FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA;
327
260
        return context->last_result;
328
260
    }
329
67.3k
    else if (result != MSGPACK_UNPACK_SUCCESS) {
330
26
        context->last_result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE;
331
26
        return context->last_result;
332
26
    }
333
334
67.3k
    context->previous_offset = previous_offset;
335
67.3k
    context->last_result = flb_event_decoder_decode_object(context,
336
67.3k
                                                           event,
337
67.3k
                                                           &context->unpacked_event.data);
338
339
67.3k
    if (context->last_result == FLB_EVENT_DECODER_SUCCESS) {
340
        /* get log event type */
341
185
        ret = flb_log_event_decoder_get_record_type(event, &record_type);
342
185
        if (ret != 0) {
343
7
            context->last_result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE;
344
7
            return context->last_result;
345
7
        }
346
347
        /*
348
         * if we hava a group type record and the caller don't want groups, just
349
         * skip this record and move to the next one.
350
         */
351
178
        if (record_type != FLB_LOG_EVENT_NORMAL && !context->read_groups) {
352
0
            return flb_log_event_decoder_next(context, event);
353
0
        }
354
178
    }
355
356
67.3k
    return context->last_result;
357
67.3k
}
358
359
int flb_log_event_decoder_get_record_type(struct flb_log_event *event, int32_t *type)
360
185
{
361
185
    int32_t s;
362
363
185
    s = (int32_t) event->timestamp.tm.tv_sec;
364
365
185
    if (s >= 0) {
366
176
        *type = FLB_LOG_EVENT_NORMAL;
367
176
        return 0;
368
176
    }
369
9
    else if (s == FLB_LOG_EVENT_GROUP_START) {
370
2
        *type = FLB_LOG_EVENT_GROUP_START;
371
2
        return 0;
372
2
    }
373
7
    else if (s == FLB_LOG_EVENT_GROUP_END) {
374
0
        *type = FLB_LOG_EVENT_GROUP_END;
375
0
        return 0;
376
0
    }
377
378
7
    return -1;
379
185
}
380
381
const char *flb_log_event_decoder_get_error_description(int error_code)
382
134k
{
383
134k
    const char *ret;
384
385
134k
    switch (error_code) {
386
0
    case FLB_EVENT_DECODER_SUCCESS:
387
0
        ret = "Success";
388
0
        break;
389
390
0
    case FLB_EVENT_DECODER_ERROR_INITIALIZATION_FAILURE:
391
0
        ret = "Initialization failure";
392
0
        break;
393
394
0
    case FLB_EVENT_DECODER_ERROR_INVALID_CONTEXT:
395
0
        ret = "Invalid context";
396
0
        break;
397
398
0
    case FLB_EVENT_DECODER_ERROR_INVALID_ARGUMENT:
399
0
        ret = "Invalid argument";
400
0
        break;
401
402
133k
    case FLB_EVENT_DECODER_ERROR_WRONG_ROOT_TYPE:
403
133k
        ret = "Wrong root type";
404
133k
        break;
405
406
1.08k
    case FLB_EVENT_DECODER_ERROR_WRONG_ROOT_SIZE:
407
1.08k
        ret = "Wrong root size";
408
1.08k
        break;
409
410
0
    case FLB_EVENT_DECODER_ERROR_WRONG_HEADER_TYPE:
411
0
        ret = "Wrong header type";
412
0
        break;
413
414
23
    case FLB_EVENT_DECODER_ERROR_WRONG_HEADER_SIZE:
415
23
        ret = "Wrong header size";
416
23
        break;
417
418
83
    case FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE:
419
83
        ret = "Wrong timestamp type";
420
83
        break;
421
422
0
    case FLB_EVENT_DECODER_ERROR_WRONG_METADATA_TYPE:
423
0
        ret = "Wrong metadata type";
424
0
        break;
425
426
26
    case FLB_EVENT_DECODER_ERROR_WRONG_BODY_TYPE:
427
26
        ret = "Wrong body type";
428
26
        break;
429
430
65
    case FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE:
431
65
        ret = "Deserialization failure";
432
65
        break;
433
434
0
    case FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA:
435
0
        ret = "Insufficient data";
436
0
        break;
437
438
0
    default:
439
0
        ret = "Unknown error";
440
134k
    }
441
134k
    return ret;
442
134k
}