Coverage Report

Created: 2025-09-04 07:51

/src/fluent-bit/plugins/custom_calyptia/calyptia.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_custom_plugin.h>
21
#include <fluent-bit/flb_kv.h>
22
#include <fluent-bit/flb_env.h>
23
#include <fluent-bit/flb_utils.h>
24
#include <fluent-bit/flb_router.h>
25
26
/* pipeline plugins */
27
#include <fluent-bit/flb_input.h>
28
#include <fluent-bit/flb_filter.h>
29
#include <fluent-bit/flb_output.h>
30
#include <fluent-bit/flb_custom_plugin.h>
31
#include <fluent-bit/flb_config.h>
32
#include <fluent-bit/flb_config_map.h>
33
#include <fluent-bit/flb_utils.h>
34
#include <fluent-bit/flb_hash.h>
35
36
#include <fluent-bit/calyptia/calyptia_constants.h>
37
38
#include "calyptia.h"
39
40
0
#define UUID_BUFFER_SIZE 38 /* Maximum length of UUID string + null terminator */
41
42
/* Function wrappers to enable mocking for unit test filesystem access */
43
int (*flb_access)(const char *pathname, int mode) = access;
44
int (*flb_open)(const char *pathname, int flags, ...) = open;
45
ssize_t (*flb_write)(int fd, const void *buf, size_t count) = write;
46
int (*flb_close)(int fd) = close;
47
int (*flb_utils_read_file_wrapper)(char *path, char **out_buf, size_t *out_size) = flb_utils_read_file;
48
49
/*
50
 * Check if the key belongs to a sensitive data field, if so report it. We never
51
 * share any sensitive data.
52
 */
53
static int is_sensitive_property(char *key)
54
0
{
55
56
0
    if (strcasecmp(key, "password") == 0 ||
57
0
        strcasecmp(key, "passwd") == 0   ||
58
0
        strcasecmp(key, "user") == 0 ||
59
0
        strcasecmp(key, "http_user") == 0 ||
60
0
        strcasecmp(key, "http_passwd") == 0 ||
61
0
        strcasecmp(key, "shared_key") == 0 ||
62
0
        strcasecmp(key, "endpoint") == 0 ||
63
0
        strcasecmp(key, "apikey") == 0 ||
64
0
        strcasecmp(key, "private_key") == 0 ||
65
0
        strcasecmp(key, "service_account_secret") == 0 ||
66
0
        strcasecmp(key, "splunk_token") == 0 ||
67
0
        strcasecmp(key, "logdna_host") == 0 ||
68
0
        strcasecmp(key, "api_key") == 0 ||
69
0
        strcasecmp(key, "hostname") == 0 ||
70
0
        strcasecmp(key, "license_key") == 0 ||
71
0
        strcasecmp(key, "base_uri") == 0 ||
72
0
        strcasecmp(key, "api") == 0) {
73
74
0
        return FLB_TRUE;
75
0
    }
76
77
0
    return FLB_FALSE;
78
0
}
79
80
static void pipeline_config_add_properties(flb_sds_t *buf, struct mk_list *props)
81
0
{
82
0
    struct mk_list *head;
83
0
    struct flb_kv *kv;
84
85
0
    mk_list_foreach(head, props) {
86
0
        kv = mk_list_entry(head, struct flb_kv, _head);
87
88
0
        if (kv->key != NULL && kv->val != NULL) {
89
0
            flb_sds_printf(buf, "    %s ", kv->key);
90
91
0
            if (is_sensitive_property(kv->key)) {
92
0
                flb_sds_cat_safe(buf, "--redacted--", strlen("--redacted--"));
93
0
            }
94
0
            else {
95
0
                flb_sds_cat_safe(buf, kv->val, strlen(kv->val));
96
0
            }
97
98
0
            flb_sds_cat_safe(buf, "\n", 1);
99
0
        }
100
0
    }
101
0
}
102
103
flb_sds_t custom_calyptia_pipeline_config_get(struct flb_config *ctx)
104
0
{
105
0
    char tmp[32];
106
0
    flb_sds_t buf;
107
0
    struct mk_list *head;
108
0
    struct flb_input_instance *i_ins;
109
0
    struct flb_filter_instance *f_ins;
110
0
    struct flb_output_instance *o_ins;
111
112
0
    buf = flb_sds_create_size(2048);
113
114
0
    if (!buf) {
115
0
        return NULL;
116
0
    }
117
118
    /* [INPUT] */
119
0
    mk_list_foreach(head, &ctx->inputs) {
120
0
        i_ins = mk_list_entry(head, struct flb_input_instance, _head);
121
0
        flb_sds_printf(&buf, "[INPUT]\n");
122
0
        flb_sds_printf(&buf, "    name %s\n", i_ins->name);
123
124
0
        if (i_ins->alias) {
125
0
            flb_sds_printf(&buf, "    alias %s\n", i_ins->alias);
126
0
        }
127
128
0
        if (i_ins->tag) {
129
0
            flb_sds_printf(&buf, "    tag %s\n", i_ins->tag);
130
0
        }
131
132
0
        if (i_ins->mem_buf_limit > 0) {
133
0
            flb_utils_bytes_to_human_readable_size(i_ins->mem_buf_limit,
134
0
                                                   tmp, sizeof(tmp) - 1);
135
0
            flb_sds_printf(&buf, "    mem_buf_limit %s\n", tmp);
136
0
        }
137
138
0
        pipeline_config_add_properties(&buf, &i_ins->properties);
139
0
    }
140
0
    flb_sds_printf(&buf, "\n");
141
142
    /* Config: [FILTER] */
143
0
    mk_list_foreach(head, &ctx->filters) {
144
0
        f_ins = mk_list_entry(head, struct flb_filter_instance, _head);
145
146
0
        flb_sds_printf(&buf, "[FILTER]\n");
147
0
        flb_sds_printf(&buf, "    name  %s\n", f_ins->name);
148
0
        flb_sds_printf(&buf, "    match %s\n", f_ins->match);
149
150
0
        pipeline_config_add_properties(&buf, &f_ins->properties);
151
0
    }
152
0
    flb_sds_printf(&buf, "\n");
153
154
    /* Config: [OUTPUT] */
155
0
    mk_list_foreach(head, &ctx->outputs) {
156
0
        o_ins = mk_list_entry(head, struct flb_output_instance, _head);
157
158
0
        flb_sds_printf(&buf, "[OUTPUT]\n");
159
0
        flb_sds_printf(&buf, "    name  %s\n", o_ins->name);
160
161
0
        if (o_ins->match) {
162
0
            flb_sds_printf(&buf, "    match %s\n", o_ins->match);
163
0
        }
164
0
        else {
165
0
            flb_sds_printf(&buf, "    match *\n");
166
0
        }
167
168
0
#ifdef FLB_HAVE_TLS
169
0
        if (o_ins->use_tls == FLB_TRUE) {
170
0
            flb_sds_printf(&buf, "    tls   %s\n", o_ins->use_tls ? "on" : "off");
171
0
            flb_sds_printf(&buf, "    tls.verify     %s\n",
172
0
                             o_ins->tls_verify ? "on": "off");
173
174
0
            if (o_ins->tls_ca_file) {
175
0
                flb_sds_printf(&buf, "    tls.ca_file    %s\n",
176
0
                               o_ins->tls_ca_file);
177
0
            }
178
179
0
            if (o_ins->tls_crt_file) {
180
0
                flb_sds_printf(&buf, "    tls.crt_file   %s\n",
181
0
                               o_ins->tls_crt_file);
182
0
            }
183
184
0
            if (o_ins->tls_key_file) {
185
0
                flb_sds_printf(&buf, "    tls.key_file   %s\n",
186
0
                               o_ins->tls_key_file);
187
0
            }
188
189
0
            if (o_ins->tls_key_passwd) {
190
0
                flb_sds_printf(&buf, "    tls.key_passwd --redacted--\n");
191
0
            }
192
0
        }
193
0
#endif
194
195
0
        if (o_ins->retry_limit == FLB_OUT_RETRY_UNLIMITED) {
196
0
            flb_sds_printf(&buf, "    retry_limit no_limits\n");
197
0
        }
198
0
        else if (o_ins->retry_limit == FLB_OUT_RETRY_NONE) {
199
0
            flb_sds_printf(&buf, "    retry_limit no_retries\n");
200
0
        }
201
0
        else {
202
0
            flb_sds_printf(&buf, "    retry_limit %i\n", o_ins->retry_limit);
203
0
        }
204
205
0
        if (o_ins->host.name) {
206
0
            flb_sds_printf(&buf, "    host  --redacted--\n");
207
0
        }
208
209
0
        pipeline_config_add_properties(&buf, &o_ins->properties);
210
0
        flb_sds_printf(&buf, "\n");
211
0
    }
212
213
0
    return buf;
214
0
}
215
216
int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet)
217
0
{
218
0
    if (!fleet) {
219
0
        flb_plg_error(ctx->ins, "invalid fleet input instance");
220
0
        return -1;
221
0
    }
222
223
0
    if (ctx->fleet_name) {
224
0
        flb_input_set_property(fleet, "fleet_name", ctx->fleet_name);
225
0
    }
226
227
0
    if (ctx->fleet_id) {
228
0
        flb_input_set_property(fleet, "fleet_id", ctx->fleet_id);
229
0
    }
230
231
0
    flb_input_set_property(fleet, "api_key", ctx->api_key);
232
0
    flb_input_set_property(fleet, "host", ctx->cloud_host);
233
0
    flb_input_set_property(fleet, "port", ctx->cloud_port);
234
0
    flb_input_set_property(fleet, "config_dir", ctx->fleet_config_dir);
235
0
    flb_input_set_property(fleet, "fleet_config_legacy_format", ctx->fleet_config_legacy_format == 1 ? "on" : "off");
236
237
    /* Set TLS properties */
238
0
    flb_input_set_property(fleet, "tls", ctx->cloud_tls == 1 ? "on" : "off");
239
0
    flb_input_set_property(fleet, "tls.verify", ctx->cloud_tls_verify == 1 ? "on" : "off");
240
241
    /* Optional configurations */
242
0
    if (ctx->fleet_max_http_buffer_size) {
243
0
        flb_input_set_property(fleet, "max_http_buffer_size", ctx->fleet_max_http_buffer_size);
244
0
    }
245
246
0
    if (ctx->machine_id) {
247
0
        flb_input_set_property(fleet, "machine_id", ctx->machine_id);
248
0
    }
249
250
0
    if (ctx->fleet_interval_sec) {
251
0
        flb_input_set_property(fleet, "interval_sec", ctx->fleet_interval_sec);
252
0
    }
253
254
0
    if (ctx->fleet_interval_nsec) {
255
0
        flb_input_set_property(fleet, "interval_nsec", ctx->fleet_interval_nsec);
256
0
    }
257
258
0
    return 0;
259
0
}
260
261
static struct flb_output_instance *setup_cloud_output(struct flb_config *config, struct calyptia *ctx)
262
0
{
263
0
    int ret;
264
0
    struct flb_output_instance *cloud;
265
0
    struct mk_list *head;
266
0
    struct flb_slist_entry *key = NULL;
267
0
    struct flb_slist_entry *val = NULL;
268
0
    flb_sds_t label;
269
0
    struct flb_config_map_val *mv;
270
271
0
    cloud = flb_output_new(config, "calyptia", ctx, FLB_FALSE);
272
273
0
    if (!cloud) {
274
0
        flb_plg_error(ctx->ins, "could not load Calyptia Cloud connector");
275
0
        return NULL;
276
0
    }
277
278
    /* direct connect / routing */
279
0
    ret = flb_router_connect_direct(ctx->i, cloud);
280
281
0
    if (ret != 0) {
282
0
        flb_plg_error(ctx->ins, "could not load Calyptia Cloud connector");
283
0
        return NULL;
284
0
    }
285
286
0
    if (ctx->add_labels && mk_list_size(ctx->add_labels) > 0) {
287
288
        /* iterate all 'add_label' definitions */
289
0
        flb_config_map_foreach(head, mv, ctx->add_labels) {
290
0
            key = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
291
0
            val = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);
292
293
0
            label = flb_sds_create_size(strlen(key->str) + strlen(val->str) + 1);
294
295
0
            if (!label) {
296
0
                return NULL;
297
0
            }
298
299
0
            flb_sds_printf(&label, "%s %s", key->str, val->str);
300
0
            flb_output_set_property(cloud, "add_label", label);
301
0
            flb_sds_destroy(label);
302
0
        }
303
0
    }
304
305
0
    flb_output_set_property(cloud, "match", "_calyptia_cloud");
306
0
    flb_output_set_property(cloud, "api_key", ctx->api_key);
307
308
0
    if (ctx->register_retry_on_flush) {
309
0
        flb_output_set_property(cloud, "register_retry_on_flush", "true");
310
0
    } else {
311
0
        flb_output_set_property(cloud, "register_retry_on_flush", "false");
312
0
    }
313
314
0
    if (ctx->store_path) {
315
0
        flb_output_set_property(cloud, "store_path", ctx->store_path);
316
0
    }
317
318
0
    if (ctx->machine_id) {
319
0
        flb_output_set_property(cloud, "machine_id", ctx->machine_id);
320
0
    }
321
322
    /* Override network details: development purposes only */
323
0
    if (ctx->cloud_host) {
324
0
        flb_output_set_property(cloud, "cloud_host", ctx->cloud_host);
325
0
    }
326
327
0
    if (ctx->cloud_port) {
328
0
        flb_output_set_property(cloud, "cloud_port", ctx->cloud_port);
329
0
    }
330
331
0
    if (ctx->cloud_tls) {
332
0
        flb_output_set_property(cloud, "tls", "true");
333
0
    }
334
0
    else {
335
0
        flb_output_set_property(cloud, "tls", "false");
336
0
    }
337
338
0
    if (ctx->cloud_tls_verify) {
339
0
        flb_output_set_property(cloud, "tls.verify", "true");
340
0
    }
341
0
    else {
342
0
        flb_output_set_property(cloud, "tls.verify", "false");
343
0
    }
344
345
0
    if (ctx->fleet_id) {
346
0
        label = flb_sds_create_size(strlen("fleet_id") + strlen(ctx->fleet_id) + 1);
347
348
0
        if (!label) {
349
0
            return NULL;
350
0
        }
351
352
0
        flb_sds_printf(&label, "fleet_id %s", ctx->fleet_id);
353
0
        flb_output_set_property(cloud, "add_label", label);
354
0
        flb_sds_destroy(label);
355
0
    }
356
357
0
#ifdef FLB_HAVE_CHUNK_TRACE
358
0
    flb_output_set_property(cloud, "pipeline_id", ctx->pipeline_id);
359
0
#endif /* FLB_HAVE_CHUNK_TRACE */
360
361
0
    return cloud;
362
0
}
363
364
static flb_sds_t sha256_to_hex(unsigned char *sha256)
365
0
{
366
0
    int idx;
367
0
    flb_sds_t hex;
368
0
    flb_sds_t tmp;
369
370
0
    hex = flb_sds_create_size(64);
371
372
0
    if (!hex) {
373
0
        return NULL;
374
0
    }
375
376
0
    for (idx = 0; idx < 32; idx++) {
377
0
        tmp = flb_sds_printf(&hex, "%02x", sha256[idx]);
378
379
0
        if (!tmp) {
380
0
            flb_sds_destroy(hex);
381
0
            return NULL;
382
0
        }
383
384
0
        hex = tmp;
385
0
    }
386
387
0
    flb_sds_len_set(hex, 64);
388
0
    return hex;
389
0
}
390
391
static flb_sds_t generate_base_agent_directory(struct calyptia *ctx, flb_sds_t *fleet_dir)
392
0
{
393
0
    flb_sds_t ret = NULL;
394
395
0
    if (ctx == NULL || fleet_dir == NULL) {
396
0
        return NULL;
397
0
    }
398
399
0
    if (*fleet_dir == NULL) {
400
0
        *fleet_dir = flb_sds_create_size(CALYPTIA_MAX_DIR_SIZE);
401
0
        if (*fleet_dir == NULL) {
402
0
            return NULL;
403
0
        }
404
0
    }
405
406
0
    ret = flb_sds_printf(fleet_dir, "%s", ctx->fleet_config_dir);
407
0
    if (ret == NULL) {
408
0
        flb_sds_destroy(*fleet_dir);
409
0
        return NULL;
410
0
    }
411
412
0
    return ret;
413
0
}
414
415
flb_sds_t agent_config_filename(struct calyptia *ctx, char *fname)
416
0
{
417
0
    flb_sds_t cfgname = NULL;
418
0
    flb_sds_t ret;
419
420
0
    if (ctx == NULL || fname == NULL) {
421
0
        return NULL;
422
0
    }
423
424
0
    if (generate_base_agent_directory(ctx, &cfgname) == NULL) {
425
0
        return NULL;
426
0
    }
427
428
0
    ret = flb_sds_printf(&cfgname, PATH_SEPARATOR "%s.conf", fname);
429
0
    if (ret == NULL) {
430
0
        flb_sds_destroy(cfgname);
431
0
        return NULL;
432
0
    }
433
434
0
    return cfgname;
435
0
}
436
437
0
static char* generate_uuid() {
438
0
    char* uuid = flb_malloc(UUID_BUFFER_SIZE);
439
0
    if (!uuid) {
440
0
        flb_errno();
441
0
        return NULL;
442
0
    }
443
444
    /* create new UUID for fleet */
445
0
    if (flb_utils_uuid_v4_gen(uuid) != 0 || strlen(uuid) == 0) {
446
0
        flb_free(uuid);
447
0
        return NULL;
448
0
    }
449
0
    return uuid;
450
0
}
451
452
0
static int write_uuid_to_file(flb_sds_t fleet_machine_id, char* uuid) {
453
0
    int fd;
454
0
    size_t uuid_len;
455
456
0
    if (fleet_machine_id == NULL || uuid == NULL) {
457
0
        return FLB_FALSE;
458
0
    }
459
460
    /* write uuid to file */
461
0
    fd = flb_open(fleet_machine_id, O_CREAT | O_WRONLY | O_TRUNC, 0666);
462
0
    if (fd == -1) {
463
0
        return FLB_FALSE;
464
0
    }
465
466
0
    uuid_len = strlen(uuid);
467
468
0
    if (flb_write(fd, uuid, uuid_len) != uuid_len) {
469
0
        flb_close(fd);
470
0
        return FLB_FALSE;
471
0
    }
472
473
0
    flb_close(fd);
474
0
    return FLB_TRUE;
475
0
}
476
477
static int create_agent_directory(struct calyptia *ctx)
478
0
{
479
0
    if( ctx == NULL ) {
480
0
        return -1;
481
0
    }
482
483
    /* If it exists just return */
484
0
    if (access(ctx->fleet_config_dir, F_OK) == 0) {
485
0
        return 0;
486
0
    }
487
488
    /* Create the directory if it does not exist */
489
0
    if (flb_utils_mkdir(ctx->fleet_config_dir, 0700) != 0) {
490
0
        flb_plg_error(ctx->ins, "failed to create directory: %s", ctx->fleet_config_dir);
491
0
        return -1;
492
0
    }
493
494
0
    return 0;
495
0
}
496
497
flb_sds_t get_machine_id(struct calyptia *ctx)
498
0
{
499
0
    int ret = -1;
500
0
    char *buf = NULL;
501
0
    size_t blen = 0;
502
0
    unsigned char sha256_buf[64] = {0};
503
504
#if defined(FLB_SYSTEM_WINDOWS)
505
    /* retrieve raw machine id */
506
    ret = flb_utils_get_machine_id(&buf, &blen);
507
#else
508
    /* /etc/machine-id is not guaranteed to be unique so we generate one */
509
0
    flb_sds_t fleet_machine_id = NULL;
510
511
    /** ensure we have the directory created */
512
0
    if (create_agent_directory(ctx) != 0) {
513
0
        return NULL;
514
0
    }
515
516
    /** now get the agent filename */
517
0
    fleet_machine_id = machine_id_fleet_config_filename(ctx);
518
0
    if (fleet_machine_id == NULL) {
519
0
        flb_plg_error(ctx->ins, "unable to allocate machine id file");
520
0
        return NULL;
521
0
    }
522
523
    /* check if the file exists first, if it does not we generate a UUID */
524
0
    if (flb_access(fleet_machine_id, F_OK) != 0) {
525
526
        /* create new UUID for fleet */
527
0
        buf = generate_uuid();
528
0
        if( buf == NULL ) {
529
0
            flb_plg_error(ctx->ins, "failed to create uuid for fleet machine id");
530
0
            flb_sds_destroy(fleet_machine_id);
531
0
            return NULL;
532
0
        }
533
0
        flb_plg_info(ctx->ins, "generated UUID for machine ID: %s", buf);
534
535
        /* write uuid to file */
536
0
        if (write_uuid_to_file(fleet_machine_id, buf ) != FLB_TRUE) {
537
0
            flb_plg_error(ctx->ins, "failed to write fleet machine id file: %s", fleet_machine_id);
538
0
            flb_free(buf);
539
0
            flb_sds_destroy(fleet_machine_id);
540
0
            return NULL;
541
0
        }
542
543
0
        flb_free(buf);
544
0
        buf = NULL;
545
546
0
        flb_plg_info(ctx->ins, "written machine ID to file: %s", fleet_machine_id);
547
0
    }
548
549
    /* now check file exists (it always should) and read from it */
550
0
    if (flb_access(fleet_machine_id, F_OK) == 0) {
551
0
        ret = flb_utils_read_file_wrapper(fleet_machine_id, &buf, &blen);
552
0
        if (ret != 0) {
553
0
            flb_plg_error(ctx->ins, "failed to read fleet machine id file: %s", fleet_machine_id);
554
0
            flb_sds_destroy(fleet_machine_id);
555
0
            return NULL;
556
0
        }
557
0
        flb_plg_info(ctx->ins, "read UUID (%s) from file: %s", buf, fleet_machine_id);
558
0
    }
559
0
    else { /* fall back to machine-id */
560
0
        flb_plg_warn(ctx->ins, "unable to get uuid from file (%s) so falling back to machine id", fleet_machine_id);
561
0
        ret = flb_utils_get_machine_id(&buf, &blen);
562
0
    }
563
564
    /* Clean up no longer required filename */
565
0
    flb_sds_destroy(fleet_machine_id);
566
0
#endif
567
568
0
    if (ret == -1) {
569
0
        flb_plg_error(ctx->ins, "could not obtain machine id");
570
0
        return NULL;
571
0
    }
572
573
0
    ret = flb_hash_simple(FLB_HASH_SHA256,
574
0
                          (unsigned char *) buf,
575
0
                          blen,
576
0
                          sha256_buf,
577
0
                          sizeof(sha256_buf));
578
0
    flb_free(buf);
579
580
0
    if (ret != FLB_CRYPTO_SUCCESS) {
581
0
        return NULL;
582
0
    }
583
584
    /* convert to hex */
585
0
    return sha256_to_hex(sha256_buf);
586
0
}
587
588
static int cb_calyptia_init(struct flb_custom_instance *ins,
589
                         struct flb_config *config,
590
                         void *data)
591
0
{
592
0
    int ret;
593
0
    struct calyptia *ctx;
594
0
    (void) data;
595
596
0
    ctx = flb_calloc(1, sizeof(struct calyptia));
597
0
    if (!ctx) {
598
0
        flb_errno();
599
0
        return -1;
600
0
    }
601
0
    ctx->ins = ins;
602
603
    /* Load the config map */
604
0
    ret = flb_custom_config_map_set(ins, (void *) ctx);
605
0
    if (ret == -1) {
606
0
        flb_free(ctx);
607
0
        return -1;
608
0
    }
609
610
    /* map instance and local context */
611
0
    flb_custom_set_context(ins, ctx);
612
613
    /* If no machine_id has been provided via a configuration option get it from the local machine-id. */
614
0
    if (!ctx->machine_id) {
615
0
        ctx->machine_id = get_machine_id(ctx);
616
0
        if (ctx->machine_id == NULL) {
617
0
            flb_plg_error(ctx->ins, "unable to retrieve machine_id");
618
0
            flb_free(ctx);
619
0
            return -1;
620
0
        }
621
0
        ctx->machine_id_auto_configured = FLB_TRUE;
622
0
    }
623
624
    /* input collector */
625
0
    ctx->i = flb_input_new(config, "fluentbit_metrics", NULL, FLB_TRUE);
626
0
    if (!ctx->i) {
627
0
        flb_plg_error(ctx->ins, "could not load metrics collector");
628
0
        flb_free(ctx);
629
0
        return -1;
630
0
    }
631
632
0
    flb_input_set_property(ctx->i, "tag", "_calyptia_cloud");
633
0
    flb_input_set_property(ctx->i, "scrape_on_start", "true");
634
    // This scrape interval should be configurable.
635
0
    flb_input_set_property(ctx->i, "scrape_interval", "30");
636
637
    /* Setup cloud output if needed */
638
0
    if (ctx->fleet_id != NULL || !ctx->fleet_name) {
639
0
        ctx->o = setup_cloud_output(config, ctx);
640
0
        if (ctx->o == NULL) {
641
0
            flb_free(ctx);
642
0
            return -1;
643
0
        }
644
        /* Set fleet_id for output if present */
645
0
        if (ctx->fleet_id) {
646
0
            flb_output_set_property(ctx->o, "fleet_id", ctx->fleet_id);
647
0
        }
648
0
    }
649
650
    /* Setup fleet input if needed */
651
0
    if (ctx->fleet_id || ctx->fleet_name) {
652
0
        ctx->fleet = flb_input_new(config, "calyptia_fleet", NULL, FLB_FALSE);
653
0
        if (!ctx->fleet) {
654
0
            flb_plg_error(ctx->ins, "could not load Calyptia Fleet plugin");
655
0
            return -1;
656
0
        }
657
658
0
        ret = set_fleet_input_properties(ctx, ctx->fleet);
659
0
        if (ret == -1) {
660
0
            return -1;
661
0
        }
662
0
    }
663
664
0
    if (ctx->o) {
665
0
        flb_router_connect(ctx->i, ctx->o);
666
0
    }
667
668
0
    flb_plg_info(ins, "custom initialized!");
669
0
    return 0;
670
0
}
671
672
static int cb_calyptia_exit(void *data, struct flb_config *config)
673
0
{
674
0
    struct calyptia *ctx = data;
675
676
0
    if (!ctx) {
677
0
        return 0;
678
0
    }
679
680
0
    if (ctx->machine_id && ctx->machine_id_auto_configured) {
681
0
        flb_sds_destroy(ctx->machine_id);
682
0
    }
683
684
0
    flb_free(ctx);
685
0
    return 0;
686
0
}
687
688
/* Configuration properties map */
689
static struct flb_config_map config_map[] = {
690
    {
691
     FLB_CONFIG_MAP_STR, "api_key", NULL,
692
     0, FLB_TRUE, offsetof(struct calyptia, api_key),
693
     "Calyptia Cloud API Key."
694
    },
695
696
    {
697
     FLB_CONFIG_MAP_STR, "store_path", NULL,
698
     0, FLB_TRUE, offsetof(struct calyptia, store_path)
699
    },
700
701
    {
702
     FLB_CONFIG_MAP_STR, "calyptia_host", DEFAULT_CALYPTIA_HOST,
703
     0, FLB_TRUE, offsetof(struct calyptia, cloud_host),
704
     ""
705
    },
706
707
    {
708
     FLB_CONFIG_MAP_STR, "calyptia_port", DEFAULT_CALYPTIA_PORT,
709
     0, FLB_TRUE, offsetof(struct calyptia, cloud_port),
710
     ""
711
    },
712
713
    {
714
     FLB_CONFIG_MAP_BOOL, "calyptia_tls", "true",
715
     0, FLB_TRUE, offsetof(struct calyptia, cloud_tls),
716
     ""
717
    },
718
719
    {
720
     FLB_CONFIG_MAP_BOOL, "calyptia_tls.verify", "true",
721
     0, FLB_TRUE, offsetof(struct calyptia, cloud_tls_verify),
722
     ""
723
    },
724
725
    {
726
     FLB_CONFIG_MAP_SLIST_1, "add_label", NULL,
727
     FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct calyptia, add_labels),
728
     "Label to append to the generated metric."
729
    },
730
    {
731
     FLB_CONFIG_MAP_STR, "machine_id", NULL,
732
     0, FLB_TRUE, offsetof(struct calyptia, machine_id),
733
     "Custom machine_id to be used when registering agent"
734
    },
735
    {
736
     FLB_CONFIG_MAP_STR, "fleet_id", NULL,
737
     0, FLB_TRUE, offsetof(struct calyptia, fleet_id),
738
     "Fleet id to be used when registering agent in a fleet"
739
    },
740
    {
741
     FLB_CONFIG_MAP_STR, "fleet.config_dir", FLEET_DEFAULT_CONFIG_DIR,
742
     0, FLB_TRUE, offsetof(struct calyptia, fleet_config_dir),
743
     "Base path for the configuration directory."
744
    },
745
    {
746
      FLB_CONFIG_MAP_STR, "fleet.interval_sec", "-1",
747
      0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_sec),
748
      "Set the collector interval"
749
    },
750
    {
751
      FLB_CONFIG_MAP_STR, "fleet.interval_nsec", "-1",
752
      0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_nsec),
753
      "Set the collector interval (nanoseconds)"
754
    },
755
    {
756
      FLB_CONFIG_MAP_STR, "fleet.max_http_buffer_size", NULL,
757
      0, FLB_TRUE, offsetof(struct calyptia, fleet_max_http_buffer_size),
758
      "Max HTTP buffer size for fleet"
759
    },
760
    {
761
     FLB_CONFIG_MAP_STR, "fleet_name", NULL,
762
     0, FLB_TRUE, offsetof(struct calyptia, fleet_name),
763
     "Fleet name to be used when registering agent in a fleet"
764
    },
765
766
#ifdef FLB_HAVE_CHUNK_TRACE
767
    {
768
     FLB_CONFIG_MAP_STR, "pipeline_id", NULL,
769
     0, FLB_TRUE, offsetof(struct calyptia, pipeline_id),
770
     "Pipeline ID for reporting to calyptia cloud."
771
    },
772
#endif /* FLB_HAVE_CHUNK_TRACE */
773
    {
774
     FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true",
775
     0, FLB_TRUE, offsetof(struct calyptia, register_retry_on_flush),
776
     "Retry agent registration on flush if failed on init."
777
    },
778
    {
779
     FLB_CONFIG_MAP_BOOL, "fleet_config_legacy_format", "true",
780
     0, FLB_TRUE, offsetof(struct calyptia, fleet_config_legacy_format),
781
     "If set, use legacy (TOML) format for configuration files."
782
    },
783
    /* EOF */
784
    {0}
785
};
786
787
struct flb_custom_plugin custom_calyptia_plugin = {
788
    .name         = "calyptia",
789
    .description  = "Calyptia Cloud",
790
    .config_map   = config_map,
791
    .cb_init      = cb_calyptia_init,
792
    .cb_exit      = cb_calyptia_exit,
793
};