Coverage Report

Created: 2023-03-10 07:33

/src/fluent-bit/src/flb_input.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2022 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <stdlib.h>
21
22
#include <monkey/mk_core.h>
23
#include <fluent-bit/flb_info.h>
24
#include <fluent-bit/flb_mem.h>
25
#include <fluent-bit/flb_str.h>
26
#include <fluent-bit/flb_env.h>
27
#include <fluent-bit/flb_pipe.h>
28
#include <fluent-bit/flb_macros.h>
29
#include <fluent-bit/flb_input.h>
30
#include <fluent-bit/flb_input_thread.h>
31
#include <fluent-bit/flb_error.h>
32
#include <fluent-bit/flb_utils.h>
33
#include <fluent-bit/flb_plugin_proxy.h>
34
#include <fluent-bit/flb_engine.h>
35
#include <fluent-bit/flb_metrics.h>
36
#include <fluent-bit/flb_storage.h>
37
#include <fluent-bit/flb_downstream.h>
38
#include <fluent-bit/flb_kv.h>
39
#include <fluent-bit/flb_hash_table.h>
40
#include <fluent-bit/flb_scheduler.h>
41
#include <fluent-bit/flb_ring_buffer.h>
42
43
/* input plugin macro helpers */
44
#include <fluent-bit/flb_input_plugin.h>
45
46
#ifdef FLB_HAVE_CHUNK_TRACE
47
#include <fluent-bit/flb_chunk_trace.h>
48
#endif /* FLB_HAVE_CHUNK_TRACE */
49
50
struct flb_libco_in_params libco_in_param;
51
52
3.36k
#define protcmp(a, b)  strncasecmp(a, b, strlen(a))
53
54
/*
55
 * Ring buffer size: we make space for 512 entries that each input instance can
56
 * use to enqueue data. Note that this value is fixed and only affect input plugins
57
 * which runs in threaded mode (separate thread)
58
 *
59
 * Ring buffer window: the current window size is set to 5% which means that the
60
 * ring buffer will emit a flush request whenever there are 51 records or more
61
 * awaiting to be consumed.
62
 */
63
64
1.68k
#define FLB_INPUT_RING_BUFFER_SIZE   (sizeof(void *) * 1024)
65
0
#define FLB_INPUT_RING_BUFFER_WINDOW (5)
66
67
68
static int check_protocol(const char *prot, const char *output)
69
28.6k
{
70
28.6k
    int len;
71
72
28.6k
    len = strlen(prot);
73
28.6k
    if (len != strlen(output)) {
74
25.2k
        return 0;
75
25.2k
    }
76
77
3.36k
    if (protcmp(prot, output) != 0) {
78
1.68k
        return 0;
79
1.68k
    }
80
81
1.68k
    return 1;
82
3.36k
}
83
84
static inline int instance_id(struct flb_input_plugin *p,
85
1.68k
                              struct flb_config *config) \
86
1.68k
{
87
1.68k
    int c = 0;
88
1.68k
    struct mk_list *head;
89
1.68k
    struct flb_input_instance *entry;
90
91
1.68k
    mk_list_foreach(head, &config->inputs) {
92
0
        entry = mk_list_entry(head, struct flb_input_instance, _head);
93
0
        if (entry->id == c) {
94
0
            c++;
95
0
        }
96
0
    }
97
98
1.68k
    return c;
99
1.68k
}
100
101
/* Generate a new collector ID for the instance in question */
102
static int collector_id(struct flb_input_instance *ins)
103
1.68k
{
104
1.68k
    int id = 0;
105
1.68k
    struct flb_input_collector *collector;
106
107
1.68k
    if (mk_list_is_empty(&ins->collectors) == 0) {
108
1.68k
        return id;
109
1.68k
    }
110
111
0
    collector = mk_list_entry_last(&ins->collectors,
112
0
                                   struct flb_input_collector,
113
0
                                   _head);
114
0
    return (collector->id + 1);
115
1.68k
}
116
117
void flb_input_net_default_listener(const char *listen, int port,
118
                                    struct flb_input_instance *ins)
119
0
{
120
    /* Set default network configuration */
121
0
    if (!ins->host.listen) {
122
0
        ins->host.listen = flb_sds_create(listen);
123
0
    }
124
0
    if (ins->host.port == 0) {
125
0
        ins->host.port = port;
126
0
    }
127
0
}
128
129
/* Check input plugin's log level.
130
 * Not for core plugins but for Golang plugins.
131
 * Golang plugins do not have thread-local flb_worker_ctx information. */
132
int flb_input_log_check(struct flb_input_instance *ins, int l)
133
0
{
134
0
    if (ins->log_level < l) {
135
0
        return FLB_FALSE;
136
0
    }
137
138
0
    return FLB_TRUE;
139
0
}
140
141
/* Create an input plugin instance */
142
struct flb_input_instance *flb_input_new(struct flb_config *config,
143
                                         const char *input, void *data,
144
                                         int public_only)
145
1.68k
{
146
1.68k
    int id;
147
1.68k
    int ret;
148
1.68k
    int flags = 0;
149
1.68k
    struct mk_list *head;
150
1.68k
    struct flb_input_plugin *plugin;
151
1.68k
    struct flb_input_instance *instance = NULL;
152
153
/* use for locking the use of the chunk trace context. */
154
1.68k
#ifdef FLB_HAVE_CHUNK_TRACE
155
1.68k
    pthread_mutexattr_t attr = {0};
156
1.68k
    pthread_mutexattr_init(&attr);
157
1.68k
#endif
158
159
1.68k
    if (!input) {
160
0
        return NULL;
161
0
    }
162
163
28.6k
    mk_list_foreach(head, &config->in_plugins) {
164
28.6k
        plugin = mk_list_entry(head, struct flb_input_plugin, _head);
165
28.6k
        if (!check_protocol(plugin->name, input)) {
166
26.9k
            plugin = NULL;
167
26.9k
            continue;
168
26.9k
        }
169
170
        /*
171
         * Check if the plugin is private and validate the 'public_only'
172
         * requirement.
173
         */
174
1.68k
        if (public_only == FLB_TRUE && plugin->flags & FLB_INPUT_PRIVATE) {
175
0
            return NULL;
176
0
        }
177
178
        /* Create plugin instance */
179
1.68k
        instance = flb_calloc(1, sizeof(struct flb_input_instance));
180
1.68k
        if (!instance) {
181
0
            flb_errno();
182
0
            return NULL;
183
0
        }
184
1.68k
        instance->config = config;
185
186
        /* Get an ID */
187
1.68k
        id =  instance_id(plugin, config);
188
189
        /* Index for log Chunks (hash table) */
190
1.68k
        instance->ht_log_chunks = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE,
191
1.68k
                                                        512, 0);
192
1.68k
        if (!instance->ht_log_chunks) {
193
0
            flb_free(instance);
194
0
            return NULL;
195
0
        }
196
197
        /* Index for metric Chunks (hash table) */
198
1.68k
        instance->ht_metric_chunks = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE,
199
1.68k
                                                           512, 0);
200
1.68k
        if (!instance->ht_metric_chunks) {
201
0
            flb_hash_table_destroy(instance->ht_log_chunks);
202
0
            flb_free(instance);
203
0
            return NULL;
204
0
        }
205
206
        /* Index for trace Chunks (hash table) */
207
1.68k
        instance->ht_trace_chunks = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE,
208
1.68k
                                                          512, 0);
209
1.68k
        if (!instance->ht_trace_chunks) {
210
0
            flb_hash_table_destroy(instance->ht_log_chunks);
211
0
            flb_hash_table_destroy(instance->ht_metric_chunks);
212
0
            flb_free(instance);
213
0
            return NULL;
214
0
        }
215
216
        /* format name (with instance id) */
217
1.68k
        snprintf(instance->name, sizeof(instance->name) - 1,
218
1.68k
                 "%s.%i", plugin->name, id);
219
220
1.68k
        if (plugin->type == FLB_INPUT_PLUGIN_CORE) {
221
1.68k
            instance->context = NULL;
222
1.68k
        }
223
0
        else {
224
0
            struct flb_plugin_proxy_context *ctx;
225
226
0
            ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context));
227
0
            if (!ctx) {
228
0
                flb_errno();
229
0
                flb_free(instance);
230
0
                return NULL;
231
0
            }
232
233
0
            ctx->proxy = plugin->proxy;
234
235
0
            instance->context = ctx;
236
0
        }
237
238
        /* initialize remaining vars */
239
1.68k
        instance->alias    = NULL;
240
1.68k
        instance->id       = id;
241
1.68k
        instance->flags    = plugin->flags;
242
1.68k
        instance->p        = plugin;
243
1.68k
        instance->tag      = NULL;
244
1.68k
        instance->tag_len  = 0;
245
1.68k
        instance->routable = FLB_TRUE;
246
1.68k
        instance->data     = data;
247
1.68k
        instance->storage  = NULL;
248
1.68k
        instance->storage_type = -1;
249
1.68k
        instance->log_level = -1;
250
1.68k
        instance->runs_in_coroutine = FLB_FALSE;
251
252
        /* net */
253
1.68k
        instance->host.name    = NULL;
254
1.68k
        instance->host.address = NULL;
255
1.68k
        instance->host.uri     = NULL;
256
1.68k
        instance->host.listen  = NULL;
257
1.68k
        instance->host.ipv6    = FLB_FALSE;
258
259
        /* Initialize list heads */
260
1.68k
        mk_list_init(&instance->routes_direct);
261
1.68k
        mk_list_init(&instance->routes);
262
1.68k
        mk_list_init(&instance->tasks);
263
1.68k
        mk_list_init(&instance->chunks);
264
1.68k
        mk_list_init(&instance->collectors);
265
1.68k
        mk_list_init(&instance->input_coro_list);
266
1.68k
        mk_list_init(&instance->input_coro_list_destroy);
267
1.68k
        mk_list_init(&instance->downstreams);
268
1.68k
        mk_list_init(&instance->upstreams);
269
270
        /* Initialize properties list */
271
1.68k
        flb_kv_init(&instance->properties);
272
1.68k
        flb_kv_init(&instance->net_properties);
273
274
        /* Plugin use networking */
275
1.68k
        if (plugin->flags & (FLB_INPUT_NET | FLB_INPUT_NET_SERVER)) {
276
0
            ret = flb_net_host_set(plugin->name, &instance->host, input);
277
0
            if (ret != 0) {
278
0
                flb_free(instance);
279
0
                return NULL;
280
0
            }
281
0
        }
282
283
/* initialize lock for access to chunk trace context. */
284
1.68k
#ifdef FLB_HAVE_CHUNK_TRACE
285
1.68k
        pthread_mutex_init(&instance->chunk_trace_lock, &attr);
286
1.68k
#endif
287
288
        /* Parent plugin flags */
289
1.68k
        flags = instance->flags;
290
1.68k
        if (flags & FLB_IO_TCP) {
291
0
            instance->use_tls = FLB_FALSE;
292
0
        }
293
1.68k
        else if (flags & FLB_IO_TLS) {
294
0
            instance->use_tls = FLB_TRUE;
295
0
        }
296
1.68k
        else if (flags & FLB_IO_OPT_TLS) {
297
            /* TLS must be enabled manually in the config */
298
0
            instance->use_tls = FLB_FALSE;
299
0
            instance->flags |= FLB_IO_TLS;
300
0
        }
301
302
1.68k
#ifdef FLB_HAVE_TLS
303
1.68k
        instance->tls                   = NULL;
304
1.68k
        instance->tls_debug             = -1;
305
1.68k
        instance->tls_verify            = FLB_TRUE;
306
1.68k
        instance->tls_vhost             = NULL;
307
1.68k
        instance->tls_ca_path           = NULL;
308
1.68k
        instance->tls_ca_file           = NULL;
309
1.68k
        instance->tls_crt_file          = NULL;
310
1.68k
        instance->tls_key_file          = NULL;
311
1.68k
        instance->tls_key_passwd        = NULL;
312
1.68k
#endif
313
314
        /* Plugin requires a co-routine context ? */
315
1.68k
        if (plugin->flags & FLB_INPUT_CORO) {
316
0
            instance->runs_in_coroutine = FLB_TRUE;
317
0
        }
318
319
        /* Plugin will run in a separate thread  ? */
320
1.68k
        if (plugin->flags & FLB_INPUT_THREADED) {
321
0
            instance->is_threaded = FLB_TRUE;
322
323
0
        }
324
325
        /* allocate a ring buffer */
326
1.68k
        instance->rb = flb_ring_buffer_create(FLB_INPUT_RING_BUFFER_SIZE);
327
1.68k
        if (!instance->rb) {
328
0
            flb_error("instance %s could not initialize ring buffer",
329
0
                      flb_input_name(instance));
330
0
            flb_free(instance);
331
0
            return NULL;
332
0
        }
333
334
1.68k
        instance->mem_buf_status = FLB_INPUT_RUNNING;
335
1.68k
        instance->mem_buf_limit = 0;
336
1.68k
        instance->mem_chunks_size = 0;
337
1.68k
        instance->storage_buf_status = FLB_INPUT_RUNNING;
338
1.68k
        mk_list_add(&instance->_head, &config->inputs);
339
1.68k
    }
340
341
1.68k
    return instance;
342
1.68k
}
343
344
static inline int prop_key_check(const char *key, const char *kv, int k_len)
345
1.68k
{
346
1.68k
    int len;
347
348
1.68k
    len = strlen(key);
349
350
1.68k
    if (strncasecmp(key, kv, k_len) == 0 && len == k_len) {
351
1.68k
        return 0;
352
1.68k
    }
353
354
0
    return -1;
355
1.68k
}
356
357
struct flb_input_instance *flb_input_get_instance(struct flb_config *config,
358
                                                  int ins_id)
359
0
{
360
0
    struct mk_list *head;
361
0
    struct flb_input_instance *ins;
362
363
0
    mk_list_foreach(head, &config->inputs) {
364
0
        ins = mk_list_entry(head, struct flb_input_instance, _head);
365
0
        if (ins->id == ins_id) {
366
0
            break;
367
0
        }
368
0
        ins = NULL;
369
0
    }
370
371
0
    if (!ins) {
372
0
        return NULL;
373
0
    }
374
375
0
    return ins;
376
0
}
377
378
static void flb_input_coro_destroy(struct flb_input_coro *input_coro)
379
0
{
380
0
    flb_debug("[input coro] destroy coro_id=%i", input_coro->id);
381
382
0
    mk_list_del(&input_coro->_head);
383
0
    flb_coro_destroy(input_coro->coro);
384
0
    flb_free(input_coro);
385
0
}
386
387
int flb_input_coro_finished(struct flb_config *config, int ins_id)
388
0
{
389
0
    struct mk_list *tmp;
390
0
    struct mk_list *head;
391
0
    struct flb_input_instance *ins;
392
0
    struct flb_input_coro *input_coro;
393
394
0
    ins = flb_input_get_instance(config, ins_id);
395
0
    if (!ins) {
396
0
        return -1;
397
0
    }
398
399
    /* Look for input coroutines that needs to be destroyed */
400
0
    mk_list_foreach_safe(head, tmp, &ins->input_coro_list_destroy) {
401
0
        input_coro = mk_list_entry(head, struct flb_input_coro, _head);
402
0
        flb_input_coro_destroy(input_coro);
403
0
    }
404
405
0
    return 0;
406
0
}
407
408
void flb_input_coro_prepare_destroy(struct flb_input_coro *input_coro)
409
0
{
410
0
    struct flb_input_instance *ins = input_coro->ins;
411
412
    /* move flb_input_coro from 'input_coro_list' to 'input_coro_list_destroy' */
413
0
    mk_list_del(&input_coro->_head);
414
0
    mk_list_add(&input_coro->_head, &ins->input_coro_list_destroy);
415
0
}
416
417
int flb_input_name_exists(const char *name, struct flb_config *config)
418
0
{
419
0
    struct mk_list *head;
420
0
    struct flb_input_instance *ins;
421
422
0
    mk_list_foreach(head, &config->inputs) {
423
0
        ins = mk_list_entry(head, struct flb_input_instance, _head);
424
0
        if (strcmp(ins->name, name) == 0) {
425
0
            return FLB_TRUE;
426
0
        }
427
428
0
        if (ins->alias) {
429
0
            if (strcmp(ins->alias, name) == 0) {
430
0
                return FLB_TRUE;
431
0
            }
432
0
        }
433
0
    }
434
435
0
    return FLB_FALSE;
436
0
}
437
438
struct mk_event_loop *flb_input_event_loop_get(struct flb_input_instance *ins)
439
1.68k
{
440
1.68k
    struct flb_input_thread_instance *thi;
441
442
1.68k
    if (flb_input_is_threaded(ins)) {
443
0
        thi = ins->thi;
444
0
        return thi->evl;
445
0
    }
446
447
1.68k
    return ins->config->evl;
448
1.68k
}
449
450
/* Override a configuration property for the given input_instance plugin */
451
int flb_input_set_property(struct flb_input_instance *ins,
452
                           const char *k, const char *v)
453
1.68k
{
454
1.68k
    int len;
455
1.68k
    int ret;
456
1.68k
    int enabled;
457
1.68k
    ssize_t limit;
458
1.68k
    flb_sds_t tmp = NULL;
459
1.68k
    struct flb_kv *kv;
460
461
1.68k
    len = strlen(k);
462
1.68k
    tmp = flb_env_var_translate(ins->config->env, v);
463
1.68k
    if (tmp) {
464
1.68k
        if (flb_sds_len(tmp) == 0) {
465
0
            flb_sds_destroy(tmp);
466
0
            tmp = NULL;
467
0
        }
468
1.68k
    }
469
470
    /* Check if the key is a known/shared property */
471
1.68k
    if (prop_key_check("tag", k, len) == 0 && tmp) {
472
1.68k
        ins->tag     = tmp;
473
1.68k
        ins->tag_len = flb_sds_len(tmp);
474
1.68k
    }
475
0
    else if (prop_key_check("log_level", k, len) == 0 && tmp) {
476
0
        ret = flb_log_get_level_str(tmp);
477
0
        flb_sds_destroy(tmp);
478
0
        if (ret == -1) {
479
0
            return -1;
480
0
        }
481
0
        ins->log_level = ret;
482
0
    }
483
0
    else if (prop_key_check("routable", k, len) == 0 && tmp) {
484
0
        ins->routable = flb_utils_bool(tmp);
485
0
        flb_sds_destroy(tmp);
486
0
    }
487
0
    else if (prop_key_check("alias", k, len) == 0 && tmp) {
488
0
        ins->alias = tmp;
489
0
    }
490
0
    else if (prop_key_check("mem_buf_limit", k, len) == 0 && tmp) {
491
0
        limit = flb_utils_size_to_bytes(tmp);
492
0
        flb_sds_destroy(tmp);
493
0
        if (limit == -1) {
494
0
            return -1;
495
0
        }
496
0
        ins->mem_buf_limit = (size_t) limit;
497
0
    }
498
0
    else if (prop_key_check("listen", k, len) == 0) {
499
0
        ins->host.listen = tmp;
500
0
    }
501
0
    else if (prop_key_check("host", k, len) == 0) {
502
0
        ins->host.name   = tmp;
503
0
    }
504
0
    else if (prop_key_check("port", k, len) == 0) {
505
0
        if (tmp) {
506
0
            ins->host.port = atoi(tmp);
507
0
            flb_sds_destroy(tmp);
508
0
        }
509
0
    }
510
0
    else if (prop_key_check("ipv6", k, len) == 0 && tmp) {
511
0
        ins->host.ipv6 = flb_utils_bool(tmp);
512
0
        flb_sds_destroy(tmp);
513
0
    }
514
0
    else if (strncasecmp("net.", k, 4) == 0 && tmp) {
515
0
        kv = flb_kv_item_create(&ins->net_properties, (char *) k, NULL);
516
0
        if (!kv) {
517
0
            if (tmp) {
518
0
                flb_sds_destroy(tmp);
519
0
            }
520
0
            return -1;
521
0
        }
522
0
        kv->val = tmp;
523
0
    }
524
525
0
#ifdef FLB_HAVE_TLS
526
0
    else if (prop_key_check("tls", k, len) == 0 && tmp) {
527
0
        if (strcasecmp(tmp, "true") == 0 || strcasecmp(tmp, "on") == 0) {
528
0
            if ((ins->flags & FLB_IO_TLS) == 0) {
529
0
                flb_error("[config] %s don't support TLS", ins->name);
530
0
                flb_sds_destroy(tmp);
531
0
                return -1;
532
0
            }
533
534
0
            ins->use_tls = FLB_TRUE;
535
0
        }
536
0
        else {
537
0
            ins->use_tls = FLB_FALSE;
538
0
        }
539
0
        flb_sds_destroy(tmp);
540
0
    }
541
0
    else if (prop_key_check("tls.verify", k, len) == 0 && tmp) {
542
0
        if (strcasecmp(tmp, "true") == 0 || strcasecmp(tmp, "on") == 0) {
543
0
            ins->tls_verify = FLB_TRUE;
544
0
        }
545
0
        else {
546
0
            ins->tls_verify = FLB_FALSE;
547
0
        }
548
0
        flb_sds_destroy(tmp);
549
0
    }
550
0
    else if (prop_key_check("tls.debug", k, len) == 0 && tmp) {
551
0
        ins->tls_debug = atoi(tmp);
552
0
        flb_sds_destroy(tmp);
553
0
    }
554
0
    else if (prop_key_check("tls.vhost", k, len) == 0) {
555
0
        ins->tls_vhost = tmp;
556
0
    }
557
0
    else if (prop_key_check("tls.ca_path", k, len) == 0) {
558
0
        ins->tls_ca_path = tmp;
559
0
    }
560
0
    else if (prop_key_check("tls.ca_file", k, len) == 0) {
561
0
        ins->tls_ca_file = tmp;
562
0
    }
563
0
    else if (prop_key_check("tls.crt_file", k, len) == 0) {
564
0
        ins->tls_crt_file = tmp;
565
0
    }
566
0
    else if (prop_key_check("tls.key_file", k, len) == 0) {
567
0
        ins->tls_key_file = tmp;
568
0
    }
569
0
    else if (prop_key_check("tls.key_passwd", k, len) == 0) {
570
0
        ins->tls_key_passwd = tmp;
571
0
    }
572
0
#endif
573
0
    else if (prop_key_check("storage.type", k, len) == 0 && tmp) {
574
        /* Set the storage type */
575
0
        if (strcasecmp(tmp, "filesystem") == 0) {
576
0
            ins->storage_type = FLB_STORAGE_FS;
577
0
        }
578
0
        else if (strcasecmp(tmp, "memory") == 0) {
579
0
            ins->storage_type = FLB_STORAGE_MEM;
580
0
        }
581
0
        else if (strcasecmp(tmp, "memrb") == 0) {
582
0
            ins->storage_type = FLB_STORAGE_MEMRB;
583
0
        }
584
0
        else {
585
0
            flb_sds_destroy(tmp);
586
0
            return -1;
587
0
        }
588
0
        flb_sds_destroy(tmp);
589
0
    }
590
0
    else if (prop_key_check("threaded", k, len) == 0 && tmp) {
591
0
        enabled = flb_utils_bool(tmp);
592
0
        flb_sds_destroy(tmp);
593
594
0
        if (enabled == -1) {
595
0
            return -1;
596
0
        }
597
598
0
        ins->is_threaded = enabled;
599
0
    }
600
0
    else if (prop_key_check("storage.pause_on_chunks_overlimit", k, len) == 0 && tmp) {
601
0
        if (ins->storage_type == FLB_STORAGE_FS) {
602
0
            ret = flb_utils_bool(tmp);
603
0
            flb_sds_destroy(tmp);
604
0
            if (ret == -1) {
605
0
                return -1;
606
0
            }
607
0
            ins->storage_pause_on_chunks_overlimit = ret;
608
0
        }
609
0
    }
610
0
    else {
611
        /*
612
         * Create the property, we don't pass the value since we will
613
         * map it directly to avoid an extra memory allocation.
614
         */
615
0
        kv = flb_kv_item_create(&ins->properties, (char *) k, NULL);
616
0
        if (!kv) {
617
0
            if (tmp) {
618
0
                flb_sds_destroy(tmp);
619
0
            }
620
0
            return -1;
621
0
        }
622
0
        kv->val = tmp;
623
0
    }
624
625
1.68k
    return 0;
626
1.68k
}
627
628
const char *flb_input_get_property(const char *key,
629
                                   struct flb_input_instance *ins)
630
0
{
631
0
    return flb_config_prop_get(key, &ins->properties);
632
0
}
633
634
#ifdef FLB_HAVE_METRICS
635
void *flb_input_get_cmt_instance(struct flb_input_instance *ins)
636
0
{
637
0
    return (void *)ins->cmt;
638
0
}
639
#endif
640
641
/* Return an instance name or alias */
642
const char *flb_input_name(struct flb_input_instance *ins)
643
828k
{
644
828k
    if (ins->alias) {
645
0
        return ins->alias;
646
0
    }
647
648
828k
    return ins->name;
649
828k
}
650
651
void flb_input_instance_destroy(struct flb_input_instance *ins)
652
1.68k
{
653
1.68k
    struct mk_list *tmp;
654
1.68k
    struct mk_list *head;
655
1.68k
    struct flb_input_collector *collector;
656
657
1.68k
    if (ins->alias) {
658
0
        flb_sds_destroy(ins->alias);
659
0
    }
660
661
    /* Remove URI context */
662
1.68k
    if (ins->host.uri) {
663
0
        flb_uri_destroy(ins->host.uri);
664
0
    }
665
666
1.68k
    if (ins->host.name) {
667
0
        flb_sds_destroy(ins->host.name);
668
0
    }
669
1.68k
    if (ins->host.address) {
670
0
        flb_sds_destroy(ins->host.address);
671
0
    }
672
1.68k
    if (ins->host.listen) {
673
0
        flb_sds_destroy(ins->host.listen);
674
0
    }
675
676
1.68k
#ifdef FLB_HAVE_TLS
677
1.68k
    if (ins->use_tls) {
678
0
        if (ins->tls != NULL) {
679
0
            flb_tls_destroy(ins->tls);
680
0
        }
681
0
    }
682
683
1.68k
    if (ins->tls_config_map) {
684
1.68k
        flb_config_map_destroy(ins->tls_config_map);
685
1.68k
    }
686
1.68k
#endif
687
688
1.68k
    if (ins->tls_vhost) {
689
0
        flb_sds_destroy(ins->tls_vhost);
690
0
    }
691
692
1.68k
    if (ins->tls_ca_path) {
693
0
        flb_sds_destroy(ins->tls_ca_path);
694
0
    }
695
696
1.68k
    if (ins->tls_ca_file) {
697
0
        flb_sds_destroy(ins->tls_ca_file);
698
0
    }
699
700
1.68k
    if (ins->tls_crt_file) {
701
0
        flb_sds_destroy(ins->tls_crt_file);
702
0
    }
703
704
1.68k
    if (ins->tls_key_file) {
705
0
        flb_sds_destroy(ins->tls_key_file);
706
0
    }
707
708
1.68k
    if (ins->tls_key_passwd) {
709
0
        flb_sds_destroy(ins->tls_key_passwd);
710
0
    }
711
712
    /* release the tag if any */
713
1.68k
    flb_sds_destroy(ins->tag);
714
715
    /* Let the engine remove any pending task */
716
1.68k
    flb_engine_destroy_tasks(&ins->tasks);
717
718
    /* release properties */
719
1.68k
    flb_kv_release(&ins->properties);
720
1.68k
    flb_kv_release(&ins->net_properties);
721
722
723
1.68k
#ifdef FLB_HAVE_CHUNK_TRACE
724
1.68k
    flb_chunk_trace_context_destroy(ins);
725
1.68k
#endif /* FLB_HAVE_CHUNK_TRACE */
726
727
    /* Remove metrics */
728
1.68k
#ifdef FLB_HAVE_METRICS
729
1.68k
    if (ins->cmt) {
730
1.68k
        cmt_destroy(ins->cmt);
731
1.68k
    }
732
733
1.68k
    if (ins->metrics) {
734
1.68k
        flb_metrics_destroy(ins->metrics);
735
1.68k
    }
736
1.68k
#endif
737
738
1.68k
    if (ins->storage) {
739
1.68k
        flb_storage_input_destroy(ins);
740
1.68k
    }
741
742
    /* destroy config map */
743
1.68k
    if (ins->config_map) {
744
0
        flb_config_map_destroy(ins->config_map);
745
0
    }
746
747
1.68k
    if (ins->net_config_map) {
748
1.68k
        flb_config_map_destroy(ins->net_config_map);
749
1.68k
    }
750
751
    /* hash table for chunks */
752
1.68k
    if (ins->ht_log_chunks) {
753
1.68k
        flb_hash_table_destroy(ins->ht_log_chunks);
754
1.68k
    }
755
756
1.68k
    if (ins->ht_metric_chunks) {
757
1.68k
        flb_hash_table_destroy(ins->ht_metric_chunks);
758
1.68k
    }
759
760
1.68k
    if (ins->ht_trace_chunks) {
761
1.68k
        flb_hash_table_destroy(ins->ht_trace_chunks);
762
1.68k
    }
763
764
1.68k
    if (ins->ch_events[0] > 0) {
765
1.68k
        mk_event_closesocket(ins->ch_events[0]);
766
1.68k
    }
767
768
1.68k
    if (ins->ch_events[1] > 0) {
769
1.68k
        mk_event_closesocket(ins->ch_events[1]);
770
1.68k
    }
771
772
    /* Collectors */
773
1.68k
    mk_list_foreach_safe(head, tmp, &ins->collectors) {
774
1.68k
        collector = mk_list_entry(head, struct flb_input_collector, _head);
775
1.68k
        mk_list_del(&collector->_head);
776
1.68k
        flb_input_collector_destroy(collector);
777
1.68k
    }
778
779
    /* delete storage context */
780
1.68k
    flb_storage_input_destroy(ins);
781
782
1.68k
    mk_list_del(&ins->_head);
783
784
1.68k
    if (ins->rb) {
785
1.68k
        flb_ring_buffer_destroy(ins->rb);
786
1.68k
    }
787
1.68k
    flb_free(ins);
788
1.68k
}
789
790
int flb_input_coro_id_get(struct flb_input_instance *ins)
791
0
{
792
0
    int id;
793
0
    int max = (2 << 13) - 1; /* max for 14 bits */
794
795
0
    id = ins->input_coro_id;
796
0
    ins->input_coro_id++;
797
798
    /* reset once it reach the maximum allowed */
799
0
    if (ins->input_coro_id > max) {
800
0
        ins->input_coro_id = 0;
801
0
    }
802
803
0
    return id;
804
0
}
805
806
static int input_instance_channel_events_init(struct flb_input_instance *ins)
807
1.68k
{
808
1.68k
    int ret;
809
1.68k
    struct mk_event_loop *evl;
810
811
1.68k
    evl = flb_input_event_loop_get(ins);
812
813
    /* Input event channel: used for co-routines to report return status */
814
1.68k
    ret = mk_event_channel_create(evl,
815
1.68k
                                  &ins->ch_events[0],
816
1.68k
                                  &ins->ch_events[1],
817
1.68k
                                  ins);
818
1.68k
    if (ret != 0) {
819
0
        flb_error("could not create events channels for '%s'",
820
0
                  flb_input_name(ins));
821
0
        return -1;
822
0
    }
823
824
1.68k
    flb_debug("[%s:%s] created event channels: read=%i write=%i",
825
1.68k
              ins->p->name, flb_input_name(ins),
826
1.68k
              ins->ch_events[0], ins->ch_events[1]);
827
828
    /*
829
     * Note: mk_event_channel_create() sets a type = MK_EVENT_NOTIFICATION by
830
     * default, we need to overwrite this value so we can do a clean check
831
     * into the Engine when the event is triggered.
832
     */
833
1.68k
    ins->event.type = FLB_ENGINE_EV_INPUT;
834
835
1.68k
    return 0;
836
1.68k
}
837
838
int flb_input_instance_init(struct flb_input_instance *ins,
839
                            struct flb_config *config)
840
1.68k
{
841
1.68k
    int ret;
842
1.68k
    struct flb_config *ctx = ins->config;
843
1.68k
    struct mk_list *config_map;
844
1.68k
    struct flb_input_plugin *p = ins->p;
845
1.68k
    int tls_session_mode;
846
847
1.68k
    if (ins->log_level == -1 && config->log != NULL) {
848
1.68k
        ins->log_level = config->log->level;
849
1.68k
    }
850
851
    /* Skip pseudo input plugins */
852
1.68k
    if (!p) {
853
0
        return 0;
854
0
    }
855
856
857
1.68k
#ifdef FLB_HAVE_METRICS
858
1.68k
    uint64_t ts;
859
1.68k
    char *name;
860
861
1.68k
    name = (char *) flb_input_name(ins);
862
1.68k
    ts = cfl_time_now();
863
864
    /* CMetrics */
865
1.68k
    ins->cmt = cmt_create();
866
1.68k
    if (!ins->cmt) {
867
0
        flb_error("[input] could not create cmetrics context: %s",
868
0
                  flb_input_name(ins));
869
0
        return -1;
870
0
    }
871
872
    /*
873
     * Register generic input plugin metrics
874
     * -------------------------------------
875
     */
876
877
    /* fluentbit_input_bytes_total */
878
1.68k
    ins->cmt_bytes = \
879
1.68k
        cmt_counter_create(ins->cmt,
880
1.68k
                           "fluentbit", "input", "bytes_total",
881
1.68k
                           "Number of input bytes.",
882
1.68k
                           1, (char *[]) {"name"});
883
1.68k
    cmt_counter_set(ins->cmt_bytes, ts, 0, 1, (char *[]) {name});
884
885
    /* fluentbit_input_records_total */
886
1.68k
    ins->cmt_records = \
887
1.68k
        cmt_counter_create(ins->cmt,
888
1.68k
                           "fluentbit", "input", "records_total",
889
1.68k
                           "Number of input records.",
890
1.68k
                           1, (char *[]) {"name"});
891
1.68k
    cmt_counter_set(ins->cmt_records, ts, 0, 1, (char *[]) {name});
892
893
    /* Storage Metrics */
894
1.68k
    if (ctx->storage_metrics == FLB_TRUE) {
895
        /* fluentbit_input_storage_overlimit */
896
1.68k
        ins->cmt_storage_overlimit = \
897
1.68k
            cmt_gauge_create(ins->cmt,
898
1.68k
                             "fluentbit", "input",
899
1.68k
                             "storage_overlimit",
900
1.68k
                             "Is the input memory usage overlimit ?.",
901
1.68k
                             1, (char *[]) {"name"});
902
1.68k
        cmt_gauge_set(ins->cmt_storage_overlimit, ts, 0, 1, (char *[]) {name});
903
904
        /* fluentbit_input_storage_memory_bytes */
905
1.68k
        ins->cmt_storage_memory_bytes = \
906
1.68k
            cmt_gauge_create(ins->cmt,
907
1.68k
                             "fluentbit", "input",
908
1.68k
                             "storage_memory_bytes",
909
1.68k
                             "Memory bytes used by the chunks.",
910
1.68k
                             1, (char *[]) {"name"});
911
1.68k
        cmt_gauge_set(ins->cmt_storage_memory_bytes, ts, 0, 1, (char *[]) {name});
912
913
        /* fluentbit_input_storage_chunks */
914
1.68k
        ins->cmt_storage_chunks = \
915
1.68k
            cmt_gauge_create(ins->cmt,
916
1.68k
                             "fluentbit", "input",
917
1.68k
                             "storage_chunks",
918
1.68k
                             "Total number of chunks.",
919
1.68k
                             1, (char *[]) {"name"});
920
1.68k
        cmt_gauge_set(ins->cmt_storage_chunks, ts, 0, 1, (char *[]) {name});
921
922
        /* fluentbit_input_storage_chunks_up */
923
1.68k
        ins->cmt_storage_chunks_up = \
924
1.68k
            cmt_gauge_create(ins->cmt,
925
1.68k
                             "fluentbit", "input",
926
1.68k
                             "storage_chunks_up",
927
1.68k
                             "Total number of chunks up in memory.",
928
1.68k
                             1, (char *[]) {"name"});
929
1.68k
        cmt_gauge_set(ins->cmt_storage_chunks_up, ts, 0, 1, (char *[]) {name});
930
931
        /* fluentbit_input_storage_chunks_down */
932
1.68k
        ins->cmt_storage_chunks_down = \
933
1.68k
            cmt_gauge_create(ins->cmt,
934
1.68k
                             "fluentbit", "input",
935
1.68k
                             "storage_chunks_down",
936
1.68k
                             "Total number of chunks down.",
937
1.68k
                             1, (char *[]) {"name"});
938
1.68k
        cmt_gauge_set(ins->cmt_storage_chunks_down, ts, 0, 1, (char *[]) {name});
939
940
        /* fluentbit_input_storage_chunks_busy */
941
1.68k
        ins->cmt_storage_chunks_busy = \
942
1.68k
            cmt_gauge_create(ins->cmt,
943
1.68k
                             "fluentbit", "input",
944
1.68k
                             "storage_chunks_busy",
945
1.68k
                             "Total number of chunks in a busy state.",
946
1.68k
                             1, (char *[]) {"name"});
947
1.68k
        cmt_gauge_set(ins->cmt_storage_chunks_busy, ts, 0, 1, (char *[]) {name});
948
949
        /* fluentbit_input_storage_chunks_busy_bytes */
950
1.68k
        ins->cmt_storage_chunks_busy_bytes = \
951
1.68k
            cmt_gauge_create(ins->cmt,
952
1.68k
                             "fluentbit", "input",
953
1.68k
                             "storage_chunks_busy_bytes",
954
1.68k
                             "Total number of bytes used by chunks in a busy state.",
955
1.68k
                             1, (char *[]) {"name"});
956
1.68k
        cmt_gauge_set(ins->cmt_storage_chunks_busy_bytes, ts, 0, 1, (char *[]) {name});
957
1.68k
    }
958
959
1.68k
    if (ins->storage_type == FLB_STORAGE_MEMRB) {
960
        /* fluentbit_input_memrb_dropped_chunks */
961
0
        ins->cmt_memrb_dropped_chunks = cmt_counter_create(ins->cmt,
962
0
                                                          "fluentbit", "input",
963
0
                                                          "memrb_dropped_chunks",
964
0
                                                          "Number of memrb dropped chunks.",
965
0
                                                          1, (char *[]) {"name"});
966
0
        cmt_counter_set(ins->cmt_memrb_dropped_chunks, ts, 0, 1, (char *[]) {name});
967
968
969
        /* fluentbit_input_memrb_dropped_bytes */
970
0
        ins->cmt_memrb_dropped_bytes = cmt_counter_create(ins->cmt,
971
0
                                                          "fluentbit", "input",
972
0
                                                          "memrb_dropped_bytes",
973
0
                                                          "Number of memrb dropped bytes.",
974
0
                                                          1, (char *[]) {"name"});
975
976
0
        cmt_counter_set(ins->cmt_memrb_dropped_bytes, ts, 0, 1, (char *[]) {name});
977
0
    }
978
979
    /* OLD Metrics */
980
1.68k
    ins->metrics = flb_metrics_create(name);
981
1.68k
    if (ins->metrics) {
982
1.68k
        flb_metrics_add(FLB_METRIC_N_RECORDS, "records", ins->metrics);
983
1.68k
        flb_metrics_add(FLB_METRIC_N_BYTES, "bytes", ins->metrics);
984
1.68k
    }
985
1.68k
#endif
986
987
    /*
988
     * Before to call the initialization callback, make sure that the received
989
     * configuration parameters are valid if the plugin is registering a config map.
990
     */
991
1.68k
    if (p->config_map) {
992
        /*
993
         * Create a dynamic version of the configmap that will be used by the specific
994
         * instance in question.
995
         */
996
0
        config_map = flb_config_map_create(config, p->config_map);
997
0
        if (!config_map) {
998
0
            flb_error("[input] error loading config map for '%s' plugin",
999
0
                      p->name);
1000
0
            flb_input_instance_destroy(ins);
1001
0
            return -1;
1002
0
        }
1003
0
        ins->config_map = config_map;
1004
1005
        /* Validate incoming properties against config map */
1006
0
        ret = flb_config_map_properties_check(ins->p->name,
1007
0
                                              &ins->properties, ins->config_map);
1008
0
        if (ret == -1) {
1009
0
            if (config->program_name) {
1010
0
                flb_helper("try the command: %s -i %s -h\n",
1011
0
                           config->program_name, ins->p->name);
1012
0
            }
1013
0
            flb_input_instance_destroy(ins);
1014
0
            return -1;
1015
0
        }
1016
0
    }
1017
1018
1.68k
#ifdef FLB_HAVE_TLS
1019
1.68k
    if (ins->use_tls == FLB_TRUE) {
1020
0
        if ((p->flags & FLB_INPUT_NET_SERVER) != 0) {
1021
0
            if (ins->tls_crt_file == NULL) {
1022
0
                flb_error("[input %s] error initializing TLS context "
1023
0
                          "(certificate file missing)",
1024
0
                          ins->name);
1025
1026
0
                flb_input_instance_destroy(ins);
1027
1028
0
                return -1;
1029
0
            }
1030
0
            else if (ins->tls_key_file == NULL) {
1031
0
                flb_error("[input %s] error initializing TLS context "
1032
0
                          "(private key file missing)",
1033
0
                          ins->name);
1034
1035
0
                flb_input_instance_destroy(ins);
1036
1037
0
                return -1;
1038
0
            }
1039
1040
0
            tls_session_mode = FLB_TLS_SERVER_MODE;
1041
0
        }
1042
0
        else {
1043
0
            tls_session_mode = FLB_TLS_CLIENT_MODE;
1044
0
        }
1045
1046
0
        ins->tls = flb_tls_create(tls_session_mode,
1047
0
                                  ins->tls_verify,
1048
0
                                  ins->tls_debug,
1049
0
                                  ins->tls_vhost,
1050
0
                                  ins->tls_ca_path,
1051
0
                                  ins->tls_ca_file,
1052
0
                                  ins->tls_crt_file,
1053
0
                                  ins->tls_key_file,
1054
0
                                  ins->tls_key_passwd);
1055
1056
0
        if (ins->tls == NULL) {
1057
0
            flb_error("[input %s] error initializing TLS context",
1058
0
                      ins->name);
1059
1060
0
            flb_input_instance_destroy(ins);
1061
1062
0
            return -1;
1063
0
        }
1064
0
    }
1065
1066
1.68k
    struct flb_config_map *m;
1067
1068
    /* TLS config map (just for 'help' formatting purposes) */
1069
1.68k
    ins->tls_config_map = flb_tls_get_config_map(config);
1070
1071
1.68k
    if (ins->tls_config_map == NULL) {
1072
0
        flb_input_instance_destroy(ins);
1073
1074
0
        return -1;
1075
0
    }
1076
1077
    /* Override first configmap value based on it plugin flag */
1078
1.68k
    m = mk_list_entry_first(ins->tls_config_map, struct flb_config_map, _head);
1079
1.68k
    if (p->flags & FLB_IO_TLS) {
1080
0
        m->value.val.boolean = FLB_TRUE;
1081
0
    }
1082
1.68k
    else {
1083
1.68k
        m->value.val.boolean = FLB_FALSE;
1084
1.68k
    }
1085
1.68k
#endif
1086
1087
    /* Init network defaults */
1088
1.68k
    flb_net_setup_init(&ins->net_setup);
1089
1090
    /* Get Downstream net_setup configmap */
1091
1.68k
    ins->net_config_map = flb_downstream_get_config_map(config);
1092
1.68k
    if (!ins->net_config_map) {
1093
0
        flb_input_instance_destroy(ins);
1094
0
        return -1;
1095
0
    }
1096
1097
    /*
1098
     * Validate 'net.*' properties: if the plugin use the Downstream interface,
1099
     * it might receive some networking settings.
1100
     */
1101
1.68k
    if (mk_list_size(&ins->net_properties) > 0) {
1102
0
        ret = flb_config_map_properties_check(ins->p->name,
1103
0
                                              &ins->net_properties,
1104
0
                                              ins->net_config_map);
1105
0
        if (ret == -1) {
1106
0
            if (config->program_name) {
1107
0
                flb_helper("try the command: %s -i %s -h\n",
1108
0
                           config->program_name, ins->p->name);
1109
0
            }
1110
0
            flb_input_instance_destroy(ins);
1111
0
            return -1;
1112
0
        }
1113
0
    }
1114
1115
    /* Initialize the input */
1116
1.68k
    if (p->cb_init) {
1117
1.68k
        flb_plg_info(ins, "initializing");
1118
1.68k
        flb_plg_info(ins, "storage_strategy=%s", flb_storage_get_type(ins->storage_type));
1119
1120
        /* Sanity check: all non-dynamic tag input plugins must have a tag */
1121
1.68k
        if (!ins->tag) {
1122
1
            flb_input_set_property(ins, "tag", ins->name);
1123
1
        }
1124
1125
1.68k
        if (flb_input_is_threaded(ins)) {
1126
            /*
1127
             * Create a thread for a new instance. Now the plugin initialization callback will be invoked and report an early failure
1128
             * or an 'ok' status, we will wait for that return value on flb_input_thread_instance_get_status() below.
1129
             */
1130
0
            ret = flb_input_thread_instance_init(config, ins);
1131
0
            if (ret != 0) {
1132
0
                flb_error("failed initialize input %s",
1133
0
                          ins->name);
1134
0
                flb_input_instance_destroy(ins);
1135
0
                return -1;
1136
0
            }
1137
1138
            /* initialize channel events */
1139
0
            ret = input_instance_channel_events_init(ins);
1140
0
            if (ret != 0) {
1141
0
                flb_error("failed initialize channel events on input %s",
1142
0
                          ins->name);
1143
0
                flb_input_instance_destroy(ins);
1144
0
                return -1;
1145
0
            }
1146
1147
            /* register the ring buffer */
1148
0
            ret = flb_ring_buffer_add_event_loop(ins->rb, config->evl, FLB_INPUT_RING_BUFFER_WINDOW);
1149
0
            if (ret) {
1150
0
                flb_error("failed while registering ring buffer events on input %s",
1151
0
                          ins->name);
1152
0
                flb_input_instance_destroy(ins);
1153
0
                return -1;
1154
0
            }
1155
0
        }
1156
1.68k
        else {
1157
            /* initialize channel events */
1158
1.68k
            ret = input_instance_channel_events_init(ins);
1159
1.68k
            if (ret != 0) {
1160
0
                flb_error("failed initialize channel events on input %s",
1161
0
                          ins->name);
1162
0
            }
1163
1.68k
            ret = p->cb_init(ins, config, ins->data);
1164
1.68k
            if (ret != 0) {
1165
0
                flb_error("failed initialize input %s",
1166
0
                          ins->name);
1167
0
                flb_input_instance_destroy(ins);
1168
0
                return -1;
1169
0
            }
1170
1.68k
        }
1171
1.68k
    }
1172
1173
1.68k
    return 0;
1174
1.68k
}
1175
1176
int flb_input_instance_pre_run(struct flb_input_instance *ins, struct flb_config *config)
1177
1.68k
{
1178
1.68k
    int ret;
1179
1180
1.68k
    if (flb_input_is_threaded(ins)) {
1181
0
        return flb_input_thread_instance_pre_run(config, ins);
1182
0
    }
1183
1.68k
    else if (ins->p->cb_pre_run) {
1184
0
            ret = ins->p->cb_pre_run(ins, config, ins->context);
1185
0
            if (ret == -1) {
1186
0
                return -1;
1187
0
            }
1188
0
            return 0;
1189
0
    }
1190
1191
1.68k
    return 0;
1192
1.68k
}
1193
1194
/* Initialize all inputs */
1195
int flb_input_init_all(struct flb_config *config)
1196
1.68k
{
1197
1.68k
    int ret;
1198
1.68k
    struct mk_list *tmp;
1199
1.68k
    struct mk_list *head;
1200
1.68k
    struct flb_input_instance *ins;
1201
1.68k
    struct flb_input_plugin *p;
1202
1203
    /* Initialize thread-id table */
1204
1.68k
    memset(&config->in_table_id, '\0', sizeof(config->in_table_id));
1205
1206
    /* Iterate all active input instance plugins */
1207
1.68k
    mk_list_foreach_safe(head, tmp, &config->inputs) {
1208
1.68k
        ins = mk_list_entry(head, struct flb_input_instance, _head);
1209
1.68k
        p = ins->p;
1210
1211
        /* Skip pseudo input plugins */
1212
1.68k
        if (!p) {
1213
0
            continue;
1214
0
        }
1215
1216
        /* Initialize instance */
1217
1.68k
        ret = flb_input_instance_init(ins, config);
1218
1.68k
        if (ret == -1) {
1219
            /* do nothing, it's ok if it fails */
1220
0
            return -1;
1221
0
        }
1222
1.68k
    }
1223
1224
1.68k
    return 0;
1225
1.68k
}
1226
1227
/* Invoke all pre-run input callbacks */
1228
void flb_input_pre_run_all(struct flb_config *config)
1229
1.68k
{
1230
1.68k
    struct mk_list *head;
1231
1.68k
    struct flb_input_instance *ins;
1232
1.68k
    struct flb_input_plugin *p;
1233
1234
1.68k
    mk_list_foreach(head, &config->inputs) {
1235
1.68k
        ins = mk_list_entry(head, struct flb_input_instance, _head);
1236
1.68k
        p = ins->p;
1237
1.68k
        if (!p) {
1238
0
            continue;
1239
0
        }
1240
1241
1.68k
        flb_input_instance_pre_run(ins, config);
1242
1.68k
    }
1243
1.68k
}
1244
1245
void flb_input_instance_exit(struct flb_input_instance *ins,
1246
                             struct flb_config *config)
1247
1.68k
{
1248
1.68k
    struct flb_input_plugin *p;
1249
1250
    /* if the instance runs in a separate thread, signal the thread */
1251
1.68k
    if (flb_input_is_threaded(ins)) {
1252
0
        flb_input_thread_instance_exit(ins);
1253
0
        return;
1254
0
    }
1255
1256
1.68k
    p = ins->p;
1257
1.68k
    if (p->cb_exit && ins->context) {
1258
        /* Multi-threaded input plugins use the same function signature for exit callbacks. */
1259
1.68k
        p->cb_exit(ins->context, config);
1260
1.68k
    }
1261
1.68k
}
1262
1263
/* Invoke all exit input callbacks */
1264
void flb_input_exit_all(struct flb_config *config)
1265
1.68k
{
1266
1.68k
    struct mk_list *tmp;
1267
1.68k
    struct mk_list *head;
1268
1.68k
    struct flb_input_instance *ins;
1269
1.68k
    struct flb_input_plugin *p;
1270
1271
    /* Iterate instances */
1272
1.68k
    mk_list_foreach_safe_r(head, tmp, &config->inputs) {
1273
1.68k
        ins = mk_list_entry(head, struct flb_input_instance, _head);
1274
1.68k
        p = ins->p;
1275
1.68k
        if (!p) {
1276
0
            continue;
1277
0
        }
1278
1279
        /* invoke plugin instance exit callback */
1280
1.68k
        flb_input_instance_exit(ins, config);
1281
1282
        /* destroy the instance */
1283
1.68k
        flb_input_instance_destroy(ins);
1284
1.68k
    }
1285
1.68k
}
1286
1287
/* Check that at least one Input is enabled */
1288
int flb_input_check(struct flb_config *config)
1289
0
{
1290
0
    if (mk_list_is_empty(&config->inputs) == 0) {
1291
0
        return -1;
1292
0
    }
1293
1294
0
    return 0;
1295
0
}
1296
1297
/*
1298
 * API for Input plugins
1299
 * =====================
1300
 *  Copyright (C) 2015-2022 The Fluent Bit Authors
1301
 * The Input interface provides a certain number of functions that can be
1302
 * used by Input plugins to configure it own behavior and request specific
1303
 *
1304
 *  1. flb_input_set_context()
1305
 *
1306
 *     let an Input plugin set a context data reference that can be used
1307
 *     later when invoking other callbacks.
1308
 *
1309
 *  2. flb_input_set_collector_time()
1310
 *
1311
 *     request the Engine to trigger a specific collector callback at a
1312
 *     certain interval time. Note that this callback will run in the main
1313
 *     thread so it computing time must be short, otherwise it will block
1314
 *     the main loop.
1315
 *
1316
 *     The collector can runs in timeouts of the order of seconds.nanoseconds
1317
 *
1318
 *      note: 1 Second = 1000000000 Nanosecond
1319
 *
1320
 *  3. flb_input_set_collector_event()
1321
 *
1322
 *     for a registered file descriptor, associate the READ events to a
1323
 *     specified plugin. Every time there is some data to read, the collector
1324
 *     callback will be triggered. Oriented to a file descriptor that already
1325
 *     have information that may be read through iotctl(..FIONREAD..);
1326
 *
1327
 *  4. flb_input_set_collector_server()
1328
 *
1329
 *     it register a collector based on TCP socket events. It register a socket
1330
 *     who did bind() and listen() and for each event on the socket it triggers
1331
 *     the registered callbacks.
1332
 */
1333
1334
/* Assign an Configuration context to an Input */
1335
void flb_input_set_context(struct flb_input_instance *in, void *context)
1336
1.68k
{
1337
1.68k
    in->context = context;
1338
1.68k
}
1339
1340
int flb_input_channel_init(struct flb_input_instance *in)
1341
1.68k
{
1342
1.68k
    return flb_pipe_create(in->channel);
1343
1.68k
}
1344
1345
static struct flb_input_collector *collector_create(int type,
1346
                                                    struct flb_input_instance *ins,
1347
                                                            int (*cb) (
1348
                                                            struct flb_input_instance *,
1349
                                                            struct flb_config *, void *),
1350
                                                    struct flb_config *config)
1351
1.68k
{
1352
1.68k
    struct flb_input_collector *coll;
1353
1.68k
    struct flb_input_thread_instance *thi;
1354
1355
1.68k
    coll = flb_calloc(1, sizeof(struct flb_input_collector));
1356
1.68k
    if (!coll) {
1357
0
        flb_errno();
1358
0
        return NULL;
1359
0
    }
1360
1361
1.68k
    coll->id          = collector_id(ins);
1362
1.68k
    coll->type        = type;
1363
1.68k
    coll->running     = FLB_FALSE;
1364
1.68k
    coll->fd_event    = -1;
1365
1.68k
    coll->fd_timer    = -1;
1366
1.68k
    coll->seconds     = -1;
1367
1.68k
    coll->nanoseconds = -1;
1368
1.68k
    coll->cb_collect  = cb;
1369
1.68k
    coll->instance    = ins;
1370
1.68k
    MK_EVENT_ZERO(&coll->event);
1371
1372
1.68k
    if (flb_input_is_threaded(ins)) {
1373
0
        thi = ins->thi;
1374
0
        coll->evl = thi->evl;
1375
0
    }
1376
1.68k
    else {
1377
1.68k
        coll->evl = config->evl;
1378
1.68k
    }
1379
1380
    /*
1381
     * Collectors created from a threaded input instance are only added to the
1382
     * instance `collectors` list. For instances in non-threaded mode, they are
1383
     * added to both lists, the global config collectors list and the instance
1384
     * list.
1385
     */
1386
1.68k
    mk_list_add(&coll->_head, &ins->collectors);
1387
1388
1.68k
    return coll;
1389
1.68k
}
1390
1391
1392
int flb_input_set_collector_time(struct flb_input_instance *ins,
1393
                                 int (*cb_collect) (struct flb_input_instance *,
1394
                                                    struct flb_config *, void *),
1395
                                 time_t seconds,
1396
                                 long   nanoseconds,
1397
                                 struct flb_config *config)
1398
0
{
1399
0
    struct flb_input_collector *coll;
1400
1401
0
    coll = collector_create(FLB_COLLECT_TIME, ins, cb_collect, config);
1402
0
    if (!coll) {
1403
0
        return -1;
1404
0
    }
1405
1406
    /* specific collector initialization */
1407
0
    coll->seconds     = seconds;
1408
0
    coll->nanoseconds = nanoseconds;
1409
1410
0
    return coll->id;
1411
0
}
1412
1413
int flb_input_set_collector_event(struct flb_input_instance *ins,
1414
                                  int (*cb_collect) (struct flb_input_instance *,
1415
                                                     struct flb_config *, void *),
1416
                                  flb_pipefd_t fd,
1417
                                  struct flb_config *config)
1418
1.68k
{
1419
1.68k
    struct flb_input_collector *coll;
1420
1421
1.68k
    coll = collector_create(FLB_COLLECT_FD_EVENT, ins, cb_collect, config);
1422
1.68k
    if (!coll) {
1423
0
        return -1;
1424
0
    }
1425
1426
    /* specific collector initialization */
1427
1.68k
    coll->fd_event = fd;
1428
1429
1.68k
    return coll->id;
1430
1.68k
}
1431
1432
int flb_input_set_collector_socket(struct flb_input_instance *ins,
1433
                                   int (*cb_new_connection) (struct flb_input_instance *,
1434
                                                             struct flb_config *,
1435
                                                             void *),
1436
                                   flb_pipefd_t fd,
1437
                                   struct flb_config *config)
1438
0
{
1439
0
    struct flb_input_collector *coll;
1440
1441
1442
0
    coll = collector_create(FLB_COLLECT_FD_SERVER, ins, cb_new_connection, config);
1443
0
    if (!coll) {
1444
0
        return -1;
1445
0
    }
1446
1447
    /* specific collector initialization */
1448
0
    coll->fd_event = fd;
1449
1450
0
    return coll->id;
1451
0
}
1452
1453
1454
static int collector_start(struct flb_input_collector *coll,
1455
                           struct flb_config *config)
1456
1.68k
{
1457
1.68k
    int fd;
1458
1.68k
    int ret;
1459
1.68k
    struct mk_event *event;
1460
1461
1.68k
    if (coll->running == FLB_TRUE) {
1462
0
        return 0;
1463
0
    }
1464
1465
1.68k
    event = &coll->event;
1466
1.68k
    event->mask = MK_EVENT_EMPTY;
1467
1.68k
    event->status = MK_EVENT_NONE;
1468
1469
1.68k
    if (coll->type == FLB_COLLECT_TIME) {
1470
0
        fd = mk_event_timeout_create(coll->evl, coll->seconds,
1471
0
                                     coll->nanoseconds, event);
1472
0
        if (fd == -1) {
1473
0
            flb_error("[input collector] COLLECT_TIME registration failed");
1474
0
            coll->running = FLB_FALSE;
1475
0
            return -1;
1476
0
        }
1477
0
        coll->fd_timer = fd;
1478
0
    }
1479
1.68k
    else if (coll->type & (FLB_COLLECT_FD_EVENT | FLB_COLLECT_FD_SERVER)) {
1480
1.68k
        event->fd = coll->fd_event;
1481
1.68k
        ret = mk_event_add(coll->evl,
1482
1.68k
                           coll->fd_event,
1483
1.68k
                           FLB_ENGINE_EV_CORE,
1484
1.68k
                           MK_EVENT_READ, event);
1485
1.68k
        if (ret == -1) {
1486
0
            flb_error("[input collector] COLLECT_EVENT registration failed");
1487
0
            mk_event_closesocket(coll->fd_event);
1488
0
            coll->running = FLB_FALSE;
1489
0
            return -1;
1490
0
        }
1491
1.68k
    }
1492
1493
1.68k
    coll->running = FLB_TRUE;
1494
1.68k
    return 0;
1495
1.68k
}
1496
1497
int flb_input_collector_start(int coll_id, struct flb_input_instance *in)
1498
1.68k
{
1499
1.68k
    int ret;
1500
1.68k
    int c = 0;
1501
1.68k
    struct mk_list *head;
1502
1.68k
    struct flb_input_collector *coll;
1503
1504
1.68k
    mk_list_foreach(head, &in->collectors) {
1505
1.68k
        coll = mk_list_entry(head, struct flb_input_collector, _head);
1506
1.68k
        if (coll->id == coll_id) {
1507
1.68k
            ret = collector_start(coll, in->config);
1508
1.68k
            if (ret == -1) {
1509
0
                flb_error("[input] error starting collector #%i: %s",
1510
0
                          coll_id, in->name);
1511
0
            }
1512
1.68k
            return ret;
1513
1.68k
        }
1514
0
        c++;
1515
0
    }
1516
1517
0
    return -1;
1518
1.68k
}
1519
1520
/* start collectors for main thread, no threaded plugins */
1521
int flb_input_collectors_signal_start(struct flb_input_instance *ins)
1522
1.68k
{
1523
1.68k
    int ret;
1524
1.68k
    struct mk_list *head;
1525
1.68k
    struct flb_input_collector *coll;
1526
1527
1.68k
    if (flb_input_is_threaded(ins)) {
1528
0
        flb_error("input plugin '%s' is threaded", flb_input_name(ins));
1529
0
        return -1;
1530
0
    }
1531
1532
1.68k
    mk_list_foreach(head, &ins->collectors) {
1533
1.68k
        coll = mk_list_entry(head, struct flb_input_collector, _head);
1534
1.68k
        ret = flb_input_collector_start(coll->id, ins);
1535
1.68k
        if (ret < 0) {
1536
0
            return -1;
1537
0
        }
1538
1.68k
    }
1539
1540
1.68k
    return 0;
1541
1.68k
}
1542
1543
/*
1544
 * Start all collectors: this function is invoked from the engine interface and aim
1545
 * to start the local collectors and also signal the threaded input plugins to start
1546
 * their own collectors.
1547
 */
1548
int flb_input_collectors_start(struct flb_config *config)
1549
1.68k
{
1550
1.68k
    int ret;
1551
1.68k
    struct mk_list *head;
1552
1.68k
    struct flb_input_instance *ins;
1553
1554
    /* Signal threaded input plugins to start their collectors */
1555
1.68k
    mk_list_foreach(head, &config->inputs) {
1556
1.68k
        ins = mk_list_entry(head, struct flb_input_instance, _head);
1557
1.68k
        if (flb_input_is_threaded(ins)) {
1558
0
            ret = flb_input_thread_collectors_signal_start(ins);
1559
0
            if (ret != 0) {
1560
0
                flb_error("could not start collectors for threaded plugin '%s'",
1561
0
                          flb_input_name(ins));
1562
0
            }
1563
0
        }
1564
1.68k
        else {
1565
1.68k
            ret = flb_input_collectors_signal_start(ins);
1566
1.68k
            if (ret != 0) {
1567
0
                flb_error("could not start collectors for plugin '%s'",
1568
0
                          flb_input_name(ins));
1569
0
            }
1570
1.68k
        }
1571
1.68k
    }
1572
1573
1.68k
    return 0;
1574
1.68k
}
1575
1576
static struct flb_input_collector *get_collector(int id,
1577
                                                 struct flb_input_instance *in)
1578
0
{
1579
0
    struct mk_list *head;
1580
0
    struct flb_input_collector *coll;
1581
1582
0
    mk_list_foreach(head, &in->collectors) {
1583
0
        coll = mk_list_entry(head, struct flb_input_collector, _head);
1584
0
        if (coll->id == id) {
1585
0
            return coll;
1586
0
        }
1587
0
    }
1588
1589
0
    return NULL;
1590
0
}
1591
1592
int flb_input_collector_running(int coll_id, struct flb_input_instance *in)
1593
0
{
1594
0
    struct flb_input_collector *coll;
1595
1596
0
    coll = get_collector(coll_id, in);
1597
0
    if (!coll) {
1598
0
        return FLB_FALSE;
1599
0
    }
1600
1601
0
    return coll->running;
1602
0
}
1603
1604
struct mk_event *flb_input_collector_get_event(int coll_id,
1605
                                               struct flb_input_instance *ins)
1606
0
{
1607
0
    struct flb_input_collector *collector;
1608
1609
0
    collector = get_collector(coll_id, ins);
1610
1611
0
    if (collector == NULL) {
1612
0
        return NULL;
1613
0
    }
1614
1615
0
    return &collector->event;
1616
0
}
1617
1618
/*
1619
 * TEST: this is a test function that can be used by input plugins to check the
1620
 * 'pause' and 'resume' callback operations.
1621
 *
1622
 * After is invoked, it will schedule an internal event to wake up the instance
1623
 * after 'sleep_seconds'.
1624
 */
1625
int flb_input_test_pause_resume(struct flb_input_instance *ins, int sleep_seconds)
1626
0
{
1627
    /*
1628
     * This is a fake pause/resume implementation since it's only used to test the plugin
1629
     * callbacks for such purposes.
1630
     */
1631
1632
    /* pause the instance */
1633
0
    flb_input_pause(ins);
1634
1635
    /* wait */
1636
0
    sleep(sleep_seconds);
1637
1638
    /* resume again */
1639
0
    flb_input_resume(ins);
1640
1641
0
    return 0;
1642
0
}
1643
1644
int flb_input_pause(struct flb_input_instance *ins)
1645
3.36k
{
1646
    /* if the instance is already paused, just return */
1647
3.36k
    if (flb_input_buf_paused(ins)) {
1648
0
        return -1;
1649
0
    }
1650
1651
    /* Pause only if a callback is set and a local context exists */
1652
3.36k
    if (ins->p->cb_pause && ins->context) {
1653
0
        if (flb_input_is_threaded(ins)) {
1654
            /* signal the thread event loop about the 'pause' operation */
1655
0
            flb_input_thread_instance_pause(ins);
1656
0
        }
1657
0
        else {
1658
0
            flb_info("[input] pausing %s", flb_input_name(ins));
1659
0
            ins->p->cb_pause(ins->context, ins->config);
1660
0
        }
1661
0
    }
1662
1663
3.36k
    return 0;
1664
3.36k
}
1665
1666
int flb_input_resume(struct flb_input_instance *ins)
1667
0
{
1668
0
    if (ins->p->cb_resume) {
1669
0
        ins->p->cb_resume(ins->context, ins->config);
1670
0
    }
1671
1672
0
    return 0;
1673
0
}
1674
1675
int flb_input_pause_all(struct flb_config *config)
1676
3.36k
{
1677
3.36k
    int ret;
1678
3.36k
    int paused = 0;
1679
3.36k
    struct mk_list *head;
1680
3.36k
    struct flb_input_instance *ins;
1681
1682
3.36k
    mk_list_foreach(head, &config->inputs) {
1683
3.36k
        ins = mk_list_entry(head, struct flb_input_instance, _head);
1684
        /*
1685
         * Inform the plugin that is being paused, the source type is set to 'FLB_INPUT_PAUSE_MEM_BUF', no real reason, we
1686
         * just need to get it paused.
1687
         */
1688
3.36k
        ret = flb_input_pause(ins);
1689
3.36k
        if (ret == 0) {
1690
3.36k
            paused++;
1691
3.36k
        }
1692
3.36k
    }
1693
1694
3.36k
    return paused;
1695
3.36k
}
1696
1697
int flb_input_collector_destroy(struct flb_input_collector *coll)
1698
1.68k
{
1699
1.68k
    struct flb_config *config = coll->instance->config;
1700
1701
1.68k
    if (coll->type == FLB_COLLECT_TIME) {
1702
0
        if (coll->fd_timer > 0) {
1703
0
            mk_event_timeout_destroy(config->evl, &coll->event);
1704
0
            mk_event_closesocket(coll->fd_timer);
1705
0
        }
1706
0
    }
1707
1.68k
    else {
1708
1.68k
        mk_event_del(config->evl, &coll->event);
1709
1.68k
    }
1710
1711
1.68k
    flb_free(coll);
1712
1713
1.68k
    return 0;
1714
1.68k
}
1715
1716
int flb_input_collector_pause(int coll_id, struct flb_input_instance *in)
1717
0
{
1718
0
    int ret;
1719
0
    flb_pipefd_t fd;
1720
0
    struct flb_input_collector *coll;
1721
1722
0
    coll = get_collector(coll_id, in);
1723
0
    if (!coll) {
1724
0
        return -1;
1725
0
    }
1726
1727
0
    if (coll->running == FLB_FALSE) {
1728
0
        return 0;
1729
0
    }
1730
1731
0
    if (coll->type == FLB_COLLECT_TIME) {
1732
        /*
1733
         * For a collector time, it's better to just remove the file
1734
         * descriptor associated to the time out, when resumed a new
1735
         * one can be created.
1736
         *
1737
         * Note: Invalidate fd_timer first in case closing a socket
1738
         * invokes another event.
1739
         */
1740
0
        fd = coll->fd_timer;
1741
0
        coll->fd_timer = -1;
1742
0
        mk_event_timeout_destroy(coll->evl, &coll->event);
1743
0
        mk_event_closesocket(fd);
1744
0
    }
1745
0
    else if (coll->type & (FLB_COLLECT_FD_SERVER | FLB_COLLECT_FD_EVENT)) {
1746
0
        ret = mk_event_del(coll->evl, &coll->event);
1747
0
        if (ret != 0) {
1748
0
            flb_warn("[input] cannot disable event for %s", in->name);
1749
0
            return -1;
1750
0
        }
1751
0
    }
1752
1753
0
    coll->running = FLB_FALSE;
1754
1755
0
    return 0;
1756
0
}
1757
1758
int flb_input_collector_delete(int coll_id, struct flb_input_instance *in)
1759
0
{
1760
0
    struct flb_input_collector *coll;
1761
1762
0
    coll = get_collector(coll_id, in);
1763
0
    if (!coll) {
1764
0
        return -1;
1765
0
    }
1766
0
    if (flb_input_collector_pause(coll_id, in) < 0) {
1767
0
        return -1;
1768
0
    }
1769
1770
1771
0
    pthread_mutex_lock(&in->config->collectors_mutex);
1772
0
    mk_list_del(&coll->_head);
1773
0
    pthread_mutex_unlock(&in->config->collectors_mutex);
1774
1775
0
    flb_free(coll);
1776
0
    return 0;
1777
0
}
1778
1779
int flb_input_collector_resume(int coll_id, struct flb_input_instance *in)
1780
0
{
1781
0
    int fd;
1782
0
    int ret;
1783
0
    struct flb_input_collector *coll;
1784
0
    struct flb_config *config;
1785
0
    struct mk_event *event;
1786
1787
0
    coll = get_collector(coll_id, in);
1788
0
    if (!coll) {
1789
0
        return -1;
1790
0
    }
1791
1792
0
    if (coll->running == FLB_TRUE) {
1793
0
        flb_error("[input] cannot resume collector %s:%i, already running",
1794
0
                  in->name, coll_id);
1795
0
        return -1;
1796
0
    }
1797
1798
0
    config = in->config;
1799
0
    event = &coll->event;
1800
1801
    /* If data ingestion has been paused, the collector cannot resume */
1802
0
    if (config->is_ingestion_active == FLB_FALSE) {
1803
0
        return 0;
1804
0
    }
1805
1806
0
    if (coll->type == FLB_COLLECT_TIME) {
1807
0
        event->mask = MK_EVENT_EMPTY;
1808
0
        event->status = MK_EVENT_NONE;
1809
0
        fd = mk_event_timeout_create(coll->evl, coll->seconds,
1810
0
                                     coll->nanoseconds, event);
1811
0
        if (fd == -1) {
1812
0
            flb_error("[input collector] resume COLLECT_TIME failed");
1813
0
            return -1;
1814
0
        }
1815
0
        coll->fd_timer = fd;
1816
0
    }
1817
0
    else if (coll->type & (FLB_COLLECT_FD_SERVER | FLB_COLLECT_FD_EVENT)) {
1818
0
        event->fd     = coll->fd_event;
1819
0
        event->mask   = MK_EVENT_EMPTY;
1820
0
        event->status = MK_EVENT_NONE;
1821
1822
0
        ret = mk_event_add(coll->evl,
1823
0
                           coll->fd_event,
1824
0
                           FLB_ENGINE_EV_CORE,
1825
0
                           MK_EVENT_READ, event);
1826
0
        if (ret == -1) {
1827
0
            flb_error("[input] cannot disable/pause event for %s", in->name);
1828
0
            return -1;
1829
0
        }
1830
0
    }
1831
1832
0
    coll->running = FLB_TRUE;
1833
1834
0
    return 0;
1835
0
}
1836
1837
int flb_input_collector_fd(flb_pipefd_t fd, struct flb_config *config)
1838
5.90M
{
1839
5.90M
    struct mk_list *head;
1840
5.90M
    struct mk_list *head_coll;
1841
5.90M
    struct flb_input_instance *ins;
1842
5.90M
    struct flb_input_collector *collector = NULL;
1843
5.90M
    struct flb_input_coro *input_coro;
1844
1845
5.90M
    mk_list_foreach(head, &config->inputs) {
1846
5.90M
        ins = mk_list_entry(head, struct flb_input_instance, _head);
1847
5.90M
        mk_list_foreach(head_coll, &ins->collectors) {
1848
5.90M
            collector = mk_list_entry(head_coll, struct flb_input_collector, _head);
1849
5.90M
            if (collector->fd_event == fd) {
1850
5.90M
                break;
1851
5.90M
            }
1852
1.69k
            else if (collector->fd_timer == fd) {
1853
0
                flb_utils_timer_consume(fd);
1854
0
                break;
1855
0
            }
1856
1.69k
            collector = NULL;
1857
1.69k
        }
1858
1859
5.90M
        if (collector) {
1860
5.90M
            break;
1861
5.90M
        }
1862
5.90M
    }
1863
1864
    /* No matches */
1865
5.90M
    if (!collector) {
1866
1.69k
        return -1;
1867
1.69k
    }
1868
1869
5.90M
    if (collector->running == FLB_FALSE) {
1870
0
        return -1;
1871
0
    }
1872
1873
    /* Trigger the collector callback */
1874
5.90M
    if (collector->instance->runs_in_coroutine) {
1875
0
        input_coro = flb_input_coro_collect(collector, config);
1876
0
        if (!input_coro) {
1877
0
            return -1;
1878
0
        }
1879
0
        flb_input_coro_resume(input_coro);
1880
0
    }
1881
5.90M
    else {
1882
5.90M
        if (collector->cb_collect(collector->instance, config,
1883
5.90M
                                  collector->instance->context) == -1) {
1884
2.75M
            return -1;
1885
2.75M
        }
1886
5.90M
    }
1887
1888
3.15M
    return 0;
1889
5.90M
}
1890
1891
int flb_input_upstream_set(struct flb_upstream *u, struct flb_input_instance *ins)
1892
0
{
1893
0
    if (!u) {
1894
0
        return -1;
1895
0
    }
1896
1897
    /*
1898
     * if the input instance runs in threaded mode, make sure to flag the
1899
     * upstream context so the lists operations are done in thread safe mode
1900
     */
1901
0
    if (flb_input_is_threaded(ins)) {
1902
0
        flb_upstream_thread_safe(u);
1903
0
        mk_list_add(&u->base._head, &ins->upstreams);
1904
0
    }
1905
1906
    /* Set networking options 'net.*' received through instance properties */
1907
0
    memcpy(&u->base.net, &ins->net_setup, sizeof(struct flb_net_setup));
1908
1909
0
    return 0;
1910
0
}
1911
1912
int flb_input_downstream_set(struct flb_downstream *stream,
1913
                             struct flb_input_instance *ins)
1914
0
{
1915
0
    if (stream == NULL) {
1916
0
        return -1;
1917
0
    }
1918
1919
    /*
1920
     * If the input plugin will run in multiple threads, enable
1921
     * the thread safe mode for the Downstream context.
1922
     */
1923
0
    if (flb_input_is_threaded(ins)) {
1924
0
        flb_stream_enable_thread_safety(&stream->base);
1925
1926
0
        mk_list_add(&stream->base._head, &ins->downstreams);
1927
0
    }
1928
1929
0
    return 0;
1930
0
}