Coverage Report

Created: 2026-03-09 07:04

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/plugins/out_opensearch/opensearch.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2026 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_output_plugin.h>
21
#include <fluent-bit/flb_utils.h>
22
#include <fluent-bit/flb_network.h>
23
#include <fluent-bit/flb_http_client.h>
24
#include <fluent-bit/flb_pack.h>
25
#include <fluent-bit/flb_time.h>
26
#include <fluent-bit/flb_signv4.h>
27
#include <fluent-bit/flb_aws_credentials.h>
28
#include <fluent-bit/flb_gzip.h>
29
#include <fluent-bit/flb_record_accessor.h>
30
#include <fluent-bit/flb_ra_key.h>
31
#include <fluent-bit/flb_log_event_decoder.h>
32
#include <msgpack.h>
33
34
#include <cfl/cfl.h>
35
36
#include "opensearch.h"
37
#include "os_conf.h"
38
39
static int os_pack_array_content(msgpack_packer *tmp_pck,
40
                                 msgpack_object array,
41
                                 struct flb_opensearch *ctx);
42
43
#ifdef FLB_HAVE_AWS
44
static flb_sds_t add_aws_auth(struct flb_http_client *c,
45
                              struct flb_opensearch *ctx)
46
0
{
47
0
    flb_sds_t signature = NULL;
48
0
    int ret;
49
50
0
    flb_plg_debug(ctx->ins, "Signing request with AWS Sigv4");
51
52
    /* Amazon OpenSearch Sigv4 does not allow the host header to include the port */
53
0
    ret = flb_http_strip_port_from_host(c);
54
0
    if (ret < 0) {
55
0
        flb_plg_error(ctx->ins, "could not strip port from host for sigv4");
56
0
        return NULL;
57
0
    }
58
59
    /* AWS Fluent Bit user agent */
60
0
    flb_http_add_header(c, "User-Agent", 10, "aws-fluent-bit-plugin", 21);
61
62
0
    signature = flb_signv4_do(c, FLB_TRUE, FLB_TRUE, time(NULL),
63
0
                              ctx->aws_region, ctx->aws_service_name,
64
0
                              S3_MODE_SIGNED_PAYLOAD, ctx->aws_unsigned_headers,
65
0
                              ctx->aws_provider);
66
0
    if (!signature) {
67
0
        flb_plg_error(ctx->ins, "could not sign request with sigv4");
68
0
        return NULL;
69
0
    }
70
0
    return signature;
71
0
}
72
#endif /* FLB_HAVE_AWS */
73
74
static int os_pack_map_content(msgpack_packer *tmp_pck,
75
                               msgpack_object map,
76
                               struct flb_opensearch *ctx)
77
0
{
78
0
    int i;
79
0
    char *ptr_key = NULL;
80
0
    char buf_key[256];
81
0
    msgpack_object *k;
82
0
    msgpack_object *v;
83
84
0
    for (i = 0; i < map.via.map.size; i++) {
85
0
        k = &map.via.map.ptr[i].key;
86
0
        v = &map.via.map.ptr[i].val;
87
0
        ptr_key = NULL;
88
89
        /* Store key */
90
0
        const char *key_ptr = NULL;
91
0
        size_t key_size = 0;
92
93
0
        if (k->type == MSGPACK_OBJECT_BIN) {
94
0
            key_ptr  = k->via.bin.ptr;
95
0
            key_size = k->via.bin.size;
96
0
        }
97
0
        else if (k->type == MSGPACK_OBJECT_STR) {
98
0
            key_ptr  = k->via.str.ptr;
99
0
            key_size = k->via.str.size;
100
0
        }
101
102
0
        if (key_size < (sizeof(buf_key) - 1)) {
103
0
            memcpy(buf_key, key_ptr, key_size);
104
0
            buf_key[key_size] = '\0';
105
0
            ptr_key = buf_key;
106
0
        }
107
0
        else {
108
            /* Long map keys have a performance penalty */
109
0
            ptr_key = flb_malloc(key_size + 1);
110
0
            if (!ptr_key) {
111
0
                flb_errno();
112
0
                return -1;
113
0
            }
114
115
0
            memcpy(ptr_key, key_ptr, key_size);
116
0
            ptr_key[key_size] = '\0';
117
0
        }
118
119
        /*
120
         * Sanitize key name, it don't allow dots in field names:
121
         *
122
         *   https://goo.gl/R5NMTr
123
         */
124
0
        if (ctx->replace_dots == FLB_TRUE) {
125
0
            char *p   = ptr_key;
126
0
            char *end = ptr_key + key_size;
127
0
            while (p != end) {
128
0
                if (*p == '.') *p = '_';
129
0
                p++;
130
0
            }
131
0
        }
132
133
        /* Append the key */
134
0
        msgpack_pack_str(tmp_pck, key_size);
135
0
        msgpack_pack_str_body(tmp_pck, ptr_key, key_size);
136
137
        /* Release temporary key if was allocated */
138
0
        if (ptr_key && ptr_key != buf_key) {
139
0
            flb_free(ptr_key);
140
0
        }
141
0
        ptr_key = NULL;
142
143
        /*
144
         * The value can be any data type, if it's a map we need to
145
         * sanitize to avoid dots.
146
         */
147
0
        if (v->type == MSGPACK_OBJECT_MAP) {
148
0
            msgpack_pack_map(tmp_pck, v->via.map.size);
149
0
            os_pack_map_content(tmp_pck, *v, ctx);
150
0
        }
151
        /*
152
         * The value can be any data type, if it's an array we need to
153
         * pass it to os_pack_array_content.
154
         */
155
0
        else if (v->type == MSGPACK_OBJECT_ARRAY) {
156
0
          msgpack_pack_array(tmp_pck, v->via.array.size);
157
0
          os_pack_array_content(tmp_pck, *v, ctx);
158
0
        }
159
0
        else {
160
0
            msgpack_pack_object(tmp_pck, *v);
161
0
        }
162
0
    }
163
0
    return 0;
164
0
}
165
166
/*
167
  * Iterate through the array and sanitize elements.
168
  * Mutual recursion with os_pack_map_content.
169
  */
170
static int os_pack_array_content(msgpack_packer *tmp_pck,
171
                                 msgpack_object array,
172
                                 struct flb_opensearch *ctx)
173
0
{
174
0
    int i;
175
0
    msgpack_object *e;
176
177
0
    for (i = 0; i < array.via.array.size; i++) {
178
0
        e = &array.via.array.ptr[i];
179
0
        if (e->type == MSGPACK_OBJECT_MAP) {
180
0
            msgpack_pack_map(tmp_pck, e->via.map.size);
181
0
            os_pack_map_content(tmp_pck, *e, ctx);
182
0
        }
183
0
        else if (e->type == MSGPACK_OBJECT_ARRAY) {
184
0
            msgpack_pack_array(tmp_pck, e->via.array.size);
185
0
            os_pack_array_content(tmp_pck, *e, ctx);
186
0
        }
187
0
        else {
188
0
            msgpack_pack_object(tmp_pck, *e);
189
0
        }
190
0
    }
191
0
    return 0;
192
0
}
193
194
/*
195
 * Get _id value from incoming record.
196
 * If it successed, return the value as flb_sds_t.
197
 * If it failed, return NULL.
198
*/
199
static flb_sds_t os_get_id_value(struct flb_opensearch *ctx,
200
                                 msgpack_object *map)
201
0
{
202
0
    struct flb_ra_value *rval = NULL;
203
0
    flb_sds_t tmp_str;
204
0
    rval = flb_ra_get_value_object(ctx->ra_id_key, *map);
205
0
    if (rval == NULL) {
206
0
        flb_plg_warn(ctx->ins, "the value of %s is missing",
207
0
                     ctx->id_key);
208
0
        return NULL;
209
0
    }
210
0
    else if(rval->o.type != MSGPACK_OBJECT_STR) {
211
0
        flb_plg_warn(ctx->ins, "the value of %s is not string",
212
0
                     ctx->id_key);
213
0
        flb_ra_key_value_destroy(rval);
214
0
        return NULL;
215
0
    }
216
217
0
    tmp_str = flb_sds_create_len(rval->o.via.str.ptr,
218
0
                                 rval->o.via.str.size);
219
0
    if (tmp_str == NULL) {
220
0
        flb_plg_warn(ctx->ins, "cannot create ID string from record");
221
0
        flb_ra_key_value_destroy(rval);
222
0
        return NULL;
223
0
    }
224
0
    flb_ra_key_value_destroy(rval);
225
0
    return tmp_str;
226
0
}
227
228
static int compose_index_header(struct flb_opensearch *ctx,
229
                                int index_custom_len,
230
                                char *logstash_index, size_t logstash_index_size,
231
                                char *separator_str,
232
                                struct tm *tm)
233
0
{
234
0
    int ret;
235
0
    int len;
236
0
    char *p;
237
0
    size_t s;
238
239
    /* Compose Index header */
240
0
    if (index_custom_len > 0) {
241
0
        p = logstash_index + index_custom_len;
242
0
    } else {
243
0
        p = logstash_index + flb_sds_len(ctx->logstash_prefix);
244
0
    }
245
0
    len = p - logstash_index;
246
0
    ret = snprintf(p, logstash_index_size - len, "%s",
247
0
                   separator_str);
248
0
    if (ret > logstash_index_size - len) {
249
        /* exceed limit */
250
0
        return -1;
251
0
    }
252
0
    p += strlen(separator_str);
253
0
    len += strlen(separator_str);
254
255
0
    s = strftime(p, logstash_index_size - len,
256
0
                 ctx->logstash_dateformat, tm);
257
0
    if (s==0) {
258
        /* exceed limit */
259
0
        return -1;
260
0
    }
261
0
    p += s;
262
0
    *p++ = '\0';
263
264
0
    return 0;
265
0
}
266
267
/*
268
 * Convert the internal Fluent Bit data representation to the required
269
 * one by OpenSearch.
270
 */
271
static int opensearch_format(struct flb_config *config,
272
                             struct flb_input_instance *ins,
273
                             void *plugin_context,
274
                             void *flush_ctx,
275
                             int event_type,
276
                             const char *tag, int tag_len,
277
                             const void *data, size_t bytes,
278
                             void **out_data, size_t *out_size)
279
0
{
280
0
    int ret;
281
0
    int len;
282
0
    int map_size;
283
0
    int index_len = 0;
284
0
    int write_op_update = FLB_FALSE;
285
0
    int write_op_upsert = FLB_FALSE;
286
0
    flb_sds_t ra_index = NULL;
287
0
    size_t s = 0;
288
0
    char *index = NULL;
289
0
    char logstash_index[256];
290
0
    char time_formatted[256];
291
0
    char index_formatted[256];
292
0
    char uuid[37];
293
0
    flb_sds_t out_buf;
294
0
    flb_sds_t id_key_str = NULL;
295
0
    msgpack_object map;
296
0
    flb_sds_t bulk;
297
0
    struct tm tm;
298
0
    struct flb_time tms;
299
0
    msgpack_sbuffer tmp_sbuf;
300
0
    msgpack_packer tmp_pck;
301
0
    cfl_hash_128bits_t hash;
302
0
    unsigned char h[sizeof(cfl_hash_128bits_t)];
303
0
    int index_custom_len;
304
0
    struct flb_opensearch *ctx = plugin_context;
305
0
    flb_sds_t j_index;
306
0
    struct flb_log_event_decoder log_decoder;
307
0
    struct flb_log_event log_event;
308
309
0
    ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
310
311
0
    if (ret != FLB_EVENT_DECODER_SUCCESS) {
312
0
        flb_plg_error(ctx->ins,
313
0
                      "Log event decoder initialization error : %d", ret);
314
315
0
        return -1;
316
0
    }
317
318
0
    j_index = flb_sds_create_size(FLB_OS_HEADER_SIZE);
319
0
    if (j_index == NULL) {
320
0
        flb_log_event_decoder_destroy(&log_decoder);
321
322
0
        return -1;
323
0
    }
324
325
0
    bulk = flb_sds_create_size(bytes * 2);
326
0
    if (!bulk) {
327
0
        flb_log_event_decoder_destroy(&log_decoder);
328
0
        flb_sds_destroy(j_index);
329
330
0
        return -1;
331
0
    }
332
333
    /* Copy logstash prefix if logstash format is enabled */
334
0
    if (ctx->logstash_format == FLB_TRUE) {
335
0
        strncpy(logstash_index, ctx->logstash_prefix, sizeof(logstash_index));
336
0
        logstash_index[sizeof(logstash_index) - 1] = '\0';
337
0
    }
338
339
    /*
340
     * If logstash format and id generation are disabled, pre-generate
341
     * the index line for all records.
342
     *
343
     * The header stored in 'j_index' will be used for the all records on
344
     * this payload.
345
     */
346
0
    if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE && ctx->ra_index == NULL) {
347
0
        flb_time_get(&tms);
348
0
        gmtime_r(&tms.tm.tv_sec, &tm);
349
0
        strftime(index_formatted, sizeof(index_formatted) - 1,
350
0
                 ctx->index, &tm);
351
0
        index = index_formatted;
352
0
        if (ctx->suppress_type_name) {
353
0
            index_len = flb_sds_snprintf(&j_index,
354
0
                                         flb_sds_alloc(j_index),
355
0
                                         OS_BULK_INDEX_FMT_NO_TYPE,
356
0
                                         ctx->action,
357
0
                                         index);
358
0
        }
359
0
        else {
360
0
            index_len = flb_sds_snprintf(&j_index,
361
0
                                         flb_sds_alloc(j_index),
362
0
                                         OS_BULK_INDEX_FMT,
363
0
                                         ctx->action,
364
0
                                         index, ctx->type);
365
0
        }
366
367
0
        if (index_len == -1) {
368
0
            flb_log_event_decoder_destroy(&log_decoder);
369
0
            flb_sds_destroy(bulk);
370
0
            flb_sds_destroy(j_index);
371
0
            return -1;
372
0
        }
373
0
    }
374
375
    /*
376
     * Some broken clients may have time drift up to year 1970
377
     * this will generate corresponding index in OpenSearch
378
     * in order to prevent generating millions of indexes
379
     * we can set to always use current time for index generation
380
     */
381
0
    if (ctx->current_time_index == FLB_TRUE) {
382
0
        flb_time_get(&tms);
383
0
    }
384
385
0
    while ((ret = flb_log_event_decoder_next(
386
0
                    &log_decoder,
387
0
                    &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
388
        /* Only pop time from record if current_time_index is disabled */
389
0
        if (!ctx->current_time_index) {
390
0
            flb_time_copy(&tms, &log_event.timestamp);
391
0
        }
392
393
0
        map      = *log_event.body;
394
0
        map_size = map.via.map.size;
395
396
0
        index_custom_len = 0;
397
0
        if (ctx->logstash_prefix_key) {
398
0
            flb_sds_t v = flb_ra_translate(ctx->ra_prefix_key,
399
0
                                           (char *) tag, tag_len,
400
0
                                           map, NULL);
401
0
            if (v) {
402
0
                len = flb_sds_len(v);
403
0
                if (len > 128) {
404
0
                    len = 128;
405
0
                    memcpy(logstash_index, v, 128);
406
0
                }
407
0
                else {
408
0
                    memcpy(logstash_index, v, len);
409
0
                }
410
411
0
                index_custom_len = len;
412
0
                flb_sds_destroy(v);
413
0
            }
414
0
        }
415
416
        /* Create temporary msgpack buffer */
417
0
        msgpack_sbuffer_init(&tmp_sbuf);
418
0
        msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
419
420
0
        if (ctx->include_tag_key) {
421
0
            map_size++;
422
0
        }
423
424
        /* Set the new map size */
425
0
        msgpack_pack_map(&tmp_pck, map_size + 1);
426
427
        /* Append the time key */
428
0
        msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->time_key));
429
0
        msgpack_pack_str_body(&tmp_pck, ctx->time_key, flb_sds_len(ctx->time_key));
430
431
        /* Format the time */
432
0
        gmtime_r(&tms.tm.tv_sec, &tm);
433
0
        s = strftime(time_formatted, sizeof(time_formatted) - 1,
434
0
                     ctx->time_key_format, &tm);
435
0
        if (ctx->time_key_nanos) {
436
0
            len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
437
0
                           ".%09" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec);
438
0
        } else {
439
0
            len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
440
0
                           ".%03" PRIu64 "Z",
441
0
                           (uint64_t) tms.tm.tv_nsec / 1000000);
442
0
        }
443
444
0
        s += len;
445
0
        msgpack_pack_str(&tmp_pck, s);
446
0
        msgpack_pack_str_body(&tmp_pck, time_formatted, s);
447
448
0
        index = ctx->index;
449
0
        if (ctx->logstash_format == FLB_TRUE) {
450
0
            ret = compose_index_header(ctx, index_custom_len,
451
0
                                       &logstash_index[0], sizeof(logstash_index),
452
0
                                       ctx->logstash_prefix_separator, &tm);
453
0
            if (ret < 0) {
454
                /* retry with default separator */
455
0
                compose_index_header(ctx, index_custom_len,
456
0
                                     &logstash_index[0], sizeof(logstash_index),
457
0
                                     "-", &tm);
458
0
            }
459
0
            index = logstash_index;
460
0
            if (ctx->generate_id == FLB_FALSE) {
461
0
                if (ctx->suppress_type_name) {
462
0
                    index_len = flb_sds_snprintf(&j_index,
463
0
                                                 flb_sds_alloc(j_index),
464
0
                                                 OS_BULK_INDEX_FMT_NO_TYPE,
465
0
                                                 ctx->action,
466
0
                                                 index);
467
0
                }
468
0
                else {
469
0
                    index_len = flb_sds_snprintf(&j_index,
470
0
                                                 flb_sds_alloc(j_index),
471
0
                                                 OS_BULK_INDEX_FMT,
472
0
                                                 ctx->action,
473
0
                                                 index, ctx->type);
474
0
                }
475
0
            }
476
0
        }
477
0
        else if (ctx->current_time_index == FLB_TRUE) {
478
            /* Make sure we handle index time format for index */
479
0
            strftime(index_formatted, sizeof(index_formatted) - 1,
480
0
                     ctx->index, &tm);
481
0
            index = index_formatted;
482
0
        }
483
0
        else if (ctx->ra_index) {
484
            // free any previous ra_index to avoid memory leaks.
485
0
            if (ra_index != NULL) {
486
0
                flb_sds_destroy(ra_index);
487
0
            }
488
            /* a record accessor pattern exists for the index */
489
0
            ra_index = flb_ra_translate(ctx->ra_index,
490
0
                                           (char *) tag, tag_len,
491
0
                                           map, NULL);
492
0
            if (!ra_index) {
493
0
                flb_plg_warn(ctx->ins, "invalid index translation from record accessor pattern, default to static index");
494
0
            }
495
0
            else {
496
0
                index = ra_index;
497
0
            }
498
499
0
            if (ctx->suppress_type_name) {
500
0
                index_len = flb_sds_snprintf(&j_index,
501
0
                                             flb_sds_alloc(j_index),
502
0
                                             OS_BULK_INDEX_FMT_NO_TYPE,
503
0
                                             ctx->action,
504
0
                                             index);
505
0
            }
506
0
            else {
507
0
                index_len = flb_sds_snprintf(&j_index,
508
0
                                             flb_sds_alloc(j_index),
509
0
                                             OS_BULK_INDEX_FMT,
510
0
                                             ctx->action,
511
0
                                             index, ctx->type);
512
0
            }
513
0
        }
514
515
        /* Tag Key */
516
0
        if (ctx->include_tag_key == FLB_TRUE) {
517
0
            msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->tag_key));
518
0
            msgpack_pack_str_body(&tmp_pck, ctx->tag_key, flb_sds_len(ctx->tag_key));
519
0
            msgpack_pack_str(&tmp_pck, tag_len);
520
0
            msgpack_pack_str_body(&tmp_pck, tag, tag_len);
521
0
        }
522
523
        /*
524
         * The map_content routine iterate over each Key/Value pair found in
525
         * the map and do some sanitization for the key names.
526
         *
527
         * There is a restriction that key names cannot contain a dot; if some
528
         * dot is found, it's replaced with an underscore.
529
         */
530
0
        ret = os_pack_map_content(&tmp_pck, map, ctx);
531
0
        if (ret == -1) {
532
0
            flb_log_event_decoder_destroy(&log_decoder);
533
0
            msgpack_sbuffer_destroy(&tmp_sbuf);
534
0
            flb_sds_destroy(bulk);
535
0
            flb_sds_destroy(j_index);
536
0
            if (ra_index != NULL) {
537
0
                flb_sds_destroy(ra_index);
538
0
            }
539
0
            return -1;
540
0
        }
541
542
0
        if (ctx->generate_id == FLB_TRUE) {
543
            /* use a 128 bit hash and copy it to a buffer */
544
0
            hash = cfl_hash_128bits(tmp_sbuf.data, tmp_sbuf.size);
545
0
            memcpy(h, &hash, sizeof(hash));
546
0
            snprintf(uuid, sizeof(uuid),
547
0
                     "%02X%02X%02X%02X-%02X%02X-%02X%02X-"
548
0
                     "%02X%02X-%02X%02X%02X%02X%02X%02X",
549
0
                     h[0], h[1], h[2], h[3], h[4], h[5], h[6], h[7],
550
0
                     h[8], h[9], h[10], h[11], h[12], h[13], h[14], h[15]);
551
552
0
            if (ctx->suppress_type_name) {
553
0
                index_len = flb_sds_snprintf(&j_index,
554
0
                                             flb_sds_alloc(j_index),
555
0
                                             OS_BULK_INDEX_FMT_ID_NO_TYPE,
556
0
                                             ctx->action,
557
0
                                             index, uuid);
558
0
            }
559
0
            else {
560
0
                index_len = flb_sds_snprintf(&j_index,
561
0
                                             flb_sds_alloc(j_index),
562
0
                                             OS_BULK_INDEX_FMT_ID,
563
0
                                             ctx->action,
564
0
                                             index, ctx->type, uuid);
565
0
            }
566
0
        }
567
0
        if (ctx->ra_id_key) {
568
0
            id_key_str = os_get_id_value(ctx ,&map);
569
0
            if (id_key_str) {
570
0
                if (ctx->suppress_type_name) {
571
0
                    index_len = flb_sds_snprintf(&j_index,
572
0
                                                 flb_sds_alloc(j_index),
573
0
                                                 OS_BULK_INDEX_FMT_ID_NO_TYPE,
574
0
                                                 ctx->action,
575
0
                                                 index,  id_key_str);
576
0
                }
577
0
                else {
578
0
                    index_len = flb_sds_snprintf(&j_index,
579
0
                                                 flb_sds_alloc(j_index),
580
0
                                                 OS_BULK_INDEX_FMT_ID,
581
0
                                                 ctx->action,
582
0
                                                 index, ctx->type, id_key_str);
583
0
                }
584
0
                flb_sds_destroy(id_key_str);
585
0
                id_key_str = NULL;
586
0
            }
587
0
        }
588
589
        /* Convert msgpack to JSON */
590
0
        out_buf = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size,
591
0
                                              config->json_escape_unicode);
592
0
        msgpack_sbuffer_destroy(&tmp_sbuf);
593
0
        if (!out_buf) {
594
0
            flb_log_event_decoder_destroy(&log_decoder);
595
0
            flb_sds_destroy(bulk);
596
0
            flb_sds_destroy(j_index);
597
0
            if (ra_index != NULL) {
598
0
                flb_sds_destroy(ra_index);
599
0
            }
600
0
            return -1;
601
0
        }
602
603
0
        ret = flb_sds_cat_safe(&bulk, j_index, flb_sds_len(j_index));
604
0
        if (ret == -1) {
605
0
            flb_log_event_decoder_destroy(&log_decoder);
606
0
            *out_size = 0;
607
0
            flb_sds_destroy(bulk);
608
0
            flb_sds_destroy(j_index);
609
0
            flb_sds_destroy(out_buf);
610
0
            if (ra_index != NULL) {
611
0
                flb_sds_destroy(ra_index);
612
0
            }
613
0
            return -1;
614
0
        }
615
616
0
        if (strcasecmp(ctx->write_operation, FLB_OS_WRITE_OP_UPDATE) == 0) {
617
0
            write_op_update = FLB_TRUE;
618
0
        }
619
0
        else if (strcasecmp(ctx->write_operation, FLB_OS_WRITE_OP_UPSERT) == 0) {
620
0
            write_op_upsert = FLB_TRUE;
621
0
        }
622
623
        /* UPDATE | UPSERT */
624
0
        if (write_op_update) {
625
0
            flb_sds_cat_safe(&bulk,
626
0
                             OS_BULK_UPDATE_OP_BODY,
627
0
                             sizeof(OS_BULK_UPDATE_OP_BODY) - 1);
628
0
        }
629
0
        else if (write_op_upsert) {
630
0
            flb_sds_cat_safe(&bulk,
631
0
                             OS_BULK_UPSERT_OP_BODY,
632
0
                             sizeof(OS_BULK_UPSERT_OP_BODY) - 1);
633
0
        }
634
635
0
        ret = flb_sds_cat_safe(&bulk, out_buf, flb_sds_len(out_buf));
636
0
        if (ret == -1) {
637
0
            flb_log_event_decoder_destroy(&log_decoder);
638
0
            *out_size = 0;
639
0
            flb_sds_destroy(bulk);
640
0
            flb_sds_destroy(j_index);
641
0
            flb_sds_destroy(out_buf);
642
0
            if (ra_index != NULL) {
643
0
                flb_sds_destroy(ra_index);
644
0
            }
645
0
            return -1;
646
0
        }
647
648
        /* finish UPDATE | UPSERT */
649
0
        if (write_op_update || write_op_upsert) {
650
0
            flb_sds_cat_safe(&bulk, "}", 1);
651
0
        }
652
653
0
        flb_sds_cat_safe(&bulk, "\n", 1);
654
0
        flb_sds_destroy(out_buf);
655
0
    }
656
657
0
    flb_log_event_decoder_destroy(&log_decoder);
658
659
    /* Set outgoing data */
660
0
    *out_data = bulk;
661
0
    *out_size = flb_sds_len(bulk);
662
663
0
    if (ra_index != NULL) {
664
0
        flb_sds_destroy(ra_index);
665
0
    }
666
    /*
667
     * Note: we don't destroy the bulk as we need to keep the allocated
668
     * buffer with the data. Instead we just release the bulk context and
669
     * return the bulk->ptr buffer
670
     */
671
0
    if (ctx->trace_output) {
672
0
        fwrite(*out_data, 1, *out_size, stdout);
673
0
        fflush(stdout);
674
0
    }
675
0
    flb_sds_destroy(j_index);
676
0
    return 0;
677
0
}
678
679
static int cb_opensearch_init(struct flb_output_instance *ins,
680
                              struct flb_config *config,
681
                              void *data)
682
0
{
683
0
    struct flb_opensearch *ctx;
684
685
0
    ctx = flb_os_conf_create(ins, config);
686
0
    if (!ctx) {
687
0
        flb_plg_error(ins, "cannot initialize plugin");
688
0
        return -1;
689
0
    }
690
691
0
    if (ctx->index == NULL && ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) {
692
0
        flb_plg_error(ins, "cannot initialize plugin, index is not set and logstash_format and generate_id are both off");
693
0
        return -1;
694
0
    }
695
696
0
    flb_plg_debug(ctx->ins, "host=%s port=%i uri=%s index=%s type=%s",
697
0
                  ins->host.name, ins->host.port, ctx->uri,
698
0
                  ctx->index, ctx->type);
699
700
0
    flb_output_set_context(ins, ctx);
701
702
    /*
703
     * This plugin instance uses the HTTP client interface, let's register
704
     * it debugging callbacks.
705
     */
706
0
    flb_output_set_http_debug_callbacks(ins);
707
708
0
    return 0;
709
0
}
710
711
static int opensearch_error_check(struct flb_opensearch *ctx,
712
                                  struct flb_http_client *c)
713
0
{
714
0
    int i, j, k;
715
0
    int ret;
716
0
    int check = FLB_FALSE;
717
0
    int root_type;
718
0
    char *out_buf;
719
0
    size_t off = 0;
720
0
    size_t out_size;
721
0
    msgpack_unpacked result;
722
0
    msgpack_object root;
723
0
    msgpack_object key;
724
0
    msgpack_object val;
725
0
    msgpack_object item;
726
0
    msgpack_object item_key;
727
0
    msgpack_object item_val;
728
729
    /*
730
     * Check if our payload is complete: there is such situations where
731
     * the OpenSearch HTTP response body is bigger than the HTTP client
732
     * buffer so payload can be incomplete.
733
     */
734
    /* Convert JSON payload to msgpack */
735
0
    ret = flb_pack_json(c->resp.payload, c->resp.payload_size,
736
0
                        &out_buf, &out_size, &root_type, NULL);
737
0
    if (ret == -1) {
738
        /* Is this an incomplete HTTP Request ? */
739
0
        if (c->resp.payload_size <= 0) {
740
0
            return FLB_TRUE;
741
0
        }
742
743
        /* Lookup error field */
744
0
        if (strstr(c->resp.payload, "\"errors\":false,\"items\":[")) {
745
0
            return FLB_FALSE;
746
0
        }
747
748
0
        flb_plg_error(ctx->ins, "could not pack/validate JSON response\n%s",
749
0
                      c->resp.payload);
750
0
        return FLB_TRUE;
751
0
    }
752
753
    /* Lookup error field */
754
0
    msgpack_unpacked_init(&result);
755
0
    ret = msgpack_unpack_next(&result, out_buf, out_size, &off);
756
0
    if (ret != MSGPACK_UNPACK_SUCCESS) {
757
0
        flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s",
758
0
                      c->resp.payload);
759
0
        return FLB_TRUE;
760
0
    }
761
762
0
    root = result.data;
763
0
    if (root.type != MSGPACK_OBJECT_MAP) {
764
0
        flb_plg_error(ctx->ins, "unexpected payload type=%i",
765
0
                      root.type);
766
0
        check = FLB_TRUE;
767
0
        goto done;
768
0
    }
769
770
0
    for (i = 0; i < root.via.map.size; i++) {
771
0
        key = root.via.map.ptr[i].key;
772
0
        if (key.type != MSGPACK_OBJECT_STR) {
773
0
            flb_plg_error(ctx->ins, "unexpected key type=%i",
774
0
                          key.type);
775
0
            check = FLB_TRUE;
776
0
            goto done;
777
0
        }
778
779
0
        if (key.via.str.size == 6 && strncmp(key.via.str.ptr, "errors", 6) == 0) {
780
0
            val = root.via.map.ptr[i].val;
781
0
            if (val.type != MSGPACK_OBJECT_BOOLEAN) {
782
0
                flb_plg_error(ctx->ins, "unexpected 'error' value type=%i",
783
0
                              val.type);
784
0
                check = FLB_TRUE;
785
0
                goto done;
786
0
            }
787
788
            /* If error == false, we are OK (no errors = FLB_FALSE) */
789
0
            if (!val.via.boolean) {
790
                /* no errors */
791
0
                check = FLB_FALSE;
792
0
                goto done;
793
0
            }
794
0
        }
795
0
        else if (key.via.str.size == 5 && strncmp(key.via.str.ptr, "items", 5) == 0) {
796
0
            val = root.via.map.ptr[i].val;
797
0
            if (val.type != MSGPACK_OBJECT_ARRAY) {
798
0
                flb_plg_error(ctx->ins, "unexpected 'items' value type=%i",
799
0
                              val.type);
800
0
                check = FLB_TRUE;
801
0
                goto done;
802
0
            }
803
804
0
            for (j = 0; j < val.via.array.size; j++) {
805
0
                item = val.via.array.ptr[j];
806
0
                if (item.type != MSGPACK_OBJECT_MAP) {
807
0
                    flb_plg_error(ctx->ins, "unexpected 'item' outer value type=%i",
808
0
                                  item.type);
809
0
                    check = FLB_TRUE;
810
0
                    goto done;
811
0
                }
812
813
0
                if (item.via.map.size != 1) {
814
0
                    flb_plg_error(ctx->ins, "unexpected 'item' size=%i",
815
0
                                  item.via.map.size);
816
0
                    check = FLB_TRUE;
817
0
                    goto done;
818
0
                }
819
820
0
                item = item.via.map.ptr[0].val;
821
0
                if (item.type != MSGPACK_OBJECT_MAP) {
822
0
                    flb_plg_error(ctx->ins, "unexpected 'item' inner value type=%i",
823
0
                                  item.type);
824
0
                    check = FLB_TRUE;
825
0
                    goto done;
826
0
                }
827
828
0
                for (k = 0; k < item.via.map.size; k++) {
829
0
                    item_key = item.via.map.ptr[k].key;
830
0
                    if (item_key.type != MSGPACK_OBJECT_STR) {
831
0
                        flb_plg_error(ctx->ins, "unexpected key type=%i",
832
0
                                      item_key.type);
833
0
                        check = FLB_TRUE;
834
0
                        goto done;
835
0
                    }
836
837
0
                    if (item_key.via.str.size == 6 && strncmp(item_key.via.str.ptr, "status", 6) == 0) {
838
0
                        item_val = item.via.map.ptr[k].val;
839
840
0
                        if (item_val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
841
0
                            flb_plg_error(ctx->ins, "unexpected 'status' value type=%i",
842
0
                                          item_val.type);
843
0
                            check = FLB_TRUE;
844
0
                            goto done;
845
0
                        }
846
                        /* Check for errors other than version conflict (document already exists) */
847
0
                        if (item_val.via.i64 != 409) {
848
0
                            check = FLB_TRUE;
849
0
                            goto done;
850
0
                        }
851
0
                    }
852
0
                }
853
0
            }
854
0
        }
855
0
    }
856
857
0
 done:
858
0
    flb_free(out_buf);
859
0
    msgpack_unpacked_destroy(&result);
860
0
    return check;
861
0
}
862
863
static void cb_opensearch_flush(struct flb_event_chunk *event_chunk,
864
                                struct flb_output_flush *out_flush,
865
                                struct flb_input_instance *ins, void *out_context,
866
                                struct flb_config *config)
867
0
{
868
0
    int ret = -1;
869
0
    size_t pack_size;
870
0
    flb_sds_t pack;
871
0
    void *out_buf;
872
0
    size_t out_size;
873
0
    size_t b_sent;
874
0
    struct flb_opensearch *ctx = out_context;
875
0
    struct flb_connection *u_conn;
876
0
    struct flb_http_client *c;
877
0
    flb_sds_t signature = NULL;
878
0
    int compressed = FLB_FALSE;
879
0
    void *final_payload_buf = NULL;
880
0
    size_t final_payload_size = 0;
881
882
    /* Get upstream connection */
883
0
    u_conn = flb_upstream_conn_get(ctx->u);
884
0
    if (!u_conn) {
885
0
        FLB_OUTPUT_RETURN(FLB_RETRY);
886
0
    }
887
888
    /* Convert format */
889
0
    if (event_chunk->type == FLB_EVENT_TYPE_TRACES) {
890
0
        pack = flb_msgpack_raw_to_json_sds(event_chunk->data, event_chunk->size,
891
0
                                           config->json_escape_unicode);
892
0
        if (pack) {
893
0
            ret = 0;
894
895
0
            out_buf = (void *) pack;
896
0
            out_size = cfl_sds_len(pack);
897
0
        }
898
0
        else {
899
0
            ret = -1;
900
0
        }
901
0
    }
902
0
    else if (event_chunk->type == FLB_EVENT_TYPE_LOGS) {
903
0
        ret = opensearch_format(config, ins,
904
0
                                   ctx, NULL,
905
0
                                   event_chunk->type,
906
0
                                   event_chunk->tag, flb_sds_len(event_chunk->tag),
907
0
                                   event_chunk->data, event_chunk->size,
908
0
                                   &out_buf, &out_size);
909
0
    }
910
911
0
    if (ret != 0) {
912
0
        flb_upstream_conn_release(u_conn);
913
0
        FLB_OUTPUT_RETURN(FLB_ERROR);
914
0
    }
915
916
0
    pack = (char *) out_buf;
917
0
    pack_size = out_size;
918
919
0
    final_payload_buf = pack;
920
0
    final_payload_size = pack_size;
921
    /* Should we compress the payload ? */
922
0
    if (ctx->compression == FLB_OS_COMPRESSION_GZIP) {
923
0
        ret = flb_gzip_compress((void *) pack, pack_size,
924
0
                                &out_buf, &out_size);
925
0
        if (ret == -1) {
926
0
            flb_plg_error(ctx->ins,
927
0
                          "cannot gzip payload, disabling compression");
928
0
        }
929
0
        else {
930
0
            compressed = FLB_TRUE;
931
0
            final_payload_buf = out_buf;
932
0
            final_payload_size = out_size;
933
0
        }
934
0
    }
935
936
    /* Compose HTTP Client request */
937
0
    c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri,
938
0
                        final_payload_buf, final_payload_size, NULL, 0, NULL, 0);
939
940
0
    flb_http_buffer_size(c, ctx->buffer_size);
941
942
#ifndef FLB_HAVE_AWS
943
    flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
944
#endif
945
946
0
    flb_http_add_header(c, "Content-Type", 12, "application/x-ndjson", 20);
947
948
0
    if (ctx->http_user && ctx->http_passwd) {
949
0
        flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd);
950
0
    }
951
952
0
#ifdef FLB_HAVE_AWS
953
0
    if (ctx->has_aws_auth == FLB_TRUE) {
954
0
        signature = add_aws_auth(c, ctx);
955
0
        if (!signature) {
956
0
            goto retry;
957
0
        }
958
0
    }
959
0
    else {
960
0
        flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
961
0
    }
962
0
#endif
963
964
    /* Set Content-Encoding of compressed payload */
965
0
    if (compressed == FLB_TRUE) {
966
0
        if (ctx->compression == FLB_OS_COMPRESSION_GZIP) {
967
0
            flb_http_set_content_encoding_gzip(c);
968
0
        }
969
0
    }
970
971
    /* Map debug callbacks */
972
0
    flb_http_client_debug(c, ctx->ins->callback);
973
974
0
    ret = flb_http_do(c, &b_sent);
975
0
    if (ret != 0) {
976
0
        flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri);
977
0
        if (signature) {
978
0
            flb_sds_destroy(signature);
979
0
            signature = NULL;
980
0
        }
981
0
        goto retry;
982
0
    }
983
0
    else {
984
        /* The request was issued successfully, validate the 'error' field */
985
0
        flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri);
986
0
        if (c->resp.status != 200 && c->resp.status != 201) {
987
0
            if (c->resp.payload_size > 0) {
988
0
                flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n",
989
0
                              c->resp.status, ctx->uri, c->resp.payload);
990
0
            }
991
0
            else {
992
0
                flb_plg_error(ctx->ins, "HTTP status=%i URI=%s",
993
0
                              c->resp.status, ctx->uri);
994
0
            }
995
0
            if (signature) {
996
0
                flb_sds_destroy(signature);
997
0
                signature = NULL;
998
0
            }
999
0
            goto retry;
1000
0
        }
1001
1002
0
        if (c->resp.payload_size > 0) {
1003
            /*
1004
             * OpenSearch payload should be JSON, we convert it to msgpack
1005
             * and lookup the 'error' field.
1006
             */
1007
0
            ret = opensearch_error_check(ctx, c);
1008
0
            if (ret == FLB_TRUE) {
1009
                /* we got an error */
1010
0
                if (ctx->trace_error) {
1011
                    /*
1012
                     * If trace_error is set, trace the actual
1013
                     * response from Elasticsearch explaining the problem.
1014
                     * Trace_Output can be used to see the request.
1015
                     */
1016
0
                    if (pack_size < 4000) {
1017
0
                        flb_plg_debug(ctx->ins, "error caused by: Input\n%.*s\n",
1018
0
                                      (int) pack_size, pack);
1019
0
                    }
1020
0
                    if (c->resp.payload_size < 4000) {
1021
0
                        flb_plg_error(ctx->ins, "error: Output\n%s",
1022
0
                                      c->resp.payload);
1023
0
                    } else {
1024
                        /*
1025
                        * We must use fwrite since the flb_log functions
1026
                        * will truncate data at 4KB
1027
                        */
1028
0
                        fwrite(c->resp.payload, 1, c->resp.payload_size, stderr);
1029
0
                        fflush(stderr);
1030
0
                    }
1031
0
                }
1032
0
                if (signature) {
1033
0
                    flb_sds_destroy(signature);
1034
0
                    signature = NULL;
1035
0
                }
1036
0
                goto retry;
1037
0
            }
1038
0
            else {
1039
0
                flb_plg_debug(ctx->ins, "OpenSearch response\n%s",
1040
0
                              c->resp.payload);
1041
0
            }
1042
0
        }
1043
0
        else {
1044
0
            if (signature) {
1045
0
                flb_sds_destroy(signature);
1046
0
                signature = NULL;
1047
0
            }
1048
0
            goto retry;
1049
0
        }
1050
0
    }
1051
1052
    /* Cleanup */
1053
0
    flb_http_client_destroy(c);
1054
1055
0
    if (final_payload_buf != pack) {
1056
0
        flb_free(final_payload_buf);
1057
0
    }
1058
0
    flb_sds_destroy(pack);
1059
1060
0
    flb_upstream_conn_release(u_conn);
1061
0
    if (signature) {
1062
0
        flb_sds_destroy(signature);
1063
0
    }
1064
0
    FLB_OUTPUT_RETURN(FLB_OK);
1065
1066
    /* Issue a retry */
1067
0
 retry:
1068
0
    flb_http_client_destroy(c);
1069
0
    flb_sds_destroy(pack);
1070
1071
0
    if (final_payload_buf != pack) {
1072
0
        flb_free(final_payload_buf);
1073
0
    }
1074
1075
0
    flb_upstream_conn_release(u_conn);
1076
0
    FLB_OUTPUT_RETURN(FLB_RETRY);
1077
0
}
1078
1079
static int cb_opensearch_exit(void *data, struct flb_config *config)
1080
0
{
1081
0
    struct flb_opensearch *ctx = data;
1082
1083
0
    flb_os_conf_destroy(ctx);
1084
0
    return 0;
1085
0
}
1086
1087
/* Configuration properties map */
1088
static struct flb_config_map config_map[] = {
1089
    {
1090
     FLB_CONFIG_MAP_STR, "index", FLB_OS_DEFAULT_INDEX,
1091
     0, FLB_TRUE, offsetof(struct flb_opensearch, index),
1092
     "Set an index name"
1093
    },
1094
    {
1095
     FLB_CONFIG_MAP_STR, "type", FLB_OS_DEFAULT_TYPE,
1096
     0, FLB_TRUE, offsetof(struct flb_opensearch, type),
1097
     "Set the document type property"
1098
    },
1099
    {
1100
     FLB_CONFIG_MAP_BOOL, "suppress_type_name", "false",
1101
     0, FLB_TRUE, offsetof(struct flb_opensearch, suppress_type_name),
1102
     "If true, mapping types is removed. (for v7.0.0 or later)"
1103
    },
1104
1105
    /* HTTP Authentication */
1106
    {
1107
     FLB_CONFIG_MAP_STR, "http_user", NULL,
1108
     0, FLB_TRUE, offsetof(struct flb_opensearch, http_user),
1109
     "Optional username credential for access"
1110
    },
1111
    {
1112
     FLB_CONFIG_MAP_STR, "http_passwd", "",
1113
     0, FLB_TRUE, offsetof(struct flb_opensearch, http_passwd),
1114
     "Password for user defined in 'http_user'"
1115
    },
1116
1117
    /* AWS Authentication */
1118
#ifdef FLB_HAVE_AWS
1119
    {
1120
     FLB_CONFIG_MAP_BOOL, "aws_auth", "false",
1121
     0, FLB_TRUE, offsetof(struct flb_opensearch, has_aws_auth),
1122
     "Enable AWS Sigv4 Authentication"
1123
    },
1124
    {
1125
     FLB_CONFIG_MAP_STR, "aws_region", NULL,
1126
     0, FLB_TRUE, offsetof(struct flb_opensearch, aws_region),
1127
     "AWS Region of your Amazon OpenSearch Service cluster"
1128
    },
1129
    {
1130
     FLB_CONFIG_MAP_STR, "aws_profile", "default",
1131
     0, FLB_TRUE, offsetof(struct flb_opensearch, aws_profile),
1132
     "AWS Profile name. AWS Profiles can be configured with AWS CLI and are usually stored in "
1133
     "$HOME/.aws/ directory."
1134
    },
1135
    {
1136
     FLB_CONFIG_MAP_STR, "aws_sts_endpoint", NULL,
1137
     0, FLB_TRUE, offsetof(struct flb_opensearch, aws_sts_endpoint),
1138
     "Custom endpoint for the AWS STS API, used with the AWS_Role_ARN option"
1139
    },
1140
    {
1141
     FLB_CONFIG_MAP_STR, "aws_role_arn", NULL,
1142
     0, FLB_FALSE, 0,
1143
     "AWS IAM Role to assume to put records to your Amazon OpenSearch cluster"
1144
    },
1145
    {
1146
     FLB_CONFIG_MAP_STR, "aws_external_id", NULL,
1147
     0, FLB_FALSE, 0,
1148
     "External ID for the AWS IAM Role specified with `aws_role_arn`"
1149
    },
1150
    {
1151
     FLB_CONFIG_MAP_STR, "aws_service_name", "es",
1152
     0, FLB_TRUE, offsetof(struct flb_opensearch, aws_service_name),
1153
     "AWS Service Name"
1154
    },
1155
#endif
1156
1157
    /* Logstash compatibility */
1158
    {
1159
     FLB_CONFIG_MAP_BOOL, "logstash_format", "false",
1160
     0, FLB_TRUE, offsetof(struct flb_opensearch, logstash_format),
1161
     "Enable Logstash format compatibility"
1162
    },
1163
    {
1164
     FLB_CONFIG_MAP_STR, "logstash_prefix", FLB_OS_DEFAULT_PREFIX,
1165
     0, FLB_TRUE, offsetof(struct flb_opensearch, logstash_prefix),
1166
     "When Logstash_Format is enabled, the Index name is composed using a prefix "
1167
     "and the date, e.g: If Logstash_Prefix is equals to 'mydata' your index will "
1168
     "become 'mydata-YYYY.MM.DD'. The last string appended belongs to the date "
1169
     "when the data is being generated"
1170
    },
1171
    {
1172
     FLB_CONFIG_MAP_STR, "logstash_prefix_separator", "-",
1173
     0, FLB_TRUE, offsetof(struct flb_opensearch, logstash_prefix_separator),
1174
     "Set a separator between logstash_prefix and date."
1175
    },
1176
    {
1177
     FLB_CONFIG_MAP_STR, "logstash_prefix_key", NULL,
1178
     0, FLB_TRUE, offsetof(struct flb_opensearch, logstash_prefix_key),
1179
     "When included: the value in the record that belongs to the key will be looked "
1180
     "up and over-write the Logstash_Prefix for index generation. If the key/value "
1181
     "is not found in the record then the Logstash_Prefix option will act as a "
1182
     "fallback. Nested keys are supported through record accessor pattern"
1183
    },
1184
    {
1185
     FLB_CONFIG_MAP_STR, "logstash_dateformat", FLB_OS_DEFAULT_TIME_FMT,
1186
     0, FLB_TRUE, offsetof(struct flb_opensearch, logstash_dateformat),
1187
     "Time format (based on strftime) to generate the second part of the Index name"
1188
    },
1189
1190
    /* Custom Time and Tag keys */
1191
    {
1192
     FLB_CONFIG_MAP_STR, "time_key", FLB_OS_DEFAULT_TIME_KEY,
1193
     0, FLB_TRUE, offsetof(struct flb_opensearch, time_key),
1194
     "When Logstash_Format is enabled, each record will get a new timestamp field. "
1195
     "The Time_Key property defines the name of that field"
1196
    },
1197
    {
1198
     FLB_CONFIG_MAP_STR, "time_key_format", FLB_OS_DEFAULT_TIME_KEYF,
1199
     0, FLB_TRUE, offsetof(struct flb_opensearch, time_key_format),
1200
     "When Logstash_Format is enabled, this property defines the format of the "
1201
     "timestamp"
1202
    },
1203
    {
1204
     FLB_CONFIG_MAP_BOOL, "time_key_nanos", "false",
1205
     0, FLB_TRUE, offsetof(struct flb_opensearch, time_key_nanos),
1206
     "When Logstash_Format is enabled, enabling this property sends nanosecond "
1207
     "precision timestamps"
1208
    },
1209
    {
1210
     FLB_CONFIG_MAP_BOOL, "include_tag_key", "false",
1211
     0, FLB_TRUE, offsetof(struct flb_opensearch, include_tag_key),
1212
     "When enabled, it append the Tag name to the record"
1213
    },
1214
    {
1215
     FLB_CONFIG_MAP_STR, "tag_key", FLB_OS_DEFAULT_TAG_KEY,
1216
     0, FLB_TRUE, offsetof(struct flb_opensearch, tag_key),
1217
     "When Include_Tag_Key is enabled, this property defines the key name for the tag"
1218
    },
1219
    {
1220
     FLB_CONFIG_MAP_SIZE, "buffer_size", FLB_OS_DEFAULT_HTTP_MAX,
1221
     0, FLB_TRUE, offsetof(struct flb_opensearch, buffer_size),
1222
     "Specify the buffer size used to read the response from the OpenSearch HTTP "
1223
     "service. This option is useful for debugging purposes where is required to read "
1224
     "full responses, note that response size grows depending of the number of records "
1225
     "inserted. To set an unlimited amount of memory set this value to 'false', "
1226
     "otherwise the value must be according to the Unit Size specification"
1227
    },
1228
1229
    /* OpenSearch specifics */
1230
    {
1231
     FLB_CONFIG_MAP_STR, "path", NULL,
1232
     0, FLB_FALSE, 0,
1233
     "OpenSearch accepts new data on HTTP query path '/_bulk'. But it is also "
1234
     "possible to serve OpenSearch behind a reverse proxy on a subpath. This "
1235
     "option defines such path on the fluent-bit side. It simply adds a path "
1236
     "prefix in the indexing HTTP POST URI"
1237
    },
1238
    {
1239
     FLB_CONFIG_MAP_STR, "pipeline", NULL,
1240
     0, FLB_FALSE, 0,
1241
     "OpenSearch allows to setup filters called pipelines. "
1242
     "This option allows to define which pipeline the database should use. For "
1243
     "performance reasons is strongly suggested to do parsing and filtering on "
1244
     "Fluent Bit side, avoid pipelines"
1245
    },
1246
    {
1247
     FLB_CONFIG_MAP_BOOL, "generate_id", "false",
1248
     0, FLB_TRUE, offsetof(struct flb_opensearch, generate_id),
1249
     "When enabled, generate _id for outgoing records. This prevents duplicate "
1250
     "records when retrying"
1251
    },
1252
    {
1253
     FLB_CONFIG_MAP_STR, "write_operation", "create",
1254
     0, FLB_TRUE, offsetof(struct flb_opensearch, write_operation),
1255
     "Operation to use to write in bulk requests"
1256
    },
1257
    {
1258
     FLB_CONFIG_MAP_STR, "id_key", NULL,
1259
     0, FLB_TRUE, offsetof(struct flb_opensearch, id_key),
1260
     "If set, _id will be the value of the key from incoming record."
1261
    },
1262
    {
1263
     FLB_CONFIG_MAP_BOOL, "replace_dots", "false",
1264
     0, FLB_TRUE, offsetof(struct flb_opensearch, replace_dots),
1265
     "When enabled, replace field name dots with underscore."
1266
    },
1267
1268
    {
1269
     FLB_CONFIG_MAP_BOOL, "current_time_index", "false",
1270
     0, FLB_TRUE, offsetof(struct flb_opensearch, current_time_index),
1271
     "Use current time for index generation instead of message record"
1272
    },
1273
1274
    /* Trace */
1275
    {
1276
     FLB_CONFIG_MAP_BOOL, "trace_output", "false",
1277
     0, FLB_TRUE, offsetof(struct flb_opensearch, trace_output),
1278
     "When enabled print the OpenSearch API calls to stdout (for diag only)"
1279
    },
1280
    {
1281
     FLB_CONFIG_MAP_BOOL, "trace_error", "false",
1282
     0, FLB_TRUE, offsetof(struct flb_opensearch, trace_error),
1283
     "When enabled print the OpenSearch exception to stderr (for diag only)"
1284
    },
1285
1286
    /* HTTP Compression */
1287
    {
1288
     FLB_CONFIG_MAP_STR, "compress", NULL,
1289
     0, FLB_TRUE, offsetof(struct flb_opensearch, compression_str),
1290
     "Set payload compression mechanism. Option available is 'gzip'"
1291
    },
1292
1293
    /* EOF */
1294
    {0}
1295
};
1296
1297
/* Plugin reference */
1298
struct flb_output_plugin out_opensearch_plugin = {
1299
    .name           = "opensearch",
1300
    .description    = "OpenSearch",
1301
    .cb_init        = cb_opensearch_init,
1302
    .cb_pre_run     = NULL,
1303
    .cb_flush       = cb_opensearch_flush,
1304
    .cb_exit        = cb_opensearch_exit,
1305
1306
    /* Configuration */
1307
    .config_map     = config_map,
1308
1309
    /* Events supported */
1310
    .event_type   = FLB_OUTPUT_LOGS | FLB_OUTPUT_TRACES,
1311
1312
    /* Test */
1313
    .test_formatter.callback = opensearch_format,
1314
1315
    /* Plugin flags */
1316
    .flags          = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
1317
};