/src/fluent-bit/plugins/out_prometheus_exporter/prom.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2024 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <fluent-bit/flb_output_plugin.h> |
21 | | #include <fluent-bit/flb_kv.h> |
22 | | #include <fluent-bit/flb_metrics.h> |
23 | | |
24 | | #include "prom.h" |
25 | | #include "prom_http.h" |
26 | | |
27 | | static int config_add_labels(struct flb_output_instance *ins, |
28 | | struct prom_exporter *ctx) |
29 | 0 | { |
30 | 0 | struct mk_list *head; |
31 | 0 | struct flb_config_map_val *mv; |
32 | 0 | struct flb_slist_entry *k = NULL; |
33 | 0 | struct flb_slist_entry *v = NULL; |
34 | 0 | struct flb_kv *kv; |
35 | |
|
36 | 0 | if (!ctx->add_labels || mk_list_size(ctx->add_labels) == 0) { |
37 | 0 | return 0; |
38 | 0 | } |
39 | | |
40 | | /* iterate all 'add_label' definitions */ |
41 | 0 | flb_config_map_foreach(head, mv, ctx->add_labels) { |
42 | 0 | if (mk_list_size(mv->val.list) != 2) { |
43 | 0 | flb_plg_error(ins, "'add_label' expects a key and a value, " |
44 | 0 | "e.g: 'add_label version 1.8.0'"); |
45 | 0 | return -1; |
46 | 0 | } |
47 | | |
48 | 0 | k = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); |
49 | 0 | v = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head); |
50 | |
|
51 | 0 | kv = flb_kv_item_create(&ctx->kv_labels, k->str, v->str); |
52 | 0 | if (!kv) { |
53 | 0 | flb_plg_error(ins, "could not append label %s=%s\n", k->str, v->str); |
54 | 0 | return -1; |
55 | 0 | } |
56 | 0 | } |
57 | | |
58 | 0 | return 0; |
59 | 0 | } |
60 | | |
61 | | static int cb_prom_init(struct flb_output_instance *ins, |
62 | | struct flb_config *config, |
63 | | void *data) |
64 | 0 | { |
65 | 0 | int ret; |
66 | 0 | struct prom_exporter *ctx; |
67 | |
|
68 | 0 | flb_output_net_default("0.0.0.0", 2021 , ins); |
69 | |
|
70 | 0 | ctx = flb_calloc(1, sizeof(struct prom_exporter)); |
71 | 0 | if (!ctx) { |
72 | 0 | flb_errno(); |
73 | 0 | return -1; |
74 | 0 | } |
75 | 0 | ctx->ins = ins; |
76 | 0 | flb_kv_init(&ctx->kv_labels); |
77 | 0 | flb_output_set_context(ins, ctx); |
78 | | |
79 | | /* Load config map */ |
80 | 0 | ret = flb_output_config_map_set(ins, (void *) ctx); |
81 | 0 | if (ret == -1) { |
82 | 0 | return -1; |
83 | 0 | } |
84 | | |
85 | | /* Parse 'add_label' */ |
86 | 0 | ret = config_add_labels(ins, ctx); |
87 | 0 | if (ret == -1) { |
88 | 0 | return -1; |
89 | 0 | } |
90 | | |
91 | | /* HTTP Server context */ |
92 | 0 | ctx->http = prom_http_server_create(ctx, |
93 | 0 | ins->host.name, ins->host.port, config); |
94 | 0 | if (!ctx->http) { |
95 | 0 | flb_plg_error(ctx->ins, "could not initialize HTTP server, aborting"); |
96 | 0 | return -1; |
97 | 0 | } |
98 | | |
99 | | /* Hash table for metrics */ |
100 | 0 | ctx->ht_metrics = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 32, 0); |
101 | 0 | if (!ctx->ht_metrics) { |
102 | 0 | flb_plg_error(ctx->ins, "could not initialize hash table for metrics"); |
103 | 0 | return -1; |
104 | 0 | } |
105 | | |
106 | | /* Start HTTP Server */ |
107 | 0 | ret = prom_http_server_start(ctx->http); |
108 | 0 | if (ret == -1) { |
109 | 0 | return -1; |
110 | 0 | } |
111 | | |
112 | 0 | flb_plg_info(ctx->ins, "listening iface=%s tcp_port=%d", |
113 | 0 | ins->host.name, ins->host.port); |
114 | 0 | return 0; |
115 | 0 | } |
116 | | |
117 | | static void append_labels(struct prom_exporter *ctx, struct cmt *cmt) |
118 | 0 | { |
119 | 0 | struct flb_kv *kv; |
120 | 0 | struct mk_list *head; |
121 | |
|
122 | 0 | mk_list_foreach(head, &ctx->kv_labels) { |
123 | 0 | kv = mk_list_entry(head, struct flb_kv, _head); |
124 | 0 | cmt_label_add(cmt, kv->key, kv->val); |
125 | 0 | } |
126 | 0 | } |
127 | | |
128 | | static int hash_store(struct prom_exporter *ctx, struct flb_input_instance *ins, |
129 | | cfl_sds_t buf) |
130 | 0 | { |
131 | 0 | int ret; |
132 | 0 | int len; |
133 | |
|
134 | 0 | len = strlen(ins->name); |
135 | | |
136 | | /* store/override the content into the hash table */ |
137 | 0 | ret = flb_hash_table_add(ctx->ht_metrics, ins->name, len, |
138 | 0 | buf, cfl_sds_len(buf)); |
139 | 0 | if (ret < 0) { |
140 | 0 | return -1; |
141 | 0 | } |
142 | | |
143 | 0 | return 0; |
144 | 0 | } |
145 | | |
146 | | static flb_sds_t hash_format_metrics(struct prom_exporter *ctx) |
147 | 0 | { |
148 | 0 | int size = 2048; |
149 | 0 | flb_sds_t buf; |
150 | |
|
151 | 0 | struct mk_list *head; |
152 | 0 | struct flb_hash_table_entry *entry; |
153 | | |
154 | |
|
155 | 0 | buf = flb_sds_create_size(size); |
156 | 0 | if (!buf) { |
157 | 0 | return NULL; |
158 | 0 | } |
159 | | |
160 | | /* Take every hash entry and compose one buffer with the whole content */ |
161 | 0 | mk_list_foreach(head, &ctx->ht_metrics->entries) { |
162 | 0 | entry = mk_list_entry(head, struct flb_hash_table_entry, _head_parent); |
163 | 0 | flb_sds_cat_safe(&buf, entry->val, entry->val_size); |
164 | 0 | } |
165 | |
|
166 | 0 | return buf; |
167 | 0 | } |
168 | | |
169 | | static void cb_prom_flush(struct flb_event_chunk *event_chunk, |
170 | | struct flb_output_flush *out_flush, |
171 | | struct flb_input_instance *ins, void *out_context, |
172 | | struct flb_config *config) |
173 | 0 | { |
174 | 0 | int ret; |
175 | 0 | int add_ts; |
176 | 0 | size_t off = 0; |
177 | 0 | flb_sds_t metrics; |
178 | 0 | cfl_sds_t text = NULL; |
179 | 0 | cfl_sds_t tmp = NULL; |
180 | 0 | struct cmt *cmt; |
181 | 0 | struct prom_exporter *ctx = out_context; |
182 | 0 | int ok = CMT_DECODE_MSGPACK_SUCCESS; |
183 | |
|
184 | 0 | text = flb_sds_create_size(128); |
185 | 0 | if (text == NULL) { |
186 | 0 | flb_plg_debug(ctx->ins, "failed to allocate buffer for text representation of metrics"); |
187 | 0 | FLB_OUTPUT_RETURN(FLB_ERROR); |
188 | 0 | } |
189 | | |
190 | | /* |
191 | | * A new set of metrics has arrived, perform decoding, apply labels, |
192 | | * convert to Prometheus text format and store the output in the |
193 | | * hash table for metrics. |
194 | | * Note that metrics might be concatenated. So, we need to consume |
195 | | * until the end of event_chunk. |
196 | | */ |
197 | 0 | while ((ret = cmt_decode_msgpack_create(&cmt, |
198 | 0 | (char *) event_chunk->data, |
199 | 0 | event_chunk->size, &off)) == ok) { |
200 | | |
201 | | /* append labels set by config */ |
202 | 0 | append_labels(ctx, cmt); |
203 | | |
204 | | /* add timestamp in the output format ? */ |
205 | 0 | if (ctx->add_timestamp) { |
206 | 0 | add_ts = CMT_TRUE; |
207 | 0 | } |
208 | 0 | else { |
209 | 0 | add_ts = CMT_FALSE; |
210 | 0 | } |
211 | | |
212 | | /* convert to text representation */ |
213 | 0 | tmp = cmt_encode_prometheus_create(cmt, add_ts); |
214 | 0 | if (!tmp) { |
215 | 0 | cmt_destroy(cmt); |
216 | 0 | flb_sds_destroy(text); |
217 | 0 | FLB_OUTPUT_RETURN(FLB_ERROR); |
218 | 0 | } |
219 | 0 | ret = flb_sds_cat_safe(&text, tmp, flb_sds_len(tmp)); |
220 | 0 | if (ret != 0) { |
221 | 0 | flb_plg_error(ctx->ins, "could not concatenate text representant coming from: %s", |
222 | 0 | flb_input_name(ins)); |
223 | 0 | cmt_encode_prometheus_destroy(tmp); |
224 | 0 | flb_sds_destroy(text); |
225 | 0 | cmt_destroy(cmt); |
226 | 0 | FLB_OUTPUT_RETURN(FLB_ERROR); |
227 | 0 | } |
228 | 0 | cmt_encode_prometheus_destroy(tmp); |
229 | 0 | cmt_destroy(cmt); |
230 | 0 | } |
231 | | |
232 | 0 | if (cfl_sds_len(text) == 0) { |
233 | 0 | flb_plg_debug(ctx->ins, "context without metrics (empty)"); |
234 | 0 | flb_sds_destroy(text); |
235 | 0 | FLB_OUTPUT_RETURN(FLB_OK); |
236 | 0 | } |
237 | | |
238 | | /* register payload of metrics / override previous one */ |
239 | 0 | ret = hash_store(ctx, ins, text); |
240 | 0 | if (ret == -1) { |
241 | 0 | flb_plg_error(ctx->ins, "could not store metrics coming from: %s", |
242 | 0 | flb_input_name(ins)); |
243 | 0 | flb_sds_destroy(text); |
244 | 0 | cmt_destroy(cmt); |
245 | 0 | FLB_OUTPUT_RETURN(FLB_ERROR); |
246 | 0 | } |
247 | 0 | flb_sds_destroy(text); |
248 | | |
249 | | /* retrieve a full copy of all metrics */ |
250 | 0 | metrics = hash_format_metrics(ctx); |
251 | 0 | if (!metrics) { |
252 | 0 | flb_plg_error(ctx->ins, "could not retrieve metrics"); |
253 | 0 | FLB_OUTPUT_RETURN(FLB_ERROR); |
254 | 0 | } |
255 | | |
256 | | /* push new (full) metrics payload */ |
257 | 0 | ret = prom_http_server_mq_push_metrics(ctx->http, |
258 | 0 | (char *) metrics, |
259 | 0 | flb_sds_len(metrics)); |
260 | 0 | flb_sds_destroy(metrics); |
261 | |
|
262 | 0 | if (ret != 0) { |
263 | 0 | FLB_OUTPUT_RETURN(FLB_ERROR); |
264 | 0 | } |
265 | | |
266 | 0 | FLB_OUTPUT_RETURN(FLB_OK); |
267 | 0 | } |
268 | | |
269 | | static int cb_prom_exit(void *data, struct flb_config *config) |
270 | 0 | { |
271 | 0 | struct prom_exporter *ctx = data; |
272 | |
|
273 | 0 | if (!ctx) { |
274 | 0 | return 0; |
275 | 0 | } |
276 | | |
277 | 0 | if (ctx->ht_metrics) { |
278 | 0 | flb_hash_table_destroy(ctx->ht_metrics); |
279 | 0 | } |
280 | |
|
281 | 0 | flb_kv_release(&ctx->kv_labels); |
282 | 0 | prom_http_server_stop(ctx->http); |
283 | 0 | prom_http_server_destroy(ctx->http); |
284 | 0 | flb_free(ctx); |
285 | |
|
286 | 0 | return 0; |
287 | 0 | } |
288 | | |
289 | | /* Configuration properties map */ |
290 | | static struct flb_config_map config_map[] = { |
291 | | { |
292 | | FLB_CONFIG_MAP_BOOL, "add_timestamp", "false", |
293 | | 0, FLB_TRUE, offsetof(struct prom_exporter, add_timestamp), |
294 | | "Add timestamp to every metric honoring collection time." |
295 | | }, |
296 | | |
297 | | { |
298 | | FLB_CONFIG_MAP_SLIST_1, "add_label", NULL, |
299 | | FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct prom_exporter, add_labels), |
300 | | "TCP port for listening for HTTP connections." |
301 | | }, |
302 | | |
303 | | /* EOF */ |
304 | | {0} |
305 | | }; |
306 | | |
307 | | /* Plugin reference */ |
308 | | struct flb_output_plugin out_prometheus_exporter_plugin = { |
309 | | .name = "prometheus_exporter", |
310 | | .description = "Prometheus Exporter", |
311 | | .cb_init = cb_prom_init, |
312 | | .cb_flush = cb_prom_flush, |
313 | | .cb_exit = cb_prom_exit, |
314 | | .flags = FLB_OUTPUT_NET, |
315 | | .event_type = FLB_OUTPUT_METRICS, |
316 | | .config_map = config_map, |
317 | | }; |