Coverage Report

Created: 2026-06-07 07:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/lib/cprofiles/cprof_mpack_utils.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  CProfiles
4
 *  ========
5
 *  Copyright 2024 The CProfiles Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <cprofiles/cprof_mpack_utils.h>
21
#include <cfl/cfl_sds.h>
22
#include <mpack/mpack.h>
23
24
int cprof_mpack_consume_string_or_nil_tag(mpack_reader_t *reader, cfl_sds_t *output_buffer)
25
0
{
26
0
    int result;
27
28
0
    if (output_buffer == NULL) {
29
0
        return CPROF_MPACK_INVALID_ARGUMENT_ERROR;
30
0
    }
31
32
0
    if (cprof_mpack_peek_type(reader) == mpack_type_str) {
33
0
        result = cprof_mpack_consume_string_tag(reader, output_buffer);
34
0
    }
35
0
    else if (cprof_mpack_peek_type(reader) == mpack_type_nil) {
36
0
        result = cprof_mpack_consume_nil_tag(reader);
37
38
0
        if (result == CPROF_MPACK_SUCCESS) {
39
0
            *output_buffer = NULL;
40
0
        }
41
0
    }
42
0
    else {
43
0
        result = CPROF_MPACK_UNEXPECTED_DATA_TYPE_ERROR;
44
0
    }
45
46
0
    return result;
47
0
}
48
49
int cprof_mpack_consume_binary_or_nil_tag(mpack_reader_t *reader, cfl_sds_t *output_buffer)
50
0
{
51
0
    int result;
52
53
0
    if (output_buffer == NULL) {
54
0
        return CPROF_MPACK_INVALID_ARGUMENT_ERROR;
55
0
    }
56
57
0
    if (cprof_mpack_peek_type(reader) == mpack_type_bin) {
58
0
        result = cprof_mpack_consume_binary_tag(reader, output_buffer);
59
0
    }
60
0
    else if (cprof_mpack_peek_type(reader) == mpack_type_nil) {
61
0
        result = cprof_mpack_consume_nil_tag(reader);
62
63
0
        if (result == CPROF_MPACK_SUCCESS) {
64
0
            *output_buffer = NULL;
65
0
        }
66
0
    }
67
0
    else {
68
0
        result = CPROF_MPACK_UNEXPECTED_DATA_TYPE_ERROR;
69
0
    }
70
71
0
    return result;
72
0
}
73
74
int cprof_mpack_consume_nil_tag(mpack_reader_t *reader)
75
0
{
76
0
    mpack_tag_t tag;
77
78
0
    if (NULL == reader) {
79
0
        return CPROF_MPACK_INVALID_ARGUMENT_ERROR;
80
0
    }
81
82
0
    tag = mpack_read_tag(reader);
83
84
0
    if (mpack_ok != mpack_reader_error(reader)) {
85
0
        return CPROF_MPACK_ENGINE_ERROR;
86
0
    }
87
88
0
    if (mpack_type_nil != mpack_tag_type(&tag)) {
89
0
        return CPROF_MPACK_UNEXPECTED_DATA_TYPE_ERROR;
90
0
    }
91
92
0
    return CPROF_MPACK_SUCCESS;
93
0
}
94
95
int cprof_mpack_consume_double_tag(mpack_reader_t *reader, double *output_buffer)
96
0
{
97
0
    mpack_tag_t tag;
98
99
0
    if (NULL == output_buffer) {
100
0
        return CPROF_MPACK_INVALID_ARGUMENT_ERROR;
101
0
    }
102
103
0
    if (NULL == reader) {
104
0
        return CPROF_MPACK_INVALID_ARGUMENT_ERROR;
105
0
    }
106
107
0
    tag = mpack_read_tag(reader);
108
109
0
    if (mpack_ok != mpack_reader_error(reader)) {
110
0
        return CPROF_MPACK_ENGINE_ERROR;
111
0
    }
112
113
0
    if (mpack_type_double != mpack_tag_type(&tag)) {
114
0
        return CPROF_MPACK_UNEXPECTED_DATA_TYPE_ERROR;
115
0
    }
116
117
0
    *output_buffer = mpack_tag_double_value(&tag);
118
119
0
    return CPROF_MPACK_SUCCESS;
120
0
}
121
122
int cprof_mpack_consume_uint_tag(mpack_reader_t *reader, uint64_t *output_buffer)
123
0
{
124
0
    mpack_tag_t tag;
125
126
0
    if (NULL == output_buffer) {
127
0
        return CPROF_MPACK_INVALID_ARGUMENT_ERROR;
128
0
    }
129
130
0
    if (NULL == reader) {
131
0
        return CPROF_MPACK_INVALID_ARGUMENT_ERROR;
132
0
    }
133
134
0
    tag = mpack_read_tag(reader);
135
136
0
    if (mpack_ok != mpack_reader_error(reader)) {
137
0
        return CPROF_MPACK_ENGINE_ERROR;
138
0
    }
139
140
0
    if (mpack_type_int == mpack_tag_type(&tag)) {
141
0
        *output_buffer = (uint64_t) mpack_tag_int_value(&tag);
142
0
    }
143
0
    else if (mpack_type_uint == mpack_tag_type(&tag)) {
144
0
        *output_buffer = (uint64_t) mpack_tag_uint_value(&tag);
145
0
    }
146
0
    else {
147
0
        return CPROF_MPACK_UNEXPECTED_DATA_TYPE_ERROR;
148
0
    }
149
150
0
    return CPROF_MPACK_SUCCESS;
151
0
}
152
153
int cprof_mpack_consume_uint32_tag(mpack_reader_t *reader, uint32_t *output_buffer)
154
0
{
155
0
    int      result;
156
0
    uint64_t value;
157
158
0
    result = cprof_mpack_consume_uint_tag(reader, &value);
159
160
0
    if (result == CPROF_MPACK_SUCCESS) {
161
0
        *output_buffer = (uint32_t) value;
162
0
    }
163
164
0
    return result;
165
0
}
166
167
int cprof_mpack_consume_uint64_tag(mpack_reader_t *reader, uint64_t *output_buffer)
168
0
{
169
0
    return cprof_mpack_consume_uint_tag(reader, output_buffer);
170
0
}
171
172
int cprof_mpack_consume_int_tag(mpack_reader_t *reader, int64_t *output_buffer)
173
0
{
174
0
    mpack_tag_t tag;
175
176
0
    if (NULL == output_buffer) {
177
0
        return CPROF_MPACK_INVALID_ARGUMENT_ERROR;
178
0
    }
179
180
0
    if (NULL == reader) {
181
0
        return CPROF_MPACK_INVALID_ARGUMENT_ERROR;
182
0
    }
183
184
0
    tag = mpack_read_tag(reader);
185
186
0
    if (mpack_ok != mpack_reader_error(reader)) {
187
0
        return CPROF_MPACK_ENGINE_ERROR;
188
0
    }
189
190
0
    if (mpack_type_int == mpack_tag_type(&tag)) {
191
0
        *output_buffer = (int64_t) mpack_tag_int_value(&tag);
192
0
    }
193
0
    else if (mpack_type_uint == mpack_tag_type(&tag)) {
194
0
        *output_buffer = (int64_t) mpack_tag_uint_value(&tag);
195
0
    }
196
0
    else {
197
0
        return CPROF_MPACK_UNEXPECTED_DATA_TYPE_ERROR;
198
0
    }
199
200
0
    return CPROF_MPACK_SUCCESS;
201
0
}
202
203
int cprof_mpack_consume_int32_tag(mpack_reader_t *reader, int32_t *output_buffer)
204
0
{
205
0
    int     result;
206
0
    int64_t value;
207
208
0
    result = cprof_mpack_consume_int_tag(reader, &value);
209
210
0
    if (result == CPROF_MPACK_SUCCESS) {
211
0
        *output_buffer = (int32_t) value;
212
0
    }
213
214
0
    return result;
215
0
}
216
217
int cprof_mpack_consume_int64_tag(mpack_reader_t *reader, int64_t *output_buffer)
218
0
{
219
0
    return cprof_mpack_consume_int_tag(reader, output_buffer);
220
0
}
221
222
int cprof_mpack_consume_string_tag(mpack_reader_t *reader, cfl_sds_t *output_buffer)
223
0
{
224
0
    uint32_t    string_length;
225
0
    mpack_tag_t tag;
226
227
0
    if (NULL == output_buffer) {
228
0
        return CPROF_MPACK_INVALID_ARGUMENT_ERROR;
229
0
    }
230
231
0
    if (NULL == reader) {
232
0
        return CPROF_MPACK_INVALID_ARGUMENT_ERROR;
233
0
    }
234
235
0
    tag = mpack_read_tag(reader);
236
237
0
    if (mpack_ok != mpack_reader_error(reader)) {
238
0
        return CPROF_MPACK_ENGINE_ERROR;
239
0
    }
240
241
0
    if (mpack_type_str != mpack_tag_type(&tag)) {
242
0
        return CPROF_MPACK_UNEXPECTED_DATA_TYPE_ERROR;
243
0
    }
244
245
0
    string_length = mpack_tag_str_length(&tag);
246
247
    /* This validation only applies to cmetrics and its use cases, we know
248
     * for a fact that our label names and values are not supposed to be really
249
     * long so a huge value here probably means that the data stream got corrupted.
250
     */
251
252
0
    if (CPROF_MPACK_MAX_STRING_LENGTH < string_length) {
253
0
        return CPROF_MPACK_CORRUPT_INPUT_DATA_ERROR;
254
0
    }
255
256
0
    *output_buffer = cfl_sds_create_size(string_length + 1);
257
258
0
    if (NULL == *output_buffer) {
259
0
        return CPROF_MPACK_ALLOCATION_ERROR;
260
0
    }
261
262
0
    cfl_sds_set_len(*output_buffer, string_length);
263
264
0
    mpack_read_cstr(reader, *output_buffer, string_length + 1, string_length);
265
266
0
    if (mpack_ok != mpack_reader_error(reader)) {
267
0
        cfl_sds_destroy(*output_buffer);
268
269
0
        *output_buffer = NULL;
270
271
0
        return CPROF_MPACK_ENGINE_ERROR;
272
0
    }
273
274
0
    mpack_done_str(reader);
275
276
0
    if (mpack_ok != mpack_reader_error(reader)) {
277
0
        cfl_sds_destroy(*output_buffer);
278
279
0
        *output_buffer = NULL;
280
281
0
        return CPROF_MPACK_ENGINE_ERROR;
282
0
    }
283
284
0
    return CPROF_MPACK_SUCCESS;
285
0
}
286
287
int cprof_mpack_consume_binary_tag(mpack_reader_t *reader, cfl_sds_t *output_buffer)
288
0
{
289
0
    uint32_t    string_length;
290
0
    mpack_tag_t tag;
291
292
0
    if (NULL == output_buffer) {
293
0
        return CPROF_MPACK_INVALID_ARGUMENT_ERROR;
294
0
    }
295
296
0
    if (NULL == reader) {
297
0
        return CPROF_MPACK_INVALID_ARGUMENT_ERROR;
298
0
    }
299
300
0
    tag = mpack_read_tag(reader);
301
302
0
    if (mpack_ok != mpack_reader_error(reader)) {
303
0
        return CPROF_MPACK_ENGINE_ERROR;
304
0
    }
305
306
0
    if (mpack_type_bin != mpack_tag_type(&tag)) {
307
0
        return CPROF_MPACK_UNEXPECTED_DATA_TYPE_ERROR;
308
0
    }
309
310
0
    string_length = mpack_tag_bin_length(&tag);
311
312
0
    if (CPROF_MPACK_MAX_STRING_LENGTH < string_length) {
313
0
        return CPROF_MPACK_CORRUPT_INPUT_DATA_ERROR;
314
0
    }
315
316
0
    *output_buffer = cfl_sds_create_size(string_length);
317
318
0
    if (NULL == *output_buffer) {
319
0
        return CPROF_MPACK_ALLOCATION_ERROR;
320
0
    }
321
322
0
    cfl_sds_set_len(*output_buffer, string_length);
323
324
0
    mpack_read_bytes(reader, *output_buffer, string_length);
325
326
0
    if (mpack_ok != mpack_reader_error(reader)) {
327
0
        cfl_sds_destroy(*output_buffer);
328
329
0
        *output_buffer = NULL;
330
331
0
        return CPROF_MPACK_ENGINE_ERROR;
332
0
    }
333
334
0
    mpack_done_bin(reader);
335
336
0
    if (mpack_ok != mpack_reader_error(reader)) {
337
0
        cfl_sds_destroy(*output_buffer);
338
339
0
        *output_buffer = NULL;
340
341
0
        return CPROF_MPACK_ENGINE_ERROR;
342
0
    }
343
344
0
    return CPROF_MPACK_SUCCESS;
345
0
}
346
347
int cprof_mpack_unpack_map(mpack_reader_t *reader,
348
                         struct cprof_mpack_map_entry_callback_t *callback_list,
349
                         void *context)
350
0
{
351
0
    struct cprof_mpack_map_entry_callback_t *callback_entry;
352
0
    uint32_t                               entry_index;
353
0
    uint32_t                               entry_count;
354
0
    cfl_sds_t                              key_name;
355
0
    int                                    result;
356
0
    mpack_tag_t                            tag;
357
358
0
    tag = mpack_read_tag(reader);
359
360
0
    if (mpack_ok != mpack_reader_error(reader)) {
361
0
        return CPROF_MPACK_ENGINE_ERROR;
362
0
    }
363
364
0
    if (mpack_type_map != mpack_tag_type(&tag)) {
365
0
        return CPROF_MPACK_UNEXPECTED_DATA_TYPE_ERROR;
366
0
    }
367
368
0
    entry_count = mpack_tag_map_count(&tag);
369
370
    /* This validation only applies to cmetrics and its use cases, we know
371
     * how our schema looks and how many entries the different fields have and none
372
     * of those exceed the number we set CPROF_MPACK_MAX_MAP_ENTRY_COUNT to which is 10.
373
     * Making these sanity checks optional or configurable in runtime might be worth
374
     * the itme and complexity cost but that's something I don't know at the moment.
375
     */
376
377
0
    if (CPROF_MPACK_MAX_MAP_ENTRY_COUNT < entry_count) {
378
0
        return CPROF_MPACK_CORRUPT_INPUT_DATA_ERROR;
379
0
    }
380
381
0
    result = 0;
382
383
0
    for (entry_index = 0 ; 0 == result && entry_index < entry_count ; entry_index++) {
384
0
        result = cprof_mpack_consume_string_tag(reader, &key_name);
385
386
0
        if (CPROF_MPACK_SUCCESS == result) {
387
0
            callback_entry = callback_list;
388
0
            result = CPROF_MPACK_UNEXPECTED_KEY_ERROR;
389
390
0
            while (CPROF_MPACK_UNEXPECTED_KEY_ERROR == result &&
391
0
                   NULL != callback_entry->identifier) {
392
393
0
                if (0 == strcmp(callback_entry->identifier, key_name)) {
394
0
                    result = callback_entry->handler(reader, entry_index, context);
395
0
                }
396
397
0
                callback_entry++;
398
0
            }
399
400
0
            cfl_sds_destroy(key_name);
401
0
        }
402
0
    }
403
404
0
    if (CPROF_MPACK_SUCCESS == result) {
405
0
        mpack_done_map(reader);
406
407
0
        if (mpack_ok != mpack_reader_error(reader))
408
0
        {
409
0
            return CPROF_MPACK_PENDING_MAP_ENTRIES;
410
0
        }
411
0
    }
412
413
0
    return result;
414
0
}
415
416
int cprof_mpack_unpack_array(mpack_reader_t *reader,
417
                           cprof_mpack_unpacker_entry_callback_fn_t entry_processor_callback,
418
                           void *context)
419
0
{
420
0
    uint32_t              entry_index;
421
0
    uint32_t              entry_count;
422
0
    mpack_tag_t           tag;
423
0
    int                   result;
424
425
0
    tag = mpack_read_tag(reader);
426
427
0
    if (mpack_ok != mpack_reader_error(reader))
428
0
    {
429
0
        return CPROF_MPACK_ENGINE_ERROR;
430
0
    }
431
432
0
    if (mpack_type_array != mpack_tag_type(&tag)) {
433
0
        return CPROF_MPACK_UNEXPECTED_DATA_TYPE_ERROR;
434
0
    }
435
436
0
    entry_count = mpack_tag_array_count(&tag);
437
438
    /* This validation only applies to cmetrics and its use cases, we know
439
     * that in our schema we have the following arrays :
440
     *     label text dictionary (strings)
441
     *     dimension labels (indexes)
442
     *     metric values
443
     *         dimension values
444
     *
445
     * IMO none of these arrays should be huge so I think using 65535 as a limit
446
     * gives us more than enough wiggle space (in reality I don't expect any of these
447
     * arrays to hold more than 128 values but I could be wrong as that probably depends
448
     * on the flush interval)
449
     */
450
451
0
    if (CPROF_MPACK_MAX_ARRAY_ENTRY_COUNT < entry_count) {
452
0
        return CPROF_MPACK_CORRUPT_INPUT_DATA_ERROR;
453
0
    }
454
455
0
    result = CPROF_MPACK_SUCCESS;
456
457
0
    for (entry_index = 0 ;
458
0
         CPROF_MPACK_SUCCESS == result && entry_index < entry_count ;
459
0
         entry_index++) {
460
0
        result = entry_processor_callback(reader, entry_index, context);
461
0
    }
462
463
0
    if (CPROF_MPACK_SUCCESS == result) {
464
0
        mpack_done_array(reader);
465
466
0
        if (mpack_ok != mpack_reader_error(reader))
467
0
        {
468
0
            return CPROF_MPACK_PENDING_ARRAY_ENTRIES;
469
0
        }
470
0
    }
471
472
0
    return result;
473
0
}
474
475
int cprof_mpack_peek_array_length(mpack_reader_t *reader)
476
0
{
477
0
    mpack_tag_t tag;
478
479
0
    tag = mpack_peek_tag(reader);
480
481
0
    if (mpack_ok != mpack_reader_error(reader))
482
0
    {
483
0
        return 0;
484
0
    }
485
486
0
    if (mpack_type_array != mpack_tag_type(&tag)) {
487
0
        return 0;
488
0
    }
489
490
0
    return mpack_tag_array_count(&tag);
491
0
}
492
493
mpack_type_t cprof_mpack_peek_type(mpack_reader_t *reader)
494
0
{
495
0
    mpack_tag_t tag;
496
497
0
    tag = mpack_peek_tag(reader);
498
499
0
    if (mpack_reader_error(reader) != mpack_ok) {
500
0
        return mpack_type_missing;
501
0
    }
502
503
0
    return mpack_tag_type(&tag);
504
0
}