/src/fluent-bit/src/flb_mp.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2022 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <fluent-bit/flb_info.h> |
21 | | #include <fluent-bit/flb_utils.h> |
22 | | #include <fluent-bit/flb_mem.h> |
23 | | #include <fluent-bit/flb_log.h> |
24 | | #include <fluent-bit/flb_mp.h> |
25 | | #include <fluent-bit/flb_slist.h> |
26 | | #include <fluent-bit/flb_record_accessor.h> |
27 | | #include <fluent-bit/flb_metrics.h> |
28 | | |
29 | | #include <msgpack.h> |
30 | | #include <mpack/mpack.h> |
31 | | |
/*
 * Store 'd', narrowed to 16 or 32 bits, at 'buf' through msgpack-c's
 * _msgpack_store16()/_msgpack_store32() helpers. Those helpers are
 * underscore-prefixed msgpack-c names (presumably internal), hence:
 * don't do this at home.
 *
 * Both arguments are parenthesized so that an expression argument is
 * evaluated as a whole before the narrowing cast is applied.
 */
#define pack_uint16(buf, d) _msgpack_store16((buf), (uint16_t) (d))
#define pack_uint32(buf, d) _msgpack_store32((buf), (uint32_t) (d))
35 | | |
/*
 * Return the number of msgpack serialized events found in 'data'.
 *
 * Convenience wrapper around flb_mp_count_remaining() for callers that are
 * not interested in the trailing byte count.
 */
int flb_mp_count(const void *data, size_t bytes)
{
    int events;

    events = flb_mp_count_remaining(data, bytes, NULL);

    return events;
}
41 | | |
42 | | int flb_mp_count_remaining(const void *data, size_t bytes, size_t *remaining_bytes) |
43 | 0 | { |
44 | 0 | size_t remaining; |
45 | 0 | int count = 0; |
46 | 0 | mpack_reader_t reader; |
47 | |
|
48 | 0 | mpack_reader_init_data(&reader, (const char *) data, bytes); |
49 | 0 | for (;;) { |
50 | 0 | remaining = mpack_reader_remaining(&reader, NULL); |
51 | 0 | if (!remaining) { |
52 | 0 | break; |
53 | 0 | } |
54 | 0 | mpack_discard(&reader); |
55 | 0 | if (mpack_reader_error(&reader)) { |
56 | 0 | break; |
57 | 0 | } |
58 | 0 | count++; |
59 | 0 | } |
60 | |
|
61 | 0 | if (remaining_bytes) { |
62 | 0 | *remaining_bytes = remaining; |
63 | 0 | } |
64 | 0 | mpack_reader_destroy(&reader); |
65 | 0 | return count; |
66 | 0 | } |
67 | | |
68 | | int flb_mp_validate_metric_chunk(const void *data, size_t bytes, |
69 | | int *out_series, size_t *processed_bytes) |
70 | 0 | { |
71 | 0 | int ret; |
72 | 0 | int ok = CMT_DECODE_MSGPACK_SUCCESS; |
73 | 0 | int count = 0; |
74 | 0 | size_t off = 0; |
75 | 0 | size_t pre_off = 0; |
76 | 0 | struct cmt *cmt; |
77 | |
|
78 | 0 | while ((ret = cmt_decode_msgpack_create(&cmt, |
79 | 0 | (char *) data, bytes, &off)) == ok) { |
80 | 0 | cmt_destroy(cmt); |
81 | 0 | count++; |
82 | 0 | pre_off = off; |
83 | 0 | } |
84 | |
|
85 | 0 | switch (ret) { |
86 | 0 | case CMT_DECODE_MSGPACK_INVALID_ARGUMENT_ERROR: |
87 | 0 | case CMT_DECODE_MSGPACK_CORRUPT_INPUT_DATA_ERROR: |
88 | 0 | case CMT_DECODE_MSGPACK_CONSUME_ERROR: |
89 | 0 | case CMT_DECODE_MSGPACK_ENGINE_ERROR: |
90 | 0 | case CMT_DECODE_MSGPACK_PENDING_MAP_ENTRIES: |
91 | 0 | case CMT_DECODE_MSGPACK_PENDING_ARRAY_ENTRIES: |
92 | 0 | case CMT_DECODE_MSGPACK_UNEXPECTED_KEY_ERROR: |
93 | 0 | case CMT_DECODE_MSGPACK_UNEXPECTED_DATA_TYPE_ERROR: |
94 | 0 | case CMT_DECODE_MSGPACK_DICTIONARY_LOOKUP_ERROR: |
95 | 0 | case CMT_DECODE_MSGPACK_VERSION_ERROR: |
96 | 0 | goto error; |
97 | 0 | } |
98 | | |
99 | 0 | if (ret == CMT_DECODE_MSGPACK_INSUFFICIENT_DATA && off == bytes) { |
100 | 0 | *out_series = count; |
101 | 0 | *processed_bytes = pre_off; |
102 | 0 | return 0; |
103 | 0 | } |
104 | | |
105 | 0 | error: |
106 | 0 | *out_series = count; |
107 | 0 | *processed_bytes = pre_off; |
108 | |
|
109 | 0 | return -1; |
110 | 0 | } |
111 | | |
112 | | int flb_mp_validate_log_chunk(const void *data, size_t bytes, |
113 | | int *out_records, size_t *processed_bytes) |
114 | 0 | { |
115 | 0 | int ret; |
116 | 0 | int count = 0; |
117 | 0 | size_t off = 0; |
118 | 0 | size_t pre_off = 0; |
119 | 0 | size_t ptr_size; |
120 | 0 | unsigned char *ptr; |
121 | 0 | msgpack_object array; |
122 | 0 | msgpack_object ts; |
123 | 0 | msgpack_object record; |
124 | 0 | msgpack_unpacked result; |
125 | |
|
126 | 0 | msgpack_unpacked_init(&result); |
127 | 0 | while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) { |
128 | 0 | array = result.data; |
129 | |
|
130 | 0 | if (array.type != MSGPACK_OBJECT_ARRAY) { |
131 | | /* |
132 | | * Sometimes there is a special case: Chunks might have extra zero |
133 | | * bytes at the end of a record, meaning: no more records. This is not |
134 | | * an error and actually it happens if a previous run of Fluent Bit |
135 | | * was stopped/killed before to adjust the file size. |
136 | | * |
137 | | * Just validate if all bytes are zero, if so, adjust counters |
138 | | * and return zero. |
139 | | */ |
140 | 0 | ptr = (unsigned char *) (data); |
141 | 0 | ptr += pre_off; |
142 | 0 | if (ptr[0] != 0) { |
143 | 0 | goto error; |
144 | 0 | } |
145 | | |
146 | 0 | ptr_size = bytes - pre_off; |
147 | 0 | ret = memcmp(ptr, ptr + 1, ptr_size - 1); |
148 | 0 | if (ret == 0) { |
149 | | /* |
150 | | * The chunk is valid, just let the caller know the last processed |
151 | | * valid byte. |
152 | | */ |
153 | 0 | msgpack_unpacked_destroy(&result); |
154 | 0 | *out_records = count; |
155 | 0 | *processed_bytes = pre_off; |
156 | 0 | return 0; |
157 | 0 | } |
158 | 0 | goto error; |
159 | 0 | } |
160 | | |
161 | 0 | if (array.via.array.size != 2) { |
162 | 0 | goto error; |
163 | 0 | } |
164 | | |
165 | 0 | ts = array.via.array.ptr[0]; |
166 | 0 | record = array.via.array.ptr[1]; |
167 | |
|
168 | 0 | if (ts.type != MSGPACK_OBJECT_POSITIVE_INTEGER && |
169 | 0 | ts.type != MSGPACK_OBJECT_FLOAT && |
170 | 0 | ts.type != MSGPACK_OBJECT_EXT) { |
171 | 0 | goto error; |
172 | 0 | } |
173 | | |
174 | 0 | if (record.type != MSGPACK_OBJECT_MAP) { |
175 | 0 | goto error; |
176 | 0 | } |
177 | | |
178 | 0 | count++; |
179 | 0 | pre_off = off; |
180 | 0 | } |
181 | | |
182 | 0 | msgpack_unpacked_destroy(&result); |
183 | 0 | *out_records = count; |
184 | 0 | *processed_bytes = pre_off; |
185 | 0 | return 0; |
186 | | |
187 | 0 | error: |
188 | 0 | msgpack_unpacked_destroy(&result); |
189 | 0 | *out_records = count; |
190 | 0 | *processed_bytes = pre_off; |
191 | |
|
192 | 0 | return -1; |
193 | 0 | } |
194 | | |
/*
 * Adjust the entry count stored in an already serialized msgpack map header.
 *
 * 'buf' must point to the first byte of the header. Three header layouts are
 * recognized:
 *
 *   - fixmap (1000xxxx): the count lives in the low 4 bits of the first byte
 *   - map 16 (0xde)    : the count is a 16-bit big-endian integer
 *   - map 32 (0xdf)    : the count is a 32-bit big-endian integer
 *
 * Any other leading byte leaves the buffer untouched.
 *
 * The caller is responsible for passing a 'size' that fits the header in
 * place. Bits that do not fit are dropped; in particular, for a fixmap the
 * value is masked to 4 bits so an oversized count can never overwrite the
 * type tag in the upper nibble.
 */
void flb_mp_set_map_header_size(char *buf, int size)
{
    unsigned char *hdr = (unsigned char *) buf;
    uint32_t count = (uint32_t) size;

    if ((hdr[0] >> 4) == 0x8) {             /* 1000xxxx */
        hdr[0] = (unsigned char) (0x80 | (count & 0x0f));
    }
    else if (hdr[0] == 0xde) {
        hdr[1] = (unsigned char) ((count >> 8) & 0xff);
        hdr[2] = (unsigned char) (count & 0xff);
    }
    else if (hdr[0] == 0xdf) {
        hdr[1] = (unsigned char) ((count >> 24) & 0xff);
        hdr[2] = (unsigned char) ((count >> 16) & 0xff);
        hdr[3] = (unsigned char) ((count >> 8) & 0xff);
        hdr[4] = (unsigned char) (count & 0xff);
    }
}
214 | | |
/*
 * Adjust the entry count stored in an already serialized msgpack array
 * header.
 *
 * 'buf' must point to the first byte of the header. Three header layouts are
 * recognized:
 *
 *   - fixarray (1001xxxx): the count lives in the low 4 bits of the first byte
 *   - array 16 (0xdc)    : the count is a 16-bit big-endian integer
 *   - array 32 (0xdd)    : the count is a 32-bit big-endian integer
 *
 * Any other leading byte leaves the buffer untouched.
 *
 * The caller is responsible for passing a 'size' that fits the header in
 * place. Bits that do not fit are dropped; in particular, for a fixarray the
 * value is masked to 4 bits so an oversized count can never overwrite the
 * type tag in the upper nibble.
 */
void flb_mp_set_array_header_size(char *buf, int size)
{
    unsigned char *hdr = (unsigned char *) buf;
    uint32_t count = (uint32_t) size;

    if ((hdr[0] >> 4) == 0x9) {             /* 1001xxxx */
        hdr[0] = (unsigned char) (0x90 | (count & 0x0f));
    }
    else if (hdr[0] == 0xdc) {
        hdr[1] = (unsigned char) ((count >> 8) & 0xff);
        hdr[2] = (unsigned char) (count & 0xff);
    }
    else if (hdr[0] == 0xdd) {
        hdr[1] = (unsigned char) ((count >> 24) & 0xff);
        hdr[2] = (unsigned char) ((count >> 16) & 0xff);
        hdr[3] = (unsigned char) ((count >> 8) & 0xff);
        hdr[4] = (unsigned char) (count & 0xff);
    }
}
233 | | |
234 | | /* |
235 | | * msgpack-c requires to set the number of the entries in a map beforehand. For our |
236 | | * use case this adds some complexity, having developers to count all possible |
237 | | * entries that might be added. |
238 | | * |
239 | | * As a workaround and to avoid recomposing the map over and over, this simple API |
240 | | * allows you to initialize the map (or array) header, 'register' new entries (as |
241 | | * counters) and finalize; upon finalization the header entry count is adjusted. |
242 | | * |
243 | | * To make things easier, we make sure msgpack-c always registers a 32-bit header |
244 | | * (0xdf for maps, 0xdd for arrays; normally used when entries >= 65536). Yes, every |
245 | | * map or array using this API takes at least 2 more bytes, not a big deal. So whoever |
246 | | * uses this API, use it only if you don't know the exact number of entries to add. |
247 | | * |
248 | | * MANDATORY: make sure to always initialize, register every entry and finalize, |
249 | | * otherwise you will get a corrupted or incomplete msgpack buffer. |
250 | | * |
251 | | * Usage example |
252 | | * ============= |
253 | | * |
254 | | * struct flb_mp_map_header mh; |
255 | | * |
256 | | * flb_mp_map_header_init(&mh, mp_pck); |
257 | | * |
258 | | * -- First key/value entry -- |
259 | | * flb_mp_map_header_append(&mh); |
260 | | * msgpack_pack_str(mp_pck, 4); |
261 | | * msgpack_pack_str_body(mp_pck, "cool", 4); |
262 | | * msgpack_pack_true(mp_pck); |
263 | | * |
264 | | * -- Second key/value entry -- |
265 | | * flb_mp_map_header_append(&mh); |
266 | | * msgpack_pack_str(mp_pck, 4); |
267 | | * msgpack_pack_str_body(mp_pck, "slow", 4); |
268 | | * msgpack_pack_false(mp_pck); |
269 | | * |
270 | | * -- Finalize Map -- |
271 | | * flb_mp_map_header_end(&mh); |
272 | | */ |
273 | | |
274 | | static inline void mp_header_type_init(struct flb_mp_map_header *mh, |
275 | | msgpack_packer *mp_pck, |
276 | | int type) |
277 | 0 | { |
278 | 0 | msgpack_sbuffer *mp_sbuf; |
279 | |
|
280 | 0 | mp_sbuf = (msgpack_sbuffer *) mp_pck->data; |
281 | | |
282 | | /* map sbuffer */ |
283 | 0 | mh->data = mp_pck->data; |
284 | | |
285 | | /* Reset entries */ |
286 | 0 | mh->entries = 0; |
287 | | |
288 | | /* Store the next byte available */ |
289 | 0 | mh->offset = mp_sbuf->size; |
290 | 0 | } |
291 | | |
292 | | int flb_mp_map_header_init(struct flb_mp_map_header *mh, msgpack_packer *mp_pck) |
293 | 0 | { |
294 | | /* Initialize context for a map */ |
295 | 0 | mp_header_type_init(mh, mp_pck, FLB_MP_MAP); |
296 | | |
297 | | /* |
298 | | * Pack a map with size = 65536, so we force the underlaying msgpack-c |
299 | | * to use a 32 bit buffer size (0xdf), reference: |
300 | | * |
301 | | * - https://github.com/msgpack/msgpack/blob/master/spec.md#map-format-family |
302 | | */ |
303 | 0 | return msgpack_pack_map(mp_pck, 65536); |
304 | 0 | } |
305 | | |
306 | | int flb_mp_array_header_init(struct flb_mp_map_header *mh, msgpack_packer *mp_pck) |
307 | 0 | { |
308 | | /* Initialize context for a map */ |
309 | 0 | mp_header_type_init(mh, mp_pck, FLB_MP_ARRAY); |
310 | | |
311 | | /* |
312 | | * Pack a map with size = 65536, so we force the underlaying msgpack-c |
313 | | * to use a 32 bit buffer size (0xdf), reference: |
314 | | * |
315 | | * - https://github.com/msgpack/msgpack/blob/master/spec.md#map-format-family |
316 | | */ |
317 | 0 | return msgpack_pack_array(mp_pck, 65536); |
318 | 0 | } |
319 | | |
320 | | |
321 | | int flb_mp_map_header_append(struct flb_mp_map_header *mh) |
322 | 0 | { |
323 | 0 | mh->entries++; |
324 | 0 | return mh->entries; |
325 | 0 | } |
326 | | |
327 | | int flb_mp_array_header_append(struct flb_mp_map_header *mh) |
328 | 0 | { |
329 | 0 | mh->entries++; |
330 | 0 | return mh->entries; |
331 | 0 | } |
332 | | |
333 | | void flb_mp_map_header_end(struct flb_mp_map_header *mh) |
334 | 0 | { |
335 | 0 | char *ptr; |
336 | 0 | msgpack_sbuffer *mp_sbuf; |
337 | |
|
338 | 0 | mp_sbuf = mh->data; |
339 | 0 | ptr = (char *) mp_sbuf->data + mh->offset; |
340 | 0 | flb_mp_set_map_header_size(ptr, mh->entries); |
341 | 0 | } |
342 | | |
343 | | void flb_mp_array_header_end(struct flb_mp_map_header *mh) |
344 | 0 | { |
345 | 0 | char *ptr; |
346 | 0 | msgpack_sbuffer *mp_sbuf; |
347 | |
|
348 | 0 | mp_sbuf = mh->data; |
349 | 0 | ptr = (char *) mp_sbuf->data + mh->offset; |
350 | 0 | flb_mp_set_array_header_size(ptr, mh->entries); |
351 | 0 | } |
352 | | |
353 | | /* |
354 | | * Create an 'mp accessor' context: this context allows to create a list of |
355 | | * record accessor patterns based on a 'slist' context, where every slist string |
356 | | * buffer represents a key accessor. |
357 | | */ |
358 | | struct flb_mp_accessor *flb_mp_accessor_create(struct mk_list *slist_patterns) |
359 | 0 | { |
360 | 0 | size_t size; |
361 | 0 | struct mk_list *head; |
362 | 0 | struct flb_slist_entry *entry; |
363 | 0 | struct flb_record_accessor *ra; |
364 | 0 | struct flb_mp_accessor *mpa; |
365 | | |
366 | | /* Allocate context */ |
367 | 0 | mpa = flb_calloc(1, sizeof(struct flb_mp_accessor)); |
368 | 0 | if (!mpa) { |
369 | 0 | flb_errno(); |
370 | 0 | return NULL; |
371 | 0 | } |
372 | 0 | mk_list_init(&mpa->ra_list); |
373 | |
|
374 | 0 | mk_list_foreach(head, slist_patterns) { |
375 | 0 | entry = mk_list_entry(head, struct flb_slist_entry, _head); |
376 | | |
377 | | /* Create the record accessor context */ |
378 | 0 | ra = flb_ra_create(entry->str, FLB_TRUE); |
379 | 0 | if (!ra) { |
380 | 0 | flb_error("[mp accessor] could not create entry for pattern '%s'", |
381 | 0 | entry->str); |
382 | 0 | flb_mp_accessor_destroy(mpa); |
383 | 0 | return NULL; |
384 | 0 | } |
385 | 0 | mk_list_add(&ra->_head, &mpa->ra_list); |
386 | 0 | } |
387 | | |
388 | 0 | if (mk_list_size(&mpa->ra_list) == 0) { |
389 | 0 | return mpa; |
390 | 0 | } |
391 | | |
392 | 0 | size = sizeof(struct flb_mp_accessor_match) * mk_list_size(&mpa->ra_list); |
393 | 0 | mpa->matches_size = size; |
394 | 0 | mpa->matches = flb_calloc(1, size); |
395 | 0 | if (!mpa->matches) { |
396 | 0 | flb_errno(); |
397 | 0 | flb_mp_accessor_destroy(mpa); |
398 | 0 | return NULL; |
399 | 0 | } |
400 | | |
401 | 0 | return mpa; |
402 | 0 | } |
403 | | |
404 | | static inline int accessor_key_find_match(struct flb_mp_accessor *mpa, |
405 | | msgpack_object *key) |
406 | 0 | { |
407 | 0 | int i; |
408 | 0 | int count; |
409 | 0 | struct flb_mp_accessor_match *match; |
410 | |
|
411 | 0 | count = mk_list_size(&mpa->ra_list); |
412 | 0 | for (i = 0; i < count; i++) { |
413 | 0 | match = &mpa->matches[i]; |
414 | 0 | if (match->matched == FLB_FALSE) { |
415 | 0 | continue; |
416 | 0 | } |
417 | | |
418 | 0 | if (match->start_key == key) { |
419 | 0 | return i; |
420 | 0 | } |
421 | 0 | } |
422 | | |
423 | 0 | return -1; |
424 | 0 | } |
425 | | |
426 | | static inline int accessor_sub_pack(struct flb_mp_accessor_match *match, |
427 | | msgpack_packer *mp_pck, |
428 | | msgpack_object *key, |
429 | | msgpack_object *val) |
430 | 0 | { |
431 | 0 | int i; |
432 | 0 | int ret; |
433 | 0 | msgpack_object *k; |
434 | 0 | msgpack_object *v; |
435 | 0 | struct flb_mp_map_header mh; |
436 | |
|
437 | 0 | if (match->key == key || match->key == val) { |
438 | 0 | return FLB_FALSE; |
439 | 0 | } |
440 | | |
441 | 0 | if (key) { |
442 | 0 | msgpack_pack_object(mp_pck, *key); |
443 | 0 | } |
444 | |
|
445 | 0 | if (val->type == MSGPACK_OBJECT_MAP) { |
446 | 0 | flb_mp_map_header_init(&mh, mp_pck); |
447 | 0 | for (i = 0; i < val->via.map.size; i++) { |
448 | 0 | k = &val->via.map.ptr[i].key; |
449 | 0 | v = &val->via.map.ptr[i].val; |
450 | |
|
451 | 0 | ret = accessor_sub_pack(match, mp_pck, k, v); |
452 | 0 | if (ret == FLB_TRUE) { |
453 | 0 | flb_mp_map_header_append(&mh); |
454 | 0 | } |
455 | 0 | } |
456 | 0 | flb_mp_map_header_end(&mh); |
457 | 0 | } |
458 | 0 | else if (val->type == MSGPACK_OBJECT_ARRAY) { |
459 | 0 | flb_mp_array_header_init(&mh, mp_pck); |
460 | 0 | for (i = 0; i < val->via.array.size; i++) { |
461 | 0 | v = &val->via.array.ptr[i]; |
462 | 0 | ret = accessor_sub_pack(match, mp_pck, NULL, v); |
463 | 0 | if (ret == FLB_TRUE) { |
464 | 0 | flb_mp_array_header_append(&mh); |
465 | 0 | } |
466 | 0 | } |
467 | 0 | flb_mp_array_header_end(&mh); |
468 | 0 | } |
469 | 0 | else { |
470 | 0 | msgpack_pack_object(mp_pck, *val); |
471 | 0 | } |
472 | |
|
473 | 0 | return FLB_TRUE; |
474 | 0 | } |
475 | | |
476 | | /* |
477 | | * Remove keys or nested keys from a map. It compose the final result in a |
478 | | * new buffer. On error, it returns -1, if the map was modified it returns FLB_TRUE, |
479 | | * if no modification was required it returns FLB_FALSE. |
480 | | */ |
481 | | int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa, |
482 | | msgpack_object *map, |
483 | | void **out_buf, size_t *out_size) |
484 | 0 | { |
485 | 0 | int i; |
486 | 0 | int ret; |
487 | 0 | int rule_id = 0; |
488 | 0 | int matches = 0; |
489 | 0 | msgpack_object *key; |
490 | 0 | msgpack_object *val; |
491 | 0 | msgpack_object *s_key; |
492 | 0 | msgpack_object *o_key; |
493 | 0 | msgpack_object *o_val; |
494 | 0 | struct mk_list *head; |
495 | 0 | struct flb_record_accessor *ra; |
496 | 0 | struct flb_mp_accessor_match *match; |
497 | 0 | struct flb_mp_map_header mh; |
498 | 0 | msgpack_sbuffer mp_sbuf; |
499 | 0 | msgpack_packer mp_pck; |
500 | |
|
501 | 0 | if (map->via.map.size == 0) { |
502 | 0 | return FLB_FALSE; |
503 | 0 | } |
504 | | |
505 | | /* Reset matches cache */ |
506 | 0 | memset(mpa->matches, '\0', mpa->matches_size); |
507 | |
|
508 | 0 | mk_list_foreach(head, &mpa->ra_list) { |
509 | 0 | ra = mk_list_entry(head, struct flb_record_accessor, _head); |
510 | | |
511 | | /* Apply the record accessor rule against the map */ |
512 | 0 | ret = flb_ra_get_kv_pair(ra, *map, &s_key, &o_key, &o_val); |
513 | 0 | if (ret == 0) { |
514 | | /* There is a match, register in the matches table */ |
515 | 0 | match = &mpa->matches[rule_id]; |
516 | 0 | match->matched = FLB_TRUE; |
517 | 0 | match->start_key = s_key; /* Initial key path that matched */ |
518 | 0 | match->key = o_key; /* Final key that matched */ |
519 | 0 | match->val = o_val; /* Final value */ |
520 | 0 | match->ra = ra; /* Record accessor context */ |
521 | 0 | matches++; |
522 | 0 | } |
523 | 0 | rule_id++; |
524 | 0 | } |
525 | | |
526 | | /* If no matches, no modifications were made */ |
527 | 0 | if (matches == 0) { |
528 | 0 | return FLB_FALSE; |
529 | 0 | } |
530 | | |
531 | | /* Some rules matched, compose a new outgoing buffer */ |
532 | 0 | msgpack_sbuffer_init(&mp_sbuf); |
533 | 0 | msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); |
534 | | |
535 | | /* Initialize map */ |
536 | 0 | flb_mp_map_header_init(&mh, &mp_pck); |
537 | |
|
538 | 0 | for (i = 0; i < map->via.map.size; i++) { |
539 | 0 | key = &map->via.map.ptr[i].key; |
540 | 0 | val = &map->via.map.ptr[i].val; |
541 | | |
542 | | /* |
543 | | * For every entry on the path, check if we should do a step-by-step |
544 | | * repackaging or just pack the whole object. |
545 | | * |
546 | | * Just check: does this 'key' exists on any path of the record |
547 | | * accessor patterns ? |
548 | | * |
549 | | * Find if the active key in the map, matches an accessor rule, if |
550 | | * if match we get the match id as return value, otherwise -1. |
551 | | */ |
552 | 0 | ret = accessor_key_find_match(mpa, key); |
553 | 0 | if (ret == -1) { |
554 | | /* No matches, it's ok to pack the kv pair */ |
555 | 0 | flb_mp_map_header_append(&mh); |
556 | 0 | msgpack_pack_object(&mp_pck, *key); |
557 | 0 | msgpack_pack_object(&mp_pck, *val); |
558 | 0 | } |
559 | 0 | else { |
560 | | /* The key has a match. Now we do a step-by-step packaging */ |
561 | 0 | match = &mpa->matches[ret]; |
562 | 0 | ret = accessor_sub_pack(match, &mp_pck, key, val); |
563 | 0 | if (ret == FLB_TRUE) { |
564 | 0 | flb_mp_map_header_append(&mh); |
565 | 0 | } |
566 | 0 | } |
567 | 0 | } |
568 | 0 | flb_mp_map_header_end(&mh); |
569 | |
|
570 | 0 | *out_buf = mp_sbuf.data; |
571 | 0 | *out_size = mp_sbuf.size; |
572 | |
|
573 | 0 | return FLB_TRUE; |
574 | 0 | } |
575 | | |
576 | | void flb_mp_accessor_destroy(struct flb_mp_accessor *mpa) |
577 | 0 | { |
578 | 0 | struct mk_list *tmp; |
579 | 0 | struct mk_list *head; |
580 | 0 | struct flb_record_accessor *ra; |
581 | |
|
582 | 0 | if (!mpa) { |
583 | 0 | return; |
584 | 0 | } |
585 | | |
586 | 0 | mk_list_foreach_safe(head, tmp, &mpa->ra_list) { |
587 | 0 | ra = mk_list_entry(head, struct flb_record_accessor, _head); |
588 | 0 | mk_list_del(&ra->_head); |
589 | 0 | flb_ra_destroy(ra); |
590 | 0 | } |
591 | |
|
592 | 0 | if (mpa->matches) { |
593 | 0 | flb_free(mpa->matches); |
594 | 0 | } |
595 | 0 | flb_free(mpa); |
596 | 0 | } |