Coverage Report

Created: 2023-03-10 06:37

/src/fluent-bit/src/flb_pack.c
Line
Count
Source (jump to first uncovered line)
1
/*-*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2022 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <stdlib.h>
21
#include <string.h>
22
23
#include <fluent-bit/flb_info.h>
24
#include <fluent-bit/flb_mem.h>
25
#include <fluent-bit/flb_sds.h>
26
#include <fluent-bit/flb_error.h>
27
#include <fluent-bit/flb_utils.h>
28
#include <fluent-bit/flb_sds.h>
29
#include <fluent-bit/flb_time.h>
30
#include <fluent-bit/flb_pack.h>
31
#include <fluent-bit/flb_unescape.h>
32
33
/* cmetrics */
34
#include <cmetrics/cmetrics.h>
35
#include <cmetrics/cmt_decode_msgpack.h>
36
#include <cmetrics/cmt_encode_text.h>
37
38
#include <msgpack.h>
39
#include <math.h>
40
#include <jsmn/jsmn.h>
41
42
0
#define try_to_write_str  flb_utils_write_str
43
44
static int convert_nan_to_null = FLB_FALSE;
45
46
0
static int flb_pack_set_null_as_nan(int b) {
47
0
    if (b == FLB_TRUE || b == FLB_FALSE) {
48
0
        convert_nan_to_null = b;
49
0
    }
50
0
    return convert_nan_to_null;
51
0
}
52
53
int flb_json_tokenise(const char *js, size_t len,
54
                      struct flb_pack_state *state)
55
0
{
56
0
    int ret;
57
0
    int new_tokens = 256;
58
0
    size_t old_size;
59
0
    size_t new_size;
60
0
    void *tmp;
61
62
0
    ret = jsmn_parse(&state->parser, js, len,
63
0
                     state->tokens, state->tokens_size);
64
0
    while (ret == JSMN_ERROR_NOMEM) {
65
        /* Get current size of the array in bytes */
66
0
        old_size = state->tokens_size * sizeof(jsmntok_t);
67
68
        /* New size: add capacity for new 256 entries */
69
0
        new_size = old_size + (sizeof(jsmntok_t) * new_tokens);
70
71
0
        tmp = flb_realloc(state->tokens, new_size);
72
0
        if (!tmp) {
73
0
            flb_errno();
74
0
            return -1;
75
0
        }
76
0
        state->tokens = tmp;
77
0
        state->tokens_size += new_tokens;
78
79
0
        ret = jsmn_parse(&state->parser, js, len,
80
0
                         state->tokens, state->tokens_size);
81
0
    }
82
83
0
    if (ret == JSMN_ERROR_INVAL) {
84
0
        return FLB_ERR_JSON_INVAL;
85
0
    }
86
87
0
    if (ret == JSMN_ERROR_PART) {
88
        /* This is a partial JSON message, just stop */
89
0
        flb_trace("[json tokenise] incomplete");
90
0
        return FLB_ERR_JSON_PART;
91
0
    }
92
93
0
    state->tokens_count += ret;
94
0
    return 0;
95
0
}
96
97
/*
 * Check whether the numeric token that starts at 'buf' and spans 'len' bytes
 * has to be handled as a floating point number, i.e. it contains a decimal
 * point or a negative exponent marker ("e-").
 *
 * Only the 'len' bytes that belong to the token are inspected: the token is
 * a slice of a larger buffer, so the byte at buf[len] is never read.
 *
 * Returns 1 if the token looks like a float, 0 otherwise.
 */
static inline int is_float(const char *buf, int len)
{
    const char *end = buf + len;
    const char *p;

    for (p = buf; p < end; p++) {
        if (*p == '.') {
            return 1;
        }

        /* Peek at the next byte only when it still belongs to the token */
        if (*p == 'e' && (p + 1) < end && *(p + 1) == '-') {
            return 1;
        }
    }

    return 0;
}
114
115
/* Sanitize incoming JSON string */
116
static inline int pack_string_token(struct flb_pack_state *state,
117
                                    const char *str, int len,
118
                                    msgpack_packer *pck)
119
0
{
120
0
    int s;
121
0
    int out_len;
122
0
    char *tmp;
123
0
    char *out_buf;
124
125
0
    if (state->buf_size < len + 1) {
126
0
        s = len + 1;
127
0
        tmp = flb_realloc(state->buf_data, s);
128
0
        if (!tmp) {
129
0
            flb_errno();
130
0
            return -1;
131
0
        }
132
0
        else {
133
0
            state->buf_data = tmp;
134
0
            state->buf_size = s;
135
0
        }
136
0
    }
137
0
    out_buf = state->buf_data;
138
139
    /* Always decode any UTF-8 or special characters */
140
0
    out_len = flb_unescape_string_utf8(str, len, out_buf);
141
142
    /* Pack decoded text */
143
0
    msgpack_pack_str(pck, out_len);
144
0
    msgpack_pack_str_body(pck, out_buf, out_len);
145
146
0
    return out_len;
147
0
}
148
149
/* Receive a tokenized JSON message and convert it to MsgPack */
150
static char *tokens_to_msgpack(struct flb_pack_state *state,
151
                               const char *js,
152
                               int *out_size, int *last_byte,
153
                               int *out_records)
154
0
{
155
0
    int i;
156
0
    int flen;
157
0
    int arr_size;
158
0
    int records = 0;
159
0
    const char *p;
160
0
    char *buf;
161
0
    const jsmntok_t *t;
162
0
    msgpack_packer pck;
163
0
    msgpack_sbuffer sbuf;
164
0
    jsmntok_t *tokens;
165
166
0
    tokens = state->tokens;
167
0
    arr_size = state->tokens_count;
168
169
0
    if (arr_size == 0) {
170
0
        return NULL;
171
0
    }
172
173
    /* initialize buffers */
174
0
    msgpack_sbuffer_init(&sbuf);
175
0
    msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);
176
177
0
    for (i = 0; i < arr_size ; i++) {
178
0
        t = &tokens[i];
179
180
0
        if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)) {
181
0
            break;
182
0
        }
183
184
0
        if (t->parent == -1) {
185
0
            *last_byte = t->end;
186
0
            records++;
187
0
        }
188
189
0
        flen = (t->end - t->start);
190
0
        switch (t->type) {
191
0
        case JSMN_OBJECT:
192
0
            msgpack_pack_map(&pck, t->size);
193
0
            break;
194
0
        case JSMN_ARRAY:
195
0
            msgpack_pack_array(&pck, t->size);
196
0
            break;
197
0
        case JSMN_STRING:
198
0
            pack_string_token(state, js + t->start, flen, &pck);
199
0
            break;
200
0
        case JSMN_PRIMITIVE:
201
0
            p = js + t->start;
202
0
            if (*p == 'f') {
203
0
                msgpack_pack_false(&pck);
204
0
            }
205
0
            else if (*p == 't') {
206
0
                msgpack_pack_true(&pck);
207
0
            }
208
0
            else if (*p == 'n') {
209
0
                msgpack_pack_nil(&pck);
210
0
            }
211
0
            else {
212
0
                if (is_float(p, flen)) {
213
0
                    msgpack_pack_double(&pck, atof(p));
214
0
                }
215
0
                else {
216
0
                    msgpack_pack_int64(&pck, atoll(p));
217
0
                }
218
0
            }
219
0
            break;
220
0
        case JSMN_UNDEFINED:
221
0
            msgpack_sbuffer_destroy(&sbuf);
222
0
            return NULL;
223
0
        }
224
0
    }
225
226
0
    *out_size = sbuf.size;
227
0
    *out_records = records;
228
0
    buf = sbuf.data;
229
230
0
    return buf;
231
0
}
232
233
/*
234
 * It parse a JSON string and convert it to MessagePack format, this packer is
235
 * useful when a complete JSON message exists, otherwise it will fail until
236
 * the message is complete.
237
 *
238
 * This routine do not keep a state in the parser, do not use it for big
239
 * JSON messages.
240
 */
241
static int pack_json_to_msgpack(const char *js, size_t len, char **buffer,
242
                                size_t *size, int *root_type, int *records)
243
0
{
244
0
    int ret = -1;
245
0
    int n_records;
246
0
    int out;
247
0
    int last;
248
0
    char *buf = NULL;
249
0
    struct flb_pack_state state;
250
251
0
    ret = flb_pack_state_init(&state);
252
0
    if (ret != 0) {
253
0
        return -1;
254
0
    }
255
0
    ret = flb_json_tokenise(js, len, &state);
256
0
    if (ret != 0) {
257
0
        ret = -1;
258
0
        goto flb_pack_json_end;
259
0
    }
260
261
0
    if (state.tokens_count == 0) {
262
0
        ret = -1;
263
0
        goto flb_pack_json_end;
264
0
    }
265
266
0
    buf = tokens_to_msgpack(&state, js, &out, &last, &n_records);
267
0
    if (!buf) {
268
0
        ret = -1;
269
0
        goto flb_pack_json_end;
270
0
    }
271
272
0
    *root_type = state.tokens[0].type;
273
0
    *size = out;
274
0
    *buffer = buf;
275
0
    *records = n_records;
276
0
    ret = 0;
277
278
0
 flb_pack_json_end:
279
0
    flb_pack_state_reset(&state);
280
0
    return ret;
281
0
}
282
283
/*
 * Pack serialized JSON messages into msgpack. The number of records found
 * in the input is computed by the underlying packer but discarded here.
 *
 * Returns whatever pack_json_to_msgpack() returns.
 */
int flb_pack_json(const char *js, size_t len, char **buffer, size_t *size,
                  int *root_type)
{
    int rc;
    int unused_records;

    rc = pack_json_to_msgpack(js, len, buffer, size, root_type,
                              &unused_records);
    return rc;
}
291
292
/*
 * Pack serialized JSON messages into msgpack and report the number of
 * messages through 'out_records'.
 *
 * Returns whatever pack_json_to_msgpack() returns.
 */
int flb_pack_json_recs(const char *js, size_t len, char **buffer, size_t *size,
                       int *root_type, int *out_records)
{
    int rc;

    rc = pack_json_to_msgpack(js, len, buffer, size, root_type, out_records);
    return rc;
}
301
302
/* Initialize a JSON packer state */
303
int flb_pack_state_init(struct flb_pack_state *s)
304
0
{
305
0
    int tokens = 256;
306
0
    size_t size = 256;
307
308
0
    jsmn_init(&s->parser);
309
310
0
    size = sizeof(jsmntok_t) * tokens;
311
0
    s->tokens = flb_malloc(size);
312
0
    if (!s->tokens) {
313
0
        flb_errno();
314
0
        return -1;
315
0
    }
316
0
    s->tokens_size   = tokens;
317
0
    s->tokens_count  = 0;
318
0
    s->last_byte     = 0;
319
0
    s->multiple      = FLB_FALSE;
320
321
0
    s->buf_data = flb_malloc(size);
322
0
    if (!s->buf_data) {
323
0
        flb_errno();
324
0
        flb_free(s->tokens);
325
0
        s->tokens = NULL;
326
0
        return -1;
327
0
    }
328
0
    s->buf_size = size;
329
0
    s->buf_len = 0;
330
331
0
    return 0;
332
0
}
333
334
void flb_pack_state_reset(struct flb_pack_state *s)
335
0
{
336
0
    flb_free(s->tokens);
337
0
    s->tokens = NULL;
338
0
    s->tokens_size  = 0;
339
0
    s->tokens_count = 0;
340
0
    s->last_byte    = 0;
341
0
    s->buf_size     = 0;
342
0
    flb_free(s->buf_data);
343
0
    s->buf_data = NULL;
344
0
}
345
346
347
/*
348
 * It parse a JSON string and convert it to MessagePack format. The main
349
 * difference of this function and the previous flb_pack_json() is that it
350
 * keeps a parser and tokens state, allowing to process big messages and
351
 * resume the parsing process instead of start from zero.
352
 */
353
int flb_pack_json_state(const char *js, size_t len,
354
                        char **buffer, int *size,
355
                        struct flb_pack_state *state)
356
0
{
357
0
    int ret;
358
0
    int out;
359
0
    int delim = 0;
360
0
    int last =  0;
361
0
    int records;
362
0
    char *buf;
363
0
    jsmntok_t *t;
364
365
0
    ret = flb_json_tokenise(js, len, state);
366
0
    state->multiple = FLB_TRUE;
367
0
    if (ret == FLB_ERR_JSON_PART && state->multiple == FLB_TRUE) {
368
        /*
369
         * If the caller enabled 'multiple' flag, it means that the incoming
370
         * JSON message may have multiple messages concatenated and likely
371
         * the last one is only incomplete.
372
         *
373
         * The following routine aims to determinate how many JSON messages
374
         * are OK in the array of tokens, if any, process them and adjust
375
         * the JSMN context/buffers.
376
         */
377
378
        /*
379
         * jsmn_parse updates jsmn_parser members. (state->parser)
380
         * A member 'toknext' points next incomplete object token.
381
         * We use toknext - 1 as an index of last member of complete JSON.
382
         */
383
0
        int i;
384
0
        int found = 0;
385
386
0
        if (state->parser.toknext == 0) {
387
0
            return ret;
388
0
        }
389
390
0
        for (i = (int)state->parser.toknext - 1; i >= 1; i--) {
391
0
            t = &state->tokens[i];
392
393
0
            if (t->parent == -1 && (t->end != 0)) {
394
0
                found++;
395
0
                delim = i;
396
0
                break;
397
0
            }
398
0
        }
399
400
0
        if (found == 0) {
401
0
            return ret; /* FLB_ERR_JSON_PART */
402
0
        }
403
0
        state->tokens_count += delim;
404
0
    }
405
0
    else if (ret != 0) {
406
0
        return ret;
407
0
    }
408
409
0
    if (state->tokens_count == 0 || state->tokens == NULL) {
410
0
        state->last_byte = last;
411
0
        return FLB_ERR_JSON_INVAL;
412
0
    }
413
414
0
    buf = tokens_to_msgpack(state, js, &out, &last, &records);
415
0
    if (!buf) {
416
0
        return -1;
417
0
    }
418
419
0
    *size = out;
420
0
    *buffer = buf;
421
0
    state->last_byte = last;
422
423
0
    return 0;
424
0
}
425
426
/*
 * Try to print an unpacked msgpack object as a Fluent Bit record, that is
 * an array whose first entry is a timestamp (positive integer, float or
 * ext type) followed by the record itself.
 *
 * The output written to stdout has the form "[cnt] [sec.nsec, record]".
 *
 * Returns 0 if the record was printed, -1 if the object does not look like
 * a Fluent Bit record (nothing is printed in that case).
 */
static int pack_print_fluent_record(size_t cnt, msgpack_unpacked result)
{
    msgpack_object o;
    msgpack_object *obj = NULL;
    msgpack_object root;
    struct flb_time tms;

    root = result.data;
    if (root.type != MSGPACK_OBJECT_ARRAY) {
        return -1;
    }

    /*
     * Both the timestamp and the record are needed: without this check an
     * empty or one-element array would be read past its end.
     */
    if (root.via.array.size < 2) {
        return -1;
    }

    /* decode expected timestamp only (integer, float or ext) */
    o = root.via.array.ptr[0];
    if (o.type != MSGPACK_OBJECT_POSITIVE_INTEGER &&
        o.type != MSGPACK_OBJECT_FLOAT &&
        o.type != MSGPACK_OBJECT_EXT) {
        return -1;
    }

    /* This is a Fluent Bit record, just do the proper unpacking/printing */
    flb_time_pop_from_msgpack(&tms, &result, &obj);

    /*
     * NOTE(review): 'obj' is presumably set to the record on success; if the
     * helper bailed out without setting it there is nothing safe to print.
     */
    if (obj == NULL) {
        return -1;
    }

    /* Casts keep the arguments in sync with the conversion specifiers */
    fprintf(stdout, "[%zu] [%"PRIu32".%09lu, ", cnt,
            (uint32_t) tms.tm.tv_sec, (unsigned long) tms.tm.tv_nsec);
    msgpack_object_print(stdout, *obj);
    fprintf(stdout, "]\n");

    return 0;
}
456
457
void flb_pack_print(const char *data, size_t bytes)
458
0
{
459
0
    int ret;
460
0
    msgpack_unpacked result;
461
0
    size_t off = 0, cnt = 0;
462
463
0
    msgpack_unpacked_init(&result);
464
0
    while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
465
        /* Check if we are processing an internal Fluent Bit record */
466
0
        ret = pack_print_fluent_record(cnt, result);
467
0
        if (ret == 0) {
468
0
            continue;
469
0
        }
470
471
0
        printf("[%zd] ", cnt++);
472
0
        msgpack_object_print(stdout, result.data);
473
0
        printf("\n");
474
0
    }
475
0
    msgpack_unpacked_destroy(&result);
476
0
}
477
478
void flb_pack_print_metrics(const char *data, size_t bytes)
479
0
{
480
0
    int ret;
481
0
    size_t off = 0;
482
0
    cfl_sds_t text;
483
0
    struct cmt *cmt = NULL;
484
485
    /* get cmetrics context */
486
0
    ret = cmt_decode_msgpack_create(&cmt, (char *) data, bytes, &off);
487
0
    if (ret != 0) {
488
0
        flb_error("could not process metrics payload");
489
0
        return;
490
0
    }
491
492
    /* convert to text representation */
493
0
    text = cmt_encode_text_create(cmt);
494
495
    /* destroy cmt context */
496
0
    cmt_destroy(cmt);
497
498
0
    printf("%s", text);
499
0
    fflush(stdout);
500
501
0
    cmt_encode_text_destroy(text);
502
0
}
503
504
static inline int try_to_write(char *buf, int *off, size_t left,
505
                               const char *str, size_t str_len)
506
0
{
507
0
    if (str_len <= 0){
508
0
        str_len = strlen(str);
509
0
    }
510
0
    if (left <= *off+str_len) {
511
0
        return FLB_FALSE;
512
0
    }
513
0
    memcpy(buf+*off, str, str_len);
514
0
    *off += str_len;
515
0
    return FLB_TRUE;
516
0
}
517
518
519
/*
520
 * Check if a key exists in the map using the 'offset' as an index to define
521
 * which element needs to start looking from
522
 */
523
static inline int key_exists_in_map(msgpack_object key, msgpack_object map, int offset)
524
0
{
525
0
    int i;
526
0
    msgpack_object p;
527
528
0
    if (key.type != MSGPACK_OBJECT_STR) {
529
0
        return FLB_FALSE;
530
0
    }
531
532
0
    for (i = offset; i < map.via.map.size; i++) {
533
0
        p = map.via.map.ptr[i].key;
534
0
        if (p.type != MSGPACK_OBJECT_STR) {
535
0
            continue;
536
0
        }
537
538
0
        if (key.via.str.size != p.via.str.size) {
539
0
            continue;
540
0
        }
541
542
0
        if (memcmp(key.via.str.ptr, p.via.str.ptr, p.via.str.size) == 0) {
543
0
            return FLB_TRUE;
544
0
        }
545
0
    }
546
547
0
    return FLB_FALSE;
548
0
}
549
550
static int msgpack2json(char *buf, int *off, size_t left,
551
                        const msgpack_object *o)
552
0
{
553
0
    int i;
554
0
    int dup;
555
0
    int ret = FLB_FALSE;
556
0
    int loop;
557
0
    int packed;
558
559
0
    switch(o->type) {
560
0
    case MSGPACK_OBJECT_NIL:
561
0
        ret = try_to_write(buf, off, left, "null", 4);
562
0
        break;
563
564
0
    case MSGPACK_OBJECT_BOOLEAN:
565
0
        ret = try_to_write(buf, off, left,
566
0
                           (o->via.boolean ? "true":"false"),0);
567
568
0
        break;
569
570
0
    case MSGPACK_OBJECT_POSITIVE_INTEGER:
571
0
        {
572
0
            char temp[32] = {0};
573
0
            i = snprintf(temp, sizeof(temp)-1, "%"PRIu64, o->via.u64);
574
0
            ret = try_to_write(buf, off, left, temp, i);
575
0
        }
576
0
        break;
577
578
0
    case MSGPACK_OBJECT_NEGATIVE_INTEGER:
579
0
        {
580
0
            char temp[32] = {0};
581
0
            i = snprintf(temp, sizeof(temp)-1, "%"PRId64, o->via.i64);
582
0
            ret = try_to_write(buf, off, left, temp, i);
583
0
        }
584
0
        break;
585
0
    case MSGPACK_OBJECT_FLOAT32:
586
0
    case MSGPACK_OBJECT_FLOAT64:
587
0
        {
588
0
            char temp[512] = {0};
589
0
            if (o->via.f64 == (double)(long long int)o->via.f64) {
590
0
                i = snprintf(temp, sizeof(temp)-1, "%.1f", o->via.f64);
591
0
            }
592
0
            else if (convert_nan_to_null && isnan(o->via.f64) ) {
593
0
                i = snprintf(temp, sizeof(temp)-1, "null");
594
0
            }
595
0
            else {
596
0
                i = snprintf(temp, sizeof(temp)-1, "%.16g", o->via.f64);
597
0
            }
598
0
            ret = try_to_write(buf, off, left, temp, i);
599
0
        }
600
0
        break;
601
602
0
    case MSGPACK_OBJECT_STR:
603
0
        if (try_to_write(buf, off, left, "\"", 1) &&
604
0
            (o->via.str.size > 0 ?
605
0
             try_to_write_str(buf, off, left, o->via.str.ptr, o->via.str.size)
606
0
             : 1/* nothing to do */) &&
607
0
            try_to_write(buf, off, left, "\"", 1)) {
608
0
            ret = FLB_TRUE;
609
0
        }
610
0
        break;
611
612
0
    case MSGPACK_OBJECT_BIN:
613
0
        if (try_to_write(buf, off, left, "\"", 1) &&
614
0
            (o->via.bin.size > 0 ?
615
0
             try_to_write_str(buf, off, left, o->via.bin.ptr, o->via.bin.size)
616
0
              : 1 /* nothing to do */) &&
617
0
            try_to_write(buf, off, left, "\"", 1)) {
618
0
            ret = FLB_TRUE;
619
0
        }
620
0
        break;
621
622
0
    case MSGPACK_OBJECT_EXT:
623
0
        if (!try_to_write(buf, off, left, "\"", 1)) {
624
0
            goto msg2json_end;
625
0
        }
626
        /* ext body. fortmat is similar to printf(1) */
627
0
        {
628
0
            char temp[32] = {0};
629
0
            int  len;
630
0
            loop = o->via.ext.size;
631
0
            for(i=0; i<loop; i++) {
632
0
                len = snprintf(temp, sizeof(temp)-1, "\\x%02x", (char)o->via.ext.ptr[i]);
633
0
                if (!try_to_write(buf, off, left, temp, len)) {
634
0
                    goto msg2json_end;
635
0
                }
636
0
            }
637
0
        }
638
0
        if (!try_to_write(buf, off, left, "\"", 1)) {
639
0
            goto msg2json_end;
640
0
        }
641
0
        ret = FLB_TRUE;
642
0
        break;
643
644
0
    case MSGPACK_OBJECT_ARRAY:
645
0
        loop = o->via.array.size;
646
647
0
        if (!try_to_write(buf, off, left, "[", 1)) {
648
0
            goto msg2json_end;
649
0
        }
650
0
        if (loop != 0) {
651
0
            msgpack_object* p = o->via.array.ptr;
652
0
            if (!msgpack2json(buf, off, left, p)) {
653
0
                goto msg2json_end;
654
0
            }
655
0
            for (i=1; i<loop; i++) {
656
0
                if (!try_to_write(buf, off, left, ",", 1) ||
657
0
                    !msgpack2json(buf, off, left, p+i)) {
658
0
                    goto msg2json_end;
659
0
                }
660
0
            }
661
0
        }
662
663
0
        ret = try_to_write(buf, off, left, "]", 1);
664
0
        break;
665
666
0
    case MSGPACK_OBJECT_MAP:
667
0
        loop = o->via.map.size;
668
0
        if (!try_to_write(buf, off, left, "{", 1)) {
669
0
            goto msg2json_end;
670
0
        }
671
0
        if (loop != 0) {
672
0
            msgpack_object k;
673
0
            msgpack_object_kv *p = o->via.map.ptr;
674
675
0
            packed = 0;
676
0
            dup = FLB_FALSE;
677
678
0
            k = o->via.map.ptr[0].key;
679
0
            for (i = 0; i < loop; i++) {
680
0
                k = o->via.map.ptr[i].key;
681
0
                dup = key_exists_in_map(k, *o, i + 1);
682
0
                if (dup == FLB_TRUE) {
683
0
                    continue;
684
0
                }
685
686
0
                if (packed > 0) {
687
0
                    if (!try_to_write(buf, off, left, ",", 1)) {
688
0
                        goto msg2json_end;
689
0
                    }
690
0
                }
691
692
0
                if (
693
0
                    !msgpack2json(buf, off, left, &(p+i)->key) ||
694
0
                    !try_to_write(buf, off, left, ":", 1)  ||
695
0
                    !msgpack2json(buf, off, left, &(p+i)->val) ) {
696
0
                    goto msg2json_end;
697
0
                }
698
0
                packed++;
699
0
            }
700
0
        }
701
702
0
        ret = try_to_write(buf, off, left, "}", 1);
703
0
        break;
704
705
0
    default:
706
0
        flb_warn("[%s] unknown msgpack type %i", __FUNCTION__, o->type);
707
0
    }
708
709
0
 msg2json_end:
710
0
    return ret;
711
0
}
712
713
/**
714
 *  convert msgpack to JSON string.
715
 *  This API is similar to snprintf.
716
 *
717
 *  @param  json_str  The buffer to fill JSON string.
718
 *  @param  json_size The size of json_str.
719
 *  @param  data      The msgpack_unpacked data.
720
 *  @return success   ? a number characters filled : negative value
721
 */
722
int flb_msgpack_to_json(char *json_str, size_t json_size,
723
                        const msgpack_object *obj)
724
0
{
725
0
    int ret = -1;
726
0
    int off = 0;
727
728
0
    if (json_str == NULL || obj == NULL) {
729
0
        return -1;
730
0
    }
731
732
0
    ret = msgpack2json(json_str, &off, json_size - 1, obj);
733
0
    json_str[off] = '\0';
734
0
    return ret ? off: ret;
735
0
}
736
737
flb_sds_t flb_msgpack_raw_to_json_sds(const void *in_buf, size_t in_size)
738
0
{
739
0
    int ret;
740
0
    size_t off = 0;
741
0
    size_t out_size;
742
0
    size_t realloc_size;
743
744
0
    msgpack_unpacked result;
745
0
    msgpack_object *root;
746
0
    flb_sds_t out_buf;
747
0
    flb_sds_t tmp_buf;
748
749
    /* buffer size strategy */
750
0
    out_size = in_size * FLB_MSGPACK_TO_JSON_INIT_BUFFER_SIZE;
751
0
    realloc_size = in_size * FLB_MSGPACK_TO_JSON_REALLOC_BUFFER_SIZE;
752
0
    if (realloc_size < 256) {
753
0
        realloc_size = 256;
754
0
    }
755
756
0
    out_buf = flb_sds_create_size(out_size);
757
0
    if (!out_buf) {
758
0
        flb_errno();
759
0
        return NULL;
760
0
    }
761
762
0
    msgpack_unpacked_init(&result);
763
0
    ret = msgpack_unpack_next(&result, in_buf, in_size, &off);
764
0
    if (ret != MSGPACK_UNPACK_SUCCESS) {
765
0
        flb_sds_destroy(out_buf);
766
0
        msgpack_unpacked_destroy(&result);
767
0
        return NULL;
768
0
    }
769
770
0
    root = &result.data;
771
0
    while (1) {
772
0
        ret = flb_msgpack_to_json(out_buf, out_size, root);
773
0
        if (ret <= 0) {
774
0
            tmp_buf = flb_sds_increase(out_buf, realloc_size);
775
0
            if (tmp_buf) {
776
0
                out_buf = tmp_buf;
777
0
                out_size += realloc_size;
778
0
            }
779
0
            else {
780
0
                flb_errno();
781
0
                flb_sds_destroy(out_buf);
782
0
                msgpack_unpacked_destroy(&result);
783
0
                return NULL;
784
0
            }
785
0
        }
786
0
        else {
787
0
            break;
788
0
        }
789
0
    }
790
791
0
    msgpack_unpacked_destroy(&result);
792
0
    flb_sds_len_set(out_buf, ret);
793
794
0
    return out_buf;
795
0
}
796
797
/*
798
 * Given a 'format' string type, return it integer representation. This
799
 * is used by output plugins that uses pack functions to convert
800
 * msgpack records to JSON.
801
 */
802
int flb_pack_to_json_format_type(const char *str)
803
0
{
804
0
    if (strcasecmp(str, "msgpack") == 0) {
805
0
        return FLB_PACK_JSON_FORMAT_NONE;
806
0
    }
807
0
    else if (strcasecmp(str, "json") == 0) {
808
0
        return FLB_PACK_JSON_FORMAT_JSON;
809
0
    }
810
0
    else if (strcasecmp(str, "json_stream") == 0) {
811
0
        return FLB_PACK_JSON_FORMAT_STREAM;
812
0
    }
813
0
    else if (strcasecmp(str, "json_lines") == 0) {
814
0
        return FLB_PACK_JSON_FORMAT_LINES;
815
0
    }
816
817
0
    return -1;
818
0
}
819
820
/* Given a 'date string type', return it integer representation */
821
int flb_pack_to_json_date_type(const char *str)
822
0
{
823
0
    if (strcasecmp(str, "double") == 0) {
824
0
        return FLB_PACK_JSON_DATE_DOUBLE;
825
0
    }
826
0
    else if (strcasecmp(str, "java_sql_timestamp") == 0) {
827
0
        return FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP;
828
0
    }
829
0
    else if (strcasecmp(str, "iso8601") == 0) {
830
0
        return FLB_PACK_JSON_DATE_ISO8601;
831
0
    }
832
0
    else if (strcasecmp(str, "epoch") == 0) {
833
0
        return FLB_PACK_JSON_DATE_EPOCH;
834
0
    }
835
0
    else if (strcasecmp(str, "epoch_ms") == 0 ||
836
0
             strcasecmp(str, "epoch_millis") == 0 ||
837
0
             strcasecmp(str, "epoch_milliseconds") == 0) {
838
0
        return FLB_PACK_JSON_DATE_EPOCH_MS;
839
0
    }
840
841
0
    return -1;
842
0
}
843
844
845
static int msgpack_pack_formatted_datetime(flb_sds_t out_buf, char time_formatted[], int max_len,
846
                                           msgpack_packer* tmp_pck, struct flb_time* tms,
847
                                           const char *date_format,
848
                                           const char *time_format)
849
0
{
850
0
    int len;
851
0
    size_t s;
852
0
    struct tm tm;
853
854
0
    gmtime_r(&tms->tm.tv_sec, &tm);
855
856
0
    s = strftime(time_formatted, max_len,
857
0
                 date_format, &tm);
858
0
    if (!s) {
859
0
        flb_debug("strftime failed in flb_pack_msgpack_to_json_format");
860
0
        return 1;
861
0
    }
862
863
    /* Format the time, use microsecond precision not nanoseconds */
864
0
    max_len -= s;
865
0
    len = snprintf(&time_formatted[s],
866
0
                    max_len,
867
0
                    time_format,
868
0
                    (uint64_t) tms->tm.tv_nsec / 1000);
869
0
    if (len >= max_len) {
870
0
        flb_debug("snprintf: %d >= %d in flb_pack_msgpack_to_json_format", len, max_len);
871
0
        return 2;
872
0
    }
873
0
    s += len;
874
0
    msgpack_pack_str(tmp_pck, s);
875
0
    msgpack_pack_str_body(tmp_pck, time_formatted, s);
876
0
    return 0;
877
0
}
878
879
flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes,
880
                                          int json_format, int date_format,
881
                                          flb_sds_t date_key)
882
0
{
883
0
    int i;
884
0
    int ok = MSGPACK_UNPACK_SUCCESS;
885
0
    int records = 0;
886
0
    int map_size;
887
0
    size_t off = 0;
888
0
    char time_formatted[38];
889
0
    flb_sds_t out_tmp;
890
0
    flb_sds_t out_js;
891
0
    flb_sds_t out_buf = NULL;
892
0
    msgpack_unpacked result;
893
0
    msgpack_object root;
894
0
    msgpack_object map;
895
0
    msgpack_sbuffer tmp_sbuf;
896
0
    msgpack_packer tmp_pck;
897
0
    msgpack_object *obj;
898
0
    msgpack_object *k;
899
0
    msgpack_object *v;
900
0
    struct flb_time tms;
901
902
    /* For json lines and streams mode we need a pre-allocated buffer */
903
0
    if (json_format == FLB_PACK_JSON_FORMAT_LINES ||
904
0
        json_format == FLB_PACK_JSON_FORMAT_STREAM) {
905
0
        out_buf = flb_sds_create_size(bytes + bytes / 4);
906
0
        if (!out_buf) {
907
0
            flb_errno();
908
0
            return NULL;
909
0
        }
910
0
    }
911
912
    /* Create temporary msgpack buffer */
913
0
    msgpack_sbuffer_init(&tmp_sbuf);
914
0
    msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
915
916
    /*
917
     * If the format is the original msgpack style of one big array,
918
     * registrate the array, otherwise is not necessary. FYI, original format:
919
     *
920
     * [
921
     *   [timestamp, map],
922
     *   [timestamp, map],
923
     *   [T, M]...
924
     * ]
925
     */
926
0
    if (json_format == FLB_PACK_JSON_FORMAT_JSON) {
927
0
        records = flb_mp_count(data, bytes);
928
0
        if (records <= 0) {
929
0
            msgpack_sbuffer_destroy(&tmp_sbuf);
930
0
            return NULL;
931
0
        }
932
0
        msgpack_pack_array(&tmp_pck, records);
933
0
    }
934
935
0
    msgpack_unpacked_init(&result);
936
0
    while (msgpack_unpack_next(&result, data, bytes, &off) == ok) {
937
        /* Each array must have two entries: time and record */
938
0
        root = result.data;
939
0
        if (root.type != MSGPACK_OBJECT_ARRAY) {
940
0
            continue;
941
0
        }
942
0
        if (root.via.array.size != 2) {
943
0
            continue;
944
0
        }
945
946
        /* Unpack time */
947
0
        flb_time_pop_from_msgpack(&tms, &result, &obj);
948
949
        /* Get the record/map */
950
0
        map = root.via.array.ptr[1];
951
0
        if (map.type != MSGPACK_OBJECT_MAP) {
952
0
            continue;
953
0
        }
954
0
        map_size = map.via.map.size;
955
956
0
        if (date_key != NULL) {
957
0
            msgpack_pack_map(&tmp_pck, map_size + 1);
958
0
        }
959
0
        else {
960
0
            msgpack_pack_map(&tmp_pck, map_size);
961
0
        }
962
963
0
        if (date_key != NULL) {
964
            /* Append date key */
965
0
            msgpack_pack_str(&tmp_pck, flb_sds_len(date_key));
966
0
            msgpack_pack_str_body(&tmp_pck, date_key, flb_sds_len(date_key));
967
968
            /* Append date value */
969
0
            switch (date_format) {
970
0
            case FLB_PACK_JSON_DATE_DOUBLE:
971
0
                msgpack_pack_double(&tmp_pck, flb_time_to_double(&tms));
972
0
                break;
973
0
            case FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP:
974
0
                if (msgpack_pack_formatted_datetime(out_buf, time_formatted, sizeof(time_formatted), &tmp_pck, &tms,
975
0
                                                    FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP_FMT, ".%06" PRIu64)) {
976
0
                    flb_sds_destroy(out_buf);
977
0
                    msgpack_sbuffer_destroy(&tmp_sbuf);
978
0
                    msgpack_unpacked_destroy(&result);
979
0
                    return NULL;
980
0
                }
981
0
                break;
982
0
            case FLB_PACK_JSON_DATE_ISO8601:
983
0
                if (msgpack_pack_formatted_datetime(out_buf, time_formatted, sizeof(time_formatted), &tmp_pck, &tms,
984
0
                                                    FLB_PACK_JSON_DATE_ISO8601_FMT, ".%06" PRIu64 "Z")) {
985
0
                    flb_sds_destroy(out_buf);
986
0
                    msgpack_sbuffer_destroy(&tmp_sbuf);
987
0
                    msgpack_unpacked_destroy(&result);
988
0
                    return NULL;
989
0
                }
990
0
                break;
991
0
            case FLB_PACK_JSON_DATE_EPOCH:
992
0
                msgpack_pack_uint64(&tmp_pck, (long long unsigned)(tms.tm.tv_sec));
993
0
                break;
994
0
            case FLB_PACK_JSON_DATE_EPOCH_MS:
995
0
                msgpack_pack_uint64(&tmp_pck, flb_time_to_millisec(&tms));
996
0
                break;
997
0
            }
998
0
        }
999
1000
        /* Append remaining keys/values */
1001
0
        for (i = 0; i < map_size; i++) {
1002
0
            k = &map.via.map.ptr[i].key;
1003
0
            v = &map.via.map.ptr[i].val;
1004
0
            msgpack_pack_object(&tmp_pck, *k);
1005
0
            msgpack_pack_object(&tmp_pck, *v);
1006
0
        }
1007
1008
        /*
1009
         * If the format is the original msgpack style, just continue since
1010
         * we don't care about separator or JSON conversion at this point.
1011
         */
1012
0
        if (json_format == FLB_PACK_JSON_FORMAT_JSON) {
1013
0
            continue;
1014
0
        }
1015
1016
        /*
1017
         * Here we handle two types of records concatenation:
1018
         *
1019
         * FLB_PACK_JSON_FORMAT_LINES: add  breakline (\n) after each record
1020
         *
1021
         *
1022
         *     {'ts':abc,'k1':1}
1023
         *     {'ts':abc,'k1':2}
1024
         *     {N}
1025
         *
1026
         * FLB_PACK_JSON_FORMAT_STREAM: no separators, e.g:
1027
         *
1028
         *     {'ts':abc,'k1':1}{'ts':abc,'k1':2}{N}
1029
         */
1030
0
        if (json_format == FLB_PACK_JSON_FORMAT_LINES ||
1031
0
            json_format == FLB_PACK_JSON_FORMAT_STREAM) {
1032
1033
            /* Encode current record into JSON in a temporary variable */
1034
0
            out_js = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size);
1035
0
            if (!out_js) {
1036
0
                flb_sds_destroy(out_buf);
1037
0
                msgpack_sbuffer_destroy(&tmp_sbuf);
1038
0
                msgpack_unpacked_destroy(&result);
1039
0
                return NULL;
1040
0
            }
1041
1042
            /*
1043
             * One map record has been converted, now append it to the
1044
             * outgoing out_buf sds variable.
1045
             */
1046
0
            out_tmp = flb_sds_cat(out_buf, out_js, flb_sds_len(out_js));
1047
0
            if (!out_tmp) {
1048
0
                flb_sds_destroy(out_js);
1049
0
                flb_sds_destroy(out_buf);
1050
0
                msgpack_sbuffer_destroy(&tmp_sbuf);
1051
0
                msgpack_unpacked_destroy(&result);
1052
0
                return NULL;
1053
0
            }
1054
1055
            /* Release temporary json sds buffer */
1056
0
            flb_sds_destroy(out_js);
1057
1058
            /* If a realloc happened, check the returned address */
1059
0
            if (out_tmp != out_buf) {
1060
0
                out_buf = out_tmp;
1061
0
            }
1062
1063
            /* Append the breakline only for json lines mode */
1064
0
            if (json_format == FLB_PACK_JSON_FORMAT_LINES) {
1065
0
                out_tmp = flb_sds_cat(out_buf, "\n", 1);
1066
0
                if (!out_tmp) {
1067
0
                    flb_sds_destroy(out_buf);
1068
0
                    msgpack_sbuffer_destroy(&tmp_sbuf);
1069
0
                    msgpack_unpacked_destroy(&result);
1070
0
                    return NULL;
1071
0
                }
1072
0
                if (out_tmp != out_buf) {
1073
0
                    out_buf = out_tmp;
1074
0
                }
1075
0
            }
1076
0
            msgpack_sbuffer_clear(&tmp_sbuf);
1077
0
        }
1078
0
    }
1079
1080
    /* Release the unpacker */
1081
0
    msgpack_unpacked_destroy(&result);
1082
1083
    /* Format to JSON */
1084
0
    if (json_format == FLB_PACK_JSON_FORMAT_JSON) {
1085
0
        out_buf = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size);
1086
0
        msgpack_sbuffer_destroy(&tmp_sbuf);
1087
1088
0
        if (!out_buf) {
1089
0
            return NULL;
1090
0
        }
1091
0
    }
1092
0
    else {
1093
0
        msgpack_sbuffer_destroy(&tmp_sbuf);
1094
0
    }
1095
1096
0
    if (out_buf && flb_sds_len(out_buf) == 0) {
1097
0
        flb_sds_destroy(out_buf);
1098
0
        return NULL;
1099
0
    }
1100
1101
0
    return out_buf;
1102
0
}
1103
1104
/**
1105
 *  convert msgpack to JSON string.
1106
 *  This API is similar to snprintf.
1107
 *  @param  size     Estimated length of json str.
1108
 *  @param  data     The msgpack_unpacked data.
1109
 *  @return success  ? allocated json str ptr : NULL
1110
 */
1111
char *flb_msgpack_to_json_str(size_t size, const msgpack_object *obj)
1112
0
{
1113
0
    int ret;
1114
0
    char *buf = NULL;
1115
0
    char *tmp;
1116
1117
0
    if (obj == NULL) {
1118
0
        return NULL;
1119
0
    }
1120
1121
0
    if (size <= 0) {
1122
0
        size = 128;
1123
0
    }
1124
1125
0
    buf = flb_malloc(size);
1126
0
    if (!buf) {
1127
0
        flb_errno();
1128
0
        return NULL;
1129
0
    }
1130
1131
0
    while (1) {
1132
0
        ret = flb_msgpack_to_json(buf, size, obj);
1133
0
        if (ret <= 0) {
1134
            /* buffer is small. retry.*/
1135
0
            size += 128;
1136
0
            tmp = flb_realloc(buf, size);
1137
0
            if (tmp) {
1138
0
                buf = tmp;
1139
0
            }
1140
0
            else {
1141
0
                flb_free(buf);
1142
0
                flb_errno();
1143
0
                return NULL;
1144
0
            }
1145
0
        }
1146
0
        else {
1147
0
            break;
1148
0
        }
1149
0
    }
1150
1151
0
    return buf;
1152
0
}
1153
1154
/*
 * Pack the current time into 'pck'.
 *
 * The time is obtained with flb_time_get() and appended through
 * flb_time_append_to_msgpack() using 0 as its last argument. The return
 * value of that call is handed back to the caller unchanged.
 */
int flb_pack_time_now(msgpack_packer *pck)
{
    struct flb_time now;

    flb_time_get(&now);

    return flb_time_append_to_msgpack(&now, pck, 0);
}
1164
1165
int flb_msgpack_expand_map(char *map_data, size_t map_size,
1166
                           msgpack_object_kv **kv_arr, int kv_arr_len,
1167
                           char** out_buf, int* out_size)
1168
0
{
1169
0
    msgpack_sbuffer sbuf;
1170
0
    msgpack_packer  pck;
1171
0
    msgpack_unpacked result;
1172
0
    size_t off = 0;
1173
0
    char *ret_buf;
1174
0
    int map_num;
1175
0
    int i;
1176
0
    int len;
1177
1178
0
    if (map_data == NULL){
1179
0
        return -1;
1180
0
    }
1181
1182
0
    msgpack_unpacked_init(&result);
1183
0
    if ((i=msgpack_unpack_next(&result, map_data, map_size, &off)) !=
1184
0
        MSGPACK_UNPACK_SUCCESS ) {
1185
0
        msgpack_unpacked_destroy(&result);
1186
0
        return -1;
1187
0
    }
1188
0
    if (result.data.type != MSGPACK_OBJECT_MAP) {
1189
0
        msgpack_unpacked_destroy(&result);
1190
0
        return -1;
1191
0
    }
1192
1193
0
    len = result.data.via.map.size;
1194
0
    map_num = kv_arr_len + len;
1195
1196
0
    msgpack_sbuffer_init(&sbuf);
1197
0
    msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);
1198
0
    msgpack_pack_map(&pck, map_num);
1199
1200
0
    for (i=0; i<len; i++) {
1201
0
        msgpack_pack_object(&pck, result.data.via.map.ptr[i].key);
1202
0
        msgpack_pack_object(&pck, result.data.via.map.ptr[i].val);
1203
0
    }
1204
0
    for (i=0; i<kv_arr_len; i++){
1205
0
        msgpack_pack_object(&pck, kv_arr[i]->key);
1206
0
        msgpack_pack_object(&pck, kv_arr[i]->val);
1207
0
    }
1208
0
    msgpack_unpacked_destroy(&result);
1209
1210
0
    *out_size = sbuf.size;
1211
0
    ret_buf  = flb_malloc(sbuf.size);
1212
0
    *out_buf = ret_buf;
1213
0
    if (*out_buf == NULL) {
1214
0
        flb_errno();
1215
0
        msgpack_sbuffer_destroy(&sbuf);
1216
0
        return -1;
1217
0
    }
1218
0
    memcpy(*out_buf, sbuf.data, sbuf.size);
1219
0
    msgpack_sbuffer_destroy(&sbuf);
1220
1221
0
    return 0;
1222
0
}
1223
1224
int flb_pack_init(struct flb_config *config)
1225
0
{
1226
0
    int ret;
1227
1228
0
    if (config == NULL) {
1229
0
        return -1;
1230
0
    }
1231
0
    ret = flb_pack_set_null_as_nan(config->convert_nan_to_null);
1232
1233
0
    return ret;
1234
0
}