/src/fluent-bit/plugins/processor_sql/sql.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2024 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <fluent-bit/flb_processor_plugin.h> |
21 | | #include <fluent-bit/flb_mem.h> |
22 | | |
23 | | #include "sql.h" |
24 | | #include "sql_config.h" |
25 | | |
/* Numeric kind reported by string_to_number() for a successfully parsed string */
#define SQL_STR_INT     1
#define SQL_STR_FLOAT   2
29 | | |
30 | | static int cb_init(struct flb_processor_instance *ins, |
31 | | void *source_plugin_instance, |
32 | | int source_plugin_type, |
33 | | struct flb_config *config) |
34 | 0 | { |
35 | 0 | struct sql_ctx *ctx; |
36 | |
|
37 | 0 | ctx = sql_config_create(ins, config); |
38 | 0 | if (!ctx) { |
39 | 0 | return -1; |
40 | 0 | } |
41 | | |
42 | 0 | flb_processor_instance_set_context(ins, ctx); |
43 | |
|
44 | 0 | return FLB_PROCESSOR_SUCCESS; |
45 | 0 | } |
46 | | |
47 | | /* Processor exit */ |
48 | | static int cb_exit(struct flb_processor_instance *ins, void *data ) |
49 | 0 | { |
50 | 0 | struct sql_ctx *ctx = (struct sql_ctx *) data; |
51 | |
|
52 | 0 | if (!ctx) { |
53 | 0 | return FLB_PROCESSOR_SUCCESS; |
54 | 0 | } |
55 | | |
56 | 0 | sql_config_destroy(ctx); |
57 | 0 | return FLB_PROCESSOR_SUCCESS; |
58 | 0 | } |
59 | | |
60 | | void sql_expression_val_free(struct sql_expression_val *v) |
61 | 0 | { |
62 | 0 | if (!v) { |
63 | 0 | return; |
64 | 0 | } |
65 | | |
66 | 0 | if (v->type == SQL_EXP_STRING) { |
67 | 0 | cfl_sds_destroy(v->val.string); |
68 | 0 | } |
69 | |
|
70 | 0 | flb_free(v); |
71 | 0 | } |
72 | | |
/*
 * Convert a string to a numerical representation.
 *
 *  str : NUL-terminated input string
 *  len : number of bytes of 'str' inspected to count decimal dots
 *  i   : output, written only when SQL_STR_INT is returned
 *  d   : output, written only when SQL_STR_FLOAT is returned
 *
 * Return values:
 *
 * - SQL_STR_INT  : the whole string is a base-10 integer, 'i' is set
 * - SQL_STR_FLOAT: the string contains exactly one '.' and the whole string
 *                  is a floating point number, 'd' is set
 * - -1           : no conversion is possible (NULL or empty input, more than
 *                  one dot, value out of range, or characters left over
 *                  after the number)
 */
static int string_to_number(const char *str, int len, int64_t *i, double *d)
{
    int c;
    int dots = 0;
    char *end = NULL;
    int64_t i_out = 0;
    double d_out = 0.0;

    if (str == NULL) {
        return -1;
    }

    /* Detect if this is a floating point number */
    for (c = 0; c < len; c++) {
        if (str[c] == '.') {
            dots++;
        }
    }

    if (dots > 1) {
        return -1;
    }

    errno = 0;
    if (dots == 1) {
        /* the destination is a double, so use the double parser */
        d_out = strtod(str, &end);
    }
    else {
        i_out = strtoll(str, &end, 10);
    }

    /* out of range (ERANGE) or any other conversion error */
    if (errno != 0) {
        return -1;
    }

    /*
     * Nothing was consumed, or something follows the number: values such
     * as "12abc" are not numbers and must not be silently truncated.
     */
    if (end == str || *end != '\0') {
        return -1;
    }

    if (dots == 1) {
        *d = d_out;
        return SQL_STR_FLOAT;
    }

    *i = i_out;
    return SQL_STR_INT;
}
135 | | |
136 | | /* Convert (string) expression to number */ |
137 | | static void exp_string_to_number(struct sql_expression_val *val) |
138 | 0 | { |
139 | 0 | int ret; |
140 | 0 | int len; |
141 | 0 | int64_t i = 0; |
142 | 0 | char *str; |
143 | 0 | double d = 0.0; |
144 | |
|
145 | 0 | len = flb_sds_len(val->val.string); |
146 | 0 | str = val->val.string; |
147 | |
|
148 | 0 | ret = string_to_number(str, len, &i, &d); |
149 | 0 | if (ret == -1) { |
150 | 0 | return; |
151 | 0 | } |
152 | | |
153 | | /* Assign to proper type */ |
154 | 0 | if (ret == SQL_STR_FLOAT) { |
155 | 0 | flb_sds_destroy(val->val.string); |
156 | 0 | val->type = SQL_EXP_FLOAT; |
157 | 0 | val->val.f64 = d; |
158 | 0 | } |
159 | 0 | else if (ret == SQL_STR_INT) { |
160 | 0 | flb_sds_destroy(val->val.string); |
161 | 0 | val->type = SQL_EXP_INT; |
162 | 0 | val->val.i64 = i; |
163 | 0 | } |
164 | 0 | } |
165 | | |
166 | | |
167 | | |
168 | 0 | static bool value_to_bool(struct sql_expression_val *val) { |
169 | 0 | bool result = FLB_FALSE; |
170 | |
|
171 | 0 | switch (val->type) { |
172 | 0 | case SQL_EXP_BOOL: |
173 | 0 | result = val->val.boolean; |
174 | 0 | break; |
175 | 0 | case SQL_EXP_INT: |
176 | 0 | result = val->val.i64 > 0; |
177 | 0 | break; |
178 | 0 | case SQL_EXP_FLOAT: |
179 | 0 | result = val->val.f64 > 0; |
180 | 0 | break; |
181 | 0 | case SQL_EXP_STRING: |
182 | 0 | result = true; |
183 | 0 | break; |
184 | 0 | } |
185 | | |
186 | 0 | return result; |
187 | 0 | } |
188 | | static void itof_convert(struct sql_expression_val *val) |
189 | 0 | { |
190 | 0 | if (val->type != SQL_EXP_INT) { |
191 | 0 | return; |
192 | 0 | } |
193 | | |
194 | 0 | val->type = SQL_EXP_FLOAT; |
195 | 0 | val->val.f64 = (double) val->val.i64; |
196 | 0 | } |
197 | | |
198 | | static void numerical_comp(struct sql_expression_val *left, |
199 | | struct sql_expression_val *right, |
200 | | struct sql_expression_val *result, int op) |
201 | 0 | { |
202 | 0 | result->type = SQL_EXP_BOOL; |
203 | |
|
204 | 0 | if (left == NULL || right == NULL) { |
205 | 0 | result->val.boolean = false; |
206 | 0 | return; |
207 | 0 | } |
208 | | |
209 | | /* Check if left expression value is a number, if so, convert it */ |
210 | 0 | if (left->type == SQL_EXP_STRING && right->type != SQL_EXP_STRING) { |
211 | 0 | exp_string_to_number(left); |
212 | 0 | } |
213 | |
|
214 | 0 | if (left->type == SQL_EXP_INT && right->type == SQL_EXP_FLOAT) { |
215 | 0 | itof_convert(left); |
216 | 0 | } |
217 | 0 | else if (left->type == SQL_EXP_FLOAT && right->type == SQL_EXP_INT) { |
218 | 0 | itof_convert(right); |
219 | 0 | } |
220 | |
|
221 | 0 | switch (op) { |
222 | 0 | case SQL_EXP_EQ: |
223 | 0 | if (left->type == right->type) { |
224 | 0 | switch(left->type) { |
225 | 0 | case SQL_EXP_NULL: |
226 | 0 | result->val.boolean = true; |
227 | 0 | break; |
228 | 0 | case SQL_EXP_BOOL: |
229 | 0 | result->val.boolean = (left->val.boolean == right->val.boolean); |
230 | 0 | break; |
231 | 0 | case SQL_EXP_INT: |
232 | 0 | result->val.boolean = (left->val.i64 == right->val.i64); |
233 | 0 | break; |
234 | 0 | case SQL_EXP_FLOAT: |
235 | 0 | result->val.boolean = (left->val.f64 == right->val.f64); |
236 | 0 | break; |
237 | 0 | case SQL_EXP_STRING: |
238 | 0 | if (flb_sds_len(left->val.string) != |
239 | 0 | flb_sds_len(right->val.string)) { |
240 | 0 | result->val.boolean = false; |
241 | 0 | } |
242 | 0 | else if (strncmp(left->val.string, right->val.string, |
243 | 0 | flb_sds_len(left->val.string)) != 0) { |
244 | 0 | result->val.boolean = false; |
245 | 0 | } |
246 | 0 | else { |
247 | 0 | result->val.boolean = true; |
248 | 0 | } |
249 | 0 | break; |
250 | 0 | default: |
251 | 0 | result->val.boolean = false; |
252 | 0 | break; |
253 | 0 | } |
254 | 0 | } |
255 | 0 | else { |
256 | 0 | result->val.boolean = false; |
257 | 0 | } |
258 | 0 | break; |
259 | 0 | case SQL_EXP_LT: |
260 | 0 | if (left->type == right->type) { |
261 | 0 | switch(left->type) { |
262 | 0 | case SQL_EXP_INT: |
263 | 0 | result->val.boolean = (left->val.i64 < right->val.i64); |
264 | 0 | break; |
265 | 0 | case SQL_EXP_FLOAT: |
266 | 0 | result->val.boolean = (left->val.f64 < right->val.f64); |
267 | 0 | break; |
268 | 0 | case SQL_EXP_STRING: |
269 | 0 | if (strncmp(left->val.string, right->val.string, |
270 | 0 | flb_sds_len(left->val.string)) < 0) { |
271 | 0 | result->val.boolean = true; |
272 | 0 | } |
273 | 0 | else { |
274 | 0 | result->val.boolean = false; |
275 | 0 | } |
276 | 0 | break; |
277 | 0 | default: |
278 | 0 | result->val.boolean = false; |
279 | 0 | break; |
280 | 0 | } |
281 | 0 | } |
282 | 0 | else { |
283 | 0 | result->val.boolean = false; |
284 | 0 | } |
285 | 0 | break; |
286 | 0 | case SQL_EXP_LTE: |
287 | 0 | if (left->type == right->type) { |
288 | 0 | switch(left->type) { |
289 | 0 | case SQL_EXP_INT: |
290 | 0 | result->val.boolean = (left->val.i64 <= right->val.i64); |
291 | 0 | break; |
292 | 0 | case SQL_EXP_FLOAT: |
293 | 0 | result->val.boolean = (left->val.f64 <= right->val.f64); |
294 | 0 | break; |
295 | 0 | case SQL_EXP_STRING: |
296 | 0 | if (strncmp(left->val.string, right->val.string, |
297 | 0 | flb_sds_len(left->val.string)) <= 0) { |
298 | 0 | result->val.boolean = true; |
299 | 0 | } |
300 | 0 | else { |
301 | 0 | result->val.boolean = false; |
302 | 0 | } |
303 | 0 | break; |
304 | 0 | default: |
305 | 0 | result->val.boolean = false; |
306 | 0 | break; |
307 | 0 | } |
308 | 0 | } |
309 | 0 | else { |
310 | 0 | result->val.boolean = false; |
311 | 0 | } |
312 | 0 | break; |
313 | 0 | case SQL_EXP_GT: |
314 | 0 | if (left->type == right->type) { |
315 | 0 | switch(left->type) { |
316 | 0 | case SQL_EXP_INT: |
317 | 0 | result->val.boolean = (left->val.i64 > right->val.i64); |
318 | 0 | break; |
319 | 0 | case SQL_EXP_FLOAT: |
320 | 0 | result->val.boolean = (left->val.f64 > right->val.f64); |
321 | 0 | break; |
322 | 0 | case SQL_EXP_STRING: |
323 | 0 | if (strncmp(left->val.string, right->val.string, |
324 | 0 | flb_sds_len(left->val.string)) > 0) { |
325 | 0 | result->val.boolean = true; |
326 | 0 | } |
327 | 0 | else { |
328 | 0 | result->val.boolean = false; |
329 | 0 | } |
330 | 0 | break; |
331 | 0 | default: |
332 | 0 | result->val.boolean = false; |
333 | 0 | break; |
334 | 0 | } |
335 | 0 | } |
336 | 0 | else { |
337 | 0 | result->val.boolean = false; |
338 | 0 | } |
339 | 0 | break; |
340 | 0 | case SQL_EXP_GTE: |
341 | 0 | if (left->type == right->type) { |
342 | 0 | switch(left->type) { |
343 | 0 | case SQL_EXP_INT: |
344 | 0 | result->val.boolean = (left->val.i64 >= right->val.i64); |
345 | 0 | break; |
346 | 0 | case SQL_EXP_FLOAT: |
347 | 0 | result->val.boolean = (left->val.f64 >= right->val.f64); |
348 | 0 | break; |
349 | 0 | case SQL_EXP_STRING: |
350 | 0 | if (strncmp(left->val.string, right->val.string, |
351 | 0 | flb_sds_len(left->val.string)) >= 0) { |
352 | 0 | result->val.boolean = true; |
353 | 0 | } |
354 | 0 | else { |
355 | 0 | result->val.boolean = false; |
356 | 0 | } |
357 | 0 | break; |
358 | 0 | default: |
359 | 0 | result->val.boolean = false; |
360 | 0 | break; |
361 | 0 | } |
362 | 0 | } |
363 | 0 | else { |
364 | 0 | result->val.boolean = false; |
365 | 0 | } |
366 | 0 | break; |
367 | 0 | } |
368 | 0 | } |
369 | | |
370 | | static void logical_operation(struct sql_expression_val *left, |
371 | | struct sql_expression_val *right, |
372 | | struct sql_expression_val *result, int op) |
373 | 0 | { |
374 | 0 | bool lval; |
375 | 0 | bool rval; |
376 | |
|
377 | 0 | result->type = SQL_EXP_BOOL; |
378 | | |
379 | | /* Null is always interpreted as false in a logical operation */ |
380 | 0 | lval = left ? value_to_bool(left) : false; |
381 | 0 | rval = right ? value_to_bool(right) : false; |
382 | |
|
383 | 0 | switch (op) { |
384 | 0 | case SQL_EXP_NOT: |
385 | 0 | result->val.boolean = !lval; |
386 | 0 | break; |
387 | 0 | case SQL_EXP_AND: |
388 | 0 | result->val.boolean = lval & rval; |
389 | 0 | break; |
390 | 0 | case SQL_EXP_OR: |
391 | 0 | result->val.boolean = lval | rval; |
392 | 0 | break; |
393 | 0 | } |
394 | 0 | } |
395 | | |
396 | | static int sql_key_to_value(char *name, struct flb_mp_chunk_record *record, struct sql_expression_val *val) |
397 | 0 | { |
398 | |
|
399 | 0 | struct cfl_list *head; |
400 | 0 | struct cfl_list *tmp; |
401 | 0 | struct cfl_variant *var; |
402 | 0 | struct cfl_kvlist *kvlist; |
403 | 0 | struct cfl_kvpair *kvpair; |
404 | |
|
405 | 0 | kvlist = record->cobj_record->variant->data.as_kvlist; |
406 | |
|
407 | 0 | cfl_list_foreach_safe(head, tmp, &kvlist->list) { |
408 | 0 | kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); |
409 | |
|
410 | 0 | if (cfl_sds_len(kvpair->key) != cfl_sds_len(name)) { |
411 | 0 | var = NULL; |
412 | 0 | continue; |
413 | 0 | } |
414 | | |
415 | 0 | if (strcmp(kvpair->key, name) != 0) { |
416 | 0 | var = NULL; |
417 | 0 | continue; |
418 | 0 | } |
419 | | |
420 | 0 | var = kvpair->val; |
421 | 0 | break; |
422 | 0 | } |
423 | |
|
424 | 0 | if (!var) { |
425 | 0 | return -1; |
426 | 0 | } |
427 | | |
428 | 0 | if (var->type == CFL_VARIANT_STRING) { |
429 | 0 | val->type = SQL_EXP_STRING; |
430 | 0 | val->val.string = cfl_sds_create(kvpair->val->data.as_string); |
431 | 0 | } |
432 | 0 | else if (var->type == CFL_VARIANT_INT) { |
433 | 0 | val->type = SQL_EXP_INT; |
434 | 0 | val->val.i64 = kvpair->val->data.as_int64; |
435 | 0 | } |
436 | 0 | else if (var->type == CFL_VARIANT_UINT) { |
437 | | /* |
438 | | * Note on uint64 handling: our parsing rules in sql-parser.l handles the strings |
439 | | * that represents integers through an atol() conversion. If we get a case of a |
440 | | * long unsigned value, we can adjust it here by extending the sql_val union. |
441 | | * |
442 | | */ |
443 | 0 | val->type = SQL_EXP_INT; |
444 | 0 | val->val.i64 = kvpair->val->data.as_uint64; |
445 | 0 | } |
446 | 0 | else if (var->type == CFL_VARIANT_DOUBLE) { |
447 | 0 | val->type = SQL_EXP_FLOAT; |
448 | 0 | val->val.f64 = kvpair->val->data.as_double; |
449 | 0 | } |
450 | 0 | else if (var->type == CFL_VARIANT_BOOL) { |
451 | 0 | val->type = SQL_EXP_BOOL; |
452 | 0 | val->val.boolean = kvpair->val->data.as_bool; |
453 | 0 | } |
454 | 0 | else if (var->type == CFL_VARIANT_NULL) { |
455 | 0 | val->type = SQL_EXP_NULL; |
456 | 0 | val->val.boolean = 1; |
457 | 0 | } |
458 | 0 | else { |
459 | 0 | return -1; |
460 | 0 | } |
461 | | |
462 | 0 | return 0; |
463 | 0 | } |
464 | | |
465 | | static struct sql_expression_val *reduce_expression(struct sql_expression *expression, |
466 | | struct flb_mp_chunk_record *record) |
467 | | |
468 | 0 | { |
469 | 0 | int ret; |
470 | 0 | int operation; |
471 | 0 | flb_sds_t s; |
472 | 0 | struct sql_expression_key *key; |
473 | 0 | struct sql_expression_val *left; |
474 | 0 | struct sql_expression_val *right; |
475 | 0 | struct sql_expression_val *result; |
476 | |
|
477 | 0 | if (!expression) { |
478 | 0 | return NULL; |
479 | 0 | } |
480 | | |
481 | 0 | result = flb_calloc(1, sizeof(struct sql_expression_val)); |
482 | 0 | if (!result) { |
483 | 0 | flb_errno(); |
484 | 0 | return NULL; |
485 | 0 | } |
486 | | |
487 | 0 | switch (expression->type) { |
488 | 0 | case SQL_EXP_NULL: |
489 | 0 | result->type = expression->type; |
490 | 0 | break; |
491 | 0 | case SQL_EXP_BOOL: |
492 | 0 | result->type = expression->type; |
493 | 0 | result->val.boolean = ((struct sql_expression_val *) expression)->val.boolean; |
494 | 0 | break; |
495 | 0 | case SQL_EXP_INT: |
496 | 0 | result->type = expression->type; |
497 | 0 | result->val.i64 = ((struct sql_expression_val *) expression)->val.i64; |
498 | 0 | break; |
499 | 0 | case SQL_EXP_FLOAT: |
500 | 0 | result->type = expression->type; |
501 | 0 | result->val.f64 = ((struct sql_expression_val *) expression)->val.f64; |
502 | 0 | break; |
503 | 0 | case SQL_EXP_STRING: |
504 | 0 | s = ((struct sql_expression_val *) expression)->val.string; |
505 | 0 | result->type = expression->type; |
506 | 0 | result->val.string = cfl_sds_create(s); |
507 | 0 | break; |
508 | 0 | case SQL_EXP_KEY: |
509 | 0 | key = (struct sql_expression_key *) expression; |
510 | 0 | ret = sql_key_to_value(key->name, record, result); |
511 | 0 | if (ret == 0) { |
512 | 0 | return result; |
513 | 0 | } |
514 | 0 | else { |
515 | 0 | flb_free(result); |
516 | 0 | return NULL; |
517 | 0 | } |
518 | 0 | break; |
519 | | /* Functions are to be defined |
520 | | case SQL_EXP_FUNC: |
521 | | we don't need result |
522 | | flb_free(result); |
523 | | ret = reduce_expression(((struct sql_func *) expression)->param, |
524 | | tag, tag_len, tms, map); |
525 | | result = ((struct SQL_EXP_func *) expression)->cb_func(tag, tag_len, |
526 | | tms, ret); |
527 | | sql_expression_val_free(ret); |
528 | | break; |
529 | | */ |
530 | 0 | case SQL_LOGICAL_OP: |
531 | 0 | left = reduce_expression(expression->left, record); |
532 | 0 | right = reduce_expression(expression->right, record); |
533 | |
|
534 | 0 | operation = ((struct sql_expression_op *) expression)->operation; |
535 | |
|
536 | 0 | switch (operation) { |
537 | 0 | case SQL_EXP_PAR: |
538 | 0 | if (left == NULL) { /* Null is always interpreted as false in a |
539 | | logical operation */ |
540 | 0 | result->type = SQL_EXP_BOOL; |
541 | 0 | result->val.boolean = false; |
542 | 0 | } |
543 | 0 | else { /* Left and right sides of a logical operation reduce to |
544 | | boolean values */ |
545 | 0 | result->type = SQL_EXP_BOOL; |
546 | 0 | result->val.boolean = left->val.boolean; |
547 | 0 | } |
548 | 0 | break; |
549 | 0 | case SQL_EXP_EQ: |
550 | 0 | case SQL_EXP_LT: |
551 | 0 | case SQL_EXP_LTE: |
552 | 0 | case SQL_EXP_GT: |
553 | 0 | case SQL_EXP_GTE: |
554 | 0 | numerical_comp(left, right, result, operation); |
555 | 0 | break; |
556 | 0 | case SQL_EXP_NOT: |
557 | 0 | case SQL_EXP_AND: |
558 | 0 | case SQL_EXP_OR: |
559 | 0 | logical_operation(left, right, result, operation); |
560 | 0 | break; |
561 | 0 | } |
562 | 0 | sql_expression_val_free(left); |
563 | 0 | sql_expression_val_free(right); |
564 | 0 | break; |
565 | 0 | default: |
566 | 0 | break; |
567 | 0 | } |
568 | | |
569 | 0 | return result; |
570 | 0 | } |
571 | | |
572 | | |
573 | | static int process_record(struct sql_ctx *ctx, struct sql_query *query, struct flb_mp_chunk_record *chunk_record) |
574 | 0 | { |
575 | 0 | int found = FLB_FALSE; |
576 | 0 | struct sql_key *key; |
577 | 0 | struct cfl_list *tmp; |
578 | 0 | struct cfl_list *tmp_var; |
579 | 0 | struct cfl_list *head; |
580 | 0 | struct cfl_kvlist *kvlist; |
581 | 0 | struct cfl_kvpair *kvpair; |
582 | 0 | struct cfl_list *head_var; |
583 | 0 | struct sql_expression_val *condition; |
584 | | |
585 | | |
586 | | /* check if the query contains a conditional statement */ |
587 | 0 | if (query->condition) { |
588 | 0 | condition = reduce_expression(query->condition, chunk_record); |
589 | 0 | if (!condition) { |
590 | 0 | return 0; |
591 | 0 | } |
592 | 0 | else if (!condition->val.boolean) { |
593 | 0 | flb_free(condition); |
594 | 0 | return -1; |
595 | 0 | } |
596 | 0 | else { |
597 | 0 | flb_free(condition); |
598 | 0 | } |
599 | |
|
600 | 0 | } |
601 | | |
602 | | /* |
603 | | * iterate all the record keys and see if they are listed in the selected keys, |
604 | | * otherwise check if a wildcard has been used so all the keys will match. |
605 | | * |
606 | | * if no matches exists, just remove the record. |
607 | | */ |
608 | 0 | kvlist = chunk_record->cobj_record->variant->data.as_kvlist; |
609 | | |
610 | |
|
611 | 0 | cfl_list_foreach_safe(head, tmp, &kvlist->list) { |
612 | 0 | kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); |
613 | |
|
614 | 0 | found = FLB_FALSE; |
615 | | |
616 | | /* check if we have a wildcard */ |
617 | 0 | if (cfl_list_size(&query->keys) > 0) { |
618 | 0 | key = cfl_list_entry_first(&query->keys, struct sql_key, _head); |
619 | 0 | if (key->name == NULL) { |
620 | 0 | found = FLB_TRUE; |
621 | 0 | } |
622 | 0 | } |
623 | |
|
624 | 0 | if (found == FLB_FALSE) { |
625 | 0 | cfl_list_foreach_safe(head_var, tmp_var, &ctx->query->keys) { |
626 | 0 | key = cfl_list_entry(head_var, struct sql_key, _head); |
627 | |
|
628 | 0 | if (cfl_sds_len(kvpair->key) != cfl_sds_len(key->name)) { |
629 | 0 | continue; |
630 | 0 | } |
631 | | |
632 | 0 | if (strcmp(kvpair->key, key->name) == 0) { |
633 | 0 | found = FLB_TRUE; |
634 | 0 | break; |
635 | 0 | } |
636 | 0 | } |
637 | 0 | } |
638 | |
|
639 | 0 | if (!found) { |
640 | 0 | cfl_kvpair_destroy(kvpair); |
641 | 0 | } |
642 | 0 | else { |
643 | | /* we keep the key in the list, check if it needs an alias */ |
644 | 0 | if (key->alias) { |
645 | 0 | cfl_sds_destroy(kvpair->key); |
646 | 0 | kvpair->key = cfl_sds_create(key->alias); |
647 | 0 | } |
648 | 0 | } |
649 | 0 | } |
650 | |
|
651 | 0 | return 0; |
652 | 0 | } |
653 | | |
654 | | /* Logs callback */ |
655 | | static int cb_process_logs(struct flb_processor_instance *ins, |
656 | | void *chunk_data, |
657 | | const char *tag, |
658 | | int tag_len) |
659 | 0 | { |
660 | 0 | int ret; |
661 | 0 | struct sql_ctx *ctx; |
662 | 0 | struct flb_mp_chunk_cobj *chunk_cobj = (struct flb_mp_chunk_cobj *) chunk_data; |
663 | 0 | struct flb_mp_chunk_record *record; |
664 | 0 | ctx = ins->context; |
665 | | |
666 | | /* Iterate records */ |
667 | 0 | while (flb_mp_chunk_cobj_record_next(chunk_cobj, &record) == FLB_MP_CHUNK_RECORD_OK) { |
668 | 0 | ret = process_record(ctx, ctx->query, record); |
669 | 0 | if (ret == -1) { |
670 | | /* remove the record from the chunk */ |
671 | 0 | flb_mp_chunk_cobj_record_destroy(chunk_cobj, record); |
672 | 0 | } |
673 | 0 | } |
674 | |
|
675 | 0 | return FLB_PROCESSOR_SUCCESS; |
676 | 0 | } |
677 | | |
678 | | |
679 | | static struct flb_config_map config_map[] = { |
680 | | { |
681 | | FLB_CONFIG_MAP_STR, "query", NULL, |
682 | | 0, FLB_TRUE, offsetof(struct sql_ctx, query_str), |
683 | | "SQL query for data selection." |
684 | | }, |
685 | | |
686 | | /* EOF */ |
687 | | {0} |
688 | | }; |
689 | | |
690 | | struct flb_processor_plugin processor_sql_plugin = { |
691 | | .name = "sql", |
692 | | .description = "SQL processor", |
693 | | .cb_init = cb_init, |
694 | | .cb_process_logs = cb_process_logs, |
695 | | .cb_process_metrics = NULL, |
696 | | .cb_process_traces = NULL, |
697 | | .cb_exit = cb_exit, |
698 | | .config_map = config_map, |
699 | | .flags = 0 |
700 | | }; |
701 | | |