Coverage Report

Created: 2026-05-16 07:29

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/plugins/out_http/http.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2026 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_output_plugin.h>
21
#include <fluent-bit/flb_output.h>
22
#include <fluent-bit/flb_http_client.h>
23
#include <fluent-bit/flb_pack.h>
24
#include <fluent-bit/flb_str.h>
25
#include <fluent-bit/flb_time.h>
26
#include <fluent-bit/flb_utils.h>
27
#include <fluent-bit/flb_pack.h>
28
#include <fluent-bit/flb_sds.h>
29
30
#include <fluent-bit/flb_gzip.h>
31
#include <fluent-bit/flb_snappy.h>
32
#include <fluent-bit/flb_zstd.h>
33
34
#include <fluent-bit/flb_record_accessor.h>
35
#include <fluent-bit/flb_log_event_decoder.h>
36
#include <msgpack.h>
37
38
#ifdef FLB_HAVE_SIGNV4
39
#ifdef FLB_HAVE_AWS
40
#include <fluent-bit/flb_aws_credentials.h>
41
#include <fluent-bit/flb_signv4.h>
42
#endif
43
#endif
44
45
#include <stdio.h>
46
#include <stdlib.h>
47
#include <string.h>
48
#include <assert.h>
49
#include <errno.h>
50
51
#include "http.h"
52
#include "http_conf.h"
53
54
#include <fluent-bit/flb_callback.h>
55
56
static int cb_http_init(struct flb_output_instance *ins,
57
                        struct flb_config *config, void *data)
58
102
{
59
102
    struct flb_out_http *ctx = NULL;
60
102
    (void) data;
61
62
102
    ctx = flb_http_conf_create(ins, config);
63
102
    if (!ctx) {
64
0
        return -1;
65
0
    }
66
67
    /* Set the plugin context */
68
102
    flb_output_set_context(ins, ctx);
69
70
    /*
71
     * This plugin instance uses the HTTP client interface, let's register
72
     * it debugging callbacks.
73
     */
74
102
    flb_output_set_http_debug_callbacks(ins);
75
76
102
    return 0;
77
102
}
78
79
static void append_headers(struct flb_http_client *c,
80
                           char **headers)
81
0
{
82
0
    int i;
83
0
    char *header_key;
84
0
    char *header_value;
85
86
0
    i = 0;
87
0
    header_key = NULL;
88
0
    header_value = NULL;
89
0
    while (*headers) {
90
0
        if (i % 2 == 0) {
91
0
            header_key = *headers;
92
0
        }
93
0
        else {
94
0
            header_value = *headers;
95
0
        }
96
0
        if (header_key && header_value) {
97
0
            flb_http_add_header(c,
98
0
                                header_key,
99
0
                                strlen(header_key),
100
0
                                header_value,
101
0
                                strlen(header_value));
102
0
            flb_free(header_key);
103
0
            flb_free(header_value);
104
0
            header_key = NULL;
105
0
            header_value = NULL;
106
0
        }
107
0
        headers++;
108
0
        i++;
109
0
    }
110
0
}
111
112
static int http_request(struct flb_out_http *ctx,
113
                        const void *body, size_t body_len,
114
                        const char *tag, int tag_len,
115
                        char **headers)
116
7
{
117
7
    int ret = 0;
118
7
    int out_ret = FLB_OK;
119
7
    int compressed = FLB_FALSE;
120
7
    size_t b_sent;
121
7
    void *payload_buf = NULL;
122
7
    size_t payload_size = 0;
123
7
    struct flb_upstream *u;
124
7
    struct flb_connection *u_conn;
125
7
    struct flb_http_client *c;
126
7
    struct mk_list *head;
127
7
    struct flb_config_map_val *mv;
128
7
    struct flb_slist_entry *key = NULL;
129
7
    struct flb_slist_entry *val = NULL;
130
7
    flb_sds_t signature = NULL;
131
132
    /* Get upstream context and connection */
133
7
    u = ctx->u;
134
7
    u_conn = flb_upstream_conn_get(u);
135
7
    if (!u_conn) {
136
7
        flb_plg_error(ctx->ins, "no upstream connections available to %s:%i",
137
7
                      u->tcp_host, u->tcp_port);
138
7
        return FLB_RETRY;
139
7
    }
140
141
    /* Map payload */
142
0
    payload_buf = (void *) body;
143
0
    payload_size = body_len;
144
145
    /* Should we compress the payload ? */
146
0
    ret = 0;
147
0
    if (ctx->compress_gzip == FLB_TRUE) {
148
0
        ret = flb_gzip_compress((void *) body, body_len,
149
0
                                &payload_buf, &payload_size);
150
0
        if (ret == 0) {
151
0
            compressed = FLB_TRUE;
152
0
        }
153
0
    }
154
0
    else if (ctx->compress_snappy == FLB_TRUE) {
155
0
        ret = flb_snappy_compress((void *) body, body_len,
156
0
                                  (char **) &payload_buf, &payload_size);
157
0
        if (ret == 0) {
158
0
            compressed = FLB_TRUE;
159
0
        }
160
0
    }
161
0
    else if (ctx->compress_zstd == FLB_TRUE) {
162
0
        ret = flb_zstd_compress((void *) body, body_len,
163
0
                                &payload_buf, &payload_size);
164
0
        if (ret == 0) {
165
0
            compressed = FLB_TRUE;
166
0
        }
167
0
    }
168
169
0
    if (ret == -1) {
170
0
        flb_plg_warn(ctx->ins, "could not compress payload, sending as it is");
171
0
        compressed = FLB_FALSE;
172
0
    }
173
174
175
    /* Create HTTP client context */
176
0
    c = flb_http_client(u_conn, ctx->http_method, ctx->uri,
177
0
                        payload_buf, payload_size,
178
0
                        ctx->host, ctx->port,
179
0
                        ctx->proxy, 0);
180
181
0
    if (c == NULL) {
182
0
        flb_plg_error(ctx->ins, "[http_client] failed to create HTTP client");
183
0
        if (payload_buf != body) {
184
0
            flb_free(payload_buf);
185
0
        }
186
187
0
        if (u_conn) {
188
0
            flb_upstream_conn_release(u_conn);
189
0
        }
190
191
0
        return FLB_RETRY;
192
0
    }
193
194
0
    if (c->proxy.host) {
195
0
        flb_plg_debug(ctx->ins, "[http_client] proxy host: %s port: %i",
196
0
                      c->proxy.host, c->proxy.port);
197
0
    }
198
199
    /* Allow duplicated headers ? */
200
0
    flb_http_allow_duplicated_headers(c, ctx->allow_dup_headers);
201
202
    /*
203
     * Direct assignment of the callback context to the HTTP client context.
204
     * This needs to be improved through a more clean API.
205
     */
206
0
    c->cb_ctx = ctx->ins->callback;
207
208
0
    flb_http_set_response_timeout(c, ctx->response_timeout);
209
210
0
    if (ctx->read_idle_timeout > 0) {
211
0
        flb_http_set_read_idle_timeout(c, ctx->read_idle_timeout);
212
0
    }
213
0
    else {
214
0
        flb_http_set_read_idle_timeout(c, ctx->ins->net_setup.io_timeout);
215
0
    }
216
217
    /* Append headers */
218
0
    if (headers) {
219
0
        append_headers(c, headers);
220
0
    }
221
0
    else if ((ctx->out_format == FLB_PACK_JSON_FORMAT_JSON) ||
222
0
        (ctx->out_format == FLB_PACK_JSON_FORMAT_STREAM) ||
223
0
        (ctx->out_format == FLB_HTTP_OUT_GELF)) {
224
0
        flb_http_add_header(c,
225
0
                            FLB_HTTP_CONTENT_TYPE,
226
0
                            sizeof(FLB_HTTP_CONTENT_TYPE) - 1,
227
0
                            FLB_HTTP_MIME_JSON,
228
0
                            sizeof(FLB_HTTP_MIME_JSON) - 1);
229
0
    }
230
0
    else if (ctx->out_format == FLB_PACK_JSON_FORMAT_LINES) {
231
0
        flb_http_add_header(c,
232
0
                            FLB_HTTP_CONTENT_TYPE,
233
0
                            sizeof(FLB_HTTP_CONTENT_TYPE) - 1,
234
0
                            FLB_HTTP_MIME_NDJSON,
235
0
                            sizeof(FLB_HTTP_MIME_NDJSON) - 1);
236
0
    }
237
0
    else if (ctx->out_format == FLB_HTTP_OUT_MSGPACK) {
238
0
        flb_http_add_header(c,
239
0
                            FLB_HTTP_CONTENT_TYPE,
240
0
                            sizeof(FLB_HTTP_CONTENT_TYPE) - 1,
241
0
                            FLB_HTTP_MIME_MSGPACK,
242
0
                            sizeof(FLB_HTTP_MIME_MSGPACK) - 1);
243
0
    }
244
245
0
    if (ctx->header_tag) {
246
0
        flb_http_add_header(c,
247
0
                            ctx->header_tag,
248
0
                            flb_sds_len(ctx->header_tag),
249
0
                            tag, tag_len);
250
0
    }
251
252
    /* Content Encoding: gzip */
253
0
    if (compressed == FLB_TRUE) {
254
0
        if (ctx->compress_gzip == FLB_TRUE) {
255
0
            flb_http_set_content_encoding_gzip(c);
256
0
        }
257
0
        else if (ctx->compress_snappy == FLB_TRUE) {
258
0
            flb_http_set_content_encoding_snappy(c);
259
0
        }
260
0
        else if (ctx->compress_zstd == FLB_TRUE) {
261
0
            flb_http_set_content_encoding_zstd(c);
262
0
        }
263
0
    }
264
265
    /* Basic Auth headers */
266
0
    if (ctx->http_user && ctx->http_passwd) {
267
0
        flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd);
268
0
    }
269
270
0
    flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
271
272
0
    flb_config_map_foreach(head, mv, ctx->headers) {
273
0
        key = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
274
0
        val = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);
275
276
0
        flb_http_add_header(c,
277
0
                            key->str, flb_sds_len(key->str),
278
0
                            val->str, flb_sds_len(val->str));
279
0
    }
280
281
0
#ifdef FLB_HAVE_SIGNV4
282
0
#ifdef FLB_HAVE_AWS
283
    /* AWS SigV4 headers */
284
0
    if (ctx->has_aws_auth == FLB_TRUE) {
285
0
        flb_plg_debug(ctx->ins, "signing request with AWS Sigv4");
286
0
        signature = flb_signv4_do(c,
287
0
                                  FLB_TRUE,  /* normalize URI ? */
288
0
                                  FLB_TRUE,  /* add x-amz-date header ? */
289
0
                                  time(NULL),
290
0
                                  (char *) ctx->aws_region,
291
0
                                  (char *) ctx->aws_service,
292
0
                                  0, NULL,
293
0
                                  ctx->aws_provider);
294
295
0
        if (!signature) {
296
0
            flb_plg_error(ctx->ins, "could not sign request with sigv4");
297
0
            out_ret = FLB_RETRY;
298
0
            goto cleanup;
299
0
        }
300
0
        flb_sds_destroy(signature);
301
0
    }
302
0
#endif
303
0
#endif
304
305
0
    ret = flb_http_do_with_oauth2(c, &b_sent, ctx->oauth2_ctx);
306
0
    if (ret == 0) {
307
        /*
308
         * Only allow the following HTTP status:
309
         *
310
         * - 200: OK
311
         * - 201: Created
312
         * - 202: Accepted
313
         * - 203: no authorative resp
314
         * - 204: No Content
315
         * - 205: Reset content
316
         *
317
         */
318
0
        if (c->resp.status < 200 || c->resp.status > 205) {
319
0
            if (ctx->log_response_payload &&
320
0
                c->resp.payload && c->resp.payload_size > 0) {
321
0
                flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i\n%s",
322
0
                              ctx->host, ctx->port,
323
0
                              c->resp.status, c->resp.payload);
324
0
            }
325
0
            else {
326
0
                flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i",
327
0
                              ctx->host, ctx->port, c->resp.status);
328
0
            }
329
0
            if (c->resp.status >= 400 && c->resp.status < 500 &&
330
0
                c->resp.status != 429 && c->resp.status != 408) {
331
0
                flb_plg_warn(ctx->ins, "could not flush records to %s:%i (http_do=%i), "
332
0
                                "chunk will not be retried",
333
0
                                ctx->host, ctx->port, ret);
334
0
                out_ret = FLB_ERROR;
335
0
            }
336
0
            else {
337
0
                out_ret = FLB_RETRY;
338
0
            }
339
0
        }
340
0
        else {
341
0
            if (ctx->log_response_payload &&
342
0
                c->resp.payload && c->resp.payload_size > 0) {
343
0
                flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%s",
344
0
                             ctx->host, ctx->port,
345
0
                             c->resp.status, c->resp.payload);
346
0
            }
347
0
            else {
348
0
                flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i",
349
0
                             ctx->host, ctx->port,
350
0
                             c->resp.status);
351
0
            }
352
0
        }
353
0
    }
354
0
    else {
355
0
        flb_plg_error(ctx->ins, "could not flush records to %s:%i (http_do=%i)",
356
0
                      ctx->host, ctx->port, ret);
357
0
        out_ret = FLB_RETRY;
358
0
    }
359
360
0
cleanup:
361
    /*
362
     * If the payload buffer is different than incoming records in body, means
363
     * we generated a different payload and must be freed.
364
     */
365
0
    if (payload_buf != body) {
366
0
        flb_free(payload_buf);
367
0
    }
368
369
    /* Destroy HTTP client context */
370
0
    flb_http_client_destroy(c);
371
372
    /* Release the TCP connection */
373
0
    flb_upstream_conn_release(u_conn);
374
375
0
    return out_ret;
376
0
}
377
378
static int compose_payload_gelf(struct flb_out_http *ctx,
379
                                const char *data, uint64_t bytes,
380
                                void **out_body, size_t *out_size)
381
0
{
382
0
    flb_sds_t s;
383
0
    flb_sds_t tmp = NULL;
384
0
    size_t size = 0;
385
0
    msgpack_object map;
386
0
    struct flb_log_event_decoder log_decoder;
387
0
    struct flb_log_event log_event;
388
0
    int ret;
389
390
0
    size = bytes * 1.5;
391
392
    /* Allocate buffer for our new payload */
393
0
    s = flb_sds_create_size(size);
394
0
    if (!s) {
395
0
        flb_plg_error(ctx->ins, "flb_sds_create_size failed");
396
0
        return FLB_RETRY;
397
0
    }
398
399
0
    ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
400
401
0
    if (ret != FLB_EVENT_DECODER_SUCCESS) {
402
0
        flb_plg_error(ctx->ins,
403
0
                      "Log event decoder initialization error : %d", ret);
404
405
0
        flb_sds_destroy(s);
406
407
0
        return FLB_RETRY;
408
0
    }
409
410
0
    while ((ret = flb_log_event_decoder_next(
411
0
                    &log_decoder,
412
0
                    &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
413
0
        map = *log_event.body;
414
415
0
        tmp = flb_msgpack_to_gelf(&s, &map,
416
0
                                  &log_event.timestamp,
417
0
                                  &(ctx->gelf_fields));
418
0
        if (!tmp) {
419
0
            flb_plg_error(ctx->ins, "error encoding to GELF");
420
421
0
            flb_sds_destroy(s);
422
0
            flb_log_event_decoder_destroy(&log_decoder);
423
424
0
            return FLB_ERROR;
425
0
        }
426
427
        /* Append new line */
428
0
        tmp = flb_sds_cat(s, "\n", 1);
429
0
        if (!tmp) {
430
0
            flb_plg_error(ctx->ins, "error concatenating records");
431
432
0
            flb_sds_destroy(s);
433
0
            flb_log_event_decoder_destroy(&log_decoder);
434
435
0
            return FLB_RETRY;
436
0
        }
437
438
0
        s = tmp;
439
0
    }
440
441
0
    *out_body = s;
442
0
    *out_size = flb_sds_len(s);
443
444
0
    flb_log_event_decoder_destroy(&log_decoder);
445
446
0
    return FLB_OK;
447
0
}
448
449
static int compose_payload(struct flb_out_http *ctx,
450
                           const void *in_body, size_t in_size,
451
                           void **out_body, size_t *out_size,
452
                           struct flb_config *config)
453
7
{
454
7
    flb_sds_t encoded;
455
456
7
    *out_body = NULL;
457
7
    *out_size = 0;
458
459
7
    if ((ctx->out_format == FLB_PACK_JSON_FORMAT_JSON) ||
460
0
        (ctx->out_format == FLB_PACK_JSON_FORMAT_STREAM) ||
461
7
        (ctx->out_format == FLB_PACK_JSON_FORMAT_LINES)) {
462
463
7
        encoded = flb_pack_msgpack_to_json_format(in_body,
464
7
                                                  in_size,
465
7
                                                  ctx->out_format,
466
7
                                                  ctx->json_date_format,
467
7
                                                  ctx->date_key,
468
7
                                                  config->json_escape_unicode);
469
7
        if (encoded == NULL) {
470
0
            flb_plg_error(ctx->ins, "failed to convert json");
471
0
            return FLB_ERROR;
472
0
        }
473
7
        *out_body = (void*)encoded;
474
7
        *out_size = flb_sds_len(encoded);
475
7
    }
476
0
    else if (ctx->out_format == FLB_HTTP_OUT_GELF) {
477
0
        return compose_payload_gelf(ctx, in_body, in_size, out_body, out_size);
478
0
    }
479
0
    else {
480
        /* Nothing to do, if the format is msgpack */
481
0
        *out_body = (void *)in_body;
482
0
        *out_size = in_size;
483
0
    }
484
485
7
    return FLB_OK;
486
7
}
487
488
0
static char **extract_headers(msgpack_object *obj) {
489
0
    size_t i;
490
0
    char **headers = NULL;
491
0
    size_t str_count;
492
0
    msgpack_object_map map;
493
0
    msgpack_object_str k;
494
0
    msgpack_object_str v;
495
496
0
    if (obj->type != MSGPACK_OBJECT_MAP) {
497
0
        goto err;
498
0
    }
499
500
0
    map = obj->via.map;
501
0
    str_count = map.size * 2 + 1;
502
0
    headers = flb_calloc(str_count, sizeof *headers);
503
504
0
    if (!headers) {
505
0
        goto err;
506
0
    }
507
508
0
    for (i = 0; i < map.size; i++) {
509
0
        if (map.ptr[i].key.type != MSGPACK_OBJECT_STR ||
510
0
            map.ptr[i].val.type != MSGPACK_OBJECT_STR) {
511
0
            continue;
512
0
        }
513
514
0
        k = map.ptr[i].key.via.str;
515
0
        v = map.ptr[i].val.via.str;
516
517
0
        headers[i * 2] = strndup(k.ptr, k.size);
518
519
0
        if (!headers[i]) {
520
0
            goto err;
521
0
        }
522
523
0
        headers[i * 2 + 1] = strndup(v.ptr, v.size);
524
525
0
        if (!headers[i]) {
526
0
            goto err;
527
0
        }
528
0
    }
529
530
0
    return headers;
531
532
0
err:
533
0
    if (headers) {
534
0
        for (i = 0; i < str_count; i++) {
535
0
            if (headers[i]) {
536
0
                flb_free(headers[i]);
537
0
            }
538
0
        }
539
0
        flb_free(headers);
540
0
    }
541
0
    return NULL;
542
0
}
543
544
static int send_all_requests(struct flb_out_http *ctx,
545
                             const char *data, size_t size,
546
                             flb_sds_t body_key,
547
                             flb_sds_t headers_key,
548
                             struct flb_event_chunk *event_chunk)
549
0
{
550
0
    msgpack_object map;
551
0
    msgpack_object *k;
552
0
    msgpack_object *v;
553
0
    msgpack_object *start_key;
554
0
    const char *body;
555
0
    size_t body_size;
556
0
    bool body_found;
557
0
    bool headers_found;
558
0
    char **headers;
559
0
    size_t record_count = 0;
560
0
    int ret = 0;
561
0
    struct flb_log_event_decoder log_decoder;
562
0
    struct flb_log_event log_event;
563
564
0
    ret = flb_log_event_decoder_init(&log_decoder, (char *) data, size);
565
566
0
    if (ret != FLB_EVENT_DECODER_SUCCESS) {
567
0
        flb_plg_error(ctx->ins,
568
0
                      "Log event decoder initialization error : %d", ret);
569
570
0
        return -1;
571
0
    }
572
573
0
    while ((flb_log_event_decoder_next(
574
0
                    &log_decoder,
575
0
                    &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
576
0
        headers = NULL;
577
0
        body_found = false;
578
0
        headers_found = false;
579
580
0
        map = *log_event.body;
581
582
0
        if (map.type != MSGPACK_OBJECT_MAP) {
583
0
            ret = -1;
584
0
            break;
585
0
        }
586
587
0
        if (!flb_ra_get_kv_pair(ctx->body_ra, map, &start_key, &k, &v)) {
588
0
            if (v->type == MSGPACK_OBJECT_STR || v->type == MSGPACK_OBJECT_BIN) {
589
0
                body = v->via.str.ptr;
590
0
                body_size = v->via.str.size;
591
0
                body_found = true;
592
0
            }
593
0
            else {
594
0
                flb_plg_warn(ctx->ins,
595
0
                             "failed to extract body using pattern \"%s\" "
596
0
                             "(must be a msgpack string or bin)", ctx->body_key);
597
0
            }
598
0
        }
599
600
0
        if (!flb_ra_get_kv_pair(ctx->headers_ra, map, &start_key, &k, &v)) {
601
0
            headers = extract_headers(v);
602
0
            if (headers) {
603
0
                headers_found = true;
604
0
            }
605
0
            else {
606
0
                flb_plg_warn(ctx->ins,
607
0
                             "error extracting headers using pattern \"%s\"",
608
0
                             ctx->headers_key);
609
0
            }
610
0
        }
611
612
0
        if (body_found && headers_found) {
613
0
            flb_plg_trace(ctx->ins, "sending record %zu via %s",
614
0
                          record_count++,
615
0
                          ctx->http_method == FLB_HTTP_POST ? "POST" : "PUT");
616
0
            ret = http_request(ctx, body, body_size, event_chunk->tag,
617
0
                    flb_sds_len(event_chunk->tag), headers);
618
0
        }
619
0
        else {
620
0
            flb_plg_warn(ctx->ins,
621
0
                         "failed to extract body/headers using patterns "
622
0
                         "\"%s\" and \"%s\"", ctx->body_key, ctx->headers_key);
623
0
            ret = -1;
624
0
            continue;
625
0
        }
626
627
0
        flb_free(headers);
628
0
    }
629
630
0
    flb_log_event_decoder_destroy(&log_decoder);
631
632
0
    return ret;
633
0
}
634
635
static void cb_http_flush(struct flb_event_chunk *event_chunk,
636
                          struct flb_output_flush *out_flush,
637
                          struct flb_input_instance *i_ins,
638
                          void *out_context,
639
                          struct flb_config *config)
640
7
{
641
7
    int ret = FLB_ERROR;
642
7
    struct flb_out_http *ctx = out_context;
643
7
    void *out_body;
644
7
    size_t out_size;
645
7
    (void) i_ins;
646
647
7
    if (ctx->body_key) {
648
0
        ret = send_all_requests(ctx, event_chunk->data, event_chunk->size,
649
0
                                ctx->body_key, ctx->headers_key, event_chunk);
650
0
        if (ret < 0) {
651
0
            flb_plg_error(ctx->ins,
652
0
                          "failed to send requests using body key \"%s\"", ctx->body_key);
653
0
        }
654
0
    }
655
7
    else {
656
7
        ret = compose_payload(ctx, event_chunk->data, event_chunk->size,
657
7
                              &out_body, &out_size, config);
658
7
        if (ret != FLB_OK) {
659
0
            FLB_OUTPUT_RETURN(ret);
660
0
        }
661
662
7
        if ((ctx->out_format == FLB_PACK_JSON_FORMAT_JSON) ||
663
0
            (ctx->out_format == FLB_PACK_JSON_FORMAT_STREAM) ||
664
0
            (ctx->out_format == FLB_PACK_JSON_FORMAT_LINES) ||
665
7
            (ctx->out_format == FLB_HTTP_OUT_GELF)) {
666
7
            ret = http_request(ctx, out_body, out_size,
667
7
                               event_chunk->tag, flb_sds_len(event_chunk->tag), NULL);
668
7
            flb_sds_destroy(out_body);
669
7
        }
670
0
        else {
671
            /* msgpack */
672
0
            ret = http_request(ctx,
673
0
                               event_chunk->data, event_chunk->size,
674
0
                               event_chunk->tag, flb_sds_len(event_chunk->tag), NULL);
675
0
        }
676
7
    }
677
678
7
    FLB_OUTPUT_RETURN(ret);
679
7
}
680
681
static int cb_http_exit(void *data, struct flb_config *config)
682
102
{
683
102
    struct flb_out_http *ctx = data;
684
685
102
    flb_http_conf_destroy(ctx);
686
102
    return 0;
687
102
}
688
689
/* Configuration properties map */
690
static struct flb_config_map config_map[] = {
691
    {
692
     FLB_CONFIG_MAP_STR, "proxy", NULL,
693
     0, FLB_FALSE, 0,
694
     "Specify an HTTP Proxy. The expected format of this value is http://host:port. "
695
    },
696
    {
697
     FLB_CONFIG_MAP_BOOL, "allow_duplicated_headers", "true",
698
     0, FLB_TRUE, offsetof(struct flb_out_http, allow_dup_headers),
699
     "Specify if duplicated headers are allowed or not"
700
    },
701
    {
702
     FLB_CONFIG_MAP_BOOL, "log_response_payload", "true",
703
     0, FLB_TRUE, offsetof(struct flb_out_http, log_response_payload),
704
     "Specify if the response paylod should be logged or not"
705
    },
706
    {
707
     FLB_CONFIG_MAP_TIME, "http.response_timeout", "60s",
708
     0, FLB_TRUE, offsetof(struct flb_out_http, response_timeout),
709
     "Set maximum time to wait for a server response"
710
    },
711
    {
712
     FLB_CONFIG_MAP_TIME, "http.read_idle_timeout", "0s",
713
     0, FLB_TRUE, offsetof(struct flb_out_http, read_idle_timeout),
714
     "Set maximum allowed time between two consecutive reads"
715
    },
716
    {
717
     FLB_CONFIG_MAP_STR, "http_user", NULL,
718
     0, FLB_TRUE, offsetof(struct flb_out_http, http_user),
719
     "Set HTTP auth user"
720
    },
721
    {
722
     FLB_CONFIG_MAP_STR, "http_passwd", "",
723
     0, FLB_TRUE, offsetof(struct flb_out_http, http_passwd),
724
     "Set HTTP auth password"
725
    },
726
    {
727
     FLB_CONFIG_MAP_BOOL, "oauth2.enable", "false",
728
     0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.enabled),
729
     "Enable OAuth2 client credentials for outgoing requests"
730
    },
731
    {
732
     FLB_CONFIG_MAP_STR, "oauth2.token_url", NULL,
733
     0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.token_url),
734
     "OAuth2 token endpoint URL"
735
    },
736
    {
737
     FLB_CONFIG_MAP_STR, "oauth2.client_id", NULL,
738
     0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.client_id),
739
     "OAuth2 client_id"
740
    },
741
    {
742
     FLB_CONFIG_MAP_STR, "oauth2.client_secret", NULL,
743
     0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.client_secret),
744
     "OAuth2 client_secret"
745
    },
746
    {
747
     FLB_CONFIG_MAP_STR, "oauth2.scope", NULL,
748
     0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.scope),
749
     "Optional OAuth2 scope"
750
    },
751
    {
752
     FLB_CONFIG_MAP_STR, "oauth2.audience", NULL,
753
     0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.audience),
754
     "Optional OAuth2 audience parameter"
755
    },
756
    {
757
     FLB_CONFIG_MAP_STR, "oauth2.resource", NULL,
758
     0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.resource),
759
     "Optional OAuth2 resource parameter"
760
    },
761
    {
762
     FLB_CONFIG_MAP_STR, "oauth2.auth_method", "basic",
763
     0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_auth_method),
764
     "OAuth2 client authentication method: basic, post or private_key_jwt"
765
    },
766
    {
767
     FLB_CONFIG_MAP_STR, "oauth2.jwt_key_file", NULL,
768
     0, FLB_TRUE, offsetof(struct flb_out_http,
769
                           oauth2_config.jwt_key_file),
770
     "Path to PEM private key used by private_key_jwt"
771
    },
772
    {
773
     FLB_CONFIG_MAP_STR, "oauth2.jwt_cert_file", NULL,
774
     0, FLB_TRUE, offsetof(struct flb_out_http,
775
                           oauth2_config.jwt_cert_file),
776
     "Path to certificate file used by private_key_jwt"
777
    },
778
    {
779
     FLB_CONFIG_MAP_STR, "oauth2.jwt_aud", NULL,
780
     0, FLB_TRUE, offsetof(struct flb_out_http,
781
                           oauth2_config.jwt_aud),
782
     "Audience for private_key_jwt assertion (defaults to oauth2.token_url)"
783
    },
784
    {
785
     FLB_CONFIG_MAP_STR, "oauth2.jwt_header", "kid",
786
     0, FLB_TRUE, offsetof(struct flb_out_http,
787
                           oauth2_config.jwt_header),
788
     "JWT header claim name for private_key_jwt thumbprint (kid or x5t)"
789
    },
790
    {
791
     FLB_CONFIG_MAP_INT, "oauth2.jwt_ttl_seconds", "300",
792
     0, FLB_TRUE, offsetof(struct flb_out_http,
793
                           oauth2_config.jwt_ttl),
794
     "Lifetime in seconds for private_key_jwt client assertions"
795
    },
796
    {
797
     FLB_CONFIG_MAP_INT, "oauth2.refresh_skew_seconds", "60",
798
     0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.refresh_skew),
799
     "Seconds before expiry to refresh the access token"
800
    },
801
    {
802
     FLB_CONFIG_MAP_TIME, "oauth2.timeout", "0s",
803
     0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.timeout),
804
     "Timeout for OAuth2 token requests (defaults to response_timeout when unset)"
805
    },
806
    {
807
     FLB_CONFIG_MAP_TIME, "oauth2.connect_timeout", "0s",
808
     0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.connect_timeout),
809
     "Connect timeout for OAuth2 token requests"
810
    },
811
#ifdef FLB_HAVE_SIGNV4
812
#ifdef FLB_HAVE_AWS
813
    {
814
     FLB_CONFIG_MAP_BOOL, "aws_auth", "false",
815
     0, FLB_TRUE, offsetof(struct flb_out_http, has_aws_auth),
816
     "Enable AWS SigV4 authentication"
817
    },
818
    {
819
     FLB_CONFIG_MAP_STR, "aws_service", NULL,
820
     0, FLB_TRUE, offsetof(struct flb_out_http, aws_service),
821
     "AWS destination service code, used by SigV4 authentication"
822
    },
823
    FLB_AWS_CREDENTIAL_BASE_CONFIG_MAP(FLB_HTTP_AWS_CREDENTIAL_PREFIX),
824
#endif
825
#endif
826
    {
827
     FLB_CONFIG_MAP_STR, "header_tag", NULL,
828
     0, FLB_TRUE, offsetof(struct flb_out_http, header_tag),
829
     "Set a HTTP header which value is the Tag"
830
    },
831
    {
832
     FLB_CONFIG_MAP_STR, "format", "json",
833
     0, FLB_TRUE, offsetof(struct flb_out_http, format),
834
     "Set desired payload format: json, json_stream, json_lines, gelf or msgpack"
835
    },
836
    {
837
     FLB_CONFIG_MAP_STR, "json_date_format", NULL,
838
     0, FLB_FALSE, 0,
839
     FBL_PACK_JSON_DATE_FORMAT_DESCRIPTION
840
    },
841
    {
842
     FLB_CONFIG_MAP_STR, "json_date_key", "date",
843
     0, FLB_TRUE, offsetof(struct flb_out_http, json_date_key),
844
     "Specify the name of the date field in output"
845
    },
846
    {
847
     FLB_CONFIG_MAP_STR, "compress", NULL,
848
     0, FLB_FALSE, 0,
849
     "Set payload compression mechanism. Option available are 'gzip', 'snappy' and 'zstd'"
850
    },
851
    {
852
     FLB_CONFIG_MAP_SLIST_1, "header", NULL,
853
     FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_out_http, headers),
854
     "Add a HTTP header key/value pair. Multiple headers can be set"
855
    },
856
    {
857
     FLB_CONFIG_MAP_STR, "uri", NULL,
858
     0, FLB_TRUE, offsetof(struct flb_out_http, uri),
859
     "Specify an optional HTTP URI for the target web server, e.g: /something"
860
    },
861
    {
862
     FLB_CONFIG_MAP_STR, "http_method", "POST",
863
     0, FLB_FALSE, 0,
864
     "Specify the HTTP method to use. Supported methods are POST and PUT"
865
    },
866
867
    /* Gelf Properties */
868
    {
869
     FLB_CONFIG_MAP_STR, "gelf_timestamp_key", NULL,
870
     0, FLB_TRUE, offsetof(struct flb_out_http, gelf_fields.timestamp_key),
871
     "Specify the key to use for 'timestamp' in gelf format"
872
    },
873
    {
874
     FLB_CONFIG_MAP_STR, "gelf_host_key", NULL,
875
     0, FLB_TRUE, offsetof(struct flb_out_http, gelf_fields.host_key),
876
     "Specify the key to use for the 'host' in gelf format"
877
    },
878
    {
879
     FLB_CONFIG_MAP_STR, "gelf_short_message_key", NULL,
880
     0, FLB_TRUE, offsetof(struct flb_out_http, gelf_fields.short_message_key),
881
     "Specify the key to use as the 'short' message in gelf format"
882
    },
883
    {
884
     FLB_CONFIG_MAP_STR, "gelf_full_message_key", NULL,
885
     0, FLB_TRUE, offsetof(struct flb_out_http, gelf_fields.full_message_key),
886
     "Specify the key to use for the 'full' message in gelf format"
887
    },
888
    {
889
     FLB_CONFIG_MAP_STR, "gelf_level_key", NULL,
890
     0, FLB_TRUE, offsetof(struct flb_out_http, gelf_fields.level_key),
891
     "Specify the key to use for the 'level' in gelf format"
892
    },
893
    {
894
     FLB_CONFIG_MAP_STR, "body_key", NULL,
895
     0, FLB_TRUE, offsetof(struct flb_out_http, body_key),
896
     "Specify the key which contains the body"
897
    },
898
    {
899
     FLB_CONFIG_MAP_STR, "headers_key", NULL,
900
     0, FLB_TRUE, offsetof(struct flb_out_http, headers_key),
901
     "Specify the key which contains the headers"
902
    },
903
904
    /* EOF */
905
    {0}
906
};
907
908
static int cb_http_format_test(struct flb_config *config,
909
                               struct flb_input_instance *ins,
910
                               void *plugin_context,
911
                               void *flush_ctx,
912
                               int event_type,
913
                               const char *tag, int tag_len,
914
                               const void *data, size_t bytes,
915
                               void **out_data, size_t *out_size)
916
0
{
917
0
    struct flb_out_http *ctx = plugin_context;
918
0
    int ret;
919
920
0
    ret = compose_payload(ctx, data, bytes, out_data, out_size, config);
921
0
    if (ret != FLB_OK) {
922
0
        flb_error("ret=%d", ret);
923
0
        return -1;
924
0
    }
925
0
    return 0;
926
0
}
927
928
/* Plugin reference */
929
struct flb_output_plugin out_http_plugin = {
930
    .name        = "http",
931
    .description = "HTTP Output",
932
    .cb_init     = cb_http_init,
933
    .cb_pre_run  = NULL,
934
    .cb_flush    = cb_http_flush,
935
    .cb_exit     = cb_http_exit,
936
    .config_map  = config_map,
937
938
    /* for testing */
939
    .test_formatter.callback = cb_http_format_test,
940
941
    .flags       = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
942
    .workers     = 2
943
};