Coverage Report

Created: 2026-06-07 07:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/plugins/in_opentelemetry/opentelemetry_logs.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2026 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_input_plugin.h>
21
#include <fluent-bit/flb_sds.h>
22
#include <fluent-bit/flb_pack.h>
23
#include <fluent-bit/flb_log_event_encoder.h>
24
#include <fluent-bit/flb_time.h>
25
#include <fluent-bit/flb_opentelemetry.h>
26
#include <fluent-otel-proto/fluent-otel.h>
27
28
29
#include "opentelemetry.h"
30
#include "opentelemetry_utils.h"
31
32
/*
33
 * OTLP encoding functions to pack the log records as msgpack
34
 * ----------------------------------------------------------
35
 */
36
/* Forward declaration: the container packers below recurse into it. */
static int otlp_pack_any_value(msgpack_packer *mp_pck,
                               Opentelemetry__Proto__Common__V1__AnyValue *body);
37
38
/*
 * Pack a NUL-terminated C string as a msgpack str (header followed by body).
 *
 * Returns whatever msgpack_pack_str_with_body() returns. The caller must
 * pass a non-NULL string: the length is taken with strlen().
 */
static int otel_pack_string(msgpack_packer *mp_pck, char *str)
{
    size_t length;

    length = strlen(str);

    return msgpack_pack_str_with_body(mp_pck, str, length);
}
42
43
/* Pack a boolean as the msgpack `true` or `false` primitive. */
static int otel_pack_bool(msgpack_packer *mp_pck, bool val)
{
    return val ? msgpack_pack_true(mp_pck) : msgpack_pack_false(mp_pck);
}
52
53
/*
 * Pack a signed integer as a msgpack int64.
 *
 * The parameter is a full 64-bit integer: the value handed in by
 * otlp_pack_any_value() is the AnyValue int_value field, and a plain `int`
 * parameter would silently truncate anything outside the 32-bit range
 * before it reaches msgpack_pack_int64().
 */
static int otel_pack_int(msgpack_packer *mp_pck, int64_t val)
{
    return msgpack_pack_int64(mp_pck, val);
}
57
58
/* Pack a double precision float; returns the msgpack packer status. */
static int otel_pack_double(msgpack_packer *mp_pck, double val)
{
    int status;

    status = msgpack_pack_double(mp_pck, val);

    return status;
}
62
63
static int otel_pack_kvarray(msgpack_packer *mp_pck,
64
                             Opentelemetry__Proto__Common__V1__KeyValue **kv_array,
65
                             size_t kv_count)
66
0
{
67
0
    int result;
68
0
    int index;
69
70
0
    result = msgpack_pack_map(mp_pck, kv_count);
71
72
0
    if (result != 0) {
73
0
        return result;
74
0
    }
75
76
0
    for (index = 0; index < kv_count && result == 0; index++) {
77
0
        result = otel_pack_string(mp_pck, kv_array[index]->key);
78
79
0
        if(result == 0) {
80
0
           result = otlp_pack_any_value(mp_pck, kv_array[index]->value);
81
0
        }
82
0
    }
83
84
0
    return result;
85
0
}
86
87
static int otel_pack_kvlist(msgpack_packer *mp_pck,
88
                            Opentelemetry__Proto__Common__V1__KeyValueList *kv_list)
89
0
{
90
0
    int kv_index;
91
0
    int ret;
92
0
    char *key;
93
0
    Opentelemetry__Proto__Common__V1__AnyValue *value;
94
95
0
    ret = msgpack_pack_map(mp_pck, kv_list->n_values);
96
0
    if (ret != 0) {
97
0
        return ret;
98
0
    }
99
100
0
    for (kv_index = 0; kv_index < kv_list->n_values && ret == 0; kv_index++) {
101
0
        key = kv_list->values[kv_index]->key;
102
0
        value = kv_list->values[kv_index]->value;
103
104
0
        ret = otel_pack_string(mp_pck, key);
105
106
0
        if(ret == 0) {
107
0
           ret = otlp_pack_any_value(mp_pck, value);
108
0
        }
109
0
    }
110
111
0
    return ret;
112
0
}
113
114
static int otel_pack_array(msgpack_packer *mp_pck,
115
                           Opentelemetry__Proto__Common__V1__ArrayValue *array)
116
0
{
117
0
    int ret;
118
0
    int array_index;
119
120
0
    ret = msgpack_pack_array(mp_pck, array->n_values);
121
122
0
    if (ret != 0) {
123
0
        return ret;
124
0
    }
125
126
0
    for (array_index = 0; array_index < array->n_values && ret == 0; array_index++) {
127
0
        ret = otlp_pack_any_value(mp_pck, array->values[array_index]);
128
0
    }
129
130
0
    return ret;
131
0
}
132
133
static int otel_pack_bytes(msgpack_packer *mp_pck,
134
                           ProtobufCBinaryData bytes)
135
0
{
136
0
    return msgpack_pack_bin_with_body(mp_pck, bytes.data, bytes.len);
137
0
}
138
139
static int otlp_pack_any_value(msgpack_packer *mp_pck,
140
                               Opentelemetry__Proto__Common__V1__AnyValue *body)
141
0
{
142
0
    int result;
143
144
0
    result = -2;
145
146
0
    if (body == NULL) {
147
0
        msgpack_pack_nil(mp_pck);
148
0
        return 0;
149
0
    }
150
151
0
    switch(body->value_case){
152
0
        case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE:
153
0
            result = otel_pack_string(mp_pck, body->string_value);
154
0
            break;
155
156
0
        case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE_STRINDEX:
157
            /* Profiling-only string dictionary reference: ignore in logs. */
158
0
            result = msgpack_pack_nil(mp_pck);
159
0
            break;
160
161
0
        case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BOOL_VALUE:
162
0
            result =  otel_pack_bool(mp_pck, body->bool_value);
163
0
            break;
164
165
0
        case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_INT_VALUE:
166
0
            result = otel_pack_int(mp_pck, body->int_value);
167
0
            break;
168
169
0
        case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_DOUBLE_VALUE:
170
0
            result = otel_pack_double(mp_pck, body->double_value);
171
0
            break;
172
173
0
        case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_ARRAY_VALUE:
174
0
            result = otel_pack_array(mp_pck, body->array_value);
175
0
            break;
176
177
0
        case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE:
178
0
            result = otel_pack_kvlist(mp_pck, body->kvlist_value);
179
0
            break;
180
181
0
        case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BYTES_VALUE:
182
0
            result = otel_pack_bytes(mp_pck, body->bytes_value);
183
0
            break;
184
185
0
        case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE__NOT_SET:
186
            /* treat an unset value as null */
187
0
            result = msgpack_pack_nil(mp_pck);
188
0
            break;
189
190
0
        default:
191
0
            break;
192
0
    }
193
194
0
    if (result == -2) {
195
0
        flb_error("[otel]: invalid value type in pack_any_value");
196
0
        result = -1;
197
0
    }
198
199
0
    return result;
200
0
}
201
202
/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#log-and-event-record-definition */
203
static int otel_pack_v1_metadata(struct flb_opentelemetry *ctx,
204
                                 msgpack_packer *mp_pck,
205
                                 struct Opentelemetry__Proto__Logs__V1__LogRecord *log_record,
206
                                 Opentelemetry__Proto__Resource__V1__Resource *resource,
207
                                 Opentelemetry__Proto__Common__V1__InstrumentationScope *scope)
208
0
{
209
0
    int ret;
210
0
    int len;
211
0
    struct flb_mp_map_header mh;
212
0
    struct flb_mp_map_header otlp_mh;
213
214
0
    flb_mp_map_header_init(&otlp_mh, mp_pck);
215
216
0
    len = flb_sds_len(ctx->logs_metadata_key);
217
218
    /* otlp key start */
219
0
    flb_mp_map_header_append(&otlp_mh);
220
221
0
    msgpack_pack_str(mp_pck, len);
222
0
    msgpack_pack_str_body(mp_pck, ctx->logs_metadata_key, len);
223
224
0
    flb_mp_map_header_init(&mh, mp_pck);
225
226
0
    if (log_record->observed_time_unix_nano != 0) {
227
0
        flb_mp_map_header_append(&mh);
228
0
        msgpack_pack_str(mp_pck, 18);
229
0
        msgpack_pack_str_body(mp_pck, "observed_timestamp", 18);
230
0
        msgpack_pack_uint64(mp_pck, log_record->observed_time_unix_nano);
231
0
    }
232
233
    /* Value of 0 indicates unknown or missing timestamp. */
234
0
    if (log_record->time_unix_nano != 0) {
235
0
        flb_mp_map_header_append(&mh);
236
0
        msgpack_pack_str(mp_pck, 9);
237
0
        msgpack_pack_str_body(mp_pck, "timestamp", 9);
238
0
        msgpack_pack_uint64(mp_pck, log_record->time_unix_nano);
239
0
    }
240
241
    /* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber */
242
0
    if (log_record->severity_number >= 1 && log_record->severity_number <= 24) {
243
0
        flb_mp_map_header_append(&mh);
244
0
        msgpack_pack_str(mp_pck, 15);
245
0
        msgpack_pack_str_body(mp_pck, "severity_number", 15);
246
0
        msgpack_pack_uint64(mp_pck, log_record->severity_number);
247
0
    }
248
249
0
    if (log_record->severity_text != NULL && strlen(log_record->severity_text) > 0) {
250
0
        flb_mp_map_header_append(&mh);
251
0
        msgpack_pack_str(mp_pck, 13);
252
0
        msgpack_pack_str_body(mp_pck, "severity_text", 13);
253
0
        msgpack_pack_str(mp_pck, strlen(log_record->severity_text));
254
0
        msgpack_pack_str_body(mp_pck, log_record->severity_text, strlen(log_record->severity_text));
255
0
    }
256
257
0
    if (log_record->n_attributes > 0) {
258
0
        flb_mp_map_header_append(&mh);
259
0
        msgpack_pack_str(mp_pck, 10);
260
0
        msgpack_pack_str_body(mp_pck, "attributes", 10);
261
0
        ret = otel_pack_kvarray(mp_pck,
262
0
                                log_record->attributes,
263
0
                                log_record->n_attributes);
264
0
        if (ret != 0) {
265
0
            return ret;
266
0
        }
267
0
    }
268
269
0
    if (log_record->dropped_attributes_count > 0) {
270
0
        flb_mp_map_header_append(&mh);
271
0
        msgpack_pack_str(mp_pck, 24);
272
0
        msgpack_pack_str_body(mp_pck, "dropped_attributes_count", 24);
273
0
        msgpack_pack_uint64(mp_pck, log_record->dropped_attributes_count);
274
0
    }
275
276
0
    if (log_record->trace_id.len > 0) {
277
0
        flb_mp_map_header_append(&mh);
278
0
        msgpack_pack_str(mp_pck, 8);
279
0
        msgpack_pack_str_body(mp_pck, "trace_id", 8);
280
0
        ret = otel_pack_bytes(mp_pck, log_record->trace_id);
281
0
        if (ret != 0) {
282
0
            return ret;
283
0
        }
284
0
    }
285
286
0
    if (log_record->span_id.len > 0) {
287
0
        flb_mp_map_header_append(&mh);
288
0
        msgpack_pack_str(mp_pck, 7);
289
0
        msgpack_pack_str_body(mp_pck, "span_id", 7);
290
0
        ret = otel_pack_bytes(mp_pck, log_record->span_id);
291
0
        if (ret != 0) {
292
0
            return ret;
293
0
        }
294
0
    }
295
296
0
    flb_mp_map_header_append(&mh);
297
0
    msgpack_pack_str(mp_pck, 11);
298
0
    msgpack_pack_str_body(mp_pck, "trace_flags", 11);
299
0
    msgpack_pack_uint8(mp_pck, (uint8_t) log_record->flags & 0xff);
300
301
0
    if (log_record->event_name != NULL && strlen(log_record->event_name) > 0) {
302
0
        flb_mp_map_header_append(&mh);
303
0
        msgpack_pack_str(mp_pck, 10);
304
0
        msgpack_pack_str_body(mp_pck, "event_name", 10);
305
0
        msgpack_pack_str(mp_pck, strlen(log_record->event_name));
306
0
        msgpack_pack_str_body(mp_pck, log_record->event_name, strlen(log_record->event_name));
307
0
    }
308
309
0
    flb_mp_map_header_end(&mh);
310
311
    /* otlp key end */
312
0
    flb_mp_map_header_end(&otlp_mh);
313
314
0
    return 0;
315
0
}
316
317
static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
318
                                     struct flb_log_event_encoder *encoder,
319
                                     char *tag, size_t tag_len,
320
                                     uint8_t *in_buf,
321
                                     size_t in_size)
322
0
{
323
0
    int ret = 0;
324
0
    int len;
325
0
    int resource_logs_index;
326
0
    int scope_log_index;
327
0
    int log_record_index;
328
0
    char *logs_body_key;
329
0
    int scope_has_schema_url;
330
0
    struct flb_mp_map_header mh;
331
0
    struct flb_mp_map_header mh_tmp;
332
0
    struct flb_time tm;
333
334
0
    msgpack_packer *mp_pck;
335
0
    msgpack_packer *mp_pck_meta;
336
337
    /* OTel proto suff */
338
0
    Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest *input_logs;
339
0
    Opentelemetry__Proto__Logs__V1__ScopeLogs **scope_logs;
340
0
    Opentelemetry__Proto__Logs__V1__ScopeLogs *scope_log;
341
0
    Opentelemetry__Proto__Common__V1__InstrumentationScope *scope;
342
343
0
    Opentelemetry__Proto__Logs__V1__ResourceLogs **resource_logs;
344
0
    Opentelemetry__Proto__Logs__V1__ResourceLogs *resource_log;
345
0
    Opentelemetry__Proto__Logs__V1__LogRecord **log_records;
346
0
    Opentelemetry__Proto__Resource__V1__Resource *resource;
347
348
0
    mp_pck = &encoder->body.packer;
349
0
    mp_pck_meta = &encoder->metadata.packer;
350
351
    /* unpack logs from protobuf payload */
352
0
    input_logs = opentelemetry__proto__collector__logs__v1__export_logs_service_request__unpack(NULL, in_size, in_buf);
353
0
    if (input_logs == NULL) {
354
0
        flb_plg_warn(ctx->ins, "failed to unpack input logs from OpenTelemetry payload");
355
0
        ret = -1;
356
0
        goto binary_payload_to_msgpack_end;
357
0
    }
358
359
0
    resource_logs = input_logs->resource_logs;
360
0
    if (input_logs->n_resource_logs == 0) {
361
0
        ret = 0;
362
0
        goto binary_payload_to_msgpack_end;
363
0
    }
364
365
0
    if (resource_logs == NULL) {
366
0
        flb_plg_warn(ctx->ins, "no resource logs found");
367
0
        ret = -1;
368
0
        goto binary_payload_to_msgpack_end;
369
0
    }
370
371
0
    for (resource_logs_index = 0; resource_logs_index < input_logs->n_resource_logs; resource_logs_index++) {
372
0
        resource_log = resource_logs[resource_logs_index];
373
0
        if (resource_log == NULL) {
374
0
            flb_plg_warn(ctx->ins, "null resource logs entry found");
375
0
            ret = -1;
376
0
            goto binary_payload_to_msgpack_end;
377
0
        }
378
379
0
        resource = resource_log->resource;
380
0
        scope_logs = resource_log->scope_logs;
381
382
0
        if (resource_log->n_scope_logs > 0 && scope_logs == NULL) {
383
0
            flb_plg_warn(ctx->ins, "no scope logs found");
384
0
            ret = -1;
385
0
            goto binary_payload_to_msgpack_end;
386
0
        }
387
388
0
        for (scope_log_index = 0; scope_log_index < resource_log->n_scope_logs; scope_log_index++) {
389
0
            scope_log = scope_logs[scope_log_index];
390
0
            if (scope_log == NULL) {
391
0
                flb_plg_warn(ctx->ins, "null scope logs entry found");
392
0
                ret = -1;
393
0
                goto binary_payload_to_msgpack_end;
394
0
            }
395
396
0
            log_records = scope_log->log_records;
397
398
0
            if (scope_log->n_log_records == 0) {
399
0
                continue;
400
0
            }
401
402
0
            if (log_records == NULL) {
403
0
                flb_plg_warn(ctx->ins, "no log records found");
404
0
                ret = -1;
405
0
                goto binary_payload_to_msgpack_end;
406
0
            }
407
408
0
            flb_log_event_encoder_group_init(encoder);
409
410
            /* pack schema (internal) */
411
0
            ret = flb_log_event_encoder_append_metadata_values(encoder,
412
0
                                                               FLB_LOG_EVENT_STRING_VALUE("schema", 6),
413
0
                                                               FLB_LOG_EVENT_STRING_VALUE("otlp", 4),
414
0
                                                               FLB_LOG_EVENT_STRING_VALUE("resource_id", 11),
415
0
                                                               FLB_LOG_EVENT_INT64_VALUE(resource_logs_index),
416
0
                                                               FLB_LOG_EVENT_STRING_VALUE("scope_id", 8),
417
0
                                                               FLB_LOG_EVENT_INT64_VALUE(scope_log_index));
418
419
420
0
            ret = flb_log_event_encoder_dynamic_field_reset(&encoder->body);
421
0
            if (ret != FLB_EVENT_ENCODER_SUCCESS) {
422
0
                flb_plg_error(ctx->ins, "failed to reset log event body: %s",
423
0
                              flb_log_event_encoder_get_error_description(ret));
424
0
                goto binary_payload_to_msgpack_end;
425
0
            }
426
427
0
            flb_mp_map_header_init(&mh, mp_pck);
428
429
            /* Resource */
430
0
            flb_mp_map_header_append(&mh);
431
0
            msgpack_pack_str(mp_pck, 8);
432
0
            msgpack_pack_str_body(mp_pck, "resource", 8);
433
434
0
            flb_mp_map_header_init(&mh_tmp, mp_pck);
435
0
            if (resource) {
436
                /* look for OTel resource attributes */
437
0
                if (resource->n_attributes > 0 && resource->attributes) {
438
0
                    flb_mp_map_header_append(&mh_tmp);
439
0
                    msgpack_pack_str(mp_pck, 10);
440
0
                    msgpack_pack_str_body(mp_pck, "attributes", 10);
441
442
0
                    ret = otel_pack_kvarray(mp_pck,
443
0
                                            resource->attributes,
444
0
                                            resource->n_attributes);
445
0
                    if (ret != 0) {
446
0
                        goto binary_payload_to_msgpack_end;
447
0
                    }
448
0
                }
449
450
0
                if (resource->dropped_attributes_count > 0) {
451
0
                    flb_mp_map_header_append(&mh_tmp);
452
0
                    msgpack_pack_str(mp_pck, 24);
453
0
                    msgpack_pack_str_body(mp_pck, "dropped_attributes_count", 24);
454
0
                    msgpack_pack_uint64(mp_pck, resource->dropped_attributes_count);
455
0
                }
456
457
0
                if (resource_log->schema_url) {
458
0
                    flb_mp_map_header_append(&mh_tmp);
459
0
                    msgpack_pack_str(mp_pck, 10);
460
0
                    msgpack_pack_str_body(mp_pck, "schema_url", 10);
461
462
0
                    len = strlen(resource_log->schema_url);
463
0
                    msgpack_pack_str(mp_pck, len);
464
0
                    msgpack_pack_str_body(mp_pck, resource_log->schema_url, len);
465
0
                }
466
0
            }
467
0
            flb_mp_map_header_end(&mh_tmp);
468
469
            /* scope */
470
0
            flb_mp_map_header_append(&mh);
471
0
            msgpack_pack_str(mp_pck, 5);
472
0
            msgpack_pack_str_body(mp_pck, "scope", 5);
473
474
            /* Scope */
475
0
            scope = scope_log->scope;
476
0
            scope_has_schema_url = FLB_FALSE;
477
478
0
            if (scope_log->schema_url && strlen(scope_log->schema_url) > 0) {
479
0
                scope_has_schema_url = FLB_TRUE;
480
0
            }
481
482
0
            if (scope && (scope->name || scope->version ||
483
0
                          scope->n_attributes > 0 || scope->dropped_attributes_count > 0 ||
484
0
                          scope_has_schema_url == FLB_TRUE)) {
485
0
                flb_mp_map_header_init(&mh_tmp, mp_pck);
486
487
0
                if (scope_has_schema_url == FLB_TRUE) {
488
0
                    flb_mp_map_header_append(&mh_tmp);
489
0
                    msgpack_pack_str(mp_pck, 10);
490
0
                    msgpack_pack_str_body(mp_pck, "schema_url", 10);
491
492
0
                    len = strlen(scope_log->schema_url);
493
0
                    msgpack_pack_str(mp_pck, len);
494
0
                    msgpack_pack_str_body(mp_pck, scope_log->schema_url, len);
495
0
                }
496
497
0
                if (scope->name && strlen(scope->name) > 0) {
498
0
                    flb_mp_map_header_append(&mh_tmp);
499
0
                    msgpack_pack_str(mp_pck, 4);
500
0
                    msgpack_pack_str_body(mp_pck, "name", 4);
501
502
0
                    len = strlen(scope->name);
503
0
                    msgpack_pack_str(mp_pck, len);
504
0
                    msgpack_pack_str_body(mp_pck, scope->name, len);
505
0
                }
506
0
                if (scope->version && strlen(scope->version) > 0) {
507
0
                    flb_mp_map_header_append(&mh_tmp);
508
509
0
                    msgpack_pack_str(mp_pck, 7);
510
0
                    msgpack_pack_str_body(mp_pck, "version", 7);
511
512
0
                    len = strlen(scope->version);
513
0
                    msgpack_pack_str(mp_pck, len);
514
0
                    msgpack_pack_str_body(mp_pck, scope->version, len);
515
0
                }
516
517
0
                if (scope->n_attributes > 0 && scope->attributes) {
518
0
                    flb_mp_map_header_append(&mh_tmp);
519
0
                    msgpack_pack_str(mp_pck, 10);
520
0
                    msgpack_pack_str_body(mp_pck, "attributes", 10);
521
0
                    ret = otel_pack_kvarray(mp_pck,
522
0
                                            scope->attributes,
523
0
                                            scope->n_attributes);
524
0
                    if (ret != 0) {
525
0
                        goto binary_payload_to_msgpack_end;
526
0
                    }
527
0
                }
528
529
0
                if (scope->dropped_attributes_count > 0) {
530
0
                    flb_mp_map_header_append(&mh_tmp);
531
0
                    msgpack_pack_str(mp_pck, 24);
532
0
                    msgpack_pack_str_body(mp_pck, "dropped_attributes_count", 24);
533
0
                    msgpack_pack_uint64(mp_pck, scope->dropped_attributes_count);
534
0
                }
535
536
0
                flb_mp_map_header_end(&mh_tmp);
537
0
            }
538
0
            else {
539
0
                flb_mp_map_header_init(&mh_tmp, mp_pck);
540
541
0
                if (scope_has_schema_url == FLB_TRUE) {
542
0
                    flb_mp_map_header_append(&mh_tmp);
543
0
                    msgpack_pack_str(mp_pck, 10);
544
0
                    msgpack_pack_str_body(mp_pck, "schema_url", 10);
545
546
0
                    len = strlen(scope_log->schema_url);
547
0
                    msgpack_pack_str(mp_pck, len);
548
0
                    msgpack_pack_str_body(mp_pck, scope_log->schema_url, len);
549
0
                }
550
551
0
                flb_mp_map_header_end(&mh_tmp);
552
0
            }
553
554
0
            flb_mp_map_header_end(&mh);
555
556
0
            ret = flb_log_event_encoder_dynamic_field_flush(&encoder->body);
557
0
            if (ret != FLB_EVENT_ENCODER_SUCCESS) {
558
0
                flb_plg_error(ctx->ins, "could not set group content metadata: %s",
559
0
                              flb_log_event_encoder_get_error_description(ret));
560
0
                goto binary_payload_to_msgpack_end;
561
0
            }
562
563
0
            flb_log_event_encoder_group_header_end(encoder);
564
565
0
            for (log_record_index=0; log_record_index < scope_log->n_log_records; log_record_index++) {
566
0
                ret = flb_log_event_encoder_begin_record(encoder);
567
568
0
                if (ret == FLB_EVENT_ENCODER_SUCCESS) {
569
0
                    if (log_records[log_record_index]->time_unix_nano > 0) {
570
0
                        flb_time_from_uint64(&tm, log_records[log_record_index]->time_unix_nano);
571
0
                        ret = flb_log_event_encoder_set_timestamp(encoder, &tm);
572
0
                    }
573
0
                    else if (log_records[log_record_index]->observed_time_unix_nano > 0) {
574
0
                        flb_time_from_uint64(&tm, log_records[log_record_index]->observed_time_unix_nano);
575
0
                        ret = flb_log_event_encoder_set_timestamp(encoder, &tm);
576
0
                    }
577
0
                    else {
578
0
                        flb_time_get(&tm);
579
0
                        ret = flb_log_event_encoder_set_timestamp(encoder, &tm);
580
0
                    }
581
0
                }
582
583
0
                if (ret == FLB_EVENT_ENCODER_SUCCESS) {
584
0
                    ret = flb_log_event_encoder_dynamic_field_reset(&encoder->metadata);
585
0
                    if (ret != FLB_EVENT_ENCODER_SUCCESS) {
586
0
                        flb_plg_error(ctx->ins, "failed to reset log event metadata: %s",
587
0
                                      flb_log_event_encoder_get_error_description(ret));
588
0
                        ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
589
0
                    }
590
0
                    else {
591
0
                        ret = otel_pack_v1_metadata(ctx,
592
0
                                                    mp_pck_meta,
593
0
                                                    log_records[log_record_index],
594
0
                                                    resource,
595
0
                                                    scope_log->scope);
596
0
                    }
597
598
0
                    if (ret != 0) {
599
0
                        flb_plg_error(ctx->ins, "failed to convert log record");
600
0
                        ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
601
0
                    }
602
0
                    else {
603
0
                        ret = flb_log_event_encoder_dynamic_field_flush(&encoder->metadata);
604
0
                    }
605
0
                }
606
607
0
                if (ret == FLB_EVENT_ENCODER_SUCCESS) {
608
0
                    ret = flb_log_event_encoder_dynamic_field_reset(&encoder->body);
609
0
                    if (ret != FLB_EVENT_ENCODER_SUCCESS) {
610
0
                        flb_plg_error(ctx->ins, "failed to reset log event body: %s",
611
0
                                      flb_log_event_encoder_get_error_description(ret));
612
0
                        ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
613
0
                    }
614
0
                    else if (ctx->logs_body_key == NULL &&
615
0
                             log_records[log_record_index]->body != NULL &&
616
0
                             log_records[log_record_index]->body->value_case ==
617
0
                             OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE) {
618
0
                        ret = otlp_pack_any_value(
619
0
                                mp_pck,
620
0
                                log_records[log_record_index]->body);
621
0
                    }
622
0
                    else {
623
0
                        logs_body_key = ctx->logs_body_key;
624
0
                        if (logs_body_key == NULL) {
625
0
                            logs_body_key = "log";
626
0
                        }
627
0
                        ret = msgpack_pack_map(mp_pck, 1);
628
0
                        if (ret == 0) {
629
0
                            ret = msgpack_pack_str(mp_pck, strlen(logs_body_key));
630
0
                        }
631
0
                        if (ret == 0) {
632
0
                            ret = msgpack_pack_str_body(mp_pck,
633
0
                                                        logs_body_key,
634
0
                                                        strlen(logs_body_key));
635
0
                        }
636
0
                        if (ret == 0) {
637
0
                            ret = otlp_pack_any_value(
638
0
                                    mp_pck,
639
0
                                    log_records[log_record_index]->body);
640
0
                        }
641
0
                    }
642
643
0
                    if (ret != 0) {
644
0
                        flb_plg_error(ctx->ins, "failed to convert log record body");
645
0
                        ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
646
0
                    }
647
0
                    else {
648
0
                        ret = flb_log_event_encoder_dynamic_field_flush(&encoder->body);
649
0
                    }
650
0
                }
651
652
0
                if (ret == FLB_EVENT_ENCODER_SUCCESS) {
653
0
                    ret = flb_log_event_encoder_commit_record(encoder);
654
0
                }
655
0
                else {
656
0
                    flb_plg_error(ctx->ins, "marshalling error");
657
0
                    goto binary_payload_to_msgpack_end;
658
0
                }
659
0
            }
660
661
0
            flb_log_event_encoder_group_end(encoder);
662
663
0
        }
664
0
    }
665
666
0
 binary_payload_to_msgpack_end:
667
0
    if (input_logs) {
668
0
        opentelemetry__proto__collector__logs__v1__export_logs_service_request__free_unpacked(
669
0
                                            input_logs, NULL);
670
0
    }
671
672
0
    if (ret != 0) {
673
0
        return -1;
674
0
    }
675
676
0
    return 0;
677
0
}
678
679
/*
680
 * Main function used from opentelemetry_prot.c to process logs either in JSON or Protobuf format.
681
 * -----------------------------------------------------------------------------------------------
682
 */
683
int opentelemetry_process_logs(struct flb_opentelemetry *ctx,
684
                               flb_sds_t content_type,
685
                               flb_sds_t tag,
686
                               size_t tag_len,
687
                               void *data, size_t size)
688
0
{
689
0
    int ret = -1;
690
0
    int is_proto = FLB_FALSE; /* default to JSON */
691
0
    int error_status = 0;
692
0
    char *buf;
693
0
    uint8_t *payload;
694
0
    uint64_t payload_size;
695
0
    struct flb_log_event_encoder *encoder;
696
697
0
    buf = (char *) data;
698
0
    payload = data;
699
0
    payload_size = size;
700
701
    /* Detect the type of payload */
702
0
    if (content_type) {
703
0
        if (opentelemetry_is_json_content_type(content_type) == FLB_TRUE) {
704
0
            if (opentelemetry_payload_starts_with_json_object(buf, size) != FLB_TRUE) {
705
0
                flb_plg_error(ctx->ins, "Invalid JSON payload");
706
0
                return -1;
707
0
            }
708
0
            is_proto = FLB_FALSE;
709
0
        }
710
0
        else if (opentelemetry_is_protobuf_content_type(content_type) == FLB_TRUE) {
711
0
            is_proto = FLB_TRUE;
712
0
        }
713
0
        else {
714
0
            flb_plg_error(ctx->ins, "Unsupported content type %s", content_type);
715
0
            return -1;
716
0
        }
717
0
    }
718
719
0
    encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2);
720
0
    if (encoder == NULL) {
721
0
        return -1;
722
0
    }
723
724
0
    if (is_proto == FLB_TRUE) {
725
0
        ret = binary_payload_to_msgpack(ctx, encoder,
726
0
                                        tag, tag_len,
727
0
                                        (uint8_t *) payload, payload_size);
728
0
        if (ret < 0) {
729
0
            flb_plg_error(ctx->ins, "failed to process logs from protobuf payload");
730
0
        }
731
0
    }
732
0
    else {
733
0
        ret = flb_opentelemetry_logs_json_to_msgpack(encoder,
734
0
                                                     (const char *) payload, payload_size,
735
0
                                                     ctx->logs_body_key,
736
0
                                                     &error_status);
737
0
        if (ret != 0) {
738
            /* we are printing the error for now, let's see what is the user's preference later */
739
0
            flb_plg_error(ctx->ins, "failed to process logs from JSON payload (%i) %s",
740
0
                          error_status,
741
0
                          flb_opentelemetry_error_to_string(error_status));
742
0
        }
743
744
0
    }
745
746
0
    if (ret >= 0) {
747
0
        if (opentelemetry_uses_worker_ingress_queue(ctx)) {
748
0
            size_t allocation_size;
749
0
            void *resized_buffer;
750
751
0
            allocation_size = encoder->buffer.alloc;
752
753
0
            if (allocation_size > encoder->output_length) {
754
0
                resized_buffer = flb_realloc(encoder->output_buffer,
755
0
                                             encoder->output_length);
756
0
                if (resized_buffer != NULL) {
757
0
                    encoder->buffer.data = resized_buffer;
758
0
                    encoder->output_buffer = resized_buffer;
759
0
                    encoder->buffer.alloc = encoder->output_length;
760
0
                    allocation_size = encoder->output_length;
761
0
                }
762
0
            }
763
764
0
            ret = opentelemetry_ingest_logs_take(ctx,
765
0
                                                 tag,
766
0
                                                 flb_sds_len(tag),
767
0
                                                 encoder->output_buffer,
768
0
                                                 encoder->output_length,
769
0
                                                 allocation_size);
770
0
            if (ret == 0 || ret == FLB_INPUT_INGRESS_BUSY) {
771
0
                flb_log_event_encoder_claim_internal_buffer_ownership(encoder);
772
0
            }
773
0
        }
774
0
        else {
775
0
            ret = opentelemetry_ingest_logs(ctx,
776
0
                                            tag,
777
0
                                            flb_sds_len(tag),
778
0
                                            encoder->output_buffer,
779
0
                                            encoder->output_length);
780
0
        }
781
782
0
        if (ret != 0) {
783
0
            flb_plg_error(ctx->ins, "failed to append logs to the input buffer");
784
0
        }
785
0
    }
786
787
0
    flb_log_event_encoder_destroy(encoder);
788
0
    return ret;
789
0
}