/src/fluent-bit/src/flb_pack.c
Line | Count | Source (jump to first uncovered line) |
1 | | /*-*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2022 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <stdlib.h> |
21 | | #include <string.h> |
22 | | |
23 | | #include <fluent-bit/flb_info.h> |
24 | | #include <fluent-bit/flb_mem.h> |
25 | | #include <fluent-bit/flb_sds.h> |
26 | | #include <fluent-bit/flb_error.h> |
27 | | #include <fluent-bit/flb_utils.h> |
28 | | #include <fluent-bit/flb_sds.h> |
29 | | #include <fluent-bit/flb_time.h> |
30 | | #include <fluent-bit/flb_pack.h> |
31 | | #include <fluent-bit/flb_unescape.h> |
32 | | |
33 | | /* cmetrics */ |
34 | | #include <cmetrics/cmetrics.h> |
35 | | #include <cmetrics/cmt_decode_msgpack.h> |
36 | | #include <cmetrics/cmt_encode_text.h> |
37 | | |
38 | | #include <msgpack.h> |
39 | | #include <math.h> |
40 | | #include <jsmn/jsmn.h> |
41 | | |
42 | 0 | #define try_to_write_str flb_utils_write_str |
43 | | |
44 | | static int convert_nan_to_null = FLB_FALSE; |
45 | | |
46 | 0 | static int flb_pack_set_null_as_nan(int b) { |
47 | 0 | if (b == FLB_TRUE || b == FLB_FALSE) { |
48 | 0 | convert_nan_to_null = b; |
49 | 0 | } |
50 | 0 | return convert_nan_to_null; |
51 | 0 | } |
52 | | |
53 | | int flb_json_tokenise(const char *js, size_t len, |
54 | | struct flb_pack_state *state) |
55 | 0 | { |
56 | 0 | int ret; |
57 | 0 | int new_tokens = 256; |
58 | 0 | size_t old_size; |
59 | 0 | size_t new_size; |
60 | 0 | void *tmp; |
61 | |
|
62 | 0 | ret = jsmn_parse(&state->parser, js, len, |
63 | 0 | state->tokens, state->tokens_size); |
64 | 0 | while (ret == JSMN_ERROR_NOMEM) { |
65 | | /* Get current size of the array in bytes */ |
66 | 0 | old_size = state->tokens_size * sizeof(jsmntok_t); |
67 | | |
68 | | /* New size: add capacity for new 256 entries */ |
69 | 0 | new_size = old_size + (sizeof(jsmntok_t) * new_tokens); |
70 | |
|
71 | 0 | tmp = flb_realloc(state->tokens, new_size); |
72 | 0 | if (!tmp) { |
73 | 0 | flb_errno(); |
74 | 0 | return -1; |
75 | 0 | } |
76 | 0 | state->tokens = tmp; |
77 | 0 | state->tokens_size += new_tokens; |
78 | |
|
79 | 0 | ret = jsmn_parse(&state->parser, js, len, |
80 | 0 | state->tokens, state->tokens_size); |
81 | 0 | } |
82 | | |
83 | 0 | if (ret == JSMN_ERROR_INVAL) { |
84 | 0 | return FLB_ERR_JSON_INVAL; |
85 | 0 | } |
86 | | |
87 | 0 | if (ret == JSMN_ERROR_PART) { |
88 | | /* This is a partial JSON message, just stop */ |
89 | 0 | flb_trace("[json tokenise] incomplete"); |
90 | 0 | return FLB_ERR_JSON_PART; |
91 | 0 | } |
92 | | |
93 | 0 | state->tokens_count += ret; |
94 | 0 | return 0; |
95 | 0 | } |
96 | | |
/*
 * Tell whether the primitive token buf[0 .. len) has to be converted as a
 * floating point number.
 *
 * Returns 1 when the token contains a '.' or an 'e' immediately followed by
 * '-', otherwise 0.
 *
 * Only the 'len' bytes that belong to the token are inspected: the byte that
 * follows the token is not part of it and may lie outside of the caller's
 * buffer, so it must never be read.
 */
static inline int is_float(const char *buf, int len)
{
    const char *end = buf + len;
    const char *p;

    for (p = buf; p < end; p++) {
        if (*p == '.') {
            return 1;
        }

        /* the '-' lookahead must also stay inside the token */
        if (*p == 'e' && (p + 1) < end && *(p + 1) == '-') {
            return 1;
        }
    }

    return 0;
}
114 | | |
115 | | /* Sanitize incoming JSON string */ |
116 | | static inline int pack_string_token(struct flb_pack_state *state, |
117 | | const char *str, int len, |
118 | | msgpack_packer *pck) |
119 | 0 | { |
120 | 0 | int s; |
121 | 0 | int out_len; |
122 | 0 | char *tmp; |
123 | 0 | char *out_buf; |
124 | |
|
125 | 0 | if (state->buf_size < len + 1) { |
126 | 0 | s = len + 1; |
127 | 0 | tmp = flb_realloc(state->buf_data, s); |
128 | 0 | if (!tmp) { |
129 | 0 | flb_errno(); |
130 | 0 | return -1; |
131 | 0 | } |
132 | 0 | else { |
133 | 0 | state->buf_data = tmp; |
134 | 0 | state->buf_size = s; |
135 | 0 | } |
136 | 0 | } |
137 | 0 | out_buf = state->buf_data; |
138 | | |
139 | | /* Always decode any UTF-8 or special characters */ |
140 | 0 | out_len = flb_unescape_string_utf8(str, len, out_buf); |
141 | | |
142 | | /* Pack decoded text */ |
143 | 0 | msgpack_pack_str(pck, out_len); |
144 | 0 | msgpack_pack_str_body(pck, out_buf, out_len); |
145 | |
|
146 | 0 | return out_len; |
147 | 0 | } |
148 | | |
149 | | /* Receive a tokenized JSON message and convert it to MsgPack */ |
150 | | static char *tokens_to_msgpack(struct flb_pack_state *state, |
151 | | const char *js, |
152 | | int *out_size, int *last_byte, |
153 | | int *out_records) |
154 | 0 | { |
155 | 0 | int i; |
156 | 0 | int flen; |
157 | 0 | int arr_size; |
158 | 0 | int records = 0; |
159 | 0 | const char *p; |
160 | 0 | char *buf; |
161 | 0 | const jsmntok_t *t; |
162 | 0 | msgpack_packer pck; |
163 | 0 | msgpack_sbuffer sbuf; |
164 | 0 | jsmntok_t *tokens; |
165 | |
|
166 | 0 | tokens = state->tokens; |
167 | 0 | arr_size = state->tokens_count; |
168 | |
|
169 | 0 | if (arr_size == 0) { |
170 | 0 | return NULL; |
171 | 0 | } |
172 | | |
173 | | /* initialize buffers */ |
174 | 0 | msgpack_sbuffer_init(&sbuf); |
175 | 0 | msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write); |
176 | |
|
177 | 0 | for (i = 0; i < arr_size ; i++) { |
178 | 0 | t = &tokens[i]; |
179 | |
|
180 | 0 | if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)) { |
181 | 0 | break; |
182 | 0 | } |
183 | | |
184 | 0 | if (t->parent == -1) { |
185 | 0 | *last_byte = t->end; |
186 | 0 | records++; |
187 | 0 | } |
188 | |
|
189 | 0 | flen = (t->end - t->start); |
190 | 0 | switch (t->type) { |
191 | 0 | case JSMN_OBJECT: |
192 | 0 | msgpack_pack_map(&pck, t->size); |
193 | 0 | break; |
194 | 0 | case JSMN_ARRAY: |
195 | 0 | msgpack_pack_array(&pck, t->size); |
196 | 0 | break; |
197 | 0 | case JSMN_STRING: |
198 | 0 | pack_string_token(state, js + t->start, flen, &pck); |
199 | 0 | break; |
200 | 0 | case JSMN_PRIMITIVE: |
201 | 0 | p = js + t->start; |
202 | 0 | if (*p == 'f') { |
203 | 0 | msgpack_pack_false(&pck); |
204 | 0 | } |
205 | 0 | else if (*p == 't') { |
206 | 0 | msgpack_pack_true(&pck); |
207 | 0 | } |
208 | 0 | else if (*p == 'n') { |
209 | 0 | msgpack_pack_nil(&pck); |
210 | 0 | } |
211 | 0 | else { |
212 | 0 | if (is_float(p, flen)) { |
213 | 0 | msgpack_pack_double(&pck, atof(p)); |
214 | 0 | } |
215 | 0 | else { |
216 | 0 | msgpack_pack_int64(&pck, atoll(p)); |
217 | 0 | } |
218 | 0 | } |
219 | 0 | break; |
220 | 0 | case JSMN_UNDEFINED: |
221 | 0 | msgpack_sbuffer_destroy(&sbuf); |
222 | 0 | return NULL; |
223 | 0 | } |
224 | 0 | } |
225 | | |
226 | 0 | *out_size = sbuf.size; |
227 | 0 | *out_records = records; |
228 | 0 | buf = sbuf.data; |
229 | |
|
230 | 0 | return buf; |
231 | 0 | } |
232 | | |
233 | | /* |
234 | | * It parse a JSON string and convert it to MessagePack format, this packer is |
235 | | * useful when a complete JSON message exists, otherwise it will fail until |
236 | | * the message is complete. |
237 | | * |
238 | | * This routine do not keep a state in the parser, do not use it for big |
239 | | * JSON messages. |
240 | | */ |
241 | | static int pack_json_to_msgpack(const char *js, size_t len, char **buffer, |
242 | | size_t *size, int *root_type, int *records) |
243 | 0 | { |
244 | 0 | int ret = -1; |
245 | 0 | int n_records; |
246 | 0 | int out; |
247 | 0 | int last; |
248 | 0 | char *buf = NULL; |
249 | 0 | struct flb_pack_state state; |
250 | |
|
251 | 0 | ret = flb_pack_state_init(&state); |
252 | 0 | if (ret != 0) { |
253 | 0 | return -1; |
254 | 0 | } |
255 | 0 | ret = flb_json_tokenise(js, len, &state); |
256 | 0 | if (ret != 0) { |
257 | 0 | ret = -1; |
258 | 0 | goto flb_pack_json_end; |
259 | 0 | } |
260 | | |
261 | 0 | if (state.tokens_count == 0) { |
262 | 0 | ret = -1; |
263 | 0 | goto flb_pack_json_end; |
264 | 0 | } |
265 | | |
266 | 0 | buf = tokens_to_msgpack(&state, js, &out, &last, &n_records); |
267 | 0 | if (!buf) { |
268 | 0 | ret = -1; |
269 | 0 | goto flb_pack_json_end; |
270 | 0 | } |
271 | | |
272 | 0 | *root_type = state.tokens[0].type; |
273 | 0 | *size = out; |
274 | 0 | *buffer = buf; |
275 | 0 | *records = n_records; |
276 | 0 | ret = 0; |
277 | |
|
278 | 0 | flb_pack_json_end: |
279 | 0 | flb_pack_state_reset(&state); |
280 | 0 | return ret; |
281 | 0 | } |
282 | | |
/*
 * Pack serialized JSON messages into msgpack. Same as flb_pack_json_recs()
 * but the number of records is discarded.
 */
int flb_pack_json(const char *js, size_t len, char **buffer, size_t *size,
                  int *root_type)
{
    int rc;
    int ignored_records;

    rc = pack_json_to_msgpack(js, len, buffer, size, root_type,
                              &ignored_records);
    return rc;
}
291 | | |
/*
 * Pack serialized JSON messages into msgpack and store the number of
 * messages found in 'out_records'.
 */
int flb_pack_json_recs(const char *js, size_t len, char **buffer, size_t *size,
                       int *root_type, int *out_records)
{
    int rc;

    rc = pack_json_to_msgpack(js, len, buffer, size, root_type, out_records);
    return rc;
}
301 | | |
302 | | /* Initialize a JSON packer state */ |
303 | | int flb_pack_state_init(struct flb_pack_state *s) |
304 | 0 | { |
305 | 0 | int tokens = 256; |
306 | 0 | size_t size = 256; |
307 | |
|
308 | 0 | jsmn_init(&s->parser); |
309 | |
|
310 | 0 | size = sizeof(jsmntok_t) * tokens; |
311 | 0 | s->tokens = flb_malloc(size); |
312 | 0 | if (!s->tokens) { |
313 | 0 | flb_errno(); |
314 | 0 | return -1; |
315 | 0 | } |
316 | 0 | s->tokens_size = tokens; |
317 | 0 | s->tokens_count = 0; |
318 | 0 | s->last_byte = 0; |
319 | 0 | s->multiple = FLB_FALSE; |
320 | |
|
321 | 0 | s->buf_data = flb_malloc(size); |
322 | 0 | if (!s->buf_data) { |
323 | 0 | flb_errno(); |
324 | 0 | flb_free(s->tokens); |
325 | 0 | s->tokens = NULL; |
326 | 0 | return -1; |
327 | 0 | } |
328 | 0 | s->buf_size = size; |
329 | 0 | s->buf_len = 0; |
330 | |
|
331 | 0 | return 0; |
332 | 0 | } |
333 | | |
334 | | void flb_pack_state_reset(struct flb_pack_state *s) |
335 | 0 | { |
336 | 0 | flb_free(s->tokens); |
337 | 0 | s->tokens = NULL; |
338 | 0 | s->tokens_size = 0; |
339 | 0 | s->tokens_count = 0; |
340 | 0 | s->last_byte = 0; |
341 | 0 | s->buf_size = 0; |
342 | 0 | flb_free(s->buf_data); |
343 | 0 | s->buf_data = NULL; |
344 | 0 | } |
345 | | |
346 | | |
347 | | /* |
348 | | * It parse a JSON string and convert it to MessagePack format. The main |
349 | | * difference of this function and the previous flb_pack_json() is that it |
350 | | * keeps a parser and tokens state, allowing to process big messages and |
351 | | * resume the parsing process instead of start from zero. |
352 | | */ |
353 | | int flb_pack_json_state(const char *js, size_t len, |
354 | | char **buffer, int *size, |
355 | | struct flb_pack_state *state) |
356 | 0 | { |
357 | 0 | int ret; |
358 | 0 | int out; |
359 | 0 | int delim = 0; |
360 | 0 | int last = 0; |
361 | 0 | int records; |
362 | 0 | char *buf; |
363 | 0 | jsmntok_t *t; |
364 | |
|
365 | 0 | ret = flb_json_tokenise(js, len, state); |
366 | 0 | state->multiple = FLB_TRUE; |
367 | 0 | if (ret == FLB_ERR_JSON_PART && state->multiple == FLB_TRUE) { |
368 | | /* |
369 | | * If the caller enabled 'multiple' flag, it means that the incoming |
370 | | * JSON message may have multiple messages concatenated and likely |
371 | | * the last one is only incomplete. |
372 | | * |
373 | | * The following routine aims to determinate how many JSON messages |
374 | | * are OK in the array of tokens, if any, process them and adjust |
375 | | * the JSMN context/buffers. |
376 | | */ |
377 | | |
378 | | /* |
379 | | * jsmn_parse updates jsmn_parser members. (state->parser) |
380 | | * A member 'toknext' points next incomplete object token. |
381 | | * We use toknext - 1 as an index of last member of complete JSON. |
382 | | */ |
383 | 0 | int i; |
384 | 0 | int found = 0; |
385 | |
|
386 | 0 | if (state->parser.toknext == 0) { |
387 | 0 | return ret; |
388 | 0 | } |
389 | | |
390 | 0 | for (i = (int)state->parser.toknext - 1; i >= 1; i--) { |
391 | 0 | t = &state->tokens[i]; |
392 | |
|
393 | 0 | if (t->parent == -1 && (t->end != 0)) { |
394 | 0 | found++; |
395 | 0 | delim = i; |
396 | 0 | break; |
397 | 0 | } |
398 | 0 | } |
399 | |
|
400 | 0 | if (found == 0) { |
401 | 0 | return ret; /* FLB_ERR_JSON_PART */ |
402 | 0 | } |
403 | 0 | state->tokens_count += delim; |
404 | 0 | } |
405 | 0 | else if (ret != 0) { |
406 | 0 | return ret; |
407 | 0 | } |
408 | | |
409 | 0 | if (state->tokens_count == 0 || state->tokens == NULL) { |
410 | 0 | state->last_byte = last; |
411 | 0 | return FLB_ERR_JSON_INVAL; |
412 | 0 | } |
413 | | |
414 | 0 | buf = tokens_to_msgpack(state, js, &out, &last, &records); |
415 | 0 | if (!buf) { |
416 | 0 | return -1; |
417 | 0 | } |
418 | | |
419 | 0 | *size = out; |
420 | 0 | *buffer = buf; |
421 | 0 | state->last_byte = last; |
422 | |
|
423 | 0 | return 0; |
424 | 0 | } |
425 | | |
426 | | static int pack_print_fluent_record(size_t cnt, msgpack_unpacked result) |
427 | 0 | { |
428 | 0 | msgpack_object o; |
429 | 0 | msgpack_object *obj; |
430 | 0 | msgpack_object root; |
431 | 0 | struct flb_time tms; |
432 | |
|
433 | 0 | root = result.data; |
434 | 0 | if (root.type != MSGPACK_OBJECT_ARRAY) { |
435 | 0 | return -1; |
436 | 0 | } |
437 | | |
438 | | /* decode expected timestamp only (integer, float or ext) */ |
439 | 0 | o = root.via.array.ptr[0]; |
440 | 0 | if (o.type != MSGPACK_OBJECT_POSITIVE_INTEGER && |
441 | 0 | o.type != MSGPACK_OBJECT_FLOAT && |
442 | 0 | o.type != MSGPACK_OBJECT_EXT) { |
443 | 0 | return -1; |
444 | 0 | } |
445 | | |
446 | | /* This is a Fluent Bit record, just do the proper unpacking/printing */ |
447 | 0 | flb_time_pop_from_msgpack(&tms, &result, &obj); |
448 | |
|
449 | 0 | fprintf(stdout, "[%zd] [%"PRIu32".%09lu, ", cnt, |
450 | 0 | (uint32_t) tms.tm.tv_sec, tms.tm.tv_nsec); |
451 | 0 | msgpack_object_print(stdout, *obj); |
452 | 0 | fprintf(stdout, "]\n"); |
453 | |
|
454 | 0 | return 0; |
455 | 0 | } |
456 | | |
457 | | void flb_pack_print(const char *data, size_t bytes) |
458 | 0 | { |
459 | 0 | int ret; |
460 | 0 | msgpack_unpacked result; |
461 | 0 | size_t off = 0, cnt = 0; |
462 | |
|
463 | 0 | msgpack_unpacked_init(&result); |
464 | 0 | while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) { |
465 | | /* Check if we are processing an internal Fluent Bit record */ |
466 | 0 | ret = pack_print_fluent_record(cnt, result); |
467 | 0 | if (ret == 0) { |
468 | 0 | continue; |
469 | 0 | } |
470 | | |
471 | 0 | printf("[%zd] ", cnt++); |
472 | 0 | msgpack_object_print(stdout, result.data); |
473 | 0 | printf("\n"); |
474 | 0 | } |
475 | 0 | msgpack_unpacked_destroy(&result); |
476 | 0 | } |
477 | | |
478 | | void flb_pack_print_metrics(const char *data, size_t bytes) |
479 | 0 | { |
480 | 0 | int ret; |
481 | 0 | size_t off = 0; |
482 | 0 | cfl_sds_t text; |
483 | 0 | struct cmt *cmt = NULL; |
484 | | |
485 | | /* get cmetrics context */ |
486 | 0 | ret = cmt_decode_msgpack_create(&cmt, (char *) data, bytes, &off); |
487 | 0 | if (ret != 0) { |
488 | 0 | flb_error("could not process metrics payload"); |
489 | 0 | return; |
490 | 0 | } |
491 | | |
492 | | /* convert to text representation */ |
493 | 0 | text = cmt_encode_text_create(cmt); |
494 | | |
495 | | /* destroy cmt context */ |
496 | 0 | cmt_destroy(cmt); |
497 | |
|
498 | 0 | printf("%s", text); |
499 | 0 | fflush(stdout); |
500 | |
|
501 | 0 | cmt_encode_text_destroy(text); |
502 | 0 | } |
503 | | |
504 | | static inline int try_to_write(char *buf, int *off, size_t left, |
505 | | const char *str, size_t str_len) |
506 | 0 | { |
507 | 0 | if (str_len <= 0){ |
508 | 0 | str_len = strlen(str); |
509 | 0 | } |
510 | 0 | if (left <= *off+str_len) { |
511 | 0 | return FLB_FALSE; |
512 | 0 | } |
513 | 0 | memcpy(buf+*off, str, str_len); |
514 | 0 | *off += str_len; |
515 | 0 | return FLB_TRUE; |
516 | 0 | } |
517 | | |
518 | | |
519 | | /* |
520 | | * Check if a key exists in the map using the 'offset' as an index to define |
521 | | * which element needs to start looking from |
522 | | */ |
523 | | static inline int key_exists_in_map(msgpack_object key, msgpack_object map, int offset) |
524 | 0 | { |
525 | 0 | int i; |
526 | 0 | msgpack_object p; |
527 | |
|
528 | 0 | if (key.type != MSGPACK_OBJECT_STR) { |
529 | 0 | return FLB_FALSE; |
530 | 0 | } |
531 | | |
532 | 0 | for (i = offset; i < map.via.map.size; i++) { |
533 | 0 | p = map.via.map.ptr[i].key; |
534 | 0 | if (p.type != MSGPACK_OBJECT_STR) { |
535 | 0 | continue; |
536 | 0 | } |
537 | | |
538 | 0 | if (key.via.str.size != p.via.str.size) { |
539 | 0 | continue; |
540 | 0 | } |
541 | | |
542 | 0 | if (memcmp(key.via.str.ptr, p.via.str.ptr, p.via.str.size) == 0) { |
543 | 0 | return FLB_TRUE; |
544 | 0 | } |
545 | 0 | } |
546 | | |
547 | 0 | return FLB_FALSE; |
548 | 0 | } |
549 | | |
550 | | static int msgpack2json(char *buf, int *off, size_t left, |
551 | | const msgpack_object *o) |
552 | 0 | { |
553 | 0 | int i; |
554 | 0 | int dup; |
555 | 0 | int ret = FLB_FALSE; |
556 | 0 | int loop; |
557 | 0 | int packed; |
558 | |
|
559 | 0 | switch(o->type) { |
560 | 0 | case MSGPACK_OBJECT_NIL: |
561 | 0 | ret = try_to_write(buf, off, left, "null", 4); |
562 | 0 | break; |
563 | | |
564 | 0 | case MSGPACK_OBJECT_BOOLEAN: |
565 | 0 | ret = try_to_write(buf, off, left, |
566 | 0 | (o->via.boolean ? "true":"false"),0); |
567 | |
|
568 | 0 | break; |
569 | | |
570 | 0 | case MSGPACK_OBJECT_POSITIVE_INTEGER: |
571 | 0 | { |
572 | 0 | char temp[32] = {0}; |
573 | 0 | i = snprintf(temp, sizeof(temp)-1, "%"PRIu64, o->via.u64); |
574 | 0 | ret = try_to_write(buf, off, left, temp, i); |
575 | 0 | } |
576 | 0 | break; |
577 | | |
578 | 0 | case MSGPACK_OBJECT_NEGATIVE_INTEGER: |
579 | 0 | { |
580 | 0 | char temp[32] = {0}; |
581 | 0 | i = snprintf(temp, sizeof(temp)-1, "%"PRId64, o->via.i64); |
582 | 0 | ret = try_to_write(buf, off, left, temp, i); |
583 | 0 | } |
584 | 0 | break; |
585 | 0 | case MSGPACK_OBJECT_FLOAT32: |
586 | 0 | case MSGPACK_OBJECT_FLOAT64: |
587 | 0 | { |
588 | 0 | char temp[512] = {0}; |
589 | 0 | if (o->via.f64 == (double)(long long int)o->via.f64) { |
590 | 0 | i = snprintf(temp, sizeof(temp)-1, "%.1f", o->via.f64); |
591 | 0 | } |
592 | 0 | else if (convert_nan_to_null && isnan(o->via.f64) ) { |
593 | 0 | i = snprintf(temp, sizeof(temp)-1, "null"); |
594 | 0 | } |
595 | 0 | else { |
596 | 0 | i = snprintf(temp, sizeof(temp)-1, "%.16g", o->via.f64); |
597 | 0 | } |
598 | 0 | ret = try_to_write(buf, off, left, temp, i); |
599 | 0 | } |
600 | 0 | break; |
601 | | |
602 | 0 | case MSGPACK_OBJECT_STR: |
603 | 0 | if (try_to_write(buf, off, left, "\"", 1) && |
604 | 0 | (o->via.str.size > 0 ? |
605 | 0 | try_to_write_str(buf, off, left, o->via.str.ptr, o->via.str.size) |
606 | 0 | : 1/* nothing to do */) && |
607 | 0 | try_to_write(buf, off, left, "\"", 1)) { |
608 | 0 | ret = FLB_TRUE; |
609 | 0 | } |
610 | 0 | break; |
611 | | |
612 | 0 | case MSGPACK_OBJECT_BIN: |
613 | 0 | if (try_to_write(buf, off, left, "\"", 1) && |
614 | 0 | (o->via.bin.size > 0 ? |
615 | 0 | try_to_write_str(buf, off, left, o->via.bin.ptr, o->via.bin.size) |
616 | 0 | : 1 /* nothing to do */) && |
617 | 0 | try_to_write(buf, off, left, "\"", 1)) { |
618 | 0 | ret = FLB_TRUE; |
619 | 0 | } |
620 | 0 | break; |
621 | | |
622 | 0 | case MSGPACK_OBJECT_EXT: |
623 | 0 | if (!try_to_write(buf, off, left, "\"", 1)) { |
624 | 0 | goto msg2json_end; |
625 | 0 | } |
626 | | /* ext body. fortmat is similar to printf(1) */ |
627 | 0 | { |
628 | 0 | char temp[32] = {0}; |
629 | 0 | int len; |
630 | 0 | loop = o->via.ext.size; |
631 | 0 | for(i=0; i<loop; i++) { |
632 | 0 | len = snprintf(temp, sizeof(temp)-1, "\\x%02x", (char)o->via.ext.ptr[i]); |
633 | 0 | if (!try_to_write(buf, off, left, temp, len)) { |
634 | 0 | goto msg2json_end; |
635 | 0 | } |
636 | 0 | } |
637 | 0 | } |
638 | 0 | if (!try_to_write(buf, off, left, "\"", 1)) { |
639 | 0 | goto msg2json_end; |
640 | 0 | } |
641 | 0 | ret = FLB_TRUE; |
642 | 0 | break; |
643 | | |
644 | 0 | case MSGPACK_OBJECT_ARRAY: |
645 | 0 | loop = o->via.array.size; |
646 | |
|
647 | 0 | if (!try_to_write(buf, off, left, "[", 1)) { |
648 | 0 | goto msg2json_end; |
649 | 0 | } |
650 | 0 | if (loop != 0) { |
651 | 0 | msgpack_object* p = o->via.array.ptr; |
652 | 0 | if (!msgpack2json(buf, off, left, p)) { |
653 | 0 | goto msg2json_end; |
654 | 0 | } |
655 | 0 | for (i=1; i<loop; i++) { |
656 | 0 | if (!try_to_write(buf, off, left, ",", 1) || |
657 | 0 | !msgpack2json(buf, off, left, p+i)) { |
658 | 0 | goto msg2json_end; |
659 | 0 | } |
660 | 0 | } |
661 | 0 | } |
662 | | |
663 | 0 | ret = try_to_write(buf, off, left, "]", 1); |
664 | 0 | break; |
665 | | |
666 | 0 | case MSGPACK_OBJECT_MAP: |
667 | 0 | loop = o->via.map.size; |
668 | 0 | if (!try_to_write(buf, off, left, "{", 1)) { |
669 | 0 | goto msg2json_end; |
670 | 0 | } |
671 | 0 | if (loop != 0) { |
672 | 0 | msgpack_object k; |
673 | 0 | msgpack_object_kv *p = o->via.map.ptr; |
674 | |
|
675 | 0 | packed = 0; |
676 | 0 | dup = FLB_FALSE; |
677 | |
|
678 | 0 | k = o->via.map.ptr[0].key; |
679 | 0 | for (i = 0; i < loop; i++) { |
680 | 0 | k = o->via.map.ptr[i].key; |
681 | 0 | dup = key_exists_in_map(k, *o, i + 1); |
682 | 0 | if (dup == FLB_TRUE) { |
683 | 0 | continue; |
684 | 0 | } |
685 | | |
686 | 0 | if (packed > 0) { |
687 | 0 | if (!try_to_write(buf, off, left, ",", 1)) { |
688 | 0 | goto msg2json_end; |
689 | 0 | } |
690 | 0 | } |
691 | | |
692 | 0 | if ( |
693 | 0 | !msgpack2json(buf, off, left, &(p+i)->key) || |
694 | 0 | !try_to_write(buf, off, left, ":", 1) || |
695 | 0 | !msgpack2json(buf, off, left, &(p+i)->val) ) { |
696 | 0 | goto msg2json_end; |
697 | 0 | } |
698 | 0 | packed++; |
699 | 0 | } |
700 | 0 | } |
701 | | |
702 | 0 | ret = try_to_write(buf, off, left, "}", 1); |
703 | 0 | break; |
704 | | |
705 | 0 | default: |
706 | 0 | flb_warn("[%s] unknown msgpack type %i", __FUNCTION__, o->type); |
707 | 0 | } |
708 | | |
709 | 0 | msg2json_end: |
710 | 0 | return ret; |
711 | 0 | } |
712 | | |
713 | | /** |
714 | | * convert msgpack to JSON string. |
715 | | * This API is similar to snprintf. |
716 | | * |
717 | | * @param json_str The buffer to fill JSON string. |
718 | | * @param json_size The size of json_str. |
719 | | * @param data The msgpack_unpacked data. |
720 | | * @return success ? a number characters filled : negative value |
721 | | */ |
722 | | int flb_msgpack_to_json(char *json_str, size_t json_size, |
723 | | const msgpack_object *obj) |
724 | 0 | { |
725 | 0 | int ret = -1; |
726 | 0 | int off = 0; |
727 | |
|
728 | 0 | if (json_str == NULL || obj == NULL) { |
729 | 0 | return -1; |
730 | 0 | } |
731 | | |
732 | 0 | ret = msgpack2json(json_str, &off, json_size - 1, obj); |
733 | 0 | json_str[off] = '\0'; |
734 | 0 | return ret ? off: ret; |
735 | 0 | } |
736 | | |
737 | | flb_sds_t flb_msgpack_raw_to_json_sds(const void *in_buf, size_t in_size) |
738 | 0 | { |
739 | 0 | int ret; |
740 | 0 | size_t off = 0; |
741 | 0 | size_t out_size; |
742 | 0 | size_t realloc_size; |
743 | |
|
744 | 0 | msgpack_unpacked result; |
745 | 0 | msgpack_object *root; |
746 | 0 | flb_sds_t out_buf; |
747 | 0 | flb_sds_t tmp_buf; |
748 | | |
749 | | /* buffer size strategy */ |
750 | 0 | out_size = in_size * FLB_MSGPACK_TO_JSON_INIT_BUFFER_SIZE; |
751 | 0 | realloc_size = in_size * FLB_MSGPACK_TO_JSON_REALLOC_BUFFER_SIZE; |
752 | 0 | if (realloc_size < 256) { |
753 | 0 | realloc_size = 256; |
754 | 0 | } |
755 | |
|
756 | 0 | out_buf = flb_sds_create_size(out_size); |
757 | 0 | if (!out_buf) { |
758 | 0 | flb_errno(); |
759 | 0 | return NULL; |
760 | 0 | } |
761 | | |
762 | 0 | msgpack_unpacked_init(&result); |
763 | 0 | ret = msgpack_unpack_next(&result, in_buf, in_size, &off); |
764 | 0 | if (ret != MSGPACK_UNPACK_SUCCESS) { |
765 | 0 | flb_sds_destroy(out_buf); |
766 | 0 | msgpack_unpacked_destroy(&result); |
767 | 0 | return NULL; |
768 | 0 | } |
769 | | |
770 | 0 | root = &result.data; |
771 | 0 | while (1) { |
772 | 0 | ret = flb_msgpack_to_json(out_buf, out_size, root); |
773 | 0 | if (ret <= 0) { |
774 | 0 | tmp_buf = flb_sds_increase(out_buf, realloc_size); |
775 | 0 | if (tmp_buf) { |
776 | 0 | out_buf = tmp_buf; |
777 | 0 | out_size += realloc_size; |
778 | 0 | } |
779 | 0 | else { |
780 | 0 | flb_errno(); |
781 | 0 | flb_sds_destroy(out_buf); |
782 | 0 | msgpack_unpacked_destroy(&result); |
783 | 0 | return NULL; |
784 | 0 | } |
785 | 0 | } |
786 | 0 | else { |
787 | 0 | break; |
788 | 0 | } |
789 | 0 | } |
790 | | |
791 | 0 | msgpack_unpacked_destroy(&result); |
792 | 0 | flb_sds_len_set(out_buf, ret); |
793 | |
|
794 | 0 | return out_buf; |
795 | 0 | } |
796 | | |
797 | | /* |
798 | | * Given a 'format' string type, return it integer representation. This |
799 | | * is used by output plugins that uses pack functions to convert |
800 | | * msgpack records to JSON. |
801 | | */ |
802 | | int flb_pack_to_json_format_type(const char *str) |
803 | 0 | { |
804 | 0 | if (strcasecmp(str, "msgpack") == 0) { |
805 | 0 | return FLB_PACK_JSON_FORMAT_NONE; |
806 | 0 | } |
807 | 0 | else if (strcasecmp(str, "json") == 0) { |
808 | 0 | return FLB_PACK_JSON_FORMAT_JSON; |
809 | 0 | } |
810 | 0 | else if (strcasecmp(str, "json_stream") == 0) { |
811 | 0 | return FLB_PACK_JSON_FORMAT_STREAM; |
812 | 0 | } |
813 | 0 | else if (strcasecmp(str, "json_lines") == 0) { |
814 | 0 | return FLB_PACK_JSON_FORMAT_LINES; |
815 | 0 | } |
816 | | |
817 | 0 | return -1; |
818 | 0 | } |
819 | | |
820 | | /* Given a 'date string type', return it integer representation */ |
821 | | int flb_pack_to_json_date_type(const char *str) |
822 | 0 | { |
823 | 0 | if (strcasecmp(str, "double") == 0) { |
824 | 0 | return FLB_PACK_JSON_DATE_DOUBLE; |
825 | 0 | } |
826 | 0 | else if (strcasecmp(str, "java_sql_timestamp") == 0) { |
827 | 0 | return FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP; |
828 | 0 | } |
829 | 0 | else if (strcasecmp(str, "iso8601") == 0) { |
830 | 0 | return FLB_PACK_JSON_DATE_ISO8601; |
831 | 0 | } |
832 | 0 | else if (strcasecmp(str, "epoch") == 0) { |
833 | 0 | return FLB_PACK_JSON_DATE_EPOCH; |
834 | 0 | } |
835 | 0 | else if (strcasecmp(str, "epoch_ms") == 0 || |
836 | 0 | strcasecmp(str, "epoch_millis") == 0 || |
837 | 0 | strcasecmp(str, "epoch_milliseconds") == 0) { |
838 | 0 | return FLB_PACK_JSON_DATE_EPOCH_MS; |
839 | 0 | } |
840 | | |
841 | 0 | return -1; |
842 | 0 | } |
843 | | |
844 | | |
845 | | static int msgpack_pack_formatted_datetime(flb_sds_t out_buf, char time_formatted[], int max_len, |
846 | | msgpack_packer* tmp_pck, struct flb_time* tms, |
847 | | const char *date_format, |
848 | | const char *time_format) |
849 | 0 | { |
850 | 0 | int len; |
851 | 0 | size_t s; |
852 | 0 | struct tm tm; |
853 | |
|
854 | 0 | gmtime_r(&tms->tm.tv_sec, &tm); |
855 | |
|
856 | 0 | s = strftime(time_formatted, max_len, |
857 | 0 | date_format, &tm); |
858 | 0 | if (!s) { |
859 | 0 | flb_debug("strftime failed in flb_pack_msgpack_to_json_format"); |
860 | 0 | return 1; |
861 | 0 | } |
862 | | |
863 | | /* Format the time, use microsecond precision not nanoseconds */ |
864 | 0 | max_len -= s; |
865 | 0 | len = snprintf(&time_formatted[s], |
866 | 0 | max_len, |
867 | 0 | time_format, |
868 | 0 | (uint64_t) tms->tm.tv_nsec / 1000); |
869 | 0 | if (len >= max_len) { |
870 | 0 | flb_debug("snprintf: %d >= %d in flb_pack_msgpack_to_json_format", len, max_len); |
871 | 0 | return 2; |
872 | 0 | } |
873 | 0 | s += len; |
874 | 0 | msgpack_pack_str(tmp_pck, s); |
875 | 0 | msgpack_pack_str_body(tmp_pck, time_formatted, s); |
876 | 0 | return 0; |
877 | 0 | } |
878 | | |
879 | | flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes, |
880 | | int json_format, int date_format, |
881 | | flb_sds_t date_key) |
882 | 0 | { |
883 | 0 | int i; |
884 | 0 | int ok = MSGPACK_UNPACK_SUCCESS; |
885 | 0 | int records = 0; |
886 | 0 | int map_size; |
887 | 0 | size_t off = 0; |
888 | 0 | char time_formatted[38]; |
889 | 0 | flb_sds_t out_tmp; |
890 | 0 | flb_sds_t out_js; |
891 | 0 | flb_sds_t out_buf = NULL; |
892 | 0 | msgpack_unpacked result; |
893 | 0 | msgpack_object root; |
894 | 0 | msgpack_object map; |
895 | 0 | msgpack_sbuffer tmp_sbuf; |
896 | 0 | msgpack_packer tmp_pck; |
897 | 0 | msgpack_object *obj; |
898 | 0 | msgpack_object *k; |
899 | 0 | msgpack_object *v; |
900 | 0 | struct flb_time tms; |
901 | | |
902 | | /* For json lines and streams mode we need a pre-allocated buffer */ |
903 | 0 | if (json_format == FLB_PACK_JSON_FORMAT_LINES || |
904 | 0 | json_format == FLB_PACK_JSON_FORMAT_STREAM) { |
905 | 0 | out_buf = flb_sds_create_size(bytes + bytes / 4); |
906 | 0 | if (!out_buf) { |
907 | 0 | flb_errno(); |
908 | 0 | return NULL; |
909 | 0 | } |
910 | 0 | } |
911 | | |
912 | | /* Create temporary msgpack buffer */ |
913 | 0 | msgpack_sbuffer_init(&tmp_sbuf); |
914 | 0 | msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); |
915 | | |
916 | | /* |
917 | | * If the format is the original msgpack style of one big array, |
918 | | * registrate the array, otherwise is not necessary. FYI, original format: |
919 | | * |
920 | | * [ |
921 | | * [timestamp, map], |
922 | | * [timestamp, map], |
923 | | * [T, M]... |
924 | | * ] |
925 | | */ |
926 | 0 | if (json_format == FLB_PACK_JSON_FORMAT_JSON) { |
927 | 0 | records = flb_mp_count(data, bytes); |
928 | 0 | if (records <= 0) { |
929 | 0 | msgpack_sbuffer_destroy(&tmp_sbuf); |
930 | 0 | return NULL; |
931 | 0 | } |
932 | 0 | msgpack_pack_array(&tmp_pck, records); |
933 | 0 | } |
934 | | |
935 | 0 | msgpack_unpacked_init(&result); |
936 | 0 | while (msgpack_unpack_next(&result, data, bytes, &off) == ok) { |
937 | | /* Each array must have two entries: time and record */ |
938 | 0 | root = result.data; |
939 | 0 | if (root.type != MSGPACK_OBJECT_ARRAY) { |
940 | 0 | continue; |
941 | 0 | } |
942 | 0 | if (root.via.array.size != 2) { |
943 | 0 | continue; |
944 | 0 | } |
945 | | |
946 | | /* Unpack time */ |
947 | 0 | flb_time_pop_from_msgpack(&tms, &result, &obj); |
948 | | |
949 | | /* Get the record/map */ |
950 | 0 | map = root.via.array.ptr[1]; |
951 | 0 | if (map.type != MSGPACK_OBJECT_MAP) { |
952 | 0 | continue; |
953 | 0 | } |
954 | 0 | map_size = map.via.map.size; |
955 | |
|
956 | 0 | if (date_key != NULL) { |
957 | 0 | msgpack_pack_map(&tmp_pck, map_size + 1); |
958 | 0 | } |
959 | 0 | else { |
960 | 0 | msgpack_pack_map(&tmp_pck, map_size); |
961 | 0 | } |
962 | |
|
963 | 0 | if (date_key != NULL) { |
964 | | /* Append date key */ |
965 | 0 | msgpack_pack_str(&tmp_pck, flb_sds_len(date_key)); |
966 | 0 | msgpack_pack_str_body(&tmp_pck, date_key, flb_sds_len(date_key)); |
967 | | |
968 | | /* Append date value */ |
969 | 0 | switch (date_format) { |
970 | 0 | case FLB_PACK_JSON_DATE_DOUBLE: |
971 | 0 | msgpack_pack_double(&tmp_pck, flb_time_to_double(&tms)); |
972 | 0 | break; |
973 | 0 | case FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP: |
974 | 0 | if (msgpack_pack_formatted_datetime(out_buf, time_formatted, sizeof(time_formatted), &tmp_pck, &tms, |
975 | 0 | FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP_FMT, ".%06" PRIu64)) { |
976 | 0 | flb_sds_destroy(out_buf); |
977 | 0 | msgpack_sbuffer_destroy(&tmp_sbuf); |
978 | 0 | msgpack_unpacked_destroy(&result); |
979 | 0 | return NULL; |
980 | 0 | } |
981 | 0 | break; |
982 | 0 | case FLB_PACK_JSON_DATE_ISO8601: |
983 | 0 | if (msgpack_pack_formatted_datetime(out_buf, time_formatted, sizeof(time_formatted), &tmp_pck, &tms, |
984 | 0 | FLB_PACK_JSON_DATE_ISO8601_FMT, ".%06" PRIu64 "Z")) { |
985 | 0 | flb_sds_destroy(out_buf); |
986 | 0 | msgpack_sbuffer_destroy(&tmp_sbuf); |
987 | 0 | msgpack_unpacked_destroy(&result); |
988 | 0 | return NULL; |
989 | 0 | } |
990 | 0 | break; |
991 | 0 | case FLB_PACK_JSON_DATE_EPOCH: |
992 | 0 | msgpack_pack_uint64(&tmp_pck, (long long unsigned)(tms.tm.tv_sec)); |
993 | 0 | break; |
994 | 0 | case FLB_PACK_JSON_DATE_EPOCH_MS: |
995 | 0 | msgpack_pack_uint64(&tmp_pck, flb_time_to_millisec(&tms)); |
996 | 0 | break; |
997 | 0 | } |
998 | 0 | } |
999 | | |
1000 | | /* Append remaining keys/values */ |
1001 | 0 | for (i = 0; i < map_size; i++) { |
1002 | 0 | k = &map.via.map.ptr[i].key; |
1003 | 0 | v = &map.via.map.ptr[i].val; |
1004 | 0 | msgpack_pack_object(&tmp_pck, *k); |
1005 | 0 | msgpack_pack_object(&tmp_pck, *v); |
1006 | 0 | } |
1007 | | |
1008 | | /* |
1009 | | * If the format is the original msgpack style, just continue since |
1010 | | * we don't care about separator or JSON conversion at this point. |
1011 | | */ |
1012 | 0 | if (json_format == FLB_PACK_JSON_FORMAT_JSON) { |
1013 | 0 | continue; |
1014 | 0 | } |
1015 | | |
1016 | | /* |
1017 | | * Here we handle two types of records concatenation: |
1018 | | * |
1019 | | * FLB_PACK_JSON_FORMAT_LINES: add breakline (\n) after each record |
1020 | | * |
1021 | | * |
1022 | | * {'ts':abc,'k1':1} |
1023 | | * {'ts':abc,'k1':2} |
1024 | | * {N} |
1025 | | * |
1026 | | * FLB_PACK_JSON_FORMAT_STREAM: no separators, e.g: |
1027 | | * |
1028 | | * {'ts':abc,'k1':1}{'ts':abc,'k1':2}{N} |
1029 | | */ |
1030 | 0 | if (json_format == FLB_PACK_JSON_FORMAT_LINES || |
1031 | 0 | json_format == FLB_PACK_JSON_FORMAT_STREAM) { |
1032 | | |
1033 | | /* Encode current record into JSON in a temporary variable */ |
1034 | 0 | out_js = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size); |
1035 | 0 | if (!out_js) { |
1036 | 0 | flb_sds_destroy(out_buf); |
1037 | 0 | msgpack_sbuffer_destroy(&tmp_sbuf); |
1038 | 0 | msgpack_unpacked_destroy(&result); |
1039 | 0 | return NULL; |
1040 | 0 | } |
1041 | | |
1042 | | /* |
1043 | | * One map record has been converted, now append it to the |
1044 | | * outgoing out_buf sds variable. |
1045 | | */ |
1046 | 0 | out_tmp = flb_sds_cat(out_buf, out_js, flb_sds_len(out_js)); |
1047 | 0 | if (!out_tmp) { |
1048 | 0 | flb_sds_destroy(out_js); |
1049 | 0 | flb_sds_destroy(out_buf); |
1050 | 0 | msgpack_sbuffer_destroy(&tmp_sbuf); |
1051 | 0 | msgpack_unpacked_destroy(&result); |
1052 | 0 | return NULL; |
1053 | 0 | } |
1054 | | |
1055 | | /* Release temporary json sds buffer */ |
1056 | 0 | flb_sds_destroy(out_js); |
1057 | | |
1058 | | /* If a realloc happened, check the returned address */ |
1059 | 0 | if (out_tmp != out_buf) { |
1060 | 0 | out_buf = out_tmp; |
1061 | 0 | } |
1062 | | |
1063 | | /* Append the breakline only for json lines mode */ |
1064 | 0 | if (json_format == FLB_PACK_JSON_FORMAT_LINES) { |
1065 | 0 | out_tmp = flb_sds_cat(out_buf, "\n", 1); |
1066 | 0 | if (!out_tmp) { |
1067 | 0 | flb_sds_destroy(out_buf); |
1068 | 0 | msgpack_sbuffer_destroy(&tmp_sbuf); |
1069 | 0 | msgpack_unpacked_destroy(&result); |
1070 | 0 | return NULL; |
1071 | 0 | } |
1072 | 0 | if (out_tmp != out_buf) { |
1073 | 0 | out_buf = out_tmp; |
1074 | 0 | } |
1075 | 0 | } |
1076 | 0 | msgpack_sbuffer_clear(&tmp_sbuf); |
1077 | 0 | } |
1078 | 0 | } |
1079 | | |
1080 | | /* Release the unpacker */ |
1081 | 0 | msgpack_unpacked_destroy(&result); |
1082 | | |
1083 | | /* Format to JSON */ |
1084 | 0 | if (json_format == FLB_PACK_JSON_FORMAT_JSON) { |
1085 | 0 | out_buf = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size); |
1086 | 0 | msgpack_sbuffer_destroy(&tmp_sbuf); |
1087 | |
|
1088 | 0 | if (!out_buf) { |
1089 | 0 | return NULL; |
1090 | 0 | } |
1091 | 0 | } |
1092 | 0 | else { |
1093 | 0 | msgpack_sbuffer_destroy(&tmp_sbuf); |
1094 | 0 | } |
1095 | | |
1096 | 0 | if (out_buf && flb_sds_len(out_buf) == 0) { |
1097 | 0 | flb_sds_destroy(out_buf); |
1098 | 0 | return NULL; |
1099 | 0 | } |
1100 | | |
1101 | 0 | return out_buf; |
1102 | 0 | } |
1103 | | |
1104 | | /** |
1105 | | * convert msgpack to JSON string. |
1106 | | * This API is similar to snprintf. |
1107 | | * @param size Estimated length of json str. |
1108 | | * @param data The msgpack_unpacked data. |
1109 | | * @return success ? allocated json str ptr : NULL |
1110 | | */ |
1111 | | char *flb_msgpack_to_json_str(size_t size, const msgpack_object *obj) |
1112 | 0 | { |
1113 | 0 | int ret; |
1114 | 0 | char *buf = NULL; |
1115 | 0 | char *tmp; |
1116 | |
|
1117 | 0 | if (obj == NULL) { |
1118 | 0 | return NULL; |
1119 | 0 | } |
1120 | | |
1121 | 0 | if (size <= 0) { |
1122 | 0 | size = 128; |
1123 | 0 | } |
1124 | |
|
1125 | 0 | buf = flb_malloc(size); |
1126 | 0 | if (!buf) { |
1127 | 0 | flb_errno(); |
1128 | 0 | return NULL; |
1129 | 0 | } |
1130 | | |
1131 | 0 | while (1) { |
1132 | 0 | ret = flb_msgpack_to_json(buf, size, obj); |
1133 | 0 | if (ret <= 0) { |
1134 | | /* buffer is small. retry.*/ |
1135 | 0 | size += 128; |
1136 | 0 | tmp = flb_realloc(buf, size); |
1137 | 0 | if (tmp) { |
1138 | 0 | buf = tmp; |
1139 | 0 | } |
1140 | 0 | else { |
1141 | 0 | flb_free(buf); |
1142 | 0 | flb_errno(); |
1143 | 0 | return NULL; |
1144 | 0 | } |
1145 | 0 | } |
1146 | 0 | else { |
1147 | 0 | break; |
1148 | 0 | } |
1149 | 0 | } |
1150 | | |
1151 | 0 | return buf; |
1152 | 0 | } |
1153 | | |
1154 | | int flb_pack_time_now(msgpack_packer *pck) |
1155 | 0 | { |
1156 | 0 | int ret; |
1157 | 0 | struct flb_time t; |
1158 | |
|
1159 | 0 | flb_time_get(&t); |
1160 | 0 | ret = flb_time_append_to_msgpack(&t, pck, 0); |
1161 | |
|
1162 | 0 | return ret; |
1163 | 0 | } |
1164 | | |
1165 | | int flb_msgpack_expand_map(char *map_data, size_t map_size, |
1166 | | msgpack_object_kv **kv_arr, int kv_arr_len, |
1167 | | char** out_buf, int* out_size) |
1168 | 0 | { |
1169 | 0 | msgpack_sbuffer sbuf; |
1170 | 0 | msgpack_packer pck; |
1171 | 0 | msgpack_unpacked result; |
1172 | 0 | size_t off = 0; |
1173 | 0 | char *ret_buf; |
1174 | 0 | int map_num; |
1175 | 0 | int i; |
1176 | 0 | int len; |
1177 | |
|
1178 | 0 | if (map_data == NULL){ |
1179 | 0 | return -1; |
1180 | 0 | } |
1181 | | |
1182 | 0 | msgpack_unpacked_init(&result); |
1183 | 0 | if ((i=msgpack_unpack_next(&result, map_data, map_size, &off)) != |
1184 | 0 | MSGPACK_UNPACK_SUCCESS ) { |
1185 | 0 | msgpack_unpacked_destroy(&result); |
1186 | 0 | return -1; |
1187 | 0 | } |
1188 | 0 | if (result.data.type != MSGPACK_OBJECT_MAP) { |
1189 | 0 | msgpack_unpacked_destroy(&result); |
1190 | 0 | return -1; |
1191 | 0 | } |
1192 | | |
1193 | 0 | len = result.data.via.map.size; |
1194 | 0 | map_num = kv_arr_len + len; |
1195 | |
|
1196 | 0 | msgpack_sbuffer_init(&sbuf); |
1197 | 0 | msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write); |
1198 | 0 | msgpack_pack_map(&pck, map_num); |
1199 | |
|
1200 | 0 | for (i=0; i<len; i++) { |
1201 | 0 | msgpack_pack_object(&pck, result.data.via.map.ptr[i].key); |
1202 | 0 | msgpack_pack_object(&pck, result.data.via.map.ptr[i].val); |
1203 | 0 | } |
1204 | 0 | for (i=0; i<kv_arr_len; i++){ |
1205 | 0 | msgpack_pack_object(&pck, kv_arr[i]->key); |
1206 | 0 | msgpack_pack_object(&pck, kv_arr[i]->val); |
1207 | 0 | } |
1208 | 0 | msgpack_unpacked_destroy(&result); |
1209 | |
|
1210 | 0 | *out_size = sbuf.size; |
1211 | 0 | ret_buf = flb_malloc(sbuf.size); |
1212 | 0 | *out_buf = ret_buf; |
1213 | 0 | if (*out_buf == NULL) { |
1214 | 0 | flb_errno(); |
1215 | 0 | msgpack_sbuffer_destroy(&sbuf); |
1216 | 0 | return -1; |
1217 | 0 | } |
1218 | 0 | memcpy(*out_buf, sbuf.data, sbuf.size); |
1219 | 0 | msgpack_sbuffer_destroy(&sbuf); |
1220 | |
|
1221 | 0 | return 0; |
1222 | 0 | } |
1223 | | |
1224 | | int flb_pack_init(struct flb_config *config) |
1225 | 0 | { |
1226 | 0 | int ret; |
1227 | |
|
1228 | 0 | if (config == NULL) { |
1229 | 0 | return -1; |
1230 | 0 | } |
1231 | 0 | ret = flb_pack_set_null_as_nan(config->convert_nan_to_null); |
1232 | |
|
1233 | 0 | return ret; |
1234 | 0 | } |