Coverage Report

Created: 2025-01-28 07:34

/src/fluent-bit/plugins/processor_sql/sql.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_processor_plugin.h>
21
#include <fluent-bit/flb_mem.h>
22
23
#include "sql.h"
24
#include "sql_config.h"
25
26
/* String type to numerical conversion */
27
0
#define SQL_STR_INT   1
28
0
#define SQL_STR_FLOAT 2
29
30
static int cb_init(struct flb_processor_instance *ins,
31
                   void *source_plugin_instance,
32
                   int source_plugin_type,
33
                   struct flb_config *config)
34
0
{
35
0
    struct sql_ctx *ctx;
36
37
0
    ctx = sql_config_create(ins, config);
38
0
    if (!ctx) {
39
0
        return -1;
40
0
    }
41
42
0
    flb_processor_instance_set_context(ins, ctx);
43
44
0
    return FLB_PROCESSOR_SUCCESS;
45
0
}
46
47
/* Processor exit */
48
static int cb_exit(struct flb_processor_instance *ins, void *data   )
49
0
{
50
0
    struct sql_ctx *ctx = (struct sql_ctx *) data;
51
52
0
    if (!ctx) {
53
0
        return FLB_PROCESSOR_SUCCESS;
54
0
    }
55
56
0
    sql_config_destroy(ctx);
57
0
    return FLB_PROCESSOR_SUCCESS;
58
0
}
59
60
void sql_expression_val_free(struct sql_expression_val *v)
61
0
{
62
0
    if (!v) {
63
0
        return;
64
0
    }
65
66
0
    if (v->type == SQL_EXP_STRING) {
67
0
        cfl_sds_destroy(v->val.string);
68
0
    }
69
70
0
    flb_free(v);
71
0
}
72
73
/*
74
 * Convert a string to a numerical representation:
75
 *
76
 * - if output number is an integer, 'i' is set and returns FLB_STR_INT
77
 * - if output number is a float, 'd' is set and returns FLB_STR_FLOAT
78
 * - if no conversion is possible (not a number), returns -1
79
 */
80
static int string_to_number(const char *str, int len, int64_t *i, double *d)
81
0
{
82
0
    int c;
83
0
    int dots = 0;
84
0
    char *end;
85
0
    int64_t i_out;
86
0
    double d_out;
87
88
    /* Detect if this is a floating point number */
89
0
    for (c = 0; c < len; c++) {
90
0
        if (str[c] == '.') {
91
0
            dots++;
92
0
        }
93
0
    }
94
95
0
    if (dots > 1) {
96
0
        return -1;
97
0
    }
98
0
    else if (dots == 1) {
99
        /* Floating point number */
100
0
        errno = 0;
101
0
        d_out = strtold(str, &end);
102
103
        /* Check for various possible errors */
104
0
        if ((errno == ERANGE || (errno != 0 && d_out == 0))) {
105
0
            return -1;
106
0
        }
107
108
0
        if (end == str) {
109
0
            return -1;
110
0
        }
111
112
0
        *d = d_out;
113
0
        return SQL_STR_FLOAT;
114
0
    }
115
0
    else {
116
        /* Integer */
117
0
        errno = 0;
118
0
        i_out = strtoll(str, &end, 10);
119
120
        /* Check for various possible errors */
121
0
        if ((errno == ERANGE || (errno != 0 && i_out == 0))) {
122
0
            return -1;
123
0
        }
124
125
0
        if (end == str) {
126
0
            return -1;
127
0
        }
128
129
0
        *i = i_out;
130
0
        return SQL_STR_INT;
131
0
    }
132
133
0
    return -1;
134
0
}
135
136
/* Convert (string) expression to number */
137
static void exp_string_to_number(struct sql_expression_val *val)
138
0
{
139
0
    int ret;
140
0
    int len;
141
0
    int64_t i = 0;
142
0
    char *str;
143
0
    double d = 0.0;
144
145
0
    len = flb_sds_len(val->val.string);
146
0
    str = val->val.string;
147
148
0
    ret = string_to_number(str, len, &i, &d);
149
0
    if (ret == -1) {
150
0
        return;
151
0
    }
152
153
    /* Assign to proper type */
154
0
    if (ret == SQL_STR_FLOAT) {
155
0
        flb_sds_destroy(val->val.string);
156
0
        val->type = SQL_EXP_FLOAT;
157
0
        val->val.f64 = d;
158
0
    }
159
0
    else if (ret == SQL_STR_INT) {
160
0
        flb_sds_destroy(val->val.string);
161
0
        val->type = SQL_EXP_INT;
162
0
        val->val.i64 = i;
163
0
    }
164
0
}
165
166
167
168
0
static bool value_to_bool(struct sql_expression_val *val) {
169
0
    bool result = FLB_FALSE;
170
171
0
    switch (val->type) {
172
0
    case SQL_EXP_BOOL:
173
0
        result = val->val.boolean;
174
0
        break;
175
0
    case SQL_EXP_INT:
176
0
        result = val->val.i64 > 0;
177
0
        break;
178
0
    case SQL_EXP_FLOAT:
179
0
        result = val->val.f64 > 0;
180
0
        break;
181
0
    case SQL_EXP_STRING:
182
0
        result = true;
183
0
        break;
184
0
    }
185
186
0
    return result;
187
0
}
188
static void itof_convert(struct sql_expression_val *val)
189
0
{
190
0
    if (val->type != SQL_EXP_INT) {
191
0
        return;
192
0
    }
193
194
0
    val->type = SQL_EXP_FLOAT;
195
0
    val->val.f64 = (double) val->val.i64;
196
0
}
197
198
static void numerical_comp(struct sql_expression_val *left,
199
                           struct sql_expression_val *right,
200
                           struct sql_expression_val *result, int op)
201
0
{
202
0
    result->type = SQL_EXP_BOOL;
203
204
0
    if (left == NULL || right == NULL) {
205
0
        result->val.boolean = false;
206
0
        return;
207
0
    }
208
209
    /* Check if left expression value is a number, if so, convert it */
210
0
    if (left->type == SQL_EXP_STRING && right->type != SQL_EXP_STRING) {
211
0
        exp_string_to_number(left);
212
0
    }
213
214
0
    if (left->type == SQL_EXP_INT && right->type == SQL_EXP_FLOAT) {
215
0
        itof_convert(left);
216
0
    }
217
0
    else if (left->type == SQL_EXP_FLOAT && right->type == SQL_EXP_INT) {
218
0
        itof_convert(right);
219
0
    }
220
221
0
    switch (op) {
222
0
    case SQL_EXP_EQ:
223
0
        if (left->type == right->type) {
224
0
            switch(left->type) {
225
0
            case SQL_EXP_NULL:
226
0
                result->val.boolean = true;
227
0
                break;
228
0
            case SQL_EXP_BOOL:
229
0
                result->val.boolean = (left->val.boolean == right->val.boolean);
230
0
                break;
231
0
            case SQL_EXP_INT:
232
0
                result->val.boolean = (left->val.i64 == right->val.i64);
233
0
                break;
234
0
            case SQL_EXP_FLOAT:
235
0
                result->val.boolean = (left->val.f64 == right->val.f64);
236
0
                break;
237
0
            case SQL_EXP_STRING:
238
0
                if (flb_sds_len(left->val.string) !=
239
0
                    flb_sds_len(right->val.string)) {
240
0
                    result->val.boolean = false;
241
0
                }
242
0
                else if (strncmp(left->val.string, right->val.string,
243
0
                                 flb_sds_len(left->val.string)) != 0) {
244
0
                    result->val.boolean = false;
245
0
                }
246
0
                else {
247
0
                    result->val.boolean = true;
248
0
                }
249
0
                break;
250
0
            default:
251
0
                result->val.boolean = false;
252
0
                break;
253
0
            }
254
0
        }
255
0
        else {
256
0
            result->val.boolean = false;
257
0
        }
258
0
        break;
259
0
    case SQL_EXP_LT:
260
0
        if (left->type == right->type) {
261
0
            switch(left->type) {
262
0
            case SQL_EXP_INT:
263
0
                result->val.boolean = (left->val.i64 < right->val.i64);
264
0
                break;
265
0
            case SQL_EXP_FLOAT:
266
0
                result->val.boolean = (left->val.f64 < right->val.f64);
267
0
                break;
268
0
            case SQL_EXP_STRING:
269
0
                if (strncmp(left->val.string, right->val.string,
270
0
                            flb_sds_len(left->val.string)) < 0) {
271
0
                    result->val.boolean = true;
272
0
                }
273
0
                else {
274
0
                    result->val.boolean = false;
275
0
                }
276
0
                break;
277
0
            default:
278
0
                result->val.boolean = false;
279
0
                break;
280
0
            }
281
0
        }
282
0
        else {
283
0
            result->val.boolean = false;
284
0
        }
285
0
        break;
286
0
    case SQL_EXP_LTE:
287
0
        if (left->type == right->type) {
288
0
            switch(left->type) {
289
0
            case SQL_EXP_INT:
290
0
                result->val.boolean = (left->val.i64 <= right->val.i64);
291
0
                break;
292
0
            case SQL_EXP_FLOAT:
293
0
                result->val.boolean = (left->val.f64 <= right->val.f64);
294
0
                break;
295
0
            case SQL_EXP_STRING:
296
0
                if (strncmp(left->val.string, right->val.string,
297
0
                            flb_sds_len(left->val.string)) <= 0) {
298
0
                    result->val.boolean = true;
299
0
                }
300
0
                else {
301
0
                    result->val.boolean = false;
302
0
                }
303
0
                break;
304
0
            default:
305
0
                result->val.boolean = false;
306
0
                break;
307
0
            }
308
0
        }
309
0
        else {
310
0
            result->val.boolean = false;
311
0
        }
312
0
        break;
313
0
    case SQL_EXP_GT:
314
0
        if (left->type == right->type) {
315
0
            switch(left->type) {
316
0
            case SQL_EXP_INT:
317
0
                result->val.boolean = (left->val.i64 > right->val.i64);
318
0
                break;
319
0
            case SQL_EXP_FLOAT:
320
0
                result->val.boolean = (left->val.f64 > right->val.f64);
321
0
                break;
322
0
            case SQL_EXP_STRING:
323
0
                if (strncmp(left->val.string, right->val.string,
324
0
                            flb_sds_len(left->val.string)) > 0) {
325
0
                    result->val.boolean = true;
326
0
                }
327
0
                else {
328
0
                    result->val.boolean = false;
329
0
                }
330
0
                break;
331
0
            default:
332
0
                result->val.boolean = false;
333
0
                break;
334
0
            }
335
0
        }
336
0
        else {
337
0
            result->val.boolean = false;
338
0
        }
339
0
        break;
340
0
    case SQL_EXP_GTE:
341
0
        if (left->type == right->type) {
342
0
            switch(left->type) {
343
0
            case SQL_EXP_INT:
344
0
                result->val.boolean = (left->val.i64 >= right->val.i64);
345
0
                break;
346
0
            case SQL_EXP_FLOAT:
347
0
                result->val.boolean = (left->val.f64 >= right->val.f64);
348
0
                break;
349
0
            case SQL_EXP_STRING:
350
0
                if (strncmp(left->val.string, right->val.string,
351
0
                            flb_sds_len(left->val.string)) >= 0) {
352
0
                    result->val.boolean = true;
353
0
                }
354
0
                else {
355
0
                    result->val.boolean = false;
356
0
                }
357
0
                break;
358
0
            default:
359
0
                result->val.boolean = false;
360
0
                break;
361
0
            }
362
0
        }
363
0
        else {
364
0
            result->val.boolean = false;
365
0
        }
366
0
        break;
367
0
    }
368
0
}
369
370
static void logical_operation(struct sql_expression_val *left,
371
                              struct sql_expression_val *right,
372
                              struct sql_expression_val *result, int op)
373
0
{
374
0
    bool lval;
375
0
    bool rval;
376
377
0
    result->type = SQL_EXP_BOOL;
378
379
    /* Null is always interpreted as false in a logical operation */
380
0
    lval = left ? value_to_bool(left) : false;
381
0
    rval = right ? value_to_bool(right) : false;
382
383
0
    switch (op) {
384
0
    case SQL_EXP_NOT:
385
0
        result->val.boolean = !lval;
386
0
        break;
387
0
    case SQL_EXP_AND:
388
0
        result->val.boolean = lval & rval;
389
0
        break;
390
0
    case SQL_EXP_OR:
391
0
        result->val.boolean = lval | rval;
392
0
        break;
393
0
    }
394
0
}
395
396
static int sql_key_to_value(char *name, struct flb_mp_chunk_record *record, struct sql_expression_val *val)
397
0
{
398
399
0
    struct cfl_list *head;
400
0
    struct cfl_list *tmp;
401
0
    struct cfl_variant *var;
402
0
    struct cfl_kvlist *kvlist;
403
0
    struct cfl_kvpair *kvpair;
404
405
0
    kvlist = record->cobj_record->variant->data.as_kvlist;
406
407
0
    cfl_list_foreach_safe(head, tmp, &kvlist->list) {
408
0
        kvpair = cfl_list_entry(head, struct cfl_kvpair, _head);
409
410
0
        if (cfl_sds_len(kvpair->key) != cfl_sds_len(name)) {
411
0
            var = NULL;
412
0
            continue;
413
0
        }
414
415
0
        if (strcmp(kvpair->key, name) != 0) {
416
0
            var = NULL;
417
0
            continue;
418
0
        }
419
420
0
        var = kvpair->val;
421
0
        break;
422
0
    }
423
424
0
    if (!var) {
425
0
        return -1;
426
0
    }
427
428
0
    if (var->type == CFL_VARIANT_STRING) {
429
0
        val->type = SQL_EXP_STRING;
430
0
        val->val.string = cfl_sds_create(kvpair->val->data.as_string);
431
0
    }
432
0
    else if (var->type == CFL_VARIANT_INT) {
433
0
        val->type = SQL_EXP_INT;
434
0
        val->val.i64 = kvpair->val->data.as_int64;
435
0
    }
436
0
    else if (var->type == CFL_VARIANT_UINT) {
437
        /*
438
         * Note on uint64 handling: our parsing rules in sql-parser.l handles the strings
439
         * that represents integers through an atol() conversion. If we get a case of a
440
         * long unsigned value, we can adjust it here by extending the sql_val union.
441
         *
442
         */
443
0
        val->type = SQL_EXP_INT;
444
0
        val->val.i64 = kvpair->val->data.as_uint64;
445
0
    }
446
0
    else if (var->type == CFL_VARIANT_DOUBLE) {
447
0
        val->type = SQL_EXP_FLOAT;
448
0
        val->val.f64 = kvpair->val->data.as_double;
449
0
    }
450
0
    else if (var->type == CFL_VARIANT_BOOL) {
451
0
        val->type = SQL_EXP_BOOL;
452
0
        val->val.boolean = kvpair->val->data.as_bool;
453
0
    }
454
0
    else if (var->type == CFL_VARIANT_NULL) {
455
0
        val->type = SQL_EXP_NULL;
456
0
        val->val.boolean = 1;
457
0
    }
458
0
    else {
459
0
        return -1;
460
0
    }
461
462
0
    return 0;
463
0
}
464
465
static struct sql_expression_val *reduce_expression(struct sql_expression *expression,
466
                                                    struct flb_mp_chunk_record *record)
467
468
0
{
469
0
    int ret;
470
0
    int operation;
471
0
    flb_sds_t s;
472
0
    struct sql_expression_key *key;
473
0
    struct sql_expression_val *left;
474
0
    struct sql_expression_val *right;
475
0
    struct sql_expression_val *result;
476
477
0
    if (!expression) {
478
0
        return NULL;
479
0
    }
480
481
0
    result = flb_calloc(1, sizeof(struct sql_expression_val));
482
0
    if (!result) {
483
0
        flb_errno();
484
0
        return NULL;
485
0
    }
486
487
0
    switch (expression->type) {
488
0
    case SQL_EXP_NULL:
489
0
        result->type = expression->type;
490
0
        break;
491
0
    case SQL_EXP_BOOL:
492
0
        result->type = expression->type;
493
0
        result->val.boolean = ((struct sql_expression_val *) expression)->val.boolean;
494
0
        break;
495
0
    case SQL_EXP_INT:
496
0
        result->type = expression->type;
497
0
        result->val.i64 = ((struct sql_expression_val *) expression)->val.i64;
498
0
        break;
499
0
    case SQL_EXP_FLOAT:
500
0
        result->type = expression->type;
501
0
        result->val.f64 = ((struct sql_expression_val *) expression)->val.f64;
502
0
        break;
503
0
    case SQL_EXP_STRING:
504
0
        s = ((struct sql_expression_val *) expression)->val.string;
505
0
        result->type = expression->type;
506
0
        result->val.string = cfl_sds_create(s);
507
0
        break;
508
0
    case SQL_EXP_KEY:
509
0
        key = (struct sql_expression_key *) expression;
510
0
        ret = sql_key_to_value(key->name, record, result);
511
0
        if (ret == 0) {
512
0
            return result;
513
0
        }
514
0
        else {
515
0
            flb_free(result);
516
0
            return NULL;
517
0
        }
518
0
        break;
519
    /* Functions are to be defined
520
    case SQL_EXP_FUNC:
521
        we don't need result
522
        flb_free(result);
523
        ret = reduce_expression(((struct sql_func *) expression)->param,
524
                                tag, tag_len, tms, map);
525
        result = ((struct SQL_EXP_func *) expression)->cb_func(tag, tag_len,
526
                                                               tms, ret);
527
        sql_expression_val_free(ret);
528
        break;
529
    */
530
0
    case SQL_LOGICAL_OP:
531
0
        left = reduce_expression(expression->left, record);
532
0
        right = reduce_expression(expression->right, record);
533
534
0
        operation = ((struct sql_expression_op *) expression)->operation;
535
536
0
        switch (operation) {
537
0
        case SQL_EXP_PAR:
538
0
            if (left == NULL) { /* Null is always interpreted as false in a
539
                                   logical operation */
540
0
                result->type = SQL_EXP_BOOL;
541
0
                result->val.boolean = false;
542
0
            }
543
0
            else { /* Left and right sides of a logical operation reduce to
544
                      boolean values */
545
0
                result->type = SQL_EXP_BOOL;
546
0
                result->val.boolean = left->val.boolean;
547
0
            }
548
0
            break;
549
0
        case SQL_EXP_EQ:
550
0
        case SQL_EXP_LT:
551
0
        case SQL_EXP_LTE:
552
0
        case SQL_EXP_GT:
553
0
        case SQL_EXP_GTE:
554
0
            numerical_comp(left, right, result, operation);
555
0
            break;
556
0
        case SQL_EXP_NOT:
557
0
        case SQL_EXP_AND:
558
0
        case SQL_EXP_OR:
559
0
            logical_operation(left, right, result, operation);
560
0
            break;
561
0
        }
562
0
        sql_expression_val_free(left);
563
0
        sql_expression_val_free(right);
564
0
        break;
565
0
    default:
566
0
        break;
567
0
    }
568
569
0
    return result;
570
0
}
571
572
573
static int process_record(struct sql_ctx *ctx, struct sql_query *query, struct flb_mp_chunk_record *chunk_record)
574
0
{
575
0
    int found = FLB_FALSE;
576
0
    struct sql_key *key;
577
0
    struct cfl_list *tmp;
578
0
    struct cfl_list *tmp_var;
579
0
    struct cfl_list *head;
580
0
    struct cfl_kvlist *kvlist;
581
0
    struct cfl_kvpair *kvpair;
582
0
    struct cfl_list *head_var;
583
0
    struct sql_expression_val *condition;
584
585
586
    /* check if the query contains a conditional statement */
587
0
    if (query->condition) {
588
0
        condition = reduce_expression(query->condition, chunk_record);
589
0
        if (!condition) {
590
0
            return 0;
591
0
        }
592
0
        else if (!condition->val.boolean) {
593
0
            flb_free(condition);
594
0
            return -1;
595
0
        }
596
0
        else {
597
0
            flb_free(condition);
598
0
        }
599
600
0
    }
601
602
    /*
603
     * iterate all the record keys and see if they are listed in the selected keys,
604
     * otherwise check if a wildcard has been used so all the keys will match.
605
     *
606
     * if no matches exists, just remove the record.
607
     */
608
0
    kvlist = chunk_record->cobj_record->variant->data.as_kvlist;
609
610
611
0
    cfl_list_foreach_safe(head, tmp, &kvlist->list) {
612
0
        kvpair = cfl_list_entry(head, struct cfl_kvpair, _head);
613
614
0
        found = FLB_FALSE;
615
616
        /* check if we have a wildcard */
617
0
        if (cfl_list_size(&query->keys) > 0) {
618
0
            key = cfl_list_entry_first(&query->keys, struct sql_key, _head);
619
0
            if (key->name == NULL) {
620
0
                found = FLB_TRUE;
621
0
            }
622
0
        }
623
624
0
        if (found == FLB_FALSE) {
625
0
            cfl_list_foreach_safe(head_var, tmp_var, &ctx->query->keys) {
626
0
                key = cfl_list_entry(head_var, struct sql_key, _head);
627
628
0
                if (cfl_sds_len(kvpair->key) != cfl_sds_len(key->name)) {
629
0
                    continue;
630
0
                }
631
632
0
                if (strcmp(kvpair->key, key->name) == 0) {
633
0
                    found = FLB_TRUE;
634
0
                    break;
635
0
                }
636
0
            }
637
0
        }
638
639
0
        if (!found) {
640
0
            cfl_kvpair_destroy(kvpair);
641
0
        }
642
0
        else {
643
            /* we keep the key in the list, check if it needs an alias */
644
0
            if (key->alias) {
645
0
                cfl_sds_destroy(kvpair->key);
646
0
                kvpair->key = cfl_sds_create(key->alias);
647
0
            }
648
0
        }
649
0
    }
650
651
0
    return 0;
652
0
}
653
654
/* Logs callback */
655
static int cb_process_logs(struct flb_processor_instance *ins,
656
                           void *chunk_data,
657
                           const char *tag,
658
                           int tag_len)
659
0
{
660
0
    int ret;
661
0
    struct sql_ctx *ctx;
662
0
    struct flb_mp_chunk_cobj *chunk_cobj = (struct flb_mp_chunk_cobj *) chunk_data;
663
0
    struct flb_mp_chunk_record *record;
664
0
    ctx = ins->context;
665
666
    /* Iterate records */
667
0
    while (flb_mp_chunk_cobj_record_next(chunk_cobj, &record) == FLB_MP_CHUNK_RECORD_OK) {
668
0
        ret = process_record(ctx, ctx->query, record);
669
0
        if (ret == -1) {
670
          /* remove the record from the chunk */
671
0
          flb_mp_chunk_cobj_record_destroy(chunk_cobj, record);
672
0
        }
673
0
    }
674
675
0
    return FLB_PROCESSOR_SUCCESS;
676
0
}
677
678
679
static struct flb_config_map config_map[] = {
680
    {
681
        FLB_CONFIG_MAP_STR, "query", NULL,
682
        0, FLB_TRUE, offsetof(struct sql_ctx, query_str),
683
        "SQL query for data selection."
684
    },
685
686
    /* EOF */
687
    {0}
688
};
689
690
struct flb_processor_plugin processor_sql_plugin = {
691
    .name               = "sql",
692
    .description        = "SQL processor",
693
    .cb_init            = cb_init,
694
    .cb_process_logs    = cb_process_logs,
695
    .cb_process_metrics = NULL,
696
    .cb_process_traces  = NULL,
697
    .cb_exit            = cb_exit,
698
    .config_map         = config_map,
699
    .flags              = 0
700
};
701