Coverage Report

Created: 2026-06-07 07:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/src/http_server/flb_http_server.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2022 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_mem.h>
21
#include <fluent-bit/flb_input.h>
22
#include <fluent-bit/flb_engine.h>
23
#include <fluent-bit/flb_network.h>
24
#include <fluent-bit/flb_pthread.h>
25
#include <string.h>
26
27
#include <fluent-bit/http_server/flb_http_server.h>
28
#include <fluent-bit/http_server/flb_http_server_config_map.h>
29
30
#include <fluent-bit/flb_snappy.h>
31
#include <fluent-bit/flb_gzip.h>
32
#include <cfl/cfl_atomic.h>
33
34
/* PRIVATE */
35
36
struct flb_http_server_worker_context {
37
    struct flb_http_server parent;
38
    struct flb_http_server server;
39
    struct flb_net_setup net_setup;
40
    struct mk_event_loop *event_loop;
41
    pthread_t thread;
42
    pthread_mutex_t mutex;
43
    pthread_cond_t condition;
44
    int worker_id;
45
    uint64_t should_exit;
46
    int initialized;
47
    int thread_created;
48
    int startup_result;
49
};
50
51
struct flb_http_server_runtime {
52
    struct flb_http_server_worker_context *workers;
53
    int worker_count;
54
};
55
56
const struct flb_net_setup *
57
flb_http_server_runtime_worker_net_setup_get(struct flb_http_server *server,
58
                                             int worker_id)
59
0
{
60
0
    if (server == NULL ||
61
0
        server->runtime == NULL ||
62
0
        worker_id < 0 ||
63
0
        worker_id >= server->runtime->worker_count) {
64
0
        return NULL;
65
0
    }
66
67
0
    return &server->runtime->workers[worker_id].net_setup;
68
0
}
69
70
static void flb_http_server_runtime_stop(struct flb_http_server *session,
71
                                         struct flb_http_server_runtime *runtime);
72
73
static int flb_http_server_worker_context_reset(
74
    struct flb_http_server_worker_context *worker)
75
0
{
76
0
    int result;
77
78
0
    memset(worker, 0, sizeof(struct flb_http_server_worker_context));
79
80
0
    result = pthread_mutex_init(&worker->mutex, NULL);
81
0
    if (result != 0) {
82
0
        return result;
83
0
    }
84
85
0
    result = pthread_cond_init(&worker->condition, NULL);
86
0
    if (result != 0) {
87
0
        pthread_mutex_destroy(&worker->mutex);
88
0
        return result;
89
0
    }
90
91
0
    return 0;
92
0
}
93
94
static void flb_http_server_worker_context_cleanup(
95
    struct flb_http_server_worker_context *worker)
96
0
{
97
0
    pthread_mutex_destroy(&worker->mutex);
98
0
    pthread_cond_destroy(&worker->condition);
99
0
}
100
101
static int flb_http_server_running_on_caller_context(
102
    struct flb_http_server *session)
103
0
{
104
0
    return session->workers == 1 &&
105
0
           session->use_caller_event_loop == FLB_TRUE &&
106
0
           session->event_loop != NULL;
107
0
}
108
109
static const char *flb_http_server_get_alpn_string(struct flb_http_server *session)
110
0
{
111
0
    if (session == NULL) {
112
0
        return NULL;
113
0
    }
114
115
0
    if (session->protocol_version == HTTP_PROTOCOL_VERSION_AUTODETECT) {
116
0
        return "h2,http/1.1,http/1.0";
117
0
    }
118
119
0
    if (session->protocol_version >= HTTP_PROTOCOL_VERSION_20) {
120
0
        return "h2";
121
0
    }
122
123
0
    if (session->protocol_version == HTTP_PROTOCOL_VERSION_11) {
124
0
        return "http/1.1,http/1.0";
125
0
    }
126
127
0
    return "http/1.0";
128
0
}
129
130
static void flb_http_server_connection_drop(struct flb_connection *connection)
131
0
{
132
0
    struct flb_http_server_session *session;
133
134
0
    if (connection == NULL) {
135
0
        return;
136
0
    }
137
138
0
    session = connection->user_data;
139
140
0
    if (session != NULL &&
141
0
        session->connection == connection) {
142
0
        session->connection = NULL;
143
0
        session->drop_pending = FLB_FALSE;
144
0
    }
145
146
0
    connection->drop_notification_callback = NULL;
147
0
}
148
149
static void flb_http_server_reap_stale_sessions(struct flb_http_server *server)
150
0
{
151
0
    struct cfl_list                *iterator_backup;
152
0
    struct cfl_list                *iterator;
153
0
    struct flb_http_server_session *session;
154
155
0
    cfl_list_foreach_safe(iterator,
156
0
                          iterator_backup,
157
0
                          &server->clients) {
158
0
        session = cfl_list_entry(iterator,
159
0
                                 struct flb_http_server_session,
160
0
                                 _head);
161
162
0
        if (session->drop_pending == FLB_FALSE &&
163
0
            (session->connection == NULL ||
164
0
             session->connection->fd == FLB_INVALID_SOCKET)) {
165
0
            flb_http_server_session_destroy(session);
166
0
        }
167
0
    }
168
0
}
169
170
static size_t flb_http_server_client_count(struct flb_http_server *server)
171
0
{
172
0
    return cfl_list_size(&server->clients);
173
0
}
174
175
static int flb_http_server_apply_options(struct flb_http_server *session,
176
                                         struct flb_http_server_options *options)
177
0
{
178
0
    if (session == NULL || options == NULL) {
179
0
        return -1;
180
0
    }
181
182
0
    session->status = HTTP_SERVER_UNINITIALIZED;
183
0
    session->protocol_version = options->protocol_version;
184
0
    session->idle_timeout = options->idle_timeout;
185
0
    session->flags = options->flags;
186
0
    session->request_callback = options->request_callback;
187
0
    session->user_data = options->user_data;
188
189
0
    session->address = options->address;
190
0
    session->port = options->port;
191
0
    session->tls_provider = options->tls_provider;
192
0
    session->networking_flags = options->networking_flags;
193
0
    session->networking_setup = options->networking_setup;
194
0
    session->event_loop = options->event_loop;
195
0
    session->system_context = options->system_context;
196
197
0
    session->downstream = NULL;
198
0
    session->buffer_max_size = options->buffer_max_size;
199
0
    session->buffer_chunk_size = options->buffer_chunk_size;
200
0
    session->max_connections = options->max_connections;
201
0
    session->workers = options->workers;
202
0
    session->worker_id = 0;
203
0
    session->use_caller_event_loop = options->use_caller_event_loop;
204
0
    session->reuse_port = options->reuse_port;
205
0
    session->tls_alpn_configured = FLB_FALSE;
206
0
    session->cb_worker_init = options->cb_worker_init;
207
0
    session->cb_worker_exit = options->cb_worker_exit;
208
0
    session->runtime = NULL;
209
210
0
    cfl_list_init(&session->clients);
211
212
0
    MK_EVENT_NEW(&session->listener_event);
213
214
0
    session->status = HTTP_SERVER_INITIALIZED;
215
216
0
    return 0;
217
0
}
218
219
static int flb_http_server_session_read(struct flb_http_server_session *session)
220
0
{
221
0
    size_t sent;
222
0
    size_t read_buffer_size;
223
0
    ssize_t result;
224
0
    char *request_too_large = "HTTP/1.1 413 Request Entity Too Large\r\n"
225
0
                              "Content-Length: 0\r\n"
226
0
                              "Connection: close\r\n\r\n";
227
228
0
    if (session->read_buffer == NULL) {
229
0
        if (session->parent != NULL &&
230
0
            session->parent->buffer_chunk_size > 0) {
231
0
            read_buffer_size = session->parent->buffer_chunk_size;
232
0
        }
233
0
        else {
234
0
            read_buffer_size = HTTP_SERVER_INITIAL_BUFFER_SIZE;
235
0
        }
236
237
0
        session->read_buffer = flb_malloc(read_buffer_size);
238
0
        if (session->read_buffer == NULL) {
239
0
            flb_errno();
240
0
            return -1;
241
0
        }
242
243
0
        session->read_buffer_size = read_buffer_size;
244
0
    }
245
246
0
    result = flb_io_net_read(session->connection,
247
0
                             (void *) session->read_buffer,
248
0
                             session->read_buffer_size);
249
250
0
    if (result <= 0) {
251
0
        return -1;
252
0
    }
253
254
0
    result = (ssize_t) flb_http_server_session_ingest(session,
255
0
                                                      session->read_buffer,
256
0
                                                      result);
257
258
0
    if (result == HTTP_SERVER_BUFFER_LIMIT_EXCEEDED) {
259
0
        flb_io_net_write(session->connection,
260
0
                         (void *) request_too_large,
261
0
                         strlen(request_too_large),
262
0
                         &sent);
263
0
        return -1;
264
0
    }
265
0
    else if (result < 0) {
266
0
        return -1;
267
0
    }
268
269
0
    return 0;
270
0
}
271
272
static int flb_http_server_session_write(struct flb_http_server_session *session)
273
0
{
274
0
    size_t data_length;
275
0
    size_t data_sent;
276
0
    int    result;
277
278
0
    if (session == NULL) {
279
0
        return -1;
280
0
    }
281
282
0
    if (session->outgoing_data == NULL) {
283
0
        return 0;
284
0
    }
285
286
0
    data_length = cfl_sds_len(session->outgoing_data);
287
288
0
    if (data_length > 0) {
289
0
        result = flb_io_net_write(session->connection,
290
0
                                  (void *) session->outgoing_data,
291
0
                                  data_length,
292
0
                                  &data_sent);
293
294
0
        if (result == -1) {
295
0
            return -2;
296
0
        }
297
298
0
        if (data_sent < data_length) {
299
0
            memmove(session->outgoing_data,
300
0
                    &session->outgoing_data[data_sent],
301
0
                    data_length - data_sent);
302
303
0
            cfl_sds_set_len(session->outgoing_data,
304
0
                            data_length - data_sent);
305
0
        }
306
0
        else {
307
0
            cfl_sds_set_len(session->outgoing_data, 0);
308
0
        }
309
0
    }
310
311
0
    return 0;
312
0
}
313
314
static int flb_http_server_should_connection_be_closed(
315
    struct flb_http_request *request)
316
0
{
317
0
    char                            *connection_header_value;
318
0
    struct flb_http_server_session  *parent_session;
319
0
    struct flb_downstream           *downstream;
320
0
    int                              keepalive;
321
0
    struct flb_http_server          *server;
322
323
0
    keepalive = FLB_FALSE;
324
325
0
    parent_session = (struct flb_http_server_session *) request->stream->parent;
326
327
0
    server = parent_session->parent;
328
0
    downstream = server->downstream;
329
330
    /* Version behaviors implemented in the following block :
331
     * HTTP/0.9 keep-alive is opt-in
332
     * HTTP/1.0 keep-alive is opt-in
333
     * HTTP/1.1 keep-alive is opt-out
334
     * HTTP/2   keep-alive is "mandatory"
335
     */
336
337
0
    if (request->protocol_version >= HTTP_PROTOCOL_VERSION_20) {
338
        /* HTTP/2 always keeps the connection open */
339
0
        return FLB_FALSE;
340
0
    }
341
342
    /*
343
      * user config overrides any protocol defaults, this is set
344
      * with the option 'net.keepalive: off`. This override is only
345
      * effective less than HTTP/2.
346
      */
347
0
    if (!downstream->net_setup->keepalive) {
348
0
        return FLB_TRUE;
349
0
    }
350
351
    /* Set the defaults per protocol version */
352
0
    if (request->protocol_version == HTTP_PROTOCOL_VERSION_09) {
353
0
        keepalive = FLB_FALSE;
354
0
    }
355
0
    else if (request->protocol_version == HTTP_PROTOCOL_VERSION_10) {
356
0
        keepalive = FLB_FALSE;
357
0
    }
358
0
    else if (request->protocol_version == HTTP_PROTOCOL_VERSION_11) {
359
0
        keepalive = FLB_TRUE;
360
0
    }
361
362
    /* Override protocol defaults by checking connection header */
363
0
    connection_header_value = flb_http_request_get_header(request,
364
0
                                                          "connection");
365
0
    if (connection_header_value &&
366
0
        strcasecmp(connection_header_value, "keep-alive") == 0) {
367
0
        keepalive = FLB_TRUE;
368
0
    }
369
370
0
    if (keepalive) {
371
0
        return FLB_FALSE;
372
0
    }
373
374
0
    return FLB_TRUE;
375
0
}
376
377
static int flb_http_server_client_activity_event_handler(void *data)
378
0
{
379
0
    int                             close_connection;
380
0
    struct cfl_list                *backup_iterator;
381
0
    struct flb_connection          *connection;
382
0
    struct cfl_list                *iterator;
383
0
    struct flb_http_response       *response;
384
0
    struct flb_http_request        *request;
385
0
    struct flb_http_server_session *session;
386
0
    struct flb_http_server         *server;
387
0
    struct flb_http_stream         *stream;
388
0
    int                             result;
389
0
    struct mk_event                *event;
390
391
0
    connection = (struct flb_connection *) data;
392
0
    if (connection == NULL) {
393
0
        return -1;
394
0
    }
395
396
0
    event = &connection->event;
397
398
0
    session = (struct flb_http_server_session *) connection->user_data;
399
0
    if (session == NULL) {
400
0
        return -1;
401
0
    }
402
403
0
    if (session->connection == NULL ||
404
0
        session->connection->fd == FLB_INVALID_SOCKET) {
405
0
        session->drop_pending = FLB_FALSE;
406
0
        flb_http_server_session_destroy(session);
407
0
        return -1;
408
0
    }
409
410
0
    server = session->parent;
411
412
0
    if (event->mask & MK_EVENT_READ) {
413
0
        result = flb_http_server_session_read(session);
414
415
0
        if (result != 0) {
416
0
            flb_http_server_session_destroy(session);
417
418
0
            return -1;
419
0
        }
420
0
    }
421
422
0
    close_connection = FLB_FALSE;
423
424
0
    cfl_list_foreach_safe(iterator,
425
0
                            backup_iterator,
426
0
                            &session->request_queue) {
427
0
        request = cfl_list_entry(iterator, struct flb_http_request, _head);
428
429
0
        stream = (struct flb_http_stream *) request->stream;
430
431
0
        response = flb_http_response_begin(session, stream);
432
433
0
        if (request->body != NULL && request->content_length == 0) {
434
0
            request->content_length = cfl_sds_len(request->body);
435
0
        }
436
437
0
        if ((server->flags & FLB_HTTP_SERVER_FLAG_AUTO_INFLATE) != 0) {
438
0
            result = flb_http_request_uncompress_body(request);
439
440
0
            if (result != 0) {
441
0
                flb_http_server_session_destroy(session);
442
443
0
                return -1;
444
0
            }
445
0
        }
446
447
0
        if (server->request_callback != NULL) {
448
0
            result = server->request_callback(request, response);
449
0
        }
450
0
        else {
451
            /* Report */
452
0
        }
453
454
0
        close_connection = flb_http_server_should_connection_be_closed(request);
455
456
0
        flb_http_request_destroy(&stream->request);
457
0
        flb_http_response_destroy(&stream->response);
458
0
    }
459
460
0
    result = flb_http_server_session_write(session);
461
462
0
    if (result != 0) {
463
0
        flb_http_server_session_destroy(session);
464
465
0
        return -4;
466
0
    }
467
468
0
    if (close_connection) {
469
0
        flb_http_server_session_destroy(session);
470
0
    }
471
472
0
    return 0;
473
0
}
474
475
static int flb_http_server_client_connection_event_handler(void *data)
476
0
{
477
0
    struct flb_connection          *connection;
478
0
    struct flb_http_server_session *session;
479
0
    struct flb_http_server         *server;
480
0
    int                             result;
481
482
0
    server = (struct flb_http_server *) data;
483
484
0
    connection = flb_downstream_conn_get(server->downstream);
485
486
0
    if (connection == NULL) {
487
0
        return -1;
488
0
    }
489
490
0
    if (server->max_connections > 0) {
491
0
        flb_http_server_reap_stale_sessions(server);
492
493
0
        if (flb_http_server_client_count(server) >= server->max_connections) {
494
0
            flb_downstream_conn_release(connection);
495
496
0
            return -5;
497
0
        }
498
0
    }
499
500
0
    session = flb_http_server_session_create(server->protocol_version);
501
502
0
    if (session == NULL) {
503
0
        flb_downstream_conn_release(connection);
504
505
0
        return -2;
506
0
    }
507
508
0
    session->parent = server;
509
0
    session->connection = connection;
510
511
0
    if (session->version <= HTTP_PROTOCOL_VERSION_11) {
512
0
        session->http1.stream.user_data = server->user_data;
513
0
    }
514
515
0
    MK_EVENT_NEW(&connection->event);
516
517
0
    connection->user_data     = (void *) session;
518
0
    connection->drop_notification_callback = flb_http_server_connection_drop;
519
0
    connection->event.type    = FLB_ENGINE_EV_CUSTOM;
520
0
    connection->event.handler = flb_http_server_client_activity_event_handler;
521
522
0
    result = mk_event_add(server->event_loop,
523
0
                          connection->fd,
524
0
                          FLB_ENGINE_EV_CUSTOM,
525
0
                          MK_EVENT_READ,
526
0
                          &connection->event);
527
528
0
    if (result == -1) {
529
0
        flb_http_server_session_destroy(session);
530
531
0
        return -3;
532
0
    }
533
534
0
    cfl_list_add(&session->_head, &server->clients);
535
536
0
    result = flb_http_server_session_write(session);
537
538
0
    if (result != 0) {
539
0
        flb_http_server_session_destroy(session);
540
541
0
        return -4;
542
0
    }
543
544
0
    return 0;
545
0
}
546
547
static void flb_http_server_worker_maintenance(struct flb_config *config,
548
                                               void *data)
549
0
{
550
0
    struct flb_http_server_worker_context *worker;
551
552
0
    (void) config;
553
554
0
    worker = data;
555
556
0
    if (worker->server.downstream != NULL) {
557
0
        flb_downstream_conn_timeouts_stream(worker->server.downstream);
558
0
    }
559
560
0
    flb_http_server_reap_stale_sessions(&worker->server);
561
0
}
562
563
static int flb_http_server_worker_initialize(
564
    struct flb_http_server_worker_context *worker)
565
0
{
566
0
    int result;
567
0
    struct flb_http_server_options options;
568
569
0
    flb_http_server_options_init(&options);
570
571
0
    options.protocol_version = worker->parent.protocol_version;
572
0
    options.flags = worker->parent.flags;
573
0
    options.request_callback = worker->parent.request_callback;
574
0
    options.user_data = worker->parent.user_data;
575
0
    options.address = worker->parent.address;
576
0
    options.port = worker->parent.port;
577
0
    options.tls_provider = worker->parent.tls_provider;
578
0
    options.networking_flags = worker->parent.networking_flags;
579
0
    options.networking_setup = &worker->net_setup;
580
0
    options.event_loop = worker->event_loop;
581
0
    options.system_context = worker->parent.system_context;
582
0
    options.idle_timeout = worker->parent.idle_timeout;
583
0
    options.buffer_max_size = worker->parent.buffer_max_size;
584
0
    options.workers = 1;
585
0
    options.use_caller_event_loop = FLB_TRUE;
586
0
    options.reuse_port = worker->parent.reuse_port;
587
0
    options.cb_worker_init = worker->parent.cb_worker_init;
588
0
    options.cb_worker_exit = worker->parent.cb_worker_exit;
589
590
0
    result = flb_http_server_init_with_options(&worker->server, &options);
591
0
    if (result != 0) {
592
0
        return result;
593
0
    }
594
595
0
    result = flb_http_server_start(&worker->server);
596
0
    if (result != 0) {
597
0
        return result;
598
0
    }
599
600
0
    flb_downstream_thread_safe(worker->server.downstream);
601
602
0
    worker->server.worker_id = worker->worker_id;
603
0
    worker->server.workers = worker->parent.workers;
604
605
0
    return 0;
606
0
}
607
608
static void *flb_http_server_worker_thread(void *data)
609
0
{
610
0
    int result;
611
0
    uint64_t should_exit;
612
0
    struct mk_event *event;
613
0
    struct flb_net_dns dns_ctx = {0};
614
0
    struct flb_http_server_worker_context *worker;
615
616
0
    worker = data;
617
618
0
    worker->event_loop = mk_event_loop_create(256);
619
0
    if (worker->event_loop == NULL) {
620
0
        result = -1;
621
0
        goto signal_and_exit;
622
0
    }
623
624
0
    flb_engine_evl_init();
625
0
    flb_engine_evl_set(worker->event_loop);
626
627
0
    flb_net_dns_ctx_init();
628
0
    flb_net_ctx_init(&dns_ctx);
629
0
    flb_net_dns_ctx_set(&dns_ctx);
630
631
0
    result = flb_http_server_worker_initialize(worker);
632
633
0
signal_and_exit:
634
0
    pthread_mutex_lock(&worker->mutex);
635
0
    worker->startup_result = result;
636
0
    worker->initialized = FLB_TRUE;
637
0
    pthread_cond_signal(&worker->condition);
638
0
    pthread_mutex_unlock(&worker->mutex);
639
640
0
    if (result != 0) {
641
0
        goto cleanup;
642
0
    }
643
644
0
    while ((should_exit = cfl_atomic_load(&worker->should_exit)) == FLB_FALSE) {
645
0
        mk_event_wait_2(worker->event_loop, 250);
646
647
0
        mk_event_foreach(event, worker->event_loop) {
648
0
            if (event->type == FLB_ENGINE_EV_CUSTOM) {
649
0
                event->handler(event);
650
0
            }
651
0
        }
652
653
0
        flb_http_server_worker_maintenance(worker->parent.system_context,
654
0
                                           worker);
655
0
        flb_downstream_conn_pending_destroy(worker->server.downstream);
656
0
    }
657
658
0
cleanup:
659
0
    if (worker->server.status == HTTP_SERVER_RUNNING &&
660
0
        worker->server.cb_worker_exit != NULL) {
661
0
        worker->server.cb_worker_exit(&worker->server,
662
0
                                      worker->server.user_data);
663
0
        worker->server.cb_worker_exit = NULL;
664
0
    }
665
666
0
    return NULL;
667
0
}
668
669
static int flb_http_server_runtime_start(struct flb_http_server *session)
670
0
{
671
0
    const char *alpn;
672
0
    int index;
673
0
    int result;
674
0
    struct flb_http_server_runtime *runtime;
675
676
0
    runtime = flb_calloc(1, sizeof(struct flb_http_server_runtime));
677
0
    if (runtime == NULL) {
678
0
        flb_errno();
679
0
        return -1;
680
0
    }
681
682
0
    runtime->workers = flb_calloc(session->workers,
683
0
                                  sizeof(struct flb_http_server_worker_context));
684
0
    if (runtime->workers == NULL) {
685
0
        flb_errno();
686
0
        flb_free(runtime);
687
0
        return -1;
688
0
    }
689
690
0
    if (session->tls_provider != NULL &&
691
0
        session->tls_alpn_configured == FLB_FALSE) {
692
0
        alpn = flb_http_server_get_alpn_string(session);
693
0
        result = flb_tls_set_alpn(session->tls_provider, alpn);
694
695
0
        if (result != 0) {
696
0
            flb_free(runtime->workers);
697
0
            flb_free(runtime);
698
699
0
            return -1;
700
0
        }
701
702
0
        session->tls_alpn_configured = FLB_TRUE;
703
0
    }
704
705
0
    for (index = 0; index < session->workers; index++) {
706
0
        result = flb_http_server_worker_context_reset(&runtime->workers[index]);
707
0
        if (result != 0) {
708
0
            while (index > 0) {
709
0
                index--;
710
0
                flb_http_server_worker_context_cleanup(&runtime->workers[index]);
711
0
            }
712
713
0
            flb_free(runtime->workers);
714
0
            flb_free(runtime);
715
716
0
            return -1;
717
0
        }
718
0
    }
719
720
0
    runtime->worker_count = session->workers;
721
722
0
    for (index = 0; index < runtime->worker_count; index++) {
723
0
        memcpy(&runtime->workers[index].parent,
724
0
               session,
725
0
               sizeof(struct flb_http_server));
726
0
        memcpy(&runtime->workers[index].net_setup,
727
0
               session->networking_setup,
728
0
               sizeof(struct flb_net_setup));
729
730
0
        runtime->workers[index].net_setup.share_port = FLB_TRUE;
731
0
        runtime->workers[index].worker_id = index;
732
0
        runtime->workers[index].parent.reuse_port = FLB_TRUE;
733
0
        runtime->workers[index].parent.runtime = NULL;
734
0
        runtime->workers[index].parent.workers = session->workers;
735
736
0
        result = pthread_create(&runtime->workers[index].thread,
737
0
                                NULL,
738
0
                                flb_http_server_worker_thread,
739
0
                                &runtime->workers[index]);
740
0
        if (result != 0) {
741
0
            runtime->workers[index].startup_result = -1;
742
0
            break;
743
0
        }
744
0
        runtime->workers[index].thread_created = FLB_TRUE;
745
746
0
        pthread_mutex_lock(&runtime->workers[index].mutex);
747
0
        while (runtime->workers[index].initialized == FLB_FALSE) {
748
0
            pthread_cond_wait(&runtime->workers[index].condition,
749
0
                              &runtime->workers[index].mutex);
750
0
        }
751
0
        result = runtime->workers[index].startup_result;
752
0
        pthread_mutex_unlock(&runtime->workers[index].mutex);
753
754
0
        if (result != 0) {
755
0
            break;
756
0
        }
757
0
    }
758
759
0
    if (index != runtime->worker_count) {
760
0
        flb_http_server_runtime_stop(session, runtime);
761
0
        return -1;
762
0
    }
763
764
0
    session->runtime = runtime;
765
0
    session->status = HTTP_SERVER_RUNNING;
766
767
0
    return 0;
768
0
}
769
770
static void flb_http_server_runtime_stop(struct flb_http_server *session,
771
                                         struct flb_http_server_runtime *runtime)
772
0
{
773
0
    int index;
774
0
    int published;
775
776
0
    if (runtime == NULL) {
777
0
        return;
778
0
    }
779
780
0
    published = (session->runtime == runtime);
781
782
0
    for (index = 0; index < runtime->worker_count; index++) {
783
0
        cfl_atomic_store(&runtime->workers[index].should_exit, FLB_TRUE);
784
785
0
        if (runtime->workers[index].thread_created == FLB_TRUE) {
786
0
            pthread_join(runtime->workers[index].thread, NULL);
787
788
0
            flb_http_server_destroy(&runtime->workers[index].server);
789
790
0
            if (runtime->workers[index].event_loop != NULL) {
791
0
                mk_event_loop_destroy(runtime->workers[index].event_loop);
792
0
                runtime->workers[index].event_loop = NULL;
793
0
            }
794
0
        }
795
796
0
        flb_http_server_worker_context_cleanup(&runtime->workers[index]);
797
0
    }
798
799
0
    if (published == FLB_TRUE) {
800
0
        session->runtime = NULL;
801
0
    }
802
803
0
    flb_free(runtime->workers);
804
0
    flb_free(runtime);
805
0
}
806
807
/* HTTP SERVER */
808
809
int flb_http_server_init(struct flb_http_server *session,
810
                         int protocol_version,
811
                         uint64_t flags,
812
                         flb_http_server_request_processor_callback request_callback,
813
                         char *address,
814
                         unsigned short int port,
815
                         struct flb_tls *tls_provider,
816
                         int networking_flags,
817
                         struct flb_net_setup *networking_setup,
818
                         struct mk_event_loop *event_loop,
819
                         struct flb_config *system_context,
820
                         void *user_data)
821
0
{
822
0
    struct flb_http_server_options options;
823
824
0
    flb_http_server_options_init(&options);
825
826
0
    options.protocol_version = protocol_version;
827
0
    options.flags = flags;
828
0
    options.request_callback = request_callback;
829
0
    options.user_data = user_data;
830
0
    options.address = address;
831
0
    options.port = port;
832
0
    options.tls_provider = tls_provider;
833
0
    options.networking_flags = networking_flags;
834
0
    options.networking_setup = networking_setup;
835
0
    options.event_loop = event_loop;
836
0
    options.system_context = system_context;
837
838
0
    return flb_http_server_init_with_options(session, &options);
839
0
}
840
841
void flb_http_server_options_init(struct flb_http_server_options *options)
842
0
{
843
0
    if (options == NULL) {
844
0
        return;
845
0
    }
846
847
0
    memset(options, 0, sizeof(struct flb_http_server_options));
848
849
0
    options->buffer_max_size = HTTP_SERVER_MAXIMUM_BUFFER_SIZE;
850
0
    options->buffer_chunk_size = HTTP_SERVER_INITIAL_BUFFER_SIZE;
851
0
    options->idle_timeout = HTTP_SERVER_DEFAULT_IDLE_TIMEOUT;
852
0
    options->max_connections = 0;
853
0
    options->workers = 1;
854
0
    options->use_caller_event_loop = FLB_TRUE;
855
0
    options->reuse_port = FLB_FALSE;
856
0
}
857
858
void flb_http_server_config_init(struct flb_http_server_config *config)
859
3.58k
{
860
3.58k
    if (config == NULL) {
861
0
        return;
862
0
    }
863
864
3.58k
    memset(config, 0, sizeof(struct flb_http_server_config));
865
866
3.58k
    config->http2 = FLB_TRUE;
867
3.58k
    config->idle_timeout = HTTP_SERVER_DEFAULT_IDLE_TIMEOUT;
868
3.58k
    config->buffer_max_size = HTTP_SERVER_MAXIMUM_BUFFER_SIZE;
869
3.58k
    config->buffer_chunk_size = HTTP_SERVER_INITIAL_BUFFER_SIZE;
870
3.58k
    config->max_connections = 0;
871
3.58k
    config->workers = 1;
872
3.58k
    config->ingress_queue_event_limit = FLB_HTTP_SERVER_INGRESS_QUEUE_EVENT_LIMIT;
873
3.58k
    config->ingress_queue_byte_limit = FLB_HTTP_SERVER_INGRESS_QUEUE_BYTE_LIMIT;
874
3.58k
}
875
876
int flb_http_server_options_init_from_input(struct flb_http_server_options *options,
877
                                            struct flb_input_instance *input_instance,
878
                                            int protocol_version,
879
                                            uint64_t flags,
880
                                            size_t buffer_max_size,
881
                                            flb_http_server_request_processor_callback
882
                                                request_callback,
883
                                            void *user_data)
884
0
{
885
0
    if (options == NULL || input_instance == NULL) {
886
0
        return -1;
887
0
    }
888
889
0
    flb_http_server_options_init(options);
890
891
0
    options->protocol_version = protocol_version;
892
0
    options->flags = flags;
893
0
    options->request_callback = request_callback;
894
0
    options->user_data = user_data;
895
0
    options->address = input_instance->host.listen;
896
0
    options->port = input_instance->host.port;
897
0
    options->tls_provider = input_instance->tls;
898
0
    options->networking_flags = input_instance->flags;
899
0
    options->networking_setup = &input_instance->net_setup;
900
0
    options->event_loop = flb_input_event_loop_get(input_instance);
901
0
    options->system_context = input_instance->config;
902
0
    options->buffer_max_size = buffer_max_size;
903
0
    options->buffer_chunk_size = HTTP_SERVER_INITIAL_BUFFER_SIZE;
904
0
    options->max_connections = 0;
905
0
    options->reuse_port = input_instance->net_setup.share_port;
906
907
0
    return 0;
908
0
}
909
910
int flb_input_http_server_options_init(struct flb_http_server_options *options,
911
                                       struct flb_input_instance *input_instance,
912
                                       uint64_t flags,
913
                                       flb_http_server_request_processor_callback request_callback,
914
                                       void *user_data)
915
0
{
916
0
    int protocol_version;
917
0
    size_t buffer_max_size;
918
0
    int result;
919
0
    struct flb_http_server_config *server_config;
920
921
0
    if (input_instance == NULL || options == NULL ||
922
0
        input_instance->http_server_config == NULL) {
923
0
        return -1;
924
0
    }
925
926
0
    server_config = input_instance->http_server_config;
927
928
0
    if (server_config != NULL && server_config->http2 == FLB_FALSE) {
929
0
        protocol_version = HTTP_PROTOCOL_VERSION_11;
930
0
    }
931
0
    else {
932
0
        protocol_version = HTTP_PROTOCOL_VERSION_AUTODETECT;
933
0
    }
934
935
0
    if (server_config != NULL && server_config->buffer_max_size > 0) {
936
0
        buffer_max_size = server_config->buffer_max_size;
937
0
    }
938
0
    else {
939
0
        buffer_max_size = HTTP_SERVER_MAXIMUM_BUFFER_SIZE;
940
0
    }
941
942
0
    result = flb_http_server_options_init_from_input(options,
943
0
                                                     input_instance,
944
0
                                                     protocol_version,
945
0
                                                     flags,
946
0
                                                     buffer_max_size,
947
0
                                                     request_callback,
948
0
                                                     user_data);
949
0
    if (result != 0) {
950
0
        return result;
951
0
    }
952
953
0
    if (server_config != NULL) {
954
0
        options->idle_timeout = server_config->idle_timeout;
955
0
        if (server_config->buffer_chunk_size > 0) {
956
0
            options->buffer_chunk_size = server_config->buffer_chunk_size;
957
0
        }
958
0
        options->max_connections = server_config->max_connections;
959
0
        options->workers = server_config->workers;
960
0
    }
961
962
0
    return 0;
963
0
}
964
965
int flb_http_server_init_with_options(
966
    struct flb_http_server *session,
967
    struct flb_http_server_options *options)
968
0
{
969
0
    if (session == NULL || options == NULL) {
970
0
        return -1;
971
0
    }
972
973
0
    if (options->buffer_max_size == 0) {
974
0
        options->buffer_max_size = HTTP_SERVER_MAXIMUM_BUFFER_SIZE;
975
0
    }
976
977
0
    if (options->workers <= 0) {
978
0
        options->workers = 1;
979
0
    }
980
981
0
    if (options->workers > 1) {
982
0
        options->reuse_port = FLB_TRUE;
983
0
        options->use_caller_event_loop = FLB_FALSE;
984
0
    }
985
986
0
    if (options->reuse_port == FLB_TRUE &&
987
0
        options->networking_setup != NULL) {
988
0
        options->networking_setup->share_port = FLB_TRUE;
989
0
    }
990
991
0
    if (options->networking_setup != NULL &&
992
0
        options->networking_setup->io_timeout <= 0 &&
993
0
        options->idle_timeout > 0) {
994
0
        options->networking_setup->io_timeout = options->idle_timeout;
995
0
    }
996
997
0
    return flb_http_server_apply_options(session, options);
998
0
}
999
1000
int flb_http_server_start(struct flb_http_server *session)
1001
0
{
1002
0
    const char *alpn;
1003
0
    int result;
1004
1005
0
    if (!flb_http_server_running_on_caller_context(session)) {
1006
0
        return flb_http_server_runtime_start(session);
1007
0
    }
1008
1009
0
    if (session->tls_provider != NULL &&
1010
0
        session->tls_alpn_configured == FLB_FALSE) {
1011
0
        alpn = flb_http_server_get_alpn_string(session);
1012
0
        result = flb_tls_set_alpn(session->tls_provider, alpn);
1013
1014
0
        if (result != 0) {
1015
0
            return -1;
1016
0
        }
1017
1018
0
        session->tls_alpn_configured = FLB_TRUE;
1019
0
    }
1020
1021
0
    session->downstream = flb_downstream_create(FLB_TRANSPORT_TCP,
1022
0
                                                session->networking_flags,
1023
0
                                                session->address,
1024
0
                                                session->port,
1025
0
                                                session->tls_provider,
1026
0
                                                session->system_context,
1027
0
                                                session->networking_setup);
1028
1029
0
    if (session->downstream == NULL) {
1030
0
        return -1;
1031
0
    }
1032
1033
0
    session->listener_event.type    = FLB_ENGINE_EV_CUSTOM;
1034
0
    session->listener_event.handler = flb_http_server_client_connection_event_handler;
1035
1036
    /* Register instance into the event loop */
1037
0
    result = mk_event_add(session->event_loop,
1038
0
                          session->downstream->server_fd,
1039
0
                          FLB_ENGINE_EV_CUSTOM,
1040
0
                          MK_EVENT_READ,
1041
0
                          &session->listener_event);
1042
1043
0
    if (result == -1) {
1044
0
        return -1;
1045
0
    }
1046
1047
0
    if (session->cb_worker_init != NULL) {
1048
0
        result = session->cb_worker_init(session,
1049
0
                                         session->user_data);
1050
1051
0
        if (result != 0) {
1052
0
            mk_event_del(session->event_loop, &session->listener_event);
1053
0
            flb_downstream_destroy(session->downstream);
1054
0
            session->downstream = NULL;
1055
1056
0
            return result;
1057
0
        }
1058
0
    }
1059
1060
0
    session->status = HTTP_SERVER_RUNNING;
1061
1062
0
    return 0;
1063
0
}
1064
1065
int flb_http_server_stop(struct flb_http_server *server)
1066
0
{
1067
0
    struct cfl_list                *iterator_backup;
1068
0
    struct cfl_list                *iterator;
1069
0
    struct flb_http_server_session *session;
1070
1071
0
    if (server->runtime != NULL) {
1072
0
        flb_http_server_runtime_stop(server, server->runtime);
1073
0
        server->status = HTTP_SERVER_STOPPED;
1074
0
        return 0;
1075
0
    }
1076
1077
0
    if (server->status == HTTP_SERVER_RUNNING) {
1078
0
        if (MK_EVENT_IS_REGISTERED((&server->listener_event))) {
1079
0
            mk_event_del(server->event_loop, &server->listener_event);
1080
0
        }
1081
1082
0
        mk_list_foreach_safe(iterator, iterator_backup, &server->clients) {
1083
0
            session = cfl_list_entry(iterator,
1084
0
                                     struct flb_http_server_session,
1085
0
                                     _head);
1086
1087
0
            flb_http_server_session_destroy(session);
1088
0
        }
1089
1090
0
        if (server->cb_worker_exit != NULL) {
1091
0
            server->cb_worker_exit(server, server->user_data);
1092
0
        }
1093
1094
0
        server->status = HTTP_SERVER_STOPPED;
1095
0
    }
1096
1097
0
    return 0;
1098
0
}
1099
1100
int flb_http_server_destroy(struct flb_http_server *server)
1101
0
{
1102
0
    flb_http_server_stop(server);
1103
1104
0
    if (server->downstream != NULL) {
1105
0
        flb_downstream_destroy(server->downstream);
1106
1107
0
        server->downstream = NULL;
1108
0
    }
1109
1110
0
    return 0;
1111
0
}
1112
1113
int flb_http_server_init_on_input(struct flb_http_server *session,
1114
                                  struct flb_input_instance *input_instance,
1115
                                  int protocol_version,
1116
                                  uint64_t flags,
1117
                                  size_t buffer_max_size,
1118
                                  flb_http_server_request_processor_callback request_callback,
1119
                                  void *user_data)
1120
0
{
1121
0
    int result;
1122
0
    struct flb_http_server_options options;
1123
1124
0
    if (session == NULL || input_instance == NULL) {
1125
0
        return -1;
1126
0
    }
1127
1128
0
    result = flb_http_server_options_init_from_input(&options,
1129
0
                                                     input_instance,
1130
0
                                                     protocol_version,
1131
0
                                                     flags,
1132
0
                                                     buffer_max_size,
1133
0
                                                     request_callback,
1134
0
                                                     user_data);
1135
0
    if (result != 0) {
1136
0
        return result;
1137
0
    }
1138
1139
0
    result = flb_http_server_init_with_options(session, &options);
1140
1141
0
    if (result != 0) {
1142
0
        return result;
1143
0
    }
1144
1145
0
    result = flb_http_server_start(session);
1146
1147
0
    if (result != 0) {
1148
0
        flb_http_server_destroy(session);
1149
0
        return result;
1150
0
    }
1151
1152
0
    result = 0;
1153
1154
0
    if (session->runtime == NULL && session->downstream != NULL) {
1155
0
        result = flb_input_downstream_set(session->downstream, input_instance);
1156
1157
0
        if (result != 0) {
1158
0
            flb_http_server_destroy(session);
1159
0
        }
1160
0
    }
1161
1162
0
    return result;
1163
0
}
1164
1165
void flb_http_server_set_buffer_max_size(struct flb_http_server *server,
1166
                                         size_t size)
1167
0
{
1168
0
    server->buffer_max_size = size;
1169
0
}
1170
1171
size_t flb_http_server_get_buffer_max_size(struct flb_http_server *server)
1172
0
{
1173
0
    return server->buffer_max_size;
1174
0
}
1175
1176
/* HTTP SESSION */
1177
1178
int flb_http_server_session_init(struct flb_http_server_session *session, int version)
1179
0
{
1180
0
    int result;
1181
1182
0
    memset(session, 0, sizeof(struct flb_http_server_session));
1183
1184
0
    cfl_list_init(&session->request_queue);
1185
0
    cfl_list_entry_init(&session->_head);
1186
1187
0
    session->incoming_data = cfl_sds_create_size(HTTP_SERVER_INITIAL_BUFFER_SIZE);
1188
1189
0
    if (session->incoming_data == NULL) {
1190
0
        return -1;
1191
0
    }
1192
1193
0
    session->outgoing_data = cfl_sds_create_size(HTTP_SERVER_INITIAL_BUFFER_SIZE);
1194
1195
0
    if (session->outgoing_data == NULL) {
1196
0
        return -2;
1197
0
    }
1198
1199
0
    session->version = version;
1200
1201
0
    if (session->version == HTTP_PROTOCOL_VERSION_20) {
1202
0
        result = flb_http2_server_session_init(&session->http2, session);
1203
1204
0
        if (result != 0) {
1205
0
            return -3;
1206
0
        }
1207
0
    }
1208
0
    else if (session->version >  HTTP_PROTOCOL_VERSION_AUTODETECT &&
1209
0
             session->version <= HTTP_PROTOCOL_VERSION_11) {
1210
0
        result = flb_http1_server_session_init(&session->http1, session);
1211
1212
0
        if (result != 0) {
1213
0
            return -4;
1214
0
        }
1215
0
    }
1216
1217
0
    return 0;
1218
0
}
1219
1220
struct flb_http_server_session *flb_http_server_session_create(int version)
1221
0
{
1222
0
    struct flb_http_server_session *session;
1223
0
    int                  result;
1224
1225
0
    session = flb_calloc(1, sizeof(struct flb_http_server_session));
1226
1227
0
    if (session != NULL) {
1228
0
        result = flb_http_server_session_init(session, version);
1229
1230
0
        session->releasable = FLB_TRUE;
1231
1232
0
        if (result != 0) {
1233
0
            flb_http_server_session_destroy(session);
1234
1235
0
            session = NULL;
1236
0
        }
1237
0
    }
1238
1239
0
    return session;
1240
0
}
1241
1242
void flb_http_server_session_destroy(struct flb_http_server_session *session)
1243
0
{
1244
0
    struct flb_connection *connection;
1245
1246
0
    if (session != NULL) {
1247
0
        connection = session->connection;
1248
0
        session->connection = NULL;
1249
1250
0
        if (connection != NULL) {
1251
0
            connection->user_data = NULL;
1252
0
            connection->drop_notification_callback = NULL;
1253
0
            session->drop_pending = FLB_FALSE;
1254
1255
0
            if (connection->fd != FLB_INVALID_SOCKET) {
1256
0
                flb_downstream_conn_release(connection);
1257
0
            }
1258
0
        }
1259
1260
0
        if (!cfl_list_entry_is_orphan(&session->_head)) {
1261
0
            cfl_list_del(&session->_head);
1262
0
        }
1263
1264
0
        if (session->incoming_data != NULL) {
1265
0
            cfl_sds_destroy(session->incoming_data);
1266
0
        }
1267
1268
0
        if (session->outgoing_data != NULL) {
1269
0
            cfl_sds_destroy(session->outgoing_data);
1270
0
        }
1271
1272
0
        if (session->read_buffer != NULL) {
1273
0
            flb_free(session->read_buffer);
1274
0
        }
1275
1276
0
        flb_http1_server_session_destroy(&session->http1);
1277
0
        flb_http2_server_session_destroy(&session->http2);
1278
1279
0
        if (session->releasable) {
1280
0
            flb_free(session);
1281
0
        }
1282
0
    }
1283
0
}
1284
1285
int flb_http_server_session_ingest(struct flb_http_server_session *session,
1286
                            unsigned char *buffer,
1287
                            size_t length)
1288
0
{
1289
0
    int       result;
1290
0
    size_t    max_size;
1291
0
    cfl_sds_t resized_buffer;
1292
1293
0
    max_size = flb_http_server_get_buffer_max_size(session->parent);
1294
0
    if (session->parent != NULL && cfl_sds_len(session->incoming_data) + length > max_size) {
1295
0
        return HTTP_SERVER_BUFFER_LIMIT_EXCEEDED;
1296
0
    }
1297
1298
0
    if (session->version == HTTP_PROTOCOL_VERSION_AUTODETECT ||
1299
0
        session->version <= HTTP_PROTOCOL_VERSION_11) {
1300
0
        resized_buffer = cfl_sds_cat(session->incoming_data,
1301
0
                                     (const char *) buffer,
1302
0
                                     length);
1303
1304
0
        if (resized_buffer == NULL) {
1305
0
            return HTTP_SERVER_ALLOCATION_ERROR;
1306
0
        }
1307
1308
0
        session->incoming_data = resized_buffer;
1309
0
    }
1310
1311
0
    if (session->version == HTTP_PROTOCOL_VERSION_AUTODETECT) {
1312
0
        if (cfl_sds_len(session->incoming_data) >= 24) {
1313
0
            if (strncmp(session->incoming_data,
1314
0
                        "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n",
1315
0
                        24) == 0) {
1316
0
                session->version = HTTP_PROTOCOL_VERSION_20;
1317
0
            }
1318
0
            else {
1319
0
                session->version = HTTP_PROTOCOL_VERSION_11;
1320
0
            }
1321
0
        }
1322
0
        else if (cfl_sds_len(session->incoming_data) >= 4) {
1323
0
            if (strncmp(session->incoming_data, "PRI ", 4) != 0) {
1324
0
                session->version = HTTP_PROTOCOL_VERSION_11;
1325
0
            }
1326
0
        }
1327
1328
0
        if (session->version <= HTTP_PROTOCOL_VERSION_11) {
1329
0
            result = flb_http1_server_session_init(&session->http1, session);
1330
1331
0
            if (result != 0) {
1332
0
                return -1;
1333
0
            }
1334
0
        }
1335
0
        else if (session->version == HTTP_PROTOCOL_VERSION_20) {
1336
0
            result = flb_http2_server_session_init(&session->http2, session);
1337
1338
0
            if (result != 0) {
1339
0
                return -1;
1340
0
            }
1341
0
        }
1342
0
    }
1343
1344
0
    if (session->version <= HTTP_PROTOCOL_VERSION_11) {
1345
0
        return flb_http1_server_session_ingest(&session->http1,
1346
0
                                               buffer,
1347
0
                                               length);
1348
0
    }
1349
0
    else if (session->version == HTTP_PROTOCOL_VERSION_20) {
1350
0
        return flb_http2_server_session_ingest(&session->http2,
1351
0
                                               buffer,
1352
0
                                               length);
1353
0
    }
1354
1355
0
    return -1;
1356
0
}