Coverage Report

Created: 2026-06-07 07:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/plugins/custom_calyptia/calyptia.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2026 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_custom_plugin.h>
21
#include <fluent-bit/flb_kv.h>
22
#include <fluent-bit/flb_env.h>
23
#include <fluent-bit/flb_utils.h>
24
#include <fluent-bit/flb_router.h>
25
26
/* pipeline plugins */
27
#include <fluent-bit/flb_input.h>
28
#include <fluent-bit/flb_filter.h>
29
#include <fluent-bit/flb_output.h>
30
#include <fluent-bit/flb_custom_plugin.h>
31
#include <fluent-bit/flb_config.h>
32
#include <fluent-bit/flb_config_map.h>
33
#include <fluent-bit/flb_utils.h>
34
#include <fluent-bit/flb_hash.h>
35
36
#include <fluent-bit/calyptia/calyptia_constants.h>
37
38
#include "calyptia.h"
39
40
0
#define UUID_BUFFER_SIZE 38 /* Maximum length of UUID string + null terminator */
41
42
/* Function wrappers to enable mocking for unit test filesystem access */
43
int (*flb_access)(const char *pathname, int mode) = access;
44
int (*flb_open)(const char *pathname, int flags, ...) = open;
45
ssize_t (*flb_write)(int fd, const void *buf, size_t count) = write;
46
int (*flb_close)(int fd) = close;
47
int (*flb_utils_read_file_wrapper)(char *path, char **out_buf, size_t *out_size) = flb_utils_read_file;
48
49
/*
50
 * Check if the key belongs to a sensitive data field, if so report it. We never
51
 * share any sensitive data.
52
 */
53
static int is_sensitive_property(char *key)
54
0
{
55
56
0
    if (strcasecmp(key, "password") == 0 ||
57
0
        strcasecmp(key, "passwd") == 0   ||
58
0
        strcasecmp(key, "user") == 0 ||
59
0
        strcasecmp(key, "http_user") == 0 ||
60
0
        strcasecmp(key, "http_passwd") == 0 ||
61
0
        strcasecmp(key, "shared_key") == 0 ||
62
0
        strcasecmp(key, "endpoint") == 0 ||
63
0
        strcasecmp(key, "apikey") == 0 ||
64
0
        strcasecmp(key, "private_key") == 0 ||
65
0
        strcasecmp(key, "service_account_secret") == 0 ||
66
0
        strcasecmp(key, "splunk_token") == 0 ||
67
0
        strcasecmp(key, "logdna_host") == 0 ||
68
0
        strcasecmp(key, "api_key") == 0 ||
69
0
        strcasecmp(key, "hostname") == 0 ||
70
0
        strcasecmp(key, "license_key") == 0 ||
71
0
        strcasecmp(key, "base_uri") == 0 ||
72
0
        strcasecmp(key, "api") == 0) {
73
74
0
        return FLB_TRUE;
75
0
    }
76
77
0
    return FLB_FALSE;
78
0
}
79
80
static void pipeline_config_add_properties(flb_sds_t *buf, struct mk_list *props)
81
0
{
82
0
    struct mk_list *head;
83
0
    struct flb_kv *kv;
84
85
0
    mk_list_foreach(head, props) {
86
0
        kv = mk_list_entry(head, struct flb_kv, _head);
87
88
0
        if (kv->key != NULL && kv->val != NULL) {
89
0
            flb_sds_printf(buf, "    %s ", kv->key);
90
91
0
            if (is_sensitive_property(kv->key)) {
92
0
                flb_sds_cat_safe(buf, "--redacted--", strlen("--redacted--"));
93
0
            }
94
0
            else {
95
0
                flb_sds_cat_safe(buf, kv->val, strlen(kv->val));
96
0
            }
97
98
0
            flb_sds_cat_safe(buf, "\n", 1);
99
0
        }
100
0
    }
101
0
}
102
103
flb_sds_t custom_calyptia_pipeline_config_get(struct flb_config *ctx)
104
0
{
105
0
    char tmp[32];
106
0
    flb_sds_t buf;
107
0
    struct mk_list *head;
108
0
    struct flb_input_instance *i_ins;
109
0
    struct flb_filter_instance *f_ins;
110
0
    struct flb_output_instance *o_ins;
111
112
0
    buf = flb_sds_create_size(2048);
113
114
0
    if (!buf) {
115
0
        return NULL;
116
0
    }
117
118
    /* [INPUT] */
119
0
    mk_list_foreach(head, &ctx->inputs) {
120
0
        i_ins = mk_list_entry(head, struct flb_input_instance, _head);
121
0
        flb_sds_printf(&buf, "[INPUT]\n");
122
0
        flb_sds_printf(&buf, "    name %s\n", i_ins->name);
123
124
0
        if (i_ins->alias) {
125
0
            flb_sds_printf(&buf, "    alias %s\n", i_ins->alias);
126
0
        }
127
128
0
        if (i_ins->tag) {
129
0
            flb_sds_printf(&buf, "    tag %s\n", i_ins->tag);
130
0
        }
131
132
0
        if (i_ins->mem_buf_limit > 0) {
133
0
            flb_utils_bytes_to_human_readable_size(i_ins->mem_buf_limit,
134
0
                                                   tmp, sizeof(tmp) - 1);
135
0
            flb_sds_printf(&buf, "    mem_buf_limit %s\n", tmp);
136
0
        }
137
138
0
        pipeline_config_add_properties(&buf, &i_ins->properties);
139
0
    }
140
0
    flb_sds_printf(&buf, "\n");
141
142
    /* Config: [FILTER] */
143
0
    mk_list_foreach(head, &ctx->filters) {
144
0
        f_ins = mk_list_entry(head, struct flb_filter_instance, _head);
145
146
0
        flb_sds_printf(&buf, "[FILTER]\n");
147
0
        flb_sds_printf(&buf, "    name  %s\n", f_ins->name);
148
0
        flb_sds_printf(&buf, "    match %s\n", f_ins->match);
149
150
0
        pipeline_config_add_properties(&buf, &f_ins->properties);
151
0
    }
152
0
    flb_sds_printf(&buf, "\n");
153
154
    /* Config: [OUTPUT] */
155
0
    mk_list_foreach(head, &ctx->outputs) {
156
0
        o_ins = mk_list_entry(head, struct flb_output_instance, _head);
157
158
0
        flb_sds_printf(&buf, "[OUTPUT]\n");
159
0
        flb_sds_printf(&buf, "    name  %s\n", o_ins->name);
160
161
0
        if (o_ins->match) {
162
0
            flb_sds_printf(&buf, "    match %s\n", o_ins->match);
163
0
        }
164
0
        else {
165
0
            flb_sds_printf(&buf, "    match *\n");
166
0
        }
167
168
0
#ifdef FLB_HAVE_TLS
169
0
        if (o_ins->use_tls == FLB_TRUE) {
170
0
            flb_sds_printf(&buf, "    tls   %s\n", o_ins->use_tls ? "on" : "off");
171
0
            flb_sds_printf(&buf, "    tls.verify     %s\n",
172
0
                             o_ins->tls_verify ? "on": "off");
173
174
0
            if (o_ins->tls_ca_file) {
175
0
                flb_sds_printf(&buf, "    tls.ca_file    %s\n",
176
0
                               o_ins->tls_ca_file);
177
0
            }
178
179
0
            if (o_ins->tls_crt_file) {
180
0
                flb_sds_printf(&buf, "    tls.crt_file   %s\n",
181
0
                               o_ins->tls_crt_file);
182
0
            }
183
184
0
            if (o_ins->tls_key_file) {
185
0
                flb_sds_printf(&buf, "    tls.key_file   %s\n",
186
0
                               o_ins->tls_key_file);
187
0
            }
188
189
0
            if (o_ins->tls_key_passwd) {
190
0
                flb_sds_printf(&buf, "    tls.key_passwd --redacted--\n");
191
0
            }
192
0
        }
193
0
#endif
194
195
0
        if (o_ins->retry_limit == FLB_OUT_RETRY_UNLIMITED) {
196
0
            flb_sds_printf(&buf, "    retry_limit no_limits\n");
197
0
        }
198
0
        else if (o_ins->retry_limit == FLB_OUT_RETRY_NONE) {
199
0
            flb_sds_printf(&buf, "    retry_limit no_retries\n");
200
0
        }
201
0
        else {
202
0
            flb_sds_printf(&buf, "    retry_limit %i\n", o_ins->retry_limit);
203
0
        }
204
205
0
        if (o_ins->host.name) {
206
0
            flb_sds_printf(&buf, "    host  --redacted--\n");
207
0
        }
208
209
0
        pipeline_config_add_properties(&buf, &o_ins->properties);
210
0
        flb_sds_printf(&buf, "\n");
211
0
    }
212
213
0
    return buf;
214
0
}
215
216
int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet)
217
0
{
218
0
    struct mk_list *head;
219
0
    struct flb_kv *keyval;
220
221
0
    if (!fleet) {
222
0
        flb_plg_error(ctx->ins, "invalid fleet input instance");
223
0
        return -1;
224
0
    }
225
226
0
    if (ctx->fleet_name) {
227
0
        flb_input_set_property(fleet, "fleet_name", ctx->fleet_name);
228
0
    }
229
230
0
    if (ctx->fleet_id) {
231
0
        flb_input_set_property(fleet, "fleet_id", ctx->fleet_id);
232
0
    }
233
234
0
    flb_input_set_property(fleet, "api_key", ctx->api_key);
235
0
    flb_input_set_property(fleet, "host", ctx->cloud_host);
236
0
    flb_input_set_property(fleet, "port", ctx->cloud_port);
237
0
    flb_input_set_property(fleet, "config_dir", ctx->fleet_config_dir);
238
0
    flb_input_set_property(fleet, "fleet_config_legacy_format", ctx->fleet_config_legacy_format == 1 ? "on" : "off");
239
240
    /* Set TLS properties */
241
0
    flb_input_set_property(fleet, "tls", ctx->cloud_tls == 1 ? "on" : "off");
242
0
    flb_input_set_property(fleet, "tls.verify", ctx->cloud_tls_verify == 1 ? "on" : "off");
243
244
    /* Optional configurations */
245
0
    if (ctx->fleet_max_http_buffer_size) {
246
0
        flb_input_set_property(fleet, "max_http_buffer_size", ctx->fleet_max_http_buffer_size);
247
0
    }
248
249
0
    if (ctx->machine_id) {
250
0
        flb_input_set_property(fleet, "machine_id", ctx->machine_id);
251
0
    }
252
253
0
    if (ctx->fleet_interval_sec) {
254
0
        flb_input_set_property(fleet, "interval_sec", ctx->fleet_interval_sec);
255
0
    }
256
257
0
    if (ctx->fleet_interval_nsec) {
258
0
        flb_input_set_property(fleet, "interval_nsec", ctx->fleet_interval_nsec);
259
0
    }
260
261
0
    mk_list_foreach(head, &ctx->ins->net_properties) {
262
0
        keyval = mk_list_entry(head, struct flb_kv, _head);
263
0
        flb_debug("set fleet net property: %s=%s", keyval->key, keyval->val);
264
0
        flb_input_set_property(fleet, keyval->key, keyval->val);
265
0
    }
266
267
0
    return 0;
268
0
}
269
270
static struct flb_output_instance *setup_cloud_output(struct flb_config *config, struct calyptia *ctx)
271
0
{
272
0
    int ret;
273
0
    struct flb_output_instance *cloud;
274
0
    struct mk_list *head;
275
0
    struct flb_slist_entry *key = NULL;
276
0
    struct flb_slist_entry *val = NULL;
277
0
    flb_sds_t label;
278
0
    struct flb_config_map_val *mv;
279
0
    struct flb_kv *keyval;
280
281
0
    cloud = flb_output_new(config, "calyptia", ctx, FLB_FALSE);
282
283
0
    if (!cloud) {
284
0
        flb_plg_error(ctx->ins, "could not load Calyptia Cloud connector");
285
0
        return NULL;
286
0
    }
287
288
    /* direct connect / routing */
289
0
    ret = flb_router_connect_direct(ctx->i, cloud);
290
291
0
    if (ret != 0) {
292
0
        flb_plg_error(ctx->ins, "could not load Calyptia Cloud connector");
293
0
        return NULL;
294
0
    }
295
296
0
    if (ctx->add_labels && mk_list_size(ctx->add_labels) > 0) {
297
298
        /* iterate all 'add_label' definitions */
299
0
        flb_config_map_foreach(head, mv, ctx->add_labels) {
300
0
            key = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
301
0
            val = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);
302
303
0
            label = flb_sds_create_size(strlen(key->str) + strlen(val->str) + 1);
304
305
0
            if (!label) {
306
0
                return NULL;
307
0
            }
308
309
0
            flb_sds_printf(&label, "%s %s", key->str, val->str);
310
0
            flb_output_set_property(cloud, "add_label", label);
311
0
            flb_sds_destroy(label);
312
0
        }
313
0
    }
314
315
0
    flb_output_set_property(cloud, "match", "_calyptia_cloud");
316
0
    flb_output_set_property(cloud, "api_key", ctx->api_key);
317
318
0
    if (ctx->register_retry_on_flush) {
319
0
        flb_output_set_property(cloud, "register_retry_on_flush", "true");
320
0
    } else {
321
0
        flb_output_set_property(cloud, "register_retry_on_flush", "false");
322
0
    }
323
324
0
    if (ctx->store_path) {
325
0
        flb_output_set_property(cloud, "store_path", ctx->store_path);
326
0
    }
327
328
0
    if (ctx->machine_id) {
329
0
        flb_output_set_property(cloud, "machine_id", ctx->machine_id);
330
0
    }
331
332
    /* Override network details: development purposes only */
333
0
    if (ctx->cloud_host) {
334
0
        flb_output_set_property(cloud, "cloud_host", ctx->cloud_host);
335
0
    }
336
337
0
    if (ctx->cloud_port) {
338
0
        flb_output_set_property(cloud, "cloud_port", ctx->cloud_port);
339
0
    }
340
341
0
    if (ctx->cloud_tls) {
342
0
        flb_output_set_property(cloud, "tls", "true");
343
0
    }
344
0
    else {
345
0
        flb_output_set_property(cloud, "tls", "false");
346
0
    }
347
348
0
    if (ctx->cloud_tls_verify) {
349
0
        flb_output_set_property(cloud, "tls.verify", "true");
350
0
    }
351
0
    else {
352
0
        flb_output_set_property(cloud, "tls.verify", "false");
353
0
    }
354
355
0
    if (ctx->fleet_id) {
356
0
        label = flb_sds_create_size(strlen("fleet_id") + strlen(ctx->fleet_id) + 1);
357
358
0
        if (!label) {
359
0
            return NULL;
360
0
        }
361
362
0
        flb_sds_printf(&label, "fleet_id %s", ctx->fleet_id);
363
0
        flb_output_set_property(cloud, "add_label", label);
364
0
        flb_sds_destroy(label);
365
0
    }
366
367
0
    mk_list_foreach(head, &ctx->ins->net_properties) {
368
0
        keyval = mk_list_entry(head, struct flb_kv, _head);
369
0
        flb_debug("set cloud net property: %s=%s", keyval->key, keyval->val);
370
0
        flb_output_set_property(cloud, keyval->key, keyval->val);
371
0
    }
372
373
0
#ifdef FLB_HAVE_CHUNK_TRACE
374
0
    flb_output_set_property(cloud, "pipeline_id", ctx->pipeline_id);
375
0
#endif /* FLB_HAVE_CHUNK_TRACE */
376
377
0
    return cloud;
378
0
}
379
380
static flb_sds_t sha256_to_hex(unsigned char *sha256)
381
0
{
382
0
    int idx;
383
0
    flb_sds_t hex;
384
0
    flb_sds_t tmp;
385
386
0
    hex = flb_sds_create_size(64);
387
388
0
    if (!hex) {
389
0
        return NULL;
390
0
    }
391
392
0
    for (idx = 0; idx < 32; idx++) {
393
0
        tmp = flb_sds_printf(&hex, "%02x", sha256[idx]);
394
395
0
        if (!tmp) {
396
0
            flb_sds_destroy(hex);
397
0
            return NULL;
398
0
        }
399
400
0
        hex = tmp;
401
0
    }
402
403
0
    flb_sds_len_set(hex, 64);
404
0
    return hex;
405
0
}
406
407
static flb_sds_t generate_base_agent_directory(struct calyptia *ctx, flb_sds_t *fleet_dir)
408
0
{
409
0
    flb_sds_t ret = NULL;
410
411
0
    if (ctx == NULL || fleet_dir == NULL) {
412
0
        return NULL;
413
0
    }
414
415
0
    if (*fleet_dir == NULL) {
416
0
        *fleet_dir = flb_sds_create_size(CALYPTIA_MAX_DIR_SIZE);
417
0
        if (*fleet_dir == NULL) {
418
0
            return NULL;
419
0
        }
420
0
    }
421
422
0
    ret = flb_sds_printf(fleet_dir, "%s", ctx->fleet_config_dir);
423
0
    if (ret == NULL) {
424
0
        flb_sds_destroy(*fleet_dir);
425
0
        return NULL;
426
0
    }
427
428
0
    return ret;
429
0
}
430
431
flb_sds_t agent_config_filename(struct calyptia *ctx, char *fname)
432
0
{
433
0
    flb_sds_t cfgname = NULL;
434
0
    flb_sds_t ret;
435
436
0
    if (ctx == NULL || fname == NULL) {
437
0
        return NULL;
438
0
    }
439
440
0
    if (generate_base_agent_directory(ctx, &cfgname) == NULL) {
441
0
        return NULL;
442
0
    }
443
444
0
    ret = flb_sds_printf(&cfgname, PATH_SEPARATOR "%s.conf", fname);
445
0
    if (ret == NULL) {
446
0
        flb_sds_destroy(cfgname);
447
0
        return NULL;
448
0
    }
449
450
0
    return cfgname;
451
0
}
452
453
0
static char* generate_uuid() {
454
0
    char* uuid = flb_malloc(UUID_BUFFER_SIZE);
455
0
    if (!uuid) {
456
0
        flb_errno();
457
0
        return NULL;
458
0
    }
459
460
    /* create new UUID for fleet */
461
0
    if (flb_utils_uuid_v4_gen(uuid) != 0 || strlen(uuid) == 0) {
462
0
        flb_free(uuid);
463
0
        return NULL;
464
0
    }
465
0
    return uuid;
466
0
}
467
468
0
static int write_uuid_to_file(flb_sds_t fleet_machine_id, char* uuid) {
469
0
    int fd;
470
0
    size_t uuid_len;
471
472
0
    if (fleet_machine_id == NULL || uuid == NULL) {
473
0
        return FLB_FALSE;
474
0
    }
475
476
    /* write uuid to file */
477
0
    fd = flb_open(fleet_machine_id, O_CREAT | O_WRONLY | O_TRUNC, 0666);
478
0
    if (fd == -1) {
479
0
        return FLB_FALSE;
480
0
    }
481
482
0
    uuid_len = strlen(uuid);
483
484
0
    if (flb_write(fd, uuid, uuid_len) != uuid_len) {
485
0
        flb_close(fd);
486
0
        return FLB_FALSE;
487
0
    }
488
489
0
    flb_close(fd);
490
0
    return FLB_TRUE;
491
0
}
492
493
static int create_agent_directory(struct calyptia *ctx)
494
0
{
495
0
    if( ctx == NULL ) {
496
0
        return -1;
497
0
    }
498
499
    /* If it exists just return */
500
0
    if (access(ctx->fleet_config_dir, F_OK) == 0) {
501
0
        return 0;
502
0
    }
503
504
    /* Create the directory if it does not exist */
505
0
    if (flb_utils_mkdir(ctx->fleet_config_dir, 0700) != 0) {
506
0
        flb_plg_error(ctx->ins, "failed to create directory: %s", ctx->fleet_config_dir);
507
0
        return -1;
508
0
    }
509
510
0
    return 0;
511
0
}
512
513
flb_sds_t get_machine_id(struct calyptia *ctx)
514
0
{
515
0
    int ret = -1;
516
0
    char *buf = NULL;
517
0
    size_t blen = 0;
518
0
    unsigned char sha256_buf[64] = {0};
519
520
#if defined(FLB_SYSTEM_WINDOWS)
521
    /* retrieve raw machine id */
522
    ret = flb_utils_get_machine_id(&buf, &blen);
523
#else
524
    /* /etc/machine-id is not guaranteed to be unique so we generate one */
525
0
    flb_sds_t fleet_machine_id = NULL;
526
527
    /** ensure we have the directory created */
528
0
    if (create_agent_directory(ctx) != 0) {
529
0
        return NULL;
530
0
    }
531
532
    /** now get the agent filename */
533
0
    fleet_machine_id = machine_id_fleet_config_filename(ctx);
534
0
    if (fleet_machine_id == NULL) {
535
0
        flb_plg_error(ctx->ins, "unable to allocate machine id file");
536
0
        return NULL;
537
0
    }
538
539
    /* check if the file exists first, if it does not we generate a UUID */
540
0
    if (flb_access(fleet_machine_id, F_OK) != 0) {
541
542
        /* create new UUID for fleet */
543
0
        buf = generate_uuid();
544
0
        if( buf == NULL ) {
545
0
            flb_plg_error(ctx->ins, "failed to create uuid for fleet machine id");
546
0
            flb_sds_destroy(fleet_machine_id);
547
0
            return NULL;
548
0
        }
549
0
        flb_plg_info(ctx->ins, "generated UUID for machine ID: %s", buf);
550
551
        /* write uuid to file */
552
0
        if (write_uuid_to_file(fleet_machine_id, buf ) != FLB_TRUE) {
553
0
            flb_plg_error(ctx->ins, "failed to write fleet machine id file: %s", fleet_machine_id);
554
0
            flb_free(buf);
555
0
            flb_sds_destroy(fleet_machine_id);
556
0
            return NULL;
557
0
        }
558
559
0
        flb_free(buf);
560
0
        buf = NULL;
561
562
0
        flb_plg_info(ctx->ins, "written machine ID to file: %s", fleet_machine_id);
563
0
    }
564
565
    /* now check file exists (it always should) and read from it */
566
0
    if (flb_access(fleet_machine_id, F_OK) == 0) {
567
0
        ret = flb_utils_read_file_wrapper(fleet_machine_id, &buf, &blen);
568
0
        if (ret != 0) {
569
0
            flb_plg_error(ctx->ins, "failed to read fleet machine id file: %s", fleet_machine_id);
570
0
            flb_sds_destroy(fleet_machine_id);
571
0
            return NULL;
572
0
        }
573
0
        flb_plg_info(ctx->ins, "read UUID (%s) from file: %s", buf, fleet_machine_id);
574
0
    }
575
0
    else { /* fall back to machine-id */
576
0
        flb_plg_warn(ctx->ins, "unable to get uuid from file (%s) so falling back to machine id", fleet_machine_id);
577
0
        ret = flb_utils_get_machine_id(&buf, &blen);
578
0
    }
579
580
    /* Clean up no longer required filename */
581
0
    flb_sds_destroy(fleet_machine_id);
582
0
#endif
583
584
0
    if (ret == -1) {
585
0
        flb_plg_error(ctx->ins, "could not obtain machine id");
586
0
        return NULL;
587
0
    }
588
589
0
    ret = flb_hash_simple(FLB_HASH_SHA256,
590
0
                          (unsigned char *) buf,
591
0
                          blen,
592
0
                          sha256_buf,
593
0
                          sizeof(sha256_buf));
594
0
    flb_free(buf);
595
596
0
    if (ret != FLB_CRYPTO_SUCCESS) {
597
0
        return NULL;
598
0
    }
599
600
    /* convert to hex */
601
0
    return sha256_to_hex(sha256_buf);
602
0
}
603
604
static int cb_calyptia_init(struct flb_custom_instance *ins,
605
                         struct flb_config *config,
606
                         void *data)
607
0
{
608
0
    int ret;
609
0
    struct calyptia *ctx;
610
0
    (void) data;
611
612
0
    ctx = flb_calloc(1, sizeof(struct calyptia));
613
0
    if (!ctx) {
614
0
        flb_errno();
615
0
        return -1;
616
0
    }
617
0
    ctx->ins = ins;
618
619
    /* Load the config map */
620
0
    ret = flb_custom_config_map_set(ins, (void *) ctx);
621
0
    if (ret == -1) {
622
0
        flb_free(ctx);
623
0
        return -1;
624
0
    }
625
626
    /* map instance and local context */
627
0
    flb_custom_set_context(ins, ctx);
628
629
    /* If no machine_id has been provided via a configuration option get it from the local machine-id. */
630
0
    if (!ctx->machine_id) {
631
0
        ctx->machine_id = get_machine_id(ctx);
632
0
        if (ctx->machine_id == NULL) {
633
0
            flb_plg_error(ctx->ins, "unable to retrieve machine_id");
634
0
            flb_free(ctx);
635
0
            return -1;
636
0
        }
637
0
        ctx->machine_id_auto_configured = FLB_TRUE;
638
0
    }
639
640
    /* input collector */
641
0
    ctx->i = flb_input_new(config, "fluentbit_metrics", NULL, FLB_TRUE);
642
0
    if (!ctx->i) {
643
0
        flb_plg_error(ctx->ins, "could not load metrics collector");
644
0
        flb_free(ctx);
645
0
        return -1;
646
0
    }
647
648
0
    flb_input_set_property(ctx->i, "tag", "_calyptia_cloud");
649
0
    flb_input_set_property(ctx->i, "scrape_on_start", "true");
650
    // This scrape interval should be configurable.
651
0
    flb_input_set_property(ctx->i, "scrape_interval", "30");
652
653
    /* Setup cloud output if needed */
654
0
    if (ctx->fleet_id != NULL || !ctx->fleet_name) {
655
0
        ctx->o = setup_cloud_output(config, ctx);
656
0
        if (ctx->o == NULL) {
657
0
            flb_free(ctx);
658
0
            return -1;
659
0
        }
660
        /* Set fleet_id for output if present */
661
0
        if (ctx->fleet_id) {
662
0
            flb_output_set_property(ctx->o, "fleet_id", ctx->fleet_id);
663
0
        }
664
0
    }
665
666
    /* Setup fleet input if needed */
667
0
    if (ctx->fleet_id || ctx->fleet_name) {
668
0
        ctx->fleet = flb_input_new(config, "calyptia_fleet", NULL, FLB_FALSE);
669
0
        if (!ctx->fleet) {
670
0
            flb_plg_error(ctx->ins, "could not load Calyptia Fleet plugin");
671
0
            return -1;
672
0
        }
673
674
0
        ret = set_fleet_input_properties(ctx, ctx->fleet);
675
0
        if (ret == -1) {
676
0
            return -1;
677
0
        }
678
0
    }
679
680
0
    if (ctx->o) {
681
0
        flb_router_connect(ctx->i, ctx->o);
682
0
    }
683
684
0
    flb_plg_info(ins, "custom initialized!");
685
0
    return 0;
686
0
}
687
688
static int cb_calyptia_exit(void *data, struct flb_config *config)
689
0
{
690
0
    struct calyptia *ctx = data;
691
692
0
    if (!ctx) {
693
0
        return 0;
694
0
    }
695
696
0
    if (ctx->machine_id && ctx->machine_id_auto_configured) {
697
0
        flb_sds_destroy(ctx->machine_id);
698
0
    }
699
700
0
    flb_free(ctx);
701
0
    return 0;
702
0
}
703
704
/* Configuration properties map */
705
static struct flb_config_map config_map[] = {
706
    {
707
     FLB_CONFIG_MAP_STR, "api_key", NULL,
708
     0, FLB_TRUE, offsetof(struct calyptia, api_key),
709
     "Calyptia Cloud API Key."
710
    },
711
712
    {
713
     FLB_CONFIG_MAP_STR, "store_path", NULL,
714
     0, FLB_TRUE, offsetof(struct calyptia, store_path)
715
    },
716
717
    {
718
     FLB_CONFIG_MAP_STR, "calyptia_host", DEFAULT_CALYPTIA_HOST,
719
     0, FLB_TRUE, offsetof(struct calyptia, cloud_host),
720
     ""
721
    },
722
723
    {
724
     FLB_CONFIG_MAP_STR, "calyptia_port", DEFAULT_CALYPTIA_PORT,
725
     0, FLB_TRUE, offsetof(struct calyptia, cloud_port),
726
     ""
727
    },
728
729
    {
730
     FLB_CONFIG_MAP_BOOL, "calyptia_tls", "true",
731
     0, FLB_TRUE, offsetof(struct calyptia, cloud_tls),
732
     ""
733
    },
734
735
    {
736
     FLB_CONFIG_MAP_BOOL, "calyptia_tls.verify", "true",
737
     0, FLB_TRUE, offsetof(struct calyptia, cloud_tls_verify),
738
     ""
739
    },
740
741
    {
742
     FLB_CONFIG_MAP_SLIST_1, "add_label", NULL,
743
     FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct calyptia, add_labels),
744
     "Label to append to the generated metric."
745
    },
746
747
    {
748
     FLB_CONFIG_MAP_STR, "machine_id", NULL,
749
     0, FLB_TRUE, offsetof(struct calyptia, machine_id),
750
     "Custom machine_id to be used when registering agent"
751
    },
752
    {
753
     FLB_CONFIG_MAP_STR, "fleet_id", NULL,
754
     0, FLB_TRUE, offsetof(struct calyptia, fleet_id),
755
     "Fleet id to be used when registering agent in a fleet"
756
    },
757
    {
758
     FLB_CONFIG_MAP_STR, "fleet.config_dir", FLEET_DEFAULT_CONFIG_DIR,
759
     0, FLB_TRUE, offsetof(struct calyptia, fleet_config_dir),
760
     "Base path for the configuration directory."
761
    },
762
    {
763
      FLB_CONFIG_MAP_STR, "fleet.interval_sec", "-1",
764
      0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_sec),
765
      "Set the collector interval"
766
    },
767
    {
768
      FLB_CONFIG_MAP_STR, "fleet.interval_nsec", "-1",
769
      0, FLB_TRUE, offsetof(struct calyptia, fleet_interval_nsec),
770
      "Set the collector interval (nanoseconds)"
771
    },
772
    {
773
      FLB_CONFIG_MAP_STR, "fleet.max_http_buffer_size", NULL,
774
      0, FLB_TRUE, offsetof(struct calyptia, fleet_max_http_buffer_size),
775
      "Max HTTP buffer size for fleet"
776
    },
777
    {
778
     FLB_CONFIG_MAP_STR, "fleet_name", NULL,
779
     0, FLB_TRUE, offsetof(struct calyptia, fleet_name),
780
     "Fleet name to be used when registering agent in a fleet"
781
    },
782
783
#ifdef FLB_HAVE_CHUNK_TRACE
784
    {
785
     FLB_CONFIG_MAP_STR, "pipeline_id", NULL,
786
     0, FLB_TRUE, offsetof(struct calyptia, pipeline_id),
787
     "Pipeline ID for reporting to calyptia cloud."
788
    },
789
#endif /* FLB_HAVE_CHUNK_TRACE */
790
    {
791
     FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true",
792
     0, FLB_TRUE, offsetof(struct calyptia, register_retry_on_flush),
793
     "Retry agent registration on flush if failed on init."
794
    },
795
    {
796
     FLB_CONFIG_MAP_BOOL, "fleet_config_legacy_format", "true",
797
     0, FLB_TRUE, offsetof(struct calyptia, fleet_config_legacy_format),
798
     "If set, use legacy (TOML) format for configuration files."
799
    },
800
    /* EOF */
801
    {0}
802
};
803
804
struct flb_custom_plugin custom_calyptia_plugin = {
805
    .name         = "calyptia",
806
    .description  = "Calyptia Cloud",
807
    .config_map   = config_map,
808
    .cb_init      = cb_calyptia_init,
809
    .cb_exit      = cb_calyptia_exit,
810
    .flags        = FLB_CUSTOM_NET_CLIENT,
811
};