Coverage Report

Created: 2026-01-21 07:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/plugins/in_opentelemetry/http_conn.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2026 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_input_plugin.h>
21
#include <fluent-bit/flb_engine.h>
22
#include <fluent-bit/flb_downstream.h>
23
24
#include "opentelemetry.h"
25
#include "http_conn.h"
26
#include "opentelemetry_prot.h"
27
28
/*
 * Forward declaration: the connection event handler calls this function to
 * re-arm the request object, and the definition appears further down in
 * this file.
 */
static void opentelemetry_conn_request_init(struct mk_http_session *session,
                                            struct mk_http_request *request);
30
31
static int opentelemetry_conn_buffer_realloc(struct flb_opentelemetry *ctx,
32
                                             struct http_conn *conn, size_t size)
33
0
{
34
0
    char *tmp;
35
36
    /* Perform realloc */
37
0
    tmp = flb_realloc(conn->buf_data, size);
38
0
    if (!tmp) {
39
0
        flb_errno();
40
0
        flb_plg_error(ctx->ins, "could not perform realloc for size %zu", size);
41
0
        return -1;
42
0
    }
43
44
    /* Update buffer info */
45
0
    conn->buf_data = tmp;
46
0
    conn->buf_size = size;
47
48
    /* Keep NULL termination */
49
0
    conn->buf_data[conn->buf_len] = '\0';
50
51
    /* Reset parser state */
52
0
    mk_http_parser_init(&conn->session.parser);
53
54
0
    return 0;
55
0
}
56
57
static int opentelemetry_conn_event(void *data)
58
0
{
59
0
    int ret;
60
0
    int status;
61
0
    size_t size;
62
0
    ssize_t available;
63
0
    ssize_t bytes;
64
0
    size_t request_len;
65
0
    struct http_conn *conn;
66
0
    struct mk_event *event;
67
0
    struct flb_opentelemetry *ctx;
68
0
    struct flb_connection *connection;
69
70
0
    connection = (struct flb_connection *) data;
71
72
0
    conn = connection->user_data;
73
74
0
    ctx = conn->ctx;
75
76
0
    event = &connection->event;
77
78
0
    if (event->mask & MK_EVENT_READ) {
79
0
        available = (conn->buf_size - conn->buf_len) - 1;
80
0
        if (available < 1) {
81
0
            if (conn->buf_size + ctx->buffer_chunk_size > ctx->buffer_max_size) {
82
0
                flb_plg_trace(ctx->ins,
83
0
                              "fd=%i incoming data exceed limit (%zu KB)",
84
0
                              event->fd, (ctx->buffer_max_size / 1024));
85
0
                opentelemetry_conn_del(conn);
86
0
                return -1;
87
0
            }
88
89
0
            size = conn->buf_size + ctx->buffer_chunk_size;
90
0
            ret = opentelemetry_conn_buffer_realloc(ctx, conn, size);
91
0
            if (ret == -1) {
92
0
                flb_errno();
93
0
                opentelemetry_conn_del(conn);
94
0
                return -1;
95
0
            }
96
97
0
            flb_plg_trace(ctx->ins, "fd=%i buffer realloc %i -> %zu",
98
0
                          event->fd, conn->buf_size, size);
99
100
0
            available = (conn->buf_size - conn->buf_len) - 1;
101
0
        }
102
103
        /* Read data */
104
0
        bytes = flb_io_net_read(connection,
105
0
                                (void *) &conn->buf_data[conn->buf_len],
106
0
                                available);
107
108
0
        if (bytes <= 0) {
109
0
            flb_plg_trace(ctx->ins, "fd=%i closed connection", event->fd);
110
0
            opentelemetry_conn_del(conn);
111
0
            return -1;
112
0
        }
113
114
0
        flb_plg_trace(ctx->ins, "read()=%zi pre_len=%i now_len=%zi",
115
0
                      bytes, conn->buf_len, conn->buf_len + bytes);
116
0
        conn->buf_len += bytes;
117
0
        conn->buf_data[conn->buf_len] = '\0';
118
119
0
        status = mk_http_parser(&conn->request, &conn->session.parser,
120
0
                                conn->buf_data, conn->buf_len, conn->session.server);
121
122
0
        if (status == MK_HTTP_PARSER_OK) {
123
            /* Do more logic parsing and checks for this request */
124
0
            opentelemetry_prot_handle(ctx, conn, &conn->session, &conn->request);
125
126
            /*
127
             * Evict the processed request from the connection buffer and reinitialize
128
             * the HTTP parser.
129
             */
130
131
            /* Use the last parser position as the request length */
132
0
            request_len = mk_http_parser_request_size(&conn->session.parser,
133
0
                                                      conn->buf_data,
134
0
                                                      conn->buf_len);
135
136
0
            if (request_len == -1 || (request_len > conn->buf_len)) {
137
                /* Unexpected but let's make sure things are safe */
138
0
                conn->buf_len = 0;
139
0
                flb_plg_debug(ctx->ins, "request length exceeds buffer length, closing connection");
140
0
                opentelemetry_conn_del(conn);
141
0
                return -1;
142
0
            }
143
144
            /* If we have extra bytes in our bytes, adjust the extra bytes */
145
0
            if (0 < (conn->buf_len - request_len)) {
146
0
                memmove(conn->buf_data, &conn->buf_data[request_len],
147
0
                        conn->buf_len - request_len);
148
149
0
                conn->buf_data[conn->buf_len - request_len] = '\0';
150
0
                conn->buf_len -= request_len;
151
0
            }
152
0
            else {
153
0
                memset(conn->buf_data, 0, request_len);
154
0
                conn->buf_len = 0;
155
0
            }
156
157
            /* Reinitialize the parser so the next request is properly
158
                * handled, the additional memset intends to wipe any left over data
159
                * from the headers parsed in the previous request.
160
                */
161
0
            memset(&conn->session.parser, 0, sizeof(struct mk_http_parser));
162
0
            mk_http_parser_init(&conn->session.parser);
163
0
            opentelemetry_conn_request_init(&conn->session, &conn->request);
164
0
        }
165
0
        else if (status == MK_HTTP_PARSER_ERROR) {
166
0
            opentelemetry_prot_handle_error(ctx, conn, &conn->session, &conn->request);
167
168
            /* Reinitialize the parser so the next request is properly
169
             * handled, the additional memset intends to wipe any left over data
170
             * from the headers parsed in the previous request.
171
             */
172
0
            memset(&conn->session.parser, 0, sizeof(struct mk_http_parser));
173
0
            mk_http_parser_init(&conn->session.parser);
174
0
            opentelemetry_conn_request_init(&conn->session, &conn->request);
175
0
        }
176
177
        /* FIXME: add Protocol handler here */
178
0
        return bytes;
179
0
    }
180
181
0
    if (event->mask & MK_EVENT_CLOSE) {
182
0
        flb_plg_trace(ctx->ins, "fd=%i hangup", event->fd);
183
0
        opentelemetry_conn_del(conn);
184
0
        return -1;
185
0
    }
186
187
0
    return 0;
188
189
0
}
190
191
static void opentelemetry_conn_session_init(struct mk_http_session *session,
192
                                            struct mk_server *server,
193
                                            int client_fd)
194
0
{
195
    /* Alloc memory for node */
196
0
    session->_sched_init = MK_TRUE;
197
0
    session->pipelined   = MK_FALSE;
198
0
    session->counter_connections = 0;
199
0
    session->close_now = MK_FALSE;
200
0
    session->status = MK_REQUEST_STATUS_INCOMPLETE;
201
0
    session->server = server;
202
0
    session->socket = client_fd;
203
204
    /* creation time in unix time */
205
0
    session->init_time = time(NULL);
206
207
0
    session->channel = mk_channel_new(MK_CHANNEL_SOCKET, session->socket);
208
0
    session->channel->io = session->server->network;
209
210
    /* Init session request list */
211
0
    mk_list_init(&session->request_list);
212
213
    /* Initialize the parser */
214
0
    mk_http_parser_init(&session->parser);
215
0
}
216
217
static void opentelemetry_conn_request_init(struct mk_http_session *session,
218
                                            struct mk_http_request *request)
219
0
{
220
0
    memset(request, 0, sizeof(struct mk_http_request));
221
222
0
    mk_http_request_init(session, request, session->server);
223
224
0
    request->in_headers.type        = MK_STREAM_IOV;
225
0
    request->in_headers.dynamic     = MK_FALSE;
226
0
    request->in_headers.cb_consumed = NULL;
227
0
    request->in_headers.cb_finished = NULL;
228
0
    request->in_headers.stream      = &request->stream;
229
230
0
    mk_list_add(&request->in_headers._head, &request->stream.inputs);
231
232
0
    request->session = session;
233
0
}
234
235
struct http_conn *opentelemetry_conn_add(struct flb_connection *connection,
236
                                         struct flb_opentelemetry *ctx)
237
0
{
238
0
    struct http_conn *conn;
239
0
    int               ret;
240
241
0
    conn = flb_calloc(1, sizeof(struct http_conn));
242
0
    if (!conn) {
243
0
        flb_errno();
244
0
        return NULL;
245
0
    }
246
0
    conn->connection = connection;
247
248
    /* Set data for the event-loop */
249
0
    MK_EVENT_NEW(&connection->event);
250
251
0
    connection->user_data     = conn;
252
0
    connection->event.type    = FLB_ENGINE_EV_CUSTOM;
253
0
    connection->event.handler = opentelemetry_conn_event;
254
255
    /* Connection info */
256
0
    conn->ctx     = ctx;
257
0
    conn->buf_len = 0;
258
259
0
    conn->buf_data = flb_malloc(ctx->buffer_chunk_size);
260
0
    if (!conn->buf_data) {
261
0
        flb_errno();
262
0
        flb_plg_error(ctx->ins, "could not allocate new connection");
263
0
        flb_free(conn);
264
0
        return NULL;
265
0
    }
266
0
    conn->buf_size = ctx->buffer_chunk_size;
267
268
    /* Register instance into the event loop */
269
0
    ret = mk_event_add(flb_engine_evl_get(),
270
0
                       connection->fd,
271
0
                       FLB_ENGINE_EV_CUSTOM,
272
0
                       MK_EVENT_READ,
273
0
                       &connection->event);
274
0
    if (ret == -1) {
275
0
        flb_plg_error(ctx->ins, "could not register new connection");
276
0
        flb_free(conn->buf_data);
277
0
        flb_free(conn);
278
0
        return NULL;
279
0
    }
280
281
    /* Initialize HTTP Session: this is a custom context for Monkey HTTP */
282
0
    opentelemetry_conn_session_init(&conn->session, ctx->server, connection->fd);
283
284
    /* Initialize HTTP Request: this is the initial request and it will be reinitialized
285
     * automatically after the request is handled so it can be used for the next one.
286
     */
287
0
    opentelemetry_conn_request_init(&conn->session, &conn->request);
288
289
    /* Link connection node to parent context list */
290
0
    mk_list_add(&conn->_head, &ctx->connections);
291
0
    return conn;
292
0
}
293
294
int opentelemetry_conn_del(struct http_conn *conn)
295
0
{
296
0
    if (conn->session.channel != NULL) {
297
0
        mk_channel_release(conn->session.channel);
298
0
    }
299
300
    /* The downstream unregisters the file descriptor from the event-loop
301
     * so there's nothing to be done by the plugin
302
     */
303
0
    flb_downstream_conn_release(conn->connection);
304
305
0
    mk_list_del(&conn->_head);
306
307
0
    flb_free(conn->buf_data);
308
0
    flb_free(conn);
309
310
0
    return 0;
311
0
}
312
313
void opentelemetry_conn_release_all(struct flb_opentelemetry *ctx)
314
0
{
315
0
    struct mk_list *tmp;
316
0
    struct mk_list *head;
317
0
    struct http_conn *conn;
318
319
0
    mk_list_foreach_safe(head, tmp, &ctx->connections) {
320
        conn = mk_list_entry(head, struct http_conn, _head);
321
0
        opentelemetry_conn_del(conn);
322
0
    }
323
0
}