Coverage Report

Created: 2025-09-04 07:51

/src/fluent-bit/plugins/in_prometheus_textfile/prometheus_textfile.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2025 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_info.h>
21
#include <fluent-bit/flb_input.h>
22
#include <fluent-bit/flb_input_plugin.h>
23
#include <fluent-bit/flb_config_map.h>
24
#include <fluent-bit/flb_file.h>
25
#include <fluent-bit/flb_sds.h>
26
#include <fluent-bit/flb_time.h>
27
28
#include <cmetrics/cmt_decode_prometheus.h>
29
#include "prometheus_textfile.h"
30
31
#include <sys/stat.h>
32
/* Glob support */
33
#ifndef _MSC_VER
34
#include <glob.h>
35
#endif
36
37
#ifdef _WIN32
38
#include <Windows.h>
39
#include <strsafe.h>
40
#define PATH_MAX MAX_PATH
41
#endif
42
43
#ifndef _WIN32
44
static struct cfl_array *read_glob(const char *path)
45
0
{
46
0
    int ret = -1;
47
0
    int ret_glb = -1;
48
0
    glob_t glb;
49
0
    size_t idx;
50
0
    struct cfl_array *list;
51
52
0
    ret_glb = glob(path, GLOB_NOSORT, NULL, &glb);
53
54
0
    if (ret_glb != 0) {
55
0
        switch(ret_glb){
56
0
        case GLOB_NOSPACE:
57
0
            flb_warn("[%s] glob: [%s] no space", __FUNCTION__, path);
58
0
            break;
59
0
        case GLOB_NOMATCH:
60
0
            flb_warn("[%s] glob: [%s] no match", __FUNCTION__, path);
61
0
            break;
62
0
        case GLOB_ABORTED:
63
0
            flb_warn("[%s] glob: [%s] aborted", __FUNCTION__, path);
64
0
            break;
65
0
        default:
66
0
            flb_warn("[%s] glob: [%s] other error", __FUNCTION__, path);
67
0
        }
68
0
        return NULL;
69
0
    }
70
71
0
    list = cfl_array_create(glb.gl_pathc);
72
0
    for (idx = 0; idx < glb.gl_pathc; idx++) {
73
0
        ret = cfl_array_append_string(list, glb.gl_pathv[idx]);
74
0
        if (ret < 0) {
75
0
            cfl_array_destroy(list);
76
0
            globfree(&glb);
77
0
            return NULL;
78
0
        }
79
0
    }
80
81
0
    globfree(&glb);
82
0
    return list;
83
0
}
84
#else
85
static char *dirname(char *path)
86
{
87
    char *ptr;
88
89
    ptr = strrchr(path, '\\');
90
91
    if (ptr == NULL) {
92
        return path;
93
    }
94
    *ptr++='\0';
95
    return path;
96
}
97
98
static struct cfl_array *read_glob_win(const char *path, struct cfl_array *list)
99
{
100
    char *star, *p0, *p1;
101
    char pattern[MAX_PATH];
102
    char buf[MAX_PATH];
103
    int ret;
104
    struct stat st;
105
    HANDLE hnd;
106
    WIN32_FIND_DATA data;
107
108
    if (strlen(path) > MAX_PATH - 1) {
109
        flb_error("path too long: %s", path);
110
        return NULL;
111
    }
112
113
    star = strchr(path, '*');
114
    if (star == NULL) {
115
        flb_error("path has no wild card: %s", path);
116
        return NULL;
117
    }
118
119
    /*
120
     * C:\data\tmp\input_*.conf
121
     *            0<-----|
122
     */
123
    p0 = star;
124
    while (path <= p0 && *p0 != '\\') {
125
        p0--;
126
    }
127
128
    /*
129
     * C:\data\tmp\input_*.conf
130
     *                   |---->1
131
     */
132
    p1 = star;
133
    while (*p1 && *p1 != '\\') {
134
        p1++;
135
    }
136
137
    memcpy(pattern, path, (p1 - path));
138
    pattern[p1 - path] = '\0';
139
140
    hnd = FindFirstFileA(pattern, &data);
141
142
    if (hnd == INVALID_HANDLE_VALUE) {
143
        flb_error("unable to open valid handle for: %s", path);
144
        return NULL;
145
    }
146
147
    if (list == NULL) {
148
        list = cfl_array_create(3);
149
150
        if (list == NULL) {
151
            flb_error("unable to allocate array");
152
            FindClose(hnd);
153
            return NULL;
154
        }
155
156
        /* cfl_array_resizable is hardcoded to return 0. */
157
        if (cfl_array_resizable(list, FLB_TRUE) != 0) {
158
            flb_error("unable to make array resizable");
159
            FindClose(hnd);
160
            cfl_array_destroy(list);
161
            return NULL;
162
        }
163
    }
164
165
    do {
166
        /* Ignore the current and parent dirs */
167
        if (!strcmp(".", data.cFileName) || !strcmp("..", data.cFileName)) {
168
            continue;
169
        }
170
171
        /* Avoid an infinite loop */
172
        if (strchr(data.cFileName, '*')) {
173
            continue;
174
        }
175
176
        /* Create a path (prefix + filename + suffix) */
177
        memcpy(buf, path, p0 - path + 1);
178
        buf[p0 - path + 1] = '\0';
179
180
        if (FAILED(StringCchCatA(buf, MAX_PATH, data.cFileName))) {
181
            continue;
182
        }
183
184
        if (FAILED(StringCchCatA(buf, MAX_PATH, p1))) {
185
            continue;
186
        }
187
188
        if (strchr(p1, '*')) {
189
            if (read_glob_win(path, list) == NULL) {
190
                cfl_array_destroy(list);
191
                FindClose(hnd);
192
                return NULL;
193
            }
194
            continue;
195
        }
196
197
        ret = stat(buf, &st);
198
199
        if (ret == 0 && (st.st_mode & S_IFMT) == S_IFREG) {
200
            cfl_array_append_string(list, buf);
201
        }
202
    } while (FindNextFileA(hnd, &data) != 0);
203
204
    FindClose(hnd);
205
    return list;
206
}
207
208
static struct cfl_array *read_glob(const char *path)
209
{
210
    return read_glob_win(path, NULL);
211
}
212
#endif
213
214
static int collect_metrics(struct prom_textfile *ctx)
215
0
{
216
0
    int i;
217
0
    int ret;
218
0
    char errbuf[256];
219
0
    struct stat st;
220
0
    struct mk_list *head;
221
0
    struct flb_slist_entry *entry;
222
0
    struct cmt *cmt = NULL;
223
0
    struct cmt_decode_prometheus_parse_opts opts = {0};
224
0
    flb_sds_t content;
225
0
    struct cfl_array *files;
226
227
    /* cmetrics prometheus decoder options */
228
0
    opts.default_timestamp = cfl_time_now();
229
0
    opts.errbuf = errbuf;
230
0
    opts.errbuf_size = sizeof(errbuf);
231
232
    /* iterate over configured paths */
233
0
    mk_list_foreach(head, ctx->path_list) {
234
0
        entry = mk_list_entry(head, struct flb_slist_entry, _head);
235
236
0
        files = read_glob(entry->str);
237
0
        if (!files) {
238
0
            flb_plg_error(ctx->ins, "error reading glob pattern: %s", entry->str);
239
0
            continue;
240
0
        }
241
0
        if (files->entry_count == 0) {
242
0
            flb_plg_debug(ctx->ins, "no files found for glob pattern: %s", entry->str);
243
0
            cfl_array_destroy(files);
244
0
            continue;
245
0
        }
246
247
        /* iterate files */
248
0
        for (i = 0; i < files->entry_count; i++) {
249
0
            ret = stat(files->entries[i]->data.as_string, &st);
250
251
            /* only process regular files */
252
0
            if (ret == 0 && S_ISREG(st.st_mode)) {
253
0
                content = flb_file_read(files->entries[i]->data.as_string);
254
0
                if (!content) {
255
0
                    flb_plg_debug(ctx->ins, "cannot read %s", files->entries[i]->data.as_string);
256
0
                    continue;
257
0
                }
258
259
0
                if (flb_sds_len(content) == 0) {
260
0
                    flb_sds_destroy(content);
261
0
                    continue;
262
0
                }
263
264
0
                cmt = NULL;
265
0
                memset(errbuf, 0, sizeof(errbuf));
266
0
                ret = cmt_decode_prometheus_create(&cmt,
267
0
                                                   content,
268
0
                                                   flb_sds_len(content),
269
0
                                                   &opts);
270
0
                flb_sds_destroy(content);
271
272
0
                if (ret == 0) {
273
0
                    flb_input_metrics_append(ctx->ins, NULL, 0, cmt);
274
0
                    cmt_decode_prometheus_destroy(cmt);
275
0
                }
276
0
                else {
277
0
                    flb_plg_error(ctx->ins, "error parsing file %s: '%s'",
278
0
                                  files->entries[i]->data.as_string, errbuf);
279
0
                    continue;
280
0
                }
281
0
            }
282
0
            else if (ret != 0) {
283
0
                flb_plg_error(ctx->ins, "cannot read '%s'", files->entries[i]->data.as_string);
284
0
                continue;
285
0
            }
286
0
        }
287
0
        cfl_array_destroy(files);
288
0
    }
289
290
0
    return 0;
291
0
}
292
293
static int cb_collect(struct flb_input_instance *ins,
294
                      struct flb_config *config, void *data)
295
0
{
296
0
    struct prom_textfile *ctx = data;
297
298
0
    collect_metrics(ctx);
299
300
0
    FLB_INPUT_RETURN(0);
301
0
}
302
303
static int cb_init(struct flb_input_instance *ins,
304
                   struct flb_config *config, void *data)
305
0
{
306
0
    int ret;
307
0
    struct prom_textfile *ctx;
308
309
0
    ctx = flb_calloc(1, sizeof(struct prom_textfile));
310
0
    if (!ctx) {
311
0
        flb_errno();
312
0
        return -1;
313
0
    }
314
0
    ctx->ins = ins;
315
316
0
    ret = flb_input_config_map_set(ins, (void *) ctx);
317
0
    if (ret == -1) {
318
0
        flb_free(ctx);
319
0
        return -1;
320
0
    }
321
322
0
    ctx->coll_fd = flb_input_set_collector_time(ins,
323
0
                                                cb_collect,
324
0
                                                ctx->scrape_interval,
325
0
                                                0, config);
326
0
    if (ctx->coll_fd < 0) {
327
0
        flb_free(ctx);
328
0
        return -1;
329
0
    }
330
331
0
    flb_input_set_context(ins, ctx);
332
0
    return 0;
333
0
}
334
335
static void cb_pause(void *data, struct flb_config *config)
336
0
{
337
0
    struct prom_textfile *ctx = data;
338
0
    flb_input_collector_pause(ctx->coll_fd, ctx->ins);
339
0
}
340
341
static void cb_resume(void *data, struct flb_config *config)
342
0
{
343
0
    struct prom_textfile *ctx = data;
344
0
    flb_input_collector_resume(ctx->coll_fd, ctx->ins);
345
0
}
346
347
static int cb_exit(void *data, struct flb_config *config)
348
0
{
349
0
    struct prom_textfile *ctx = data;
350
351
0
    if (!ctx) {
352
0
        return 0;
353
0
    }
354
355
0
    flb_free(ctx);
356
0
    return 0;
357
0
}
358
359
static struct flb_config_map config_map[] = {
360
    {
361
     FLB_CONFIG_MAP_CLIST, "path", NULL,
362
     0, FLB_TRUE, offsetof(struct prom_textfile, path_list),
363
     "Comma separated list of files or glob patterns to read"
364
    },
365
    {
366
     FLB_CONFIG_MAP_TIME, "scrape_interval", "10s",
367
     0, FLB_TRUE, offsetof(struct prom_textfile, scrape_interval),
368
     "Scraping interval"
369
    },
370
    /* EOF */
371
    {0}
372
};
373
374
struct flb_input_plugin in_prometheus_textfile_plugin = {
375
    .name         = "prometheus_textfile",
376
    .description  = "Read Prometheus metrics from text files",
377
    .cb_init      = cb_init,
378
    .cb_pre_run   = NULL,
379
    .cb_collect   = cb_collect,
380
    .cb_flush_buf = NULL,
381
    .cb_pause     = cb_pause,
382
    .cb_resume    = cb_resume,
383
    .cb_exit      = cb_exit,
384
    .config_map   = config_map,
385
    .flags        = FLB_INPUT_THREADED | FLB_INPUT_CORO,
386
};
387