Coverage Report

Created: 2025-01-28 06:34

/src/fluent-bit/src/flb_output.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <stdio.h>
21
#include <stdlib.h>
22
#include <string.h>
23
24
#include <fluent-bit/flb_info.h>
25
#include <fluent-bit/flb_mem.h>
26
#include <fluent-bit/flb_str.h>
27
#include <fluent-bit/flb_env.h>
28
#include <fluent-bit/flb_coro.h>
29
#include <fluent-bit/flb_output.h>
30
#include <fluent-bit/flb_kv.h>
31
#include <fluent-bit/flb_io.h>
32
#include <fluent-bit/flb_uri.h>
33
#include <fluent-bit/flb_config.h>
34
#include <fluent-bit/flb_macros.h>
35
#include <fluent-bit/flb_utils.h>
36
#include <fluent-bit/flb_plugin.h>
37
#include <fluent-bit/flb_plugin_proxy.h>
38
#include <fluent-bit/flb_http_client_debug.h>
39
#include <fluent-bit/flb_output_thread.h>
40
#include <fluent-bit/flb_mp.h>
41
#include <fluent-bit/flb_pack.h>
42
43
FLB_TLS_DEFINE(struct flb_out_flush_params, out_flush_params);
44
45
void flb_output_prepare()
46
0
{
47
0
    FLB_TLS_INIT(out_flush_params);
48
0
}
49
50
/*
 * Validate the output address protocol.
 *
 * 'output' is either a bare plugin name ("stdout") or an address of the
 * form "name://...". The part before "://" (or the whole string when there
 * is no separator, or when the string starts with it) is compared against
 * 'prot' ignoring case.
 *
 * Returns 1 when the names match exactly in length and content, 0 otherwise.
 */
static int check_protocol(const char *prot, const char *output)
{
    size_t len;
    const char *sep;

    sep = strstr(output, "://");
    if (sep != NULL && sep != output) {
        /* only the scheme part takes part in the comparison */
        len = (size_t) (sep - output);
    }
    else {
        len = strlen(output);
    }

    /* lengths are kept as size_t so they compare cleanly with strlen() */
    if (strlen(prot) != len) {
        return 0;
    }

    /* Output plugin match */
    if (strncasecmp(prot, output, len) == 0) {
        return 1;
    }

    return 0;
}
75
76
77
/* Invoke pre-run call for the output plugin */
78
void flb_output_pre_run(struct flb_config *config)
79
0
{
80
0
    struct mk_list *head;
81
0
    struct flb_output_instance *ins;
82
0
    struct flb_output_plugin *p;
83
84
0
    mk_list_foreach(head, &config->outputs) {
85
0
        ins = mk_list_entry(head, struct flb_output_instance, _head);
86
0
        p = ins->p;
87
0
        if (p->cb_pre_run) {
88
0
            p->cb_pre_run(ins->context, config);
89
0
        }
90
0
    }
91
0
}
92
93
static void flb_output_free_properties(struct flb_output_instance *ins)
94
0
{
95
96
0
    flb_kv_release(&ins->properties);
97
0
    flb_kv_release(&ins->net_properties);
98
99
0
#ifdef FLB_HAVE_TLS
100
0
    if (ins->tls_vhost) {
101
0
        flb_sds_destroy(ins->tls_vhost);
102
0
    }
103
0
    if (ins->tls_ca_path) {
104
0
        flb_sds_destroy(ins->tls_ca_path);
105
0
    }
106
0
    if (ins->tls_ca_file) {
107
0
        flb_sds_destroy(ins->tls_ca_file);
108
0
    }
109
0
    if (ins->tls_crt_file) {
110
0
        flb_sds_destroy(ins->tls_crt_file);
111
0
    }
112
0
    if (ins->tls_key_file) {
113
0
        flb_sds_destroy(ins->tls_key_file);
114
0
    }
115
0
    if (ins->tls_key_passwd) {
116
0
        flb_sds_destroy(ins->tls_key_passwd);
117
0
    }
118
0
#endif
119
0
}
120
121
void flb_output_flush_prepare_destroy(struct flb_output_flush *out_flush)
122
0
{
123
0
    struct flb_output_instance *ins = out_flush->o_ins;
124
0
    struct flb_out_thread_instance *th_ins;
125
126
    /* Move output coroutine context from active list to the destroy one */
127
0
    if (flb_output_is_threaded(ins) == FLB_TRUE) {
128
0
        th_ins = flb_output_thread_instance_get();
129
0
        pthread_mutex_lock(&th_ins->flush_mutex);
130
0
        mk_list_del(&out_flush->_head);
131
0
        mk_list_add(&out_flush->_head, &th_ins->flush_list_destroy);
132
0
        pthread_mutex_unlock(&th_ins->flush_mutex);
133
0
    }
134
0
    else {
135
0
        mk_list_del(&out_flush->_head);
136
0
        mk_list_add(&out_flush->_head, &ins->flush_list_destroy);
137
0
    }
138
0
}
139
140
int flb_output_flush_id_get(struct flb_output_instance *ins)
141
0
{
142
0
    int id;
143
0
    int max = (2 << 13) - 1; /* max for 14 bits */
144
0
    struct flb_out_thread_instance *th_ins;
145
146
0
    if (flb_output_is_threaded(ins) == FLB_TRUE) {
147
0
        th_ins = flb_output_thread_instance_get();
148
0
        id = th_ins->flush_id;
149
0
        th_ins->flush_id++;
150
151
        /* reset once it reach the maximum allowed */
152
0
        if (th_ins->flush_id > max) {
153
0
            th_ins->flush_id = 0;
154
0
        }
155
0
    }
156
0
    else {
157
0
        id = ins->flush_id;
158
0
        ins->flush_id++;
159
160
        /* reset once it reach the maximum allowed */
161
0
        if (ins->flush_id > max) {
162
0
            ins->flush_id = 0;
163
0
        }
164
0
    }
165
166
0
    return id;
167
0
}
168
169
void flb_output_coro_add(struct flb_output_instance *ins, struct flb_coro *coro)
170
0
{
171
0
    struct flb_output_flush *out_flush;
172
173
0
    out_flush = (struct flb_output_flush *) FLB_CORO_DATA(coro);
174
0
    mk_list_add(&out_flush->_head, &ins->flush_list);
175
0
}
176
177
/*
178
 * Queue a task to be flushed at a later time
179
 * Deletes retry context if enqueue fails
180
 */
181
static int flb_output_task_queue_enqueue(struct flb_task_queue *queue,
182
                                         struct flb_task_retry *retry,
183
                                         struct flb_task *task,
184
                                         struct flb_output_instance *out_ins,
185
                                         struct flb_config *config)
186
0
{
187
0
    struct flb_task_enqueued *queued_task;
188
189
0
    queued_task = flb_malloc(sizeof(struct flb_task_enqueued));
190
0
    if (!queued_task) {
191
0
        flb_errno();
192
0
        if (retry) {
193
0
            flb_task_retry_destroy(retry);
194
0
        }
195
0
        return -1;
196
0
    }
197
0
    queued_task->retry = retry;
198
0
    queued_task->out_instance = out_ins;
199
0
    queued_task->task = task;
200
0
    queued_task->config = config;
201
202
0
    mk_list_add(&queued_task->_head, &queue->pending);
203
0
    return 0;
204
0
}
205
206
/*
207
 * Pop task from pending queue and flush it
208
 * Will delete retry context if flush fails
209
 */
210
static int flb_output_task_queue_flush_one(struct flb_task_queue *queue)
211
0
{
212
0
    struct flb_task_enqueued *queued_task;
213
0
    int ret;
214
0
    int is_empty;
215
216
0
    is_empty = mk_list_is_empty(&queue->pending) == 0;
217
0
    if (is_empty) {
218
0
        flb_error("Attempting to flush task from an empty in_progress queue");
219
0
        return -1;
220
0
    }
221
222
0
    queued_task = mk_list_entry_first(&queue->pending, struct flb_task_enqueued, _head);
223
0
    mk_list_del(&queued_task->_head);
224
0
    mk_list_add(&queued_task->_head, &queue->in_progress);
225
226
    /*
227
     * Remove temporary user now that task is out of singleplex queue.
228
     * Flush will add back the user representing queued_task->out_instance if it succeeds.
229
     */
230
0
    flb_task_users_dec(queued_task->task, FLB_FALSE);
231
0
    ret = flb_output_task_flush(queued_task->task,
232
0
                                queued_task->out_instance,
233
0
                                queued_task->config);
234
235
    /* Destroy retry context if needed */
236
0
    if (ret == -1) {
237
0
        if (queued_task->retry) {
238
0
            flb_task_retry_destroy(queued_task->retry);
239
0
        }
240
        /* Flush the next task */
241
0
        flb_output_task_singleplex_flush_next(queue);
242
0
        return -1;
243
0
    }
244
245
0
    return ret;
246
0
}
247
248
/*
249
 * Will either run or queue running a single task
250
 * Deletes retry context if enqueue fails
251
 */
252
int flb_output_task_singleplex_enqueue(struct flb_task_queue *queue,
253
                                       struct flb_task_retry *retry,
254
                                       struct flb_task *task,
255
                                       struct flb_output_instance *out_ins,
256
                                       struct flb_config *config)
257
0
{
258
0
    int ret;
259
0
    int is_empty;
260
261
    /*
262
     * Add temporary user to preserve task while in singleplex queue.
263
     * Temporary user will be removed when task is removed from queue.
264
     *
265
     * Note: if we fail to increment now, then the task may be prematurely
266
     * deleted if the task's users go to 0 while we are waiting in the
267
     * queue.
268
     */
269
0
    flb_task_users_inc(task);
270
271
    /* Enqueue task */
272
0
    ret = flb_output_task_queue_enqueue(queue, retry, task, out_ins, config);
273
0
    if (ret == -1) {
274
0
        return -1;
275
0
    }
276
277
    /* Launch task if nothing is running */
278
0
    is_empty = mk_list_is_empty(&out_ins->singleplex_queue->in_progress) == 0;
279
0
    if (is_empty) {
280
0
        return flb_output_task_queue_flush_one(out_ins->singleplex_queue);
281
0
    }
282
283
0
    return 0;
284
0
}
285
286
/*
287
 * Clear in progress task and flush a single queued task if exists
288
 * Deletes retry context on next flush if flush fails
289
 */
290
int flb_output_task_singleplex_flush_next(struct flb_task_queue *queue)
291
0
{
292
0
    int is_empty;
293
0
    struct flb_task_enqueued *ended_task;
294
295
    /* Remove in progress task */
296
0
    is_empty = mk_list_is_empty(&queue->in_progress) == 0;
297
0
    if (!is_empty) {
298
0
        ended_task = mk_list_entry_first(&queue->in_progress,
299
0
                                        struct flb_task_enqueued, _head);
300
0
        mk_list_del(&ended_task->_head);
301
0
        flb_free(ended_task);
302
0
    }
303
304
    /* Flush if there is a pending task queued */
305
0
    is_empty = mk_list_is_empty(&queue->pending) == 0;
306
0
    if (!is_empty) {
307
0
        return flb_output_task_queue_flush_one(queue);
308
0
    }
309
0
    return 0;
310
0
}
311
312
/*
313
 * Flush a task through the output plugin, either using a worker thread + coroutine
314
 * or a simple co-routine in the current thread.
315
 */
316
int flb_output_task_flush(struct flb_task *task,
317
                          struct flb_output_instance *out_ins,
318
                          struct flb_config *config)
319
0
{
320
0
    int ret;
321
0
    struct flb_output_flush *out_flush;
322
323
0
    if (flb_output_is_threaded(out_ins) == FLB_TRUE) {
324
0
        flb_task_users_inc(task);
325
326
        /* Dispatch the task to the thread pool */
327
0
        ret = flb_output_thread_pool_flush(task, out_ins, config);
328
0
        if (ret == -1) {
329
0
            flb_task_users_dec(task, FLB_FALSE);
330
331
            /* If we are in synchronous mode, flush one waiting task */
332
0
            if (out_ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
333
0
                flb_output_task_singleplex_flush_next(out_ins->singleplex_queue);
334
0
            }
335
0
        }
336
0
    }
337
0
    else {
338
        /* Queue co-routine handling */
339
0
        out_flush = flb_output_flush_create(task,
340
0
                                           task->i_ins,
341
0
                                           out_ins,
342
0
                                           config);
343
0
        if (!out_flush) {
344
0
            return -1;
345
0
        }
346
347
0
        flb_task_users_inc(task);
348
0
        ret = flb_pipe_w(config->ch_self_events[1], &out_flush,
349
0
                        sizeof(struct flb_output_flush*));
350
0
        if (ret == -1) {
351
0
            flb_errno();
352
0
            flb_output_flush_destroy(out_flush);
353
0
            flb_task_users_dec(task, FLB_FALSE);
354
355
            /* If we are in synchronous mode, flush one waiting task */
356
0
            if (out_ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
357
0
                flb_output_task_singleplex_flush_next(out_ins->singleplex_queue);
358
0
            }
359
360
0
            return -1;
361
0
        }
362
0
    }
363
364
0
    return 0;
365
0
}
366
367
int flb_output_instance_destroy(struct flb_output_instance *ins)
368
0
{
369
0
    if (ins->alias) {
370
0
        flb_sds_destroy(ins->alias);
371
0
    }
372
373
    /* Remove URI context */
374
0
    if (ins->host.uri) {
375
0
        flb_uri_destroy(ins->host.uri);
376
0
    }
377
378
0
    flb_sds_destroy(ins->host.name);
379
0
    flb_sds_destroy(ins->host.address);
380
0
    flb_sds_destroy(ins->host.listen);
381
0
    flb_sds_destroy(ins->match);
382
383
0
#ifdef FLB_HAVE_REGEX
384
0
        if (ins->match_regex) {
385
0
            flb_regex_destroy(ins->match_regex);
386
0
        }
387
0
#endif
388
389
0
#ifdef FLB_HAVE_TLS
390
0
    if (ins->use_tls == FLB_TRUE) {
391
0
        if (ins->tls) {
392
0
            flb_tls_destroy(ins->tls);
393
0
        }
394
0
    }
395
396
0
    if (ins->tls_config_map) {
397
0
        flb_config_map_destroy(ins->tls_config_map);
398
0
    }
399
0
#endif
400
401
    /* Remove metrics */
402
0
#ifdef FLB_HAVE_METRICS
403
0
    if (ins->cmt) {
404
0
        cmt_destroy(ins->cmt);
405
0
    }
406
407
0
    if (ins->metrics) {
408
0
        flb_metrics_destroy(ins->metrics);
409
0
    }
410
0
#endif
411
412
    /* destroy callback context */
413
0
    if (ins->callback) {
414
0
        flb_callback_destroy(ins->callback);
415
0
    }
416
417
    /* destroy config map */
418
0
    if (ins->config_map) {
419
0
        flb_config_map_destroy(ins->config_map);
420
0
    }
421
422
0
    if (ins->net_config_map) {
423
0
        flb_config_map_destroy(ins->net_config_map);
424
0
    }
425
426
0
    if (ins->ch_events[0] > 0) {
427
0
        mk_event_closesocket(ins->ch_events[0]);
428
0
    }
429
430
0
    if (ins->ch_events[1] > 0) {
431
0
        mk_event_closesocket(ins->ch_events[1]);
432
0
    }
433
434
    /* release properties */
435
0
    flb_output_free_properties(ins);
436
437
    /* free singleplex queue */
438
0
    if (ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
439
0
        flb_task_queue_destroy(ins->singleplex_queue);
440
0
    }
441
442
0
    mk_list_del(&ins->_head);
443
444
    /* processor */
445
0
    if (ins->processor) {
446
0
        flb_processor_destroy(ins->processor);
447
0
    }
448
449
0
    flb_free(ins);
450
451
0
    return 0;
452
0
}
453
454
/* Invoke exit call for the output plugin */
455
void flb_output_exit(struct flb_config *config)
456
0
{
457
0
    struct mk_list *tmp;
458
0
    struct mk_list *head;
459
0
    struct flb_output_instance *ins;
460
0
    struct flb_output_plugin *p;
461
0
    void *params;
462
463
0
    mk_list_foreach_safe(head, tmp, &config->outputs) {
464
0
        ins = mk_list_entry(head, struct flb_output_instance, _head);
465
0
        p = ins->p;
466
467
0
        if (ins->is_threaded == FLB_FALSE) {
468
0
            if (ins->p->cb_worker_exit) {
469
0
                ins->p->cb_worker_exit(ins->context, ins->config);
470
0
            }
471
0
        }
472
473
        /* Stop any worker thread */
474
0
        if (flb_output_is_threaded(ins) == FLB_TRUE) {
475
0
            flb_output_thread_pool_destroy(ins);
476
0
        }
477
478
        /* Check a exit callback */
479
0
        if (p->cb_exit) {
480
0
            p->cb_exit(ins->context, config);
481
0
        }
482
0
        flb_output_instance_destroy(ins);
483
0
    }
484
485
0
    params = FLB_TLS_GET(out_flush_params);
486
0
    if (params) {
487
0
        flb_free(params);
488
0
        FLB_TLS_SET(out_flush_params, NULL);
489
0
    }
490
0
}
491
492
static inline int instance_id(struct flb_config *config)
493
0
{
494
0
    struct flb_output_instance *ins;
495
496
0
    if (mk_list_size(&config->outputs) == 0) {
497
0
        return 0;
498
0
    }
499
500
0
    ins = mk_list_entry_last(&config->outputs, struct flb_output_instance,
501
0
                             _head);
502
0
    return (ins->id + 1);
503
0
}
504
505
struct flb_output_instance *flb_output_get_instance(struct flb_config *config,
506
                                                    int out_id)
507
0
{
508
0
    struct mk_list *head;
509
0
    struct flb_output_instance *ins;
510
511
0
    mk_list_foreach(head, &config->outputs) {
512
0
        ins = mk_list_entry(head, struct flb_output_instance, _head);
513
0
        if (ins->id == out_id) {
514
0
            break;
515
0
        }
516
0
        ins = NULL;
517
0
    }
518
519
0
    if (!ins) {
520
0
        return NULL;
521
0
    }
522
523
0
    return ins;
524
0
}
525
526
/*
527
 * Invoked everytime a flush callback has finished (returned). This function
528
 * is called from the event loop.
529
 */
530
int flb_output_flush_finished(struct flb_config *config, int out_id)
531
0
{
532
0
    struct mk_list *tmp;
533
0
    struct mk_list *head;
534
0
    struct mk_list *list;
535
0
    struct flb_output_instance *ins;
536
0
    struct flb_output_flush *out_flush;
537
0
    struct flb_out_thread_instance *th_ins;
538
539
0
    ins = flb_output_get_instance(config, out_id);
540
0
    if (!ins) {
541
0
        return -1;
542
0
    }
543
544
0
    if (flb_output_is_threaded(ins) == FLB_TRUE) {
545
0
        th_ins = flb_output_thread_instance_get();
546
0
        list = &th_ins->flush_list_destroy;
547
0
    }
548
0
    else {
549
0
        list = &ins->flush_list_destroy;
550
0
    }
551
552
    /* Look for output coroutines that needs to be destroyed */
553
0
    mk_list_foreach_safe(head, tmp, list) {
554
0
        out_flush = mk_list_entry(head, struct flb_output_flush, _head);
555
0
        flb_output_flush_destroy(out_flush);
556
0
    }
557
558
0
    return 0;
559
0
}
560
561
562
/*
563
 * It validate an output type given the string, it return the
564
 * proper type and if valid, populate the global config.
565
 */
566
struct flb_output_instance *flb_output_new(struct flb_config *config,
567
                                           const char *output, void *data,
568
                                           int public_only)
569
0
{
570
0
    int ret = -1;
571
0
    int flags = 0;
572
0
    struct mk_list *head;
573
0
    struct flb_output_plugin *plugin;
574
0
    struct flb_output_instance *instance = NULL;
575
576
0
    if (!output) {
577
0
        return NULL;
578
0
    }
579
580
0
    mk_list_foreach(head, &config->out_plugins) {
581
0
        plugin = mk_list_entry(head, struct flb_output_plugin, _head);
582
0
        if (!check_protocol(plugin->name, output)) {
583
0
            plugin = NULL;
584
0
            continue;
585
0
        }
586
587
0
        if (public_only && plugin->flags & FLB_OUTPUT_PRIVATE) {
588
0
            return NULL;
589
0
        }
590
0
        break;
591
0
    }
592
593
0
    if (!plugin) {
594
0
        return NULL;
595
0
    }
596
597
    /* Create and load instance */
598
0
    instance = flb_calloc(1, sizeof(struct flb_output_instance));
599
0
    if (!instance) {
600
0
        flb_errno();
601
0
        return NULL;
602
0
    }
603
604
    /* Initialize event type, if not set, default to FLB_OUTPUT_LOGS */
605
0
    if (plugin->event_type == 0) {
606
0
        instance->event_type = FLB_OUTPUT_LOGS;
607
0
    }
608
0
    else {
609
0
        instance->event_type = plugin->event_type;
610
0
    }
611
0
    instance->config = config;
612
0
    instance->log_level = -1;
613
0
    instance->log_suppress_interval = -1;
614
0
    instance->test_mode = FLB_FALSE;
615
0
    instance->is_threaded = FLB_FALSE;
616
0
    instance->tp_workers = plugin->workers;
617
618
    /* Retrieve an instance id for the output instance */
619
0
    instance->id = instance_id(config);
620
621
    /* format name (with instance id) */
622
0
    snprintf(instance->name, sizeof(instance->name) - 1,
623
0
             "%s.%i", plugin->name, instance->id);
624
0
    instance->p = plugin;
625
0
    instance->callback = flb_callback_create(instance->name);
626
0
    if (!instance->callback) {
627
0
        if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
628
0
            flb_task_queue_destroy(instance->singleplex_queue);
629
0
        }
630
0
        flb_free(instance);
631
0
        return NULL;
632
0
    }
633
634
0
    if (plugin->type == FLB_OUTPUT_PLUGIN_CORE) {
635
0
        instance->context = NULL;
636
0
    }
637
0
    else {
638
0
        struct flb_plugin_proxy_context *ctx;
639
640
0
        ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context));
641
0
        if (!ctx) {
642
0
            flb_errno();
643
0
            if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
644
0
                flb_task_queue_destroy(instance->singleplex_queue);
645
0
            }
646
0
            flb_free(instance);
647
0
            return NULL;
648
0
        }
649
650
0
        ctx->proxy = plugin->proxy;
651
652
0
        instance->context = ctx;
653
0
    }
654
655
0
    instance->alias       = NULL;
656
0
    instance->flags       = instance->p->flags;
657
0
    instance->data        = data;
658
0
    instance->match       = NULL;
659
0
#ifdef FLB_HAVE_REGEX
660
0
    instance->match_regex = NULL;
661
0
#endif
662
0
    instance->retry_limit = 1;
663
0
    instance->host.name   = NULL;
664
0
    instance->host.address = NULL;
665
0
    instance->net_config_map = NULL;
666
667
    /* Storage */
668
0
    instance->total_limit_size = -1;
669
670
    /* Parent plugin flags */
671
0
    flags = instance->flags;
672
0
    if (flags & FLB_IO_TCP) {
673
0
        instance->use_tls = FLB_FALSE;
674
0
    }
675
0
    else if (flags & FLB_IO_TLS) {
676
0
        instance->use_tls = FLB_TRUE;
677
0
    }
678
0
    else if (flags & FLB_IO_OPT_TLS) {
679
        /* TLS must be enabled manually in the config */
680
0
        instance->use_tls = FLB_FALSE;
681
0
        instance->flags |= FLB_IO_TLS;
682
0
    }
683
684
0
#ifdef FLB_HAVE_TLS
685
0
    instance->tls                   = NULL;
686
0
    instance->tls_debug             = -1;
687
0
    instance->tls_verify            = FLB_TRUE;
688
0
    instance->tls_verify_hostname   = FLB_FALSE;
689
0
    instance->tls_vhost             = NULL;
690
0
    instance->tls_ca_path           = NULL;
691
0
    instance->tls_ca_file           = NULL;
692
0
    instance->tls_crt_file          = NULL;
693
0
    instance->tls_key_file          = NULL;
694
0
    instance->tls_key_passwd        = NULL;
695
0
#endif
696
697
0
    if (plugin->flags & FLB_OUTPUT_NET) {
698
0
        ret = flb_net_host_set(plugin->name, &instance->host, output);
699
0
        if (ret != 0) {
700
0
            if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
701
0
                flb_task_queue_destroy(instance->singleplex_queue);
702
0
            }
703
0
            flb_free(instance);
704
0
            return NULL;
705
0
        }
706
0
    }
707
708
    /* Create singleplex queue if SYNCHRONOUS mode is used */
709
0
    instance->singleplex_queue = NULL;
710
0
    if (instance->flags & FLB_OUTPUT_SYNCHRONOUS) {
711
0
        instance->singleplex_queue = flb_task_queue_create();
712
0
        if (!instance->singleplex_queue) {
713
0
            flb_free(instance);
714
0
            flb_errno();
715
0
            return NULL;
716
0
        }
717
0
    }
718
719
0
    flb_kv_init(&instance->properties);
720
0
    flb_kv_init(&instance->net_properties);
721
0
    mk_list_init(&instance->upstreams);
722
0
    mk_list_init(&instance->flush_list);
723
0
    mk_list_init(&instance->flush_list_destroy);
724
725
0
    mk_list_add(&instance->_head, &config->outputs);
726
727
    /* processor instance */
728
0
    instance->processor = flb_processor_create(config, instance->name, instance, FLB_PLUGIN_OUTPUT);
729
730
    /* Tests */
731
0
    instance->test_formatter.callback = plugin->test_formatter.callback;
732
0
    instance->test_response.callback = plugin->test_response.callback;
733
734
735
0
    return instance;
736
0
}
737
738
/*
 * Case-insensitive comparison of a known property name ('key') against a
 * user supplied key ('kv') whose length is 'k_len'.
 *
 * Returns 0 when both have the same length and content, -1 otherwise.
 */
static inline int prop_key_check(const char *key, const char *kv, int k_len)
{
    int key_len;
    int same_prefix;

    key_len = strlen(key);
    same_prefix = (strncasecmp(key, kv, k_len) == 0);

    /* a shared prefix alone is not enough, the lengths must agree too */
    if (!same_prefix || key_len != k_len) {
        return -1;
    }

    return 0;
}
749
750
/* Override a configuration property for the given input_instance plugin */
751
int flb_output_set_property(struct flb_output_instance *ins,
752
                            const char *k, const char *v)
753
0
{
754
0
    int len;
755
0
    int ret;
756
0
    ssize_t limit;
757
0
    flb_sds_t tmp;
758
0
    struct flb_kv *kv;
759
0
    struct flb_config *config = ins->config;
760
761
0
    len = strlen(k);
762
0
    tmp = flb_env_var_translate(config->env, v);
763
0
    if (tmp) {
764
0
        if (strlen(tmp) == 0) {
765
0
            flb_sds_destroy(tmp);
766
0
            tmp = NULL;
767
0
        }
768
0
    }
769
770
    /* Check if the key is a known/shared property */
771
0
    if (prop_key_check("match", k, len) == 0) {
772
0
        flb_utils_set_plugin_string_property("match", &ins->match, tmp);
773
0
    }
774
0
#ifdef FLB_HAVE_REGEX
775
0
    else if (prop_key_check("match_regex", k, len) == 0 && tmp) {
776
0
        ins->match_regex = flb_regex_create(tmp);
777
0
        flb_sds_destroy(tmp);
778
0
    }
779
0
#endif
780
0
    else if (prop_key_check("alias", k, len) == 0 && tmp) {
781
0
        flb_utils_set_plugin_string_property("alias", &ins->alias, tmp);
782
0
    }
783
0
    else if (prop_key_check("log_level", k, len) == 0 && tmp) {
784
0
        ret = flb_log_get_level_str(tmp);
785
0
        flb_sds_destroy(tmp);
786
0
        if (ret == -1) {
787
0
            return -1;
788
0
        }
789
0
        ins->log_level = ret;
790
0
    }
791
0
    else if (prop_key_check("log_suppress_interval", k, len) == 0 && tmp) {
792
0
        ret = flb_utils_time_to_seconds(tmp);
793
0
        flb_sds_destroy(tmp);
794
0
        if (ret == -1) {
795
0
            return -1;
796
0
        }
797
0
        ins->log_suppress_interval = ret;
798
0
    }
799
0
    else if (prop_key_check("host", k, len) == 0) {
800
0
        flb_utils_set_plugin_string_property("host", &ins->host.name, tmp);
801
0
    }
802
0
    else if (prop_key_check("port", k, len) == 0) {
803
0
        if (tmp) {
804
0
            ins->host.port = atoi(tmp);
805
0
            flb_sds_destroy(tmp);
806
0
        }
807
0
        else {
808
0
            ins->host.port = 0;
809
0
        }
810
0
    }
811
0
    else if (prop_key_check("ipv6", k, len) == 0 && tmp) {
812
0
        ins->host.ipv6 = flb_utils_bool(tmp);
813
0
        flb_sds_destroy(tmp);
814
0
    }
815
0
    else if (prop_key_check("retry_limit", k, len) == 0) {
816
0
        if (tmp) {
817
0
            if (strcasecmp(tmp, "no_limits") == 0 ||
818
0
                strcasecmp(tmp, "false") == 0 ||
819
0
                strcasecmp(tmp, "off") == 0) {
820
                /* No limits for retries */
821
0
                ins->retry_limit = FLB_OUT_RETRY_UNLIMITED;
822
0
            }
823
0
            else if (strcasecmp(tmp, "no_retries") == 0) {
824
0
                ins->retry_limit = FLB_OUT_RETRY_NONE;
825
0
            }
826
0
            else {
827
0
                ins->retry_limit = atoi(tmp);
828
0
                if (ins->retry_limit <= 0) {
829
0
                    flb_warn("[config] invalid retry_limit. set default.");
830
                    /* set default when input is invalid number */
831
0
                    ins->retry_limit = 1;
832
0
                }
833
0
            }
834
0
            flb_sds_destroy(tmp);
835
0
        }
836
0
        else {
837
0
            ins->retry_limit = 1;
838
0
        }
839
0
    }
840
0
    else if (strncasecmp("net.", k, 4) == 0 && tmp) {
841
0
        kv = flb_kv_item_create(&ins->net_properties, (char *) k, NULL);
842
0
        if (!kv) {
843
0
            if (tmp) {
844
0
                flb_sds_destroy(tmp);
845
0
            }
846
0
            return -1;
847
0
        }
848
0
        kv->val = tmp;
849
0
    }
850
#ifdef FLB_HAVE_HTTP_CLIENT_DEBUG
851
    else if (strncasecmp("_debug.http.", k, 12) == 0 && tmp) {
852
        ret = flb_http_client_debug_property_is_valid((char *) k, tmp);
853
        if (ret == FLB_TRUE) {
854
            kv = flb_kv_item_create(&ins->properties, (char *) k, NULL);
855
            if (!kv) {
856
                if (tmp) {
857
                    flb_sds_destroy(tmp);
858
                }
859
                return -1;
860
            }
861
            kv->val = tmp;
862
        }
863
        else {
864
            flb_error("[config] invalid property '%s' on instance '%s'",
865
                      k, flb_output_name(ins));
866
            flb_sds_destroy(tmp);
867
        }
868
    }
869
#endif
870
0
#ifdef FLB_HAVE_TLS
871
0
    else if (prop_key_check("tls", k, len) == 0 && tmp) {
872
0
        ins->use_tls = flb_utils_bool(tmp);
873
0
        if (ins->use_tls == FLB_TRUE && ((ins->flags & FLB_IO_TLS) == 0)) {
874
0
            flb_error("[config] %s does not support TLS", ins->name);
875
0
            flb_sds_destroy(tmp);
876
0
            return -1;
877
0
        }
878
0
        flb_sds_destroy(tmp);
879
0
    }
880
0
    else if (prop_key_check("tls.verify", k, len) == 0 && tmp) {
881
0
        ins->tls_verify = flb_utils_bool(tmp);
882
0
        flb_sds_destroy(tmp);
883
0
    }
884
0
    else if (prop_key_check("tls.verify_hostname", k, len) == 0 && tmp) {
885
0
        ins->tls_verify_hostname = flb_utils_bool(tmp);
886
0
        flb_sds_destroy(tmp);
887
0
    }
888
0
    else if (prop_key_check("tls.debug", k, len) == 0 && tmp) {
889
0
        ins->tls_debug = atoi(tmp);
890
0
        flb_sds_destroy(tmp);
891
0
    }
892
0
    else if (prop_key_check("tls.vhost", k, len) == 0) {
893
0
        flb_utils_set_plugin_string_property("tls.vhost", &ins->tls_vhost, tmp);
894
0
    }
895
0
    else if (prop_key_check("tls.ca_path", k, len) == 0) {
896
0
        flb_utils_set_plugin_string_property("tls.ca_path", &ins->tls_ca_path, tmp);
897
0
    }
898
0
    else if (prop_key_check("tls.ca_file", k, len) == 0) {
899
0
        flb_utils_set_plugin_string_property("tls.ca_file", &ins->tls_ca_file, tmp);
900
0
    }
901
0
    else if (prop_key_check("tls.crt_file", k, len) == 0) {
902
0
        flb_utils_set_plugin_string_property("tls.crt_file", &ins->tls_crt_file, tmp);
903
0
    }
904
0
    else if (prop_key_check("tls.key_file", k, len) == 0) {
905
0
        flb_utils_set_plugin_string_property("tls.key_file", &ins->tls_key_file, tmp);
906
0
    }
907
0
    else if (prop_key_check("tls.key_passwd", k, len) == 0) {
908
0
        flb_utils_set_plugin_string_property("tls.key_passwd", &ins->tls_key_passwd, tmp);
909
0
    }
910
0
#endif
911
0
    else if (prop_key_check("storage.total_limit_size", k, len) == 0 && tmp) {
912
0
        if (strcasecmp(tmp, "off") == 0 ||
913
0
            flb_utils_bool(tmp) == FLB_FALSE) {
914
            /* no limit for filesystem storage */
915
0
            limit = -1;
916
0
            flb_info("[config] unlimited filesystem buffer for %s plugin",
917
0
                     ins->name);
918
0
        }
919
0
        else {
920
0
            limit = flb_utils_size_to_bytes(tmp);
921
0
            if (limit == -1) {
922
0
                flb_sds_destroy(tmp);
923
0
                return -1;
924
0
            }
925
926
0
            if (limit == 0) {
927
0
                limit = -1;
928
0
            }
929
0
        }
930
931
0
        flb_sds_destroy(tmp);
932
0
        ins->total_limit_size = (size_t) limit;
933
0
    }
934
0
    else if (prop_key_check("workers", k, len) == 0 && tmp) {
935
        /* Set the number of workers */
936
0
        ins->tp_workers = atoi(tmp);
937
0
        flb_sds_destroy(tmp);
938
0
    }
939
0
    else {
940
        /*
941
         * Create the property, we don't pass the value since we will
942
         * map it directly to avoid an extra memory allocation.
943
         */
944
0
        kv = flb_kv_item_create(&ins->properties, (char *) k, NULL);
945
0
        if (!kv) {
946
0
            if (tmp) {
947
0
                flb_sds_destroy(tmp);
948
0
            }
949
0
            return -1;
950
0
        }
951
0
        kv->val = tmp;
952
0
    }
953
954
0
    return 0;
955
0
}
956
957
/* Configure a default hostname and TCP port if they are not set */
958
void flb_output_net_default(const char *host, const int port,
959
                            struct flb_output_instance *ins)
960
0
{
961
    /* Set default network configuration */
962
0
    if (!ins->host.name) {
963
0
        ins->host.name = flb_sds_create(host);
964
0
    }
965
0
    if (ins->host.port == 0) {
966
0
        ins->host.port = port;
967
0
    }
968
0
}
969
970
/* Add thread pool for output plugin if configured with workers */
971
int flb_output_enable_multi_threading(struct flb_output_instance *ins, struct flb_config *config)
972
0
{
973
    /* Multi-threading enabled ? (through 'workers' property) */
974
0
    if (ins->tp_workers > 0) {
975
0
        if(flb_output_thread_pool_create(config, ins) != 0) {
976
0
            flb_output_instance_destroy(ins);
977
0
            return -1;
978
0
        }
979
0
        flb_output_thread_pool_start(ins);
980
0
    }
981
982
0
    return 0;
983
0
}
984
985
/* Return an instance name or alias */
986
const char *flb_output_name(struct flb_output_instance *ins)
987
0
{
988
0
    if (ins->alias) {
989
0
        return ins->alias;
990
0
    }
991
992
0
    return ins->name;
993
0
}
994
995
const char *flb_output_get_property(const char *key, struct flb_output_instance *ins)
996
0
{
997
0
    return flb_config_prop_get(key, &ins->properties);
998
0
}
999
1000
#ifdef FLB_HAVE_METRICS
1001
void *flb_output_get_cmt_instance(struct flb_output_instance *ins)
1002
0
{
1003
0
    return (void *)ins->cmt;
1004
0
}
1005
#endif
1006
1007
int flb_output_net_property_check(struct flb_output_instance *ins,
1008
                                  struct flb_config *config)
1009
0
{
1010
0
    int ret = 0;
1011
1012
    /* Get Upstream net_setup configmap */
1013
0
    ins->net_config_map = flb_upstream_get_config_map(config);
1014
0
    if (!ins->net_config_map) {
1015
0
        flb_output_instance_destroy(ins);
1016
0
        return -1;
1017
0
    }
1018
1019
    /*
1020
     * Validate 'net.*' properties: if the plugin use the Upstream interface,
1021
     * it might receive some networking settings.
1022
     */
1023
0
    if (mk_list_size(&ins->net_properties) > 0) {
1024
0
        ret = flb_config_map_properties_check(ins->p->name,
1025
0
                                              &ins->net_properties,
1026
0
                                              ins->net_config_map);
1027
0
        if (ret == -1) {
1028
0
            if (config->program_name) {
1029
0
                flb_helper("try the command: %s -o %s -h\n",
1030
0
                           config->program_name, ins->p->name);
1031
0
            }
1032
0
            return -1;
1033
0
        }
1034
0
    }
1035
1036
0
    return 0;
1037
0
}
1038
1039
int flb_output_plugin_property_check(struct flb_output_instance *ins,
1040
                                     struct flb_config *config)
1041
0
{
1042
0
    int ret = 0;
1043
0
    struct mk_list *config_map;
1044
0
    struct flb_output_plugin *p = ins->p;
1045
1046
0
    if (p->config_map) {
1047
        /*
1048
         * Create a dynamic version of the configmap that will be used by the specific
1049
         * instance in question.
1050
         */
1051
0
        config_map = flb_config_map_create(config, p->config_map);
1052
0
        if (!config_map) {
1053
0
            flb_error("[output] error loading config map for '%s' plugin",
1054
0
                      p->name);
1055
0
            return -1;
1056
0
        }
1057
0
        ins->config_map = config_map;
1058
1059
        /* Validate incoming properties against config map */
1060
0
        ret = flb_config_map_properties_check(ins->p->name,
1061
0
                                              &ins->properties, ins->config_map);
1062
0
        if (ret == -1) {
1063
0
            if (config->program_name) {
1064
0
                flb_helper("try the command: %s -o %s -h\n",
1065
0
                           config->program_name, ins->p->name);
1066
0
            }
1067
0
            return -1;
1068
0
        }
1069
0
    }
1070
1071
0
    return 0;
1072
0
}
1073
1074
/* Trigger the output plugins setup callbacks to prepare them. */
1075
int flb_output_init_all(struct flb_config *config)
1076
0
{
1077
0
    int ret;
1078
0
#ifdef FLB_HAVE_METRICS
1079
0
    char *name;
1080
0
#endif
1081
0
    struct mk_list *tmp;
1082
0
    struct mk_list *head;
1083
0
    struct flb_output_instance *ins;
1084
0
    struct flb_output_plugin *p;
1085
0
    uint64_t ts;
1086
1087
    /* Retrieve the plugin reference */
1088
0
    mk_list_foreach_safe(head, tmp, &config->outputs) {
1089
0
        ins = mk_list_entry(head, struct flb_output_instance, _head);
1090
0
        if (ins->log_level == -1) {
1091
0
            ins->log_level = config->log->level;
1092
0
        }
1093
0
        p = ins->p;
1094
1095
        /* Output Events Channel */
1096
0
        ret = mk_event_channel_create(config->evl,
1097
0
                                      &ins->ch_events[0],
1098
0
                                      &ins->ch_events[1],
1099
0
                                      ins);
1100
0
        if (ret != 0) {
1101
0
            flb_error("could not create events channels for '%s'",
1102
0
                      flb_output_name(ins));
1103
0
            flb_output_instance_destroy(ins);
1104
0
            return -1;
1105
0
        }
1106
0
        flb_debug("[%s:%s] created event channels: read=%i write=%i",
1107
0
                  ins->p->name, flb_output_name(ins),
1108
0
                  ins->ch_events[0], ins->ch_events[1]);
1109
1110
        /*
1111
         * Note: mk_event_channel_create() sets a type = MK_EVENT_NOTIFICATION by
1112
         * default, we need to overwrite this value so we can do a clean check
1113
         * into the Engine when the event is triggered.
1114
         */
1115
0
        ins->event.type = FLB_ENGINE_EV_OUTPUT;
1116
1117
        /* Metrics */
1118
0
#ifdef FLB_HAVE_METRICS
1119
        /* Get name or alias for the instance */
1120
0
        name = (char *) flb_output_name(ins);
1121
1122
        /* get timestamp */
1123
0
        ts = cfl_time_now();
1124
1125
        /* CMetrics */
1126
0
        ins->cmt = cmt_create();
1127
0
        if (!ins->cmt) {
1128
0
            flb_error("[output] could not create cmetrics context");
1129
0
            return -1;
1130
0
        }
1131
1132
        /*
1133
         * Register generic output plugin metrics
1134
         */
1135
1136
        /* fluentbit_output_proc_records_total */
1137
0
        ins->cmt_proc_records = cmt_counter_create(ins->cmt, "fluentbit",
1138
0
                                                   "output", "proc_records_total",
1139
0
                                                   "Number of processed output records.",
1140
0
                                                   1, (char *[]) {"name"});
1141
0
        cmt_counter_set(ins->cmt_proc_records, ts, 0, 1, (char *[]) {name});
1142
1143
1144
        /* fluentbit_output_proc_bytes_total */
1145
0
        ins->cmt_proc_bytes = cmt_counter_create(ins->cmt, "fluentbit",
1146
0
                                                 "output", "proc_bytes_total",
1147
0
                                                 "Number of processed output bytes.",
1148
0
                                                 1, (char *[]) {"name"});
1149
0
        cmt_counter_set(ins->cmt_proc_bytes, ts, 0, 1, (char *[]) {name});
1150
1151
1152
        /* fluentbit_output_errors_total */
1153
0
        ins->cmt_errors = cmt_counter_create(ins->cmt, "fluentbit",
1154
0
                                             "output", "errors_total",
1155
0
                                             "Number of output errors.",
1156
0
                                             1, (char *[]) {"name"});
1157
0
        cmt_counter_set(ins->cmt_errors, ts, 0, 1, (char *[]) {name});
1158
1159
1160
        /* fluentbit_output_retries_total */
1161
0
        ins->cmt_retries = cmt_counter_create(ins->cmt, "fluentbit",
1162
0
                                             "output", "retries_total",
1163
0
                                             "Number of output retries.",
1164
0
                                             1, (char *[]) {"name"});
1165
0
        cmt_counter_set(ins->cmt_retries, ts, 0, 1, (char *[]) {name});
1166
1167
        /* fluentbit_output_retries_failed_total */
1168
0
        ins->cmt_retries_failed = cmt_counter_create(ins->cmt, "fluentbit",
1169
0
                                             "output", "retries_failed_total",
1170
0
                                             "Number of abandoned batches because "
1171
0
                                             "the maximum number of re-tries was "
1172
0
                                             "reached.",
1173
0
                                             1, (char *[]) {"name"});
1174
0
        cmt_counter_set(ins->cmt_retries_failed, ts, 0, 1, (char *[]) {name});
1175
1176
1177
        /* fluentbit_output_dropped_records_total */
1178
0
        ins->cmt_dropped_records = cmt_counter_create(ins->cmt, "fluentbit",
1179
0
                                             "output", "dropped_records_total",
1180
0
                                             "Number of dropped records.",
1181
0
                                             1, (char *[]) {"name"});
1182
0
        cmt_counter_set(ins->cmt_dropped_records, ts, 0, 1, (char *[]) {name});
1183
1184
        /* fluentbit_output_retried_records_total */
1185
0
        ins->cmt_retried_records = cmt_counter_create(ins->cmt, "fluentbit",
1186
0
                                             "output", "retried_records_total",
1187
0
                                             "Number of retried records.",
1188
0
                                             1, (char *[]) {"name"});
1189
0
        cmt_counter_set(ins->cmt_retried_records, ts, 0, 1, (char *[]) {name});
1190
1191
        /* output_upstream_total_connections */
1192
0
        ins->cmt_upstream_total_connections = cmt_gauge_create(ins->cmt,
1193
0
                                                               "fluentbit",
1194
0
                                                               "output",
1195
0
                                                               "upstream_total_connections",
1196
0
                                                               "Total Connection count.",
1197
0
                                                               1, (char *[]) {"name"});
1198
0
        cmt_gauge_set(ins->cmt_upstream_total_connections,
1199
0
                      ts,
1200
0
                      0,
1201
0
                      1, (char *[]) {name});
1202
1203
        /* output_upstream_total_connections */
1204
0
        ins->cmt_upstream_busy_connections = cmt_gauge_create(ins->cmt,
1205
0
                                                              "fluentbit",
1206
0
                                                              "output",
1207
0
                                                              "upstream_busy_connections",
1208
0
                                                              "Busy Connection count.",
1209
0
                                                              1, (char *[]) {"name"});
1210
0
        cmt_gauge_set(ins->cmt_upstream_busy_connections,
1211
0
                      ts,
1212
0
                      0,
1213
0
                      1, (char *[]) {name});
1214
1215
        /* output_chunk_available_capacity_percent */
1216
0
        ins->cmt_chunk_available_capacity_percent = cmt_gauge_create(ins->cmt,
1217
0
                                                        "fluentbit",
1218
0
                                                        "output",
1219
0
                                                        "chunk_available_capacity_percent",
1220
0
                                                        "Available chunk capacity (percent)",
1221
0
                                                        1, (char *[]) {"name"});
1222
0
        cmt_gauge_set(ins->cmt_chunk_available_capacity_percent,
1223
0
                      ts,
1224
0
                      100.0,
1225
0
                      1, (char *[]) {name});
1226
1227
        /* old API */
1228
0
        ins->metrics = flb_metrics_create(name);
1229
0
        if (ins->metrics) {
1230
0
            flb_metrics_add(FLB_METRIC_OUT_OK_RECORDS,
1231
0
                            "proc_records", ins->metrics);
1232
0
            flb_metrics_add(FLB_METRIC_OUT_OK_BYTES,
1233
0
                            "proc_bytes", ins->metrics);
1234
0
            flb_metrics_add(FLB_METRIC_OUT_ERROR,
1235
0
                            "errors", ins->metrics);
1236
0
            flb_metrics_add(FLB_METRIC_OUT_RETRY,
1237
0
                            "retries", ins->metrics);
1238
0
            flb_metrics_add(FLB_METRIC_OUT_RETRY_FAILED,
1239
0
                        "retries_failed", ins->metrics);
1240
0
            flb_metrics_add(FLB_METRIC_OUT_DROPPED_RECORDS,
1241
0
                        "dropped_records", ins->metrics);
1242
0
            flb_metrics_add(FLB_METRIC_OUT_RETRIED_RECORDS,
1243
0
                        "retried_records", ins->metrics);
1244
0
        }
1245
0
#endif
1246
1247
0
#ifdef FLB_HAVE_TLS
1248
0
        if (ins->use_tls == FLB_TRUE) {
1249
0
            ins->tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
1250
0
                                      ins->tls_verify,
1251
0
                                      ins->tls_debug,
1252
0
                                      ins->tls_vhost,
1253
0
                                      ins->tls_ca_path,
1254
0
                                      ins->tls_ca_file,
1255
0
                                      ins->tls_crt_file,
1256
0
                                      ins->tls_key_file,
1257
0
                                      ins->tls_key_passwd);
1258
0
            if (!ins->tls) {
1259
0
                flb_error("[output %s] error initializing TLS context",
1260
0
                          ins->name);
1261
0
                flb_output_instance_destroy(ins);
1262
0
                return -1;
1263
0
            }
1264
1265
0
            if (ins->tls_verify_hostname == FLB_TRUE) {
1266
0
                ret = flb_tls_set_verify_hostname(ins->tls, ins->tls_verify_hostname);
1267
0
                if (ret == -1) {
1268
0
                    flb_error("[output %s] error set up to verify hostname in TLS context",
1269
0
                              ins->name);
1270
1271
0
                    return -1;
1272
0
                }
1273
0
            }
1274
0
        }
1275
0
#endif
1276
        /*
1277
         * Before to call the initialization callback, make sure that the received
1278
         * configuration parameters are valid if the plugin is registering a config map.
1279
         */
1280
0
        if (flb_output_plugin_property_check(ins, config) == -1) {
1281
0
            flb_output_instance_destroy(ins);
1282
0
            return -1;
1283
0
        }
1284
1285
0
#ifdef FLB_HAVE_TLS
1286
0
        struct flb_config_map *m;
1287
1288
        /* TLS config map (just for 'help' formatting purposes) */
1289
0
        ins->tls_config_map = flb_tls_get_config_map(config);
1290
0
        if (!ins->tls_config_map) {
1291
0
            flb_output_instance_destroy(ins);
1292
0
            return -1;
1293
0
        }
1294
1295
        /* Override first configmap value based on it plugin flag */
1296
0
        m = mk_list_entry_first(ins->tls_config_map, struct flb_config_map, _head);
1297
0
        if (p->flags & FLB_IO_TLS) {
1298
0
            m->value.val.boolean = FLB_TRUE;
1299
0
        }
1300
0
        else {
1301
0
            m->value.val.boolean = FLB_FALSE;
1302
0
        }
1303
0
#endif
1304
1305
        /* Init network defaults */
1306
0
        flb_net_setup_init(&ins->net_setup);
1307
1308
0
        if (flb_output_net_property_check(ins, config) == -1) {
1309
0
            flb_output_instance_destroy(ins);
1310
0
            return -1;
1311
0
        }
1312
1313
        /* Initialize plugin through it 'init callback' */
1314
0
        ret = p->cb_init(ins, config, ins->data);
1315
0
        if (ret == -1) {
1316
0
            flb_error("[output] failed to initialize '%s' plugin",
1317
0
                      p->name);
1318
0
            flb_output_instance_destroy(ins);
1319
0
            return -1;
1320
0
        }
1321
1322
0
        ins->notification_channel = config->notification_channels[1];
1323
1324
        /* Multi-threading enabled if configured */
1325
0
        ret = flb_output_enable_multi_threading(ins, config);
1326
0
        if (ret == -1) {
1327
0
            flb_error("[output] could not start thread pool for '%s' plugin",
1328
0
                      flb_output_name(ins));
1329
0
            return -1;
1330
0
        }
1331
1332
0
        if (ins->is_threaded == FLB_FALSE) {
1333
0
            if (ins->p->cb_worker_init) {
1334
0
                ret = ins->p->cb_worker_init(ins->context, ins->config);
1335
0
            }
1336
0
        }
1337
1338
0
        ins->processor->notification_channel = ins->notification_channel;
1339
1340
        /* initialize processors */
1341
0
        ret = flb_processor_init(ins->processor);
1342
0
        if (ret == -1) {
1343
0
            return -1;
1344
0
        }
1345
0
    }
1346
1347
0
    return 0;
1348
0
}
1349
1350
/* Assign an Configuration context to an Output */
1351
void flb_output_set_context(struct flb_output_instance *ins, void *context)
1352
0
{
1353
0
    ins->context = context;
1354
0
}
1355
1356
/* Check that at least one Output is enabled */
1357
int flb_output_check(struct flb_config *config)
1358
0
{
1359
0
    if (mk_list_is_empty(&config->outputs) == 0) {
1360
0
        return -1;
1361
0
    }
1362
0
    return 0;
1363
0
}
1364
1365
/* Check output plugin's log level.
1366
 * Not for core plugins but for Golang plugins.
1367
 * Golang plugins do not have thread-local flb_worker_ctx information. */
1368
int flb_output_log_check(struct flb_output_instance *ins, int l)
1369
0
{
1370
0
    if (ins->log_level < l) {
1371
0
        return FLB_FALSE;
1372
0
    }
1373
1374
0
    return FLB_TRUE;
1375
0
}
1376
1377
/*
1378
 * Output plugins might have enabled certain features that have not been passed
1379
 * directly to the upstream context. In order to avoid let plugins validate specific
1380
 * variables from the instance context like tls, tls.x, keepalive, etc, we populate
1381
 * them directly through this function.
1382
 */
1383
int flb_output_upstream_set(struct flb_upstream *u, struct flb_output_instance *ins)
1384
0
{
1385
0
    int flags = 0;
1386
1387
0
    if (!u) {
1388
0
        return -1;
1389
0
    }
1390
1391
    /* TLS */
1392
0
#ifdef FLB_HAVE_TLS
1393
0
    if (ins->use_tls == FLB_TRUE) {
1394
0
        flags |= FLB_IO_TLS;
1395
0
    }
1396
0
    else {
1397
0
        flags |= FLB_IO_TCP;
1398
0
    }
1399
#else
1400
    flags |= FLB_IO_TCP;
1401
#endif
1402
1403
    /* IPv6 */
1404
0
    if (ins->host.ipv6 == FLB_TRUE) {
1405
0
        flags |= FLB_IO_IPV6;
1406
0
    }
1407
        /* keepalive */
1408
0
    if (ins->net_setup.keepalive == FLB_TRUE) {
1409
0
        flags |= FLB_IO_TCP_KA;
1410
0
    }
1411
1412
0
    if (ins->net_setup.keepalive == FLB_TRUE) {
1413
0
        flags |= FLB_IO_TCP_KA;
1414
0
    }
1415
1416
    /* Set flags */
1417
0
    flb_stream_enable_flags(&u->base, flags);
1418
1419
0
    flb_upstream_set_total_connections_label(u,
1420
0
                                             flb_output_name(ins));
1421
1422
0
    flb_upstream_set_total_connections_gauge(u,
1423
0
                                             ins->cmt_upstream_total_connections);
1424
1425
0
    flb_upstream_set_busy_connections_label(u,
1426
0
                                            flb_output_name(ins));
1427
1428
0
    flb_upstream_set_busy_connections_gauge(u,
1429
0
                                            ins->cmt_upstream_busy_connections);
1430
1431
    /*
1432
     * If the output plugin flush callbacks will run in multiple threads, enable
1433
     * the thread safe mode for the Upstream context.
1434
     */
1435
0
    if (ins->tp_workers > 0) {
1436
0
        flb_stream_enable_thread_safety(&u->base);
1437
1438
0
        mk_list_add(&u->base._head, &ins->upstreams);
1439
0
    }
1440
1441
    /* Set networking options 'net.*' received through instance properties */
1442
0
    memcpy(&u->base.net, &ins->net_setup, sizeof(struct flb_net_setup));
1443
1444
0
    return 0;
1445
0
}
1446
1447
int flb_output_upstream_ha_set(void *ha, struct flb_output_instance *ins)
1448
0
{
1449
0
    struct mk_list *head;
1450
0
    struct flb_upstream_node *node;
1451
0
    struct flb_upstream_ha *upstream_ha = ha;
1452
1453
0
    mk_list_foreach(head, &upstream_ha->nodes) {
1454
0
        node = mk_list_entry(head, struct flb_upstream_node, _head);
1455
0
        flb_output_upstream_set(node->u, ins);
1456
0
    }
1457
1458
0
    return 0;
1459
0
}
1460
1461
/*
 * Helper function to set HTTP callbacks using the output instance 'callback'
 * context.
 *
 * When built with FLB_HAVE_HTTP_CLIENT_DEBUG the result of
 * flb_http_client_debug_setup() is returned; otherwise the function does
 * nothing and returns 0.
 */
int flb_output_set_http_debug_callbacks(struct flb_output_instance *ins)
{
    int ret = 0;

#ifdef FLB_HAVE_HTTP_CLIENT_DEBUG
    ret = flb_http_client_debug_setup(ins->callback, &ins->properties);
#endif

    return ret;
}