/src/fluent-bit/plugins/out_http/http.c
Line | Count | Source |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2026 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <fluent-bit/flb_output_plugin.h> |
21 | | #include <fluent-bit/flb_output.h> |
22 | | #include <fluent-bit/flb_http_client.h> |
23 | | #include <fluent-bit/flb_pack.h> |
24 | | #include <fluent-bit/flb_str.h> |
25 | | #include <fluent-bit/flb_time.h> |
26 | | #include <fluent-bit/flb_utils.h> |
27 | | #include <fluent-bit/flb_pack.h> |
28 | | #include <fluent-bit/flb_sds.h> |
29 | | |
30 | | #include <fluent-bit/flb_gzip.h> |
31 | | #include <fluent-bit/flb_snappy.h> |
32 | | #include <fluent-bit/flb_zstd.h> |
33 | | |
34 | | #include <fluent-bit/flb_record_accessor.h> |
35 | | #include <fluent-bit/flb_log_event_decoder.h> |
36 | | #include <msgpack.h> |
37 | | |
38 | | #ifdef FLB_HAVE_SIGNV4 |
39 | | #ifdef FLB_HAVE_AWS |
40 | | #include <fluent-bit/flb_aws_credentials.h> |
41 | | #include <fluent-bit/flb_signv4.h> |
42 | | #endif |
43 | | #endif |
44 | | |
45 | | #include <stdio.h> |
46 | | #include <stdlib.h> |
47 | | #include <string.h> |
48 | | #include <assert.h> |
49 | | #include <errno.h> |
50 | | |
51 | | #include "http.h" |
52 | | #include "http_conf.h" |
53 | | |
54 | | #include <fluent-bit/flb_callback.h> |
55 | | |
56 | | static int cb_http_init(struct flb_output_instance *ins, |
57 | | struct flb_config *config, void *data) |
58 | 102 | { |
59 | 102 | struct flb_out_http *ctx = NULL; |
60 | 102 | (void) data; |
61 | | |
62 | 102 | ctx = flb_http_conf_create(ins, config); |
63 | 102 | if (!ctx) { |
64 | 0 | return -1; |
65 | 0 | } |
66 | | |
67 | | /* Set the plugin context */ |
68 | 102 | flb_output_set_context(ins, ctx); |
69 | | |
70 | | /* |
71 | | * This plugin instance uses the HTTP client interface, let's register |
72 | | * it debugging callbacks. |
73 | | */ |
74 | 102 | flb_output_set_http_debug_callbacks(ins); |
75 | | |
76 | 102 | return 0; |
77 | 102 | } |
78 | | |
79 | | static void append_headers(struct flb_http_client *c, |
80 | | char **headers) |
81 | 0 | { |
82 | 0 | int i; |
83 | 0 | char *header_key; |
84 | 0 | char *header_value; |
85 | |
|
86 | 0 | i = 0; |
87 | 0 | header_key = NULL; |
88 | 0 | header_value = NULL; |
89 | 0 | while (*headers) { |
90 | 0 | if (i % 2 == 0) { |
91 | 0 | header_key = *headers; |
92 | 0 | } |
93 | 0 | else { |
94 | 0 | header_value = *headers; |
95 | 0 | } |
96 | 0 | if (header_key && header_value) { |
97 | 0 | flb_http_add_header(c, |
98 | 0 | header_key, |
99 | 0 | strlen(header_key), |
100 | 0 | header_value, |
101 | 0 | strlen(header_value)); |
102 | 0 | flb_free(header_key); |
103 | 0 | flb_free(header_value); |
104 | 0 | header_key = NULL; |
105 | 0 | header_value = NULL; |
106 | 0 | } |
107 | 0 | headers++; |
108 | 0 | i++; |
109 | 0 | } |
110 | 0 | } |
111 | | |
112 | | static int http_request(struct flb_out_http *ctx, |
113 | | const void *body, size_t body_len, |
114 | | const char *tag, int tag_len, |
115 | | char **headers) |
116 | 7 | { |
117 | 7 | int ret = 0; |
118 | 7 | int out_ret = FLB_OK; |
119 | 7 | int compressed = FLB_FALSE; |
120 | 7 | size_t b_sent; |
121 | 7 | void *payload_buf = NULL; |
122 | 7 | size_t payload_size = 0; |
123 | 7 | struct flb_upstream *u; |
124 | 7 | struct flb_connection *u_conn; |
125 | 7 | struct flb_http_client *c; |
126 | 7 | struct mk_list *head; |
127 | 7 | struct flb_config_map_val *mv; |
128 | 7 | struct flb_slist_entry *key = NULL; |
129 | 7 | struct flb_slist_entry *val = NULL; |
130 | 7 | flb_sds_t signature = NULL; |
131 | | |
132 | | /* Get upstream context and connection */ |
133 | 7 | u = ctx->u; |
134 | 7 | u_conn = flb_upstream_conn_get(u); |
135 | 7 | if (!u_conn) { |
136 | 7 | flb_plg_error(ctx->ins, "no upstream connections available to %s:%i", |
137 | 7 | u->tcp_host, u->tcp_port); |
138 | 7 | return FLB_RETRY; |
139 | 7 | } |
140 | | |
141 | | /* Map payload */ |
142 | 0 | payload_buf = (void *) body; |
143 | 0 | payload_size = body_len; |
144 | | |
145 | | /* Should we compress the payload ? */ |
146 | 0 | ret = 0; |
147 | 0 | if (ctx->compress_gzip == FLB_TRUE) { |
148 | 0 | ret = flb_gzip_compress((void *) body, body_len, |
149 | 0 | &payload_buf, &payload_size); |
150 | 0 | if (ret == 0) { |
151 | 0 | compressed = FLB_TRUE; |
152 | 0 | } |
153 | 0 | } |
154 | 0 | else if (ctx->compress_snappy == FLB_TRUE) { |
155 | 0 | ret = flb_snappy_compress((void *) body, body_len, |
156 | 0 | (char **) &payload_buf, &payload_size); |
157 | 0 | if (ret == 0) { |
158 | 0 | compressed = FLB_TRUE; |
159 | 0 | } |
160 | 0 | } |
161 | 0 | else if (ctx->compress_zstd == FLB_TRUE) { |
162 | 0 | ret = flb_zstd_compress((void *) body, body_len, |
163 | 0 | &payload_buf, &payload_size); |
164 | 0 | if (ret == 0) { |
165 | 0 | compressed = FLB_TRUE; |
166 | 0 | } |
167 | 0 | } |
168 | |
|
169 | 0 | if (ret == -1) { |
170 | 0 | flb_plg_warn(ctx->ins, "could not compress payload, sending as it is"); |
171 | 0 | compressed = FLB_FALSE; |
172 | 0 | } |
173 | | |
174 | | |
175 | | /* Create HTTP client context */ |
176 | 0 | c = flb_http_client(u_conn, ctx->http_method, ctx->uri, |
177 | 0 | payload_buf, payload_size, |
178 | 0 | ctx->host, ctx->port, |
179 | 0 | ctx->proxy, 0); |
180 | |
|
181 | 0 | if (c == NULL) { |
182 | 0 | flb_plg_error(ctx->ins, "[http_client] failed to create HTTP client"); |
183 | 0 | if (payload_buf != body) { |
184 | 0 | flb_free(payload_buf); |
185 | 0 | } |
186 | |
|
187 | 0 | if (u_conn) { |
188 | 0 | flb_upstream_conn_release(u_conn); |
189 | 0 | } |
190 | |
|
191 | 0 | return FLB_RETRY; |
192 | 0 | } |
193 | | |
194 | 0 | if (c->proxy.host) { |
195 | 0 | flb_plg_debug(ctx->ins, "[http_client] proxy host: %s port: %i", |
196 | 0 | c->proxy.host, c->proxy.port); |
197 | 0 | } |
198 | | |
199 | | /* Allow duplicated headers ? */ |
200 | 0 | flb_http_allow_duplicated_headers(c, ctx->allow_dup_headers); |
201 | | |
202 | | /* |
203 | | * Direct assignment of the callback context to the HTTP client context. |
204 | | * This needs to be improved through a more clean API. |
205 | | */ |
206 | 0 | c->cb_ctx = ctx->ins->callback; |
207 | |
|
208 | 0 | flb_http_set_response_timeout(c, ctx->response_timeout); |
209 | |
|
210 | 0 | if (ctx->read_idle_timeout > 0) { |
211 | 0 | flb_http_set_read_idle_timeout(c, ctx->read_idle_timeout); |
212 | 0 | } |
213 | 0 | else { |
214 | 0 | flb_http_set_read_idle_timeout(c, ctx->ins->net_setup.io_timeout); |
215 | 0 | } |
216 | | |
217 | | /* Append headers */ |
218 | 0 | if (headers) { |
219 | 0 | append_headers(c, headers); |
220 | 0 | } |
221 | 0 | else if ((ctx->out_format == FLB_PACK_JSON_FORMAT_JSON) || |
222 | 0 | (ctx->out_format == FLB_PACK_JSON_FORMAT_STREAM) || |
223 | 0 | (ctx->out_format == FLB_HTTP_OUT_GELF)) { |
224 | 0 | flb_http_add_header(c, |
225 | 0 | FLB_HTTP_CONTENT_TYPE, |
226 | 0 | sizeof(FLB_HTTP_CONTENT_TYPE) - 1, |
227 | 0 | FLB_HTTP_MIME_JSON, |
228 | 0 | sizeof(FLB_HTTP_MIME_JSON) - 1); |
229 | 0 | } |
230 | 0 | else if (ctx->out_format == FLB_PACK_JSON_FORMAT_LINES) { |
231 | 0 | flb_http_add_header(c, |
232 | 0 | FLB_HTTP_CONTENT_TYPE, |
233 | 0 | sizeof(FLB_HTTP_CONTENT_TYPE) - 1, |
234 | 0 | FLB_HTTP_MIME_NDJSON, |
235 | 0 | sizeof(FLB_HTTP_MIME_NDJSON) - 1); |
236 | 0 | } |
237 | 0 | else if (ctx->out_format == FLB_HTTP_OUT_MSGPACK) { |
238 | 0 | flb_http_add_header(c, |
239 | 0 | FLB_HTTP_CONTENT_TYPE, |
240 | 0 | sizeof(FLB_HTTP_CONTENT_TYPE) - 1, |
241 | 0 | FLB_HTTP_MIME_MSGPACK, |
242 | 0 | sizeof(FLB_HTTP_MIME_MSGPACK) - 1); |
243 | 0 | } |
244 | |
|
245 | 0 | if (ctx->header_tag) { |
246 | 0 | flb_http_add_header(c, |
247 | 0 | ctx->header_tag, |
248 | 0 | flb_sds_len(ctx->header_tag), |
249 | 0 | tag, tag_len); |
250 | 0 | } |
251 | | |
252 | | /* Content Encoding: gzip */ |
253 | 0 | if (compressed == FLB_TRUE) { |
254 | 0 | if (ctx->compress_gzip == FLB_TRUE) { |
255 | 0 | flb_http_set_content_encoding_gzip(c); |
256 | 0 | } |
257 | 0 | else if (ctx->compress_snappy == FLB_TRUE) { |
258 | 0 | flb_http_set_content_encoding_snappy(c); |
259 | 0 | } |
260 | 0 | else if (ctx->compress_zstd == FLB_TRUE) { |
261 | 0 | flb_http_set_content_encoding_zstd(c); |
262 | 0 | } |
263 | 0 | } |
264 | | |
265 | | /* Basic Auth headers */ |
266 | 0 | if (ctx->http_user && ctx->http_passwd) { |
267 | 0 | flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd); |
268 | 0 | } |
269 | |
|
270 | 0 | flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); |
271 | |
|
272 | 0 | flb_config_map_foreach(head, mv, ctx->headers) { |
273 | 0 | key = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); |
274 | 0 | val = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head); |
275 | |
|
276 | 0 | flb_http_add_header(c, |
277 | 0 | key->str, flb_sds_len(key->str), |
278 | 0 | val->str, flb_sds_len(val->str)); |
279 | 0 | } |
280 | |
|
281 | 0 | #ifdef FLB_HAVE_SIGNV4 |
282 | 0 | #ifdef FLB_HAVE_AWS |
283 | | /* AWS SigV4 headers */ |
284 | 0 | if (ctx->has_aws_auth == FLB_TRUE) { |
285 | 0 | flb_plg_debug(ctx->ins, "signing request with AWS Sigv4"); |
286 | 0 | signature = flb_signv4_do(c, |
287 | 0 | FLB_TRUE, /* normalize URI ? */ |
288 | 0 | FLB_TRUE, /* add x-amz-date header ? */ |
289 | 0 | time(NULL), |
290 | 0 | (char *) ctx->aws_region, |
291 | 0 | (char *) ctx->aws_service, |
292 | 0 | 0, NULL, |
293 | 0 | ctx->aws_provider); |
294 | |
|
295 | 0 | if (!signature) { |
296 | 0 | flb_plg_error(ctx->ins, "could not sign request with sigv4"); |
297 | 0 | out_ret = FLB_RETRY; |
298 | 0 | goto cleanup; |
299 | 0 | } |
300 | 0 | flb_sds_destroy(signature); |
301 | 0 | } |
302 | 0 | #endif |
303 | 0 | #endif |
304 | | |
305 | 0 | ret = flb_http_do_with_oauth2(c, &b_sent, ctx->oauth2_ctx); |
306 | 0 | if (ret == 0) { |
307 | | /* |
308 | | * Only allow the following HTTP status: |
309 | | * |
310 | | * - 200: OK |
311 | | * - 201: Created |
312 | | * - 202: Accepted |
313 | | * - 203: no authorative resp |
314 | | * - 204: No Content |
315 | | * - 205: Reset content |
316 | | * |
317 | | */ |
318 | 0 | if (c->resp.status < 200 || c->resp.status > 205) { |
319 | 0 | if (ctx->log_response_payload && |
320 | 0 | c->resp.payload && c->resp.payload_size > 0) { |
321 | 0 | flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i\n%s", |
322 | 0 | ctx->host, ctx->port, |
323 | 0 | c->resp.status, c->resp.payload); |
324 | 0 | } |
325 | 0 | else { |
326 | 0 | flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i", |
327 | 0 | ctx->host, ctx->port, c->resp.status); |
328 | 0 | } |
329 | 0 | if (c->resp.status >= 400 && c->resp.status < 500 && |
330 | 0 | c->resp.status != 429 && c->resp.status != 408) { |
331 | 0 | flb_plg_warn(ctx->ins, "could not flush records to %s:%i (http_do=%i), " |
332 | 0 | "chunk will not be retried", |
333 | 0 | ctx->host, ctx->port, ret); |
334 | 0 | out_ret = FLB_ERROR; |
335 | 0 | } |
336 | 0 | else { |
337 | 0 | out_ret = FLB_RETRY; |
338 | 0 | } |
339 | 0 | } |
340 | 0 | else { |
341 | 0 | if (ctx->log_response_payload && |
342 | 0 | c->resp.payload && c->resp.payload_size > 0) { |
343 | 0 | flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%s", |
344 | 0 | ctx->host, ctx->port, |
345 | 0 | c->resp.status, c->resp.payload); |
346 | 0 | } |
347 | 0 | else { |
348 | 0 | flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i", |
349 | 0 | ctx->host, ctx->port, |
350 | 0 | c->resp.status); |
351 | 0 | } |
352 | 0 | } |
353 | 0 | } |
354 | 0 | else { |
355 | 0 | flb_plg_error(ctx->ins, "could not flush records to %s:%i (http_do=%i)", |
356 | 0 | ctx->host, ctx->port, ret); |
357 | 0 | out_ret = FLB_RETRY; |
358 | 0 | } |
359 | |
|
360 | 0 | cleanup: |
361 | | /* |
362 | | * If the payload buffer is different than incoming records in body, means |
363 | | * we generated a different payload and must be freed. |
364 | | */ |
365 | 0 | if (payload_buf != body) { |
366 | 0 | flb_free(payload_buf); |
367 | 0 | } |
368 | | |
369 | | /* Destroy HTTP client context */ |
370 | 0 | flb_http_client_destroy(c); |
371 | | |
372 | | /* Release the TCP connection */ |
373 | 0 | flb_upstream_conn_release(u_conn); |
374 | |
|
375 | 0 | return out_ret; |
376 | 0 | } |
377 | | |
378 | | static int compose_payload_gelf(struct flb_out_http *ctx, |
379 | | const char *data, uint64_t bytes, |
380 | | void **out_body, size_t *out_size) |
381 | 0 | { |
382 | 0 | flb_sds_t s; |
383 | 0 | flb_sds_t tmp = NULL; |
384 | 0 | size_t size = 0; |
385 | 0 | msgpack_object map; |
386 | 0 | struct flb_log_event_decoder log_decoder; |
387 | 0 | struct flb_log_event log_event; |
388 | 0 | int ret; |
389 | |
|
390 | 0 | size = bytes * 1.5; |
391 | | |
392 | | /* Allocate buffer for our new payload */ |
393 | 0 | s = flb_sds_create_size(size); |
394 | 0 | if (!s) { |
395 | 0 | flb_plg_error(ctx->ins, "flb_sds_create_size failed"); |
396 | 0 | return FLB_RETRY; |
397 | 0 | } |
398 | | |
399 | 0 | ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); |
400 | |
|
401 | 0 | if (ret != FLB_EVENT_DECODER_SUCCESS) { |
402 | 0 | flb_plg_error(ctx->ins, |
403 | 0 | "Log event decoder initialization error : %d", ret); |
404 | |
|
405 | 0 | flb_sds_destroy(s); |
406 | |
|
407 | 0 | return FLB_RETRY; |
408 | 0 | } |
409 | | |
410 | 0 | while ((ret = flb_log_event_decoder_next( |
411 | 0 | &log_decoder, |
412 | 0 | &log_event)) == FLB_EVENT_DECODER_SUCCESS) { |
413 | 0 | map = *log_event.body; |
414 | |
|
415 | 0 | tmp = flb_msgpack_to_gelf(&s, &map, |
416 | 0 | &log_event.timestamp, |
417 | 0 | &(ctx->gelf_fields)); |
418 | 0 | if (!tmp) { |
419 | 0 | flb_plg_error(ctx->ins, "error encoding to GELF"); |
420 | |
|
421 | 0 | flb_sds_destroy(s); |
422 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
423 | |
|
424 | 0 | return FLB_ERROR; |
425 | 0 | } |
426 | | |
427 | | /* Append new line */ |
428 | 0 | tmp = flb_sds_cat(s, "\n", 1); |
429 | 0 | if (!tmp) { |
430 | 0 | flb_plg_error(ctx->ins, "error concatenating records"); |
431 | |
|
432 | 0 | flb_sds_destroy(s); |
433 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
434 | |
|
435 | 0 | return FLB_RETRY; |
436 | 0 | } |
437 | | |
438 | 0 | s = tmp; |
439 | 0 | } |
440 | | |
441 | 0 | *out_body = s; |
442 | 0 | *out_size = flb_sds_len(s); |
443 | |
|
444 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
445 | |
|
446 | 0 | return FLB_OK; |
447 | 0 | } |
448 | | |
449 | | static int compose_payload(struct flb_out_http *ctx, |
450 | | const void *in_body, size_t in_size, |
451 | | void **out_body, size_t *out_size, |
452 | | struct flb_config *config) |
453 | 7 | { |
454 | 7 | flb_sds_t encoded; |
455 | | |
456 | 7 | *out_body = NULL; |
457 | 7 | *out_size = 0; |
458 | | |
459 | 7 | if ((ctx->out_format == FLB_PACK_JSON_FORMAT_JSON) || |
460 | 0 | (ctx->out_format == FLB_PACK_JSON_FORMAT_STREAM) || |
461 | 7 | (ctx->out_format == FLB_PACK_JSON_FORMAT_LINES)) { |
462 | | |
463 | 7 | encoded = flb_pack_msgpack_to_json_format(in_body, |
464 | 7 | in_size, |
465 | 7 | ctx->out_format, |
466 | 7 | ctx->json_date_format, |
467 | 7 | ctx->date_key, |
468 | 7 | config->json_escape_unicode); |
469 | 7 | if (encoded == NULL) { |
470 | 0 | flb_plg_error(ctx->ins, "failed to convert json"); |
471 | 0 | return FLB_ERROR; |
472 | 0 | } |
473 | 7 | *out_body = (void*)encoded; |
474 | 7 | *out_size = flb_sds_len(encoded); |
475 | 7 | } |
476 | 0 | else if (ctx->out_format == FLB_HTTP_OUT_GELF) { |
477 | 0 | return compose_payload_gelf(ctx, in_body, in_size, out_body, out_size); |
478 | 0 | } |
479 | 0 | else { |
480 | | /* Nothing to do, if the format is msgpack */ |
481 | 0 | *out_body = (void *)in_body; |
482 | 0 | *out_size = in_size; |
483 | 0 | } |
484 | | |
485 | 7 | return FLB_OK; |
486 | 7 | } |
487 | | |
488 | 0 | static char **extract_headers(msgpack_object *obj) { |
489 | 0 | size_t i; |
490 | 0 | char **headers = NULL; |
491 | 0 | size_t str_count; |
492 | 0 | msgpack_object_map map; |
493 | 0 | msgpack_object_str k; |
494 | 0 | msgpack_object_str v; |
495 | |
|
496 | 0 | if (obj->type != MSGPACK_OBJECT_MAP) { |
497 | 0 | goto err; |
498 | 0 | } |
499 | | |
500 | 0 | map = obj->via.map; |
501 | 0 | str_count = map.size * 2 + 1; |
502 | 0 | headers = flb_calloc(str_count, sizeof *headers); |
503 | |
|
504 | 0 | if (!headers) { |
505 | 0 | goto err; |
506 | 0 | } |
507 | | |
508 | 0 | for (i = 0; i < map.size; i++) { |
509 | 0 | if (map.ptr[i].key.type != MSGPACK_OBJECT_STR || |
510 | 0 | map.ptr[i].val.type != MSGPACK_OBJECT_STR) { |
511 | 0 | continue; |
512 | 0 | } |
513 | | |
514 | 0 | k = map.ptr[i].key.via.str; |
515 | 0 | v = map.ptr[i].val.via.str; |
516 | |
|
517 | 0 | headers[i * 2] = strndup(k.ptr, k.size); |
518 | |
|
519 | 0 | if (!headers[i]) { |
520 | 0 | goto err; |
521 | 0 | } |
522 | | |
523 | 0 | headers[i * 2 + 1] = strndup(v.ptr, v.size); |
524 | |
|
525 | 0 | if (!headers[i]) { |
526 | 0 | goto err; |
527 | 0 | } |
528 | 0 | } |
529 | | |
530 | 0 | return headers; |
531 | | |
532 | 0 | err: |
533 | 0 | if (headers) { |
534 | 0 | for (i = 0; i < str_count; i++) { |
535 | 0 | if (headers[i]) { |
536 | 0 | flb_free(headers[i]); |
537 | 0 | } |
538 | 0 | } |
539 | 0 | flb_free(headers); |
540 | 0 | } |
541 | 0 | return NULL; |
542 | 0 | } |
543 | | |
544 | | static int send_all_requests(struct flb_out_http *ctx, |
545 | | const char *data, size_t size, |
546 | | flb_sds_t body_key, |
547 | | flb_sds_t headers_key, |
548 | | struct flb_event_chunk *event_chunk) |
549 | 0 | { |
550 | 0 | msgpack_object map; |
551 | 0 | msgpack_object *k; |
552 | 0 | msgpack_object *v; |
553 | 0 | msgpack_object *start_key; |
554 | 0 | const char *body; |
555 | 0 | size_t body_size; |
556 | 0 | bool body_found; |
557 | 0 | bool headers_found; |
558 | 0 | char **headers; |
559 | 0 | size_t record_count = 0; |
560 | 0 | int ret = 0; |
561 | 0 | struct flb_log_event_decoder log_decoder; |
562 | 0 | struct flb_log_event log_event; |
563 | |
|
564 | 0 | ret = flb_log_event_decoder_init(&log_decoder, (char *) data, size); |
565 | |
|
566 | 0 | if (ret != FLB_EVENT_DECODER_SUCCESS) { |
567 | 0 | flb_plg_error(ctx->ins, |
568 | 0 | "Log event decoder initialization error : %d", ret); |
569 | |
|
570 | 0 | return -1; |
571 | 0 | } |
572 | | |
573 | 0 | while ((flb_log_event_decoder_next( |
574 | 0 | &log_decoder, |
575 | 0 | &log_event)) == FLB_EVENT_DECODER_SUCCESS) { |
576 | 0 | headers = NULL; |
577 | 0 | body_found = false; |
578 | 0 | headers_found = false; |
579 | |
|
580 | 0 | map = *log_event.body; |
581 | |
|
582 | 0 | if (map.type != MSGPACK_OBJECT_MAP) { |
583 | 0 | ret = -1; |
584 | 0 | break; |
585 | 0 | } |
586 | | |
587 | 0 | if (!flb_ra_get_kv_pair(ctx->body_ra, map, &start_key, &k, &v)) { |
588 | 0 | if (v->type == MSGPACK_OBJECT_STR || v->type == MSGPACK_OBJECT_BIN) { |
589 | 0 | body = v->via.str.ptr; |
590 | 0 | body_size = v->via.str.size; |
591 | 0 | body_found = true; |
592 | 0 | } |
593 | 0 | else { |
594 | 0 | flb_plg_warn(ctx->ins, |
595 | 0 | "failed to extract body using pattern \"%s\" " |
596 | 0 | "(must be a msgpack string or bin)", ctx->body_key); |
597 | 0 | } |
598 | 0 | } |
599 | |
|
600 | 0 | if (!flb_ra_get_kv_pair(ctx->headers_ra, map, &start_key, &k, &v)) { |
601 | 0 | headers = extract_headers(v); |
602 | 0 | if (headers) { |
603 | 0 | headers_found = true; |
604 | 0 | } |
605 | 0 | else { |
606 | 0 | flb_plg_warn(ctx->ins, |
607 | 0 | "error extracting headers using pattern \"%s\"", |
608 | 0 | ctx->headers_key); |
609 | 0 | } |
610 | 0 | } |
611 | |
|
612 | 0 | if (body_found && headers_found) { |
613 | 0 | flb_plg_trace(ctx->ins, "sending record %zu via %s", |
614 | 0 | record_count++, |
615 | 0 | ctx->http_method == FLB_HTTP_POST ? "POST" : "PUT"); |
616 | 0 | ret = http_request(ctx, body, body_size, event_chunk->tag, |
617 | 0 | flb_sds_len(event_chunk->tag), headers); |
618 | 0 | } |
619 | 0 | else { |
620 | 0 | flb_plg_warn(ctx->ins, |
621 | 0 | "failed to extract body/headers using patterns " |
622 | 0 | "\"%s\" and \"%s\"", ctx->body_key, ctx->headers_key); |
623 | 0 | ret = -1; |
624 | 0 | continue; |
625 | 0 | } |
626 | | |
627 | 0 | flb_free(headers); |
628 | 0 | } |
629 | |
|
630 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
631 | |
|
632 | 0 | return ret; |
633 | 0 | } |
634 | | |
635 | | static void cb_http_flush(struct flb_event_chunk *event_chunk, |
636 | | struct flb_output_flush *out_flush, |
637 | | struct flb_input_instance *i_ins, |
638 | | void *out_context, |
639 | | struct flb_config *config) |
640 | 7 | { |
641 | 7 | int ret = FLB_ERROR; |
642 | 7 | struct flb_out_http *ctx = out_context; |
643 | 7 | void *out_body; |
644 | 7 | size_t out_size; |
645 | 7 | (void) i_ins; |
646 | | |
647 | 7 | if (ctx->body_key) { |
648 | 0 | ret = send_all_requests(ctx, event_chunk->data, event_chunk->size, |
649 | 0 | ctx->body_key, ctx->headers_key, event_chunk); |
650 | 0 | if (ret < 0) { |
651 | 0 | flb_plg_error(ctx->ins, |
652 | 0 | "failed to send requests using body key \"%s\"", ctx->body_key); |
653 | 0 | } |
654 | 0 | } |
655 | 7 | else { |
656 | 7 | ret = compose_payload(ctx, event_chunk->data, event_chunk->size, |
657 | 7 | &out_body, &out_size, config); |
658 | 7 | if (ret != FLB_OK) { |
659 | 0 | FLB_OUTPUT_RETURN(ret); |
660 | 0 | } |
661 | | |
662 | 7 | if ((ctx->out_format == FLB_PACK_JSON_FORMAT_JSON) || |
663 | 0 | (ctx->out_format == FLB_PACK_JSON_FORMAT_STREAM) || |
664 | 0 | (ctx->out_format == FLB_PACK_JSON_FORMAT_LINES) || |
665 | 7 | (ctx->out_format == FLB_HTTP_OUT_GELF)) { |
666 | 7 | ret = http_request(ctx, out_body, out_size, |
667 | 7 | event_chunk->tag, flb_sds_len(event_chunk->tag), NULL); |
668 | 7 | flb_sds_destroy(out_body); |
669 | 7 | } |
670 | 0 | else { |
671 | | /* msgpack */ |
672 | 0 | ret = http_request(ctx, |
673 | 0 | event_chunk->data, event_chunk->size, |
674 | 0 | event_chunk->tag, flb_sds_len(event_chunk->tag), NULL); |
675 | 0 | } |
676 | 7 | } |
677 | | |
678 | 7 | FLB_OUTPUT_RETURN(ret); |
679 | 7 | } |
680 | | |
681 | | static int cb_http_exit(void *data, struct flb_config *config) |
682 | 102 | { |
683 | 102 | struct flb_out_http *ctx = data; |
684 | | |
685 | 102 | flb_http_conf_destroy(ctx); |
686 | 102 | return 0; |
687 | 102 | } |
688 | | |
689 | | /* Configuration properties map */ |
690 | | static struct flb_config_map config_map[] = { |
691 | | { |
692 | | FLB_CONFIG_MAP_STR, "proxy", NULL, |
693 | | 0, FLB_FALSE, 0, |
694 | | "Specify an HTTP Proxy. The expected format of this value is http://host:port. " |
695 | | }, |
696 | | { |
697 | | FLB_CONFIG_MAP_BOOL, "allow_duplicated_headers", "true", |
698 | | 0, FLB_TRUE, offsetof(struct flb_out_http, allow_dup_headers), |
699 | | "Specify if duplicated headers are allowed or not" |
700 | | }, |
701 | | { |
702 | | FLB_CONFIG_MAP_BOOL, "log_response_payload", "true", |
703 | | 0, FLB_TRUE, offsetof(struct flb_out_http, log_response_payload), |
704 | | "Specify if the response paylod should be logged or not" |
705 | | }, |
706 | | { |
707 | | FLB_CONFIG_MAP_TIME, "http.response_timeout", "60s", |
708 | | 0, FLB_TRUE, offsetof(struct flb_out_http, response_timeout), |
709 | | "Set maximum time to wait for a server response" |
710 | | }, |
711 | | { |
712 | | FLB_CONFIG_MAP_TIME, "http.read_idle_timeout", "0s", |
713 | | 0, FLB_TRUE, offsetof(struct flb_out_http, read_idle_timeout), |
714 | | "Set maximum allowed time between two consecutive reads" |
715 | | }, |
716 | | { |
717 | | FLB_CONFIG_MAP_STR, "http_user", NULL, |
718 | | 0, FLB_TRUE, offsetof(struct flb_out_http, http_user), |
719 | | "Set HTTP auth user" |
720 | | }, |
721 | | { |
722 | | FLB_CONFIG_MAP_STR, "http_passwd", "", |
723 | | 0, FLB_TRUE, offsetof(struct flb_out_http, http_passwd), |
724 | | "Set HTTP auth password" |
725 | | }, |
726 | | { |
727 | | FLB_CONFIG_MAP_BOOL, "oauth2.enable", "false", |
728 | | 0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.enabled), |
729 | | "Enable OAuth2 client credentials for outgoing requests" |
730 | | }, |
731 | | { |
732 | | FLB_CONFIG_MAP_STR, "oauth2.token_url", NULL, |
733 | | 0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.token_url), |
734 | | "OAuth2 token endpoint URL" |
735 | | }, |
736 | | { |
737 | | FLB_CONFIG_MAP_STR, "oauth2.client_id", NULL, |
738 | | 0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.client_id), |
739 | | "OAuth2 client_id" |
740 | | }, |
741 | | { |
742 | | FLB_CONFIG_MAP_STR, "oauth2.client_secret", NULL, |
743 | | 0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.client_secret), |
744 | | "OAuth2 client_secret" |
745 | | }, |
746 | | { |
747 | | FLB_CONFIG_MAP_STR, "oauth2.scope", NULL, |
748 | | 0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.scope), |
749 | | "Optional OAuth2 scope" |
750 | | }, |
751 | | { |
752 | | FLB_CONFIG_MAP_STR, "oauth2.audience", NULL, |
753 | | 0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.audience), |
754 | | "Optional OAuth2 audience parameter" |
755 | | }, |
756 | | { |
757 | | FLB_CONFIG_MAP_STR, "oauth2.resource", NULL, |
758 | | 0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.resource), |
759 | | "Optional OAuth2 resource parameter" |
760 | | }, |
761 | | { |
762 | | FLB_CONFIG_MAP_STR, "oauth2.auth_method", "basic", |
763 | | 0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_auth_method), |
764 | | "OAuth2 client authentication method: basic, post or private_key_jwt" |
765 | | }, |
766 | | { |
767 | | FLB_CONFIG_MAP_STR, "oauth2.jwt_key_file", NULL, |
768 | | 0, FLB_TRUE, offsetof(struct flb_out_http, |
769 | | oauth2_config.jwt_key_file), |
770 | | "Path to PEM private key used by private_key_jwt" |
771 | | }, |
772 | | { |
773 | | FLB_CONFIG_MAP_STR, "oauth2.jwt_cert_file", NULL, |
774 | | 0, FLB_TRUE, offsetof(struct flb_out_http, |
775 | | oauth2_config.jwt_cert_file), |
776 | | "Path to certificate file used by private_key_jwt" |
777 | | }, |
778 | | { |
779 | | FLB_CONFIG_MAP_STR, "oauth2.jwt_aud", NULL, |
780 | | 0, FLB_TRUE, offsetof(struct flb_out_http, |
781 | | oauth2_config.jwt_aud), |
782 | | "Audience for private_key_jwt assertion (defaults to oauth2.token_url)" |
783 | | }, |
784 | | { |
785 | | FLB_CONFIG_MAP_STR, "oauth2.jwt_header", "kid", |
786 | | 0, FLB_TRUE, offsetof(struct flb_out_http, |
787 | | oauth2_config.jwt_header), |
788 | | "JWT header claim name for private_key_jwt thumbprint (kid or x5t)" |
789 | | }, |
790 | | { |
791 | | FLB_CONFIG_MAP_INT, "oauth2.jwt_ttl_seconds", "300", |
792 | | 0, FLB_TRUE, offsetof(struct flb_out_http, |
793 | | oauth2_config.jwt_ttl), |
794 | | "Lifetime in seconds for private_key_jwt client assertions" |
795 | | }, |
796 | | { |
797 | | FLB_CONFIG_MAP_INT, "oauth2.refresh_skew_seconds", "60", |
798 | | 0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.refresh_skew), |
799 | | "Seconds before expiry to refresh the access token" |
800 | | }, |
801 | | { |
802 | | FLB_CONFIG_MAP_TIME, "oauth2.timeout", "0s", |
803 | | 0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.timeout), |
804 | | "Timeout for OAuth2 token requests (defaults to response_timeout when unset)" |
805 | | }, |
806 | | { |
807 | | FLB_CONFIG_MAP_TIME, "oauth2.connect_timeout", "0s", |
808 | | 0, FLB_TRUE, offsetof(struct flb_out_http, oauth2_config.connect_timeout), |
809 | | "Connect timeout for OAuth2 token requests" |
810 | | }, |
811 | | #ifdef FLB_HAVE_SIGNV4 |
812 | | #ifdef FLB_HAVE_AWS |
813 | | { |
814 | | FLB_CONFIG_MAP_BOOL, "aws_auth", "false", |
815 | | 0, FLB_TRUE, offsetof(struct flb_out_http, has_aws_auth), |
816 | | "Enable AWS SigV4 authentication" |
817 | | }, |
818 | | { |
819 | | FLB_CONFIG_MAP_STR, "aws_service", NULL, |
820 | | 0, FLB_TRUE, offsetof(struct flb_out_http, aws_service), |
821 | | "AWS destination service code, used by SigV4 authentication" |
822 | | }, |
823 | | FLB_AWS_CREDENTIAL_BASE_CONFIG_MAP(FLB_HTTP_AWS_CREDENTIAL_PREFIX), |
824 | | #endif |
825 | | #endif |
826 | | { |
827 | | FLB_CONFIG_MAP_STR, "header_tag", NULL, |
828 | | 0, FLB_TRUE, offsetof(struct flb_out_http, header_tag), |
829 | | "Set a HTTP header which value is the Tag" |
830 | | }, |
831 | | { |
832 | | FLB_CONFIG_MAP_STR, "format", "json", |
833 | | 0, FLB_TRUE, offsetof(struct flb_out_http, format), |
834 | | "Set desired payload format: json, json_stream, json_lines, gelf or msgpack" |
835 | | }, |
836 | | { |
837 | | FLB_CONFIG_MAP_STR, "json_date_format", NULL, |
838 | | 0, FLB_FALSE, 0, |
839 | | FBL_PACK_JSON_DATE_FORMAT_DESCRIPTION |
840 | | }, |
841 | | { |
842 | | FLB_CONFIG_MAP_STR, "json_date_key", "date", |
843 | | 0, FLB_TRUE, offsetof(struct flb_out_http, json_date_key), |
844 | | "Specify the name of the date field in output" |
845 | | }, |
846 | | { |
847 | | FLB_CONFIG_MAP_STR, "compress", NULL, |
848 | | 0, FLB_FALSE, 0, |
849 | | "Set payload compression mechanism. Option available are 'gzip', 'snappy' and 'zstd'" |
850 | | }, |
851 | | { |
852 | | FLB_CONFIG_MAP_SLIST_1, "header", NULL, |
853 | | FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_out_http, headers), |
854 | | "Add a HTTP header key/value pair. Multiple headers can be set" |
855 | | }, |
856 | | { |
857 | | FLB_CONFIG_MAP_STR, "uri", NULL, |
858 | | 0, FLB_TRUE, offsetof(struct flb_out_http, uri), |
859 | | "Specify an optional HTTP URI for the target web server, e.g: /something" |
860 | | }, |
861 | | { |
862 | | FLB_CONFIG_MAP_STR, "http_method", "POST", |
863 | | 0, FLB_FALSE, 0, |
864 | | "Specify the HTTP method to use. Supported methods are POST and PUT" |
865 | | }, |
866 | | |
867 | | /* Gelf Properties */ |
868 | | { |
869 | | FLB_CONFIG_MAP_STR, "gelf_timestamp_key", NULL, |
870 | | 0, FLB_TRUE, offsetof(struct flb_out_http, gelf_fields.timestamp_key), |
871 | | "Specify the key to use for 'timestamp' in gelf format" |
872 | | }, |
873 | | { |
874 | | FLB_CONFIG_MAP_STR, "gelf_host_key", NULL, |
875 | | 0, FLB_TRUE, offsetof(struct flb_out_http, gelf_fields.host_key), |
876 | | "Specify the key to use for the 'host' in gelf format" |
877 | | }, |
878 | | { |
879 | | FLB_CONFIG_MAP_STR, "gelf_short_message_key", NULL, |
880 | | 0, FLB_TRUE, offsetof(struct flb_out_http, gelf_fields.short_message_key), |
881 | | "Specify the key to use as the 'short' message in gelf format" |
882 | | }, |
883 | | { |
884 | | FLB_CONFIG_MAP_STR, "gelf_full_message_key", NULL, |
885 | | 0, FLB_TRUE, offsetof(struct flb_out_http, gelf_fields.full_message_key), |
886 | | "Specify the key to use for the 'full' message in gelf format" |
887 | | }, |
888 | | { |
889 | | FLB_CONFIG_MAP_STR, "gelf_level_key", NULL, |
890 | | 0, FLB_TRUE, offsetof(struct flb_out_http, gelf_fields.level_key), |
891 | | "Specify the key to use for the 'level' in gelf format" |
892 | | }, |
893 | | { |
894 | | FLB_CONFIG_MAP_STR, "body_key", NULL, |
895 | | 0, FLB_TRUE, offsetof(struct flb_out_http, body_key), |
896 | | "Specify the key which contains the body" |
897 | | }, |
898 | | { |
899 | | FLB_CONFIG_MAP_STR, "headers_key", NULL, |
900 | | 0, FLB_TRUE, offsetof(struct flb_out_http, headers_key), |
901 | | "Specify the key which contains the headers" |
902 | | }, |
903 | | |
904 | | /* EOF */ |
905 | | {0} |
906 | | }; |
907 | | |
908 | | static int cb_http_format_test(struct flb_config *config, |
909 | | struct flb_input_instance *ins, |
910 | | void *plugin_context, |
911 | | void *flush_ctx, |
912 | | int event_type, |
913 | | const char *tag, int tag_len, |
914 | | const void *data, size_t bytes, |
915 | | void **out_data, size_t *out_size) |
916 | 0 | { |
917 | 0 | struct flb_out_http *ctx = plugin_context; |
918 | 0 | int ret; |
919 | |
|
920 | 0 | ret = compose_payload(ctx, data, bytes, out_data, out_size, config); |
921 | 0 | if (ret != FLB_OK) { |
922 | 0 | flb_error("ret=%d", ret); |
923 | 0 | return -1; |
924 | 0 | } |
925 | 0 | return 0; |
926 | 0 | } |
927 | | |
928 | | /* Plugin reference */ |
929 | | struct flb_output_plugin out_http_plugin = { |
930 | | .name = "http", |
931 | | .description = "HTTP Output", |
932 | | .cb_init = cb_http_init, |
933 | | .cb_pre_run = NULL, |
934 | | .cb_flush = cb_http_flush, |
935 | | .cb_exit = cb_http_exit, |
936 | | .config_map = config_map, |
937 | | |
938 | | /* for testing */ |
939 | | .test_formatter.callback = cb_http_format_test, |
940 | | |
941 | | .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, |
942 | | .workers = 2 |
943 | | }; |