Coverage Report

Created: 2025-06-24 08:09

/src/fluent-bit/plugins/processor_sampling/sampling.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2025 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_processor_plugin.h>
21
#include <fluent-bit/flb_processor.h>
22
#include <fluent-bit/flb_hash_table.h>
23
24
#include "sampling.h"
25
#include "sampling_span_registry.h"
26
27
static int clean_empty_resource_spans(struct ctrace *ctr)
28
0
{
29
0
    int count = 0;
30
0
    struct cfl_list *head;
31
0
    struct cfl_list *head_scope_span;
32
0
    struct cfl_list *tmp;
33
0
    struct cfl_list *tmp_scope_span;
34
0
    struct ctrace_resource_span *resource_span;
35
0
    struct ctrace_scope_span *scope_span;
36
37
0
    cfl_list_foreach_safe(head, tmp, &ctr->resource_spans) {
38
0
        resource_span = cfl_list_entry(head, struct ctrace_resource_span, _head);
39
40
        /* iterate scope spans */
41
0
        cfl_list_foreach_safe(head_scope_span, tmp_scope_span, &resource_span->scope_spans) {
42
0
            scope_span = cfl_list_entry(head_scope_span, struct ctrace_scope_span, _head);
43
0
            if (cfl_list_is_empty(&scope_span->spans)) {
44
0
                ctr_scope_span_destroy(scope_span);
45
0
            }
46
0
        }
47
48
        /* check if resource span is now empty */
49
0
        if (cfl_list_is_empty(&resource_span->scope_spans)) {
50
0
            cfl_list_del(&resource_span->_head);
51
0
            ctr_resource_span_destroy(resource_span);
52
0
            count++;
53
0
        }
54
0
    }
55
56
0
    return count;
57
0
}
58
59
static void debug_trace(struct sampling *ctx, struct ctrace *ctr, int is_before)
60
0
{
61
0
    char tmp[128];
62
0
    struct sampling_span_registry *reg = NULL;
63
64
0
    reg = sampling_span_registry_create(100);
65
0
    if (!reg) {
66
0
        return;
67
0
    }
68
69
0
    sampling_span_registry_add_trace(ctx, reg, ctr);
70
0
    if (is_before) {
71
0
        snprintf(tmp, sizeof(tmp) - 1, "Debug sampling '%s' (%p): before", ctx->type_str, ctr);
72
0
        sampling_span_registry_print(ctx, reg, tmp);
73
0
    }
74
0
    else {
75
0
        snprintf(tmp, sizeof(tmp) - 1, "Debug sampling '%s' (%p): after", ctx->type_str, ctr);
76
0
        sampling_span_registry_print(ctx, reg, tmp);
77
0
    }
78
79
0
    sampling_span_registry_destroy(reg);
80
0
}
81
82
static int cb_process_traces(struct flb_processor_instance *ins,
83
                             struct ctrace *in_ctr,
84
                             struct ctrace **out_ctr,
85
                             const char *tag,
86
                             int tag_len)
87
0
{
88
0
    int ret;
89
0
    int count;
90
0
    struct sampling *ctx = ins->context;
91
92
    /* just a quick check for developers */
93
0
    if (!ctx->plugin->cb_do_sampling) {
94
0
        flb_plg_error(ins, "unimplemented sampling callback for type '%s'", ctx->type_str);
95
0
        return -1;
96
0
    }
97
98
0
    if (ctx->debug_mode) {
99
0
        debug_trace(ctx, in_ctr, FLB_TRUE);
100
0
    }
101
102
    /* do sampling: the callback will modify the ctrace context */
103
0
    ret = ctx->plugin->cb_do_sampling(ctx, ctx->plugin_context, in_ctr, out_ctr);
104
105
0
    if (ctx->debug_mode && *out_ctr) {
106
0
        debug_trace(ctx, *out_ctr, FLB_FALSE);
107
0
    }
108
109
    /* check if the ctrace context has empty resource spans */
110
0
    if (*out_ctr) {
111
0
        count = clean_empty_resource_spans(*out_ctr);
112
0
        flb_plg_trace(ins, "cleaned %i empty resource spans", count);
113
0
    }
114
115
0
    return ret;
116
0
}
117
118
/* register the sampling plugins available */
119
static void sampling_plugin_register(struct sampling *ctx)
120
0
{
121
0
    cfl_list_add(&sampling_probabilistic_plugin._head, &ctx->plugins);
122
0
}
123
124
static int cb_init(struct flb_processor_instance *processor_instance,
125
                   void *source_plugin_instance,
126
                   int source_plugin_type,
127
                   struct flb_config *config)
128
0
{
129
0
    int ret;
130
0
    struct sampling *ctx;
131
0
    struct flb_sched *sched;
132
133
    /* create main plugin context */
134
0
    ctx = sampling_config_create(processor_instance, config);
135
0
    if (!ctx) {
136
0
        return FLB_PROCESSOR_FAILURE;
137
0
    }
138
0
    processor_instance->context = (void *) ctx;
139
140
    /* register plugins */
141
0
    sampling_plugin_register(ctx);
142
143
0
    ret = sampling_config_process_rules(config, ctx);
144
0
    if (ret == -1) {
145
0
        flb_plg_error(processor_instance, "failed to parse sampling rules");
146
0
        flb_free(ctx);
147
0
        return -1;
148
0
    }
149
150
    /* get the scheduler context */
151
0
    sched = flb_sched_ctx_get();
152
0
    if (!sched) {
153
0
        flb_plg_error(ctx->ins, "could not get scheduler context");
154
0
        return -1;
155
0
    }
156
157
    /* initialize the backend plugin */
158
0
    ret = ctx->plugin->cb_init(config, ctx);
159
160
0
    return FLB_PROCESSOR_SUCCESS;
161
0
}
162
163
static int cb_exit(struct flb_processor_instance *processor_instance, void *data)
164
0
{
165
0
    if (processor_instance != NULL && data != NULL) {
166
0
        sampling_config_destroy(processor_instance->config, data);
167
0
    }
168
169
0
    return FLB_PROCESSOR_SUCCESS;
170
0
}
171
172
static struct flb_config_map config_map[] = {
173
    {
174
        FLB_CONFIG_MAP_STR, "type", NULL,
175
        0, FLB_TRUE, offsetof(struct sampling, type_str),
176
        "Type of the sampling processor"
177
    },
178
    {
179
        FLB_CONFIG_MAP_BOOL, "debug", "false",
180
        0, FLB_TRUE, offsetof(struct sampling, debug_mode),
181
        "Enable debug mode where it prints the trace and it spans"
182
    },
183
    {
184
        FLB_CONFIG_MAP_VARIANT, "sampling_settings", NULL,
185
        0, FLB_TRUE, offsetof(struct sampling, sampling_settings),
186
        "Sampling rules, these are defined by the sampling processor/type"
187
    },
188
    {
189
        FLB_CONFIG_MAP_VARIANT, "conditions", NULL,
190
        0, FLB_TRUE, offsetof(struct sampling, conditions),
191
        "Sampling conditions"
192
    },
193
194
    /* EOF */
195
    {0}
196
};
197
198
struct flb_processor_plugin processor_sampling_plugin = {
199
    .name               = "sampling",
200
    .description        = "Sampling",
201
    .cb_init            = cb_init,
202
    .cb_process_logs    = NULL,
203
    .cb_process_metrics = NULL,
204
    .cb_process_traces  = cb_process_traces,
205
    .cb_exit            = cb_exit,
206
    .config_map         = config_map,
207
    .flags              = 0
208
};