Coverage Report

Created: 2026-05-16 07:29

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/plugins/out_calyptia/calyptia.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2026 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_output_plugin.h>
21
#include <fluent-bit/flb_log.h>
22
#include <fluent-bit/flb_kv.h>
23
#include <fluent-bit/flb_upstream.h>
24
#include <fluent-bit/flb_utils.h>
25
#include <fluent-bit/flb_pack.h>
26
#include <fluent-bit/flb_version.h>
27
#include <fluent-bit/flb_metrics.h>
28
#include <fluent-bit/flb_fstore.h>
29
30
#include "calyptia.h"
31
32
#include <cmetrics/cmetrics.h>
33
#include <cmetrics/cmt_encode_influx.h>
34
35
flb_sds_t custom_calyptia_pipeline_config_get(struct flb_config *ctx);
36
static void calyptia_ctx_destroy(struct flb_calyptia *ctx);
37
38
static int get_io_flags(struct flb_output_instance *ins)
39
0
{
40
0
    int flags = 0;
41
42
0
    if (ins->use_tls) {
43
0
        flags = FLB_IO_TLS;
44
0
    }
45
0
    else {
46
0
        flags = FLB_IO_TCP;
47
0
    }
48
49
0
    return flags;
50
0
}
51
52
static int config_add_labels(struct flb_output_instance *ins,
53
                             struct flb_calyptia *ctx)
54
0
{
55
0
    struct mk_list *head;
56
0
    struct flb_config_map_val *mv;
57
0
    struct flb_slist_entry *k = NULL;
58
0
    struct flb_slist_entry *v = NULL;
59
0
    struct flb_kv *kv;
60
61
0
    if (!ctx->add_labels || mk_list_size(ctx->add_labels) == 0) {
62
0
        return 0;
63
0
    }
64
65
    /* iterate all 'add_label' definitions */
66
0
    flb_config_map_foreach(head, mv, ctx->add_labels) {
67
0
        if (mk_list_size(mv->val.list) != 2) {
68
0
            flb_plg_error(ins, "'add_label' expects a key and a value, "
69
0
                          "e.g: 'add_label version 1.8.x'");
70
0
            return -1;
71
0
        }
72
73
0
        k = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
74
0
        v = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);
75
76
0
        kv = flb_kv_item_create(&ctx->kv_labels, k->str, v->str);
77
0
        if (!kv) {
78
0
            flb_plg_error(ins, "could not append label %s=%s\n", k->str, v->str);
79
0
            return -1;
80
0
        }
81
0
    }
82
83
0
    return 0;
84
0
}
85
86
static void append_labels(struct flb_calyptia *ctx, struct cmt *cmt)
87
0
{
88
0
    struct flb_kv *kv;
89
0
    struct mk_list *head;
90
91
0
    mk_list_foreach(head, &ctx->kv_labels) {
92
0
        kv = mk_list_entry(head, struct flb_kv, _head);
93
0
        cmt_label_add(cmt, kv->key, kv->val);
94
0
    }
95
0
}
96
97
/* Pack a NUL-terminated C string as a msgpack string (header + body) */
static void pack_str(msgpack_packer *mp_pck, char *str)
{
    int size = strlen(str);

    msgpack_pack_str(mp_pck, size);
    msgpack_pack_str_body(mp_pck, str, size);
}
105
106
static void pack_env(struct flb_env *env, char *prefix,  char *key,
107
                     struct flb_mp_map_header *h,
108
                     msgpack_packer *mp_pck)
109
0
{
110
0
    int len = 0;
111
0
    char *val;
112
113
    /* prefix set in the key, if set, adjust the key name */
114
0
    if (prefix) {
115
0
        len = strlen(prefix);
116
0
    }
117
118
0
    val = (char *) flb_env_get(env, key);
119
0
    if (val) {
120
0
        flb_mp_map_header_append(h);
121
0
        pack_str(mp_pck, key + len);
122
0
        pack_str(mp_pck, val);
123
0
    }
124
0
}
125
126
/*
 * Append a "metadata" entry to the parent map 'mh'. Its value is a map that
 * optionally contains a "k8s" and/or an "aws" sub-map, each one emitted only
 * when the environment variable of the same name equals "enabled"
 * (case-insensitive). Sub-map entries are taken from the environment.
 */
static void pack_env_metadata(struct flb_env *env,
                              struct flb_mp_map_header *mh, msgpack_packer *mp_pck)
{
    size_t i;
    char *state;
    struct flb_mp_map_header group;
    struct flb_mp_map_header meta;
    /* environment keys exported for each provider, in emission order */
    static char *k8s_keys[] = {
        "k8s.namespace",
        "k8s.pod_name",
        "k8s.node_name"
    };
    static char *aws_keys[] = {
        "aws.az",
        "aws.ec2_instance_id",
        "aws.ec2_instance_type",
        "aws.private_ip",
        "aws.vpc_id",
        "aws.ami_id",
        "aws.account_id",
        "aws.hostname"
    };

    /* Metadata */
    flb_mp_map_header_append(mh);
    pack_str(mp_pck, "metadata");

    flb_mp_map_header_init(&meta, mp_pck);

    /* Kubernetes */
    state = (char *) flb_env_get(env, "k8s");
    if (state != NULL && strcasecmp(state, "enabled") == 0) {
        flb_mp_map_header_append(&meta);
        pack_str(mp_pck, "k8s");

        flb_mp_map_header_init(&group, mp_pck);
        for (i = 0; i < sizeof(k8s_keys) / sizeof(k8s_keys[0]); i++) {
            pack_env(env, "k8s.", k8s_keys[i], &group, mp_pck);
        }
        flb_mp_map_header_end(&group);
    }

    /* AWS */
    state = (char *) flb_env_get(env, "aws");
    if (state != NULL && strcasecmp(state, "enabled") == 0) {
        flb_mp_map_header_append(&meta);
        pack_str(mp_pck, "aws");

        flb_mp_map_header_init(&group, mp_pck);
        for (i = 0; i < sizeof(aws_keys) / sizeof(aws_keys[0]); i++) {
            pack_env(env, "aws.", aws_keys[i], &group, mp_pck);
        }
        flb_mp_map_header_end(&group);
    }

    flb_mp_map_header_end(&meta);
}
177
178
static flb_sds_t get_agent_metadata(struct flb_calyptia *ctx)
179
0
{
180
0
    int len;
181
0
    char *host;
182
0
    flb_sds_t conf;
183
0
    flb_sds_t meta;
184
0
    struct flb_mp_map_header mh;
185
0
    msgpack_sbuffer mp_sbuf;
186
0
    msgpack_packer mp_pck;
187
0
    struct flb_config *config = ctx->config;
188
189
    /* init msgpack */
190
0
    msgpack_sbuffer_init(&mp_sbuf);
191
0
    msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
192
193
    /* pack map */
194
0
    flb_mp_map_header_init(&mh, &mp_pck);
195
196
0
    host = (char *) flb_env_get(ctx->env, "HOSTNAME");
197
0
    if (!host) {
198
0
        host = "unknown";
199
0
    }
200
0
    len = strlen(host);
201
202
    /* name */
203
0
    flb_mp_map_header_append(&mh);
204
0
    msgpack_pack_str(&mp_pck, 4);
205
0
    msgpack_pack_str_body(&mp_pck, "name", 4);
206
0
    msgpack_pack_str(&mp_pck, len);
207
0
    msgpack_pack_str_body(&mp_pck, host, len);
208
209
    /* type */
210
0
    flb_mp_map_header_append(&mh);
211
0
    msgpack_pack_str(&mp_pck, 4);
212
0
    msgpack_pack_str_body(&mp_pck, "type", 4);
213
0
    msgpack_pack_str(&mp_pck, 9);
214
0
    msgpack_pack_str_body(&mp_pck, "fluentbit", 9);
215
216
    /* rawConfig */
217
0
    conf = custom_calyptia_pipeline_config_get(ctx->config);
218
0
    if (conf) {
219
0
        flb_mp_map_header_append(&mh);
220
0
        len = flb_sds_len(conf);
221
0
        msgpack_pack_str(&mp_pck, 9);
222
0
        msgpack_pack_str_body(&mp_pck, "rawConfig", 9);
223
0
        msgpack_pack_str(&mp_pck, len);
224
0
        msgpack_pack_str_body(&mp_pck, conf, len);
225
0
    }
226
0
    flb_sds_destroy(conf);
227
228
    /* version */
229
0
    flb_mp_map_header_append(&mh);
230
0
    msgpack_pack_str(&mp_pck, 7);
231
0
    msgpack_pack_str_body(&mp_pck, "version", 7);
232
0
    len = strlen(FLB_VERSION_STR);
233
0
    msgpack_pack_str(&mp_pck, len);
234
0
    msgpack_pack_str_body(&mp_pck, FLB_VERSION_STR, len);
235
236
    /* edition */
237
0
    flb_mp_map_header_append(&mh);
238
0
    msgpack_pack_str(&mp_pck, 7);
239
0
    msgpack_pack_str_body(&mp_pck, "edition", 7);
240
0
    msgpack_pack_str(&mp_pck, 9);
241
0
    msgpack_pack_str_body(&mp_pck, "community", 9);
242
243
0
    flb_mp_map_header_append(&mh);
244
0
    msgpack_pack_str(&mp_pck, 2);
245
0
    msgpack_pack_str_body(&mp_pck, "os", 2);
246
#ifdef FLB_SYSTEM_WINDOWS
247
    len = strlen("windows");
248
    msgpack_pack_str(&mp_pck, len);
249
    msgpack_pack_str_body(&mp_pck, "windows", len);
250
#elif FLB_SYSTEM_MACOS
251
    len = strlen("macos");
252
    msgpack_pack_str(&mp_pck, len);
253
    msgpack_pack_str_body(&mp_pck, "macos", len);
254
#elif __linux__
255
    len = strlen("linux");
256
0
    msgpack_pack_str(&mp_pck, len);
257
0
    msgpack_pack_str_body(&mp_pck, "linux", len);
258
#else
259
    len = strlen("unknown");
260
    msgpack_pack_str(&mp_pck, len);
261
    msgpack_pack_str_body(&mp_pck, "unknown", len);
262
#endif
263
264
0
    flb_mp_map_header_append(&mh);
265
0
    msgpack_pack_str(&mp_pck, 4);
266
0
    msgpack_pack_str_body(&mp_pck, "arch", 4);
267
#if defined(__arm__) || defined(_M_ARM)
268
    len = strlen("arm");
269
    msgpack_pack_str(&mp_pck, len);
270
    msgpack_pack_str_body(&mp_pck, "arm", len);
271
#elif defined(__aarch64__)
272
    len = strlen("arm64");
273
    msgpack_pack_str(&mp_pck, len);
274
    msgpack_pack_str_body(&mp_pck, "arm64", len);
275
#elif defined(__amd64__) || defined(_M_AMD64)
276
    len = strlen("x86_64");
277
0
    msgpack_pack_str(&mp_pck, len);
278
0
    msgpack_pack_str_body(&mp_pck, "x86_64", len);
279
#elif defined(__i686__) || defined(_M_I86)
280
    len = strlen("x86");
281
    msgpack_pack_str(&mp_pck, len);
282
    msgpack_pack_str_body(&mp_pck, "x86", len);
283
#else
284
    len = strlen("unknown");
285
    msgpack_pack_str(&mp_pck, len);
286
    msgpack_pack_str_body(&mp_pck, "unknown", len);
287
#endif
288
289
    /* machineID */
290
0
    flb_mp_map_header_append(&mh);
291
0
    msgpack_pack_str(&mp_pck, 9);
292
0
    msgpack_pack_str_body(&mp_pck, "machineID", 9);
293
0
    len = flb_sds_len(ctx->machine_id);
294
0
    msgpack_pack_str(&mp_pck, len);
295
0
    msgpack_pack_str_body(&mp_pck, ctx->machine_id, len);
296
297
    /* fleetID */
298
0
    if (ctx->fleet_id) {
299
0
        flb_mp_map_header_append(&mh);
300
0
        msgpack_pack_str(&mp_pck, 7);
301
0
        msgpack_pack_str_body(&mp_pck, "fleetID", 7);
302
0
        len = flb_sds_len(ctx->fleet_id);
303
0
        msgpack_pack_str(&mp_pck, len);
304
0
        msgpack_pack_str_body(&mp_pck, ctx->fleet_id, len);
305
0
    }
306
307
    /* pack environment metadata */
308
0
    pack_env_metadata(config->env, &mh, &mp_pck);
309
310
    /* finalize */
311
0
    flb_mp_map_header_end(&mh);
312
313
    /* convert to json */
314
0
    meta = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size, FLB_TRUE); /* could be ASCII */
315
0
    msgpack_sbuffer_destroy(&mp_sbuf);
316
317
0
    return meta;
318
0
}
319
320
static int calyptia_http_do(struct flb_calyptia *ctx, struct flb_http_client *c,
321
                            int type)
322
0
{
323
0
    int ret;
324
0
    size_t b_sent;
325
326
0
    if( !ctx || !c ) {
327
0
        return FLB_ERROR;
328
0
    }
329
330
    /* Ensure agent_token is not empty when required */
331
0
    if ((type == CALYPTIA_ACTION_METRICS || type == CALYPTIA_ACTION_PATCH || type == CALYPTIA_ACTION_TRACE) &&
332
0
        !ctx->agent_token) {
333
0
        flb_plg_warn(ctx->ins, "agent_token is missing for action type %d", type);
334
0
        return FLB_ERROR;
335
0
    }
336
337
    /* append headers */
338
0
    if (type == CALYPTIA_ACTION_REGISTER) {
339
        // When registering a new agent api key is required
340
0
        if (!ctx->api_key) {
341
0
            flb_plg_error(ctx->ins, "api_key is missing");
342
0
            return FLB_ERROR;
343
0
        }
344
0
        flb_http_add_header(c,
345
0
                            CALYPTIA_HEADERS_CTYPE, sizeof(CALYPTIA_HEADERS_CTYPE) - 1,
346
0
                            CALYPTIA_HEADERS_CTYPE_JSON, sizeof(CALYPTIA_HEADERS_CTYPE_JSON) - 1);
347
348
0
        flb_http_add_header(c,
349
0
                            CALYPTIA_HEADERS_PROJECT, sizeof(CALYPTIA_HEADERS_PROJECT) - 1,
350
0
                            ctx->api_key, flb_sds_len(ctx->api_key));
351
0
    }
352
0
    else if (type == CALYPTIA_ACTION_PATCH) {
353
0
        flb_http_add_header(c,
354
0
                            CALYPTIA_HEADERS_CTYPE, sizeof(CALYPTIA_HEADERS_CTYPE) - 1,
355
0
                            CALYPTIA_HEADERS_CTYPE_JSON, sizeof(CALYPTIA_HEADERS_CTYPE_JSON) - 1);
356
357
0
        flb_http_add_header(c,
358
0
                            CALYPTIA_HEADERS_AGENT_TOKEN,
359
0
                            sizeof(CALYPTIA_HEADERS_AGENT_TOKEN) - 1,
360
0
                            ctx->agent_token, flb_sds_len(ctx->agent_token));
361
0
    }
362
0
    else if (type == CALYPTIA_ACTION_METRICS) {
363
0
        flb_http_add_header(c,
364
0
                            CALYPTIA_HEADERS_CTYPE, sizeof(CALYPTIA_HEADERS_CTYPE) - 1,
365
0
                            CALYPTIA_HEADERS_CTYPE_MSGPACK,
366
0
                            sizeof(CALYPTIA_HEADERS_CTYPE_MSGPACK) - 1);
367
368
0
        flb_http_add_header(c,
369
0
                            CALYPTIA_HEADERS_AGENT_TOKEN,
370
0
                            sizeof(CALYPTIA_HEADERS_AGENT_TOKEN) - 1,
371
0
                            ctx->agent_token, flb_sds_len(ctx->agent_token));
372
0
    }
373
0
#ifdef FLB_HAVE_CHUNK_TRACE
374
0
    else if (type == CALYPTIA_ACTION_TRACE)  {
375
0
        flb_http_add_header(c,
376
0
                            CALYPTIA_HEADERS_CTYPE, sizeof(CALYPTIA_HEADERS_CTYPE) - 1,
377
0
                            CALYPTIA_HEADERS_CTYPE_JSON, sizeof(CALYPTIA_HEADERS_CTYPE_JSON) - 1);
378
379
0
        flb_http_add_header(c,
380
0
                            CALYPTIA_HEADERS_AGENT_TOKEN,
381
0
                            sizeof(CALYPTIA_HEADERS_AGENT_TOKEN) - 1,
382
0
                            ctx->agent_token, flb_sds_len(ctx->agent_token));
383
0
    }
384
0
#endif
385
386
    /* Map debug callbacks */
387
0
    flb_http_client_debug(c, ctx->ins->callback);
388
389
    /* Perform HTTP request */
390
0
    ret = flb_http_do(c, &b_sent);
391
0
    if (ret != 0) {
392
0
        flb_plg_warn(ctx->ins, "http_do=%i", ret);
393
0
        return FLB_RETRY;
394
0
    }
395
396
0
    if (c->resp.status != 200 && c->resp.status != 201 && c->resp.status != 204) {
397
0
        if (c->resp.payload_size > 0) {
398
0
            flb_plg_warn(ctx->ins, "http_status=%i:\n%s",
399
0
                         c->resp.status, c->resp.payload);
400
0
        }
401
0
        else {
402
0
            flb_plg_warn(ctx->ins, "http_status=%i", c->resp.status);
403
0
        }
404
405
        /* invalid metrics */
406
0
        if (c->resp.status == 422) {
407
0
            return FLB_ERROR;
408
0
        }
409
0
        return FLB_RETRY;;
410
0
    }
411
412
0
    return FLB_OK;
413
0
}
414
415
static flb_sds_t get_agent_info(char *buf, size_t size, char *k)
416
0
{
417
0
    int i;
418
0
    int ret;
419
0
    int type;
420
0
    int len;
421
0
    char *out_buf;
422
0
    flb_sds_t v = NULL;
423
0
    size_t off = 0;
424
0
    size_t out_size;
425
0
    msgpack_unpacked result;
426
0
    msgpack_object root;
427
0
    msgpack_object key;
428
0
    msgpack_object val;
429
430
0
    len = strlen(k);
431
432
0
    ret = flb_pack_json(buf, size, &out_buf, &out_size, &type, NULL);
433
0
    if (ret != 0) {
434
0
        return NULL;
435
0
    }
436
437
0
    msgpack_unpacked_init(&result);
438
0
    ret = msgpack_unpack_next(&result, out_buf, out_size, &off);
439
0
    if (ret != MSGPACK_UNPACK_SUCCESS) {
440
0
        flb_free(out_buf);
441
0
        msgpack_unpacked_destroy(&result);
442
0
        return NULL;
443
0
    }
444
445
0
    root = result.data;
446
0
    if (root.type != MSGPACK_OBJECT_MAP) {
447
0
        flb_free(out_buf);
448
0
        msgpack_unpacked_destroy(&result);
449
0
        return NULL;
450
0
    }
451
452
0
    for (i = 0; i < root.via.map.size; i++) {
453
0
        key = root.via.map.ptr[i].key;
454
0
        val = root.via.map.ptr[i].val;
455
456
0
        if (key.type != MSGPACK_OBJECT_STR || val.type != MSGPACK_OBJECT_STR) {
457
0
            continue;
458
0
        }
459
460
0
        if (key.via.str.size != len) {
461
0
            continue;
462
0
        }
463
464
0
        if (strncmp(key.via.str.ptr, k, len) == 0) {
465
0
            v = flb_sds_create_len(val.via.str.ptr, val.via.str.size);
466
0
            break;
467
0
        }
468
0
    }
469
470
0
    flb_free(out_buf);
471
0
    msgpack_unpacked_destroy(&result);
472
0
    return v;
473
0
}
474
475
/* Set the session content */
476
static int store_session_set(struct flb_calyptia *ctx, char *buf, size_t size)
477
0
{
478
0
    int ret;
479
0
    int type;
480
0
    char *mp_buf;
481
0
    size_t mp_size;
482
483
    /* remove any previous session file */
484
0
    if (ctx->fs_file) {
485
0
        flb_fstore_file_delete(ctx->fs, ctx->fs_file);
486
0
    }
487
488
    /* create session file */
489
0
    ctx->fs_file = flb_fstore_file_create(ctx->fs, ctx->fs_stream,
490
0
                                          CALYPTIA_SESSION_FILE, 1024);
491
0
    if (!ctx->fs_file) {
492
0
        flb_plg_error(ctx->ins, "could not create new session file");
493
0
        return -1;
494
0
    }
495
496
    /* store meta */
497
0
    flb_fstore_file_meta_set(ctx->fs, ctx->fs_file,
498
0
                             FLB_VERSION_STR "\n", sizeof(FLB_VERSION_STR) - 1);
499
500
    /* encode */
501
0
    ret = flb_pack_json(buf, size, &mp_buf, &mp_size, &type, NULL);
502
0
    if (ret < 0) {
503
0
        flb_plg_error(ctx->ins, "could not encode session information");
504
0
        return -1;
505
0
    }
506
507
    /* store content */
508
0
    ret = flb_fstore_file_append(ctx->fs_file, mp_buf, mp_size);
509
0
    if (ret == -1) {
510
0
        flb_plg_error(ctx->ins, "could not store session information");
511
0
        flb_free(mp_buf);
512
0
        return -1;
513
0
    }
514
515
0
    flb_free(mp_buf);
516
0
    return 0;
517
0
}
518
519
static int store_session_get(struct flb_calyptia *ctx,
520
                             void **out_buf, size_t *out_size)
521
0
{
522
0
    int ret;
523
0
    void *buf;
524
0
    size_t size;
525
0
    flb_sds_t json;
526
527
0
    ret = flb_fstore_file_content_copy(ctx->fs, ctx->fs_file,
528
0
                                       &buf, &size);
529
530
0
    if (size == 0) {
531
0
        return -1;
532
0
    }
533
534
    /* decode */
535
0
    json = flb_msgpack_raw_to_json_sds(buf, size, FLB_TRUE); /* TODO: could be ASCII? */
536
0
    flb_free(buf);
537
0
    if (!json) {
538
0
        return -1;
539
0
    }
540
541
0
    *out_buf = json;
542
0
    *out_size = flb_sds_len(json);
543
544
0
    return ret;
545
0
}
546
547
static int store_init(struct flb_calyptia *ctx)
548
0
{
549
0
    int ret;
550
0
    struct flb_fstore *fs;
551
0
    struct flb_fstore_file *fsf;
552
0
    void *buf;
553
0
    size_t size;
554
555
    /* store context */
556
0
    fs = flb_fstore_create(ctx->store_path, FLB_FSTORE_FS);
557
0
    if (!fs) {
558
0
        flb_plg_error(ctx->ins,
559
0
                      "could not initialize 'store_path': %s",
560
0
                      ctx->store_path);
561
0
        return -1;
562
0
    }
563
0
    ctx->fs = fs;
564
565
    /* stream */
566
0
    ctx->fs_stream = flb_fstore_stream_create(ctx->fs, "calyptia");
567
0
    if (!ctx->fs_stream) {
568
0
        flb_plg_error(ctx->ins, "could not create storage stream");
569
0
        return -1;
570
0
    }
571
572
    /* lookup any previous file */
573
0
    fsf = flb_fstore_file_get(ctx->fs, ctx->fs_stream, CALYPTIA_SESSION_FILE,
574
0
                              sizeof(CALYPTIA_SESSION_FILE) - 1);
575
0
    if (!fsf) {
576
0
        flb_plg_debug(ctx->ins, "no session file was found");
577
0
        return 0;
578
0
    }
579
0
    ctx->fs_file = fsf;
580
581
    /* retrieve session info */
582
0
    ret = store_session_get(ctx, &buf, &size);
583
0
    if (ret == 0) {
584
        /* agent id */
585
0
        ctx->agent_id = get_agent_info(buf, size, "id");
586
587
        /* agent token */
588
0
        ctx->agent_token = get_agent_info(buf, size, "token");
589
590
0
        if (ctx->agent_id && ctx->agent_token) {
591
0
            flb_plg_info(ctx->ins, "session setup OK");
592
0
        }
593
0
        else {
594
0
            if (ctx->agent_id) {
595
0
                flb_sds_destroy(ctx->agent_id);
596
0
            }
597
0
            if (ctx->agent_token) {
598
0
                flb_sds_destroy(ctx->agent_token);
599
0
            }
600
0
        }
601
0
        flb_sds_destroy(buf);
602
0
    }
603
604
0
    return 0;
605
0
}
606
607
/* Agent creation is perform on initialization using a sync upstream connection */
608
static int api_agent_create(struct flb_config *config, struct flb_calyptia *ctx)
609
0
{
610
0
    int ret;
611
0
    int flb_ret;
612
0
    int flags;
613
0
    int action = CALYPTIA_ACTION_REGISTER;
614
0
    char uri[1024];
615
0
    flb_sds_t meta;
616
0
    struct flb_upstream *u;
617
0
    struct flb_connection *u_conn;
618
0
    struct flb_http_client *c;
619
620
    /* Meta */
621
0
    meta = get_agent_metadata(ctx);
622
0
    if (!meta) {
623
0
        flb_plg_error(ctx->ins, "could not retrieve metadata");
624
0
        return -1;
625
0
    }
626
627
    /* Upstream */
628
0
    flags = get_io_flags(ctx->ins);
629
0
    u = flb_upstream_create(ctx->config,
630
0
                            ctx->cloud_host, ctx->cloud_port,
631
0
                            flags, ctx->ins->tls);
632
0
    if (!u) {
633
0
        flb_plg_error(ctx->ins,
634
0
                      "could not create upstream connection on 'agent create'");
635
0
        flb_sds_destroy(meta);
636
0
        return -1;
637
0
    }
638
639
    /* Make it synchronous */
640
0
    flb_stream_disable_async_mode(&u->base);
641
642
    /* Get upstream connection */
643
0
    u_conn = flb_upstream_conn_get(u);
644
0
    if (!u_conn) {
645
0
        flb_upstream_destroy(u);
646
0
        flb_sds_destroy(meta);
647
0
        return -1;
648
0
    }
649
650
0
    if (ctx->agent_id && ctx->agent_token) {
651
        /* Patch */
652
0
        action = CALYPTIA_ACTION_PATCH;
653
0
        snprintf(uri, sizeof(uri) - 1, CALYPTIA_ENDPOINT_PATCH, ctx->agent_id);
654
0
        c = flb_http_client(u_conn, FLB_HTTP_PATCH, uri,
655
0
                            meta, flb_sds_len(meta), NULL, 0, NULL, 0);
656
0
    }
657
0
    else {
658
        /* Create */
659
0
        action = CALYPTIA_ACTION_REGISTER;
660
0
        c = flb_http_client(u_conn, FLB_HTTP_POST, CALYPTIA_ENDPOINT_CREATE,
661
0
                            meta, flb_sds_len(meta), NULL, 0, NULL, 0);
662
0
    }
663
664
0
    if (!c) {
665
0
        flb_upstream_conn_release(u_conn);
666
0
        flb_upstream_destroy(u);
667
0
        return -1;
668
0
    }
669
670
    /* perform request */
671
0
    flb_ret = calyptia_http_do(ctx, c, action);
672
0
    if (flb_ret == FLB_OK &&
673
0
        (c->resp.status == 200 || c->resp.status == 201 || c->resp.status == 204)) {
674
0
        if (c->resp.payload_size > 0) {
675
0
            if (action == CALYPTIA_ACTION_REGISTER) {
676
                /* agent id */
677
0
                ctx->agent_id = get_agent_info(c->resp.payload,
678
0
                                               c->resp.payload_size,
679
0
                                               "id");
680
681
                /* agent token */
682
0
                ctx->agent_token = get_agent_info(c->resp.payload,
683
0
                                                  c->resp.payload_size,
684
0
                                                  "token");
685
686
0
                if (ctx->agent_id && ctx->agent_token) {
687
0
                    flb_plg_info(ctx->ins, "connected to Calyptia, agent_id='%s'",
688
0
                                 ctx->agent_id);
689
690
0
                    if (ctx->store_path && ctx->fs) {
691
0
                        ret = store_session_set(ctx,
692
0
                                                c->resp.payload,
693
0
                                                c->resp.payload_size);
694
0
                        if (ret == -1) {
695
0
                            flb_plg_warn(ctx->ins,
696
0
                                         "could not store Calyptia session");
697
0
                        }
698
0
                    }
699
0
                }
700
0
            }
701
0
        }
702
703
0
        if (action == CALYPTIA_ACTION_PATCH) {
704
0
            flb_plg_info(ctx->ins, "known agent registration successful");
705
0
        }
706
0
    }
707
708
    /* release resources */
709
0
    flb_sds_destroy(meta);
710
0
    flb_http_client_destroy(c);
711
0
    flb_upstream_conn_release(u_conn);
712
0
    flb_upstream_destroy(u);
713
714
0
    return flb_ret;
715
0
}
716
717
static struct flb_calyptia *config_init(struct flb_output_instance *ins,
718
                                        struct flb_config *config)
719
0
{
720
0
    int ret;
721
0
    int flags;
722
0
    struct flb_calyptia *ctx;
723
724
    /* Calyptia plugin context */
725
0
    ctx = flb_calloc(1, sizeof(struct flb_calyptia));
726
0
    if (!ctx) {
727
0
        flb_errno();
728
0
        return NULL;
729
0
    }
730
0
    ctx->ins = ins;
731
0
    ctx->config = config;
732
0
    flb_kv_init(&ctx->kv_labels);
733
734
    /* Load the config map */
735
0
    ret = flb_output_config_map_set(ins, (void *) ctx);
736
0
    if (ret == -1) {
737
0
        calyptia_ctx_destroy(ctx);
738
0
        return NULL;
739
0
    }
740
741
0
    ctx->metrics_endpoint = flb_sds_create_size(256);
742
0
    if (!ctx->metrics_endpoint) {
743
0
        calyptia_ctx_destroy(ctx);
744
0
        return NULL;
745
0
    }
746
747
0
#ifdef FLB_HAVE_CHUNK_TRACE
748
0
    ctx->trace_endpoint = flb_sds_create_size(256);
749
0
    if (!ctx->trace_endpoint) {
750
0
        calyptia_ctx_destroy(ctx);
751
0
        return NULL;
752
0
    }
753
0
#endif
754
755
    /* api_key */
756
0
    if (!ctx->api_key) {
757
0
        flb_plg_error(ctx->ins, "configuration 'api_key' is missing");
758
0
        calyptia_ctx_destroy(ctx);
759
0
        return NULL;
760
0
    }
761
762
    /* parse 'add_label' */
763
0
    ret = config_add_labels(ins, ctx);
764
0
    if (ret == -1) {
765
0
        calyptia_ctx_destroy(ctx);
766
0
        return NULL;
767
0
    }
768
769
    /* env reader */
770
0
    ctx->env = flb_env_create();
771
772
    /* Set context */
773
0
    flb_output_set_context(ins, ctx);
774
775
    /* Initialize optional storage */
776
0
    if (ctx->store_path) {
777
0
        ret = store_init(ctx);
778
0
        if (ret == -1) {
779
0
            flb_output_set_context(ins, NULL);
780
0
            calyptia_ctx_destroy(ctx);
781
0
            return NULL;
782
0
        }
783
0
    }
784
785
    /* the machine-id is provided by custom calyptia, which invokes this plugin. */
786
0
    if (!ctx->machine_id) {
787
0
        flb_plg_error(ctx->ins, "machine_id has not been set");
788
0
        flb_output_set_context(ins, NULL);
789
0
        calyptia_ctx_destroy(ctx);
790
0
        return NULL;
791
0
    }
792
793
0
    flb_plg_debug(ctx->ins, "machine_id=%s", ctx->machine_id);
794
795
    /* Upstream */
796
0
    flags = get_io_flags(ctx->ins);
797
0
    ctx->u = flb_upstream_create(ctx->config,
798
0
                                 ctx->cloud_host, ctx->cloud_port,
799
0
                                 flags, ctx->ins->tls);
800
0
    if (!ctx->u) {
801
0
        flb_output_set_context(ins, NULL);
802
0
        calyptia_ctx_destroy(ctx);
803
0
        return NULL;
804
0
    }
805
806
    /* Set instance flags into upstream */
807
0
    flb_output_upstream_set(ctx->u, ins);
808
809
0
    return ctx;
810
0
}
811
812
static int register_agent(struct flb_calyptia *ctx, struct flb_config *config)
813
0
{
814
0
    int ret;
815
816
    /* Try registration */
817
0
    ret = api_agent_create(config, ctx);
818
0
    if (ret != FLB_OK) {
819
0
        flb_plg_warn(ctx->ins, "agent registration failed");
820
0
        return FLB_ERROR;
821
0
    }
822
823
    /* Update endpoints */
824
0
    flb_sds_len_set(ctx->metrics_endpoint, 0);
825
0
    flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
826
0
                   ctx->agent_id);
827
828
0
#ifdef FLB_HAVE_CHUNK_TRACE
829
0
    if (ctx->pipeline_id) {
830
0
        flb_sds_len_set(ctx->trace_endpoint, 0);
831
0
        flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE,
832
0
                       ctx->pipeline_id);
833
0
    }
834
0
#endif
835
836
0
    flb_plg_info(ctx->ins, "agent registration successful");
837
0
    return FLB_OK;
838
0
}
839
840
static int cb_calyptia_init(struct flb_output_instance *ins,
841
                           struct flb_config *config, void *data)
842
{
843
    struct flb_calyptia *ctx;
844
    (void) data;
845
    int ret;
846
847
    /* create config context */
848
    ctx = config_init(ins, config);
849
    if (!ctx) {
850
        flb_plg_error(ins, "could not initialize configuration");
851
        return -1;
852
    }
853
854
    /*
855
     * This plugin instance uses the HTTP client interface, let's register
856
     * it debugging callbacks.
857
     */
858
    flb_output_set_http_debug_callbacks(ins);
859
860
    ret = register_agent(ctx, config);
861
    if (ret != FLB_OK && !ctx->register_retry_on_flush) {
862
        flb_plg_error(ins, "agent registration failed and register_retry_on_flush=false");
863
        return -1;
864
    }
865
866
    return 0;
867
}
868
869
static void debug_payload(struct flb_calyptia *ctx, void *data, size_t bytes)
870
0
{
871
0
    int ret;
872
0
    size_t off = 0;
873
0
    struct cmt *cmt;
874
0
    cfl_sds_t out;
875
876
0
    ret = cmt_decode_msgpack_create(&cmt, (char *) data, bytes, &off);
877
0
    if (ret != CMT_DECODE_MSGPACK_SUCCESS) {
878
0
        flb_plg_warn(ctx->ins, "could not unpack debug payload");
879
0
        return;
880
0
    }
881
882
0
    out = cmt_encode_text_create(cmt);
883
0
    flb_plg_debug(ctx->ins, "debug payload:\n%s", out);
884
0
    cmt_encode_text_destroy(out);
885
0
    cmt_destroy(cmt);
886
0
}
887
888
static void calyptia_ctx_destroy(struct flb_calyptia *ctx)
889
0
{
890
0
    if (!ctx) {
891
0
        return;
892
0
    }
893
894
0
    if (ctx->u) {
895
0
        flb_upstream_destroy(ctx->u);
896
0
    }
897
898
0
    if (ctx->agent_id) {
899
0
        flb_sds_destroy(ctx->agent_id);
900
0
    }
901
902
0
    if (ctx->agent_token) {
903
0
        flb_sds_destroy(ctx->agent_token);
904
0
    }
905
906
0
    if (ctx->env) {
907
0
        flb_env_destroy(ctx->env);
908
0
    }
909
910
0
    if (ctx->metrics_endpoint) {
911
0
        flb_sds_destroy(ctx->metrics_endpoint);
912
0
    }
913
914
0
#ifdef FLB_HAVE_CHUNK_TRACE
915
0
    if (ctx->trace_endpoint) {
916
0
        flb_sds_destroy(ctx->trace_endpoint);
917
0
    }
918
0
#endif
919
920
0
    if (ctx->fs) {
921
0
        flb_fstore_destroy(ctx->fs);
922
0
    }
923
924
0
    flb_kv_release(&ctx->kv_labels);
925
0
    flb_free(ctx);
926
0
}
927
928
/* Output plugin exit callback: release the plugin context; always returns 0 */
static int cb_calyptia_exit(void *data, struct flb_config *config)
{
    struct flb_calyptia *ctx = data;
    (void) config;

    calyptia_ctx_destroy(ctx);
    return 0;
}
934
935
static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
936
                             struct flb_output_flush *out_flush,
937
                             struct flb_input_instance *i_ins,
938
                             void *out_context,
939
                             struct flb_config *config)
940
0
{
941
0
    int ret;
942
0
    size_t off = 0;
943
0
    size_t out_size = 0;
944
0
    char *out_buf = NULL;
945
0
    struct flb_connection *u_conn;
946
0
    struct flb_http_client *c = NULL;
947
0
    struct flb_calyptia *ctx = out_context;
948
0
    struct cmt *cmt;
949
0
    flb_sds_t json;
950
0
    (void) i_ins;
951
0
    (void) config;
952
953
0
    if ((!ctx->agent_id || !ctx->agent_token) && ctx->register_retry_on_flush) {
954
0
        flb_plg_info(ctx->ins, "missing agent_id or agent_token, attempting re-registration register_retry_on_flush=true");
955
0
        if (register_agent(ctx, config) != FLB_OK) {
956
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
957
0
        }
958
0
    }
959
0
    else if (!ctx->agent_id || !ctx->agent_token) {
960
0
        flb_plg_error(ctx->ins, "missing agent_id or agent_token, and register_retry_on_flush=false");
961
0
        FLB_OUTPUT_RETURN(FLB_ERROR);
962
0
    }
963
964
    /* Get upstream connection */
965
0
    u_conn = flb_upstream_conn_get(ctx->u);
966
0
    if (!u_conn) {
967
0
        FLB_OUTPUT_RETURN(FLB_RETRY);
968
0
    }
969
970
0
    if (event_chunk->type == FLB_EVENT_TYPE_METRICS) {
971
        /* if we have labels append them */
972
0
        if (ctx->add_labels && mk_list_size(ctx->add_labels) > 0) {
973
0
            ret = cmt_decode_msgpack_create(&cmt,
974
0
                                            (char *) event_chunk->data,
975
0
                                            event_chunk->size,
976
0
                                            &off);
977
0
            if (ret != CMT_DECODE_MSGPACK_SUCCESS) {
978
0
                flb_upstream_conn_release(u_conn);
979
0
                FLB_OUTPUT_RETURN(FLB_ERROR);
980
0
            }
981
982
            /* append labels set by config */
983
0
            append_labels(ctx, cmt);
984
985
            /* encode back to msgpack */
986
0
            ret = cmt_encode_msgpack_create(cmt, &out_buf, &out_size);
987
0
            if (ret != 0) {
988
0
                cmt_destroy(cmt);
989
0
                flb_upstream_conn_release(u_conn);
990
0
                FLB_OUTPUT_RETURN(FLB_ERROR);
991
0
            }
992
0
            cmt_destroy(cmt);
993
0
        }
994
0
        else {
995
0
            out_buf = (char *) event_chunk->data;
996
0
            out_size = event_chunk->size;
997
0
        }
998
999
        /* Compose HTTP Client request */
1000
0
        c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->metrics_endpoint,
1001
0
                           out_buf, out_size, NULL, 0, NULL, 0);
1002
0
        if (!c) {
1003
0
            if (out_buf != event_chunk->data) {
1004
0
                cmt_encode_msgpack_destroy(out_buf);
1005
0
            }
1006
0
            flb_upstream_conn_release(u_conn);
1007
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
1008
0
        }
1009
1010
        /* perform request */
1011
0
        ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_METRICS);
1012
0
        if (ret == FLB_OK) {
1013
0
            flb_plg_debug(ctx->ins, "metrics delivered OK");
1014
0
        }
1015
0
        else {
1016
0
            flb_plg_error(ctx->ins, "could not deliver metrics");
1017
0
            debug_payload(ctx, out_buf, out_size);
1018
0
        }
1019
1020
0
        if (out_buf != event_chunk->data) {
1021
0
            cmt_encode_msgpack_destroy(out_buf);
1022
0
        }
1023
0
    }
1024
1025
0
#ifdef FLB_HAVE_CHUNK_TRACE
1026
0
    if (event_chunk->type & FLB_EVENT_TYPE_LOGS &&
1027
0
        event_chunk->type & FLB_EVENT_TYPE_HAS_TRACE) {
1028
0
        json = flb_pack_msgpack_to_json_format(event_chunk->data,
1029
0
                                            event_chunk->size,
1030
0
                                            FLB_PACK_JSON_FORMAT_STREAM,
1031
0
                                            FLB_PACK_JSON_DATE_DOUBLE,
1032
0
                                            NULL,
1033
0
                                            FLB_TRUE); /* Trace is ASCII */
1034
0
        if (json == NULL) {
1035
0
            flb_upstream_conn_release(u_conn);
1036
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
1037
0
        }
1038
1039
0
        c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->trace_endpoint,
1040
0
                           (char *) json, flb_sds_len(json),
1041
0
                           NULL, 0, NULL, 0);
1042
1043
0
        if (!c) {
1044
0
            flb_upstream_conn_release(u_conn);
1045
0
            flb_sds_destroy(json);
1046
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
1047
0
        }
1048
1049
0
        ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_TRACE);
1050
0
        if (ret == FLB_OK) {
1051
0
            flb_plg_debug(ctx->ins, "trace delivered OK");
1052
0
        }
1053
0
        else {
1054
0
            flb_plg_error(ctx->ins, "could not deliver trace");
1055
0
            debug_payload(ctx, (char *) json, flb_sds_len(json));
1056
0
        }
1057
0
        flb_sds_destroy(json);
1058
0
    }
1059
0
#endif /* FLB_HAVE_CHUNK_TRACE */
1060
1061
0
    if (c) {
1062
0
        flb_http_client_destroy(c);
1063
0
    }
1064
1065
0
    flb_upstream_conn_release(u_conn);
1066
1067
0
    FLB_OUTPUT_RETURN(ret);
1068
0
}
1069
1070
/* Configuration properties map */
1071
static struct flb_config_map config_map[] = {
1072
    {
1073
     FLB_CONFIG_MAP_STR, "cloud_host", DEFAULT_CALYPTIA_HOST,
1074
     0, FLB_TRUE, offsetof(struct flb_calyptia, cloud_host),
1075
     "",
1076
    },
1077
1078
    {
1079
     FLB_CONFIG_MAP_INT, "cloud_port", DEFAULT_CALYPTIA_PORT,
1080
     0, FLB_TRUE, offsetof(struct flb_calyptia, cloud_port),
1081
     "",
1082
    },
1083
1084
    {
1085
     FLB_CONFIG_MAP_STR, "api_key", NULL,
1086
     0, FLB_TRUE, offsetof(struct flb_calyptia, api_key),
1087
     "Calyptia Cloud API Key."
1088
    },
1089
    {
1090
     FLB_CONFIG_MAP_STR, "machine_id", NULL,
1091
     0, FLB_TRUE, offsetof(struct flb_calyptia, machine_id),
1092
     "Custom machine_id to be used when registering agent"
1093
    },
1094
    {
1095
     FLB_CONFIG_MAP_STR, "fleet_id", NULL,
1096
     0, FLB_TRUE, offsetof(struct flb_calyptia, fleet_id),
1097
     "Fleet ID for identifying as part of a managed fleet"
1098
    },
1099
1100
    {
1101
     FLB_CONFIG_MAP_STR, "store_path", NULL,
1102
     0, FLB_TRUE, offsetof(struct flb_calyptia, store_path),
1103
     ""
1104
    },
1105
1106
    {
1107
     FLB_CONFIG_MAP_SLIST_1, "add_label", NULL,
1108
     FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_calyptia, add_labels),
1109
     "Label to append to the generated metric."
1110
    },
1111
1112
#ifdef FLB_HAVE_CHUNK_TRACE
1113
    {
1114
     FLB_CONFIG_MAP_STR, "pipeline_id", NULL,
1115
     0, FLB_TRUE, offsetof(struct flb_calyptia, pipeline_id),
1116
     "Pipeline ID for calyptia core traces."
1117
    },
1118
#endif
1119
    {
1120
     FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true",
1121
     0, FLB_TRUE, offsetof(struct flb_calyptia, register_retry_on_flush),
1122
     "Retry agent registration on flush if failed on init."
1123
    },
1124
    /* EOF */
1125
    {0}
1126
};
1127
1128
struct flb_output_plugin out_calyptia_plugin = {
1129
    .name         = "calyptia",
1130
    .description  = "Calyptia Cloud",
1131
    .cb_init      = cb_calyptia_init,
1132
    .cb_flush     = cb_calyptia_flush,
1133
    .cb_exit      = cb_calyptia_exit,
1134
    .config_map   = config_map,
1135
    .flags        = FLB_OUTPUT_NET | FLB_OUTPUT_PRIVATE | FLB_IO_OPT_TLS,
1136
    .event_type   = FLB_OUTPUT_METRICS
1137
};