/src/fluent-bit/plugins/out_opensearch/opensearch.c
Line | Count | Source |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2026 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <fluent-bit/flb_output_plugin.h> |
21 | | #include <fluent-bit/flb_utils.h> |
22 | | #include <fluent-bit/flb_network.h> |
23 | | #include <fluent-bit/flb_http_client.h> |
24 | | #include <fluent-bit/flb_pack.h> |
25 | | #include <fluent-bit/flb_time.h> |
26 | | #include <fluent-bit/flb_signv4.h> |
27 | | #include <fluent-bit/flb_aws_credentials.h> |
28 | | #include <fluent-bit/flb_gzip.h> |
29 | | #include <fluent-bit/flb_record_accessor.h> |
30 | | #include <fluent-bit/flb_ra_key.h> |
31 | | #include <fluent-bit/flb_log_event_decoder.h> |
32 | | #include <msgpack.h> |
33 | | |
34 | | #include <cfl/cfl.h> |
35 | | |
36 | | #include "opensearch.h" |
37 | | #include "os_conf.h" |
38 | | |
39 | | static int os_pack_array_content(msgpack_packer *tmp_pck, |
40 | | msgpack_object array, |
41 | | struct flb_opensearch *ctx); |
42 | | |
43 | | #ifdef FLB_HAVE_AWS |
44 | | static flb_sds_t add_aws_auth(struct flb_http_client *c, |
45 | | struct flb_opensearch *ctx) |
46 | 0 | { |
47 | 0 | flb_sds_t signature = NULL; |
48 | 0 | int ret; |
49 | |
|
50 | 0 | flb_plg_debug(ctx->ins, "Signing request with AWS Sigv4"); |
51 | | |
52 | | /* Amazon OpenSearch Sigv4 does not allow the host header to include the port */ |
53 | 0 | ret = flb_http_strip_port_from_host(c); |
54 | 0 | if (ret < 0) { |
55 | 0 | flb_plg_error(ctx->ins, "could not strip port from host for sigv4"); |
56 | 0 | return NULL; |
57 | 0 | } |
58 | | |
59 | | /* AWS Fluent Bit user agent */ |
60 | 0 | flb_http_add_header(c, "User-Agent", 10, "aws-fluent-bit-plugin", 21); |
61 | |
|
62 | 0 | signature = flb_signv4_do(c, FLB_TRUE, FLB_TRUE, time(NULL), |
63 | 0 | ctx->aws_region, ctx->aws_service_name, |
64 | 0 | S3_MODE_SIGNED_PAYLOAD, ctx->aws_unsigned_headers, |
65 | 0 | ctx->aws_provider); |
66 | 0 | if (!signature) { |
67 | 0 | flb_plg_error(ctx->ins, "could not sign request with sigv4"); |
68 | 0 | return NULL; |
69 | 0 | } |
70 | 0 | return signature; |
71 | 0 | } |
72 | | #endif /* FLB_HAVE_AWS */ |
73 | | |
74 | | static int os_pack_map_content(msgpack_packer *tmp_pck, |
75 | | msgpack_object map, |
76 | | struct flb_opensearch *ctx) |
77 | 0 | { |
78 | 0 | int i; |
79 | 0 | char *ptr_key = NULL; |
80 | 0 | char buf_key[256]; |
81 | 0 | msgpack_object *k; |
82 | 0 | msgpack_object *v; |
83 | |
|
84 | 0 | for (i = 0; i < map.via.map.size; i++) { |
85 | 0 | k = &map.via.map.ptr[i].key; |
86 | 0 | v = &map.via.map.ptr[i].val; |
87 | 0 | ptr_key = NULL; |
88 | | |
89 | | /* Store key */ |
90 | 0 | const char *key_ptr = NULL; |
91 | 0 | size_t key_size = 0; |
92 | |
|
93 | 0 | if (k->type == MSGPACK_OBJECT_BIN) { |
94 | 0 | key_ptr = k->via.bin.ptr; |
95 | 0 | key_size = k->via.bin.size; |
96 | 0 | } |
97 | 0 | else if (k->type == MSGPACK_OBJECT_STR) { |
98 | 0 | key_ptr = k->via.str.ptr; |
99 | 0 | key_size = k->via.str.size; |
100 | 0 | } |
101 | |
|
102 | 0 | if (key_size < (sizeof(buf_key) - 1)) { |
103 | 0 | memcpy(buf_key, key_ptr, key_size); |
104 | 0 | buf_key[key_size] = '\0'; |
105 | 0 | ptr_key = buf_key; |
106 | 0 | } |
107 | 0 | else { |
108 | | /* Long map keys have a performance penalty */ |
109 | 0 | ptr_key = flb_malloc(key_size + 1); |
110 | 0 | if (!ptr_key) { |
111 | 0 | flb_errno(); |
112 | 0 | return -1; |
113 | 0 | } |
114 | | |
115 | 0 | memcpy(ptr_key, key_ptr, key_size); |
116 | 0 | ptr_key[key_size] = '\0'; |
117 | 0 | } |
118 | | |
119 | | /* |
120 | | * Sanitize key name, it don't allow dots in field names: |
121 | | * |
122 | | * https://goo.gl/R5NMTr |
123 | | */ |
124 | 0 | if (ctx->replace_dots == FLB_TRUE) { |
125 | 0 | char *p = ptr_key; |
126 | 0 | char *end = ptr_key + key_size; |
127 | 0 | while (p != end) { |
128 | 0 | if (*p == '.') *p = '_'; |
129 | 0 | p++; |
130 | 0 | } |
131 | 0 | } |
132 | | |
133 | | /* Append the key */ |
134 | 0 | msgpack_pack_str(tmp_pck, key_size); |
135 | 0 | msgpack_pack_str_body(tmp_pck, ptr_key, key_size); |
136 | | |
137 | | /* Release temporary key if was allocated */ |
138 | 0 | if (ptr_key && ptr_key != buf_key) { |
139 | 0 | flb_free(ptr_key); |
140 | 0 | } |
141 | 0 | ptr_key = NULL; |
142 | | |
143 | | /* |
144 | | * The value can be any data type, if it's a map we need to |
145 | | * sanitize to avoid dots. |
146 | | */ |
147 | 0 | if (v->type == MSGPACK_OBJECT_MAP) { |
148 | 0 | msgpack_pack_map(tmp_pck, v->via.map.size); |
149 | 0 | os_pack_map_content(tmp_pck, *v, ctx); |
150 | 0 | } |
151 | | /* |
152 | | * The value can be any data type, if it's an array we need to |
153 | | * pass it to os_pack_array_content. |
154 | | */ |
155 | 0 | else if (v->type == MSGPACK_OBJECT_ARRAY) { |
156 | 0 | msgpack_pack_array(tmp_pck, v->via.array.size); |
157 | 0 | os_pack_array_content(tmp_pck, *v, ctx); |
158 | 0 | } |
159 | 0 | else { |
160 | 0 | msgpack_pack_object(tmp_pck, *v); |
161 | 0 | } |
162 | 0 | } |
163 | 0 | return 0; |
164 | 0 | } |
165 | | |
166 | | /* |
167 | | * Iterate through the array and sanitize elements. |
168 | | * Mutual recursion with os_pack_map_content. |
169 | | */ |
170 | | static int os_pack_array_content(msgpack_packer *tmp_pck, |
171 | | msgpack_object array, |
172 | | struct flb_opensearch *ctx) |
173 | 0 | { |
174 | 0 | int i; |
175 | 0 | msgpack_object *e; |
176 | |
|
177 | 0 | for (i = 0; i < array.via.array.size; i++) { |
178 | 0 | e = &array.via.array.ptr[i]; |
179 | 0 | if (e->type == MSGPACK_OBJECT_MAP) { |
180 | 0 | msgpack_pack_map(tmp_pck, e->via.map.size); |
181 | 0 | os_pack_map_content(tmp_pck, *e, ctx); |
182 | 0 | } |
183 | 0 | else if (e->type == MSGPACK_OBJECT_ARRAY) { |
184 | 0 | msgpack_pack_array(tmp_pck, e->via.array.size); |
185 | 0 | os_pack_array_content(tmp_pck, *e, ctx); |
186 | 0 | } |
187 | 0 | else { |
188 | 0 | msgpack_pack_object(tmp_pck, *e); |
189 | 0 | } |
190 | 0 | } |
191 | 0 | return 0; |
192 | 0 | } |
193 | | |
194 | | /* |
195 | | * Get _id value from incoming record. |
196 | | * If it successed, return the value as flb_sds_t. |
197 | | * If it failed, return NULL. |
198 | | */ |
199 | | static flb_sds_t os_get_id_value(struct flb_opensearch *ctx, |
200 | | msgpack_object *map) |
201 | 0 | { |
202 | 0 | struct flb_ra_value *rval = NULL; |
203 | 0 | flb_sds_t tmp_str; |
204 | 0 | rval = flb_ra_get_value_object(ctx->ra_id_key, *map); |
205 | 0 | if (rval == NULL) { |
206 | 0 | flb_plg_warn(ctx->ins, "the value of %s is missing", |
207 | 0 | ctx->id_key); |
208 | 0 | return NULL; |
209 | 0 | } |
210 | 0 | else if(rval->o.type != MSGPACK_OBJECT_STR) { |
211 | 0 | flb_plg_warn(ctx->ins, "the value of %s is not string", |
212 | 0 | ctx->id_key); |
213 | 0 | flb_ra_key_value_destroy(rval); |
214 | 0 | return NULL; |
215 | 0 | } |
216 | | |
217 | 0 | tmp_str = flb_sds_create_len(rval->o.via.str.ptr, |
218 | 0 | rval->o.via.str.size); |
219 | 0 | if (tmp_str == NULL) { |
220 | 0 | flb_plg_warn(ctx->ins, "cannot create ID string from record"); |
221 | 0 | flb_ra_key_value_destroy(rval); |
222 | 0 | return NULL; |
223 | 0 | } |
224 | 0 | flb_ra_key_value_destroy(rval); |
225 | 0 | return tmp_str; |
226 | 0 | } |
227 | | |
228 | | static int compose_index_header(struct flb_opensearch *ctx, |
229 | | int index_custom_len, |
230 | | char *logstash_index, size_t logstash_index_size, |
231 | | char *separator_str, |
232 | | struct tm *tm) |
233 | 0 | { |
234 | 0 | int ret; |
235 | 0 | int len; |
236 | 0 | char *p; |
237 | 0 | size_t s; |
238 | | |
239 | | /* Compose Index header */ |
240 | 0 | if (index_custom_len > 0) { |
241 | 0 | p = logstash_index + index_custom_len; |
242 | 0 | } else { |
243 | 0 | p = logstash_index + flb_sds_len(ctx->logstash_prefix); |
244 | 0 | } |
245 | 0 | len = p - logstash_index; |
246 | 0 | ret = snprintf(p, logstash_index_size - len, "%s", |
247 | 0 | separator_str); |
248 | 0 | if (ret > logstash_index_size - len) { |
249 | | /* exceed limit */ |
250 | 0 | return -1; |
251 | 0 | } |
252 | 0 | p += strlen(separator_str); |
253 | 0 | len += strlen(separator_str); |
254 | |
|
255 | 0 | s = strftime(p, logstash_index_size - len, |
256 | 0 | ctx->logstash_dateformat, tm); |
257 | 0 | if (s==0) { |
258 | | /* exceed limit */ |
259 | 0 | return -1; |
260 | 0 | } |
261 | 0 | p += s; |
262 | 0 | *p++ = '\0'; |
263 | |
|
264 | 0 | return 0; |
265 | 0 | } |
266 | | |
267 | | /* |
268 | | * Convert the internal Fluent Bit data representation to the required |
269 | | * one by OpenSearch. |
270 | | */ |
271 | | static int opensearch_format(struct flb_config *config, |
272 | | struct flb_input_instance *ins, |
273 | | void *plugin_context, |
274 | | void *flush_ctx, |
275 | | int event_type, |
276 | | const char *tag, int tag_len, |
277 | | const void *data, size_t bytes, |
278 | | void **out_data, size_t *out_size) |
279 | 0 | { |
280 | 0 | int ret; |
281 | 0 | int len; |
282 | 0 | int map_size; |
283 | 0 | int index_len = 0; |
284 | 0 | int write_op_update = FLB_FALSE; |
285 | 0 | int write_op_upsert = FLB_FALSE; |
286 | 0 | flb_sds_t ra_index = NULL; |
287 | 0 | size_t s = 0; |
288 | 0 | char *index = NULL; |
289 | 0 | char logstash_index[256]; |
290 | 0 | char time_formatted[256]; |
291 | 0 | char index_formatted[256]; |
292 | 0 | char uuid[37]; |
293 | 0 | flb_sds_t out_buf; |
294 | 0 | flb_sds_t id_key_str = NULL; |
295 | 0 | msgpack_object map; |
296 | 0 | flb_sds_t bulk; |
297 | 0 | struct tm tm; |
298 | 0 | struct flb_time tms; |
299 | 0 | msgpack_sbuffer tmp_sbuf; |
300 | 0 | msgpack_packer tmp_pck; |
301 | 0 | cfl_hash_128bits_t hash; |
302 | 0 | unsigned char h[sizeof(cfl_hash_128bits_t)]; |
303 | 0 | int index_custom_len; |
304 | 0 | struct flb_opensearch *ctx = plugin_context; |
305 | 0 | flb_sds_t j_index; |
306 | 0 | struct flb_log_event_decoder log_decoder; |
307 | 0 | struct flb_log_event log_event; |
308 | |
|
309 | 0 | ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); |
310 | |
|
311 | 0 | if (ret != FLB_EVENT_DECODER_SUCCESS) { |
312 | 0 | flb_plg_error(ctx->ins, |
313 | 0 | "Log event decoder initialization error : %d", ret); |
314 | |
|
315 | 0 | return -1; |
316 | 0 | } |
317 | | |
318 | 0 | j_index = flb_sds_create_size(FLB_OS_HEADER_SIZE); |
319 | 0 | if (j_index == NULL) { |
320 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
321 | |
|
322 | 0 | return -1; |
323 | 0 | } |
324 | | |
325 | 0 | bulk = flb_sds_create_size(bytes * 2); |
326 | 0 | if (!bulk) { |
327 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
328 | 0 | flb_sds_destroy(j_index); |
329 | |
|
330 | 0 | return -1; |
331 | 0 | } |
332 | | |
333 | | /* Copy logstash prefix if logstash format is enabled */ |
334 | 0 | if (ctx->logstash_format == FLB_TRUE) { |
335 | 0 | strncpy(logstash_index, ctx->logstash_prefix, sizeof(logstash_index)); |
336 | 0 | logstash_index[sizeof(logstash_index) - 1] = '\0'; |
337 | 0 | } |
338 | | |
339 | | /* |
340 | | * If logstash format and id generation are disabled, pre-generate |
341 | | * the index line for all records. |
342 | | * |
343 | | * The header stored in 'j_index' will be used for the all records on |
344 | | * this payload. |
345 | | */ |
346 | 0 | if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE && ctx->ra_index == NULL) { |
347 | 0 | flb_time_get(&tms); |
348 | 0 | gmtime_r(&tms.tm.tv_sec, &tm); |
349 | 0 | strftime(index_formatted, sizeof(index_formatted) - 1, |
350 | 0 | ctx->index, &tm); |
351 | 0 | index = index_formatted; |
352 | 0 | if (ctx->suppress_type_name) { |
353 | 0 | index_len = flb_sds_snprintf(&j_index, |
354 | 0 | flb_sds_alloc(j_index), |
355 | 0 | OS_BULK_INDEX_FMT_NO_TYPE, |
356 | 0 | ctx->action, |
357 | 0 | index); |
358 | 0 | } |
359 | 0 | else { |
360 | 0 | index_len = flb_sds_snprintf(&j_index, |
361 | 0 | flb_sds_alloc(j_index), |
362 | 0 | OS_BULK_INDEX_FMT, |
363 | 0 | ctx->action, |
364 | 0 | index, ctx->type); |
365 | 0 | } |
366 | |
|
367 | 0 | if (index_len == -1) { |
368 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
369 | 0 | flb_sds_destroy(bulk); |
370 | 0 | flb_sds_destroy(j_index); |
371 | 0 | return -1; |
372 | 0 | } |
373 | 0 | } |
374 | | |
375 | | /* |
376 | | * Some broken clients may have time drift up to year 1970 |
377 | | * this will generate corresponding index in OpenSearch |
378 | | * in order to prevent generating millions of indexes |
379 | | * we can set to always use current time for index generation |
380 | | */ |
381 | 0 | if (ctx->current_time_index == FLB_TRUE) { |
382 | 0 | flb_time_get(&tms); |
383 | 0 | } |
384 | |
|
385 | 0 | while ((ret = flb_log_event_decoder_next( |
386 | 0 | &log_decoder, |
387 | 0 | &log_event)) == FLB_EVENT_DECODER_SUCCESS) { |
388 | | /* Only pop time from record if current_time_index is disabled */ |
389 | 0 | if (!ctx->current_time_index) { |
390 | 0 | flb_time_copy(&tms, &log_event.timestamp); |
391 | 0 | } |
392 | |
|
393 | 0 | map = *log_event.body; |
394 | 0 | map_size = map.via.map.size; |
395 | |
|
396 | 0 | index_custom_len = 0; |
397 | 0 | if (ctx->logstash_prefix_key) { |
398 | 0 | flb_sds_t v = flb_ra_translate(ctx->ra_prefix_key, |
399 | 0 | (char *) tag, tag_len, |
400 | 0 | map, NULL); |
401 | 0 | if (v) { |
402 | 0 | len = flb_sds_len(v); |
403 | 0 | if (len > 128) { |
404 | 0 | len = 128; |
405 | 0 | memcpy(logstash_index, v, 128); |
406 | 0 | } |
407 | 0 | else { |
408 | 0 | memcpy(logstash_index, v, len); |
409 | 0 | } |
410 | |
|
411 | 0 | index_custom_len = len; |
412 | 0 | flb_sds_destroy(v); |
413 | 0 | } |
414 | 0 | } |
415 | | |
416 | | /* Create temporary msgpack buffer */ |
417 | 0 | msgpack_sbuffer_init(&tmp_sbuf); |
418 | 0 | msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); |
419 | |
|
420 | 0 | if (ctx->include_tag_key) { |
421 | 0 | map_size++; |
422 | 0 | } |
423 | | |
424 | | /* Set the new map size */ |
425 | 0 | msgpack_pack_map(&tmp_pck, map_size + 1); |
426 | | |
427 | | /* Append the time key */ |
428 | 0 | msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->time_key)); |
429 | 0 | msgpack_pack_str_body(&tmp_pck, ctx->time_key, flb_sds_len(ctx->time_key)); |
430 | | |
431 | | /* Format the time */ |
432 | 0 | gmtime_r(&tms.tm.tv_sec, &tm); |
433 | 0 | s = strftime(time_formatted, sizeof(time_formatted) - 1, |
434 | 0 | ctx->time_key_format, &tm); |
435 | 0 | if (ctx->time_key_nanos) { |
436 | 0 | len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, |
437 | 0 | ".%09" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec); |
438 | 0 | } else { |
439 | 0 | len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, |
440 | 0 | ".%03" PRIu64 "Z", |
441 | 0 | (uint64_t) tms.tm.tv_nsec / 1000000); |
442 | 0 | } |
443 | |
|
444 | 0 | s += len; |
445 | 0 | msgpack_pack_str(&tmp_pck, s); |
446 | 0 | msgpack_pack_str_body(&tmp_pck, time_formatted, s); |
447 | |
|
448 | 0 | index = ctx->index; |
449 | 0 | if (ctx->logstash_format == FLB_TRUE) { |
450 | 0 | ret = compose_index_header(ctx, index_custom_len, |
451 | 0 | &logstash_index[0], sizeof(logstash_index), |
452 | 0 | ctx->logstash_prefix_separator, &tm); |
453 | 0 | if (ret < 0) { |
454 | | /* retry with default separator */ |
455 | 0 | compose_index_header(ctx, index_custom_len, |
456 | 0 | &logstash_index[0], sizeof(logstash_index), |
457 | 0 | "-", &tm); |
458 | 0 | } |
459 | 0 | index = logstash_index; |
460 | 0 | if (ctx->generate_id == FLB_FALSE) { |
461 | 0 | if (ctx->suppress_type_name) { |
462 | 0 | index_len = flb_sds_snprintf(&j_index, |
463 | 0 | flb_sds_alloc(j_index), |
464 | 0 | OS_BULK_INDEX_FMT_NO_TYPE, |
465 | 0 | ctx->action, |
466 | 0 | index); |
467 | 0 | } |
468 | 0 | else { |
469 | 0 | index_len = flb_sds_snprintf(&j_index, |
470 | 0 | flb_sds_alloc(j_index), |
471 | 0 | OS_BULK_INDEX_FMT, |
472 | 0 | ctx->action, |
473 | 0 | index, ctx->type); |
474 | 0 | } |
475 | 0 | } |
476 | 0 | } |
477 | 0 | else if (ctx->current_time_index == FLB_TRUE) { |
478 | | /* Make sure we handle index time format for index */ |
479 | 0 | strftime(index_formatted, sizeof(index_formatted) - 1, |
480 | 0 | ctx->index, &tm); |
481 | 0 | index = index_formatted; |
482 | 0 | } |
483 | 0 | else if (ctx->ra_index) { |
484 | | // free any previous ra_index to avoid memory leaks. |
485 | 0 | if (ra_index != NULL) { |
486 | 0 | flb_sds_destroy(ra_index); |
487 | 0 | } |
488 | | /* a record accessor pattern exists for the index */ |
489 | 0 | ra_index = flb_ra_translate(ctx->ra_index, |
490 | 0 | (char *) tag, tag_len, |
491 | 0 | map, NULL); |
492 | 0 | if (!ra_index) { |
493 | 0 | flb_plg_warn(ctx->ins, "invalid index translation from record accessor pattern, default to static index"); |
494 | 0 | } |
495 | 0 | else { |
496 | 0 | index = ra_index; |
497 | 0 | } |
498 | |
|
499 | 0 | if (ctx->suppress_type_name) { |
500 | 0 | index_len = flb_sds_snprintf(&j_index, |
501 | 0 | flb_sds_alloc(j_index), |
502 | 0 | OS_BULK_INDEX_FMT_NO_TYPE, |
503 | 0 | ctx->action, |
504 | 0 | index); |
505 | 0 | } |
506 | 0 | else { |
507 | 0 | index_len = flb_sds_snprintf(&j_index, |
508 | 0 | flb_sds_alloc(j_index), |
509 | 0 | OS_BULK_INDEX_FMT, |
510 | 0 | ctx->action, |
511 | 0 | index, ctx->type); |
512 | 0 | } |
513 | 0 | } |
514 | | |
515 | | /* Tag Key */ |
516 | 0 | if (ctx->include_tag_key == FLB_TRUE) { |
517 | 0 | msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->tag_key)); |
518 | 0 | msgpack_pack_str_body(&tmp_pck, ctx->tag_key, flb_sds_len(ctx->tag_key)); |
519 | 0 | msgpack_pack_str(&tmp_pck, tag_len); |
520 | 0 | msgpack_pack_str_body(&tmp_pck, tag, tag_len); |
521 | 0 | } |
522 | | |
523 | | /* |
524 | | * The map_content routine iterate over each Key/Value pair found in |
525 | | * the map and do some sanitization for the key names. |
526 | | * |
527 | | * There is a restriction that key names cannot contain a dot; if some |
528 | | * dot is found, it's replaced with an underscore. |
529 | | */ |
530 | 0 | ret = os_pack_map_content(&tmp_pck, map, ctx); |
531 | 0 | if (ret == -1) { |
532 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
533 | 0 | msgpack_sbuffer_destroy(&tmp_sbuf); |
534 | 0 | flb_sds_destroy(bulk); |
535 | 0 | flb_sds_destroy(j_index); |
536 | 0 | if (ra_index != NULL) { |
537 | 0 | flb_sds_destroy(ra_index); |
538 | 0 | } |
539 | 0 | return -1; |
540 | 0 | } |
541 | | |
542 | 0 | if (ctx->generate_id == FLB_TRUE) { |
543 | | /* use a 128 bit hash and copy it to a buffer */ |
544 | 0 | hash = cfl_hash_128bits(tmp_sbuf.data, tmp_sbuf.size); |
545 | 0 | memcpy(h, &hash, sizeof(hash)); |
546 | 0 | snprintf(uuid, sizeof(uuid), |
547 | 0 | "%02X%02X%02X%02X-%02X%02X-%02X%02X-" |
548 | 0 | "%02X%02X-%02X%02X%02X%02X%02X%02X", |
549 | 0 | h[0], h[1], h[2], h[3], h[4], h[5], h[6], h[7], |
550 | 0 | h[8], h[9], h[10], h[11], h[12], h[13], h[14], h[15]); |
551 | |
|
552 | 0 | if (ctx->suppress_type_name) { |
553 | 0 | index_len = flb_sds_snprintf(&j_index, |
554 | 0 | flb_sds_alloc(j_index), |
555 | 0 | OS_BULK_INDEX_FMT_ID_NO_TYPE, |
556 | 0 | ctx->action, |
557 | 0 | index, uuid); |
558 | 0 | } |
559 | 0 | else { |
560 | 0 | index_len = flb_sds_snprintf(&j_index, |
561 | 0 | flb_sds_alloc(j_index), |
562 | 0 | OS_BULK_INDEX_FMT_ID, |
563 | 0 | ctx->action, |
564 | 0 | index, ctx->type, uuid); |
565 | 0 | } |
566 | 0 | } |
567 | 0 | if (ctx->ra_id_key) { |
568 | 0 | id_key_str = os_get_id_value(ctx ,&map); |
569 | 0 | if (id_key_str) { |
570 | 0 | if (ctx->suppress_type_name) { |
571 | 0 | index_len = flb_sds_snprintf(&j_index, |
572 | 0 | flb_sds_alloc(j_index), |
573 | 0 | OS_BULK_INDEX_FMT_ID_NO_TYPE, |
574 | 0 | ctx->action, |
575 | 0 | index, id_key_str); |
576 | 0 | } |
577 | 0 | else { |
578 | 0 | index_len = flb_sds_snprintf(&j_index, |
579 | 0 | flb_sds_alloc(j_index), |
580 | 0 | OS_BULK_INDEX_FMT_ID, |
581 | 0 | ctx->action, |
582 | 0 | index, ctx->type, id_key_str); |
583 | 0 | } |
584 | 0 | flb_sds_destroy(id_key_str); |
585 | 0 | id_key_str = NULL; |
586 | 0 | } |
587 | 0 | } |
588 | | |
589 | | /* Convert msgpack to JSON */ |
590 | 0 | out_buf = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size, |
591 | 0 | config->json_escape_unicode); |
592 | 0 | msgpack_sbuffer_destroy(&tmp_sbuf); |
593 | 0 | if (!out_buf) { |
594 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
595 | 0 | flb_sds_destroy(bulk); |
596 | 0 | flb_sds_destroy(j_index); |
597 | 0 | if (ra_index != NULL) { |
598 | 0 | flb_sds_destroy(ra_index); |
599 | 0 | } |
600 | 0 | return -1; |
601 | 0 | } |
602 | | |
603 | 0 | ret = flb_sds_cat_safe(&bulk, j_index, flb_sds_len(j_index)); |
604 | 0 | if (ret == -1) { |
605 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
606 | 0 | *out_size = 0; |
607 | 0 | flb_sds_destroy(bulk); |
608 | 0 | flb_sds_destroy(j_index); |
609 | 0 | flb_sds_destroy(out_buf); |
610 | 0 | if (ra_index != NULL) { |
611 | 0 | flb_sds_destroy(ra_index); |
612 | 0 | } |
613 | 0 | return -1; |
614 | 0 | } |
615 | | |
616 | 0 | if (strcasecmp(ctx->write_operation, FLB_OS_WRITE_OP_UPDATE) == 0) { |
617 | 0 | write_op_update = FLB_TRUE; |
618 | 0 | } |
619 | 0 | else if (strcasecmp(ctx->write_operation, FLB_OS_WRITE_OP_UPSERT) == 0) { |
620 | 0 | write_op_upsert = FLB_TRUE; |
621 | 0 | } |
622 | | |
623 | | /* UPDATE | UPSERT */ |
624 | 0 | if (write_op_update) { |
625 | 0 | flb_sds_cat_safe(&bulk, |
626 | 0 | OS_BULK_UPDATE_OP_BODY, |
627 | 0 | sizeof(OS_BULK_UPDATE_OP_BODY) - 1); |
628 | 0 | } |
629 | 0 | else if (write_op_upsert) { |
630 | 0 | flb_sds_cat_safe(&bulk, |
631 | 0 | OS_BULK_UPSERT_OP_BODY, |
632 | 0 | sizeof(OS_BULK_UPSERT_OP_BODY) - 1); |
633 | 0 | } |
634 | |
|
635 | 0 | ret = flb_sds_cat_safe(&bulk, out_buf, flb_sds_len(out_buf)); |
636 | 0 | if (ret == -1) { |
637 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
638 | 0 | *out_size = 0; |
639 | 0 | flb_sds_destroy(bulk); |
640 | 0 | flb_sds_destroy(j_index); |
641 | 0 | flb_sds_destroy(out_buf); |
642 | 0 | if (ra_index != NULL) { |
643 | 0 | flb_sds_destroy(ra_index); |
644 | 0 | } |
645 | 0 | return -1; |
646 | 0 | } |
647 | | |
648 | | /* finish UPDATE | UPSERT */ |
649 | 0 | if (write_op_update || write_op_upsert) { |
650 | 0 | flb_sds_cat_safe(&bulk, "}", 1); |
651 | 0 | } |
652 | |
|
653 | 0 | flb_sds_cat_safe(&bulk, "\n", 1); |
654 | 0 | flb_sds_destroy(out_buf); |
655 | 0 | } |
656 | | |
657 | 0 | flb_log_event_decoder_destroy(&log_decoder); |
658 | | |
659 | | /* Set outgoing data */ |
660 | 0 | *out_data = bulk; |
661 | 0 | *out_size = flb_sds_len(bulk); |
662 | |
|
663 | 0 | if (ra_index != NULL) { |
664 | 0 | flb_sds_destroy(ra_index); |
665 | 0 | } |
666 | | /* |
667 | | * Note: we don't destroy the bulk as we need to keep the allocated |
668 | | * buffer with the data. Instead we just release the bulk context and |
669 | | * return the bulk->ptr buffer |
670 | | */ |
671 | 0 | if (ctx->trace_output) { |
672 | 0 | fwrite(*out_data, 1, *out_size, stdout); |
673 | 0 | fflush(stdout); |
674 | 0 | } |
675 | 0 | flb_sds_destroy(j_index); |
676 | 0 | return 0; |
677 | 0 | } |
678 | | |
679 | | static int cb_opensearch_init(struct flb_output_instance *ins, |
680 | | struct flb_config *config, |
681 | | void *data) |
682 | 0 | { |
683 | 0 | struct flb_opensearch *ctx; |
684 | |
|
685 | 0 | ctx = flb_os_conf_create(ins, config); |
686 | 0 | if (!ctx) { |
687 | 0 | flb_plg_error(ins, "cannot initialize plugin"); |
688 | 0 | return -1; |
689 | 0 | } |
690 | | |
691 | 0 | if (ctx->index == NULL && ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) { |
692 | 0 | flb_plg_error(ins, "cannot initialize plugin, index is not set and logstash_format and generate_id are both off"); |
693 | 0 | return -1; |
694 | 0 | } |
695 | | |
696 | 0 | flb_plg_debug(ctx->ins, "host=%s port=%i uri=%s index=%s type=%s", |
697 | 0 | ins->host.name, ins->host.port, ctx->uri, |
698 | 0 | ctx->index, ctx->type); |
699 | |
|
700 | 0 | flb_output_set_context(ins, ctx); |
701 | | |
702 | | /* |
703 | | * This plugin instance uses the HTTP client interface, let's register |
704 | | * it debugging callbacks. |
705 | | */ |
706 | 0 | flb_output_set_http_debug_callbacks(ins); |
707 | |
|
708 | 0 | return 0; |
709 | 0 | } |
710 | | |
711 | | static int opensearch_error_check(struct flb_opensearch *ctx, |
712 | | struct flb_http_client *c) |
713 | 0 | { |
714 | 0 | int i, j, k; |
715 | 0 | int ret; |
716 | 0 | int check = FLB_FALSE; |
717 | 0 | int root_type; |
718 | 0 | char *out_buf; |
719 | 0 | size_t off = 0; |
720 | 0 | size_t out_size; |
721 | 0 | msgpack_unpacked result; |
722 | 0 | msgpack_object root; |
723 | 0 | msgpack_object key; |
724 | 0 | msgpack_object val; |
725 | 0 | msgpack_object item; |
726 | 0 | msgpack_object item_key; |
727 | 0 | msgpack_object item_val; |
728 | | |
729 | | /* |
730 | | * Check if our payload is complete: there is such situations where |
731 | | * the OpenSearch HTTP response body is bigger than the HTTP client |
732 | | * buffer so payload can be incomplete. |
733 | | */ |
734 | | /* Convert JSON payload to msgpack */ |
735 | 0 | ret = flb_pack_json(c->resp.payload, c->resp.payload_size, |
736 | 0 | &out_buf, &out_size, &root_type, NULL); |
737 | 0 | if (ret == -1) { |
738 | | /* Is this an incomplete HTTP Request ? */ |
739 | 0 | if (c->resp.payload_size <= 0) { |
740 | 0 | return FLB_TRUE; |
741 | 0 | } |
742 | | |
743 | | /* Lookup error field */ |
744 | 0 | if (strstr(c->resp.payload, "\"errors\":false,\"items\":[")) { |
745 | 0 | return FLB_FALSE; |
746 | 0 | } |
747 | | |
748 | 0 | flb_plg_error(ctx->ins, "could not pack/validate JSON response\n%s", |
749 | 0 | c->resp.payload); |
750 | 0 | return FLB_TRUE; |
751 | 0 | } |
752 | | |
753 | | /* Lookup error field */ |
754 | 0 | msgpack_unpacked_init(&result); |
755 | 0 | ret = msgpack_unpack_next(&result, out_buf, out_size, &off); |
756 | 0 | if (ret != MSGPACK_UNPACK_SUCCESS) { |
757 | 0 | flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s", |
758 | 0 | c->resp.payload); |
759 | 0 | return FLB_TRUE; |
760 | 0 | } |
761 | | |
762 | 0 | root = result.data; |
763 | 0 | if (root.type != MSGPACK_OBJECT_MAP) { |
764 | 0 | flb_plg_error(ctx->ins, "unexpected payload type=%i", |
765 | 0 | root.type); |
766 | 0 | check = FLB_TRUE; |
767 | 0 | goto done; |
768 | 0 | } |
769 | | |
770 | 0 | for (i = 0; i < root.via.map.size; i++) { |
771 | 0 | key = root.via.map.ptr[i].key; |
772 | 0 | if (key.type != MSGPACK_OBJECT_STR) { |
773 | 0 | flb_plg_error(ctx->ins, "unexpected key type=%i", |
774 | 0 | key.type); |
775 | 0 | check = FLB_TRUE; |
776 | 0 | goto done; |
777 | 0 | } |
778 | | |
779 | 0 | if (key.via.str.size == 6 && strncmp(key.via.str.ptr, "errors", 6) == 0) { |
780 | 0 | val = root.via.map.ptr[i].val; |
781 | 0 | if (val.type != MSGPACK_OBJECT_BOOLEAN) { |
782 | 0 | flb_plg_error(ctx->ins, "unexpected 'error' value type=%i", |
783 | 0 | val.type); |
784 | 0 | check = FLB_TRUE; |
785 | 0 | goto done; |
786 | 0 | } |
787 | | |
788 | | /* If error == false, we are OK (no errors = FLB_FALSE) */ |
789 | 0 | if (!val.via.boolean) { |
790 | | /* no errors */ |
791 | 0 | check = FLB_FALSE; |
792 | 0 | goto done; |
793 | 0 | } |
794 | 0 | } |
795 | 0 | else if (key.via.str.size == 5 && strncmp(key.via.str.ptr, "items", 5) == 0) { |
796 | 0 | val = root.via.map.ptr[i].val; |
797 | 0 | if (val.type != MSGPACK_OBJECT_ARRAY) { |
798 | 0 | flb_plg_error(ctx->ins, "unexpected 'items' value type=%i", |
799 | 0 | val.type); |
800 | 0 | check = FLB_TRUE; |
801 | 0 | goto done; |
802 | 0 | } |
803 | | |
804 | 0 | for (j = 0; j < val.via.array.size; j++) { |
805 | 0 | item = val.via.array.ptr[j]; |
806 | 0 | if (item.type != MSGPACK_OBJECT_MAP) { |
807 | 0 | flb_plg_error(ctx->ins, "unexpected 'item' outer value type=%i", |
808 | 0 | item.type); |
809 | 0 | check = FLB_TRUE; |
810 | 0 | goto done; |
811 | 0 | } |
812 | | |
813 | 0 | if (item.via.map.size != 1) { |
814 | 0 | flb_plg_error(ctx->ins, "unexpected 'item' size=%i", |
815 | 0 | item.via.map.size); |
816 | 0 | check = FLB_TRUE; |
817 | 0 | goto done; |
818 | 0 | } |
819 | | |
820 | 0 | item = item.via.map.ptr[0].val; |
821 | 0 | if (item.type != MSGPACK_OBJECT_MAP) { |
822 | 0 | flb_plg_error(ctx->ins, "unexpected 'item' inner value type=%i", |
823 | 0 | item.type); |
824 | 0 | check = FLB_TRUE; |
825 | 0 | goto done; |
826 | 0 | } |
827 | | |
828 | 0 | for (k = 0; k < item.via.map.size; k++) { |
829 | 0 | item_key = item.via.map.ptr[k].key; |
830 | 0 | if (item_key.type != MSGPACK_OBJECT_STR) { |
831 | 0 | flb_plg_error(ctx->ins, "unexpected key type=%i", |
832 | 0 | item_key.type); |
833 | 0 | check = FLB_TRUE; |
834 | 0 | goto done; |
835 | 0 | } |
836 | | |
837 | 0 | if (item_key.via.str.size == 6 && strncmp(item_key.via.str.ptr, "status", 6) == 0) { |
838 | 0 | item_val = item.via.map.ptr[k].val; |
839 | |
|
840 | 0 | if (item_val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) { |
841 | 0 | flb_plg_error(ctx->ins, "unexpected 'status' value type=%i", |
842 | 0 | item_val.type); |
843 | 0 | check = FLB_TRUE; |
844 | 0 | goto done; |
845 | 0 | } |
846 | | /* Check for errors other than version conflict (document already exists) */ |
847 | 0 | if (item_val.via.i64 != 409) { |
848 | 0 | check = FLB_TRUE; |
849 | 0 | goto done; |
850 | 0 | } |
851 | 0 | } |
852 | 0 | } |
853 | 0 | } |
854 | 0 | } |
855 | 0 | } |
856 | | |
857 | 0 | done: |
858 | 0 | flb_free(out_buf); |
859 | 0 | msgpack_unpacked_destroy(&result); |
860 | 0 | return check; |
861 | 0 | } |
862 | | |
863 | | static void cb_opensearch_flush(struct flb_event_chunk *event_chunk, |
864 | | struct flb_output_flush *out_flush, |
865 | | struct flb_input_instance *ins, void *out_context, |
866 | | struct flb_config *config) |
867 | 0 | { |
868 | 0 | int ret = -1; |
869 | 0 | size_t pack_size; |
870 | 0 | flb_sds_t pack; |
871 | 0 | void *out_buf; |
872 | 0 | size_t out_size; |
873 | 0 | size_t b_sent; |
874 | 0 | struct flb_opensearch *ctx = out_context; |
875 | 0 | struct flb_connection *u_conn; |
876 | 0 | struct flb_http_client *c; |
877 | 0 | flb_sds_t signature = NULL; |
878 | 0 | int compressed = FLB_FALSE; |
879 | 0 | void *final_payload_buf = NULL; |
880 | 0 | size_t final_payload_size = 0; |
881 | | |
882 | | /* Get upstream connection */ |
883 | 0 | u_conn = flb_upstream_conn_get(ctx->u); |
884 | 0 | if (!u_conn) { |
885 | 0 | FLB_OUTPUT_RETURN(FLB_RETRY); |
886 | 0 | } |
887 | | |
888 | | /* Convert format */ |
889 | 0 | if (event_chunk->type == FLB_EVENT_TYPE_TRACES) { |
890 | 0 | pack = flb_msgpack_raw_to_json_sds(event_chunk->data, event_chunk->size, |
891 | 0 | config->json_escape_unicode); |
892 | 0 | if (pack) { |
893 | 0 | ret = 0; |
894 | |
|
895 | 0 | out_buf = (void *) pack; |
896 | 0 | out_size = cfl_sds_len(pack); |
897 | 0 | } |
898 | 0 | else { |
899 | 0 | ret = -1; |
900 | 0 | } |
901 | 0 | } |
902 | 0 | else if (event_chunk->type == FLB_EVENT_TYPE_LOGS) { |
903 | 0 | ret = opensearch_format(config, ins, |
904 | 0 | ctx, NULL, |
905 | 0 | event_chunk->type, |
906 | 0 | event_chunk->tag, flb_sds_len(event_chunk->tag), |
907 | 0 | event_chunk->data, event_chunk->size, |
908 | 0 | &out_buf, &out_size); |
909 | 0 | } |
910 | |
|
911 | 0 | if (ret != 0) { |
912 | 0 | flb_upstream_conn_release(u_conn); |
913 | 0 | FLB_OUTPUT_RETURN(FLB_ERROR); |
914 | 0 | } |
915 | | |
916 | 0 | pack = (char *) out_buf; |
917 | 0 | pack_size = out_size; |
918 | |
|
919 | 0 | final_payload_buf = pack; |
920 | 0 | final_payload_size = pack_size; |
921 | | /* Should we compress the payload ? */ |
922 | 0 | if (ctx->compression == FLB_OS_COMPRESSION_GZIP) { |
923 | 0 | ret = flb_gzip_compress((void *) pack, pack_size, |
924 | 0 | &out_buf, &out_size); |
925 | 0 | if (ret == -1) { |
926 | 0 | flb_plg_error(ctx->ins, |
927 | 0 | "cannot gzip payload, disabling compression"); |
928 | 0 | } |
929 | 0 | else { |
930 | 0 | compressed = FLB_TRUE; |
931 | 0 | final_payload_buf = out_buf; |
932 | 0 | final_payload_size = out_size; |
933 | 0 | } |
934 | 0 | } |
935 | | |
936 | | /* Compose HTTP Client request */ |
937 | 0 | c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri, |
938 | 0 | final_payload_buf, final_payload_size, NULL, 0, NULL, 0); |
939 | |
|
940 | 0 | flb_http_buffer_size(c, ctx->buffer_size); |
941 | |
|
942 | | #ifndef FLB_HAVE_AWS |
943 | | flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); |
944 | | #endif |
945 | |
|
946 | 0 | flb_http_add_header(c, "Content-Type", 12, "application/x-ndjson", 20); |
947 | |
|
948 | 0 | if (ctx->http_user && ctx->http_passwd) { |
949 | 0 | flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd); |
950 | 0 | } |
951 | |
|
952 | 0 | #ifdef FLB_HAVE_AWS |
953 | 0 | if (ctx->has_aws_auth == FLB_TRUE) { |
954 | 0 | signature = add_aws_auth(c, ctx); |
955 | 0 | if (!signature) { |
956 | 0 | goto retry; |
957 | 0 | } |
958 | 0 | } |
959 | 0 | else { |
960 | 0 | flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); |
961 | 0 | } |
962 | 0 | #endif |
963 | | |
964 | | /* Set Content-Encoding of compressed payload */ |
965 | 0 | if (compressed == FLB_TRUE) { |
966 | 0 | if (ctx->compression == FLB_OS_COMPRESSION_GZIP) { |
967 | 0 | flb_http_set_content_encoding_gzip(c); |
968 | 0 | } |
969 | 0 | } |
970 | | |
971 | | /* Map debug callbacks */ |
972 | 0 | flb_http_client_debug(c, ctx->ins->callback); |
973 | |
|
974 | 0 | ret = flb_http_do(c, &b_sent); |
975 | 0 | if (ret != 0) { |
976 | 0 | flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri); |
977 | 0 | if (signature) { |
978 | 0 | flb_sds_destroy(signature); |
979 | 0 | signature = NULL; |
980 | 0 | } |
981 | 0 | goto retry; |
982 | 0 | } |
983 | 0 | else { |
984 | | /* The request was issued successfully, validate the 'error' field */ |
985 | 0 | flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri); |
986 | 0 | if (c->resp.status != 200 && c->resp.status != 201) { |
987 | 0 | if (c->resp.payload_size > 0) { |
988 | 0 | flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n", |
989 | 0 | c->resp.status, ctx->uri, c->resp.payload); |
990 | 0 | } |
991 | 0 | else { |
992 | 0 | flb_plg_error(ctx->ins, "HTTP status=%i URI=%s", |
993 | 0 | c->resp.status, ctx->uri); |
994 | 0 | } |
995 | 0 | if (signature) { |
996 | 0 | flb_sds_destroy(signature); |
997 | 0 | signature = NULL; |
998 | 0 | } |
999 | 0 | goto retry; |
1000 | 0 | } |
1001 | | |
1002 | 0 | if (c->resp.payload_size > 0) { |
1003 | | /* |
1004 | | * OpenSearch payload should be JSON, we convert it to msgpack |
1005 | | * and lookup the 'error' field. |
1006 | | */ |
1007 | 0 | ret = opensearch_error_check(ctx, c); |
1008 | 0 | if (ret == FLB_TRUE) { |
1009 | | /* we got an error */ |
1010 | 0 | if (ctx->trace_error) { |
1011 | | /* |
1012 | | * If trace_error is set, trace the actual |
1013 | | * response from Elasticsearch explaining the problem. |
1014 | | * Trace_Output can be used to see the request. |
1015 | | */ |
1016 | 0 | if (pack_size < 4000) { |
1017 | 0 | flb_plg_debug(ctx->ins, "error caused by: Input\n%.*s\n", |
1018 | 0 | (int) pack_size, pack); |
1019 | 0 | } |
1020 | 0 | if (c->resp.payload_size < 4000) { |
1021 | 0 | flb_plg_error(ctx->ins, "error: Output\n%s", |
1022 | 0 | c->resp.payload); |
1023 | 0 | } else { |
1024 | | /* |
1025 | | * We must use fwrite since the flb_log functions |
1026 | | * will truncate data at 4KB |
1027 | | */ |
1028 | 0 | fwrite(c->resp.payload, 1, c->resp.payload_size, stderr); |
1029 | 0 | fflush(stderr); |
1030 | 0 | } |
1031 | 0 | } |
1032 | 0 | if (signature) { |
1033 | 0 | flb_sds_destroy(signature); |
1034 | 0 | signature = NULL; |
1035 | 0 | } |
1036 | 0 | goto retry; |
1037 | 0 | } |
1038 | 0 | else { |
1039 | 0 | flb_plg_debug(ctx->ins, "OpenSearch response\n%s", |
1040 | 0 | c->resp.payload); |
1041 | 0 | } |
1042 | 0 | } |
1043 | 0 | else { |
1044 | 0 | if (signature) { |
1045 | 0 | flb_sds_destroy(signature); |
1046 | 0 | signature = NULL; |
1047 | 0 | } |
1048 | 0 | goto retry; |
1049 | 0 | } |
1050 | 0 | } |
1051 | | |
1052 | | /* Cleanup */ |
1053 | 0 | flb_http_client_destroy(c); |
1054 | |
|
1055 | 0 | if (final_payload_buf != pack) { |
1056 | 0 | flb_free(final_payload_buf); |
1057 | 0 | } |
1058 | 0 | flb_sds_destroy(pack); |
1059 | |
|
1060 | 0 | flb_upstream_conn_release(u_conn); |
1061 | 0 | if (signature) { |
1062 | 0 | flb_sds_destroy(signature); |
1063 | 0 | } |
1064 | 0 | FLB_OUTPUT_RETURN(FLB_OK); |
1065 | | |
1066 | | /* Issue a retry */ |
1067 | 0 | retry: |
1068 | 0 | flb_http_client_destroy(c); |
1069 | 0 | flb_sds_destroy(pack); |
1070 | |
|
1071 | 0 | if (final_payload_buf != pack) { |
1072 | 0 | flb_free(final_payload_buf); |
1073 | 0 | } |
1074 | |
|
1075 | 0 | flb_upstream_conn_release(u_conn); |
1076 | 0 | FLB_OUTPUT_RETURN(FLB_RETRY); |
1077 | 0 | } |
1078 | | |
/*
 * Exit callback: hands the plugin context received in 'data' to
 * flb_os_conf_destroy() and always returns 0. 'config' is not used.
 */
static int cb_opensearch_exit(void *data, struct flb_config *config)
{
    (void) config;

    flb_os_conf_destroy((struct flb_opensearch *) data);

    return 0;
}
1086 | | |
1087 | | /* Configuration properties map: one entry per option of the "opensearch" output, closed by the zeroed {0} entry at the end */ |
1088 | | static struct flb_config_map config_map[] = { /* entry layout as written below: type, name, default (string or NULL), 0, FLB_TRUE/FLB_FALSE, member offset, help text */ |
1089 | | { |
1090 | | FLB_CONFIG_MAP_STR, "index", FLB_OS_DEFAULT_INDEX, |
1091 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, index), /* NOTE(review): FLB_TRUE + offsetof presumably lets the config loader store the value in that member - confirm in flb_config_map.h */ |
1092 | | "Set an index name" |
1093 | | }, |
1094 | | { |
1095 | | FLB_CONFIG_MAP_STR, "type", FLB_OS_DEFAULT_TYPE, |
1096 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, type), |
1097 | | "Set the document type property" |
1098 | | }, |
1099 | | { |
1100 | | FLB_CONFIG_MAP_BOOL, "suppress_type_name", "false", |
1101 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, suppress_type_name), |
1102 | | "If true, mapping types is removed. (for v7.0.0 or later)" |
1103 | | }, |
1104 | | |
1105 | | /* HTTP Authentication: cb_opensearch_flush() applies Basic auth only when both http_user and http_passwd are non-NULL */ |
1106 | | { |
1107 | | FLB_CONFIG_MAP_STR, "http_user", NULL, |
1108 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, http_user), |
1109 | | "Optional username credential for access" |
1110 | | }, |
1111 | | { |
1112 | | FLB_CONFIG_MAP_STR, "http_passwd", "", |
1113 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, http_passwd), |
1114 | | "Password for user defined in 'http_user'" |
1115 | | }, |
1116 | | |
1117 | | /* AWS Authentication: these entries exist only when the build defines FLB_HAVE_AWS */ |
1118 | | #ifdef FLB_HAVE_AWS |
1119 | | { |
1120 | | FLB_CONFIG_MAP_BOOL, "aws_auth", "false", |
1121 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, has_aws_auth), |
1122 | | "Enable AWS Sigv4 Authentication" |
1123 | | }, |
1124 | | { |
1125 | | FLB_CONFIG_MAP_STR, "aws_region", NULL, |
1126 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, aws_region), |
1127 | | "AWS Region of your Amazon OpenSearch Service cluster" |
1128 | | }, |
1129 | | { |
1130 | | FLB_CONFIG_MAP_STR, "aws_profile", "default", |
1131 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, aws_profile), |
1132 | | "AWS Profile name. AWS Profiles can be configured with AWS CLI and are usually stored in " |
1133 | | "$HOME/.aws/ directory." |
1134 | | }, |
1135 | | { |
1136 | | FLB_CONFIG_MAP_STR, "aws_sts_endpoint", NULL, |
1137 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, aws_sts_endpoint), |
1138 | | "Custom endpoint for the AWS STS API, used with the AWS_Role_ARN option" |
1139 | | }, |
1140 | | { |
1141 | | FLB_CONFIG_MAP_STR, "aws_role_arn", NULL, |
1142 | | 0, FLB_FALSE, 0, /* no struct member mapped; presumably read by the plugin's own config code - TODO confirm */ |
1143 | | "AWS IAM Role to assume to put records to your Amazon OpenSearch cluster" |
1144 | | }, |
1145 | | { |
1146 | | FLB_CONFIG_MAP_STR, "aws_external_id", NULL, |
1147 | | 0, FLB_FALSE, 0, /* no struct member mapped; presumably read by the plugin's own config code - TODO confirm */ |
1148 | | "External ID for the AWS IAM Role specified with `aws_role_arn`" |
1149 | | }, |
1150 | | { |
1151 | | FLB_CONFIG_MAP_STR, "aws_service_name", "es", |
1152 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, aws_service_name), |
1153 | | "AWS Service Name" |
1154 | | }, |
1155 | | #endif |
1156 | | |
1157 | | /* Logstash compatibility */ |
1158 | | { |
1159 | | FLB_CONFIG_MAP_BOOL, "logstash_format", "false", |
1160 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, logstash_format), |
1161 | | "Enable Logstash format compatibility" |
1162 | | }, |
1163 | | { |
1164 | | FLB_CONFIG_MAP_STR, "logstash_prefix", FLB_OS_DEFAULT_PREFIX, |
1165 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, logstash_prefix), |
1166 | | "When Logstash_Format is enabled, the Index name is composed using a prefix " |
1167 | | "and the date, e.g: If Logstash_Prefix is equals to 'mydata' your index will " |
1168 | | "become 'mydata-YYYY.MM.DD'. The last string appended belongs to the date " |
1169 | | "when the data is being generated" |
1170 | | }, |
1171 | | { |
1172 | | FLB_CONFIG_MAP_STR, "logstash_prefix_separator", "-", |
1173 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, logstash_prefix_separator), |
1174 | | "Set a separator between logstash_prefix and date." |
1175 | | }, |
1176 | | { |
1177 | | FLB_CONFIG_MAP_STR, "logstash_prefix_key", NULL, |
1178 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, logstash_prefix_key), |
1179 | | "When included: the value in the record that belongs to the key will be looked " |
1180 | | "up and over-write the Logstash_Prefix for index generation. If the key/value " |
1181 | | "is not found in the record then the Logstash_Prefix option will act as a " |
1182 | | "fallback. Nested keys are supported through record accessor pattern" |
1183 | | }, |
1184 | | { |
1185 | | FLB_CONFIG_MAP_STR, "logstash_dateformat", FLB_OS_DEFAULT_TIME_FMT, |
1186 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, logstash_dateformat), |
1187 | | "Time format (based on strftime) to generate the second part of the Index name" |
1188 | | }, |
1189 | | |
1190 | | /* Custom Time and Tag keys */ |
1191 | | { |
1192 | | FLB_CONFIG_MAP_STR, "time_key", FLB_OS_DEFAULT_TIME_KEY, |
1193 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, time_key), |
1194 | | "When Logstash_Format is enabled, each record will get a new timestamp field. " |
1195 | | "The Time_Key property defines the name of that field" |
1196 | | }, |
1197 | | { |
1198 | | FLB_CONFIG_MAP_STR, "time_key_format", FLB_OS_DEFAULT_TIME_KEYF, |
1199 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, time_key_format), |
1200 | | "When Logstash_Format is enabled, this property defines the format of the " |
1201 | | "timestamp" |
1202 | | }, |
1203 | | { |
1204 | | FLB_CONFIG_MAP_BOOL, "time_key_nanos", "false", |
1205 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, time_key_nanos), |
1206 | | "When Logstash_Format is enabled, enabling this property sends nanosecond " |
1207 | | "precision timestamps" |
1208 | | }, |
1209 | | { |
1210 | | FLB_CONFIG_MAP_BOOL, "include_tag_key", "false", |
1211 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, include_tag_key), |
1212 | | "When enabled, it append the Tag name to the record" |
1213 | | }, |
1214 | | { |
1215 | | FLB_CONFIG_MAP_STR, "tag_key", FLB_OS_DEFAULT_TAG_KEY, |
1216 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, tag_key), |
1217 | | "When Include_Tag_Key is enabled, this property defines the key name for the tag" |
1218 | | }, |
1219 | | { |
1220 | | FLB_CONFIG_MAP_SIZE, "buffer_size", FLB_OS_DEFAULT_HTTP_MAX, /* cb_opensearch_flush() passes this value to flb_http_buffer_size() */ |
1221 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, buffer_size), |
1222 | | "Specify the buffer size used to read the response from the OpenSearch HTTP " |
1223 | | "service. This option is useful for debugging purposes where is required to read " |
1224 | | "full responses, note that response size grows depending of the number of records " |
1225 | | "inserted. To set an unlimited amount of memory set this value to 'false', " |
1226 | | "otherwise the value must be according to the Unit Size specification" |
1227 | | }, |
1228 | | |
1229 | | /* OpenSearch specifics */ |
1230 | | { |
1231 | | FLB_CONFIG_MAP_STR, "path", NULL, |
1232 | | 0, FLB_FALSE, 0, /* no struct member mapped; presumably read by the plugin's own config code - TODO confirm */ |
1233 | | "OpenSearch accepts new data on HTTP query path '/_bulk'. But it is also " |
1234 | | "possible to serve OpenSearch behind a reverse proxy on a subpath. This " |
1235 | | "option defines such path on the fluent-bit side. It simply adds a path " |
1236 | | "prefix in the indexing HTTP POST URI" |
1237 | | }, |
1238 | | { |
1239 | | FLB_CONFIG_MAP_STR, "pipeline", NULL, |
1240 | | 0, FLB_FALSE, 0, /* no struct member mapped; presumably read by the plugin's own config code - TODO confirm */ |
1241 | | "OpenSearch allows to setup filters called pipelines. " |
1242 | | "This option allows to define which pipeline the database should use. For " |
1243 | | "performance reasons is strongly suggested to do parsing and filtering on " |
1244 | | "Fluent Bit side, avoid pipelines" |
1245 | | }, |
1246 | | { |
1247 | | FLB_CONFIG_MAP_BOOL, "generate_id", "false", |
1248 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, generate_id), |
1249 | | "When enabled, generate _id for outgoing records. This prevents duplicate " |
1250 | | "records when retrying" |
1251 | | }, |
1252 | | { |
1253 | | FLB_CONFIG_MAP_STR, "write_operation", "create", |
1254 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, write_operation), |
1255 | | "Operation to use to write in bulk requests" |
1256 | | }, |
1257 | | { |
1258 | | FLB_CONFIG_MAP_STR, "id_key", NULL, |
1259 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, id_key), |
1260 | | "If set, _id will be the value of the key from incoming record." |
1261 | | }, |
1262 | | { |
1263 | | FLB_CONFIG_MAP_BOOL, "replace_dots", "false", |
1264 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, replace_dots), |
1265 | | "When enabled, replace field name dots with underscore." |
1266 | | }, |
1267 | | |
1268 | | { |
1269 | | FLB_CONFIG_MAP_BOOL, "current_time_index", "false", |
1270 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, current_time_index), |
1271 | | "Use current time for index generation instead of message record" |
1272 | | }, |
1273 | | |
1274 | | /* Trace: diagnostic switches; trace_error is checked in cb_opensearch_flush() when the response reports errors */ |
1275 | | { |
1276 | | FLB_CONFIG_MAP_BOOL, "trace_output", "false", |
1277 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, trace_output), |
1278 | | "When enabled print the OpenSearch API calls to stdout (for diag only)" |
1279 | | }, |
1280 | | { |
1281 | | FLB_CONFIG_MAP_BOOL, "trace_error", "false", |
1282 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, trace_error), |
1283 | | "When enabled print the OpenSearch exception to stderr (for diag only)" |
1284 | | }, |
1285 | | |
1286 | | /* HTTP Compression: the raw option string is stored in compression_str */ |
1287 | | { |
1288 | | FLB_CONFIG_MAP_STR, "compress", NULL, |
1289 | | 0, FLB_TRUE, offsetof(struct flb_opensearch, compression_str), |
1290 | | "Set payload compression mechanism. Option available is 'gzip'" |
1291 | | }, |
1292 | | |
1293 | | /* EOF: all-zero entry marking the end of the map */ |
1294 | | {0} |
1295 | | }; |
1296 | | |
1297 | | /* Plugin reference */ |
1298 | | struct flb_output_plugin out_opensearch_plugin = { |
1299 | | .name = "opensearch", |
1300 | | .description = "OpenSearch", |
1301 | | .cb_init = cb_opensearch_init, |
1302 | | .cb_pre_run = NULL, |
1303 | | .cb_flush = cb_opensearch_flush, |
1304 | | .cb_exit = cb_opensearch_exit, |
1305 | | |
1306 | | /* Configuration */ |
1307 | | .config_map = config_map, |
1308 | | |
1309 | | /* Events supported */ |
1310 | | .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_TRACES, |
1311 | | |
1312 | | /* Test */ |
1313 | | .test_formatter.callback = opensearch_format, |
1314 | | |
1315 | | /* Plugin flags */ |
1316 | | .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, |
1317 | | }; |