Coverage Report

Created: 2026-06-07 07:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/plugins/processor_tda/tda.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2025 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_processor_plugin.h>
#include <fluent-bit/flb_hash_table.h>

/* lwrb header */
#include <lwrb/lwrb.h>

#include <math.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#include "tda.h"
31
32
/* Choose a distance threshold from a dense (n x n) distance matrix.
33
 * We collect all off-diagonal distances (i > j), sort them, and
34
 * return the given quantile (e.g. q=0.5 → median).
35
 *
36
 * If anything goes wrong, we return 0.0f and let the wrapper fall
37
 * back to "automatic" (enclosing radius) mode.
38
 */
39
/* qsort() comparator for floats in ascending order.
 *
 * NaN values are ordered after every number and compare equal to each
 * other, which keeps the ordering total. A comparator that reports NaN as
 * "equal" to everything is inconsistent, and qsort() behaviour is then
 * undefined. Distances can become NaN when metric values are NaN/Inf.
 */
static int cmp_float_asc(const void *a, const void *b)
{
    const float fa = *(const float *) a;
    const float fb = *(const float *) b;
    const int a_nan = isnan(fa) ? 1 : 0;
    const int b_nan = isnan(fb) ? 1 : 0;

    if (a_nan || b_nan) {
        /* NaN sorts last; two NaNs are equal */
        return a_nan - b_nan;
    }

    return (fa > fb) - (fa < fb);
}
54
55
static float tda_choose_threshold_from_dist(struct tda_proc_ctx *ctx,
56
                                            const float *dist,
57
                                            size_t n,
58
                                            double quantile)
59
0
{
60
0
    size_t m;
61
0
    float *vals;
62
0
    size_t idx;
63
0
    size_t i;
64
0
    size_t j;
65
0
    size_t k = 0;
66
0
    float thr = 0.0f;
67
0
    double pos;
68
0
    double q;
69
70
0
    if (!dist || n < 2) {
71
0
        return 0.0f;
72
0
    }
73
74
    /* if user specified threshold as quantile (0 < q < 1),
75
     * override the default quantile argument.
76
     */
77
0
    if (ctx && ctx->threshold > 0.0 && ctx->threshold < 1.0) {
78
0
        q = ctx->threshold;
79
0
    }
80
0
    else {
81
0
        q = quantile;
82
0
    }
83
84
0
    if (q <= 0.0) {
85
0
        q = 0.0;
86
0
    }
87
0
    else if (q >= 1.0) {
88
0
        q = 1.0;
89
0
    }
90
91
    /* number of unique off-diagonal distances */
92
0
    m = n * (n - 1) / 2;
93
0
    if (m == 0) {
94
0
        return 0.0f;
95
0
    }
96
97
0
    vals = (float *) flb_malloc(sizeof(float) * m);
98
0
    if (!vals) {
99
0
        flb_errno();
100
0
        return 0.0f;
101
0
    }
102
103
    /* collect i > j entries */
104
0
    for (i = 0; i < n; i++) {
105
0
        for (j = 0; j < i; j++) {
106
0
            vals[k++] = dist[i * n + j];
107
0
        }
108
0
    }
109
110
0
    if (k == 0) {
111
0
        flb_free(vals);
112
0
        return 0.0f;
113
0
    }
114
115
0
    qsort(vals, k, sizeof(float), cmp_float_asc);
116
117
    /* pick quantile index (e.g. 0.5 → median) */
118
0
    if (k == 1) {
119
0
        idx = 0;
120
0
    }
121
0
    else {
122
0
        pos = q * (double) (k - 1);
123
0
        if (pos < 0.0) {
124
0
            pos = 0.0;
125
0
        }
126
0
        if (pos > (double) (k - 1)) {
127
0
            pos = (double) (k - 1);
128
0
        }
129
0
        idx = (size_t) pos;
130
0
    }
131
132
0
    thr = vals[idx];
133
134
0
    flb_debug("[tda] chosen distance threshold=%.6f (quantile=%.2f, m=%zu)",
135
0
              thr, q, k);
136
137
0
    flb_free(vals);
138
139
0
    return thr;
140
0
}
141
142
struct tda_window *tda_window_create(size_t capacity, int feature_dim)
143
0
{
144
0
    struct tda_window *w;
145
0
    size_t sample_size;
146
0
    size_t buf_size;
147
148
0
    w = flb_calloc(1, sizeof(*w));
149
0
    if (!w) {
150
0
        return NULL;
151
0
    }
152
153
0
    w->feature_dim = feature_dim;
154
    /* struct tda_sample { uint64_t ts; double values[]; } */
155
0
    sample_size = sizeof(uint64_t) + (size_t) feature_dim * sizeof(double);
156
0
    w->sample_size = sample_size;
157
158
0
    buf_size = capacity * sample_size;
159
160
0
    w->buf = flb_malloc(buf_size);
161
0
    if (!w->buf) {
162
0
        flb_free(w);
163
0
        return NULL;
164
0
    }
165
166
0
    if (lwrb_init(&w->rb, w->buf, buf_size) != 1) {
167
0
        flb_free(w->buf);
168
0
        flb_free(w);
169
0
        return NULL;
170
0
    }
171
172
0
    return w;
173
0
}
174
175
/* ---------------------------------------------------------------------- */
176
/* small helpers                                                          */
177
/* ---------------------------------------------------------------------- */
178
179
/* Push `g` onto a growable array of group pointers.
 *
 * *plist is the array, *plist_cap its allocated slot count and *pnext_index
 * the number of slots in use. A full array grows to 16 slots first and then
 * doubles. Returns 0 on success. Returns -1 on a NULL argument or when the
 * reallocation fails; the existing array is kept intact in that case.
 */
static inline int tda_append_group_to_list(struct tda_group ***plist,
                                           int *plist_cap,
                                           int *pnext_index,
                                           struct tda_group *g)
{
    struct tda_group **grown;
    int wanted;

    if (plist == NULL || plist_cap == NULL || pnext_index == NULL || g == NULL) {
        return -1;
    }

    if (*pnext_index >= *plist_cap) {
        wanted = (*plist_cap == 0) ? 16 : *plist_cap * 2;

        grown = flb_realloc(*plist,
                            sizeof(struct tda_group *) * (size_t) wanted);
        if (grown == NULL) {
            flb_errno();
            return -1;
        }

        *plist     = grown;
        *plist_cap = wanted;
    }

    (*plist)[*pnext_index] = g;
    *pnext_index += 1;

    return 0;
}
219
220
static inline int tda_register_map_group(struct flb_hash_table *ht,
221
                                         struct tda_group ***plist,
222
                                         int *plist_cap,
223
                                         int *pnext_index,
224
                                         struct cmt_map *map)
225
0
{
226
0
    const char *ns;
227
0
    const char *sub;
228
0
    char key[256];
229
0
    int  len;
230
0
    void *out;
231
0
    struct tda_group *g;
232
0
    int idx;
233
234
0
    if (!ht || !plist || !plist_cap || !pnext_index || !map || !map->opts) {
235
0
        return -1;
236
0
    }
237
238
0
    ns  = map->opts->ns        ? map->opts->ns        : "";
239
0
    sub = map->opts->subsystem ? map->opts->subsystem : "";
240
241
0
    len = snprintf(key, sizeof(key), "%s.%s", ns, sub);
242
0
    if (len < 0 || (size_t) len >= sizeof(key)) {
243
0
        return 0;
244
0
    }
245
246
0
    out = flb_hash_table_get_ptr(ht, key, len);
247
0
    if (out) {
248
0
        return 0;
249
0
    }
250
251
0
    g = flb_calloc(1, sizeof(*g));
252
0
    if (!g) {
253
0
        flb_errno();
254
0
        return -1;
255
0
    }
256
257
0
    g->ns        = cfl_sds_create(ns);
258
0
    g->subsystem = cfl_sds_create(sub);
259
260
0
    if (!g->ns || !g->subsystem) {
261
0
        if (g->ns) {
262
0
            cfl_sds_destroy(g->ns);
263
0
        }
264
0
        if (g->subsystem) {
265
0
            cfl_sds_destroy(g->subsystem);
266
0
        }
267
0
        flb_free(g);
268
0
        flb_errno();
269
0
        return -1;
270
0
    }
271
272
0
    g->index = *pnext_index;
273
0
    if (tda_append_group_to_list(plist, plist_cap, pnext_index, g) != 0) {
274
0
        cfl_sds_destroy(g->ns);
275
0
        cfl_sds_destroy(g->subsystem);
276
0
        flb_free(g);
277
0
        return -1;
278
0
    }
279
280
0
    if (flb_hash_table_add(ht, key, len, g, 0) < 0) {
281
0
        idx = g->index;
282
283
0
        if (*pnext_index > 0 && *pnext_index - 1 == idx) {
284
0
            (*pnext_index)--;
285
0
        }
286
287
0
        cfl_sds_destroy(g->ns);
288
0
        cfl_sds_destroy(g->subsystem);
289
0
        flb_free(g);
290
0
        flb_errno();
291
0
        return -1;
292
0
    }
293
294
0
    return 0;
295
0
}
296
297
static inline void tda_accumulate_map_metrics(struct tda_proc_ctx *ctx,
298
                                              struct cmt_map *map,
299
                                              double *out_vec)
300
0
{
301
0
    const char *ns;
302
0
    const char *sub;
303
0
    char key[256];
304
0
    int  len;
305
0
    void *out;
306
0
    struct tda_group *g;
307
0
    int idx;
308
0
    struct cfl_list *metric_head;
309
0
    struct cmt_metric *metric;
310
311
0
    if (!ctx || !ctx->groups || !map || !map->opts || !out_vec) {
312
0
        return;
313
0
    }
314
315
0
    ns  = map->opts->ns        ? map->opts->ns        : "";
316
0
    sub = map->opts->subsystem ? map->opts->subsystem : "";
317
318
0
    len = snprintf(key, sizeof(key), "%s.%s", ns, sub);
319
0
    if (len < 0 || (size_t) len >= sizeof(key)) {
320
0
        return;
321
0
    }
322
323
0
    out = flb_hash_table_get_ptr(ctx->groups, key, len);
324
0
    if (!out) {
325
0
        return;
326
0
    }
327
328
0
    g = (struct tda_group *) out;
329
0
    idx = g->index;
330
331
0
    if (idx < 0 || idx >= ctx->feature_dim) {
332
0
        return;
333
0
    }
334
335
0
    if (map->metric_static_set) {
336
0
        metric = &map->metric;
337
0
        out_vec[idx] += cmt_metric_get_value(metric);
338
0
    }
339
340
0
    cfl_list_foreach(metric_head, &map->metrics) {
341
0
        metric = cfl_list_entry(metric_head, struct cmt_metric, _head);
342
0
        out_vec[idx] += cmt_metric_get_value(metric);
343
0
    }
344
0
}
345
346
/* ---------------------------------------------------------------------- */
347
/* group building: allocate dimensions each of (ns,subsystem)             */
348
/* ---------------------------------------------------------------------- */
349
350
static int tda_build_groups(struct tda_proc_ctx *ctx, struct cmt *cmt)
351
0
{
352
0
    struct cfl_list *head;
353
0
    struct cmt_counter *counter;
354
0
    struct cmt_gauge *gauge;
355
0
    struct cmt_untyped *untyped;
356
0
    struct cmt_map *map;
357
0
    struct flb_hash_table *ht;
358
0
    struct tda_group **list = NULL;
359
0
    int list_cap = 0;
360
0
    int next_index = 0;
361
0
    int i;
362
0
    int ret = -1;
363
0
    struct tda_group *g;
364
365
0
    if (!ctx || !cmt) {
366
0
        return -1;
367
0
    }
368
369
0
    ht = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 64, 0);
370
0
    if (!ht) {
371
0
        flb_errno();
372
0
        return -1;
373
0
    }
374
375
    /* counters */
376
0
    cfl_list_foreach(head, &cmt->counters) {
377
0
        counter = cfl_list_entry(head, struct cmt_counter, _head);
378
0
        map = counter->map;
379
380
0
        if (tda_register_map_group(ht,
381
0
                                   &list,
382
0
                                   &list_cap,
383
0
                                   &next_index,
384
0
                                   map) != 0) {
385
0
            goto error;
386
0
        }
387
0
    }
388
389
    /* gauges */
390
0
    cfl_list_foreach(head, &cmt->gauges) {
391
0
        gauge = cfl_list_entry(head, struct cmt_gauge, _head);
392
0
        map = gauge->map;
393
394
0
        if (tda_register_map_group(ht,
395
0
                                   &list,
396
0
                                   &list_cap,
397
0
                                   &next_index,
398
0
                                   map) != 0) {
399
0
            goto error;
400
0
        }
401
0
    }
402
403
    /* untyped */
404
0
    cfl_list_foreach(head, &cmt->untypeds) {
405
0
        untyped = cfl_list_entry(head, struct cmt_untyped, _head);
406
0
        map = untyped->map;
407
408
0
        if (tda_register_map_group(ht,
409
0
                                   &list,
410
0
                                   &list_cap,
411
0
                                   &next_index,
412
0
                                   map) != 0) {
413
0
            goto error;
414
0
        }
415
0
    }
416
417
0
    ctx->groups      = ht;
418
0
    ctx->group_list  = list;
419
0
    ctx->feature_dim = next_index;
420
421
    /* allocate last_vec for rate calculation */
422
0
    ctx->last_vec = flb_calloc(ctx->feature_dim, sizeof(double));
423
0
    if (!ctx->last_vec) {
424
0
        flb_errno();
425
        /* Clean up what we just assigned */
426
0
        ctx->groups = NULL;
427
0
        ctx->group_list = NULL;
428
0
        ctx->feature_dim = 0;
429
0
        goto error;
430
0
    }
431
0
    ctx->last_ts = 0;
432
433
0
    flb_plg_info(ctx->ins, "built TDA groups: feature_dim=%d", ctx->feature_dim);
434
435
0
    return 0;
436
437
0
error:
438
0
    if (list) {
439
0
        for (i = 0; i < next_index; i++) {
440
0
            g = list[i];
441
0
            if (!g) {
442
0
                continue;
443
0
            }
444
0
            if (g->ns) {
445
0
                cfl_sds_destroy(g->ns);
446
0
            }
447
0
            if (g->subsystem) {
448
0
                cfl_sds_destroy(g->subsystem);
449
0
            }
450
0
            flb_free(g);
451
0
        }
452
0
        flb_free(list);
453
0
    }
454
0
    if (ht) {
455
0
        flb_hash_table_destroy(ht);
456
0
    }
457
0
    if (ctx->last_vec) {
458
0
        flb_free(ctx->last_vec);
459
0
        ctx->last_vec = NULL;
460
0
    }
461
0
    return ret;
462
0
}
463
464
void tda_window_destroy(struct tda_window *w)
465
0
{
466
0
    if (!w) {
467
0
        return;
468
0
    }
469
470
0
    flb_free(w->buf);
471
0
    flb_free(w);
472
0
}
473
474
/* ---- metrics aggregation ---------------------------------------------- */
475
476
static int tda_build_vector_from_cmt(struct tda_proc_ctx *ctx,
477
                                     struct cmt *cmt,
478
                                     double *out_vec,
479
                                     uint64_t ts)
480
0
{
481
0
    struct cfl_list *head;
482
0
    struct cmt_counter *counter;
483
0
    struct cmt_gauge *gauge;
484
0
    struct cmt_untyped *untyped;
485
0
    struct cmt_map *map;
486
487
0
    int i;
488
0
    double dt_sec;
489
0
    double raw_now;
490
0
    double raw_prev;
491
0
    double diff;
492
0
    double rate;
493
0
    double mag;
494
0
    double norm;
495
496
    /* zero-initialize vector */
497
0
    for (i = 0; i < ctx->feature_dim; i++) {
498
0
        out_vec[i] = 0.0;
499
0
    }
500
501
0
    if (!cmt || !ctx->groups) {
502
0
        return -1;
503
0
    }
504
505
    /* counters */
506
0
    cfl_list_foreach(head, &cmt->counters) {
507
0
        counter = cfl_list_entry(head, struct cmt_counter, _head);
508
0
        map = counter->map;
509
0
        tda_accumulate_map_metrics(ctx, map, out_vec);
510
0
    }
511
512
    /* gauges */
513
0
    cfl_list_foreach(head, &cmt->gauges) {
514
0
        gauge = cfl_list_entry(head, struct cmt_gauge, _head);
515
0
        map = gauge->map;
516
0
        tda_accumulate_map_metrics(ctx, map, out_vec);
517
0
    }
518
519
    /* untyped */
520
0
    cfl_list_foreach(head, &cmt->untypeds) {
521
0
        untyped = cfl_list_entry(head, struct cmt_untyped, _head);
522
0
        map = untyped->map;
523
0
        tda_accumulate_map_metrics(ctx, map, out_vec);
524
0
    }
525
526
    /* At this point, out_vec contains the aggregated value for each (ns, subsystem).
527
     *
528
     * Next, we use the difference from the previous snapshot and dt to compute:
529
     *   rate = diff / dt
530
     * and then apply log1p for a light normalization.
531
     */
532
533
0
    if (!ctx->last_vec || ctx->feature_dim <= 0) {
534
0
        return -1;
535
0
    }
536
537
0
    if (ctx->last_ts == 0) {
538
        /* First call: we cannot compute rates yet, so we return 0
539
         * and store the current values in last_vec.
540
         */
541
0
        for (i = 0; i < ctx->feature_dim; i++) {
542
0
            ctx->last_vec[i] = out_vec[i];
543
0
            out_vec[i]       = 0.0;
544
0
        }
545
0
        ctx->last_ts = ts;
546
0
        return 0;
547
0
    }
548
549
0
    if (ts > ctx->last_ts) {
550
0
        dt_sec = (double) (ts - ctx->last_ts) / 1e9; /* cfl_time_now() returns ns */
551
0
    }
552
0
    else {
553
        /* safeguard in case time goes backwards */
554
0
        dt_sec = 1.0;
555
0
    }
556
557
0
    if (dt_sec <= 0.0) {
558
0
        dt_sec = 1.0;
559
0
    }
560
561
0
    for (i = 0; i < ctx->feature_dim; i++) {
562
0
        raw_now  = out_vec[i];
563
0
        raw_prev = ctx->last_vec[i];
564
0
        diff     = raw_now - raw_prev;
565
0
        rate     = diff / dt_sec;
566
0
        mag      = fabs(rate);
567
0
        norm     = log1p(mag);     /* squash into [0, +∞) */
568
569
0
        out_vec[i]       = (rate >= 0.0) ? norm : -norm;
570
0
        ctx->last_vec[i] = raw_now;       /* store raw value for next time */
571
0
    }
572
0
    ctx->last_ts = ts;
573
574
0
    return 0;
575
0
}
576
577
void tda_window_ingest(struct tda_window *w,
578
                       struct tda_proc_ctx *ctx,
579
                       struct cmt *cmt)
580
0
{
581
0
    uint64_t ts;
582
0
    size_t needed;
583
0
    size_t r;
584
0
    uint8_t *buf;
585
0
    uint8_t *drop = NULL;
586
0
    double *vec;
587
0
    struct tda_sample *s;
588
589
0
    if (!w || !ctx || !cmt) {
590
0
        return;
591
0
    }
592
593
0
    ts = cfl_time_now();
594
0
    needed = w->sample_size;
595
596
0
    buf = flb_malloc(needed);
597
0
    if (!buf) {
598
0
        flb_errno();
599
0
        return;
600
0
    }
601
602
0
    s = (struct tda_sample *) buf;
603
0
    s->ts = ts;
604
605
0
    vec = s->values;
606
0
    if (tda_build_vector_from_cmt(ctx, cmt, vec, ts) != 0) {
607
0
        flb_free(buf);
608
0
        return;
609
0
    }
610
611
    /* ring buffer full -> drop oldest sample(s) */
612
0
    while (lwrb_get_free(&w->rb) < needed) {
613
0
        if (drop == NULL) {
614
0
            drop = flb_malloc(w->sample_size);
615
0
            if (!drop) {
616
0
                flb_errno();
617
0
                lwrb_reset(&w->rb);
618
0
                flb_free(buf);
619
0
                return;
620
0
            }
621
0
        }
622
623
0
        r = lwrb_read(&w->rb, drop, w->sample_size);
624
0
        if (r != w->sample_size) {
625
0
            lwrb_reset(&w->rb);
626
0
            break;
627
0
        }
628
0
    }
629
630
0
    if (lwrb_write(&w->rb, buf, needed) != needed) {
631
0
        lwrb_reset(&w->rb);
632
0
    }
633
634
0
    if (drop) {
635
0
        flb_free(drop);
636
0
    }
637
0
    flb_free(buf);
638
0
}
639
640
static size_t tda_window_length(struct tda_window *w)
641
0
{
642
0
    size_t full;
643
644
0
    if (!w) {
645
0
        return 0;
646
0
    }
647
648
0
    full = lwrb_get_full(&w->rb);
649
0
    return (w->sample_size > 0) ? full / w->sample_size : 0;
650
0
}
651
652
/* non-destructive snapshot of the last max_samples samples into out_buf.
653
 * out_buf must have at least max_samples * w->sample_size bytes.
654
 */
655
static size_t tda_window_snapshot(struct tda_window *w,
656
                                  uint8_t *out_buf,
657
                                  size_t max_samples)
658
0
{
659
0
    size_t full_bytes;
660
0
    size_t sample_bytes;
661
0
    size_t total_count;
662
0
    size_t copy_count;
663
0
    size_t start_index;
664
0
    uint8_t *tmp;
665
0
    size_t r;
666
667
0
    if (!w || !out_buf || max_samples == 0) {
668
0
        return 0;
669
0
    }
670
671
0
    sample_bytes = w->sample_size;
672
0
    full_bytes   = lwrb_get_full(&w->rb);
673
674
0
    total_count = full_bytes / sample_bytes;
675
0
    if (total_count == 0) {
676
0
        return 0;
677
0
    }
678
679
    /* only whole samples are interesting */
680
0
    full_bytes = total_count * sample_bytes;
681
682
0
    tmp = flb_calloc(1, full_bytes);
683
0
    if (!tmp) {
684
0
        flb_errno();
685
0
        return 0;
686
0
    }
687
688
    /* Note: lwrb doesn't support peek, so we read and restore.
689
     * In the unlikely event write-back fails, data is lost.
690
     */
691
    /* read out all data ... */
692
0
    r = lwrb_read(&w->rb, tmp, full_bytes);
693
0
    if (r != full_bytes) {
694
        /* inconsistent state, reset */
695
0
        lwrb_reset(&w->rb);
696
0
        flb_free(tmp);
697
0
        return 0;
698
0
    }
699
700
    /* ... and immediately write it back to keep the logical window */
701
0
    if (lwrb_write(&w->rb, tmp, full_bytes) != full_bytes) {
702
        /* this should not fail; if it does, reset */
703
0
        lwrb_reset(&w->rb);
704
0
        flb_free(tmp);
705
0
        return 0;
706
0
    }
707
708
    /* keep only the last max_samples */
709
0
    copy_count = total_count;
710
0
    if (copy_count > max_samples) {
711
0
        copy_count = max_samples;
712
0
    }
713
714
0
    start_index = total_count - copy_count;
715
716
0
    memcpy(out_buf,
717
0
           tmp + start_index * sample_bytes,
718
0
           copy_count * sample_bytes);
719
720
0
    flb_free(tmp);
721
722
0
    return copy_count;
723
0
}
724
725
static int ensure_betti_gauges(struct tda_proc_ctx *ctx, struct cmt *cmt)
726
0
{
727
0
    if (!ctx || !cmt) {
728
0
        return -1;
729
0
    }
730
731
0
    if (!ctx->g_betti0) {
732
0
        ctx->g_betti0 = cmt_gauge_create(cmt,
733
0
                                         "fluentbit", "tda",
734
0
                                         "betti0",
735
0
                                         "Betti_0 over TDA sliding window",
736
0
                                         0, NULL);
737
0
        if (!ctx->g_betti0) {
738
0
            return -1;
739
0
        }
740
0
    }
741
742
0
    if (!ctx->g_betti1) {
743
0
        ctx->g_betti1 = cmt_gauge_create(cmt,
744
0
                                         "fluentbit", "tda",
745
0
                                         "betti1",
746
0
                                         "Betti_1 over TDA sliding window",
747
0
                                         0, NULL);
748
0
        if (!ctx->g_betti1) {
749
0
            return -1;
750
0
        }
751
0
    }
752
753
0
    if (!ctx->g_betti2) {
754
0
        ctx->g_betti2 = cmt_gauge_create(cmt,
755
0
                                         "fluentbit", "tda",
756
0
                                         "betti2",
757
0
                                         "Betti_2 over TDA sliding window",
758
0
                                         0, NULL);
759
0
        if (!ctx->g_betti2) {
760
0
            return -1;
761
0
        }
762
0
    }
763
764
0
    return 0;
765
0
}
766
767
static void tda_window_run_ripser(struct tda_window *w,
768
                                  struct tda_proc_ctx *ctx,
769
                                  struct cmt *cmt)
770
0
{
771
0
    size_t n_raw;
772
0
    size_t mat_size;
773
0
    float *dist;
774
0
    uint8_t *raw_samples;
775
0
    flb_ripser_betti betti;
776
0
    uint64_t ts;
777
0
    float threshold;
778
779
0
    size_t i, j, k;
780
0
    size_t lag;
781
0
    size_t m;
782
0
    size_t tau;
783
0
    size_t min_required;
784
0
    size_t n_embed;
785
786
0
    double q;
787
788
0
    double accum;
789
0
    size_t base_i;
790
0
    size_t base_j;
791
0
    size_t idx_i;
792
0
    size_t idx_j;
793
794
0
    uint8_t *si;
795
0
    uint8_t *sj;
796
797
0
    struct tda_sample *s_i = NULL;
798
0
    struct tda_sample *s_j = NULL;
799
800
0
    double *xi;
801
0
    double *xj;
802
803
0
    double diff;
804
0
    float d;
805
806
    /* --- search for H1 structures across multiple scales --- */
807
0
    static const double q_candidates[] = {
808
0
        0.10, 0.20, 0.30, 0.40, 0.50,
809
0
        0.60, 0.70, 0.80, 0.90
810
0
    };
811
812
0
    int nq;
813
814
0
    int best_b0 = 0;
815
0
    int best_b1 = 0;
816
0
    int best_b2 = 0;
817
0
    double best_q_for_b1 = 0.0;
818
819
0
    int qi;
820
0
    double qc;
821
0
    float thr;
822
0
    flb_ripser_betti tmp;
823
0
    int ret_local;
824
825
0
    if (!w || !ctx || !cmt) {
826
0
        return;
827
0
    }
828
829
0
    n_raw = tda_window_length(w);
830
0
    if (n_raw < 2) {
831
0
        return;
832
0
    }
833
834
0
    if (ensure_betti_gauges(ctx, cmt) != 0) {
835
0
        flb_plg_warn(ctx->ins, "failed to create betti gauges");
836
0
        return;
837
0
    }
838
839
0
    raw_samples = flb_calloc(1, n_raw * w->sample_size);
840
0
    if (!raw_samples) {
841
0
        flb_errno();
842
0
        return;
843
0
    }
844
845
    /* snapshot of the latest n_raw samples into raw_samples */
846
0
    n_raw = tda_window_snapshot(w, raw_samples, n_raw);
847
0
    if (n_raw < 2) {
848
0
        flb_free(raw_samples);
849
0
        return;
850
0
    }
851
852
    /* --- delay embedding settings --- */
853
0
    m   = (ctx->embed_dim   > 0) ? (size_t) ctx->embed_dim   : 1;
854
0
    tau = (ctx->embed_delay > 0) ? (size_t) ctx->embed_delay : 1;
855
856
    /* When m == 1, disable delay embedding to match the original behavior. */
857
0
    if (m == 1) {
858
0
        tau = 1;
859
0
    }
860
861
    /* Minimum number of samples required for the embedding:
862
     * index: t, t - tau, ..., t - (m-1)tau → t >= (m-1)tau
863
     * number of valid t = n_raw - (m - 1)tau
864
     */
865
0
    min_required = (m - 1) * tau + 1;
866
0
    if (n_raw < min_required) {
867
        /* Not enough samples to construct the delay embedding yet. */
868
0
        flb_free(raw_samples);
869
0
        return;
870
0
    }
871
872
0
    n_embed = n_raw - (m - 1) * tau;
873
874
0
    flb_plg_debug(ctx->ins, "n_raw=%zu, embed_dim=%d, embed_delay=%d, n_embed=%zu",
875
0
                  n_raw, ctx->embed_dim, ctx->embed_delay, n_embed);
876
877
0
    mat_size = n_embed * n_embed;
878
0
    dist = flb_calloc(mat_size, sizeof(float));
879
0
    if (!dist) {
880
0
        flb_errno();
881
0
        flb_free(raw_samples);
882
0
        return;
883
0
    }
884
885
    /* Build the distance matrix as an (n × m)-dimensional Euclidean distance.
886
     *
887
     * Embedded point p (0..n_embed-1) corresponds to the actual sample indices:
888
     *   base_p = p + (m - 1) * tau;
889
     *   for lag l: index = base_p - l * tau;
890
     */
891
0
    for (i = 0; i < n_embed; i++) {
892
0
        dist[i * n_embed + i] = 0.0f;
893
894
0
        for (j = 0; j < i; j++) {
895
0
            accum = 0.0;
896
897
0
            base_i = i + (m - 1) * tau;
898
0
            base_j = j + (m - 1) * tau;
899
900
0
            for (lag = 0; lag < m; lag++) {
901
0
                idx_i = base_i - lag * tau;
902
0
                idx_j = base_j - lag * tau;
903
904
0
                si = raw_samples + idx_i * w->sample_size;
905
0
                sj = raw_samples + idx_j * w->sample_size;
906
907
0
                s_i = (struct tda_sample *) si;
908
0
                s_j = (struct tda_sample *) sj;
909
910
0
                xi = s_i->values;
911
0
                xj = s_j->values;
912
913
                /* feature_dim (≈ 8 collapsed metrics) × m (lags) */
914
0
                for (k = 0; k < (size_t) ctx->feature_dim; k++) {
915
0
                    diff = xi[k] - xj[k];
916
0
                    accum += diff * diff;
917
0
                }
918
0
            }
919
920
0
            d = (float) sqrt(accum);
921
0
            dist[i * n_embed + j] = d;
922
0
            dist[j * n_embed + i] = d;
923
0
        }
924
0
    }
925
926
0
    if (m == 1) {
927
0
        q = 0.5;      /* No delay embedding: use something like the median. */
928
0
    }
929
0
    else {
930
0
        q = 0.2;      /* With delay embedding: look at a smaller scale. */
931
0
    }
932
933
    /* --- choose a scale for TDA ---
934
     * Use the number of embedded points n_embed to determine the threshold.
935
     */
936
0
    threshold = tda_choose_threshold_from_dist(ctx, dist, n_embed, q);
937
0
    if (threshold <= 0.0f) {
938
0
        threshold = 0.0f;
939
0
    }
940
941
0
    memset(&betti, 0, sizeof(betti));
942
943
0
    nq = sizeof(q_candidates) / sizeof(q_candidates[0]);
944
945
0
    for (qi = 0; qi < nq; qi++) {
946
0
        qc = q_candidates[qi];
947
0
        thr = tda_choose_threshold_from_dist(ctx, dist, n_embed, qc);
948
949
0
        if (thr < 0.0f) {
950
0
            thr = 0.0f;
951
0
        }
952
953
0
        memset(&tmp, 0, sizeof(tmp));
954
955
0
        ret_local = flb_ripser_compute_betti_from_dense_distance(dist,
956
0
                                                                 n_embed,
957
0
                                                                 2 /* max_dim */,
958
0
                                                                 thr,
959
0
                                                                 &tmp);
960
0
        if (ret_local != 0) {
961
0
            continue;
962
0
        }
963
964
        /* Prefer H1 (loops) as the primary signal.
965
         * If needed, H0/H2 can be used as additional indicators.
966
         */
967
0
        if (tmp.num_dims > 1 && tmp.betti[1] > best_b1) {
968
0
            best_b1 = tmp.betti[1];
969
0
            best_b0 = tmp.betti[0];
970
0
            best_b2 = (tmp.num_dims > 2) ? tmp.betti[2] : 0;
971
972
            /* if user forced ctx->threshold as quantile, report that,
973
             * otherwise report the candidate quantile qc.
974
             */
975
0
            if (ctx && ctx->threshold > 0.0 && ctx->threshold < 1.0) {
976
0
                best_q_for_b1 = ctx->threshold;
977
0
            }
978
0
            else {
979
0
                best_q_for_b1 = qc;
980
0
            }
981
0
        }
982
        /* If all H1 are zero, fall back to H0. */
983
0
        else if (best_b1 == 0 && tmp.betti[0] > best_b0) {
984
0
            best_b0 = tmp.betti[0];
985
0
            best_b2 = (tmp.num_dims > 2) ? tmp.betti[2] : 0;
986
987
0
            if (ctx && ctx->threshold > 0.0 && ctx->threshold < 1.0) {
988
0
                best_q_for_b1 = ctx->threshold;
989
0
            }
990
0
            else {
991
0
                best_q_for_b1 = qc;
992
0
            }
993
0
        }
994
0
    }
995
996
    /* After the loop, copy the "most plausible" values into betti. */
997
0
    betti.num_dims = 3;  /* we track b0, b1, b2 */
998
0
    betti.betti[0] = best_b0;
999
0
    betti.betti[1] = best_b1;
1000
0
    betti.betti[2] = best_b2;
1001
1002
0
    flb_plg_debug(ctx->ins, "betti dims=%d, b0=%d, b1=%d, b2=%d (best_q=%.2f)",
1003
0
                  betti.num_dims,
1004
0
                  betti.betti[0],
1005
0
                  betti.betti[1],
1006
0
                  betti.betti[2],
1007
0
                  best_q_for_b1);
1008
1009
0
    ts = cfl_time_now();
1010
1011
0
    if (ctx->g_betti0) {
1012
0
        cmt_gauge_set(ctx->g_betti0, ts,
1013
0
                      (double) betti.betti[0],
1014
0
                      0, NULL);
1015
0
    }
1016
1017
0
    if (ctx->g_betti1) {
1018
0
        cmt_gauge_set(ctx->g_betti1, ts,
1019
0
                      (double) betti.betti[1],
1020
0
                      0, NULL);
1021
0
    }
1022
1023
0
    if (ctx->g_betti2) {
1024
0
        cmt_gauge_set(ctx->g_betti2, ts,
1025
0
                      (double) betti.betti[2],
1026
0
                      0, NULL);
1027
0
    }
1028
1029
0
    flb_free(dist);
1030
0
    flb_free(raw_samples);
1031
0
}
1032
1033
1034
/* ---------------------------------------------------------------------- */
1035
/* processor plugin glue                                                  */
1036
/* ---------------------------------------------------------------------- */
1037
1038
static int tda_proc_init(struct flb_processor_instance *ins,
1039
                         void *source_plugin_instance,
1040
                         int source_plugin_type,
1041
                         struct flb_config *config)
1042
0
{
1043
0
    int ret = -1;
1044
0
    struct tda_proc_ctx *ctx;
1045
1046
0
    (void) source_plugin_instance;
1047
0
    (void) source_plugin_type;
1048
0
    (void) config;
1049
1050
0
    ctx = flb_calloc(1, sizeof(*ctx));
1051
0
    if (!ctx) {
1052
0
        flb_errno();
1053
0
        return FLB_PROCESSOR_FAILURE;
1054
0
    }
1055
1056
0
    ctx->feature_dim = 0;
1057
0
    ctx->groups      = NULL;
1058
0
    ctx->group_list  = NULL;
1059
0
    ctx->window      = NULL;
1060
0
    ctx->last_vec    = NULL;
1061
0
    ctx->last_ts     = 0;
1062
0
    ctx->ins         = ins;
1063
1064
    /* load configuration from config_map (override defaults if present) */
1065
0
    ret = flb_processor_instance_config_map_set(ins, (void *) ctx);
1066
0
    if (ret == -1) {
1067
0
        flb_plg_error(ins, "unable to load configuration");
1068
0
        flb_free(ctx);
1069
0
        return FLB_PROCESSOR_FAILURE;
1070
0
    }
1071
1072
0
    ins->context = ctx;
1073
1074
0
    return FLB_PROCESSOR_SUCCESS;
1075
0
}
1076
1077
static void tda_free_groups(struct tda_proc_ctx *ctx)
1078
0
{
1079
0
    int i;
1080
0
    struct tda_group *g = NULL;
1081
1082
0
    if (!ctx) {
1083
0
        return;
1084
0
    }
1085
1086
0
    if (ctx->group_list) {
1087
0
        for (i = 0; i < ctx->feature_dim; i++) {
1088
0
            g = ctx->group_list[i];
1089
0
            if (!g) {
1090
0
                continue;
1091
0
            }
1092
0
            if (g->ns) {
1093
0
                cfl_sds_destroy(g->ns);
1094
0
            }
1095
0
            if (g->subsystem) {
1096
0
                cfl_sds_destroy(g->subsystem);
1097
0
            }
1098
0
            flb_free(g);
1099
0
        }
1100
0
        flb_free(ctx->group_list);
1101
0
        ctx->group_list = NULL;
1102
0
    }
1103
1104
0
    if (ctx->groups) {
1105
0
        flb_hash_table_destroy(ctx->groups);
1106
0
        ctx->groups = NULL;
1107
0
    }
1108
1109
0
    ctx->feature_dim = 0;
1110
0
}
1111
1112
static int tda_proc_exit(struct flb_processor_instance *ins, void *data)
1113
0
{
1114
0
    struct tda_proc_ctx *ctx;
1115
1116
0
    (void) ins;
1117
1118
0
    ctx = (struct tda_proc_ctx *) data;
1119
0
    if (!ctx) {
1120
0
        return FLB_PROCESSOR_SUCCESS;
1121
0
    }
1122
1123
0
    if (ctx->window) {
1124
0
        tda_window_destroy(ctx->window);
1125
0
    }
1126
1127
0
    tda_free_groups(ctx);
1128
1129
0
    if (ctx->last_vec) {
1130
0
        flb_free(ctx->last_vec);
1131
0
    }
1132
1133
0
    flb_free(ctx);
1134
1135
0
    return FLB_PROCESSOR_SUCCESS;
1136
0
}
1137
1138
static int tda_proc_process_metrics(struct flb_processor_instance *ins,
1139
                                    struct cmt *metrics_context,
1140
                                    struct cmt **out_context,
1141
                                    const char *tag,
1142
                                    int tag_len)
1143
0
{
1144
0
    struct tda_proc_ctx *ctx;
1145
1146
0
    (void) tag;
1147
0
    (void) tag_len;
1148
1149
0
    ctx = (struct tda_proc_ctx *) ins->context;
1150
0
    if (!ctx) {
1151
0
        return FLB_PROCESSOR_FAILURE;
1152
0
    }
1153
1154
0
    if (!metrics_context) {
1155
0
        *out_context = NULL;
1156
0
        return FLB_PROCESSOR_SUCCESS;
1157
0
    }
1158
1159
0
    ctx->g_betti0 = NULL;
1160
0
    ctx->g_betti1 = NULL;
1161
0
    ctx->g_betti2 = NULL;
1162
1163
    /* initial: construct groups and window */
1164
0
    if (ctx->groups == NULL) {
1165
0
        if (tda_build_groups(ctx, metrics_context) != 0) {
1166
0
            flb_plg_warn(ins, "[tda] failed to build TDA groups");
1167
0
            *out_context = metrics_context;
1168
0
            return FLB_PROCESSOR_SUCCESS;
1169
0
        }
1170
1171
0
        ctx->window = tda_window_create(ctx->window_size, ctx->feature_dim);
1172
0
        if (!ctx->window) {
1173
0
            flb_plg_warn(ins, "[tda] failed to create TDA window");
1174
0
            *out_context = metrics_context;
1175
0
            return FLB_PROCESSOR_SUCCESS;
1176
0
        }
1177
0
    }
1178
1179
0
    tda_window_ingest(ctx->window, ctx, metrics_context);
1180
1181
0
    if (tda_window_length(ctx->window) >= ctx->min_points) {
1182
0
        tda_window_run_ripser(ctx->window, ctx, metrics_context);
1183
0
    }
1184
1185
0
    *out_context = metrics_context;
1186
1187
0
    return FLB_PROCESSOR_SUCCESS;
1188
0
}
1189
1190
1191
static struct flb_config_map config_map[] = {
1192
    {
1193
        FLB_CONFIG_MAP_INT, "window_size", "60",
1194
        0, FLB_TRUE, offsetof(struct tda_proc_ctx, window_size),
1195
        "Number of samples to keep in the TDA sliding window"
1196
    },
1197
    {
1198
        FLB_CONFIG_MAP_INT, "min_points", "10",
1199
        0, FLB_TRUE, offsetof(struct tda_proc_ctx, min_points),
1200
        "Minimum number of samples required before running Ripser"
1201
    },
1202
    {
1203
        FLB_CONFIG_MAP_INT, "embed_dim", "3",
1204
        0, FLB_TRUE, offsetof(struct tda_proc_ctx, embed_dim),
1205
        "Delay embedding dimension m (m=1 disables delay embedding)."
1206
        "For example, m = 3 → x_t, x_{t-1}, x_{t-2}."
1207
    },
1208
    {
1209
        FLB_CONFIG_MAP_INT, "embed_delay", "1",
1210
        0, FLB_TRUE, offsetof(struct tda_proc_ctx, embed_delay),
1211
        "Delay embedding lag tau in samples. This means that 1 delaying sample."
1212
    },
1213
    {
1214
        FLB_CONFIG_MAP_DOUBLE, "threshold", "0",
1215
        0, FLB_TRUE, offsetof(struct tda_proc_ctx, threshold),
1216
        "Distance scale selector. 0 = auto multi-quantile scan; "
1217
        "(0,1) = use as quantile to pick the distance threshold."
1218
    },
1219
    /* EOF */
1220
    {0}
1221
};
1222
1223
struct flb_processor_plugin processor_tda_plugin = {
1224
    .name               = "tda",
1225
    .description        = "TDA (persistent homology) processor",
1226
    .cb_init            = tda_proc_init,
1227
    .cb_process_logs    = NULL,
1228
    .cb_process_metrics = tda_proc_process_metrics,
1229
    .cb_process_traces  = NULL,
1230
    .cb_exit            = tda_proc_exit,
1231
    .config_map         = config_map,
1232
    .flags              = 0
1233
};