Coverage Report

Created: 2025-07-04 07:08

/src/fluent-bit/plugins/out_opensearch/os_conf.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_output_plugin.h>
21
#include <fluent-bit/flb_mem.h>
22
#include <fluent-bit/flb_utils.h>
23
#include <fluent-bit/flb_http_client.h>
24
#include <fluent-bit/flb_record_accessor.h>
25
#include <fluent-bit/flb_signv4.h>
26
#include <fluent-bit/flb_aws_credentials.h>
27
28
#include "opensearch.h"
29
#include "os_conf.h"
30
31
struct flb_opensearch *flb_os_conf_create(struct flb_output_instance *ins,
32
                                          struct flb_config *config)
33
0
{
34
0
    int len;
35
0
    int io_flags = 0;
36
0
    ssize_t ret;
37
0
    char *buf;
38
0
    const char *tmp;
39
0
    const char *path;
40
0
#ifdef FLB_HAVE_AWS
41
0
    char *aws_role_arn = NULL;
42
0
    char *aws_external_id = NULL;
43
0
    char *aws_session_name = NULL;
44
0
#endif
45
0
    struct flb_uri *uri = ins->host.uri;
46
0
    struct flb_uri_field *f_index = NULL;
47
0
    struct flb_uri_field *f_type = NULL;
48
0
    struct flb_upstream *upstream;
49
0
    struct flb_opensearch *ctx;
50
51
    /* Allocate context */
52
0
    ctx = flb_calloc(1, sizeof(struct flb_opensearch));
53
0
    if (!ctx) {
54
0
        flb_errno();
55
0
        return NULL;
56
0
    }
57
0
    ctx->ins = ins;
58
59
    /* only used if the config has been set from the command line */
60
0
    if (uri) {
61
0
        if (uri->count >= 2) {
62
0
            f_index = flb_uri_get(uri, 0);
63
0
            f_type  = flb_uri_get(uri, 1);
64
0
        }
65
0
    }
66
67
    /* Set default network configuration */
68
0
    flb_output_net_default("127.0.0.1", 9200, ins);
69
70
    /* Populate context with config map defaults and incoming properties */
71
0
    ret = flb_output_config_map_set(ins, (void *) ctx);
72
0
    if (ret == -1) {
73
0
        flb_plg_error(ctx->ins, "configuration error");
74
0
        flb_os_conf_destroy(ctx);
75
0
        return NULL;
76
0
    }
77
78
    /* use TLS ? */
79
0
    if (ins->use_tls == FLB_TRUE) {
80
0
        io_flags = FLB_IO_TLS;
81
0
    }
82
0
    else {
83
0
        io_flags = FLB_IO_TCP;
84
0
    }
85
86
0
    if (ins->host.ipv6 == FLB_TRUE) {
87
0
        io_flags |= FLB_IO_IPV6;
88
0
    }
89
90
    /* Prepare an upstream handler */
91
0
    upstream = flb_upstream_create(config,
92
0
                                   ins->host.name,
93
0
                                   ins->host.port,
94
0
                                   io_flags,
95
0
                                   ins->tls);
96
0
    if (!upstream) {
97
0
        flb_plg_error(ctx->ins, "cannot create Upstream context");
98
0
        flb_os_conf_destroy(ctx);
99
0
        return NULL;
100
0
    }
101
0
    ctx->u = upstream;
102
103
    /* Set instance flags into upstream */
104
0
    flb_output_upstream_set(ctx->u, ins);
105
106
    /* Set manual Index and Type */
107
0
    if (f_index) {
108
0
        ctx->index = flb_strdup(f_index->value);
109
0
    }
110
0
    else {
111
        /* Check if the index has been set in the configuration */
112
0
        if (ctx->index) {
113
            /* do we have a record accessor pattern ? */
114
0
            if (strchr(ctx->index, '$')) {
115
0
                ctx->ra_index = flb_ra_create(ctx->index, FLB_TRUE);
116
0
                if (!ctx->ra_index) {
117
0
                    flb_plg_error(ctx->ins, "invalid record accessor pattern set for 'index' property");
118
0
                    flb_os_conf_destroy(ctx);
119
0
                    return NULL;
120
0
                }
121
0
            }
122
0
        }
123
0
    }
124
125
0
    if (f_type) {
126
0
        ctx->type = flb_strdup(f_type->value); /* FIXME */
127
0
    }
128
129
    /* HTTP Payload (response) maximum buffer size (0 == unlimited) */
130
0
    if (ctx->buffer_size == -1) {
131
0
        ctx->buffer_size = 0;
132
0
    }
133
134
    /* Path */
135
0
    path = flb_output_get_property("path", ins);
136
0
    if (!path) {
137
0
        path = "";
138
0
    }
139
140
    /* Pipeline */
141
0
    tmp = flb_output_get_property("pipeline", ins);
142
0
    if (tmp) {
143
0
        snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp);
144
0
    }
145
0
    else {
146
0
        snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path);
147
0
    }
148
149
150
0
    if (ctx->id_key) {
151
0
        ctx->ra_id_key = flb_ra_create(ctx->id_key, FLB_FALSE);
152
0
        if (ctx->ra_id_key == NULL) {
153
0
            flb_plg_error(ins, "could not create record accessor for Id Key");
154
0
        }
155
0
        if (ctx->generate_id == FLB_TRUE) {
156
0
            flb_plg_warn(ins, "Generate_ID is ignored when ID_key is set");
157
0
            ctx->generate_id = FLB_FALSE;
158
0
        }
159
0
    }
160
161
0
    if (ctx->write_operation) {
162
0
        if (strcasecmp(ctx->write_operation, FLB_OS_WRITE_OP_INDEX) == 0) {
163
0
            ctx->action = FLB_OS_WRITE_OP_INDEX;
164
0
        }
165
0
        else if (strcasecmp(ctx->write_operation, FLB_OS_WRITE_OP_CREATE) == 0) {
166
0
            ctx->action = FLB_OS_WRITE_OP_CREATE;
167
0
        }
168
0
        else if (strcasecmp(ctx->write_operation, FLB_OS_WRITE_OP_UPDATE) == 0
169
0
            || strcasecmp(ctx->write_operation, FLB_OS_WRITE_OP_UPSERT) == 0) {
170
0
            ctx->action = FLB_OS_WRITE_OP_UPDATE;
171
0
        }
172
0
        else {
173
0
            flb_plg_error(ins,
174
0
                          "wrong Write_Operation (should be one of index, "
175
0
                          "create, update, upsert)");
176
0
            flb_os_conf_destroy(ctx);
177
0
            return NULL;
178
0
        }
179
180
0
        if (strcasecmp(ctx->action, FLB_OS_WRITE_OP_UPDATE) == 0
181
0
            && !ctx->ra_id_key && ctx->generate_id == FLB_FALSE) {
182
0
            flb_plg_error(ins,
183
0
                          "id_key or generate_id must be set when Write_Operation "
184
0
                          "update or upsert");
185
0
            flb_os_conf_destroy(ctx);
186
0
            return NULL;
187
0
        }
188
0
    }
189
190
0
    if (ctx->logstash_prefix_key) {
191
0
        if (ctx->logstash_prefix_key[0] != '$') {
192
0
            len = flb_sds_len(ctx->logstash_prefix_key);
193
0
            buf = flb_malloc(len + 2);
194
0
            if (!buf) {
195
0
                flb_errno();
196
0
                flb_os_conf_destroy(ctx);
197
0
                return NULL;
198
0
            }
199
0
            buf[0] = '$';
200
0
            memcpy(buf + 1, ctx->logstash_prefix_key, len);
201
0
            buf[len + 1] = '\0';
202
203
0
            ctx->ra_prefix_key = flb_ra_create(buf, FLB_TRUE);
204
0
            flb_free(buf);
205
0
        }
206
0
        else {
207
0
            ctx->ra_prefix_key = flb_ra_create(ctx->logstash_prefix_key, FLB_TRUE);
208
0
        }
209
210
0
        if (!ctx->ra_prefix_key) {
211
0
            flb_plg_error(ins, "invalid logstash_prefix_key pattern '%s'", tmp);
212
0
            flb_os_conf_destroy(ctx);
213
0
            return NULL;
214
0
        }
215
0
    }
216
217
0
    if (ctx->compression_str) {
218
0
        if (strcasecmp(ctx->compression_str, "gzip") == 0) {
219
0
            ctx->compression = FLB_OS_COMPRESSION_GZIP;
220
0
        }
221
0
        else {
222
0
            ctx->compression = FLB_OS_COMPRESSION_NONE;
223
0
        }
224
0
    }
225
0
    else {
226
0
        ctx->compression = FLB_OS_COMPRESSION_NONE;
227
0
    }
228
229
0
#ifdef FLB_HAVE_AWS
230
    /* AWS Auth Unsigned Headers */
231
0
    ctx->aws_unsigned_headers = flb_malloc(sizeof(struct mk_list));
232
0
    if (!ctx->aws_unsigned_headers) {
233
0
        flb_os_conf_destroy(ctx);
234
0
        return NULL;
235
0
    }
236
0
    flb_slist_create(ctx->aws_unsigned_headers);
237
0
    ret = flb_slist_add(ctx->aws_unsigned_headers, "Content-Length");
238
0
    if (ret != 0) {
239
0
        flb_os_conf_destroy(ctx);
240
0
        return NULL;
241
0
    }
242
243
    /* AWS Auth */
244
0
    ctx->has_aws_auth = FLB_FALSE;
245
0
    tmp = flb_output_get_property("aws_auth", ins);
246
0
    if (tmp) {
247
0
        if (strncasecmp(tmp, "On", 2) == 0) {
248
0
            ctx->has_aws_auth = FLB_TRUE;
249
0
            flb_debug("[out_es] Enabled AWS Auth");
250
251
            /* AWS provider needs a separate TLS instance */
252
0
            ctx->aws_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
253
0
                                          FLB_TRUE,
254
0
                                          ins->tls_debug,
255
0
                                          ins->tls_vhost,
256
0
                                          ins->tls_ca_path,
257
0
                                          ins->tls_ca_file,
258
0
                                          ins->tls_crt_file,
259
0
                                          ins->tls_key_file,
260
0
                                          ins->tls_key_passwd);
261
0
            if (!ctx->aws_tls) {
262
0
                flb_errno();
263
0
                flb_os_conf_destroy(ctx);
264
0
                return NULL;
265
0
            }
266
267
0
            tmp = flb_output_get_property("aws_region", ins);
268
0
            if (!tmp) {
269
0
                flb_error("[out_es] aws_auth enabled but aws_region not set");
270
0
                flb_os_conf_destroy(ctx);
271
0
                return NULL;
272
0
            }
273
0
            ctx->aws_region = (char *) tmp;
274
275
0
            tmp = flb_output_get_property("aws_sts_endpoint", ins);
276
0
            if (tmp) {
277
0
                ctx->aws_sts_endpoint = (char *) tmp;
278
0
            }
279
280
0
            ctx->aws_provider = flb_standard_chain_provider_create(config,
281
0
                                                                   ctx->aws_tls,
282
0
                                                                   ctx->aws_region,
283
0
                                                                   ctx->aws_sts_endpoint,
284
0
                                                                   NULL,
285
0
                                                                   flb_aws_client_generator(),
286
0
                                                                   ctx->aws_profile);
287
0
            if (!ctx->aws_provider) {
288
0
                flb_error("[out_es] Failed to create AWS Credential Provider");
289
0
                flb_os_conf_destroy(ctx);
290
0
                return NULL;
291
0
            }
292
293
0
            tmp = flb_output_get_property("aws_role_arn", ins);
294
0
            if (tmp) {
295
                /* Use the STS Provider */
296
0
                ctx->base_aws_provider = ctx->aws_provider;
297
0
                aws_role_arn = (char *) tmp;
298
0
                aws_external_id = NULL;
299
0
                tmp = flb_output_get_property("aws_external_id", ins);
300
0
                if (tmp) {
301
0
                    aws_external_id = (char *) tmp;
302
0
                }
303
304
0
                aws_session_name = flb_sts_session_name();
305
0
                if (!aws_session_name) {
306
0
                    flb_error("[out_es] Failed to create aws iam role "
307
0
                              "session name");
308
0
                    flb_os_conf_destroy(ctx);
309
0
                    return NULL;
310
0
                }
311
312
                /* STS provider needs yet another separate TLS instance */
313
0
                ctx->aws_sts_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
314
0
                                                  FLB_TRUE,
315
0
                                                  ins->tls_debug,
316
0
                                                  ins->tls_vhost,
317
0
                                                  ins->tls_ca_path,
318
0
                                                  ins->tls_ca_file,
319
0
                                                  ins->tls_crt_file,
320
0
                                                  ins->tls_key_file,
321
0
                                                  ins->tls_key_passwd);
322
0
                if (!ctx->aws_sts_tls) {
323
0
                    flb_errno();
324
0
                    flb_os_conf_destroy(ctx);
325
0
                    return NULL;
326
0
                }
327
328
0
                ctx->aws_provider = flb_sts_provider_create(config,
329
0
                                                            ctx->aws_sts_tls,
330
0
                                                            ctx->
331
0
                                                            base_aws_provider,
332
0
                                                            aws_external_id,
333
0
                                                            aws_role_arn,
334
0
                                                            aws_session_name,
335
0
                                                            ctx->aws_region,
336
0
                                                            ctx->aws_sts_endpoint,
337
0
                                                            NULL,
338
0
                                                            flb_aws_client_generator());
339
                /* Session name can be freed once provider is created */
340
0
                flb_free(aws_session_name);
341
0
                if (!ctx->aws_provider) {
342
0
                    flb_error("[out_es] Failed to create AWS STS Credential "
343
0
                              "Provider");
344
0
                    flb_os_conf_destroy(ctx);
345
0
                    return NULL;
346
0
                }
347
348
0
            }
349
350
            /* initialize credentials in sync mode */
351
0
            ctx->aws_provider->provider_vtable->sync(ctx->aws_provider);
352
0
            ctx->aws_provider->provider_vtable->init(ctx->aws_provider);
353
            /* set back to async */
354
0
            ctx->aws_provider->provider_vtable->async(ctx->aws_provider);
355
0
            ctx->aws_provider->provider_vtable->upstream_set(ctx->aws_provider, ctx->ins);
356
0
        }
357
0
    }
358
0
#endif
359
360
0
    return ctx;
361
0
}
362
363
int flb_os_conf_destroy(struct flb_opensearch *ctx)
364
0
{
365
0
    if (!ctx) {
366
0
        return 0;
367
0
    }
368
369
0
    if (ctx->u) {
370
0
        flb_upstream_destroy(ctx->u);
371
0
    }
372
0
    if (ctx->ra_id_key) {
373
0
        flb_ra_destroy(ctx->ra_id_key);
374
0
        ctx->ra_id_key = NULL;
375
0
    }
376
377
0
#ifdef FLB_HAVE_AWS
378
0
    if (ctx->base_aws_provider) {
379
0
        flb_aws_provider_destroy(ctx->base_aws_provider);
380
0
    }
381
382
0
    if (ctx->aws_provider) {
383
0
        flb_aws_provider_destroy(ctx->aws_provider);
384
0
    }
385
386
0
    if (ctx->aws_tls) {
387
0
        flb_tls_destroy(ctx->aws_tls);
388
0
    }
389
390
0
    if (ctx->aws_sts_tls) {
391
0
        flb_tls_destroy(ctx->aws_sts_tls);
392
0
    }
393
394
0
    if (ctx->aws_unsigned_headers) {
395
0
        flb_slist_destroy(ctx->aws_unsigned_headers);
396
0
        flb_free(ctx->aws_unsigned_headers);
397
0
    }
398
0
#endif
399
400
0
    if (ctx->ra_prefix_key) {
401
0
        flb_ra_destroy(ctx->ra_prefix_key);
402
0
    }
403
404
0
    if (ctx->ra_index) {
405
0
        flb_ra_destroy(ctx->ra_index);
406
0
    }
407
408
0
    flb_free(ctx);
409
410
0
    return 0;
411
0
}