Coverage Report

Created: 2025-06-24 08:09

/src/fluent-bit/plugins/processor_sampling/sampling_tail.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2025 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_processor_plugin.h>
21
22
#include "sampling.h"
23
#include "sampling_span_registry.h"
24
25
struct sampling_ctrace_entry {
26
    struct ctrace *ctr;
27
    struct cfl_list _head; /* sampling_settings->list_ctraces */
28
};
29
30
struct sampling_settings {
31
    int decision_wait;
32
    uint64_t max_traces;
33
34
    /* internal */
35
    void *parent;                   /* struct sampling *ctx */
36
    uint64_t decision_wait_ms;
37
38
    /* linked list with a reference to all the ctraces contexts */
39
    struct cfl_list list_ctraces;
40
41
    /* span registry */
42
    struct sampling_span_registry *span_reg;
43
};
44
45
static struct flb_config_map settings_config_map[] = {
46
    {
47
        FLB_CONFIG_MAP_TIME, "decision_wait", "30s",
48
        0, FLB_TRUE, offsetof(struct sampling_settings, decision_wait),
49
    },
50
51
    {
52
        FLB_CONFIG_MAP_INT, "max_traces", "50000",
53
        0, FLB_TRUE, offsetof(struct sampling_settings, max_traces),
54
    },
55
56
    /* EOF */
57
    {0}
58
};
59
60
/*
 * Prototypes for the deep-copy helpers: they are mutually recursive
 * (copy_variant() calls copy_array()/copy_kvlist(), which in turn call
 * copy_variant() for every element), so they must be declared up front.
 */
static struct cfl_kvlist *copy_kvlist(struct cfl_kvlist *kv);
static struct cfl_array *copy_array(struct cfl_array *array);
static struct cfl_variant *copy_variant(struct cfl_variant *val);
63
64
/* delete a list ctrace entry */
65
static void list_ctrace_delete_entry(struct sampling *ctx, struct sampling_ctrace_entry *ctrace_entry)
66
0
{
67
0
    ctr_destroy(ctrace_entry->ctr);
68
0
    cfl_list_del(&ctrace_entry->_head);
69
0
    flb_free(ctrace_entry);
70
0
}
71
72
/* delete ctrace entries with no spans */
73
static void list_ctrace_delete_empty(struct sampling *ctx, struct sampling_settings *settings)
74
0
{
75
0
    struct cfl_list *tmp;
76
0
    struct cfl_list *head;
77
0
    struct sampling_ctrace_entry *ctrace_entry;
78
79
0
    cfl_list_foreach_safe(head, tmp, &settings->list_ctraces) {
80
0
        ctrace_entry = cfl_list_entry(head, struct sampling_ctrace_entry, _head);
81
0
        if (cfl_list_size(&ctrace_entry->ctr->span_list) == 0) {
82
0
           list_ctrace_delete_entry(ctx, ctrace_entry);
83
0
        }
84
0
    }
85
0
}
86
87
static void list_ctrace_delete_all(struct sampling *ctx, struct sampling_settings *settings)
88
0
{
89
0
    struct cfl_list *tmp;
90
0
    struct cfl_list *head;
91
0
    struct sampling_ctrace_entry *ctrace_entry;
92
93
0
    cfl_list_foreach_safe(head, tmp, &settings->list_ctraces) {
94
0
        ctrace_entry = cfl_list_entry(head, struct sampling_ctrace_entry, _head);
95
0
        list_ctrace_delete_entry(ctx, ctrace_entry);
96
0
    }
97
0
}
98
99
static struct cfl_kvlist *copy_kvlist(struct cfl_kvlist *kv)
100
0
{
101
0
    struct cfl_kvlist *kvlist = NULL;
102
0
    struct cfl_kvpair *pair;
103
0
    struct cfl_variant *v;
104
0
    struct cfl_list *head;
105
106
0
    kvlist = cfl_kvlist_create();
107
0
    if (!kvlist) {
108
0
        return NULL;
109
0
    }
110
111
0
    cfl_list_foreach(head, &kv->list) {
112
0
        pair = cfl_list_entry(head, struct cfl_kvpair, _head);
113
0
        v = copy_variant(pair->val);
114
0
        if (!v) {
115
0
            cfl_kvlist_destroy(kvlist);
116
0
            return NULL;
117
0
        }
118
0
        cfl_kvlist_insert(kvlist, pair->key, v);
119
0
    }
120
121
0
    return kvlist;
122
0
}
123
124
static struct cfl_variant *copy_variant(struct cfl_variant *val)
125
0
{
126
0
    struct cfl_kvlist *kvlist;
127
0
    struct cfl_array *array;
128
0
    struct cfl_variant *var = NULL;
129
130
0
    switch (val->type) {
131
0
    case CFL_VARIANT_STRING:
132
0
        var = cfl_variant_create_from_string_s(val->data.as_string,
133
0
                                               cfl_variant_size_get(val),
134
0
                                               CFL_FALSE);
135
0
        break;
136
0
    case CFL_VARIANT_BYTES:
137
0
        var = cfl_variant_create_from_bytes(val->data.as_bytes,
138
0
                                            cfl_variant_size_get(val),
139
0
                                            CFL_FALSE);
140
0
        break;
141
0
    case CFL_VARIANT_BOOL:
142
0
        var = cfl_variant_create_from_bool(val->data.as_bool);
143
0
        break;
144
0
    case CFL_VARIANT_INT:
145
0
        var = cfl_variant_create_from_int64(val->data.as_int64);
146
0
        break;
147
0
    case CFL_VARIANT_UINT:
148
0
        var = cfl_variant_create_from_uint64(val->data.as_uint64);
149
0
        break;
150
0
    case CFL_VARIANT_DOUBLE:
151
0
        var = cfl_variant_create_from_double(val->data.as_double);
152
0
        break;
153
0
    case CFL_VARIANT_NULL:
154
0
        var = cfl_variant_create_from_null();
155
0
        break;
156
0
    case CFL_VARIANT_ARRAY:
157
0
        array = copy_array(val->data.as_array);
158
0
        if (!array) {
159
0
            return NULL;
160
0
        }
161
0
        var = cfl_variant_create_from_array(array);
162
0
        break;
163
0
    case CFL_VARIANT_KVLIST:
164
0
        kvlist = copy_kvlist(val->data.as_kvlist);
165
0
        if (!kvlist) {
166
0
            return NULL;
167
0
        }
168
0
        var = cfl_variant_create_from_kvlist(kvlist);
169
0
        break;
170
0
    default:
171
0
        var = NULL;
172
0
    }
173
174
0
    return var;
175
0
}
176
177
static struct cfl_array *copy_array(struct cfl_array *array)
178
0
{
179
0
    int i;
180
0
    struct cfl_array *copy;
181
0
    struct cfl_variant *v ;
182
183
0
    copy = cfl_array_create(array->entry_count);
184
0
    if (!copy) {
185
0
        return NULL;
186
0
    }
187
188
0
    for (i = 0; i < array->entry_count; i++) {
189
0
        v = copy_variant(array->entries[i]);
190
0
        if (!v) {
191
0
            cfl_array_destroy(copy);
192
0
            return NULL;
193
0
        }
194
0
        cfl_array_append(copy, v);
195
0
    }
196
197
0
    return copy;
198
0
}
199
200
struct ctrace_attributes *copy_attributes(struct sampling *ctx, struct ctrace_attributes *attr)
201
0
{
202
0
    int ret = -1;
203
0
    struct cfl_list *head;
204
0
    struct cfl_kvpair *pair;
205
0
    struct cfl_array *array;
206
0
    struct cfl_kvlist *kvlist;
207
0
    struct ctrace_attributes *attr_copy;
208
209
0
    attr_copy = ctr_attributes_create();
210
0
    if (!attr_copy) {
211
0
        return NULL;
212
0
    }
213
214
0
    cfl_list_foreach(head, &attr->kv->list) {
215
0
        pair = cfl_list_entry(head, struct cfl_kvpair, _head);
216
217
0
        if (pair->val->type == CFL_VARIANT_STRING) {
218
0
            ret = ctr_attributes_set_string(attr_copy, pair->key, pair->val->data.as_string);
219
0
        }
220
0
        else if (pair->val->type == CFL_VARIANT_BOOL) {
221
0
            ret = ctr_attributes_set_bool(attr_copy, pair->key, pair->val->data.as_bool);
222
0
        }
223
0
        else if (pair->val->type == CFL_VARIANT_INT) {
224
0
            ret = ctr_attributes_set_int64(attr_copy, pair->key, pair->val->data.as_int64);
225
0
        }
226
0
        else if (pair->val->type == CFL_VARIANT_DOUBLE) {
227
0
            ret = ctr_attributes_set_double(attr_copy, pair->key, pair->val->data.as_double);
228
0
        }
229
0
        else if (pair->val->type == CFL_VARIANT_ARRAY) {
230
0
            array = copy_array(pair->val->data.as_array);
231
0
            if (!array) {
232
0
                flb_plg_error(ctx->ins, "could not copy array attribute");
233
0
                ctr_attributes_destroy(attr_copy);
234
0
                return NULL;
235
0
            }
236
237
0
            ret = ctr_attributes_set_array(attr_copy, pair->key, array);
238
0
            if (ret != 0) {
239
0
                cfl_array_destroy(array);
240
0
            }
241
0
        }
242
0
        else if (pair->val->type == CFL_VARIANT_KVLIST) {
243
0
            kvlist = copy_kvlist(pair->val->data.as_kvlist);
244
0
            if (!kvlist) {
245
0
                flb_plg_error(ctx->ins, "could not copy kvlist attribute");
246
0
                ctr_attributes_destroy(attr_copy);
247
0
                return NULL;
248
0
            }
249
0
            ret = ctr_attributes_set_kvlist(attr_copy, pair->key, kvlist);
250
0
            if (ret != 0) {
251
0
                cfl_kvlist_destroy(kvlist);
252
0
            }
253
0
        }
254
0
        else {
255
0
            flb_plg_error(ctx->ins, "unsupported attribute type %i", pair->val->type);
256
0
            ctr_attributes_destroy(attr_copy);
257
0
            return NULL;
258
0
        }
259
0
    }
260
261
0
    if (ret != 0) {
262
0
        ctr_attributes_destroy(attr_copy);
263
0
        return NULL;
264
0
    }
265
266
0
    return attr_copy;
267
0
};
268
269
static struct ctrace *reconcile_and_create_ctrace(struct sampling *ctx, struct sampling_settings *settings, struct trace_entry *t_entry)
270
0
{
271
0
    struct cfl_list *tmp;
272
0
    struct cfl_list *head;
273
0
    struct trace_span *t_span;
274
0
    struct ctrace *ctr = NULL;
275
0
    struct ctrace_resource_span *resource_span = NULL;
276
0
    struct ctrace_resource *resource = NULL;
277
0
    struct ctrace_scope_span *scope_span = NULL;
278
0
    struct ctrace_instrumentation_scope *instrumentation_scope = NULL;
279
0
    struct ctrace_span *span;
280
0
    struct ctrace_attributes *attr;
281
282
    /* for each complete trace, reconcile, convert to ctrace context and enqueue it */
283
0
    cfl_list_foreach_safe(head, tmp, &t_entry->span_list) {
284
0
        t_span = cfl_list_entry(head, struct trace_span, _head);
285
0
        span = t_span->span;
286
287
        /* create a new ctraces context if does not exists */
288
0
        if (!ctr) {
289
0
            ctr = ctr_create(NULL);
290
0
            if (!ctr) {
291
0
                flb_plg_error(ctx->ins, "could not create ctrace context");
292
0
                return NULL;
293
0
            }
294
0
        }
295
296
        /* create a resource span */
297
0
        if (!resource_span) {
298
0
            resource_span = ctr_resource_span_create(ctr);
299
0
            if (!resource_span) {
300
0
                flb_plg_error(ctx->ins, "could not create resource span");
301
0
                ctr_destroy(ctr);
302
0
                return NULL;
303
0
            }
304
0
        }
305
306
0
        if (!resource) {
307
0
            resource = ctr_resource_span_get_resource(resource_span);
308
0
            if (!resource) {
309
0
                flb_plg_error(ctx->ins, "could not get resource");
310
0
                ctr_destroy(ctr);
311
0
                return NULL;
312
0
            }
313
314
            /* resource attributes */
315
0
            if (span->scope_span->resource_span->resource->attr) {
316
0
                attr = copy_attributes(ctx, span->scope_span->resource_span->resource->attr);
317
0
                if (attr) {
318
0
                    ctr_resource_set_attributes(resource, attr);
319
0
                }
320
0
            }
321
322
            /* resource dropped attributes count */
323
0
            if (span->scope_span->resource_span->resource->dropped_attr_count) {
324
0
                ctr_resource_set_dropped_attr_count(resource, span->scope_span->resource_span->resource->dropped_attr_count);
325
0
            }
326
327
            /* resource schema url */
328
0
            if (span->scope_span->resource_span->schema_url) {
329
0
                ctr_resource_span_set_schema_url(resource_span, span->scope_span->resource_span->schema_url);
330
0
            }
331
0
        }
332
333
0
        if (!scope_span) {
334
0
            scope_span = ctr_scope_span_create(resource_span);
335
0
            if (!scope_span) {
336
0
                flb_plg_error(ctx->ins, "could not create scope span");
337
0
                ctr_destroy(ctr);
338
0
                return NULL;
339
0
            }
340
0
        }
341
342
0
        if (!instrumentation_scope) {
343
            /* this is optional, check in the original span context if we have some instrumentation associated */
344
0
            if (span->scope_span->instrumentation_scope) {
345
0
                attr = NULL;
346
0
                if (span->scope_span->instrumentation_scope->attr) {
347
0
                    attr = copy_attributes(ctx, span->scope_span->instrumentation_scope->attr);
348
0
                }
349
350
0
                instrumentation_scope = ctr_instrumentation_scope_create(span->scope_span->instrumentation_scope->name,
351
0
                                                                         span->scope_span->instrumentation_scope->version,
352
0
                                                                         span->scope_span->instrumentation_scope->dropped_attr_count,
353
0
                                                                         attr);
354
0
                if (instrumentation_scope) {
355
0
                    ctr_scope_span_set_instrumentation_scope(scope_span, instrumentation_scope);
356
0
                }
357
0
            }
358
0
        }
359
360
        /*
361
         * Detach the span from its previous context completely and
362
         * re-attach it to the new one. If we only move the local list
363
         * reference (span->_head) the span would still belong to the
364
         * original ctrace context which later on might lead to use after
365
         * free issues when the new context is destroyed. Make sure to
366
         * update all references.
367
         */
368
369
        /* detach from the original scope span and global list */
370
0
        cfl_list_del(&span->_head);
371
0
        cfl_list_del(&span->_head_global);
372
373
        /* update parent references */
374
0
        span->scope_span = scope_span;
375
0
        span->ctx = ctr;
376
377
        /* link to the new scope span and ctrace context */
378
0
        cfl_list_add(&span->_head, &scope_span->spans);
379
0
        cfl_list_add(&span->_head_global, &ctr->span_list);
380
381
        /* reset all the contexts */
382
0
        resource_span = NULL;
383
0
        resource = NULL;
384
0
        scope_span = NULL;
385
0
        instrumentation_scope = NULL;
386
387
        /* remote t_span entry */
388
0
        cfl_list_del(&t_span->_head);
389
0
        flb_free(t_span);
390
0
    }
391
392
0
    sampling_span_registry_delete_entry(ctx, settings->span_reg, t_entry, FLB_FALSE);
393
394
0
    return ctr;
395
0
}
396
397
static int check_conditions(struct sampling *ctx, struct trace_entry *t_entry)
398
0
{
399
0
    int ret;
400
0
    struct cfl_list *head;
401
0
    struct trace_span *t_span;
402
403
0
    cfl_list_foreach(head, &t_entry->span_list) {
404
0
        t_span = cfl_list_entry(head, struct trace_span, _head);
405
0
        ret = sampling_conditions_check(ctx, ctx->sampling_conditions, t_entry, t_span->span);
406
0
        if (ret == FLB_TRUE) {
407
0
            return FLB_TRUE;
408
0
        }
409
0
    }
410
411
0
    return FLB_FALSE;
412
0
}
413
414
static void trace_entry_delete_spans(struct trace_entry *t_entry)
415
0
{
416
0
    struct cfl_list *tmp;
417
0
    struct cfl_list *head;
418
0
    struct trace_span *t_span;
419
420
0
    cfl_list_foreach_safe(head, tmp, &t_entry->span_list) {
421
0
        t_span = cfl_list_entry(head, struct trace_span, _head);
422
0
        cfl_list_del(&t_span->_head);
423
0
        ctr_span_destroy(t_span->span);
424
0
        flb_free(t_span);
425
0
    }
426
0
}
427
428
static int reconcile_and_dispatch_traces(struct sampling *ctx, struct sampling_settings *settings)
429
0
{
430
0
    int ret;
431
0
    time_t now;
432
0
    struct cfl_list *tmp;
433
0
    struct cfl_list *head;
434
0
    struct trace_entry *t_entry;
435
0
    struct ctrace *ctr = NULL;
436
437
0
    now = time(NULL);
438
439
    /* for each complete trace, reconcile, convert to ctraces contexts (plural) and enqueue them */
440
0
    cfl_list_foreach_safe(head, tmp, &settings->span_reg->trace_list) {
441
0
        t_entry = cfl_list_entry(head, struct trace_entry, _head);
442
443
        /* check if this trace still need to wait */
444
0
        if (t_entry->ts_created + settings->decision_wait > now) {
445
0
            continue;
446
0
        }
447
448
        /*
449
         * check if the spans registered to this trace entry matches the conditions: if only one span
450
         * matches, we keep the trace entry, otherwise we discard it
451
         */
452
0
        ret = check_conditions(ctx, t_entry);
453
0
        if (ret == FLB_FALSE) {
454
            /* t_entry has many t_spans, since the spans will be discarded is safe to remove it original ctr_span reference  */
455
0
            trace_entry_delete_spans(t_entry);
456
457
            /* remove the trace entry */
458
0
            sampling_span_registry_delete_entry(ctx, settings->span_reg, t_entry, FLB_FALSE);
459
0
            continue;
460
0
        }
461
462
        /* Compose a new ctrace context using the spans associated to the same trace_id */
463
0
        ctr = reconcile_and_create_ctrace(ctx, settings, t_entry);
464
0
        if (!ctr) {
465
0
            flb_plg_error(ctx->ins, "could not reconcile and create ctrace context");
466
0
            return -1;
467
0
        }
468
469
        /* add the new ctrace contex to the pipeline */
470
0
        ret = flb_input_trace_append_skip_processor_stages(ctx->input_ins, ctx->ins->pu->stage + 1, NULL, 0, ctr);
471
0
        if (ret != 0) {
472
0
            flb_plg_error(ctx->ins, "could not enqueue ctrace context");
473
0
            ctr_destroy(ctr);
474
0
            return -1;
475
0
        }
476
0
    }
477
478
0
    return 0;
479
0
}
480
481
static void cb_timer_flush(struct flb_config *config, void *data)
482
0
{
483
0
    int ret;
484
0
    struct sampling_settings *settings;
485
0
    struct sampling *ctx;
486
487
0
    settings = (struct sampling_settings *) data;
488
0
    ctx = settings->parent;
489
490
0
    ret = reconcile_and_dispatch_traces(ctx, settings);
491
0
    if (ret != 0) {
492
0
        flb_plg_error(ctx->ins, "could not reconcile and dispatch traces");
493
0
    }
494
495
    /* delete empty ctraces contexts */
496
0
    list_ctrace_delete_empty(ctx, settings);
497
0
}
498
499
static int cb_init(struct flb_config *config, struct sampling *ctx)
500
0
{
501
0
    int ret;
502
0
    struct sampling_settings *settings;
503
0
    struct flb_sched *sched;
504
505
0
    flb_plg_info(ctx->ins, "initializing 'tail' sampling processor");
506
507
0
    settings = flb_calloc(1, sizeof(struct sampling_settings));
508
0
    if (!settings) {
509
0
        flb_errno();
510
0
        return -1;
511
0
    }
512
0
    settings->parent = ctx;
513
0
    cfl_list_init(&settings->list_ctraces);
514
515
    /* get the scheduler context */
516
0
    sched = flb_sched_ctx_get();
517
0
    if (!sched) {
518
0
        flb_plg_error(ctx->ins, "could not get scheduler context");
519
0
        return -1;
520
0
    }
521
522
0
    ret = flb_config_map_set(&ctx->plugin_settings_properties, ctx->plugin_config_map, (void *) settings);
523
0
    if (ret == -1) {
524
0
        flb_free(settings);
525
0
        return -1;
526
0
    }
527
528
    /* convert decision wait to milliseconds*/
529
0
    settings->decision_wait_ms = settings->decision_wait * 1000;
530
531
    /* set a timer callback */
532
0
    ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
533
0
                                    settings->decision_wait_ms, cb_timer_flush,
534
0
                                    settings, NULL);
535
0
    if (ret != 0) {
536
0
        flb_plg_error(ctx->ins, "could not create timer");
537
0
        flb_free(settings);
538
0
        return -1;
539
0
    }
540
541
0
    settings->span_reg = sampling_span_registry_create(settings->max_traces);
542
0
    if (!settings->span_reg) {
543
0
        flb_plg_error(ctx->ins, "could not span registry");
544
0
        flb_free(settings);
545
0
        return -1;
546
0
    }
547
548
0
    sampling_set_context(ctx, settings);
549
0
    return 0;
550
0
}
551
552
static int cb_do_sampling(struct sampling *ctx, void *plugin_context,
553
                          struct ctrace *in_ctr, struct ctrace **out_ctr)
554
0
{
555
0
    int ret;
556
0
    struct sampling_ctrace_entry *ctrace_entry;
557
0
    struct sampling_settings *settings = plugin_context;
558
559
0
    ret = sampling_span_registry_add_trace(ctx, settings->span_reg, in_ctr);
560
0
    if (ret == -1) {
561
0
        flb_plg_error(ctx->ins, "failed to add trace to span registry");
562
0
        return FLB_PROCESSOR_FAILURE;
563
0
    }
564
565
    /* register the ctrace context */
566
0
    ctrace_entry = flb_malloc(sizeof(struct sampling_ctrace_entry));
567
0
    if (!ctrace_entry) {
568
0
        flb_errno();
569
0
        return FLB_PROCESSOR_FAILURE;
570
0
    }
571
0
    ctrace_entry->ctr = in_ctr;
572
0
    cfl_list_add(&ctrace_entry->_head, &settings->list_ctraces);
573
574
    /* caller must not destroy the ctrace reference */
575
0
    *out_ctr = NULL;
576
577
0
    return FLB_PROCESSOR_SUCCESS;
578
0
}
579
580
static int cb_exit(struct flb_config *config, void *data)
581
0
{
582
0
     struct sampling_settings *settings = data;
583
584
0
     if (!settings) {
585
0
        return 0;
586
0
     }
587
588
0
     if (settings->span_reg) {
589
0
        sampling_span_registry_destroy(settings->span_reg);
590
0
     }
591
592
0
     list_ctrace_delete_all(settings->parent, settings);
593
594
595
0
     flb_free(settings);
596
0
     return 0;
597
0
}
598
599
struct sampling_plugin sampling_tail_plugin = {
600
    .type           = SAMPLING_TYPE_TAIL,
601
    .name           = "tail",
602
    .config_map     = settings_config_map,
603
    .cb_init        = cb_init,
604
    .cb_do_sampling = cb_do_sampling,
605
    .cb_exit        = cb_exit,
606
};