Coverage Report

Created: 2026-06-07 07:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/src/flb_router_condition.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2026 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_mem.h>
21
#include <fluent-bit/flb_log.h>
22
#include <fluent-bit/flb_router.h>
23
#include <fluent-bit/flb_conditionals.h>
24
#include <fluent-bit/flb_log_event_encoder.h>
25
#include <fluent-bit/flb_log_event_decoder.h>
26
#include <fluent-bit/flb_mp_chunk.h>
27
#include <cfl/cfl_kvlist.h>
28
29
0
#define FLB_ROUTE_CONDITION_COMPILED_SUCCESS  1
30
0
#define FLB_ROUTE_CONDITION_COMPILED_FAILURE -1
31
32
static struct flb_condition *route_condition_get_compiled(struct flb_route_condition *condition);
33
34
static inline struct cfl_variant *get_object_variant(struct cfl_object *object)
35
0
{
36
0
    if (!object) {
37
0
        return NULL;
38
0
    }
39
40
0
    return object->variant;
41
0
}
42
43
static inline struct cfl_variant *get_body_variant(struct flb_mp_chunk_record *record)
44
0
{
45
0
    if (!record || !record->cobj_record) {
46
0
        return NULL;
47
0
    }
48
49
0
    return record->cobj_record->variant;
50
0
}
51
52
static struct cfl_variant *get_otel_container_variant(struct flb_mp_chunk_record *record,
53
                                                      const char *key,
54
                                                      int use_group_attributes)
55
0
{
56
0
    struct cfl_variant *source;
57
0
    struct cfl_variant *container;
58
59
    /* For OTLP, resource/scope attributes are in group_attributes, not body */
60
0
    if (use_group_attributes && record->cobj_group_attributes && record->cobj_group_attributes->variant) {
61
0
        source = record->cobj_group_attributes->variant;
62
0
    }
63
0
    else {
64
0
        source = get_body_variant(record);
65
0
    }
66
67
0
    if (!source || source->type != CFL_VARIANT_KVLIST) {
68
0
        return NULL;
69
0
    }
70
71
0
    container = cfl_kvlist_fetch(source->data.as_kvlist, key);
72
0
    if (!container || container->type != CFL_VARIANT_KVLIST) {
73
0
        return NULL;
74
0
    }
75
76
0
    return container;
77
0
}
78
79
static struct cfl_variant *get_otel_attributes_variant(struct flb_mp_chunk_record *record,
80
                                                       enum record_context_type context_type)
81
0
{
82
0
    struct cfl_variant *container;
83
0
    const char *container_key = NULL;
84
85
0
    if (context_type == RECORD_CONTEXT_OTEL_RESOURCE_ATTRIBUTES) {
86
0
        container_key = "resource";
87
0
    }
88
0
    else if (context_type == RECORD_CONTEXT_OTEL_SCOPE_ATTRIBUTES) {
89
0
        container_key = "scope";
90
0
    }
91
0
    else {
92
0
        return NULL;
93
0
    }
94
95
    /* For OTLP resource/scope attributes, look in group_attributes first */
96
0
    container = get_otel_container_variant(record, container_key, 1);
97
0
    if (!container) {
98
0
        return NULL;
99
0
    }
100
101
0
    container = cfl_kvlist_fetch(container->data.as_kvlist, "attributes");
102
0
    if (!container || container->type != CFL_VARIANT_KVLIST) {
103
0
        return NULL;
104
0
    }
105
106
0
    return container;
107
0
}
108
109
static struct cfl_variant *get_otel_scope_metadata_variant(struct flb_mp_chunk_record *record)
110
0
{
111
0
    struct cfl_variant *scope;
112
113
    /* For OTLP scope metadata, also check group_attributes first */
114
0
    scope = get_otel_container_variant(record, "scope", 1);
115
0
    if (!scope || scope->type != CFL_VARIANT_KVLIST) {
116
0
        return NULL;
117
0
    }
118
119
0
    return scope;
120
0
}
121
122
static struct cfl_variant *route_logs_get_variant(struct flb_condition_rule *rule,
123
                                                  void *ctx)
124
0
{
125
0
    struct flb_mp_chunk_record *record = (struct flb_mp_chunk_record *) ctx;
126
127
0
    if (!rule || !record) {
128
0
        return NULL;
129
0
    }
130
131
0
    switch (rule->context) {
132
0
    case RECORD_CONTEXT_METADATA:
133
0
        return get_object_variant(record->cobj_metadata);
134
0
    case RECORD_CONTEXT_BODY:
135
0
        return get_body_variant(record);
136
0
    case RECORD_CONTEXT_GROUP_METADATA:
137
0
        return get_object_variant(record->cobj_group_metadata);
138
0
    case RECORD_CONTEXT_GROUP_ATTRIBUTES:
139
0
        return get_object_variant(record->cobj_group_attributes);
140
0
    case RECORD_CONTEXT_OTEL_RESOURCE_ATTRIBUTES:
141
0
    case RECORD_CONTEXT_OTEL_SCOPE_ATTRIBUTES:
142
0
        return get_otel_attributes_variant(record, rule->context);
143
0
    case RECORD_CONTEXT_OTEL_SCOPE_METADATA:
144
0
        return get_otel_scope_metadata_variant(record);
145
0
    default:
146
0
        break;
147
0
    }
148
149
0
    return NULL;
150
0
}
151
152
int flb_router_chunk_context_init(struct flb_router_chunk_context *context)
153
6
{
154
6
    if (!context) {
155
0
        return -1;
156
0
    }
157
158
6
    context->chunk_cobj = NULL;
159
6
    context->log_encoder = NULL;
160
6
    context->log_decoder = NULL;
161
162
6
    return 0;
163
6
}
164
165
void flb_router_chunk_context_reset(struct flb_router_chunk_context *context)
166
6
{
167
6
    if (!context) {
168
0
        return;
169
0
    }
170
171
6
    if (context->chunk_cobj) {
172
0
        flb_mp_chunk_cobj_destroy(context->chunk_cobj);
173
0
        context->chunk_cobj = NULL;
174
0
    }
175
176
6
    if (context->log_decoder) {
177
0
        flb_log_event_decoder_destroy(context->log_decoder);
178
0
        context->log_decoder = NULL;
179
0
    }
180
181
6
    if (context->log_encoder) {
182
0
        flb_log_event_encoder_destroy(context->log_encoder);
183
0
        context->log_encoder = NULL;
184
0
    }
185
6
}
186
187
void flb_router_chunk_context_destroy(struct flb_router_chunk_context *context)
188
6
{
189
6
    flb_router_chunk_context_reset(context);
190
6
}
191
192
int flb_router_chunk_context_prepare_logs(struct flb_router_chunk_context *context,
193
                                          struct flb_event_chunk *chunk)
194
0
{
195
0
    int ret;
196
0
    struct flb_mp_chunk_record *record;
197
198
0
    if (!context || !chunk) {
199
0
        return -1;
200
0
    }
201
202
0
    if (chunk->type != FLB_EVENT_TYPE_LOGS) {
203
0
        return 0;
204
0
    }
205
206
0
    if (context->chunk_cobj) {
207
0
        return 0;
208
0
    }
209
210
0
    if (!chunk->data || chunk->size == 0) {
211
0
        return -1;
212
0
    }
213
214
0
    if (!context->log_encoder) {
215
0
        context->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT);
216
0
        if (!context->log_encoder) {
217
0
            return -1;
218
0
        }
219
0
    }
220
221
0
    if (!context->log_decoder) {
222
0
        context->log_decoder = flb_log_event_decoder_create(NULL, 0);
223
0
        if (!context->log_decoder) {
224
0
            flb_router_chunk_context_reset(context);
225
0
            return -1;
226
0
        }
227
0
        flb_log_event_decoder_read_groups(context->log_decoder, FLB_TRUE);
228
0
    }
229
230
0
    flb_log_event_decoder_reset(context->log_decoder, chunk->data, chunk->size);
231
232
0
    context->chunk_cobj = flb_mp_chunk_cobj_create(context->log_encoder,
233
0
                                                   context->log_decoder);
234
0
    if (!context->chunk_cobj) {
235
0
        flb_router_chunk_context_reset(context);
236
0
        return -1;
237
0
    }
238
239
0
    while ((ret = flb_mp_chunk_cobj_record_next(context->chunk_cobj, &record)) ==
240
0
           FLB_MP_CHUNK_RECORD_OK) {
241
0
        continue;
242
0
    }
243
244
0
    if (ret != FLB_MP_CHUNK_RECORD_EOF) {
245
0
        flb_router_chunk_context_reset(context);
246
0
        return -1;
247
0
    }
248
249
0
    context->chunk_cobj->record_pos = NULL;
250
0
    context->chunk_cobj->condition = NULL;
251
252
0
    return 0;
253
0
}
254
255
uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk)
256
0
{
257
0
    if (!chunk) {
258
0
        return 0;
259
0
    }
260
261
0
    switch (chunk->type) {
262
0
    case FLB_EVENT_TYPE_LOGS:
263
0
        return FLB_ROUTER_SIGNAL_LOGS;
264
0
    case FLB_EVENT_TYPE_METRICS:
265
0
        return FLB_ROUTER_SIGNAL_METRICS;
266
0
    case FLB_EVENT_TYPE_TRACES:
267
0
        return FLB_ROUTER_SIGNAL_TRACES;
268
0
    default:
269
0
        break;
270
0
    }
271
272
0
    return 0;
273
0
}
274
275
int flb_condition_eval_logs(struct flb_event_chunk *chunk,
276
                            struct flb_router_chunk_context *context,
277
                            struct flb_route *route)
278
0
{
279
0
    int result = FLB_FALSE;
280
0
    struct flb_route_condition *condition;
281
0
    struct flb_condition *compiled;
282
0
    struct flb_mp_chunk_record *record;
283
0
    struct cfl_list *head;
284
285
0
    if (!chunk || !context || !route || !route->condition) {
286
0
        return FLB_FALSE;
287
0
    }
288
289
0
    condition = route->condition;
290
291
0
    compiled = route_condition_get_compiled(condition);
292
0
    if (!compiled) {
293
0
        return FLB_FALSE;
294
0
    }
295
296
0
    if (flb_router_chunk_context_prepare_logs(context, chunk) != 0) {
297
0
        return FLB_FALSE;
298
0
    }
299
300
0
    if (!context->chunk_cobj) {
301
0
        return FLB_FALSE;
302
0
    }
303
304
0
    cfl_list_foreach(head, &context->chunk_cobj->records) {
305
0
        record = cfl_list_entry(head, struct flb_mp_chunk_record, _head);
306
307
0
        if (flb_condition_evaluate_ex(compiled, record, route_logs_get_variant) == FLB_TRUE) {
308
0
            result = FLB_TRUE;
309
0
            break;
310
0
        }
311
0
    }
312
313
0
    return result;
314
0
}
315
316
int flb_condition_eval_metrics(struct flb_event_chunk *chunk,
317
                               struct flb_router_chunk_context *context,
318
                               struct flb_route *route)
319
0
{
320
0
    (void) chunk;
321
0
    (void) context;
322
0
    (void) route;
323
324
0
    return FLB_FALSE;
325
0
}
326
327
int flb_condition_eval_traces(struct flb_event_chunk *chunk,
328
                              struct flb_router_chunk_context *context,
329
                              struct flb_route *route)
330
0
{
331
0
    (void) chunk;
332
0
    (void) context;
333
0
    (void) route;
334
335
0
    return FLB_FALSE;
336
0
}
337
338
int flb_route_condition_eval(struct flb_event_chunk *chunk,
339
                             struct flb_router_chunk_context *context,
340
                             struct flb_route *route)
341
0
{
342
0
    uint32_t signal;
343
344
0
    if (!route) {
345
0
        return FLB_FALSE;
346
0
    }
347
348
0
    if (!route->condition) {
349
0
        return FLB_TRUE;
350
0
    }
351
352
0
    signal = flb_router_signal_from_chunk(chunk);
353
0
    if (signal == 0) {
354
0
        return FLB_FALSE;
355
0
    }
356
357
0
    if ((route->signals != 0) && (route->signals != FLB_ROUTER_SIGNAL_ANY) && ((route->signals & signal) == 0)) {
358
0
        return FLB_FALSE;
359
0
    }
360
361
0
    if (route->condition->is_default) {
362
0
        return FLB_TRUE;
363
0
    }
364
365
0
    switch (signal) {
366
0
    case FLB_ROUTER_SIGNAL_LOGS:
367
0
        return flb_condition_eval_logs(chunk, context, route);
368
0
    case FLB_ROUTER_SIGNAL_METRICS:
369
0
        return flb_condition_eval_metrics(chunk, context, route);
370
0
    case FLB_ROUTER_SIGNAL_TRACES:
371
0
        return flb_condition_eval_traces(chunk, context, route);
372
0
    default:
373
0
        break;
374
0
    }
375
376
0
    return FLB_FALSE;
377
0
}
378
379
int flb_router_path_should_route(struct flb_event_chunk *chunk,
380
                                 struct flb_router_chunk_context *context,
381
                                 struct flb_router_path *path)
382
0
{
383
0
    if (!path) {
384
0
        return FLB_FALSE;
385
0
    }
386
387
0
    if (!path->route) {
388
0
        return FLB_TRUE;
389
0
    }
390
391
0
    if (chunk && chunk->type == FLB_EVENT_TYPE_LOGS) {
392
0
        if (!context) {
393
0
            return FLB_FALSE;
394
0
        }
395
396
0
        if (flb_router_chunk_context_prepare_logs(context, chunk) != 0) {
397
0
            return FLB_FALSE;
398
0
        }
399
0
    }
400
401
0
    return flb_route_condition_eval(chunk, context, path->route);
402
0
}
403
404
struct flb_condition *flb_router_route_get_condition(struct flb_route *route)
405
0
{
406
0
    if (!route || !route->condition) {
407
0
        return NULL;
408
0
    }
409
410
0
    return route_condition_get_compiled(route->condition);
411
0
}
412
413
int flb_router_condition_evaluate_record(struct flb_route *route,
414
                                         struct flb_mp_chunk_record *record)
415
0
{
416
0
    struct flb_condition *compiled;
417
418
0
    if (!route || !record) {
419
0
        return FLB_FALSE;
420
0
    }
421
422
0
    if (!route->condition) {
423
0
        return FLB_TRUE;
424
0
    }
425
426
0
    compiled = flb_router_route_get_condition(route);
427
0
    if (!compiled) {
428
0
        if (route->condition->is_default) {
429
0
            return FLB_TRUE;
430
0
        }
431
432
0
        return FLB_FALSE;
433
0
    }
434
435
0
    return flb_condition_evaluate_ex(compiled, record, route_logs_get_variant);
436
0
}
437
438
static int parse_rule_operator(const flb_sds_t op_str,
439
                               enum flb_rule_operator *out)
440
0
{
441
0
    if (!op_str || !out) {
442
0
        return -1;
443
0
    }
444
445
0
    if (strcasecmp(op_str, "eq") == 0) {
446
0
        *out = FLB_RULE_OP_EQ;
447
0
    }
448
0
    else if (strcasecmp(op_str, "neq") == 0) {
449
0
        *out = FLB_RULE_OP_NEQ;
450
0
    }
451
0
    else if (strcasecmp(op_str, "gt") == 0) {
452
0
        *out = FLB_RULE_OP_GT;
453
0
    }
454
0
    else if (strcasecmp(op_str, "lt") == 0) {
455
0
        *out = FLB_RULE_OP_LT;
456
0
    }
457
0
    else if (strcasecmp(op_str, "gte") == 0) {
458
0
        *out = FLB_RULE_OP_GTE;
459
0
    }
460
0
    else if (strcasecmp(op_str, "lte") == 0) {
461
0
        *out = FLB_RULE_OP_LTE;
462
0
    }
463
0
    else if (strcasecmp(op_str, "regex") == 0) {
464
0
        *out = FLB_RULE_OP_REGEX;
465
0
    }
466
0
    else if (strcasecmp(op_str, "not_regex") == 0) {
467
0
        *out = FLB_RULE_OP_NOT_REGEX;
468
0
    }
469
0
    else if (strcasecmp(op_str, "in") == 0) {
470
0
        *out = FLB_RULE_OP_IN;
471
0
    }
472
0
    else if (strcasecmp(op_str, "not_in") == 0) {
473
0
        *out = FLB_RULE_OP_NOT_IN;
474
0
    }
475
0
    else {
476
0
        return -1;
477
0
    }
478
479
0
    return 0;
480
0
}
481
482
static int parse_numeric_value(flb_sds_t value, double *out)
483
0
{
484
0
    char *endptr = NULL;
485
0
    double result;
486
487
0
    if (!value || !out) {
488
0
        return -1;
489
0
    }
490
491
0
    errno = 0;
492
0
    result = strtod(value, &endptr);
493
0
    if (errno == ERANGE || endptr == value || (endptr && *endptr != '\0')) {
494
0
        return -1;
495
0
    }
496
497
0
    *out = result;
498
0
    return 0;
499
0
}
500
501
static struct flb_condition *route_condition_compile(struct flb_route_condition *condition)
502
0
{
503
0
    int ret;
504
0
    double numeric_value;
505
0
    enum flb_rule_operator op;
506
0
    struct cfl_list *head;
507
0
    struct flb_condition *compiled;
508
0
    struct flb_route_condition_rule *rule;
509
510
0
    compiled = flb_condition_create(condition->op);
511
0
    if (!compiled) {
512
0
        return NULL;
513
0
    }
514
515
0
    cfl_list_foreach(head, &condition->rules) {
516
0
        rule = cfl_list_entry(head, struct flb_route_condition_rule, _head);
517
518
0
        if (!rule->field || !rule->op) {
519
0
            flb_condition_destroy(compiled);
520
0
            return NULL;
521
0
        }
522
523
0
        if (parse_rule_operator(rule->op, &op) != 0) {
524
0
            flb_condition_destroy(compiled);
525
0
            return NULL;
526
0
        }
527
528
0
        switch (op) {
529
0
        case FLB_RULE_OP_EQ:
530
0
        case FLB_RULE_OP_NEQ:
531
0
        case FLB_RULE_OP_REGEX:
532
0
        case FLB_RULE_OP_NOT_REGEX:
533
0
            if (!rule->value) {
534
0
                flb_condition_destroy(compiled);
535
0
                return NULL;
536
0
            }
537
0
            ret = flb_condition_add_rule(compiled, rule->field, op,
538
0
                                         rule->value, 1, rule->context);
539
0
            break;
540
0
        case FLB_RULE_OP_GT:
541
0
        case FLB_RULE_OP_LT:
542
0
        case FLB_RULE_OP_GTE:
543
0
        case FLB_RULE_OP_LTE:
544
0
            if (!rule->value) {
545
0
                flb_condition_destroy(compiled);
546
0
                return NULL;
547
0
            }
548
0
            if (parse_numeric_value(rule->value, &numeric_value) != 0) {
549
0
                flb_condition_destroy(compiled);
550
0
                return NULL;
551
0
            }
552
0
            ret = flb_condition_add_rule(compiled, rule->field, op,
553
0
                                         &numeric_value, 1, rule->context);
554
0
            break;
555
0
        case FLB_RULE_OP_IN:
556
0
        case FLB_RULE_OP_NOT_IN:
557
0
            if (!rule->values || rule->values_count == 0) {
558
0
                flb_condition_destroy(compiled);
559
0
                return NULL;
560
0
            }
561
0
            ret = flb_condition_add_rule(compiled, rule->field, op,
562
0
                                         rule->values,
563
0
                                         (int) rule->values_count,
564
0
                                         rule->context);
565
0
            break;
566
0
        default:
567
0
            flb_condition_destroy(compiled);
568
0
            return NULL;
569
0
        }
570
571
0
        if (ret != FLB_TRUE) {
572
0
            flb_condition_destroy(compiled);
573
0
            return NULL;
574
0
        }
575
0
    }
576
577
0
    return compiled;
578
0
}
579
580
static struct flb_condition *route_condition_get_compiled(struct flb_route_condition *condition)
581
0
{
582
0
    if (!condition) {
583
0
        return NULL;
584
0
    }
585
586
0
    if (condition->compiled_status == FLB_ROUTE_CONDITION_COMPILED_FAILURE) {
587
0
        return NULL;
588
0
    }
589
590
0
    if (condition->compiled_status == FLB_ROUTE_CONDITION_COMPILED_SUCCESS &&
591
0
        condition->compiled) {
592
0
        return condition->compiled;
593
0
    }
594
595
0
    condition->compiled = route_condition_compile(condition);
596
0
    if (!condition->compiled) {
597
0
        condition->compiled_status = FLB_ROUTE_CONDITION_COMPILED_FAILURE;
598
0
        return NULL;
599
0
    }
600
601
0
    condition->compiled_status = FLB_ROUTE_CONDITION_COMPILED_SUCCESS;
602
0
    return condition->compiled;
603
0
}
604