Coverage Report

Created: 2025-01-28 07:34

/src/fluent-bit/plugins/out_prometheus_exporter/prom.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_output_plugin.h>
21
#include <fluent-bit/flb_kv.h>
22
#include <fluent-bit/flb_metrics.h>
23
24
#include "prom.h"
25
#include "prom_http.h"
26
27
static int config_add_labels(struct flb_output_instance *ins,
28
                             struct prom_exporter *ctx)
29
0
{
30
0
    struct mk_list *head;
31
0
    struct flb_config_map_val *mv;
32
0
    struct flb_slist_entry *k = NULL;
33
0
    struct flb_slist_entry *v = NULL;
34
0
    struct flb_kv *kv;
35
36
0
    if (!ctx->add_labels || mk_list_size(ctx->add_labels) == 0) {
37
0
        return 0;
38
0
    }
39
40
    /* iterate all 'add_label' definitions */
41
0
    flb_config_map_foreach(head, mv, ctx->add_labels) {
42
0
        if (mk_list_size(mv->val.list) != 2) {
43
0
            flb_plg_error(ins, "'add_label' expects a key and a value, "
44
0
                          "e.g: 'add_label version 1.8.0'");
45
0
            return -1;
46
0
        }
47
48
0
        k = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
49
0
        v = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);
50
51
0
        kv = flb_kv_item_create(&ctx->kv_labels, k->str, v->str);
52
0
        if (!kv) {
53
0
            flb_plg_error(ins, "could not append label %s=%s\n", k->str, v->str);
54
0
            return -1;
55
0
        }
56
0
    }
57
58
0
    return 0;
59
0
}
60
61
static int cb_prom_init(struct flb_output_instance *ins,
62
                        struct flb_config *config,
63
                        void *data)
64
0
{
65
0
    int ret;
66
0
    struct prom_exporter *ctx;
67
68
0
    flb_output_net_default("0.0.0.0", 2021 , ins);
69
70
0
    ctx = flb_calloc(1, sizeof(struct prom_exporter));
71
0
    if (!ctx) {
72
0
        flb_errno();
73
0
        return -1;
74
0
    }
75
0
    ctx->ins = ins;
76
0
    flb_kv_init(&ctx->kv_labels);
77
0
    flb_output_set_context(ins, ctx);
78
79
    /* Load config map */
80
0
    ret = flb_output_config_map_set(ins, (void *) ctx);
81
0
    if (ret == -1) {
82
0
        return -1;
83
0
    }
84
85
    /* Parse 'add_label' */
86
0
    ret = config_add_labels(ins, ctx);
87
0
    if (ret == -1) {
88
0
        return -1;
89
0
    }
90
91
    /* HTTP Server context */
92
0
    ctx->http = prom_http_server_create(ctx,
93
0
                                        ins->host.name, ins->host.port, config);
94
0
    if (!ctx->http) {
95
0
        flb_plg_error(ctx->ins, "could not initialize HTTP server, aborting");
96
0
        return -1;
97
0
    }
98
99
    /* Hash table for metrics */
100
0
    ctx->ht_metrics = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 32, 0);
101
0
    if (!ctx->ht_metrics) {
102
0
        flb_plg_error(ctx->ins, "could not initialize hash table for metrics");
103
0
        return -1;
104
0
    }
105
106
    /* Start HTTP Server */
107
0
    ret = prom_http_server_start(ctx->http);
108
0
    if (ret == -1) {
109
0
        return -1;
110
0
    }
111
112
0
    flb_plg_info(ctx->ins, "listening iface=%s tcp_port=%d",
113
0
                 ins->host.name, ins->host.port);
114
0
    return 0;
115
0
}
116
117
static void append_labels(struct prom_exporter *ctx, struct cmt *cmt)
118
0
{
119
0
    struct flb_kv *kv;
120
0
    struct mk_list *head;
121
122
0
    mk_list_foreach(head, &ctx->kv_labels) {
123
0
        kv = mk_list_entry(head, struct flb_kv, _head);
124
0
        cmt_label_add(cmt, kv->key, kv->val);
125
0
    }
126
0
}
127
128
static int hash_store(struct prom_exporter *ctx, struct flb_input_instance *ins,
129
                      cfl_sds_t buf)
130
0
{
131
0
    int ret;
132
0
    int len;
133
134
0
    len = strlen(ins->name);
135
136
    /* store/override the content into the hash table */
137
0
    ret = flb_hash_table_add(ctx->ht_metrics, ins->name, len,
138
0
                             buf, cfl_sds_len(buf));
139
0
    if (ret < 0) {
140
0
        return -1;
141
0
    }
142
143
0
    return 0;
144
0
}
145
146
static flb_sds_t hash_format_metrics(struct prom_exporter *ctx)
147
0
{
148
0
    int size = 2048;
149
0
    flb_sds_t buf;
150
151
0
    struct mk_list *head;
152
0
    struct flb_hash_table_entry *entry;
153
154
155
0
    buf = flb_sds_create_size(size);
156
0
    if (!buf) {
157
0
        return NULL;
158
0
    }
159
160
    /* Take every hash entry and compose one buffer with the whole content */
161
0
    mk_list_foreach(head, &ctx->ht_metrics->entries) {
162
0
        entry = mk_list_entry(head, struct flb_hash_table_entry, _head_parent);
163
0
        flb_sds_cat_safe(&buf, entry->val, entry->val_size);
164
0
    }
165
166
0
    return buf;
167
0
}
168
169
static void cb_prom_flush(struct flb_event_chunk *event_chunk,
170
                          struct flb_output_flush *out_flush,
171
                          struct flb_input_instance *ins, void *out_context,
172
                          struct flb_config *config)
173
0
{
174
0
    int ret;
175
0
    int add_ts;
176
0
    size_t off = 0;
177
0
    flb_sds_t metrics;
178
0
    cfl_sds_t text = NULL;
179
0
    cfl_sds_t tmp = NULL;
180
0
    struct cmt *cmt;
181
0
    struct prom_exporter *ctx = out_context;
182
0
    int ok = CMT_DECODE_MSGPACK_SUCCESS;
183
184
0
    text = flb_sds_create_size(128);
185
0
    if (text == NULL) {
186
0
        flb_plg_debug(ctx->ins, "failed to allocate buffer for text representation of metrics");
187
0
        FLB_OUTPUT_RETURN(FLB_ERROR);
188
0
    }
189
190
    /*
191
     * A new set of metrics has arrived, perform decoding, apply labels,
192
     * convert to Prometheus text format and store the output in the
193
     * hash table for metrics.
194
     * Note that metrics might be concatenated. So, we need to consume
195
     * until the end of event_chunk.
196
     */
197
0
    while ((ret = cmt_decode_msgpack_create(&cmt,
198
0
                                            (char *) event_chunk->data,
199
0
                                            event_chunk->size, &off)) == ok) {
200
201
        /* append labels set by config */
202
0
        append_labels(ctx, cmt);
203
204
        /* add timestamp in the output format ? */
205
0
        if (ctx->add_timestamp) {
206
0
            add_ts = CMT_TRUE;
207
0
        }
208
0
        else {
209
0
            add_ts = CMT_FALSE;
210
0
        }
211
212
        /* convert to text representation */
213
0
        tmp = cmt_encode_prometheus_create(cmt, add_ts);
214
0
        if (!tmp) {
215
0
            cmt_destroy(cmt);
216
0
            flb_sds_destroy(text);
217
0
            FLB_OUTPUT_RETURN(FLB_ERROR);
218
0
        }
219
0
        ret = flb_sds_cat_safe(&text, tmp, flb_sds_len(tmp));
220
0
        if (ret != 0) {
221
0
            flb_plg_error(ctx->ins, "could not concatenate text representant coming from: %s",
222
0
                          flb_input_name(ins));
223
0
            cmt_encode_prometheus_destroy(tmp);
224
0
            flb_sds_destroy(text);
225
0
            cmt_destroy(cmt);
226
0
            FLB_OUTPUT_RETURN(FLB_ERROR);
227
0
        }
228
0
        cmt_encode_prometheus_destroy(tmp);
229
0
        cmt_destroy(cmt);
230
0
    }
231
232
0
    if (cfl_sds_len(text) == 0) {
233
0
        flb_plg_debug(ctx->ins, "context without metrics (empty)");
234
0
        flb_sds_destroy(text);
235
0
        FLB_OUTPUT_RETURN(FLB_OK);
236
0
    }
237
238
    /* register payload of metrics / override previous one */
239
0
    ret = hash_store(ctx, ins, text);
240
0
    if (ret == -1) {
241
0
        flb_plg_error(ctx->ins, "could not store metrics coming from: %s",
242
0
                      flb_input_name(ins));
243
0
        flb_sds_destroy(text);
244
0
        cmt_destroy(cmt);
245
0
        FLB_OUTPUT_RETURN(FLB_ERROR);
246
0
    }
247
0
    flb_sds_destroy(text);
248
249
    /* retrieve a full copy of all metrics */
250
0
    metrics = hash_format_metrics(ctx);
251
0
    if (!metrics) {
252
0
        flb_plg_error(ctx->ins, "could not retrieve metrics");
253
0
        FLB_OUTPUT_RETURN(FLB_ERROR);
254
0
    }
255
256
    /* push new (full) metrics payload */
257
0
    ret = prom_http_server_mq_push_metrics(ctx->http,
258
0
                                           (char *) metrics,
259
0
                                           flb_sds_len(metrics));
260
0
    flb_sds_destroy(metrics);
261
262
0
    if (ret != 0) {
263
0
        FLB_OUTPUT_RETURN(FLB_ERROR);
264
0
    }
265
266
0
    FLB_OUTPUT_RETURN(FLB_OK);
267
0
}
268
269
static int cb_prom_exit(void *data, struct flb_config *config)
270
0
{
271
0
    struct prom_exporter *ctx = data;
272
273
0
    if (!ctx) {
274
0
        return 0;
275
0
    }
276
277
0
    if (ctx->ht_metrics) {
278
0
        flb_hash_table_destroy(ctx->ht_metrics);
279
0
    }
280
281
0
    flb_kv_release(&ctx->kv_labels);
282
0
    prom_http_server_stop(ctx->http);
283
0
    prom_http_server_destroy(ctx->http);
284
0
    flb_free(ctx);
285
286
0
    return 0;
287
0
}
288
289
/* Configuration properties map */
290
static struct flb_config_map config_map[] = {
291
    {
292
     FLB_CONFIG_MAP_BOOL, "add_timestamp", "false",
293
     0, FLB_TRUE, offsetof(struct prom_exporter, add_timestamp),
294
     "Add timestamp to every metric honoring collection time."
295
    },
296
297
    {
298
     FLB_CONFIG_MAP_SLIST_1, "add_label", NULL,
299
     FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct prom_exporter, add_labels),
300
     "TCP port for listening for HTTP connections."
301
    },
302
303
    /* EOF */
304
    {0}
305
};
306
307
/* Plugin reference */
308
struct flb_output_plugin out_prometheus_exporter_plugin = {
309
    .name        = "prometheus_exporter",
310
    .description = "Prometheus Exporter",
311
    .cb_init     = cb_prom_init,
312
    .cb_flush    = cb_prom_flush,
313
    .cb_exit     = cb_prom_exit,
314
    .flags       = FLB_OUTPUT_NET,
315
    .event_type  = FLB_OUTPUT_METRICS,
316
    .config_map  = config_map,
317
};