Coverage Report

Created: 2023-01-10 06:17

/src/fluent-bit/src/flb_downstream.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2022 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <monkey/mk_core.h>
21
#include <fluent-bit/flb_info.h>
22
#include <fluent-bit/flb_mem.h>
23
#include <fluent-bit/flb_kv.h>
24
#include <fluent-bit/flb_io.h>
25
#include <fluent-bit/flb_str.h>
26
#include <fluent-bit/flb_slist.h>
27
#include <fluent-bit/flb_utils.h>
28
#include <fluent-bit/flb_engine.h>
29
#include <fluent-bit/tls/flb_tls.h>
30
#include <fluent-bit/flb_downstream.h>
31
#include <fluent-bit/flb_connection.h>
32
#include <fluent-bit/flb_config_map.h>
33
#include <fluent-bit/flb_thread_storage.h>
34
35
/* Config map for Downstream networking setup */
36
struct flb_config_map downstream_net[] = {
37
    {
38
     FLB_CONFIG_MAP_TIME, "net.io_timeout", "0s",
39
     0, FLB_TRUE, offsetof(struct flb_net_setup, io_timeout),
40
     "Set maximum time a connection can stay idle"
41
    },
42
43
    {
44
     FLB_CONFIG_MAP_TIME, "net.accept_timeout", "10s",
45
     0, FLB_TRUE, offsetof(struct flb_net_setup, accept_timeout),
46
     "Set maximum time allowed to establish an incoming connection, this time "
47
     "includes the TLS handshake"
48
    },
49
50
    {
51
     FLB_CONFIG_MAP_BOOL, "net.accept_timeout_log_error", "true",
52
     0, FLB_TRUE, offsetof(struct flb_net_setup, accept_timeout_log_error),
53
     "On client accept timeout, specify if it should log an error. When "
54
     "disabled, the timeout is logged as a debug message"
55
    },
56
57
    /* EOF */
58
    {0}
59
};
60
61
/* Enable thread-safe mode for downstream connection */
62
void flb_downstream_thread_safe(struct flb_downstream *stream)
63
0
{
64
0
    flb_stream_enable_thread_safety(&stream->base);
65
0
}
66
67
struct mk_list *flb_downstream_get_config_map(struct flb_config *config)
68
0
{
69
0
    return flb_config_map_create(config, downstream_net);
70
0
}
71
72
/*
 * Initialize any downstream environment context.
 *
 * Currently a no-op. The parameter list is spelled (void) so the definition
 * acts as a prototype and calls with stray arguments are rejected at
 * compile time.
 */
void flb_downstream_init(void)
{
    /* There's nothing to do here yet */
}
77
78
int flb_downstream_setup(struct flb_downstream *stream,
79
                         int transport, int flags,
80
                         const char *host,
81
                         unsigned short int port,
82
                         struct flb_tls *tls,
83
                         struct flb_config *config,
84
                         struct flb_net_setup *net_setup)
85
0
{
86
0
    char port_string[8];
87
88
0
    flb_stream_setup(&stream->base,
89
0
                     FLB_DOWNSTREAM,
90
0
                     transport,
91
0
                     flags,
92
0
                     tls,
93
0
                     config,
94
0
                     net_setup);
95
96
0
    stream->server_fd = FLB_INVALID_SOCKET;
97
0
    stream->host = flb_strdup(host);
98
0
    stream->port = port;
99
100
0
    if (stream->host == NULL) {
101
0
        return -1;
102
0
    }
103
104
0
    mk_list_init(&stream->busy_queue);
105
0
    mk_list_init(&stream->destroy_queue);
106
107
0
    snprintf(port_string, sizeof(port_string), "%u", port);
108
109
0
    if (transport == FLB_TRANSPORT_TCP) {
110
0
        stream->server_fd = flb_net_server(port_string, host);
111
0
    }
112
0
    else if (transport == FLB_TRANSPORT_UDP) {
113
0
        stream->server_fd = flb_net_server_udp(port_string, host);
114
0
    }
115
0
    else if (transport == FLB_TRANSPORT_UNIX_STREAM) {
116
0
        stream->server_fd = flb_net_server_unix(host,
117
0
                                                FLB_TRUE,
118
0
                                                FLB_NETWORK_DEFAULT_BACKLOG_SIZE);
119
0
    }
120
0
    else if (transport == FLB_TRANSPORT_UNIX_DGRAM) {
121
0
        stream->server_fd = flb_net_server_unix(host,
122
0
                                                FLB_FALSE,
123
0
                                                FLB_NETWORK_DEFAULT_BACKLOG_SIZE);
124
0
    }
125
126
0
    if (stream->server_fd != -1) {
127
0
        flb_debug("[downstream] listening on %s:%s", host, port_string);
128
0
    }
129
0
    else {
130
0
        flb_error("[downstream] could not bind address %s:%s. Aborting",
131
0
                  host, port_string);
132
133
0
        return -2;
134
0
    }
135
136
0
    mk_list_add(&stream->base._head, &config->downstreams);
137
138
0
    return 0;
139
0
}
140
141
/* Creates a new downstream context */
142
struct flb_downstream *flb_downstream_create(int transport, int flags,
143
                                             const char *host,
144
                                             unsigned short int port,
145
                                             struct flb_tls *tls,
146
                                             struct flb_config *config,
147
                                             struct flb_net_setup *net_setup)
148
0
{
149
0
    struct flb_downstream *stream;
150
0
    int                    result;
151
152
0
    stream = flb_calloc(1, sizeof(struct flb_downstream));
153
154
0
    if (stream == NULL) {
155
0
        flb_errno();
156
0
    }
157
0
    else {
158
0
        stream->base.dynamically_allocated = FLB_TRUE;
159
160
0
        result = flb_downstream_setup(stream,
161
0
                                      transport, flags,
162
0
                                      host, port,
163
0
                                      tls,
164
0
                                      config,
165
0
                                      net_setup);
166
167
0
        if (result != 0) {
168
0
            flb_downstream_destroy(stream);
169
170
0
            stream = NULL;
171
0
        }
172
0
    }
173
174
0
    return stream;
175
0
}
176
177
/*
178
 * This function moves the 'downstream connection' into the queue to be
179
 * destroyed. Note that the caller is responsible to validate and check
180
 * required mutex if this is being used in multi-worker mode.
181
 */
182
static int prepare_destroy_conn(struct flb_connection *connection)
183
0
{
184
0
    struct flb_stream *stream;
185
186
0
    stream = connection->stream;
187
188
0
    flb_trace("[downstream] destroy connection #%i to %s",
189
0
              connection->fd, flb_connection_get_remote_address(connection));
190
191
0
    if (stream->flags & FLB_IO_ASYNC) {
192
0
        mk_event_del(connection->evl, &connection->event);
193
0
    }
194
195
    /* This should be != -1 to cover those use cases where stdin, stdout
196
     * and stderr are closed.
197
     */
198
199
0
    if (connection->fd != FLB_INVALID_SOCKET) {
200
0
        flb_socket_close(connection->fd);
201
202
0
        connection->fd = FLB_INVALID_SOCKET;
203
0
        connection->event.fd = FLB_INVALID_SOCKET;
204
0
    }
205
206
    /* remove connection from the queue */
207
0
    mk_list_del(&connection->_head);
208
209
    /* Add node to destroy queue */
210
0
    mk_list_add(&connection->_head, &connection->downstream->destroy_queue);
211
212
    /*
213
     * note: the connection context is destroyed by the engine once all events
214
     * have been processed.
215
     */
216
0
    return 0;
217
0
}
218
219
/* 'safe' version of prepare_destroy_conn. It set locks if necessary */
220
static inline int prepare_destroy_conn_safe(struct flb_connection *connection)
221
0
{
222
0
    int result;
223
224
    /* This used to not wait for the lock in thread safe mode but it makes
225
     * no sense so I'm changing it (08/28/22) leo
226
     */
227
228
0
    flb_stream_acquire_lock(connection->stream, FLB_TRUE);
229
230
0
    result = prepare_destroy_conn(connection);
231
232
0
    flb_stream_release_lock(connection->stream);
233
234
0
    return result;
235
0
}
236
237
static int destroy_conn(struct flb_connection *connection)
238
0
{
239
    /* Delay the destruction of busy connections */
240
0
    if (connection->busy_flag) {
241
0
        return 0;
242
0
    }
243
244
0
    if (connection->tls_session != NULL) {
245
0
        flb_tls_session_destroy(connection->tls_session);
246
0
    }
247
248
0
    mk_list_del(&connection->_head);
249
250
0
    flb_connection_destroy(connection);
251
252
0
    return 0;
253
0
}
254
255
struct flb_connection *flb_downstream_conn_get(struct flb_downstream *stream)
256
0
{
257
0
    flb_sockfd_t           connection_fd;
258
0
    struct flb_connection *connection;
259
0
    int                    transport;
260
0
    struct flb_coro       *coroutine;
261
0
    int                    result;
262
263
0
    transport = stream->base.transport;
264
265
0
    if (transport == FLB_TRANSPORT_UDP ||
266
0
        transport == FLB_TRANSPORT_UNIX_DGRAM ) {
267
0
        if (stream->dgram_connection != NULL) {
268
0
            return stream->dgram_connection;
269
0
        }
270
271
0
        connection_fd = stream->server_fd;
272
0
    }
273
0
    else {
274
0
        connection_fd = FLB_INVALID_SOCKET;
275
0
    }
276
277
0
    if (flb_downstream_is_async(stream)) {
278
0
        coroutine = flb_coro_get();
279
0
    }
280
0
    else {
281
0
        coroutine = NULL;
282
0
    }
283
284
0
    connection = flb_connection_create(connection_fd,
285
0
                                       FLB_DOWNSTREAM_CONNECTION,
286
0
                                       (void *) stream,
287
0
                                       flb_engine_evl_get(),
288
0
                                       coroutine);
289
290
0
    if (connection == NULL) {
291
0
        return NULL;
292
0
    }
293
294
0
    connection->busy_flag = FLB_TRUE;
295
296
0
    flb_stream_acquire_lock(&stream->base, FLB_TRUE);
297
298
    /* Link new connection to the busy queue */
299
0
    mk_list_add(&connection->_head, &stream->busy_queue);
300
301
0
    flb_stream_release_lock(&stream->base);
302
303
0
    if (transport != FLB_TRANSPORT_UDP &&
304
0
        transport != FLB_TRANSPORT_UNIX_DGRAM ) {
305
0
        flb_connection_reset_connection_timeout(connection);
306
307
0
        result = flb_io_net_accept(connection, coroutine);
308
309
0
        if (result != 0) {
310
0
            flb_connection_reset_connection_timeout(connection);
311
312
0
            flb_debug("[downstream] connection #%i failed",
313
0
                      connection->fd);
314
315
0
            prepare_destroy_conn_safe(connection);
316
317
0
            connection->busy_flag = FLB_FALSE;
318
319
0
            return NULL;
320
0
        }
321
322
0
        flb_connection_unset_connection_timeout(connection);
323
0
    }
324
325
0
    connection->busy_flag = FLB_FALSE;
326
327
0
    flb_connection_reset_io_timeout(connection);
328
329
0
    if (transport == FLB_TRANSPORT_UDP ||
330
0
        transport == FLB_TRANSPORT_UNIX_DGRAM) {
331
0
        if (stream->dgram_connection == NULL) {
332
0
            stream->dgram_connection = connection;
333
0
        }
334
0
    }
335
336
0
    return connection;
337
0
}
338
339
void flb_downstream_destroy(struct flb_downstream *stream)
340
0
{
341
0
    struct flb_connection *connection;
342
0
    struct mk_list        *head;
343
0
    struct mk_list        *tmp;
344
345
0
    if (stream != NULL) {
346
0
        mk_list_foreach_safe(head, tmp, &stream->busy_queue) {
347
0
            connection = mk_list_entry(head, struct flb_connection, _head);
348
349
0
            prepare_destroy_conn(connection);
350
0
        }
351
352
0
        mk_list_foreach_safe(head, tmp, &stream->destroy_queue) {
353
0
            connection = mk_list_entry(head, struct flb_connection, _head);
354
355
0
            destroy_conn(connection);
356
0
        }
357
358
        /* If the simulated UDP connection reference is set then
359
         * it means that connection was already cleaned up by the
360
         * preceding code which means server_fd holds a socket
361
         * reference that has already been closed and we need to
362
         * honor that.
363
         */
364
365
0
        if (stream->dgram_connection != NULL) {
366
0
            stream->dgram_connection = NULL;
367
0
            stream->server_fd = FLB_INVALID_SOCKET;
368
0
        }
369
370
0
        if (stream->host != NULL) {
371
0
            flb_free(stream->host);
372
0
        }
373
374
0
        if (stream->server_fd != FLB_INVALID_SOCKET) {
375
0
            flb_socket_close(stream->server_fd);
376
0
        }
377
378
0
        if (mk_list_entry_orphan(&stream->base._head) == 0) {
379
0
            mk_list_del(&stream->base._head);
380
0
        }
381
382
0
        if (stream->base.dynamically_allocated) {
383
0
            flb_free(stream);
384
0
        }
385
0
    }
386
0
}
387
388
/*
 * Release a downstream connection: under the stream lock it is closed and
 * queued for destruction. Returns the result of that operation.
 */
int flb_downstream_conn_release(struct flb_connection *connection)
{
    int ret;

    ret = prepare_destroy_conn_safe(connection);

    return ret;
}
392
393
int flb_downstream_conn_timeouts(struct mk_list *list)
394
0
{
395
0
    int                    elapsed_time;
396
0
    struct flb_connection *connection;
397
0
    const char            *reason;
398
0
    struct flb_downstream *stream;
399
0
    struct mk_list        *s_head;
400
0
    struct mk_list        *head;
401
0
    int                    drop;
402
0
    struct mk_list        *tmp;
403
0
    time_t                 now;
404
405
0
    now = time(NULL);
406
407
    /* Iterate all downstream contexts */
408
0
    mk_list_foreach(head, list) {
409
0
        stream = mk_list_entry(head, struct flb_downstream, base._head);
410
411
0
        if (stream->base.transport == FLB_TRANSPORT_UDP) {
412
0
            continue;
413
0
        }
414
415
0
        flb_stream_acquire_lock(&stream->base, FLB_TRUE);
416
417
        /* Iterate every busy connection */
418
0
        mk_list_foreach_safe(s_head, tmp, &stream->busy_queue) {
419
0
            connection = mk_list_entry(s_head, struct flb_connection, _head);
420
421
0
            drop = FLB_FALSE;
422
423
            /* Connect timeouts */
424
0
            if (connection->net->connect_timeout > 0 &&
425
0
                connection->ts_connect_timeout > 0 &&
426
0
                connection->ts_connect_timeout <= now) {
427
0
                drop = FLB_TRUE;
428
0
                reason = "connection timeout";
429
0
                elapsed_time = connection->net->accept_timeout;
430
0
            }
431
0
            else if (connection->net->io_timeout > 0 &&
432
0
                     connection->ts_io_timeout > 0 &&
433
0
                     connection->ts_io_timeout <= now) {
434
0
                drop = FLB_TRUE;
435
0
                reason = "IO timeout";
436
0
                elapsed_time = connection->net->io_timeout;
437
0
            }
438
439
0
            if (drop) {
440
0
                if (!flb_downstream_is_shutting_down(stream)) {
441
0
                    if (connection->net->accept_timeout_log_error) {
442
0
                        flb_error("[downstream] connection #%i from %s timed "
443
0
                                  "out after %i seconds (%s)",
444
0
                                  connection->fd,
445
0
                                  connection->user_friendly_remote_host,
446
0
                                  elapsed_time,
447
0
                                  reason);
448
0
                    }
449
0
                    else {
450
0
                        flb_debug("[downstream] connection #%i from %s timed "
451
0
                                  "out after %i seconds (%s)",
452
0
                                  connection->fd,
453
0
                                  connection->user_friendly_remote_host,
454
0
                                  elapsed_time,
455
0
                                  reason);
456
0
                    }
457
0
                }
458
459
0
                if (connection->event.status != MK_EVENT_NONE) {
460
0
                    mk_event_inject(connection->evl,
461
0
                                    &connection->event,
462
0
                                    connection->event.mask,
463
0
                                    FLB_TRUE);
464
0
                }
465
466
0
                connection->net_error = ETIMEDOUT;
467
468
0
                prepare_destroy_conn(connection);
469
0
            }
470
0
        }
471
472
0
        flb_stream_release_lock(&stream->base);
473
0
    }
474
475
0
    return 0;
476
0
}
477
478
int flb_downstream_conn_pending_destroy(struct flb_downstream *stream)
479
0
{
480
0
    struct flb_connection *connection;
481
0
    struct mk_list        *head;
482
0
    struct mk_list        *tmp;
483
484
0
    flb_stream_acquire_lock(&stream->base, FLB_TRUE);
485
486
0
    mk_list_foreach_safe(head, tmp, &stream->destroy_queue) {
487
0
        connection = mk_list_entry(head, struct flb_connection, _head);
488
489
0
        destroy_conn(connection);
490
0
    }
491
492
0
    flb_stream_release_lock(&stream->base);
493
494
0
    return 0;
495
0
}
496
497
int flb_downstream_conn_pending_destroy_list(struct mk_list *list)
498
0
{
499
0
    struct flb_downstream *stream;
500
0
    struct mk_list        *head;
501
502
    /* Iterate all downstream contexts */
503
0
    mk_list_foreach(head, list) {
504
0
         stream = mk_list_entry(head, struct flb_downstream, base._head);
505
506
0
        flb_downstream_conn_pending_destroy(stream);
507
0
    }
508
509
0
    return 0;
510
0
}
511
512
int flb_downstream_is_async(struct flb_downstream *stream)
513
0
{
514
0
    return flb_stream_is_async(&stream->base);
515
0
}