Coverage Report

Created: 2025-10-14 08:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/lib/cmetrics/src/cmt_encode_msgpack.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  CMetrics
4
 *  ========
5
 *  Copyright 2021-2022 The CMetrics Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <cmetrics/cmetrics.h>
21
#include <cmetrics/cmt_metric.h>
22
#include <cmetrics/cmt_map.h>
23
#include <cmetrics/cmt_histogram.h>
24
#include <cmetrics/cmt_summary.h>
25
#include <cmetrics/cmt_counter.h>
26
#include <cmetrics/cmt_gauge.h>
27
#include <cmetrics/cmt_untyped.h>
28
#include <cmetrics/cmt_compat.h>
29
#include <cmetrics/cmt_encode_msgpack.h>
30
#include <cmetrics/cmt_variant_utils.h>
31
32
struct cmt_map_label *create_label(char *label_text)
33
0
{
34
0
    struct cmt_map_label *new_label;
35
36
0
    new_label = calloc(1, sizeof(struct cmt_map_label));
37
38
0
    if (NULL != new_label) {
39
0
        new_label->name = cfl_sds_create(label_text);
40
41
0
        if (NULL == new_label->name) {
42
0
            free(new_label);
43
44
0
            new_label = NULL;
45
0
        }
46
0
    }
47
48
0
    return new_label;
49
0
}
50
51
/*
 * Writes the "meta" key and its map value for a single metric map.
 *
 * The map always carries "ver", "type", "opts" and "labels". One extra,
 * type specific entry is appended for three map types:
 *   - CMT_HISTOGRAM : "buckets"          (array of doubles, empty when the
 *                                         histogram has no bucket definition)
 *   - CMT_SUMMARY   : "quantiles"        (array of doubles)
 *   - CMT_COUNTER   : "aggregation_type" (integer)
 *
 * The cmt argument is not used by this function.
 */
static void pack_header(mpack_writer_t *writer, struct cmt *cmt, struct cmt_map *map)
{
    struct cmt_opts      *opts = map->opts;
    struct cmt_histogram *histogram;
    struct cmt_summary   *summary;
    struct cmt_counter   *counter;
    struct cmt_map_label *key;
    struct cfl_list      *node;
    size_t                field_count;
    size_t                i;

    /* ver, type, opts and labels are always written */
    field_count = 4;

    /* these three types append exactly one extra entry further below */
    if (map->type == CMT_HISTOGRAM ||
        map->type == CMT_SUMMARY ||
        map->type == CMT_COUNTER) {
        field_count++;
    }

    mpack_write_cstr(writer, "meta");
    mpack_start_map(writer, field_count);

    /* meta['ver'] */
    mpack_write_cstr(writer, "ver");
    mpack_write_uint(writer, MSGPACK_ENCODER_VERSION);

    /* meta['type'] */
    mpack_write_cstr(writer, "type");
    mpack_write_uint(writer, map->type);

    /* meta['opts'] : namespace, subsystem, name and description */
    mpack_write_cstr(writer, "opts");
    mpack_start_map(writer, 4);

    mpack_write_cstr(writer, "ns");
    mpack_write_cstr(writer, opts->ns);

    mpack_write_cstr(writer, "ss");
    mpack_write_cstr(writer, opts->subsystem);

    mpack_write_cstr(writer, "name");
    mpack_write_cstr(writer, opts->name);

    mpack_write_cstr(writer, "desc");
    mpack_write_cstr(writer, opts->description);

    mpack_finish_map(writer);

    /* meta['labels'] : the label keys of the map */
    mpack_write_cstr(writer, "labels");
    mpack_start_array(writer, map->label_count);

    cfl_list_foreach(node, &map->label_keys) {
        key = cfl_list_entry(node, struct cmt_map_label, _head);
        mpack_write_cstr(writer, key->name);
    }

    mpack_finish_array(writer);

    if (map->type == CMT_HISTOGRAM) {
        histogram = (struct cmt_histogram *) map->parent;

        /* meta['buckets'] : upper bounds of the histogram buckets */
        mpack_write_cstr(writer, "buckets");

        if (histogram->buckets == NULL) {
            mpack_start_array(writer, 0);
        }
        else {
            mpack_start_array(writer, histogram->buckets->count);

            for (i = 0 ; i < histogram->buckets->count ; i++) {
                mpack_write_double(writer, histogram->buckets->upper_bounds[i]);
            }
        }

        mpack_finish_array(writer);
    }
    else if (map->type == CMT_SUMMARY) {
        summary = (struct cmt_summary *) map->parent;

        /* meta['quantiles'] : quantiles configured for the summary */
        mpack_write_cstr(writer, "quantiles");
        mpack_start_array(writer, summary->quantiles_count);

        for (i = 0 ; i < summary->quantiles_count ; i++) {
            mpack_write_double(writer, summary->quantiles[i]);
        }

        mpack_finish_array(writer);
    }
    else if (map->type == CMT_COUNTER) {
        counter = (struct cmt_counter *) map->parent;

        /* meta['aggregation_type'] */
        mpack_write_cstr(writer, "aggregation_type");
        mpack_write_int(writer, counter->aggregation_type);
    }

    mpack_finish_map(writer); /* 'meta' */
}
166
167
/*
 * Writes one entry of the "values" array: a map describing a single metric
 * sample.
 *
 * The map always contains "ts", one value entry and "hash"; a "labels" array
 * is added only when the metric has at least one label value.
 *
 * The value entry depends on map->type:
 *   - CMT_HISTOGRAM : "histogram" => { "buckets", "sum", "count" }
 *   - CMT_SUMMARY   : "summary"   => { "quantiles_set", "quantiles",
 *                                      "count", "sum" }
 *   - anything else : "value"     => double
 *
 * Always returns 0.
 */
static int pack_metric(mpack_writer_t *writer, struct cmt_map *map, struct cmt_metric *metric)
{
    int c_labels;
    int field_count;
    size_t index;
    struct cfl_list *head;
    struct cmt_map_label *label;
    struct cmt_summary *summary;
    struct cmt_histogram *histogram;

    /*
     * The label count decides both the size of the enclosing map and whether
     * the "labels" array is written, so it is computed only once.
     */
    c_labels = cfl_list_size(&metric->labels);

    /* ts, value entry and hash */
    field_count = 3;

    if (c_labels > 0) {
        field_count++;
    }

    mpack_start_map(writer, field_count);

    mpack_write_cstr(writer, "ts");
    mpack_write_uint(writer, metric->timestamp);

    if (map->type == CMT_HISTOGRAM) {
        histogram = (struct cmt_histogram *) map->parent;

        mpack_write_cstr(writer, "histogram");
        mpack_start_map(writer, 3);

        mpack_write_cstr(writer, "buckets");

        if (histogram->buckets != NULL) {
            /*
             * One more counter than there are bucket definitions is written
             * (note the inclusive loop bound).
             */
            mpack_start_array(writer, histogram->buckets->count + 1);

            for (index = 0 ;
                 index <= histogram->buckets->count ;
                 index++) {
                mpack_write_uint(writer, cmt_metric_hist_get_value(metric, index));
            }
        }
        else {
            /*
             * pack_header() already tolerates a histogram without a bucket
             * definition by emitting an empty "buckets" array; do the same
             * here instead of dereferencing a NULL pointer.
             * NOTE(review): confirm the decoder accepts an empty array here.
             */
            mpack_start_array(writer, 0);
        }

        mpack_finish_array(writer);

        mpack_write_cstr(writer, "sum");
        mpack_write_double(writer, cmt_metric_hist_get_sum_value(metric));

        mpack_write_cstr(writer, "count");
        mpack_write_uint(writer, cmt_metric_hist_get_count_value(metric));

        mpack_finish_map(writer); /* 'histogram' */
    }
    else if (map->type == CMT_SUMMARY) {
        summary = (struct cmt_summary *) map->parent;

        mpack_write_cstr(writer, "summary");
        mpack_start_map(writer, 4);

        mpack_write_cstr(writer, "quantiles_set");
        mpack_write_uint(writer, metric->sum_quantiles_set);

        mpack_write_cstr(writer, "quantiles");
        mpack_start_array(writer, summary->quantiles_count);

        /* quantile values are written as unsigned integers, as stored */
        for (index = 0 ; index < summary->quantiles_count ; index++) {
            mpack_write_uint(writer, metric->sum_quantiles[index]);
        }

        mpack_finish_array(writer);

        mpack_write_cstr(writer, "count");
        mpack_write_uint(writer, cmt_summary_get_count_value(metric));

        mpack_write_cstr(writer, "sum");
        mpack_write_uint(writer, metric->sum_sum);

        mpack_finish_map(writer); /* 'summary' */
    }
    else {
        mpack_write_cstr(writer, "value");
        mpack_write_double(writer, cmt_metric_get_value(metric));
    }

    if (c_labels > 0) {
        mpack_write_cstr(writer, "labels");
        mpack_start_array(writer, c_labels);

        cfl_list_foreach(head, &metric->labels) {
            label = cfl_list_entry(head, struct cmt_map_label, _head);

            /* a label without a value is encoded as nil */
            if (label->name != NULL) {
                mpack_write_cstr(writer, label->name);
            }
            else {
                mpack_write_nil(writer);
            }
        }

        mpack_finish_array(writer);
    }

    mpack_write_cstr(writer, "hash");
    mpack_write_uint(writer, metric->hash);

    mpack_finish_map(writer);

    return 0;
}
273
274
/*
 * Writes one element of the "metrics" array: a two entry map holding the
 * map's "meta" header and its "values" array.
 *
 * "values" contains the static metric first (only when
 * map->metric_static_set is set), followed by every metric linked in
 * map->metrics.
 *
 * Always returns 0.
 */
static int pack_basic_type(mpack_writer_t *writer, struct cmt *cmt, struct cmt_map *map)
{
    struct cmt_metric *entry;
    struct cfl_list *node;
    int total;

    /* metric scope dictionary that holds meta and values */
    mpack_start_map(writer, 2);

    pack_header(writer, cmt, map);

    /* the array size must be known before any element is written */
    total = cfl_list_size(&map->metrics);
    if (map->metric_static_set) {
        total++;
    }

    mpack_write_cstr(writer, "values");
    mpack_start_array(writer, total);

    if (map->metric_static_set) {
        pack_metric(writer, map, &map->metric);
    }

    cfl_list_foreach(node, &map->metrics) {
        entry = cfl_list_entry(node, struct cmt_metric, _head);
        pack_metric(writer, map, entry);
    }

    mpack_finish_array(writer); /* 'values' */

    mpack_finish_map(writer);

    return 0;
}
307
308
/*
 * Writes the "static_labels" key followed by an array with one
 * [key, value] pair (a two element array of strings) per entry of
 * cmt->static_labels->list.
 */
static void pack_static_labels(mpack_writer_t *writer, struct cmt *cmt)
{
    struct cfl_list  *labels = &cmt->static_labels->list;
    struct cfl_list  *node;
    struct cmt_label *pair;

    mpack_write_cstr(writer, "static_labels");
    mpack_start_array(writer, cfl_list_size(labels));

    cfl_list_foreach(node, labels) {
        pair = cfl_list_entry(node, struct cmt_label, _head);

        mpack_start_array(writer, 2);
        mpack_write_cstr(writer, pair->key);
        mpack_write_cstr(writer, pair->val);
        mpack_finish_array(writer);
    }

    mpack_finish_array(writer); /* 'static_labels' */
}
331
332
/*
 * Writes the "processing" key and its single entry map, whose only member
 * is the "static_labels" array.
 *
 * Always returns 0.
 */
static int pack_static_processing_section(mpack_writer_t *writer, struct cmt *cmt)
{
    mpack_write_cstr(writer, "processing");
    mpack_start_map(writer, 1);

    /* the one and only entry of the section */
    pack_static_labels(writer, cmt);

    mpack_finish_map(writer); /* 'processing' */

    return 0;
}
344
345
/*
 * Writes the context level "meta" key and its three entry map:
 * "cmetrics" (cmt->internal_metadata), "external" (cmt->external_metadata)
 * and "processing".
 *
 * Returns 0 on success, -1 when the internal metadata could not be packed
 * and -2 when the external metadata could not be packed. On failure the
 * function returns immediately and the "meta" map is left unfinished.
 */
static int pack_context_header(mpack_writer_t *writer, struct cmt *cmt)
{
    int rc;

    mpack_write_cstr(writer, "meta");
    mpack_start_map(writer, 3);

    /* meta['cmetrics'] */
    mpack_write_cstr(writer, "cmetrics");
    rc = pack_cfl_variant_kvlist(writer, cmt->internal_metadata);
    if (rc != 0) {
        return -1;
    }

    /* meta['external'] */
    mpack_write_cstr(writer, "external");
    rc = pack_cfl_variant_kvlist(writer, cmt->external_metadata);
    if (rc != 0) {
        return -2;
    }

    /* meta['processing'] */
    pack_static_processing_section(writer, cmt);

    mpack_finish_map(writer); /* 'meta' */

    return 0;
}
372
373
/*
 * Writes the "metrics" key and an array with one element per metric map
 * registered in the context.
 *
 * Elements are emitted in a fixed order: counters, gauges, untyped metrics,
 * summaries and finally histograms.
 *
 * Always returns 0.
 */
static int pack_context_metrics(mpack_writer_t *writer, struct cmt *cmt)
{
    struct cfl_list *node;
    size_t           total;

    /* the array length has to be written before its elements */
    total  = 0;
    total += cfl_list_size(&cmt->counters);
    total += cfl_list_size(&cmt->gauges);
    total += cfl_list_size(&cmt->untypeds);
    total += cfl_list_size(&cmt->summaries);
    total += cfl_list_size(&cmt->histograms);

    mpack_write_cstr(writer, "metrics");
    mpack_start_array(writer, total);

    /* Counters */
    cfl_list_foreach(node, &cmt->counters) {
        struct cmt_counter *counter;

        counter = cfl_list_entry(node, struct cmt_counter, _head);
        pack_basic_type(writer, cmt, counter->map);
    }

    /* Gauges */
    cfl_list_foreach(node, &cmt->gauges) {
        struct cmt_gauge *gauge;

        gauge = cfl_list_entry(node, struct cmt_gauge, _head);
        pack_basic_type(writer, cmt, gauge->map);
    }

    /* Untyped */
    cfl_list_foreach(node, &cmt->untypeds) {
        struct cmt_untyped *untyped;

        untyped = cfl_list_entry(node, struct cmt_untyped, _head);
        pack_basic_type(writer, cmt, untyped->map);
    }

    /* Summaries */
    cfl_list_foreach(node, &cmt->summaries) {
        struct cmt_summary *summary;

        summary = cfl_list_entry(node, struct cmt_summary, _head);
        pack_basic_type(writer, cmt, summary->map);
    }

    /* Histograms */
    cfl_list_foreach(node, &cmt->histograms) {
        struct cmt_histogram *histogram;

        histogram = cfl_list_entry(node, struct cmt_histogram, _head);
        pack_basic_type(writer, cmt, histogram->map);
    }

    mpack_finish_array(writer); /* 'metrics' */

    return 0;
}
427
428
/*
 * Writes the outermost map of the encoded context: the "meta" header
 * followed by the "metrics" array.
 *
 * Returns 0 on success, -1 when the header could not be packed and -2 when
 * the metrics could not be packed. On failure the function returns
 * immediately and the outermost map is left unfinished.
 */
static int pack_context(mpack_writer_t *writer, struct cmt *cmt)
{
    int rc;

    mpack_start_map(writer, 2);

    rc = pack_context_header(writer, cmt);
    if (rc != 0) {
        return -1;
    }

    rc = pack_context_metrics(writer, cmt);
    if (rc != 0) {
        return -2;
    }

    mpack_finish_map(writer); /* outermost context scope */

    return 0;
}
450
451
/* Takes a cmetrics context and serializes it using msgpack */
452
int cmt_encode_msgpack_create(struct cmt *cmt, char **out_buf, size_t *out_size)
453
0
{
454
0
    char *data;
455
0
    size_t size;
456
0
    mpack_writer_t writer;
457
0
    int result;
458
459
    /*
460
     * CMetrics data schema
461
462
        {
463
            'meta' => {
464
                'cmetrics' => {
465
                                'producer': STRING
466
                },
467
                'external' => { ... },
468
                'processing' => {
469
                                    'static_labels' =>  [
470
                                                            [STRING, STRING], ...
471
                                                        ]
472
                                }
473
            },
474
            'metrics' =>    [
475
                                {
476
                                    'meta' => {
477
                                                'ver'  => INTEGER
478
                                                'type' => INTEGER
479
                                                            '0' = counter
480
                                                            '1' = gauge
481
                                                            '2' = histogram (WIP)
482
                                                'opts' => {
483
                                                            'ns'          => ns
484
                                                            'subsystem'   => subsystem
485
                                                            'name'        => name
486
                                                            'description' => description
487
                                                },
488
                                                'label_keys' => [STRING, ...],
489
                                                'buckets' => [n, ...]
490
                                    },
491
                                    'values' => [
492
                                        {
493
                                            'ts'   : nanosec timestamp,
494
                                            'value': float64 value,
495
                                            'label_values': [STRING, ...],
496
                                            'histogram':{
497
                                                            'sum': float64,
498
                                                            'count': uint64,
499
                                                            'buckets': [n, ...]
500
                                                        },
501
                                            'summary':  {
502
                                                            'sum': float64,
503
                                                            'count': uint64,
504
                                                            'quantiles': [n, ...],
505
                                                            'quantiles_set': uint64
506
                                                        },
507
                                            'hash': uint64 value
508
                                        }
509
                                    ]
510
                                }, ...
511
            ]
512
        }
513
     *
514
     *
515
     * The following fields are metric type specific and are only
516
     * included for histograms :
517
     *      meta->buckets
518
     *      values[n]->buckets
519
     *      values[n]->count
520
     *      values[n]->sum
521
     */
522
523
0
    if (cmt == NULL) {
524
0
        return -1;
525
0
    }
526
527
0
    mpack_writer_init_growable(&writer, &data, &size);
528
529
0
    result = pack_context(&writer, cmt);
530
531
0
    if (mpack_writer_destroy(&writer) != mpack_ok) {
532
0
        fprintf(stderr, "An error occurred encoding the data!\n");
533
534
0
        return -1;
535
0
    }
536
537
0
    if (result != 0) {
538
0
        return result;
539
0
    }
540
541
0
    *out_buf = data;
542
0
    *out_size = size;
543
544
0
    return 0;
545
0
}
546
547
/*
 * Releases a buffer with MPACK_FREE(), such as the one returned through
 * out_buf by cmt_encode_msgpack_create(). Passing NULL is allowed and does
 * nothing.
 */
void cmt_encode_msgpack_destroy(char *out_buf)
{
    if (out_buf == NULL) {
        return;
    }

    MPACK_FREE(out_buf);
}