Coverage Report

Created: 2026-03-09 07:04

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/plugins/in_opentelemetry/opentelemetry_traces.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2026 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_input_plugin.h>
21
#include <fluent-bit/flb_sds.h>
22
#include <fluent-bit/flb_pack.h>
23
#include <fluent-bit/flb_opentelemetry.h>
24
25
#include <ctraces/ctraces.h>
26
#include <ctraces/ctr_encode_text.h>
27
28
#include "opentelemetry.h"
29
#include "opentelemetry_traces.h"
30
#include "opentelemetry_utils.h"
31
32
int opentelemetry_traces_process_protobuf(struct flb_opentelemetry *ctx,
33
                                          flb_sds_t tag,
34
                                          size_t tag_len,
35
                                          void *data, size_t size)
36
0
{
37
0
    struct ctrace *decoded_context;
38
0
    size_t         offset;
39
0
    int            result;
40
41
0
    offset = 0;
42
0
    result = ctr_decode_opentelemetry_create(&decoded_context,
43
0
                                             data, size,
44
0
                                             &offset);
45
0
    if (result == 0) {
46
0
        result = flb_input_trace_append(ctx->ins, tag, tag_len, decoded_context);
47
0
        if (result == -1) {
48
0
            ctr_destroy(decoded_context);
49
0
        }
50
0
    }
51
52
0
    return result;
53
0
}
54
55
56
57
58
59
static int process_json(struct flb_opentelemetry *ctx,
60
                        char *tag, size_t tag_len,
61
                        const char *body, size_t len)
62
0
{
63
0
    int result = -1;
64
0
    int error_status = 0;
65
0
    struct ctrace *ctr;
66
67
    /* Use the new centralized API for JSON to ctrace conversion */
68
0
    ctr = flb_opentelemetry_json_traces_to_ctrace(body, len, &error_status);
69
0
    if (ctr) {
70
0
        result = flb_input_trace_append(ctx->ins, tag, tag_len, ctr);
71
0
        if (result == -1) {
72
0
            ctr_destroy(ctr);
73
0
        }
74
0
    }
75
0
    else {
76
0
        flb_plg_error(ctx->ins, "invalid JSON trace: conversion error (status: %d)", error_status);
77
0
    }
78
79
0
    return result;
80
0
}
81
82
static int opentelemetry_traces_process_json(struct flb_opentelemetry *ctx,
83
                                             flb_sds_t tag, size_t tag_len,
84
                                             char *data, size_t size)
85
0
{
86
0
    int ret;
87
88
0
    ret = process_json(ctx, tag, tag_len, data, size);
89
90
0
    return ret;
91
0
}
92
93
/*
94
 * This interface was the first approach to take traces in JSON and ingest them as logs,
95
 * we are not sure if it is still in use, but we are keeping it for now
96
 */
97
int opentelemetry_traces_process_raw_traces(struct flb_opentelemetry *ctx,
98
                                            flb_sds_t tag,
99
                                            size_t tag_len,
100
                                            void *data, size_t size)
101
0
{
102
0
    int ret;
103
0
    int root_type;
104
0
    char *out_buf = NULL;
105
0
    size_t out_size;
106
107
0
    msgpack_packer mp_pck;
108
0
    msgpack_sbuffer mp_sbuf;
109
110
0
    msgpack_sbuffer_init(&mp_sbuf);
111
0
    msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
112
113
0
    msgpack_pack_array(&mp_pck, 2);
114
0
    flb_pack_time_now(&mp_pck);
115
116
    /* Check if the incoming payload is a valid  message and convert it to msgpack */
117
0
    ret = flb_pack_json(data, size,
118
0
                        &out_buf, &out_size, &root_type, NULL);
119
120
0
    if (ret == 0 && root_type == JSMN_OBJECT) {
121
        /* JSON found, pack it msgpack representation */
122
0
        msgpack_sbuffer_write(&mp_sbuf, out_buf, out_size);
123
0
    }
124
0
    else {
125
        /* the content might be a binary payload or invalid JSON */
126
0
        msgpack_pack_map(&mp_pck, 1);
127
0
        msgpack_pack_str_with_body(&mp_pck, "trace", 5);
128
0
        msgpack_pack_str_with_body(&mp_pck, data, size);
129
0
    }
130
131
    /* release 'out_buf' if it was allocated */
132
0
    if (out_buf) {
133
0
        flb_free(out_buf);
134
0
    }
135
136
0
    flb_input_log_append(ctx->ins, tag, tag_len, mp_sbuf.data, mp_sbuf.size);
137
0
    msgpack_sbuffer_destroy(&mp_sbuf);
138
139
0
    return 0;
140
0
}
141
142
int opentelemetry_process_traces(struct flb_opentelemetry *ctx,
143
                                 flb_sds_t content_type,
144
                                 flb_sds_t tag,
145
                                 size_t tag_len,
146
                                 void *data, size_t size)
147
0
{
148
0
    int ret = -1;
149
0
    int is_proto = FLB_FALSE; /* default to JSON */
150
0
    char *buf;
151
0
    char *payload;
152
0
    uint64_t payload_size;
153
154
0
    buf = (char *) data;
155
156
0
    payload = buf;
157
0
    payload_size = size;
158
159
    /* Detect the type of payload */
160
0
    if (content_type) {
161
0
        if (opentelemetry_is_json_content_type(content_type) == FLB_TRUE) {
162
0
            if (opentelemetry_payload_starts_with_json_object(buf, size) != FLB_TRUE) {
163
0
                flb_plg_error(ctx->ins, "Invalid JSON payload");
164
0
                return -1;
165
0
            }
166
167
0
            is_proto = FLB_FALSE;
168
0
        }
169
0
        else if (opentelemetry_is_protobuf_content_type(content_type) == FLB_TRUE) {
170
0
            is_proto = FLB_TRUE;
171
0
        }
172
0
        else {
173
0
            flb_plg_error(ctx->ins, "Unsupported content type %s", content_type);
174
0
            return -1;
175
0
        }
176
0
    }
177
178
0
    if (is_proto == FLB_TRUE) {
179
0
        ret = opentelemetry_traces_process_protobuf(ctx,
180
0
                                                    tag, tag_len,
181
0
                                                    payload, payload_size);
182
0
    }
183
0
    else {
184
0
        if (ctx->raw_traces) {
185
0
            ret = opentelemetry_traces_process_raw_traces(ctx,
186
0
                                                          tag, tag_len,
187
0
                                                          payload, payload_size);
188
0
        }
189
0
        else {
190
            /* The content is likely OTel JSON */
191
0
            ret = opentelemetry_traces_process_json(ctx,
192
0
                                                    tag, tag_len,
193
0
                                                    payload, payload_size);
194
0
        }
195
0
    }
196
197
0
    return ret;
198
0
}