/src/fluent-bit/plugins/in_opentelemetry/opentelemetry_logs.c
Line | Count | Source |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2026 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <fluent-bit/flb_input_plugin.h> |
21 | | #include <fluent-bit/flb_sds.h> |
22 | | #include <fluent-bit/flb_pack.h> |
23 | | #include <fluent-bit/flb_log_event_encoder.h> |
24 | | #include <fluent-bit/flb_time.h> |
25 | | #include <fluent-bit/flb_opentelemetry.h> |
26 | | #include <fluent-otel-proto/fluent-otel.h> |
27 | | |
28 | | |
29 | | #include "opentelemetry.h" |
30 | | #include "opentelemetry_utils.h" |
31 | | |
32 | | /* |
33 | | * OTLP encoding functions to pack the log records as msgpack |
34 | | * ---------------------------------------------------------- |
35 | | */ |
36 | | static int otlp_pack_any_value(msgpack_packer *mp_pck, Opentelemetry__Proto__Common__V1__AnyValue *body); |
37 | | |
38 | | static int otel_pack_string(msgpack_packer *mp_pck, char *str) |
39 | 0 | { |
40 | 0 | return msgpack_pack_str_with_body(mp_pck, str, strlen(str)); |
41 | 0 | } |
42 | | |
43 | | static int otel_pack_bool(msgpack_packer *mp_pck, bool val) |
44 | 0 | { |
45 | 0 | if (val) { |
46 | 0 | return msgpack_pack_true(mp_pck); |
47 | 0 | } |
48 | 0 | else { |
49 | 0 | return msgpack_pack_false(mp_pck); |
50 | 0 | } |
51 | 0 | } |
52 | | |
53 | | static int otel_pack_int(msgpack_packer *mp_pck, int val) |
54 | 0 | { |
55 | 0 | return msgpack_pack_int64(mp_pck, val); |
56 | 0 | } |
57 | | |
58 | | static int otel_pack_double(msgpack_packer *mp_pck, double val) |
59 | 0 | { |
60 | 0 | return msgpack_pack_double(mp_pck, val); |
61 | 0 | } |
62 | | |
63 | | static int otel_pack_kvarray(msgpack_packer *mp_pck, |
64 | | Opentelemetry__Proto__Common__V1__KeyValue **kv_array, |
65 | | size_t kv_count) |
66 | 0 | { |
67 | 0 | int result; |
68 | 0 | int index; |
69 | |
|
70 | 0 | result = msgpack_pack_map(mp_pck, kv_count); |
71 | |
|
72 | 0 | if (result != 0) { |
73 | 0 | return result; |
74 | 0 | } |
75 | | |
76 | 0 | for (index = 0; index < kv_count && result == 0; index++) { |
77 | 0 | result = otel_pack_string(mp_pck, kv_array[index]->key); |
78 | |
|
79 | 0 | if(result == 0) { |
80 | 0 | result = otlp_pack_any_value(mp_pck, kv_array[index]->value); |
81 | 0 | } |
82 | 0 | } |
83 | |
|
84 | 0 | return result; |
85 | 0 | } |
86 | | |
87 | | static int otel_pack_kvlist(msgpack_packer *mp_pck, |
88 | | Opentelemetry__Proto__Common__V1__KeyValueList *kv_list) |
89 | 0 | { |
90 | 0 | int kv_index; |
91 | 0 | int ret; |
92 | 0 | char *key; |
93 | 0 | Opentelemetry__Proto__Common__V1__AnyValue *value; |
94 | |
|
95 | 0 | ret = msgpack_pack_map(mp_pck, kv_list->n_values); |
96 | 0 | if (ret != 0) { |
97 | 0 | return ret; |
98 | 0 | } |
99 | | |
100 | 0 | for (kv_index = 0; kv_index < kv_list->n_values && ret == 0; kv_index++) { |
101 | 0 | key = kv_list->values[kv_index]->key; |
102 | 0 | value = kv_list->values[kv_index]->value; |
103 | |
|
104 | 0 | ret = otel_pack_string(mp_pck, key); |
105 | |
|
106 | 0 | if(ret == 0) { |
107 | 0 | ret = otlp_pack_any_value(mp_pck, value); |
108 | 0 | } |
109 | 0 | } |
110 | |
|
111 | 0 | return ret; |
112 | 0 | } |
113 | | |
114 | | static int otel_pack_array(msgpack_packer *mp_pck, |
115 | | Opentelemetry__Proto__Common__V1__ArrayValue *array) |
116 | 0 | { |
117 | 0 | int ret; |
118 | 0 | int array_index; |
119 | |
|
120 | 0 | ret = msgpack_pack_array(mp_pck, array->n_values); |
121 | |
|
122 | 0 | if (ret != 0) { |
123 | 0 | return ret; |
124 | 0 | } |
125 | | |
126 | 0 | for (array_index = 0; array_index < array->n_values && ret == 0; array_index++) { |
127 | 0 | ret = otlp_pack_any_value(mp_pck, array->values[array_index]); |
128 | 0 | } |
129 | |
|
130 | 0 | return ret; |
131 | 0 | } |
132 | | |
133 | | static int otel_pack_bytes(msgpack_packer *mp_pck, |
134 | | ProtobufCBinaryData bytes) |
135 | 0 | { |
136 | 0 | return msgpack_pack_bin_with_body(mp_pck, bytes.data, bytes.len); |
137 | 0 | } |
138 | | |
139 | | static int otlp_pack_any_value(msgpack_packer *mp_pck, |
140 | | Opentelemetry__Proto__Common__V1__AnyValue *body) |
141 | 0 | { |
142 | 0 | int result; |
143 | |
|
144 | 0 | result = -2; |
145 | |
|
146 | 0 | if (body == NULL) { |
147 | 0 | msgpack_pack_nil(mp_pck); |
148 | 0 | return 0; |
149 | 0 | } |
150 | | |
151 | 0 | switch(body->value_case){ |
152 | 0 | case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE: |
153 | 0 | result = otel_pack_string(mp_pck, body->string_value); |
154 | 0 | break; |
155 | | |
156 | 0 | case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE_STRINDEX: |
157 | | /* Profiling-only string dictionary reference: ignore in logs. */ |
158 | 0 | result = msgpack_pack_nil(mp_pck); |
159 | 0 | break; |
160 | | |
161 | 0 | case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BOOL_VALUE: |
162 | 0 | result = otel_pack_bool(mp_pck, body->bool_value); |
163 | 0 | break; |
164 | | |
165 | 0 | case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_INT_VALUE: |
166 | 0 | result = otel_pack_int(mp_pck, body->int_value); |
167 | 0 | break; |
168 | | |
169 | 0 | case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_DOUBLE_VALUE: |
170 | 0 | result = otel_pack_double(mp_pck, body->double_value); |
171 | 0 | break; |
172 | | |
173 | 0 | case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_ARRAY_VALUE: |
174 | 0 | result = otel_pack_array(mp_pck, body->array_value); |
175 | 0 | break; |
176 | | |
177 | 0 | case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE: |
178 | 0 | result = otel_pack_kvlist(mp_pck, body->kvlist_value); |
179 | 0 | break; |
180 | | |
181 | 0 | case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BYTES_VALUE: |
182 | 0 | result = otel_pack_bytes(mp_pck, body->bytes_value); |
183 | 0 | break; |
184 | | |
185 | 0 | case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE__NOT_SET: |
186 | | /* treat an unset value as null */ |
187 | 0 | result = msgpack_pack_nil(mp_pck); |
188 | 0 | break; |
189 | | |
190 | 0 | default: |
191 | 0 | break; |
192 | 0 | } |
193 | | |
194 | 0 | if (result == -2) { |
195 | 0 | flb_error("[otel]: invalid value type in pack_any_value"); |
196 | 0 | result = -1; |
197 | 0 | } |
198 | |
|
199 | 0 | return result; |
200 | 0 | } |
201 | | |
202 | | /* https://opentelemetry.io/docs/specs/otel/logs/data-model/#log-and-event-record-definition */ |
203 | | static int otel_pack_v1_metadata(struct flb_opentelemetry *ctx, |
204 | | msgpack_packer *mp_pck, |
205 | | struct Opentelemetry__Proto__Logs__V1__LogRecord *log_record, |
206 | | Opentelemetry__Proto__Resource__V1__Resource *resource, |
207 | | Opentelemetry__Proto__Common__V1__InstrumentationScope *scope) |
208 | 0 | { |
209 | 0 | int ret; |
210 | 0 | int len; |
211 | 0 | struct flb_mp_map_header mh; |
212 | 0 | struct flb_mp_map_header otlp_mh; |
213 | |
|
214 | 0 | flb_mp_map_header_init(&otlp_mh, mp_pck); |
215 | |
|
216 | 0 | len = flb_sds_len(ctx->logs_metadata_key); |
217 | | |
218 | | /* otlp key start */ |
219 | 0 | flb_mp_map_header_append(&otlp_mh); |
220 | |
|
221 | 0 | msgpack_pack_str(mp_pck, len); |
222 | 0 | msgpack_pack_str_body(mp_pck, ctx->logs_metadata_key, len); |
223 | |
|
224 | 0 | flb_mp_map_header_init(&mh, mp_pck); |
225 | |
|
226 | 0 | if (log_record->observed_time_unix_nano != 0) { |
227 | 0 | flb_mp_map_header_append(&mh); |
228 | 0 | msgpack_pack_str(mp_pck, 18); |
229 | 0 | msgpack_pack_str_body(mp_pck, "observed_timestamp", 18); |
230 | 0 | msgpack_pack_uint64(mp_pck, log_record->observed_time_unix_nano); |
231 | 0 | } |
232 | | |
233 | | /* Value of 0 indicates unknown or missing timestamp. */ |
234 | 0 | if (log_record->time_unix_nano != 0) { |
235 | 0 | flb_mp_map_header_append(&mh); |
236 | 0 | msgpack_pack_str(mp_pck, 9); |
237 | 0 | msgpack_pack_str_body(mp_pck, "timestamp", 9); |
238 | 0 | msgpack_pack_uint64(mp_pck, log_record->time_unix_nano); |
239 | 0 | } |
240 | | |
241 | | /* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber */ |
242 | 0 | if (log_record->severity_number >= 1 && log_record->severity_number <= 24) { |
243 | 0 | flb_mp_map_header_append(&mh); |
244 | 0 | msgpack_pack_str(mp_pck, 15); |
245 | 0 | msgpack_pack_str_body(mp_pck, "severity_number", 15); |
246 | 0 | msgpack_pack_uint64(mp_pck, log_record->severity_number); |
247 | 0 | } |
248 | |
|
249 | 0 | if (log_record->severity_text != NULL && strlen(log_record->severity_text) > 0) { |
250 | 0 | flb_mp_map_header_append(&mh); |
251 | 0 | msgpack_pack_str(mp_pck, 13); |
252 | 0 | msgpack_pack_str_body(mp_pck, "severity_text", 13); |
253 | 0 | msgpack_pack_str(mp_pck, strlen(log_record->severity_text)); |
254 | 0 | msgpack_pack_str_body(mp_pck, log_record->severity_text, strlen(log_record->severity_text)); |
255 | 0 | } |
256 | |
|
257 | 0 | if (log_record->n_attributes > 0) { |
258 | 0 | flb_mp_map_header_append(&mh); |
259 | 0 | msgpack_pack_str(mp_pck, 10); |
260 | 0 | msgpack_pack_str_body(mp_pck, "attributes", 10); |
261 | 0 | ret = otel_pack_kvarray(mp_pck, |
262 | 0 | log_record->attributes, |
263 | 0 | log_record->n_attributes); |
264 | 0 | if (ret != 0) { |
265 | 0 | return ret; |
266 | 0 | } |
267 | 0 | } |
268 | | |
269 | 0 | if (log_record->dropped_attributes_count > 0) { |
270 | 0 | flb_mp_map_header_append(&mh); |
271 | 0 | msgpack_pack_str(mp_pck, 24); |
272 | 0 | msgpack_pack_str_body(mp_pck, "dropped_attributes_count", 24); |
273 | 0 | msgpack_pack_uint64(mp_pck, log_record->dropped_attributes_count); |
274 | 0 | } |
275 | |
|
276 | 0 | if (log_record->trace_id.len > 0) { |
277 | 0 | flb_mp_map_header_append(&mh); |
278 | 0 | msgpack_pack_str(mp_pck, 8); |
279 | 0 | msgpack_pack_str_body(mp_pck, "trace_id", 8); |
280 | 0 | ret = otel_pack_bytes(mp_pck, log_record->trace_id); |
281 | 0 | if (ret != 0) { |
282 | 0 | return ret; |
283 | 0 | } |
284 | 0 | } |
285 | | |
286 | 0 | if (log_record->span_id.len > 0) { |
287 | 0 | flb_mp_map_header_append(&mh); |
288 | 0 | msgpack_pack_str(mp_pck, 7); |
289 | 0 | msgpack_pack_str_body(mp_pck, "span_id", 7); |
290 | 0 | ret = otel_pack_bytes(mp_pck, log_record->span_id); |
291 | 0 | if (ret != 0) { |
292 | 0 | return ret; |
293 | 0 | } |
294 | 0 | } |
295 | | |
296 | 0 | flb_mp_map_header_append(&mh); |
297 | 0 | msgpack_pack_str(mp_pck, 11); |
298 | 0 | msgpack_pack_str_body(mp_pck, "trace_flags", 11); |
299 | 0 | msgpack_pack_uint8(mp_pck, (uint8_t) log_record->flags & 0xff); |
300 | |
|
301 | 0 | if (log_record->event_name != NULL && strlen(log_record->event_name) > 0) { |
302 | 0 | flb_mp_map_header_append(&mh); |
303 | 0 | msgpack_pack_str(mp_pck, 10); |
304 | 0 | msgpack_pack_str_body(mp_pck, "event_name", 10); |
305 | 0 | msgpack_pack_str(mp_pck, strlen(log_record->event_name)); |
306 | 0 | msgpack_pack_str_body(mp_pck, log_record->event_name, strlen(log_record->event_name)); |
307 | 0 | } |
308 | |
|
309 | 0 | flb_mp_map_header_end(&mh); |
310 | | |
311 | | /* otlp key end */ |
312 | 0 | flb_mp_map_header_end(&otlp_mh); |
313 | |
|
314 | 0 | return 0; |
315 | 0 | } |
316 | | |
317 | | static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx, |
318 | | struct flb_log_event_encoder *encoder, |
319 | | char *tag, size_t tag_len, |
320 | | uint8_t *in_buf, |
321 | | size_t in_size) |
322 | 0 | { |
323 | 0 | int ret = 0; |
324 | 0 | int len; |
325 | 0 | int resource_logs_index; |
326 | 0 | int scope_log_index; |
327 | 0 | int log_record_index; |
328 | 0 | char *logs_body_key; |
329 | 0 | int scope_has_schema_url; |
330 | 0 | struct flb_mp_map_header mh; |
331 | 0 | struct flb_mp_map_header mh_tmp; |
332 | 0 | struct flb_time tm; |
333 | |
|
334 | 0 | msgpack_packer *mp_pck; |
335 | 0 | msgpack_packer *mp_pck_meta; |
336 | | |
337 | | /* OTel proto suff */ |
338 | 0 | Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest *input_logs; |
339 | 0 | Opentelemetry__Proto__Logs__V1__ScopeLogs **scope_logs; |
340 | 0 | Opentelemetry__Proto__Logs__V1__ScopeLogs *scope_log; |
341 | 0 | Opentelemetry__Proto__Common__V1__InstrumentationScope *scope; |
342 | |
|
343 | 0 | Opentelemetry__Proto__Logs__V1__ResourceLogs **resource_logs; |
344 | 0 | Opentelemetry__Proto__Logs__V1__ResourceLogs *resource_log; |
345 | 0 | Opentelemetry__Proto__Logs__V1__LogRecord **log_records; |
346 | 0 | Opentelemetry__Proto__Resource__V1__Resource *resource; |
347 | |
|
348 | 0 | mp_pck = &encoder->body.packer; |
349 | 0 | mp_pck_meta = &encoder->metadata.packer; |
350 | | |
351 | | /* unpack logs from protobuf payload */ |
352 | 0 | input_logs = opentelemetry__proto__collector__logs__v1__export_logs_service_request__unpack(NULL, in_size, in_buf); |
353 | 0 | if (input_logs == NULL) { |
354 | 0 | flb_plg_warn(ctx->ins, "failed to unpack input logs from OpenTelemetry payload"); |
355 | 0 | ret = -1; |
356 | 0 | goto binary_payload_to_msgpack_end; |
357 | 0 | } |
358 | | |
359 | 0 | resource_logs = input_logs->resource_logs; |
360 | 0 | if (input_logs->n_resource_logs == 0) { |
361 | 0 | ret = 0; |
362 | 0 | goto binary_payload_to_msgpack_end; |
363 | 0 | } |
364 | | |
365 | 0 | if (resource_logs == NULL) { |
366 | 0 | flb_plg_warn(ctx->ins, "no resource logs found"); |
367 | 0 | ret = -1; |
368 | 0 | goto binary_payload_to_msgpack_end; |
369 | 0 | } |
370 | | |
371 | 0 | for (resource_logs_index = 0; resource_logs_index < input_logs->n_resource_logs; resource_logs_index++) { |
372 | 0 | resource_log = resource_logs[resource_logs_index]; |
373 | 0 | if (resource_log == NULL) { |
374 | 0 | flb_plg_warn(ctx->ins, "null resource logs entry found"); |
375 | 0 | ret = -1; |
376 | 0 | goto binary_payload_to_msgpack_end; |
377 | 0 | } |
378 | | |
379 | 0 | resource = resource_log->resource; |
380 | 0 | scope_logs = resource_log->scope_logs; |
381 | |
|
382 | 0 | if (resource_log->n_scope_logs > 0 && scope_logs == NULL) { |
383 | 0 | flb_plg_warn(ctx->ins, "no scope logs found"); |
384 | 0 | ret = -1; |
385 | 0 | goto binary_payload_to_msgpack_end; |
386 | 0 | } |
387 | | |
388 | 0 | for (scope_log_index = 0; scope_log_index < resource_log->n_scope_logs; scope_log_index++) { |
389 | 0 | scope_log = scope_logs[scope_log_index]; |
390 | 0 | if (scope_log == NULL) { |
391 | 0 | flb_plg_warn(ctx->ins, "null scope logs entry found"); |
392 | 0 | ret = -1; |
393 | 0 | goto binary_payload_to_msgpack_end; |
394 | 0 | } |
395 | | |
396 | 0 | log_records = scope_log->log_records; |
397 | |
|
398 | 0 | if (scope_log->n_log_records == 0) { |
399 | 0 | continue; |
400 | 0 | } |
401 | | |
402 | 0 | if (log_records == NULL) { |
403 | 0 | flb_plg_warn(ctx->ins, "no log records found"); |
404 | 0 | ret = -1; |
405 | 0 | goto binary_payload_to_msgpack_end; |
406 | 0 | } |
407 | | |
408 | 0 | flb_log_event_encoder_group_init(encoder); |
409 | | |
410 | | /* pack schema (internal) */ |
411 | 0 | ret = flb_log_event_encoder_append_metadata_values(encoder, |
412 | 0 | FLB_LOG_EVENT_STRING_VALUE("schema", 6), |
413 | 0 | FLB_LOG_EVENT_STRING_VALUE("otlp", 4), |
414 | 0 | FLB_LOG_EVENT_STRING_VALUE("resource_id", 11), |
415 | 0 | FLB_LOG_EVENT_INT64_VALUE(resource_logs_index), |
416 | 0 | FLB_LOG_EVENT_STRING_VALUE("scope_id", 8), |
417 | 0 | FLB_LOG_EVENT_INT64_VALUE(scope_log_index)); |
418 | | |
419 | |
|
420 | 0 | ret = flb_log_event_encoder_dynamic_field_reset(&encoder->body); |
421 | 0 | if (ret != FLB_EVENT_ENCODER_SUCCESS) { |
422 | 0 | flb_plg_error(ctx->ins, "failed to reset log event body: %s", |
423 | 0 | flb_log_event_encoder_get_error_description(ret)); |
424 | 0 | goto binary_payload_to_msgpack_end; |
425 | 0 | } |
426 | | |
427 | 0 | flb_mp_map_header_init(&mh, mp_pck); |
428 | | |
429 | | /* Resource */ |
430 | 0 | flb_mp_map_header_append(&mh); |
431 | 0 | msgpack_pack_str(mp_pck, 8); |
432 | 0 | msgpack_pack_str_body(mp_pck, "resource", 8); |
433 | |
|
434 | 0 | flb_mp_map_header_init(&mh_tmp, mp_pck); |
435 | 0 | if (resource) { |
436 | | /* look for OTel resource attributes */ |
437 | 0 | if (resource->n_attributes > 0 && resource->attributes) { |
438 | 0 | flb_mp_map_header_append(&mh_tmp); |
439 | 0 | msgpack_pack_str(mp_pck, 10); |
440 | 0 | msgpack_pack_str_body(mp_pck, "attributes", 10); |
441 | |
|
442 | 0 | ret = otel_pack_kvarray(mp_pck, |
443 | 0 | resource->attributes, |
444 | 0 | resource->n_attributes); |
445 | 0 | if (ret != 0) { |
446 | 0 | goto binary_payload_to_msgpack_end; |
447 | 0 | } |
448 | 0 | } |
449 | | |
450 | 0 | if (resource->dropped_attributes_count > 0) { |
451 | 0 | flb_mp_map_header_append(&mh_tmp); |
452 | 0 | msgpack_pack_str(mp_pck, 24); |
453 | 0 | msgpack_pack_str_body(mp_pck, "dropped_attributes_count", 24); |
454 | 0 | msgpack_pack_uint64(mp_pck, resource->dropped_attributes_count); |
455 | 0 | } |
456 | |
|
457 | 0 | if (resource_log->schema_url) { |
458 | 0 | flb_mp_map_header_append(&mh_tmp); |
459 | 0 | msgpack_pack_str(mp_pck, 10); |
460 | 0 | msgpack_pack_str_body(mp_pck, "schema_url", 10); |
461 | |
|
462 | 0 | len = strlen(resource_log->schema_url); |
463 | 0 | msgpack_pack_str(mp_pck, len); |
464 | 0 | msgpack_pack_str_body(mp_pck, resource_log->schema_url, len); |
465 | 0 | } |
466 | 0 | } |
467 | 0 | flb_mp_map_header_end(&mh_tmp); |
468 | | |
469 | | /* scope */ |
470 | 0 | flb_mp_map_header_append(&mh); |
471 | 0 | msgpack_pack_str(mp_pck, 5); |
472 | 0 | msgpack_pack_str_body(mp_pck, "scope", 5); |
473 | | |
474 | | /* Scope */ |
475 | 0 | scope = scope_log->scope; |
476 | 0 | scope_has_schema_url = FLB_FALSE; |
477 | |
|
478 | 0 | if (scope_log->schema_url && strlen(scope_log->schema_url) > 0) { |
479 | 0 | scope_has_schema_url = FLB_TRUE; |
480 | 0 | } |
481 | |
|
482 | 0 | if (scope && (scope->name || scope->version || |
483 | 0 | scope->n_attributes > 0 || scope->dropped_attributes_count > 0 || |
484 | 0 | scope_has_schema_url == FLB_TRUE)) { |
485 | 0 | flb_mp_map_header_init(&mh_tmp, mp_pck); |
486 | |
|
487 | 0 | if (scope_has_schema_url == FLB_TRUE) { |
488 | 0 | flb_mp_map_header_append(&mh_tmp); |
489 | 0 | msgpack_pack_str(mp_pck, 10); |
490 | 0 | msgpack_pack_str_body(mp_pck, "schema_url", 10); |
491 | |
|
492 | 0 | len = strlen(scope_log->schema_url); |
493 | 0 | msgpack_pack_str(mp_pck, len); |
494 | 0 | msgpack_pack_str_body(mp_pck, scope_log->schema_url, len); |
495 | 0 | } |
496 | |
|
497 | 0 | if (scope->name && strlen(scope->name) > 0) { |
498 | 0 | flb_mp_map_header_append(&mh_tmp); |
499 | 0 | msgpack_pack_str(mp_pck, 4); |
500 | 0 | msgpack_pack_str_body(mp_pck, "name", 4); |
501 | |
|
502 | 0 | len = strlen(scope->name); |
503 | 0 | msgpack_pack_str(mp_pck, len); |
504 | 0 | msgpack_pack_str_body(mp_pck, scope->name, len); |
505 | 0 | } |
506 | 0 | if (scope->version && strlen(scope->version) > 0) { |
507 | 0 | flb_mp_map_header_append(&mh_tmp); |
508 | |
|
509 | 0 | msgpack_pack_str(mp_pck, 7); |
510 | 0 | msgpack_pack_str_body(mp_pck, "version", 7); |
511 | |
|
512 | 0 | len = strlen(scope->version); |
513 | 0 | msgpack_pack_str(mp_pck, len); |
514 | 0 | msgpack_pack_str_body(mp_pck, scope->version, len); |
515 | 0 | } |
516 | |
|
517 | 0 | if (scope->n_attributes > 0 && scope->attributes) { |
518 | 0 | flb_mp_map_header_append(&mh_tmp); |
519 | 0 | msgpack_pack_str(mp_pck, 10); |
520 | 0 | msgpack_pack_str_body(mp_pck, "attributes", 10); |
521 | 0 | ret = otel_pack_kvarray(mp_pck, |
522 | 0 | scope->attributes, |
523 | 0 | scope->n_attributes); |
524 | 0 | if (ret != 0) { |
525 | 0 | goto binary_payload_to_msgpack_end; |
526 | 0 | } |
527 | 0 | } |
528 | | |
529 | 0 | if (scope->dropped_attributes_count > 0) { |
530 | 0 | flb_mp_map_header_append(&mh_tmp); |
531 | 0 | msgpack_pack_str(mp_pck, 24); |
532 | 0 | msgpack_pack_str_body(mp_pck, "dropped_attributes_count", 24); |
533 | 0 | msgpack_pack_uint64(mp_pck, scope->dropped_attributes_count); |
534 | 0 | } |
535 | |
|
536 | 0 | flb_mp_map_header_end(&mh_tmp); |
537 | 0 | } |
538 | 0 | else { |
539 | 0 | flb_mp_map_header_init(&mh_tmp, mp_pck); |
540 | |
|
541 | 0 | if (scope_has_schema_url == FLB_TRUE) { |
542 | 0 | flb_mp_map_header_append(&mh_tmp); |
543 | 0 | msgpack_pack_str(mp_pck, 10); |
544 | 0 | msgpack_pack_str_body(mp_pck, "schema_url", 10); |
545 | |
|
546 | 0 | len = strlen(scope_log->schema_url); |
547 | 0 | msgpack_pack_str(mp_pck, len); |
548 | 0 | msgpack_pack_str_body(mp_pck, scope_log->schema_url, len); |
549 | 0 | } |
550 | |
|
551 | 0 | flb_mp_map_header_end(&mh_tmp); |
552 | 0 | } |
553 | | |
554 | 0 | flb_mp_map_header_end(&mh); |
555 | |
|
556 | 0 | ret = flb_log_event_encoder_dynamic_field_flush(&encoder->body); |
557 | 0 | if (ret != FLB_EVENT_ENCODER_SUCCESS) { |
558 | 0 | flb_plg_error(ctx->ins, "could not set group content metadata: %s", |
559 | 0 | flb_log_event_encoder_get_error_description(ret)); |
560 | 0 | goto binary_payload_to_msgpack_end; |
561 | 0 | } |
562 | | |
563 | 0 | flb_log_event_encoder_group_header_end(encoder); |
564 | |
|
565 | 0 | for (log_record_index=0; log_record_index < scope_log->n_log_records; log_record_index++) { |
566 | 0 | ret = flb_log_event_encoder_begin_record(encoder); |
567 | |
|
568 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
569 | 0 | if (log_records[log_record_index]->time_unix_nano > 0) { |
570 | 0 | flb_time_from_uint64(&tm, log_records[log_record_index]->time_unix_nano); |
571 | 0 | ret = flb_log_event_encoder_set_timestamp(encoder, &tm); |
572 | 0 | } |
573 | 0 | else if (log_records[log_record_index]->observed_time_unix_nano > 0) { |
574 | 0 | flb_time_from_uint64(&tm, log_records[log_record_index]->observed_time_unix_nano); |
575 | 0 | ret = flb_log_event_encoder_set_timestamp(encoder, &tm); |
576 | 0 | } |
577 | 0 | else { |
578 | 0 | flb_time_get(&tm); |
579 | 0 | ret = flb_log_event_encoder_set_timestamp(encoder, &tm); |
580 | 0 | } |
581 | 0 | } |
582 | |
|
583 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
584 | 0 | ret = flb_log_event_encoder_dynamic_field_reset(&encoder->metadata); |
585 | 0 | if (ret != FLB_EVENT_ENCODER_SUCCESS) { |
586 | 0 | flb_plg_error(ctx->ins, "failed to reset log event metadata: %s", |
587 | 0 | flb_log_event_encoder_get_error_description(ret)); |
588 | 0 | ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE; |
589 | 0 | } |
590 | 0 | else { |
591 | 0 | ret = otel_pack_v1_metadata(ctx, |
592 | 0 | mp_pck_meta, |
593 | 0 | log_records[log_record_index], |
594 | 0 | resource, |
595 | 0 | scope_log->scope); |
596 | 0 | } |
597 | |
|
598 | 0 | if (ret != 0) { |
599 | 0 | flb_plg_error(ctx->ins, "failed to convert log record"); |
600 | 0 | ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE; |
601 | 0 | } |
602 | 0 | else { |
603 | 0 | ret = flb_log_event_encoder_dynamic_field_flush(&encoder->metadata); |
604 | 0 | } |
605 | 0 | } |
606 | |
|
607 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
608 | 0 | ret = flb_log_event_encoder_dynamic_field_reset(&encoder->body); |
609 | 0 | if (ret != FLB_EVENT_ENCODER_SUCCESS) { |
610 | 0 | flb_plg_error(ctx->ins, "failed to reset log event body: %s", |
611 | 0 | flb_log_event_encoder_get_error_description(ret)); |
612 | 0 | ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE; |
613 | 0 | } |
614 | 0 | else if (ctx->logs_body_key == NULL && |
615 | 0 | log_records[log_record_index]->body != NULL && |
616 | 0 | log_records[log_record_index]->body->value_case == |
617 | 0 | OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE) { |
618 | 0 | ret = otlp_pack_any_value( |
619 | 0 | mp_pck, |
620 | 0 | log_records[log_record_index]->body); |
621 | 0 | } |
622 | 0 | else { |
623 | 0 | logs_body_key = ctx->logs_body_key; |
624 | 0 | if (logs_body_key == NULL) { |
625 | 0 | logs_body_key = "log"; |
626 | 0 | } |
627 | 0 | ret = msgpack_pack_map(mp_pck, 1); |
628 | 0 | if (ret == 0) { |
629 | 0 | ret = msgpack_pack_str(mp_pck, strlen(logs_body_key)); |
630 | 0 | } |
631 | 0 | if (ret == 0) { |
632 | 0 | ret = msgpack_pack_str_body(mp_pck, |
633 | 0 | logs_body_key, |
634 | 0 | strlen(logs_body_key)); |
635 | 0 | } |
636 | 0 | if (ret == 0) { |
637 | 0 | ret = otlp_pack_any_value( |
638 | 0 | mp_pck, |
639 | 0 | log_records[log_record_index]->body); |
640 | 0 | } |
641 | 0 | } |
642 | |
|
643 | 0 | if (ret != 0) { |
644 | 0 | flb_plg_error(ctx->ins, "failed to convert log record body"); |
645 | 0 | ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE; |
646 | 0 | } |
647 | 0 | else { |
648 | 0 | ret = flb_log_event_encoder_dynamic_field_flush(&encoder->body); |
649 | 0 | } |
650 | 0 | } |
651 | |
|
652 | 0 | if (ret == FLB_EVENT_ENCODER_SUCCESS) { |
653 | 0 | ret = flb_log_event_encoder_commit_record(encoder); |
654 | 0 | } |
655 | 0 | else { |
656 | 0 | flb_plg_error(ctx->ins, "marshalling error"); |
657 | 0 | goto binary_payload_to_msgpack_end; |
658 | 0 | } |
659 | 0 | } |
660 | | |
661 | 0 | flb_log_event_encoder_group_end(encoder); |
662 | |
|
663 | 0 | } |
664 | 0 | } |
665 | | |
666 | 0 | binary_payload_to_msgpack_end: |
667 | 0 | if (input_logs) { |
668 | 0 | opentelemetry__proto__collector__logs__v1__export_logs_service_request__free_unpacked( |
669 | 0 | input_logs, NULL); |
670 | 0 | } |
671 | |
|
672 | 0 | if (ret != 0) { |
673 | 0 | return -1; |
674 | 0 | } |
675 | | |
676 | 0 | return 0; |
677 | 0 | } |
678 | | |
679 | | /* |
680 | | * Main function used from opentelemetry_prot.c to process logs either in JSON or Protobuf format. |
681 | | * ----------------------------------------------------------------------------------------------- |
682 | | */ |
683 | | int opentelemetry_process_logs(struct flb_opentelemetry *ctx, |
684 | | flb_sds_t content_type, |
685 | | flb_sds_t tag, |
686 | | size_t tag_len, |
687 | | void *data, size_t size) |
688 | 0 | { |
689 | 0 | int ret = -1; |
690 | 0 | int is_proto = FLB_FALSE; /* default to JSON */ |
691 | 0 | int error_status = 0; |
692 | 0 | char *buf; |
693 | 0 | uint8_t *payload; |
694 | 0 | uint64_t payload_size; |
695 | 0 | struct flb_log_event_encoder *encoder; |
696 | |
|
697 | 0 | buf = (char *) data; |
698 | 0 | payload = data; |
699 | 0 | payload_size = size; |
700 | | |
701 | | /* Detect the type of payload */ |
702 | 0 | if (content_type) { |
703 | 0 | if (opentelemetry_is_json_content_type(content_type) == FLB_TRUE) { |
704 | 0 | if (opentelemetry_payload_starts_with_json_object(buf, size) != FLB_TRUE) { |
705 | 0 | flb_plg_error(ctx->ins, "Invalid JSON payload"); |
706 | 0 | return -1; |
707 | 0 | } |
708 | 0 | is_proto = FLB_FALSE; |
709 | 0 | } |
710 | 0 | else if (opentelemetry_is_protobuf_content_type(content_type) == FLB_TRUE) { |
711 | 0 | is_proto = FLB_TRUE; |
712 | 0 | } |
713 | 0 | else { |
714 | 0 | flb_plg_error(ctx->ins, "Unsupported content type %s", content_type); |
715 | 0 | return -1; |
716 | 0 | } |
717 | 0 | } |
718 | | |
719 | 0 | encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2); |
720 | 0 | if (encoder == NULL) { |
721 | 0 | return -1; |
722 | 0 | } |
723 | | |
724 | 0 | if (is_proto == FLB_TRUE) { |
725 | 0 | ret = binary_payload_to_msgpack(ctx, encoder, |
726 | 0 | tag, tag_len, |
727 | 0 | (uint8_t *) payload, payload_size); |
728 | 0 | if (ret < 0) { |
729 | 0 | flb_plg_error(ctx->ins, "failed to process logs from protobuf payload"); |
730 | 0 | } |
731 | 0 | } |
732 | 0 | else { |
733 | 0 | ret = flb_opentelemetry_logs_json_to_msgpack(encoder, |
734 | 0 | (const char *) payload, payload_size, |
735 | 0 | ctx->logs_body_key, |
736 | 0 | &error_status); |
737 | 0 | if (ret != 0) { |
738 | | /* we are printing the error for now, let's see what is the user's preference later */ |
739 | 0 | flb_plg_error(ctx->ins, "failed to process logs from JSON payload (%i) %s", |
740 | 0 | error_status, |
741 | 0 | flb_opentelemetry_error_to_string(error_status)); |
742 | 0 | } |
743 | |
|
744 | 0 | } |
745 | |
|
746 | 0 | if (ret >= 0) { |
747 | 0 | if (opentelemetry_uses_worker_ingress_queue(ctx)) { |
748 | 0 | size_t allocation_size; |
749 | 0 | void *resized_buffer; |
750 | |
|
751 | 0 | allocation_size = encoder->buffer.alloc; |
752 | |
|
753 | 0 | if (allocation_size > encoder->output_length) { |
754 | 0 | resized_buffer = flb_realloc(encoder->output_buffer, |
755 | 0 | encoder->output_length); |
756 | 0 | if (resized_buffer != NULL) { |
757 | 0 | encoder->buffer.data = resized_buffer; |
758 | 0 | encoder->output_buffer = resized_buffer; |
759 | 0 | encoder->buffer.alloc = encoder->output_length; |
760 | 0 | allocation_size = encoder->output_length; |
761 | 0 | } |
762 | 0 | } |
763 | |
|
764 | 0 | ret = opentelemetry_ingest_logs_take(ctx, |
765 | 0 | tag, |
766 | 0 | flb_sds_len(tag), |
767 | 0 | encoder->output_buffer, |
768 | 0 | encoder->output_length, |
769 | 0 | allocation_size); |
770 | 0 | if (ret == 0 || ret == FLB_INPUT_INGRESS_BUSY) { |
771 | 0 | flb_log_event_encoder_claim_internal_buffer_ownership(encoder); |
772 | 0 | } |
773 | 0 | } |
774 | 0 | else { |
775 | 0 | ret = opentelemetry_ingest_logs(ctx, |
776 | 0 | tag, |
777 | 0 | flb_sds_len(tag), |
778 | 0 | encoder->output_buffer, |
779 | 0 | encoder->output_length); |
780 | 0 | } |
781 | |
|
782 | 0 | if (ret != 0) { |
783 | 0 | flb_plg_error(ctx->ins, "failed to append logs to the input buffer"); |
784 | 0 | } |
785 | 0 | } |
786 | |
|
787 | 0 | flb_log_event_encoder_destroy(encoder); |
788 | 0 | return ret; |
789 | 0 | } |