Coverage Report

Created: 2023-11-27 07:46

/src/fluent-bit/plugins/in_calyptia_fleet/in_calyptia_fleet.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2023 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <stdio.h>
21
#include <stdlib.h>
22
#include <time.h>
23
#include <signal.h>
24
#include <sys/stat.h>
25
26
#include <msgpack.h>
27
#include <fluent-bit/flb_input.h>
28
#include <fluent-bit/flb_input_plugin.h>
29
#include <fluent-bit/flb_config.h>
30
#include <fluent-bit/flb_config_map.h>
31
#include <fluent-bit/flb_error.h>
32
#include <fluent-bit/flb_time.h>
33
#include <fluent-bit/flb_pack.h>
34
#include <fluent-bit/flb_strptime.h>
35
#include <fluent-bit/flb_reload.h>
36
#include <fluent-bit/flb_lib.h>
37
#include <fluent-bit/config_format/flb_cf_fluentbit.h>
38
#include <fluent-bit/flb_base64.h>
39
40
41
0
#define CALYPTIA_H_PROJECT       "X-Project-Token"
42
#define CALYPTIA_H_CTYPE         "Content-Type"
43
#define CALYPTIA_H_CTYPE_JSON    "application/json"
44
45
0
#define DEFAULT_INTERVAL_SEC  "15"
46
0
#define DEFAULT_INTERVAL_NSEC "0"
47
48
#define CALYPTIA_HOST "cloud-api.calyptia.com"
49
#define CALYPTIA_PORT "443"
50
51
#ifndef _WIN32
52
0
#define PATH_SEPARATOR "/"
53
#define DEFAULT_CONFIG_DIR "/tmp/calyptia-fleet"
54
#else
55
#define DEFAULT_CONFIG_DIR NULL
56
#define PATH_SEPARATOR "\\"
57
#endif
58
59
struct flb_in_calyptia_fleet_config {
60
    /* Time interval check */
61
    int interval_sec;
62
    int interval_nsec;
63
64
    /* Grabbed from the cfg_path, used to check if configuration has
65
     * has been updated.
66
     */
67
    long config_timestamp;
68
69
    flb_sds_t api_key;
70
71
    flb_sds_t fleet_id;
72
    /* flag used to mark fleet_id for release when found automatically. */
73
    int fleet_id_found;
74
75
    flb_sds_t fleet_name;
76
    flb_sds_t machine_id;
77
    flb_sds_t config_dir;
78
    flb_sds_t cloud_host;
79
    flb_sds_t cloud_port;
80
81
    flb_sds_t fleet_url;
82
83
    struct flb_input_instance *ins;       /* plugin instance */
84
    struct flb_config *config;            /* Fluent Bit context */
85
86
    /* Networking */
87
    struct flb_upstream *u;
88
89
    int collect_fd;
90
};
91
92
static char *find_case_header(struct flb_http_client *cli, const char *header)
93
0
{
94
0
    char *ptr;
95
0
    char *headstart;
96
97
98
0
    headstart = strstr(cli->resp.data, "\r\n");
99
100
0
    if (headstart == NULL) {
101
0
        return NULL;
102
0
    }
103
104
    /* Lookup the beginning of the header */
105
0
    for (ptr = headstart; ptr != NULL && ptr+2 < cli->resp.payload; ptr = strstr(ptr, "\r\n")) {
106
107
0
        if (ptr + 4 < cli->resp.payload && strcmp(ptr, "\r\n\r\n") == 0) {
108
0
            return NULL;
109
0
        }
110
111
0
        ptr+=2;
112
113
        /* no space left for header */
114
0
        if (ptr + strlen(header)+2 >= cli->resp.payload) {
115
0
            return NULL;
116
0
        }
117
118
        /* matched header and the delimiter */
119
0
        if (strncasecmp(ptr, header, strlen(header)) == 0) {
120
121
0
            if (ptr[strlen(header)] == ':' && ptr[strlen(header)+1] == ' ') {
122
0
                return ptr;                
123
0
            }
124
0
        }
125
0
    }
126
127
0
    return NULL;
128
0
}
129
130
/* Try to find a header value in the buffer. Copied from flb_http_client.c. */
131
static int case_header_lookup(struct flb_http_client *cli,
132
                         const char *header, int header_len,
133
                         const char **out_val, int *out_len)
134
0
{
135
0
    char *ptr;
136
0
    char *crlf;
137
0
    char *end;
138
139
0
    if (!cli->resp.data) {
140
0
        return -1;
141
0
    }
142
143
0
    ptr = find_case_header(cli, header);
144
0
    end = strstr(cli->resp.data, "\r\n\r\n");
145
146
0
    if (!ptr) {
147
148
0
        if (end) {
149
            /* The headers are complete but the header is not there */
150
0
            return -1;
151
0
        }
152
153
        /* We need more data */
154
0
        return -1;
155
0
    }
156
157
    /* Exclude matches in the body */
158
0
    if (end && ptr > end) {
159
0
        return -1;
160
0
    }
161
162
    /* Lookup CRLF (end of line \r\n) */
163
0
    crlf = strstr(ptr, "\r\n");
164
165
0
    if (!crlf) {
166
0
        return -1;
167
0
    }
168
169
    /* sanity check that the header_len does not exceed the headers. */
170
0
    if (ptr + header_len + 2 > end) {
171
0
        return -1;
172
0
    }
173
174
0
    ptr += header_len + 2;
175
176
0
    *out_val = ptr;
177
0
    *out_len = (crlf - ptr);
178
179
0
    return 0;
180
0
}
181
182
struct reload_ctx {
183
    flb_ctx_t *flb;
184
    flb_sds_t cfg_path;
185
};
186
187
static flb_sds_t fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, char *fname)
188
0
{
189
0
    flb_sds_t cfgname;
190
191
0
    cfgname = flb_sds_create_size(4096);
192
193
0
    if (ctx->fleet_name != NULL) {
194
0
        flb_sds_printf(&cfgname,
195
0
                       "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s.ini",
196
0
                       ctx->config_dir, ctx->machine_id, ctx->fleet_name, fname);
197
0
    }
198
0
    else {
199
0
        flb_sds_printf(&cfgname,
200
0
                       "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s.ini",
201
0
                       ctx->config_dir, ctx->machine_id, ctx->fleet_id, fname);
202
0
    }
203
204
0
    return cfgname;
205
0
}
206
207
static flb_sds_t new_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx)
208
0
{
209
0
    return fleet_config_filename(ctx, "new");
210
0
}
211
212
static flb_sds_t cur_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx)
213
0
{
214
0
    return fleet_config_filename(ctx, "cur");
215
0
}
216
217
static flb_sds_t old_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx)
218
0
{
219
0
    return fleet_config_filename(ctx, "old");
220
0
}
221
222
static flb_sds_t time_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, time_t t)
223
0
{
224
0
    char s_last_modified[32];
225
226
0
    snprintf(s_last_modified, sizeof(s_last_modified)-1, "%d", (int)t);
227
0
    return fleet_config_filename(ctx, s_last_modified);
228
0
}
229
230
static int is_new_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
231
0
{
232
0
    flb_sds_t cfgnewname;
233
0
    int ret = FLB_FALSE;
234
235
236
0
    if (cfg->conf_path_file == NULL) {
237
0
        return FLB_FALSE;
238
0
    }
239
240
0
    cfgnewname = new_fleet_config_filename(ctx);
241
242
0
    if (strcmp(cfgnewname, cfg->conf_path_file) == 0) {
243
0
        ret = FLB_TRUE;
244
0
    }
245
246
0
    flb_sds_destroy(cfgnewname);
247
248
0
    return ret;
249
0
}
250
251
static int is_cur_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
252
0
{
253
0
    flb_sds_t cfgcurname;
254
0
    int ret = FLB_FALSE;
255
256
257
0
    if (cfg->conf_path_file == NULL) {
258
0
        return FLB_FALSE;
259
0
    }
260
261
0
    cfgcurname = cur_fleet_config_filename(ctx);
262
263
0
    if (strcmp(cfgcurname, cfg->conf_path_file) == 0) {
264
0
        ret = FLB_TRUE;
265
0
    }
266
267
0
    flb_sds_destroy(cfgcurname);
268
269
0
    return ret;
270
0
}
271
272
static int is_old_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
273
0
{
274
0
    flb_sds_t cfgcurname;
275
0
    int ret = FLB_FALSE;
276
277
278
0
    if (cfg == NULL) {
279
0
        return FLB_FALSE;
280
0
    }
281
282
0
    if (cfg->conf_path_file == NULL) {
283
0
        return FLB_FALSE;
284
0
    }
285
286
0
    cfgcurname = old_fleet_config_filename(ctx);
287
0
    if (cfgcurname == NULL) {
288
0
        flb_plg_error(ctx->ins, "unable to allocate configuration name");
289
0
        return FLB_FALSE;
290
0
    }
291
292
0
    if (strcmp(cfgcurname, cfg->conf_path_file) == 0) {
293
0
        ret = FLB_TRUE;
294
0
    }
295
296
0
    flb_sds_destroy(cfgcurname);
297
298
0
    return ret;
299
0
}
300
301
static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
302
0
{
303
0
    char *fname;
304
0
    char *end;
305
0
    long val;
306
307
0
    if (cfg->conf_path_file == NULL) {
308
0
        return FLB_FALSE;
309
0
    }
310
311
0
    fname = strrchr(cfg->conf_path_file, PATH_SEPARATOR[0]);
312
313
0
    if (fname == NULL) {
314
0
        return FLB_FALSE;
315
0
    }
316
317
0
    fname++;
318
319
0
    errno = 0;
320
0
    val = strtol(fname, &end, 10);
321
322
0
    if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN)) ||
323
0
         (errno != 0 && val == 0)) {
324
0
        flb_errno();
325
0
        return FLB_FALSE;
326
0
    }
327
328
0
    if (strcmp(end, ".ini") == 0) {
329
0
        return FLB_TRUE;
330
0
    }
331
332
0
    return FLB_FALSE;
333
0
}
334
335
static int is_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
336
0
{
337
0
    if (cfg->conf_path_file == NULL) {
338
0
        return FLB_FALSE;
339
0
    }
340
341
0
    return is_new_fleet_config(ctx, cfg) ||
342
0
           is_cur_fleet_config(ctx, cfg) ||
343
0
           is_old_fleet_config(ctx, cfg) ||
344
0
           is_timestamped_fleet_config(ctx, cfg);
345
0
}
346
347
static int exists_new_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
348
0
{
349
0
    flb_sds_t cfgnewname;
350
0
    int ret = FLB_FALSE;
351
352
353
0
    cfgnewname = new_fleet_config_filename(ctx);
354
0
    ret = access(cfgnewname, F_OK) == 0 ? FLB_TRUE : FLB_FALSE;
355
356
0
    flb_sds_destroy(cfgnewname);
357
0
    return ret;
358
0
}
359
360
static int exists_cur_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
361
0
{
362
0
    flb_sds_t cfgcurname;
363
0
    int ret = FLB_FALSE;
364
365
366
0
    cfgcurname = cur_fleet_config_filename(ctx);
367
0
    ret = access(cfgcurname, F_OK) == 0 ? FLB_TRUE : FLB_FALSE;
368
369
0
    flb_sds_destroy(cfgcurname);
370
0
    return ret;
371
0
}
372
373
static void *do_reload(void *data)
374
0
{
375
0
    struct reload_ctx *reload = (struct reload_ctx *)data;
376
377
    /* avoid reloading the current configuration... just use our new one! */
378
0
    flb_context_set(reload->flb);
379
0
    reload->flb->config->enable_hot_reload = FLB_TRUE;
380
0
    reload->flb->config->conf_path_file = reload->cfg_path;
381
382
0
    flb_free(reload);
383
0
    sleep(5);
384
0
#ifndef FLB_SYSTEM_WINDOWS
385
0
    kill(getpid(), SIGHUP);
386
#else
387
  GenerateConsoleCtrlEvent(1 /* CTRL_BREAK_EVENT_1 */, 0);
388
#endif
389
0
    return NULL;
390
0
}
391
392
static int test_config_is_valid(flb_sds_t cfgpath)
393
0
{
394
0
    struct flb_cf *conf;
395
0
    int ret = FLB_FALSE;
396
397
398
0
    conf = flb_cf_create();
399
400
0
    if (conf == NULL) {
401
0
        goto config_init_error;
402
0
    } 
403
404
0
    conf = flb_cf_create_from_file(conf, cfgpath);
405
406
0
    if (conf == NULL) {
407
0
        goto cf_create_from_file_error;
408
0
    } 
409
410
0
    ret = FLB_TRUE;
411
412
0
cf_create_from_file_error:
413
0
    flb_cf_destroy(conf);
414
0
config_init_error:
415
0
    return ret;
416
0
}
417
418
static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cfgpath)
419
0
{
420
0
    struct reload_ctx *reload;
421
0
    pthread_t pth;
422
0
    pthread_attr_t ptha;
423
0
    flb_ctx_t *flb = flb_context_get();
424
425
0
    if (ctx->collect_fd > 0) {
426
0
        flb_input_collector_pause(ctx->collect_fd, ctx->ins);
427
0
    }
428
429
0
    if (flb == NULL) {
430
0
        flb_plg_error(ctx->ins, "unable to get fluent-bit context.");
431
432
0
        if (ctx->collect_fd > 0) {
433
0
            flb_input_collector_resume(ctx->collect_fd, ctx->ins);
434
0
        }
435
436
0
        flb_sds_destroy(cfgpath);
437
0
        return FLB_FALSE;
438
0
    }
439
440
    /* fix execution in valgrind...
441
     * otherwise flb_reload errors out with:
442
     *    [error] [reload] given flb context is NULL
443
     */
444
0
    flb_plg_info(ctx->ins, "loading configuration from %s.", cfgpath);
445
446
0
    if (test_config_is_valid(cfgpath) == FLB_FALSE) {
447
0
        flb_plg_error(ctx->ins, "unable to load configuration.");
448
449
0
        if (ctx->collect_fd > 0) {
450
0
            flb_input_collector_resume(ctx->collect_fd, ctx->ins);
451
0
        }
452
453
0
        flb_sds_destroy(cfgpath);
454
0
        return FLB_FALSE;
455
0
    }
456
457
0
    reload = flb_calloc(1, sizeof(struct reload_ctx));
458
0
    reload->flb = flb;
459
0
    reload->cfg_path = cfgpath;
460
461
0
    pthread_attr_init(&ptha);
462
0
    pthread_attr_setdetachstate(&ptha, PTHREAD_CREATE_DETACHED);
463
0
    pthread_create(&pth, &ptha, do_reload, reload);
464
465
0
    return FLB_TRUE;
466
0
}
467
468
static char *tls_setting_string(int use_tls)
469
0
{
470
0
    if (use_tls) {
471
0
        return "On";
472
0
    }
473
474
0
    return "Off";
475
0
}
476
477
static flb_sds_t parse_api_key_json(struct flb_in_calyptia_fleet_config *ctx,
478
                                    char *payload, size_t size)
479
0
{
480
0
    int ret;
481
0
    int out_size;
482
0
    char *pack;
483
0
    struct flb_pack_state pack_state;
484
0
    size_t off = 0;
485
0
    msgpack_unpacked result;
486
0
    msgpack_object_kv *cur;
487
0
    msgpack_object_str *key;
488
0
    flb_sds_t project_id;
489
0
    int idx = 0;
490
491
    /* Initialize packer */
492
0
    flb_pack_state_init(&pack_state);
493
494
    /* Pack JSON as msgpack */
495
0
    ret = flb_pack_json_state(payload, size,
496
0
                              &pack, &out_size, &pack_state);
497
0
    flb_pack_state_reset(&pack_state);
498
499
    /* Handle exceptions */
500
0
    if (ret == FLB_ERR_JSON_PART) {
501
0
        flb_plg_warn(ctx->ins, "JSON data is incomplete, skipping");
502
0
        return NULL;
503
0
    }
504
0
    else if (ret == FLB_ERR_JSON_INVAL) {
505
0
        flb_plg_warn(ctx->ins, "invalid JSON message, skipping");
506
0
        return NULL;
507
0
    }
508
0
    else if (ret == -1) {
509
0
        return NULL;
510
0
    }
511
512
0
    msgpack_unpacked_init(&result);
513
0
    while (msgpack_unpack_next(&result, pack, out_size, &off) == MSGPACK_UNPACK_SUCCESS) {
514
515
0
        if (result.data.type == MSGPACK_OBJECT_MAP) {
516
0
            for (idx = 0; idx < result.data.via.map.size; idx++) {
517
0
                cur = &result.data.via.map.ptr[idx];
518
0
                key = &cur->key.via.str;
519
520
0
                if (strncmp(key->ptr, "ProjectID", key->size) == 0) {
521
522
0
                    if (cur->val.type != MSGPACK_OBJECT_STR) {
523
0
                        flb_plg_error(ctx->ins, "unable to find fleet by name");
524
0
                        msgpack_unpacked_destroy(&result);
525
0
                        return NULL;
526
0
                    }
527
528
0
                    project_id = flb_sds_create_len(cur->val.via.str.ptr, 
529
0
                                                    cur->val.via.str.size);
530
0
                    msgpack_unpacked_destroy(&result);
531
0
                    flb_free(pack);
532
533
0
                    return project_id;
534
0
                }
535
0
            }
536
0
        }
537
0
    }
538
539
0
    msgpack_unpacked_destroy(&result);
540
0
    flb_free(pack);
541
542
0
    return NULL;
543
0
}
544
545
static ssize_t parse_fleet_search_json(struct flb_in_calyptia_fleet_config *ctx,
546
                                       char *payload, size_t size)
547
0
{
548
0
    int ret;
549
0
    int out_size;
550
0
    char *pack;
551
0
    struct flb_pack_state pack_state;
552
0
    size_t off = 0;
553
0
    msgpack_unpacked result;
554
0
    msgpack_object_array *results;
555
0
    msgpack_object_kv *cur;
556
0
    msgpack_object_str *key;
557
0
    int idx = 0;
558
559
    /* Initialize packer */
560
0
    flb_pack_state_init(&pack_state);
561
562
    /* Pack JSON as msgpack */
563
0
    ret = flb_pack_json_state(payload, size,
564
0
                              &pack, &out_size, &pack_state);
565
0
    flb_pack_state_reset(&pack_state);
566
567
    /* Handle exceptions */
568
0
    if (ret == FLB_ERR_JSON_PART) {
569
0
        flb_plg_warn(ctx->ins, "JSON data is incomplete, skipping");
570
0
        return -1;
571
0
    }
572
0
    else if (ret == FLB_ERR_JSON_INVAL) {
573
0
        flb_plg_warn(ctx->ins, "invalid JSON message, skipping");
574
0
        return -1;
575
0
    }
576
0
    else if (ret == -1) {
577
0
        return -1;
578
0
    }
579
580
0
    msgpack_unpacked_init(&result);
581
0
    while (msgpack_unpack_next(&result, pack, out_size, &off) == MSGPACK_UNPACK_SUCCESS) {
582
583
0
        if (result.data.type == MSGPACK_OBJECT_ARRAY) {
584
0
            results = &result.data.via.array;
585
586
0
            if (results->ptr[0].type == MSGPACK_OBJECT_MAP) {
587
588
0
                for (idx = 0; idx < results->ptr[0].via.map.size; idx++) {
589
0
                    cur = &results->ptr[0].via.map.ptr[idx];
590
0
                    key = &cur->key.via.str;
591
592
0
                    if (strncasecmp(key->ptr, "id", key->size) == 0) {
593
594
0
                        if (cur->val.type != MSGPACK_OBJECT_STR) {
595
0
                            flb_plg_error(ctx->ins, "unable to find fleet by name");
596
0
                            msgpack_unpacked_destroy(&result);
597
0
                            return -1;
598
0
                        }
599
600
0
                        ctx->fleet_id_found = 1;
601
0
                        ctx->fleet_id = flb_sds_create_len(cur->val.via.str.ptr,
602
0
                                                           cur->val.via.str.size);
603
0
                        break;
604
0
                    }
605
0
                    break;
606
0
                }
607
0
                break;
608
0
            }
609
0
        }
610
0
    }
611
612
0
    msgpack_unpacked_destroy(&result);
613
0
    flb_free(pack);
614
615
0
    if (ctx->fleet_id == NULL) {
616
0
        return -1;
617
0
    }
618
619
0
    return 0;
620
0
}
621
622
static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ctx,
623
                                         struct flb_connection *u_conn,
624
                                         struct flb_config *config)
625
0
{
626
0
    struct flb_http_client *client;
627
0
    flb_sds_t url;
628
0
    flb_sds_t project_id;
629
0
    unsigned char token[512] = {0};
630
0
    unsigned char encoded[256];
631
0
    size_t elen;
632
0
    size_t tlen;
633
0
    char *api_token_sep;
634
0
    size_t b_sent;
635
0
    int ret;
636
637
0
    api_token_sep = strchr(ctx->api_key, '.');
638
639
0
    if (api_token_sep == NULL) {
640
0
        return -1;
641
0
    }
642
643
0
    elen = api_token_sep-ctx->api_key;
644
0
    elen = elen + (4 - (elen % 4));
645
646
0
    if (elen > sizeof(encoded)) {
647
0
        flb_plg_error(ctx->ins, "API Token is too large");
648
0
        return -1;
649
0
    }
650
651
0
    memset(encoded, '=', sizeof(encoded));
652
0
    memcpy(encoded, ctx->api_key, api_token_sep-ctx->api_key);
653
654
0
    ret = flb_base64_decode(token, sizeof(token)-1, &tlen,
655
0
                            encoded, elen); 
656
657
0
    if (ret != 0) {
658
0
        return ret;
659
0
    }
660
661
0
    project_id = parse_api_key_json(ctx, (char *)token, tlen);
662
663
0
    if (project_id == NULL) {
664
0
        return -1;
665
0
    }
666
667
0
    url = flb_sds_create_size(4096);
668
0
    flb_sds_printf(&url, "/v1/search?project_id=%s&resource=fleet&term=%s", 
669
0
                   project_id, ctx->fleet_name);
670
671
0
    client = flb_http_client(u_conn, FLB_HTTP_GET, url, NULL, 0,
672
0
                             ctx->ins->host.name, ctx->ins->host.port, NULL, 0);
673
674
0
    if (!client) {
675
0
        flb_plg_error(ctx->ins, "unable to create http client");
676
0
        return -1;
677
0
    }
678
679
0
    flb_http_buffer_size(client, 8192);
680
681
0
    flb_http_add_header(client,
682
0
                        CALYPTIA_H_PROJECT, sizeof(CALYPTIA_H_PROJECT) - 1,
683
0
                        ctx->api_key, flb_sds_len(ctx->api_key));
684
685
0
    ret = flb_http_do(client, &b_sent);
686
687
0
    if (ret != 0) {
688
0
        flb_plg_error(ctx->ins, "http do error");
689
0
        flb_http_client_destroy(client);
690
0
        return -1;
691
0
    }
692
693
0
    if (client->resp.status != 200) {
694
0
        flb_plg_error(ctx->ins, "search http status code error: %d", client->resp.status);
695
0
        flb_http_client_destroy(client);
696
0
        return -1;
697
0
    }
698
699
0
    if (client->resp.payload_size <= 0) {
700
0
        flb_plg_error(ctx->ins, "empty response");
701
0
        flb_http_client_destroy(client);
702
0
        return -1;
703
0
    }
704
705
0
    if (parse_fleet_search_json(ctx, client->resp.payload, client->resp.payload_size) == -1) {
706
0
        flb_plg_error(ctx->ins, "unable to find fleet: %s", ctx->fleet_name);
707
0
        flb_http_client_destroy(client);
708
0
        return -1;
709
0
    }
710
711
0
    if (ctx->fleet_id == NULL) {
712
0
        flb_http_client_destroy(client);
713
0
        return -1;
714
0
    }
715
716
0
    flb_http_client_destroy(client);
717
0
    return 0;
718
0
}
719
720
#ifdef FLB_SYSTEM_WINDOWS
721
#define link(a, b) CreateHardLinkA(b, a, 0)
722
723
ssize_t readlink(const char *path, char *realpath, size_t srealpath) {
724
    HANDLE hFile;
725
    DWORD ret;
726
727
    hFile = CreateFile(path, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, 
728
                       FILE_ATTRIBUTE_NORMAL, NULL);
729
730
    if (hFile == INVALID_HANDLE_VALUE) {
731
        return -1;
732
    }
733
734
    ret = GetFinalPathNameByHandleA(hFile, realpath, srealpath, VOLUME_NAME_NT);
735
736
    if (ret < srealpath) {
737
        CloseHandle(hFile);
738
        return -1;
739
    }
740
741
    CloseHandle(hFile);
742
    return ret;
743
}
744
745
#endif
746
747
#ifdef FLB_SYSTEM_WINDOWS
748
#define _mkdir(a, b) mkdir(a)
749
#else
750
0
#define _mkdir(a, b) mkdir(a, b)
751
#endif
752
753
/* recursively create directories, based on:
754
 *   https://stackoverflow.com/a/2336245
755
 * who found it at:
756
 *   http://nion.modprobe.de/blog/archives/357-Recursive-directory-creation.html
757
 */
758
0
static int __mkdir(const char *dir, int perms) {
759
0
    char tmp[255];
760
0
    char *ptr = NULL;
761
0
    size_t len;
762
0
    int ret;
763
764
0
    ret = snprintf(tmp, sizeof(tmp),"%s",dir);
765
0
    if (ret > sizeof(tmp)) {
766
0
        flb_error("directory too long for __mkdir: %s", dir);
767
0
        return -1;
768
0
    }
769
770
0
    len = strlen(tmp);
771
772
0
    if (tmp[len - 1] == PATH_SEPARATOR[0]) {
773
0
        tmp[len - 1] = 0;
774
0
    }
775
776
0
#ifndef FLB_SYSTEM_WINDOWS
777
0
    for (ptr = tmp + 1; *ptr; ptr++) {
778
#else
779
    for (ptr = tmp + 3; *ptr; ptr++) {
780
#endif
781
782
0
        if (*ptr == PATH_SEPARATOR[0]) {
783
0
            *ptr = 0;
784
0
            if (access(tmp, F_OK) != 0) {
785
0
                ret = _mkdir(tmp, perms);
786
0
                if (ret != 0) {
787
0
                    return ret;
788
0
                }
789
0
            }
790
0
            *ptr = PATH_SEPARATOR[0];
791
0
        }
792
0
    }
793
794
0
    return _mkdir(tmp, perms);
795
0
}
796
797
static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx)
798
0
{
799
0
    flb_sds_t myfleetdir;
800
801
0
    flb_plg_debug(ctx->ins, "checking for configuration directory=%s", ctx->config_dir);
802
0
    if (access(ctx->config_dir, F_OK) != 0) {
803
0
        if (__mkdir(ctx->config_dir, 0700) != 0) {
804
0
            flb_plg_error(ctx->ins, "unable to create fleet config directory");
805
0
            return -1;
806
0
        }
807
0
    }
808
809
0
    myfleetdir = flb_sds_create_size(256);
810
811
0
    if (ctx->fleet_name != NULL) {
812
0
        flb_sds_printf(&myfleetdir, "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s",
813
0
                       ctx->config_dir, ctx->machine_id, ctx->fleet_name);
814
0
    }
815
0
    else {
816
0
        flb_sds_printf(&myfleetdir, "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s",
817
0
                       ctx->config_dir, ctx->machine_id, ctx->fleet_id);
818
0
    }
819
820
0
    flb_plg_debug(ctx->ins, "checking for fleet directory=%s", myfleetdir);
821
0
    if (access(myfleetdir, F_OK) != 0) {
822
0
        if (__mkdir(myfleetdir, 0700) !=0) {
823
0
            flb_plg_error(ctx->ins, "unable to create fleet specific directory");
824
0
            return -1;
825
0
        }
826
0
    }
827
828
0
    flb_sds_destroy(myfleetdir);
829
0
    return 0;
830
0
}
831
832
/* cb_collect callback */
833
static int in_calyptia_fleet_collect(struct flb_input_instance *ins,
834
                                     struct flb_config *config, 
835
                                     void *in_context)
836
0
{
837
0
    struct flb_in_calyptia_fleet_config *ctx = in_context;
838
0
    struct flb_connection *u_conn;
839
0
    struct flb_http_client *client;
840
0
    flb_sds_t cfgname;
841
0
    flb_sds_t cfgnewname;
842
0
    flb_sds_t cfgoldname;
843
0
    flb_sds_t cfgcurname;
844
0
    flb_sds_t header = NULL;
845
0
    flb_sds_t hdr;
846
0
    FILE *cfgfp;
847
0
    const char *fbit_last_modified;
848
0
    int fbit_last_modified_len;
849
0
    struct flb_tm tm_last_modified = { 0 };
850
0
    time_t time_last_modified;
851
0
    char *data = NULL;
852
0
    size_t b_sent;
853
0
    int ret = -1;
854
#ifdef FLB_SYSTEM_WINDOWS
855
    DWORD err;
856
    LPSTR lpMsg;
857
#endif
858
859
0
    u_conn = flb_upstream_conn_get(ctx->u);
860
861
0
    if (!u_conn) {
862
0
        flb_plg_error(ctx->ins, "could not get an upstream connection to %s:%u",
863
0
                      ctx->ins->host.name, ctx->ins->host.port);
864
0
        goto conn_error;
865
0
    }
866
867
0
    if (ctx->fleet_id == NULL) {
868
869
0
        if (get_calyptia_fleet_id_by_name(ctx, u_conn, config) == -1) {
870
0
            flb_plg_error(ctx->ins, "unable to find fleet: %s", ctx->fleet_name);
871
0
            goto conn_error;
872
0
        }
873
0
    }
874
875
0
    if (ctx->fleet_url == NULL) {
876
0
        ctx->fleet_url = flb_sds_create_size(4096);
877
0
        flb_sds_printf(&ctx->fleet_url, "/v1/fleets/%s/config?format=ini", ctx->fleet_id);
878
0
    }
879
880
0
    client = flb_http_client(u_conn, FLB_HTTP_GET, ctx->fleet_url,
881
0
                             NULL, 0, 
882
0
                             ctx->ins->host.name, ctx->ins->host.port, NULL, 0);
883
884
0
    if (!client) {
885
0
        flb_plg_error(ins, "unable to create http client");
886
0
        goto client_error;
887
0
    }
888
889
0
    flb_http_buffer_size(client, 8192);
890
891
0
    flb_http_add_header(client,
892
0
                        CALYPTIA_H_PROJECT, sizeof(CALYPTIA_H_PROJECT) - 1,
893
0
                        ctx->api_key, flb_sds_len(ctx->api_key));
894
895
0
    ret = flb_http_do(client, &b_sent);
896
897
0
    if (ret != 0) {
898
0
        flb_plg_error(ins, "http do error");
899
0
        goto http_error;
900
0
    }
901
902
0
    if (client->resp.status != 200) {
903
0
        flb_plg_error(ins, "http status code error: %d", client->resp.status);
904
0
        goto http_error;
905
0
    }
906
907
0
    if (client->resp.payload_size <= 0) {
908
0
        flb_plg_error(ins, "empty response");
909
0
        goto http_error;
910
0
    }
911
912
    /* copy and NULL terminate the payload */
913
0
    data = flb_sds_create_size(client->resp.payload_size + 1);
914
915
0
    if (!data) {
916
0
        goto http_error;
917
0
    }
918
0
    memcpy(data, client->resp.payload, client->resp.payload_size);
919
0
    data[client->resp.payload_size] = '\0';
920
921
0
    ret = case_header_lookup(client, "Last-modified", strlen("Last-modified"),
922
0
                        &fbit_last_modified, &fbit_last_modified_len);
923
924
0
    if (ret == -1) {
925
0
        flb_plg_error(ctx->ins, "unable to get last-modified header");
926
0
        goto payload_error;
927
0
    }
928
929
0
    flb_strptime(fbit_last_modified, "%a, %d %B %Y %H:%M:%S GMT", &tm_last_modified);
930
0
    time_last_modified = mktime(&tm_last_modified.tm);
931
932
0
    cfgname = time_fleet_config_filename(ctx, time_last_modified);
933
934
0
    if (access(cfgname, F_OK) == -1 && errno == ENOENT) {
935
0
        if (create_fleet_directory(ctx) != 0) {
936
0
            flb_plg_error(ctx->ins, "unable to create fleet directories");
937
0
            goto http_error;
938
0
        }
939
0
        cfgfp = fopen(cfgname, "w+");
940
941
0
        if (cfgfp == NULL) {
942
0
            flb_plg_error(ctx->ins, "unable to open configuration file: %s", cfgname);
943
0
            flb_sds_destroy(cfgname);
944
0
            goto payload_error;
945
0
        }
946
947
0
        header = flb_sds_create_size(4096);
948
949
0
        if (ctx->fleet_name == NULL) {
950
0
            hdr = flb_sds_printf(&header,
951
0
                        "[CUSTOM]\n"
952
0
                        "    Name             calyptia\n"
953
0
                        "    api_key          %s\n"
954
0
                        "    fleet_id         %s\n"
955
0
                        "    add_label        fleet_id %s\n"
956
0
                        "    fleet.config_dir %s\n"
957
0
                        "    calyptia_host    %s\n"
958
0
                        "    calyptia_port    %d\n"
959
0
                        "    calyptia_tls     %s\n",
960
0
                        ctx->api_key,
961
0
                        ctx->fleet_id,
962
0
                        ctx->fleet_id,
963
0
                        ctx->config_dir,
964
0
                        ctx->ins->host.name,
965
0
                        ctx->ins->host.port,
966
0
                        tls_setting_string(ctx->ins->use_tls)
967
0
            );
968
0
        }
969
0
        else {
970
0
            hdr = flb_sds_printf(&header,
971
0
                        "[CUSTOM]\n"
972
0
                        "    Name          calyptia\n"
973
0
                        "    api_key       %s\n"
974
0
                        "    fleet_name    %s\n"
975
0
                        "    fleet_id      %s\n"
976
0
                        "    add_label     fleet_id %s\n"
977
0
                        "    fleet.config_dir    %s\n"
978
0
                        "    calyptia_host %s\n"
979
0
                        "    calyptia_port %d\n"
980
0
                        "    calyptia_tls  %s\n",
981
0
                        ctx->api_key,
982
0
                        ctx->fleet_name,
983
0
                        ctx->fleet_id,
984
0
                        ctx->fleet_id,
985
0
                        ctx->config_dir,
986
0
                        ctx->ins->host.name,
987
0
                        ctx->ins->host.port,
988
0
                        tls_setting_string(ctx->ins->use_tls)
989
0
            );
990
0
        }
991
0
        if (hdr == NULL) {
992
0
            fclose(cfgfp);
993
0
            flb_sds_destroy(cfgname);
994
0
            goto header_error;
995
0
        }
996
0
        if (ctx->machine_id) {
997
0
            hdr = flb_sds_printf(&header, "    machine_id %s\n", ctx->machine_id);
998
0
            if (hdr == NULL) {
999
0
                fclose(cfgfp);
1000
0
                flb_sds_destroy(cfgname);
1001
0
                goto header_error;
1002
0
            }
1003
0
        }
1004
0
        fwrite(header, strlen(header), 1, cfgfp);
1005
0
        flb_sds_destroy(header);
1006
0
        header = NULL;
1007
0
        fwrite(data, client->resp.payload_size, 1, cfgfp);
1008
0
        fclose(cfgfp);
1009
1010
0
        cfgnewname = new_fleet_config_filename(ctx);
1011
1012
0
        if (exists_new_fleet_config(ctx) == FLB_TRUE) {
1013
0
            cfgoldname = old_fleet_config_filename(ctx);
1014
0
            rename(cfgnewname, cfgoldname);
1015
0
            unlink(cfgnewname);
1016
0
            flb_sds_destroy(cfgoldname);
1017
0
        }
1018
1019
0
        if (!link(cfgname, cfgnewname)) {
1020
#ifdef FLB_SYSTEM_WINDOWS
1021
            err = GetLastError();
1022
            FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER,
1023
                          NULL, err, 0, &lpMsg, 0, NULL);
1024
            flb_plg_error(ctx->ins, "unable to create hard link: %s", lpMsg);
1025
#else
1026
0
            flb_errno();
1027
0
#endif
1028
0
        }
1029
1030
0
        flb_sds_destroy(cfgnewname);
1031
0
    }
1032
1033
0
    if (ctx->config_timestamp < time_last_modified) {
1034
0
        flb_plg_debug(ctx->ins, "new configuration is newer than current: %ld < %ld",
1035
0
                      ctx->config_timestamp, time_last_modified);
1036
1037
0
        if (execute_reload(ctx, cfgname) == FLB_FALSE) {
1038
0
            cfgoldname = old_fleet_config_filename(ctx);
1039
0
            cfgcurname = cur_fleet_config_filename(ctx);
1040
0
            rename(cfgoldname, cfgcurname);
1041
0
            flb_sds_destroy(cfgcurname);
1042
0
            flb_sds_destroy(cfgoldname);
1043
0
            flb_sds_destroy(cfgname);
1044
0
            goto reload_error;
1045
0
        }
1046
0
    }
1047
0
    else {
1048
0
        flb_sds_destroy(cfgname);
1049
0
    }
1050
1051
0
    ret = 0;
1052
1053
0
reload_error:
1054
0
header_error:
1055
0
    if (header) {
1056
0
        flb_sds_destroy(header);
1057
0
    }
1058
0
payload_error:
1059
0
    if (data) {
1060
0
        flb_sds_destroy(data);
1061
0
    }
1062
1063
0
http_error:
1064
0
    flb_plg_debug(ctx->ins, "freeing http client in fleet collect");
1065
0
    flb_http_client_destroy(client);
1066
0
client_error:
1067
0
    flb_upstream_conn_release(u_conn);
1068
0
conn_error:
1069
0
    FLB_INPUT_RETURN(ret);
1070
0
}
1071
1072
static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
1073
0
{
1074
0
    flb_ctx_t *flb_ctx = flb_context_get();
1075
0
    char *fname;
1076
0
    char *ext;
1077
0
    long timestamp;
1078
0
    char realname[4096];
1079
0
    ssize_t len;
1080
1081
0
    if (create_fleet_directory(ctx) != 0) {
1082
0
        flb_plg_error(ctx->ins, "unable to create fleet directories");
1083
0
        return -1;
1084
0
    }
1085
1086
    /* check if we are already using the fleet configuration file. */
1087
0
    if (is_fleet_config(ctx, flb_ctx->config) == FLB_FALSE) {
1088
        /* check which one and load it */
1089
0
        if (exists_cur_fleet_config(ctx) == FLB_TRUE) {
1090
0
            return execute_reload(ctx, cur_fleet_config_filename(ctx));
1091
0
        }
1092
0
        else if (exists_new_fleet_config(ctx) == FLB_TRUE) {
1093
0
            return execute_reload(ctx, new_fleet_config_filename(ctx));
1094
0
        }
1095
0
    }
1096
0
    else {
1097
0
        if (is_new_fleet_config(ctx, flb_ctx->config) || is_cur_fleet_config(ctx, flb_ctx->config)) {
1098
0
            len = readlink(flb_ctx->config->conf_path_file, realname, sizeof(realname));
1099
1100
0
            if (len > sizeof(realname)) {
1101
0
                return FLB_FALSE;
1102
0
            }
1103
1104
0
            fname = basename(realname);
1105
0
        }
1106
0
        else {
1107
0
            fname = basename(flb_ctx->config->conf_path_file);
1108
0
        }
1109
1110
0
        if (fname == NULL) {
1111
0
            return FLB_FALSE;
1112
0
        }
1113
1114
0
        errno = 0;
1115
0
        timestamp = strtol(fname, &ext, 10);
1116
1117
0
        if ((errno == ERANGE && (timestamp == LONG_MAX || timestamp == LONG_MIN)) ||
1118
0
             (errno != 0 && timestamp == 0)) {
1119
0
            flb_errno();
1120
0
            return FLB_FALSE;
1121
0
        }
1122
1123
        /* unable to parse the timstamp */
1124
0
        if (errno == ERANGE) {
1125
0
            return FLB_FALSE;
1126
0
        }
1127
1128
0
        ctx->config_timestamp = timestamp;
1129
0
    }
1130
1131
0
    return FLB_FALSE;
1132
0
}
1133
1134
static int in_calyptia_fleet_init(struct flb_input_instance *in,
1135
                          struct flb_config *config, void *data)
1136
0
{
1137
0
    int ret;
1138
0
    int upstream_flags;
1139
0
    struct flb_in_calyptia_fleet_config *ctx;
1140
0
    (void) data;
1141
1142
#ifdef _WIN32
1143
    char *tmpdir;
1144
#endif
1145
1146
0
    flb_plg_info(in, "initializing calyptia fleet input.");
1147
1148
0
    if (in->host.name == NULL) {
1149
0
        flb_plg_error(in, "no input 'Host' provided");
1150
0
        return -1;
1151
0
    }
1152
1153
    /* Allocate space for the configuration */
1154
0
    ctx = flb_calloc(1, sizeof(struct flb_in_calyptia_fleet_config));
1155
1156
0
    if (!ctx) {
1157
0
        flb_errno();
1158
0
        return -1;
1159
0
    }
1160
0
    ctx->ins = in;
1161
0
    ctx->collect_fd = -1;
1162
1163
1164
    /* Load the config map */
1165
0
    ret = flb_input_config_map_set(in, (void *)ctx);
1166
1167
0
    if (ret == -1) {
1168
0
        flb_free(ctx);
1169
0
        flb_plg_error(in, "unable to load configuration");
1170
0
        return -1;
1171
0
    }
1172
1173
#ifdef _WIN32
1174
    if (ctx->config_dir == NULL) {
1175
        tmpdir = getenv("TEMP");
1176
1177
        if (tmpdir == NULL) {
1178
            flb_plg_error(in, "unable to find temporary directory (%%TEMP%%).");
1179
            flb_free(ctx);
1180
            return -1;
1181
        }
1182
1183
        ctx->config_dir = flb_sds_create_size(4096);
1184
1185
        if (ctx->config_dir == NULL) {
1186
            flb_plg_error(in, "unable to allocate config-dir.");
1187
            flb_free(ctx);
1188
            return -1;
1189
        }
1190
        flb_sds_printf(&ctx->config_dir, "%s" PATH_SEPARATOR "%s", tmpdir, "calyptia-fleet");
1191
    }
1192
#endif
1193
1194
0
    upstream_flags = FLB_IO_TCP;
1195
1196
0
    if (in->use_tls) {
1197
0
        upstream_flags |= FLB_IO_TLS;
1198
0
    }
1199
1200
0
    ctx->u = flb_upstream_create(config, in->host.name, in->host.port,
1201
0
                                 upstream_flags, in->tls);
1202
1203
0
    if (!ctx->u) {
1204
0
        flb_plg_error(ctx->ins, "could not initialize upstream");
1205
0
        flb_free(ctx);
1206
0
        return -1;
1207
0
    }
1208
1209
0
    if (ctx->interval_sec <= 0 && ctx->interval_nsec <= 0) {
1210
        /* Illegal settings. Override them. */
1211
0
        ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC);
1212
0
        ctx->interval_nsec = atoi(DEFAULT_INTERVAL_NSEC);
1213
0
    }
1214
1215
0
    if (ctx->interval_sec < atoi(DEFAULT_INTERVAL_SEC)) {
1216
0
        ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC);
1217
0
    }
1218
1219
    /* Set the context */
1220
0
    flb_input_set_context(in, ctx);
1221
1222
    /* if we load a new configuration then we will be reloaded anyways */
1223
0
    if (load_fleet_config(ctx) == FLB_TRUE) {
1224
0
        return 0;
1225
0
    }
1226
1227
    /* Set our collector based on time */
1228
0
    ret = flb_input_set_collector_time(in,
1229
0
                                       in_calyptia_fleet_collect,
1230
0
                                       ctx->interval_sec,
1231
0
                                       ctx->interval_nsec,
1232
0
                                       config);
1233
1234
0
    if (ret == -1) {
1235
0
        flb_plg_error(ctx->ins, "could not initialize collector for fleet input plugin");
1236
0
        flb_upstream_destroy(ctx->u);
1237
0
        flb_free(ctx);
1238
0
        return -1;
1239
0
    }
1240
1241
0
    ctx->collect_fd = ret;
1242
1243
0
    return 0;
1244
0
}
1245
1246
static void cb_in_calyptia_fleet_pause(void *data, struct flb_config *config)
1247
0
{
1248
0
    struct flb_in_calyptia_fleet_config *ctx = data;
1249
0
    flb_input_collector_pause(ctx->collect_fd, ctx->ins);
1250
0
}
1251
1252
static void cb_in_calyptia_fleet_resume(void *data, struct flb_config *config)
1253
0
{
1254
0
    struct flb_in_calyptia_fleet_config *ctx = data;
1255
0
    flb_input_collector_resume(ctx->collect_fd, ctx->ins);
1256
0
}
1257
1258
static int in_calyptia_fleet_exit(void *data, struct flb_config *config)
1259
0
{
1260
0
    (void) *config;
1261
0
    struct flb_in_calyptia_fleet_config *ctx = (struct flb_in_calyptia_fleet_config *)data;
1262
1263
0
    if (ctx->fleet_url) {
1264
0
        flb_sds_destroy(ctx->fleet_url);
1265
0
    }
1266
1267
0
    if (ctx->fleet_id && ctx->fleet_id_found) {
1268
0
        flb_sds_destroy(ctx->fleet_id);
1269
0
    }
1270
1271
0
    flb_input_collector_delete(ctx->collect_fd, ctx->ins);
1272
0
    flb_upstream_destroy(ctx->u);
1273
0
    flb_free(ctx);
1274
1275
0
    return 0;
1276
0
}
1277
1278
static struct flb_config_map config_map[] = {
1279
    {
1280
     FLB_CONFIG_MAP_STR, "api_key", NULL,
1281
     0, FLB_TRUE, offsetof(struct flb_in_calyptia_fleet_config, api_key),
1282
     "Calyptia Cloud API Key."
1283
    },
1284
    {
1285
     FLB_CONFIG_MAP_STR, "config_dir", DEFAULT_CONFIG_DIR,
1286
     0, FLB_TRUE, offsetof(struct flb_in_calyptia_fleet_config, config_dir),
1287
     "Base path for the configuration directory."
1288
    },
1289
    {
1290
     FLB_CONFIG_MAP_STR, "fleet_id", NULL,
1291
     0, FLB_TRUE, offsetof(struct flb_in_calyptia_fleet_config, fleet_id),
1292
     "Calyptia Fleet ID."
1293
    },
1294
    {
1295
     FLB_CONFIG_MAP_STR, "fleet_name", NULL,
1296
     0, FLB_TRUE, offsetof(struct flb_in_calyptia_fleet_config, fleet_name),
1297
     "Calyptia Fleet Name (used to lookup the fleet ID via the cloud API)."
1298
    },
1299
    {
1300
     FLB_CONFIG_MAP_STR, "machine_id", NULL,
1301
     0, FLB_TRUE, offsetof(struct flb_in_calyptia_fleet_config, machine_id),
1302
     "Agent Machine ID."
1303
    },
1304
    {
1305
      FLB_CONFIG_MAP_INT, "interval_sec", DEFAULT_INTERVAL_SEC,
1306
      0, FLB_TRUE, offsetof(struct flb_in_calyptia_fleet_config, interval_sec),
1307
      "Set the collector interval"
1308
    },
1309
    {
1310
      FLB_CONFIG_MAP_INT, "interval_nsec", DEFAULT_INTERVAL_NSEC,
1311
      0, FLB_TRUE, offsetof(struct flb_in_calyptia_fleet_config, interval_nsec),
1312
      "Set the collector interval (nanoseconds)"
1313
    },
1314
    /* EOF */
1315
    {0}
1316
};
1317
1318
/* Plugin reference */
1319
struct flb_input_plugin in_calyptia_fleet_plugin = {
1320
    .name         = "calyptia_fleet",
1321
    .description  = "Calyptia Fleet Input",
1322
    .cb_init      = in_calyptia_fleet_init,
1323
    .cb_pre_run   = NULL,
1324
    .cb_collect   = in_calyptia_fleet_collect,
1325
    .cb_resume    = cb_in_calyptia_fleet_resume,
1326
    .cb_pause     = cb_in_calyptia_fleet_pause,
1327
    .cb_flush_buf = NULL,
1328
    .cb_exit      = in_calyptia_fleet_exit,
1329
    .config_map   = config_map,
1330
    .flags        = FLB_INPUT_NET|FLB_INPUT_CORO|FLB_IO_OPT_TLS|FLB_INPUT_PRIVATE
1331
};