Coverage Report

Created: 2026-01-21 07:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/plugins/in_opentelemetry/opentelemetry.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2026 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
21
#include <fluent-bit/flb_input_plugin.h>
22
#include <fluent-bit/flb_downstream.h>
23
#include <fluent-bit/flb_network.h>
24
#include <fluent-bit/flb_config.h>
25
26
#include "http_conn.h"
27
#include "opentelemetry.h"
28
#include "opentelemetry_prot.h"
29
#include "opentelemetry_config.h"
30
31
/*
32
 * For a server event, the collection event means a new client have arrived, we
33
 * accept the connection and create a new TCP instance which will wait for
34
 * JSON map messages.
35
 */
36
static int in_opentelemetry_collect(struct flb_input_instance *ins,
37
                                    struct flb_config *config, void *in_context)
38
0
{
39
0
    struct flb_connection    *connection;
40
0
    struct http_conn         *conn;
41
0
    struct flb_opentelemetry *ctx;
42
43
0
    ctx = in_context;
44
45
0
    connection = flb_downstream_conn_get(ctx->downstream);
46
47
0
    if (connection == NULL) {
48
0
        flb_plg_error(ctx->ins, "could not accept new connection");
49
50
0
        return -1;
51
0
    }
52
53
0
    flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i", connection->fd);
54
55
0
    conn = opentelemetry_conn_add(connection, ctx);
56
57
0
    if (conn == NULL) {
58
0
        return -1;
59
0
    }
60
61
0
    return 0;
62
0
}
63
64
static int in_opentelemetry_init(struct flb_input_instance *ins,
65
                                 struct flb_config *config, void *data)
66
0
{
67
0
    unsigned short int        port;
68
0
    int                       ret;
69
0
    struct flb_opentelemetry *ctx;
70
71
0
    (void) data;
72
73
    /* Create context and basic conf */
74
0
    ctx = opentelemetry_config_create(ins);
75
0
    if (!ctx) {
76
0
        return -1;
77
0
    }
78
0
    ctx->collector_id = -1;
79
80
    /* Populate context with config map defaults and incoming properties */
81
0
    ret = flb_input_config_map_set(ins, (void *) ctx);
82
0
    if (ret == -1) {
83
0
        flb_plg_error(ctx->ins, "configuration error");
84
0
        opentelemetry_config_destroy(ctx);
85
0
        return -1;
86
0
    }
87
88
    /* Set the context */
89
0
    flb_input_set_context(ins, ctx);
90
91
0
    port = (unsigned short int) strtoul(ctx->tcp_port, NULL, 10);
92
93
0
    if (ctx->enable_http2) {
94
0
        ret = flb_http_server_init(&ctx->http_server,
95
0
                                    HTTP_PROTOCOL_VERSION_AUTODETECT,
96
0
                                    (FLB_HTTP_SERVER_FLAG_KEEPALIVE | FLB_HTTP_SERVER_FLAG_AUTO_INFLATE),
97
0
                                    NULL,
98
0
                                    ins->host.listen,
99
0
                                    ins->host.port,
100
0
                                    ins->tls,
101
0
                                    ins->flags,
102
0
                                    &ins->net_setup,
103
0
                                    flb_input_event_loop_get(ins),
104
0
                                    ins->config,
105
0
                                    (void *) ctx);
106
107
0
        if (ret != 0) {
108
0
            flb_plg_error(ctx->ins,
109
0
                          "could not initialize http server on %s:%u. Aborting",
110
0
                          ins->host.listen, ins->host.port);
111
112
0
            opentelemetry_config_destroy(ctx);
113
114
0
            return -1;
115
0
        }
116
117
0
        ret = flb_http_server_start(&ctx->http_server);
118
119
0
        if (ret != 0) {
120
0
            flb_plg_error(ctx->ins,
121
0
                          "could not start http server on %s:%u. Aborting",
122
0
                          ins->host.listen, ins->host.port);
123
124
0
            opentelemetry_config_destroy(ctx);
125
126
0
            return -1;
127
0
        }
128
129
0
        flb_http_server_set_buffer_max_size(&ctx->http_server, ctx->buffer_max_size);
130
131
0
        ctx->http_server.request_callback = opentelemetry_prot_handle_ng;
132
133
0
        flb_input_downstream_set(ctx->http_server.downstream, ctx->ins);
134
0
    }
135
0
    else {
136
0
        ctx->downstream = flb_downstream_create(FLB_TRANSPORT_TCP,
137
0
                                                ins->flags,
138
0
                                                ctx->listen,
139
0
                                                port,
140
0
                                                ins->tls,
141
0
                                                config,
142
0
                                                &ins->net_setup);
143
144
0
        if (ctx->downstream == NULL) {
145
0
            flb_plg_error(ctx->ins,
146
0
                        "could not initialize downstream on %s:%s. Aborting",
147
0
                        ctx->listen, ctx->tcp_port);
148
149
0
            opentelemetry_config_destroy(ctx);
150
151
0
            return -1;
152
0
        }
153
154
0
        flb_input_downstream_set(ctx->downstream, ctx->ins);
155
156
        /* Collect upon data available on the standard input */
157
0
        ret = flb_input_set_collector_socket(ins,
158
0
                                            in_opentelemetry_collect,
159
0
                                            ctx->downstream->server_fd,
160
0
                                            config);
161
0
        if (ret == -1) {
162
0
            flb_plg_error(ctx->ins, "Could not set collector for IN_TCP input plugin");
163
0
            opentelemetry_config_destroy(ctx);
164
0
            return -1;
165
0
        }
166
167
0
        ctx->collector_id = ret;
168
0
    }
169
170
0
    flb_plg_info(ctx->ins, "listening on %s:%s", ctx->listen, ctx->tcp_port);
171
172
0
    if (ctx->successful_response_code != 200 &&
173
0
        ctx->successful_response_code != 201 &&
174
0
        ctx->successful_response_code != 204) {
175
0
        flb_plg_error(ctx->ins, "%d is not supported response code. Use default 201",
176
0
                      ctx->successful_response_code);
177
0
        ctx->successful_response_code = 201;
178
0
    }
179
180
0
    return 0;
181
0
}
182
183
/*
 * Plugin exit callback: destroys the plugin context, if there is one.
 *
 * data is the context pointer attached to the instance; it may be NULL.
 * Always returns 0.
 */
static int in_opentelemetry_exit(void *data, struct flb_config *config)
{
    struct flb_opentelemetry *ctx = data;

    (void) config;

    if (ctx == NULL) {
        return 0;
    }

    opentelemetry_config_destroy(ctx);

    return 0;
}
197
198
/* Configuration properties map */
199
static struct flb_config_map config_map[] = {
200
    {
201
     FLB_CONFIG_MAP_BOOL, "http2", "true",
202
     0, FLB_TRUE, offsetof(struct flb_opentelemetry, enable_http2),
203
     "Enable HTTP/2 protocol support for the OpenTelemetry receiver"
204
    },
205
206
    {
207
     FLB_CONFIG_MAP_BOOL, "profiles_support", "false",
208
     0, FLB_TRUE, offsetof(struct flb_opentelemetry, profile_support_enabled),
209
     "This is an experimental feature whoses specification is not stable yet, " \
210
     "feel free to test it but please do not enable this in production " \
211
     "environments"
212
    },
213
    {
214
     FLB_CONFIG_MAP_BOOL, "encode_profiles_as_log", "true",
215
     0, FLB_TRUE, offsetof(struct flb_opentelemetry, encode_profiles_as_log),
216
     "Encode profiles received as text and ingest them in the logging pipeline"
217
    },
218
219
    {
220
     FLB_CONFIG_MAP_SIZE, "buffer_max_size", HTTP_BUFFER_MAX_SIZE,
221
     0, FLB_TRUE, offsetof(struct flb_opentelemetry, buffer_max_size),
222
     "Maximum size of the HTTP request buffer"
223
    },
224
225
    {
226
     FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", HTTP_BUFFER_CHUNK_SIZE,
227
     0, FLB_TRUE, offsetof(struct flb_opentelemetry, buffer_chunk_size),
228
     "Size of each buffer chunk allocated for HTTP requests"
229
    },
230
231
    {
232
     FLB_CONFIG_MAP_STR, "tag_key", NULL,
233
     0, FLB_TRUE, offsetof(struct flb_opentelemetry, tag_key),
234
     "Record accessor key to use for generating tags from incoming records"
235
    },
236
    {
237
     FLB_CONFIG_MAP_BOOL, "tag_from_uri", "true",
238
     0, FLB_TRUE, offsetof(struct flb_opentelemetry, tag_from_uri),
239
     "If true, tag will be created from uri. e.g. v1_metrics from /v1/metrics ."
240
    },
241
    {
242
     FLB_CONFIG_MAP_INT, "successful_response_code", "201",
243
     0, FLB_TRUE, offsetof(struct flb_opentelemetry, successful_response_code),
244
     "Set successful response code. 200, 201 and 204 are supported."
245
    },
246
    {
247
     FLB_CONFIG_MAP_BOOL, "raw_traces", "false",
248
     0, FLB_TRUE, offsetof(struct flb_opentelemetry, raw_traces),
249
     "Forward traces without processing"
250
    },
251
252
    {
253
     FLB_CONFIG_MAP_STR, "logs_metadata_key", "otlp",
254
     0, FLB_TRUE, offsetof(struct flb_opentelemetry, logs_metadata_key),
255
     "Key name to store OpenTelemetry logs metadata in the record"
256
    },
257
    {
258
     FLB_CONFIG_MAP_STR, "logs_body_key", NULL,
259
     0, FLB_TRUE, offsetof(struct flb_opentelemetry, logs_body_key),
260
     "Key to use for the logs body. If unset, body key-value pairs will be " \
261
     "used as the log record, and other types will be nested under a key."
262
    },
263
264
    /* EOF */
265
    {0}
266
};
267
268
/* Plugin reference */
269
struct flb_input_plugin in_opentelemetry_plugin = {
270
    .name         = "opentelemetry",
271
    .description  = "OpenTelemetry",
272
    .cb_init      = in_opentelemetry_init,
273
    .cb_pre_run   = NULL,
274
    .cb_collect   = in_opentelemetry_collect,
275
    .cb_flush_buf = NULL,
276
    .cb_pause     = NULL,
277
    .cb_resume    = NULL,
278
    .cb_exit      = in_opentelemetry_exit,
279
    .config_map   = config_map,
280
    .flags        = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS
281
};