/src/fluent-bit/src/flb_log_event_decoder.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2024 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <fluent-bit/flb_log_event_decoder.h> |
21 | | #include <fluent-bit/flb_byteswap.h> |
22 | | #include <fluent-bit/flb_compat.h> |
23 | | |
24 | 69.7k | static int create_empty_map(struct flb_log_event_decoder *context) { |
25 | 69.7k | msgpack_packer packer; |
26 | 69.7k | msgpack_sbuffer buffer; |
27 | 69.7k | int result; |
28 | 69.7k | size_t offset; |
29 | | |
30 | 69.7k | result = FLB_EVENT_DECODER_SUCCESS; |
31 | | |
32 | 69.7k | context->empty_map = NULL; |
33 | | |
34 | 69.7k | msgpack_sbuffer_init(&buffer); |
35 | 69.7k | msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write); |
36 | | |
37 | 69.7k | result = msgpack_pack_map(&packer, 0); |
38 | | |
39 | 69.7k | if (result != 0) { |
40 | 0 | result = FLB_EVENT_DECODER_ERROR_INITIALIZATION_FAILURE; |
41 | 0 | } |
42 | 69.7k | else { |
43 | 69.7k | offset = 0; |
44 | | |
45 | 69.7k | msgpack_unpacked_init(&context->unpacked_empty_map); |
46 | | |
47 | 69.7k | result = msgpack_unpack_next(&context->unpacked_empty_map, |
48 | 69.7k | buffer.data, |
49 | 69.7k | buffer.size, |
50 | 69.7k | &offset); |
51 | | |
52 | 69.7k | if (result != MSGPACK_UNPACK_SUCCESS) { |
53 | 0 | result = FLB_EVENT_DECODER_ERROR_INITIALIZATION_FAILURE; |
54 | 0 | } |
55 | 69.7k | else { |
56 | 69.7k | context->empty_map = &context->unpacked_empty_map.data; |
57 | | |
58 | 69.7k | result = FLB_EVENT_DECODER_SUCCESS; |
59 | 69.7k | } |
60 | 69.7k | } |
61 | | |
62 | 69.7k | msgpack_sbuffer_destroy(&buffer); |
63 | | |
64 | 69.7k | return result; |
65 | 69.7k | } |
66 | | |
67 | | void flb_log_event_decoder_reset(struct flb_log_event_decoder *context, |
68 | | char *input_buffer, |
69 | | size_t input_length) |
70 | 69.7k | { |
71 | 69.7k | context->offset = 0; |
72 | 69.7k | context->buffer = input_buffer; |
73 | 69.7k | context->length = input_length; |
74 | 69.7k | context->last_result = FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA; |
75 | | |
76 | 69.7k | msgpack_unpacked_destroy(&context->unpacked_event); |
77 | 69.7k | msgpack_unpacked_init(&context->unpacked_event); |
78 | 69.7k | } |
79 | | |
80 | | int flb_log_event_decoder_read_groups(struct flb_log_event_decoder *context, |
81 | | int read_groups) |
82 | 0 | { |
83 | 0 | if (context == NULL) { |
84 | 0 | return -1; |
85 | 0 | } |
86 | | |
87 | 0 | if (read_groups != FLB_TRUE && read_groups != FLB_FALSE) { |
88 | 0 | return -1; |
89 | 0 | } |
90 | 0 | context->read_groups = read_groups; |
91 | 0 | return 0; |
92 | 0 | } |
93 | | |
94 | | int flb_log_event_decoder_init(struct flb_log_event_decoder *context, |
95 | | char *input_buffer, |
96 | | size_t input_length) |
97 | 69.7k | { |
98 | 69.7k | if (context == NULL) { |
99 | 0 | return FLB_EVENT_DECODER_ERROR_INVALID_CONTEXT; |
100 | 0 | } |
101 | | |
102 | 69.7k | memset(context, 0, sizeof(struct flb_log_event_decoder)); |
103 | | |
104 | 69.7k | context->dynamically_allocated = FLB_FALSE; |
105 | 69.7k | context->initialized = FLB_TRUE; |
106 | 69.7k | context->read_groups = FLB_TRUE; |
107 | | |
108 | 69.7k | flb_log_event_decoder_reset(context, input_buffer, input_length); |
109 | | |
110 | 69.7k | return create_empty_map(context); |
111 | 69.7k | } |
112 | | |
113 | | struct flb_log_event_decoder *flb_log_event_decoder_create( |
114 | | char *input_buffer, |
115 | | size_t input_length) |
116 | 0 | { |
117 | 0 | struct flb_log_event_decoder *context; |
118 | 0 | int result; |
119 | |
|
120 | 0 | context = (struct flb_log_event_decoder *) \ |
121 | 0 | flb_calloc(1, sizeof(struct flb_log_event_decoder)); |
122 | |
|
123 | 0 | result = flb_log_event_decoder_init(context, |
124 | 0 | input_buffer, |
125 | 0 | input_length); |
126 | |
|
127 | 0 | if (context != NULL) { |
128 | 0 | context->dynamically_allocated = FLB_TRUE; |
129 | 0 | if (result != FLB_EVENT_DECODER_SUCCESS) { |
130 | 0 | flb_log_event_decoder_destroy(context); |
131 | 0 | context = NULL; |
132 | 0 | } |
133 | 0 | } |
134 | |
|
135 | 0 | return context; |
136 | 0 | } |
137 | | |
138 | | void flb_log_event_decoder_destroy(struct flb_log_event_decoder *context) |
139 | 69.7k | { |
140 | 69.7k | int dynamically_allocated; |
141 | | |
142 | 69.7k | if (context != NULL) { |
143 | 69.7k | if (context->initialized) { |
144 | 69.7k | msgpack_unpacked_destroy(&context->unpacked_empty_map); |
145 | 69.7k | msgpack_unpacked_destroy(&context->unpacked_event); |
146 | 69.7k | } |
147 | | |
148 | 69.7k | dynamically_allocated = context->dynamically_allocated; |
149 | | |
150 | 69.7k | memset(context, 0, sizeof(struct flb_log_event_decoder)); |
151 | | |
152 | | /* This might look silly and with most of the codebase including |
153 | | * this module as context it might be but just in case we choose |
154 | | * to stray away from the assumption of FLB_FALSE being zero and |
155 | | * FLB_TRUE being one in favor of explicitly comparing variables to |
156 | | * the the constants I will leave this here. |
157 | | */ |
158 | 69.7k | context->initialized = FLB_FALSE; |
159 | | |
160 | 69.7k | if (dynamically_allocated) { |
161 | 0 | flb_free(context); |
162 | 0 | } |
163 | 69.7k | } |
164 | 69.7k | } |
165 | | |
166 | | int flb_log_event_decoder_decode_timestamp(msgpack_object *input, |
167 | | struct flb_time *output) |
168 | 185 | { |
169 | 185 | flb_time_zero(output); |
170 | | |
171 | 185 | if (input->type == MSGPACK_OBJECT_POSITIVE_INTEGER) { |
172 | 130 | output->tm.tv_sec = input->via.u64; |
173 | 130 | } |
174 | 55 | else if(input->type == MSGPACK_OBJECT_FLOAT) { |
175 | 18 | output->tm.tv_sec = input->via.f64; |
176 | 18 | output->tm.tv_nsec = ((input->via.f64 - output->tm.tv_sec) * 1000000000); |
177 | 18 | } |
178 | 37 | else if(input->type == MSGPACK_OBJECT_EXT) { |
179 | 37 | if (input->via.ext.type != 0 || input->via.ext.size != 8) { |
180 | 0 | return FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE; |
181 | 0 | } |
182 | | |
183 | 37 | output->tm.tv_sec = |
184 | 37 | (int32_t) FLB_UINT32_TO_HOST_BYTE_ORDER( |
185 | 37 | FLB_ALIGNED_DWORD_READ( |
186 | 37 | (unsigned char *) &input->via.ext.ptr[0])); |
187 | | |
188 | 37 | output->tm.tv_nsec = |
189 | 37 | (int32_t) FLB_UINT32_TO_HOST_BYTE_ORDER( |
190 | 37 | FLB_ALIGNED_DWORD_READ( |
191 | 37 | (unsigned char *) &input->via.ext.ptr[4])); |
192 | 37 | } |
193 | 0 | else { |
194 | 0 | return FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE; |
195 | 0 | } |
196 | | |
197 | 185 | return FLB_EVENT_DECODER_SUCCESS; |
198 | 185 | } |
199 | | |
200 | | int flb_event_decoder_decode_object(struct flb_log_event_decoder *context, |
201 | | struct flb_log_event *event, |
202 | | msgpack_object *input) |
203 | 67.3k | { |
204 | 67.3k | msgpack_object *timestamp; |
205 | 67.3k | msgpack_object *metadata; |
206 | 67.3k | int result; |
207 | 67.3k | int format; |
208 | 67.3k | msgpack_object *header; |
209 | 67.3k | msgpack_object *body; |
210 | 67.3k | msgpack_object *root; |
211 | | |
212 | 67.3k | memset(event, 0, sizeof(struct flb_log_event)); |
213 | | |
214 | | /* Ensure that the root element is a 2 element array*/ |
215 | 67.3k | root = input; |
216 | | |
217 | 67.3k | if (root->type != MSGPACK_OBJECT_ARRAY) { |
218 | 66.5k | return FLB_EVENT_DECODER_ERROR_WRONG_ROOT_TYPE; |
219 | 66.5k | } |
220 | | |
221 | 798 | if (root->via.array.size != \ |
222 | 798 | FLB_LOG_EVENT_EXPECTED_ROOT_ELEMENT_COUNT) { |
223 | 545 | return FLB_EVENT_DECODER_ERROR_WRONG_ROOT_SIZE; |
224 | 545 | } |
225 | | |
226 | 253 | header = &root->via.array.ptr[0]; |
227 | | |
228 | | /* Determine if the first element is the header or |
229 | | * a legacy timestamp (int, float or ext). |
230 | | */ |
231 | 253 | if (header->type == MSGPACK_OBJECT_ARRAY) { |
232 | 52 | if (header->via.array.size != \ |
233 | 52 | FLB_LOG_EVENT_EXPECTED_HEADER_ELEMENT_COUNT) { |
234 | 12 | return FLB_EVENT_DECODER_ERROR_WRONG_HEADER_SIZE; |
235 | 12 | } |
236 | | |
237 | 40 | timestamp = &header->via.array.ptr[0]; |
238 | 40 | metadata = &header->via.array.ptr[1]; |
239 | | |
240 | 40 | format = FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2; |
241 | 40 | } |
242 | 201 | else { |
243 | 201 | header = NULL; |
244 | 201 | timestamp = &root->via.array.ptr[0]; |
245 | 201 | metadata = context->empty_map; |
246 | | |
247 | 201 | format = FLB_LOG_EVENT_FORMAT_FORWARD; |
248 | 201 | } |
249 | | |
250 | 241 | if (timestamp->type != MSGPACK_OBJECT_POSITIVE_INTEGER && |
251 | 241 | timestamp->type != MSGPACK_OBJECT_FLOAT && |
252 | 241 | timestamp->type != MSGPACK_OBJECT_EXT) { |
253 | 43 | return FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE; |
254 | 43 | } |
255 | | |
256 | 198 | if (metadata->type != MSGPACK_OBJECT_MAP) { |
257 | 0 | return FLB_EVENT_DECODER_ERROR_WRONG_METADATA_TYPE; |
258 | 0 | } |
259 | | |
260 | 198 | body = &root->via.array.ptr[1]; |
261 | | |
262 | 198 | if (body->type != MSGPACK_OBJECT_MAP) { |
263 | 13 | return FLB_EVENT_DECODER_ERROR_WRONG_BODY_TYPE; |
264 | 13 | } |
265 | | |
266 | 185 | result = flb_log_event_decoder_decode_timestamp(timestamp, &event->timestamp); |
267 | | |
268 | 185 | if (result != FLB_EVENT_DECODER_SUCCESS) { |
269 | 0 | return result; |
270 | 0 | } |
271 | | |
272 | 185 | event->raw_timestamp = timestamp; |
273 | 185 | event->metadata = metadata; |
274 | 185 | event->format = format; |
275 | 185 | event->body = body; |
276 | 185 | event->root = root; |
277 | | |
278 | 185 | context->record_base = \ |
279 | 185 | (const char *) &context->buffer[context->previous_offset]; |
280 | 185 | context->record_length = context->offset - context->previous_offset; |
281 | | |
282 | 185 | return FLB_EVENT_DECODER_SUCCESS; |
283 | 185 | } |
284 | | |
285 | | int flb_log_event_decoder_get_last_result(struct flb_log_event_decoder *context) |
286 | 67.4k | { |
287 | 67.4k | if (context->last_result == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA && |
288 | 67.4k | context->offset == context->length) { |
289 | 260 | context->last_result = FLB_EVENT_DECODER_SUCCESS; |
290 | 260 | } |
291 | | |
292 | 67.4k | return context->last_result; |
293 | 67.4k | } |
294 | | |
295 | | int flb_log_event_decoder_next(struct flb_log_event_decoder *context, |
296 | | struct flb_log_event *event) |
297 | 67.6k | { |
298 | 67.6k | int ret; |
299 | 67.6k | int result; |
300 | 67.6k | int record_type; |
301 | 67.6k | size_t previous_offset; |
302 | | |
303 | 67.6k | if (context == NULL) { |
304 | 0 | return FLB_EVENT_DECODER_ERROR_INVALID_CONTEXT; |
305 | 0 | } |
306 | 67.6k | if (context->length == 0) { |
307 | 0 | context->last_result = FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA; |
308 | 0 | return context->last_result; |
309 | 0 | } |
310 | | |
311 | 67.6k | context->record_base = NULL; |
312 | 67.6k | context->record_length = 0; |
313 | | |
314 | 67.6k | if (event == NULL) { |
315 | 0 | context->last_result = FLB_EVENT_DECODER_ERROR_INVALID_ARGUMENT; |
316 | 0 | return context->last_result; |
317 | 0 | } |
318 | | |
319 | 67.6k | previous_offset = context->offset; |
320 | 67.6k | result = msgpack_unpack_next(&context->unpacked_event, |
321 | 67.6k | context->buffer, |
322 | 67.6k | context->length, |
323 | 67.6k | &context->offset); |
324 | | |
325 | 67.6k | if (result == MSGPACK_UNPACK_CONTINUE) { |
326 | 260 | context->last_result = FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA; |
327 | 260 | return context->last_result; |
328 | 260 | } |
329 | 67.3k | else if (result != MSGPACK_UNPACK_SUCCESS) { |
330 | 26 | context->last_result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE; |
331 | 26 | return context->last_result; |
332 | 26 | } |
333 | | |
334 | 67.3k | context->previous_offset = previous_offset; |
335 | 67.3k | context->last_result = flb_event_decoder_decode_object(context, |
336 | 67.3k | event, |
337 | 67.3k | &context->unpacked_event.data); |
338 | | |
339 | 67.3k | if (context->last_result == FLB_EVENT_DECODER_SUCCESS) { |
340 | | /* get log event type */ |
341 | 185 | ret = flb_log_event_decoder_get_record_type(event, &record_type); |
342 | 185 | if (ret != 0) { |
343 | 7 | context->last_result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE; |
344 | 7 | return context->last_result; |
345 | 7 | } |
346 | | |
347 | | /* |
348 | | * if we hava a group type record and the caller don't want groups, just |
349 | | * skip this record and move to the next one. |
350 | | */ |
351 | 178 | if (record_type != FLB_LOG_EVENT_NORMAL && !context->read_groups) { |
352 | 0 | return flb_log_event_decoder_next(context, event); |
353 | 0 | } |
354 | 178 | } |
355 | | |
356 | 67.3k | return context->last_result; |
357 | 67.3k | } |
358 | | |
359 | | int flb_log_event_decoder_get_record_type(struct flb_log_event *event, int32_t *type) |
360 | 185 | { |
361 | 185 | int32_t s; |
362 | | |
363 | 185 | s = (int32_t) event->timestamp.tm.tv_sec; |
364 | | |
365 | 185 | if (s >= 0) { |
366 | 176 | *type = FLB_LOG_EVENT_NORMAL; |
367 | 176 | return 0; |
368 | 176 | } |
369 | 9 | else if (s == FLB_LOG_EVENT_GROUP_START) { |
370 | 2 | *type = FLB_LOG_EVENT_GROUP_START; |
371 | 2 | return 0; |
372 | 2 | } |
373 | 7 | else if (s == FLB_LOG_EVENT_GROUP_END) { |
374 | 0 | *type = FLB_LOG_EVENT_GROUP_END; |
375 | 0 | return 0; |
376 | 0 | } |
377 | | |
378 | 7 | return -1; |
379 | 185 | } |
380 | | |
381 | | const char *flb_log_event_decoder_get_error_description(int error_code) |
382 | 134k | { |
383 | 134k | const char *ret; |
384 | | |
385 | 134k | switch (error_code) { |
386 | 0 | case FLB_EVENT_DECODER_SUCCESS: |
387 | 0 | ret = "Success"; |
388 | 0 | break; |
389 | | |
390 | 0 | case FLB_EVENT_DECODER_ERROR_INITIALIZATION_FAILURE: |
391 | 0 | ret = "Initialization failure"; |
392 | 0 | break; |
393 | | |
394 | 0 | case FLB_EVENT_DECODER_ERROR_INVALID_CONTEXT: |
395 | 0 | ret = "Invalid context"; |
396 | 0 | break; |
397 | | |
398 | 0 | case FLB_EVENT_DECODER_ERROR_INVALID_ARGUMENT: |
399 | 0 | ret = "Invalid argument"; |
400 | 0 | break; |
401 | | |
402 | 133k | case FLB_EVENT_DECODER_ERROR_WRONG_ROOT_TYPE: |
403 | 133k | ret = "Wrong root type"; |
404 | 133k | break; |
405 | | |
406 | 1.08k | case FLB_EVENT_DECODER_ERROR_WRONG_ROOT_SIZE: |
407 | 1.08k | ret = "Wrong root size"; |
408 | 1.08k | break; |
409 | | |
410 | 0 | case FLB_EVENT_DECODER_ERROR_WRONG_HEADER_TYPE: |
411 | 0 | ret = "Wrong header type"; |
412 | 0 | break; |
413 | | |
414 | 23 | case FLB_EVENT_DECODER_ERROR_WRONG_HEADER_SIZE: |
415 | 23 | ret = "Wrong header size"; |
416 | 23 | break; |
417 | | |
418 | 83 | case FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE: |
419 | 83 | ret = "Wrong timestamp type"; |
420 | 83 | break; |
421 | | |
422 | 0 | case FLB_EVENT_DECODER_ERROR_WRONG_METADATA_TYPE: |
423 | 0 | ret = "Wrong metadata type"; |
424 | 0 | break; |
425 | | |
426 | 26 | case FLB_EVENT_DECODER_ERROR_WRONG_BODY_TYPE: |
427 | 26 | ret = "Wrong body type"; |
428 | 26 | break; |
429 | | |
430 | 65 | case FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE: |
431 | 65 | ret = "Deserialization failure"; |
432 | 65 | break; |
433 | | |
434 | 0 | case FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA: |
435 | 0 | ret = "Insufficient data"; |
436 | 0 | break; |
437 | | |
438 | 0 | default: |
439 | 0 | ret = "Unknown error"; |
440 | 134k | } |
441 | 134k | return ret; |
442 | 134k | } |