Coverage Report

Created: 2025-07-04 07:08

/src/fluent-bit/plugins/out_calyptia/calyptia.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_output_plugin.h>
21
#include <fluent-bit/flb_log.h>
22
#include <fluent-bit/flb_kv.h>
23
#include <fluent-bit/flb_upstream.h>
24
#include <fluent-bit/flb_utils.h>
25
#include <fluent-bit/flb_pack.h>
26
#include <fluent-bit/flb_version.h>
27
#include <fluent-bit/flb_metrics.h>
28
#include <fluent-bit/flb_fstore.h>
29
30
#include "calyptia.h"
31
32
#include <cmetrics/cmetrics.h>
33
#include <cmetrics/cmt_encode_influx.h>
34
35
flb_sds_t custom_calyptia_pipeline_config_get(struct flb_config *ctx);
36
37
static int get_io_flags(struct flb_output_instance *ins)
38
0
{
39
0
    int flags = 0;
40
41
0
    if (ins->use_tls) {
42
0
        flags = FLB_IO_TLS;
43
0
    }
44
0
    else {
45
0
        flags = FLB_IO_TCP;
46
0
    }
47
48
0
    return flags;
49
0
}
50
51
static int config_add_labels(struct flb_output_instance *ins,
52
                             struct flb_calyptia *ctx)
53
0
{
54
0
    struct mk_list *head;
55
0
    struct flb_config_map_val *mv;
56
0
    struct flb_slist_entry *k = NULL;
57
0
    struct flb_slist_entry *v = NULL;
58
0
    struct flb_kv *kv;
59
60
0
    if (!ctx->add_labels || mk_list_size(ctx->add_labels) == 0) {
61
0
        return 0;
62
0
    }
63
64
    /* iterate all 'add_label' definitions */
65
0
    flb_config_map_foreach(head, mv, ctx->add_labels) {
66
0
        if (mk_list_size(mv->val.list) != 2) {
67
0
            flb_plg_error(ins, "'add_label' expects a key and a value, "
68
0
                          "e.g: 'add_label version 1.8.x'");
69
0
            return -1;
70
0
        }
71
72
0
        k = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
73
0
        v = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);
74
75
0
        kv = flb_kv_item_create(&ctx->kv_labels, k->str, v->str);
76
0
        if (!kv) {
77
0
            flb_plg_error(ins, "could not append label %s=%s\n", k->str, v->str);
78
0
            return -1;
79
0
        }
80
0
    }
81
82
0
    return 0;
83
0
}
84
85
static void append_labels(struct flb_calyptia *ctx, struct cmt *cmt)
86
0
{
87
0
    struct flb_kv *kv;
88
0
    struct mk_list *head;
89
90
0
    mk_list_foreach(head, &ctx->kv_labels) {
91
0
        kv = mk_list_entry(head, struct flb_kv, _head);
92
0
        cmt_label_add(cmt, kv->key, kv->val);
93
0
    }
94
0
}
95
96
/*
 * Pack the NUL-terminated string 'str' as a msgpack string; the length is
 * taken with strlen(), so the terminator itself is not written.
 */
static void pack_str(msgpack_packer *mp_pck, char *str)
{
    int size = strlen(str);

    msgpack_pack_str(mp_pck, size);
    msgpack_pack_str_body(mp_pck, str, size);
}
104
105
static void pack_env(struct flb_env *env, char *prefix,  char *key,
106
                     struct flb_mp_map_header *h,
107
                     msgpack_packer *mp_pck)
108
0
{
109
0
    int len = 0;
110
0
    char *val;
111
112
    /* prefix set in the key, if set, adjust the key name */
113
0
    if (prefix) {
114
0
        len = strlen(prefix);
115
0
    }
116
117
0
    val = (char *) flb_env_get(env, key);
118
0
    if (val) {
119
0
        flb_mp_map_header_append(h);
120
0
        pack_str(mp_pck, key + len);
121
0
        pack_str(mp_pck, val);
122
0
    }
123
0
}
124
125
/*
 * Append a "metadata" entry to the map tracked by 'mh'. Its value is a map
 * that may contain:
 *
 *   - "k8s": present when the env variable 'k8s' equals "enabled"
 *            (case-insensitive), filled from the 'k8s.*' variables.
 *   - "aws": present when the env variable 'aws' equals "enabled"
 *            (case-insensitive), filled from the 'aws.*' variables.
 *
 * Inside each section only variables that are actually set are packed, and
 * they are written without their 'k8s.' / 'aws.' prefix.
 */
static void pack_env_metadata(struct flb_env *env,
                              struct flb_mp_map_header *mh, msgpack_packer *mp_pck)
{
    static char *k8s_keys[] = {
        "k8s.namespace",
        "k8s.pod_name",
        "k8s.node_name"
    };
    static char *aws_keys[] = {
        "aws.az",
        "aws.ec2_instance_id",
        "aws.ec2_instance_type",
        "aws.private_ip",
        "aws.vpc_id",
        "aws.ami_id",
        "aws.account_id",
        "aws.hostname"
    };
    size_t i;
    char *enabled;
    struct flb_mp_map_header section;
    struct flb_mp_map_header meta;

    /* "metadata": { ... } */
    flb_mp_map_header_append(mh);
    pack_str(mp_pck, "metadata");
    flb_mp_map_header_init(&meta, mp_pck);

    /* Kubernetes section */
    enabled = (char *) flb_env_get(env, "k8s");
    if (enabled != NULL && strcasecmp(enabled, "enabled") == 0) {
        flb_mp_map_header_append(&meta);
        pack_str(mp_pck, "k8s");

        flb_mp_map_header_init(&section, mp_pck);
        for (i = 0; i < sizeof(k8s_keys) / sizeof(k8s_keys[0]); i++) {
            pack_env(env, "k8s.", k8s_keys[i], &section, mp_pck);
        }
        flb_mp_map_header_end(&section);
    }

    /* AWS section */
    enabled = (char *) flb_env_get(env, "aws");
    if (enabled != NULL && strcasecmp(enabled, "enabled") == 0) {
        flb_mp_map_header_append(&meta);
        pack_str(mp_pck, "aws");

        flb_mp_map_header_init(&section, mp_pck);
        for (i = 0; i < sizeof(aws_keys) / sizeof(aws_keys[0]); i++) {
            pack_env(env, "aws.", aws_keys[i], &section, mp_pck);
        }
        flb_mp_map_header_end(&section);
    }

    flb_mp_map_header_end(&meta);
}
176
177
static flb_sds_t get_agent_metadata(struct flb_calyptia *ctx)
178
0
{
179
0
    int len;
180
0
    char *host;
181
0
    flb_sds_t conf;
182
0
    flb_sds_t meta;
183
0
    struct flb_mp_map_header mh;
184
0
    msgpack_sbuffer mp_sbuf;
185
0
    msgpack_packer mp_pck;
186
0
    struct flb_config *config = ctx->config;
187
188
    /* init msgpack */
189
0
    msgpack_sbuffer_init(&mp_sbuf);
190
0
    msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
191
192
    /* pack map */
193
0
    flb_mp_map_header_init(&mh, &mp_pck);
194
195
0
    host = (char *) flb_env_get(ctx->env, "HOSTNAME");
196
0
    if (!host) {
197
0
        host = "unknown";
198
0
    }
199
0
    len = strlen(host);
200
201
    /* name */
202
0
    flb_mp_map_header_append(&mh);
203
0
    msgpack_pack_str(&mp_pck, 4);
204
0
    msgpack_pack_str_body(&mp_pck, "name", 4);
205
0
    msgpack_pack_str(&mp_pck, len);
206
0
    msgpack_pack_str_body(&mp_pck, host, len);
207
208
    /* type */
209
0
    flb_mp_map_header_append(&mh);
210
0
    msgpack_pack_str(&mp_pck, 4);
211
0
    msgpack_pack_str_body(&mp_pck, "type", 4);
212
0
    msgpack_pack_str(&mp_pck, 9);
213
0
    msgpack_pack_str_body(&mp_pck, "fluentbit", 9);
214
215
    /* rawConfig */
216
0
    conf = custom_calyptia_pipeline_config_get(ctx->config);
217
0
    if (conf) {
218
0
        flb_mp_map_header_append(&mh);
219
0
        len = flb_sds_len(conf);
220
0
        msgpack_pack_str(&mp_pck, 9);
221
0
        msgpack_pack_str_body(&mp_pck, "rawConfig", 9);
222
0
        msgpack_pack_str(&mp_pck, len);
223
0
        msgpack_pack_str_body(&mp_pck, conf, len);
224
0
    }
225
0
    flb_sds_destroy(conf);
226
227
    /* version */
228
0
    flb_mp_map_header_append(&mh);
229
0
    msgpack_pack_str(&mp_pck, 7);
230
0
    msgpack_pack_str_body(&mp_pck, "version", 7);
231
0
    len = strlen(FLB_VERSION_STR);
232
0
    msgpack_pack_str(&mp_pck, len);
233
0
    msgpack_pack_str_body(&mp_pck, FLB_VERSION_STR, len);
234
235
    /* edition */
236
0
    flb_mp_map_header_append(&mh);
237
0
    msgpack_pack_str(&mp_pck, 7);
238
0
    msgpack_pack_str_body(&mp_pck, "edition", 7);
239
0
    msgpack_pack_str(&mp_pck, 9);
240
0
    msgpack_pack_str_body(&mp_pck, "community", 9);
241
242
0
    flb_mp_map_header_append(&mh);
243
0
    msgpack_pack_str(&mp_pck, 2);
244
0
    msgpack_pack_str_body(&mp_pck, "os", 2);
245
#ifdef FLB_SYSTEM_WINDOWS
246
    len = strlen("windows");
247
    msgpack_pack_str(&mp_pck, len);
248
    msgpack_pack_str_body(&mp_pck, "windows", len);
249
#elif FLB_SYSTEM_MACOS
250
    len = strlen("macos");
251
    msgpack_pack_str(&mp_pck, len);
252
    msgpack_pack_str_body(&mp_pck, "macos", len);
253
#elif __linux__
254
    len = strlen("linux");
255
0
    msgpack_pack_str(&mp_pck, len);
256
0
    msgpack_pack_str_body(&mp_pck, "linux", len);
257
#else
258
    len = strlen("unknown");
259
    msgpack_pack_str(&mp_pck, len);
260
    msgpack_pack_str_body(&mp_pck, "unknown", len);
261
#endif
262
263
0
    flb_mp_map_header_append(&mh);
264
0
    msgpack_pack_str(&mp_pck, 4);
265
0
    msgpack_pack_str_body(&mp_pck, "arch", 4);
266
#if defined(__arm__) || defined(_M_ARM)
267
    len = strlen("arm");
268
    msgpack_pack_str(&mp_pck, len);
269
    msgpack_pack_str_body(&mp_pck, "arm", len);
270
#elif defined(__aarch64__)
271
    len = strlen("arm64");
272
    msgpack_pack_str(&mp_pck, len);
273
    msgpack_pack_str_body(&mp_pck, "arm64", len);
274
#elif defined(__amd64__) || defined(_M_AMD64)
275
    len = strlen("x86_64");
276
0
    msgpack_pack_str(&mp_pck, len);
277
0
    msgpack_pack_str_body(&mp_pck, "x86_64", len);
278
#elif defined(__i686__) || defined(_M_I86)
279
    len = strlen("x86");
280
    msgpack_pack_str(&mp_pck, len);
281
    msgpack_pack_str_body(&mp_pck, "x86", len);
282
#else
283
    len = strlen("unknown");
284
    msgpack_pack_str(&mp_pck, len);
285
    msgpack_pack_str_body(&mp_pck, "unknown", len);
286
#endif
287
288
    /* machineID */
289
0
    flb_mp_map_header_append(&mh);
290
0
    msgpack_pack_str(&mp_pck, 9);
291
0
    msgpack_pack_str_body(&mp_pck, "machineID", 9);
292
0
    len = flb_sds_len(ctx->machine_id);
293
0
    msgpack_pack_str(&mp_pck, len);
294
0
    msgpack_pack_str_body(&mp_pck, ctx->machine_id, len);
295
296
    /* fleetID */
297
0
    if (ctx->fleet_id) {
298
0
        flb_mp_map_header_append(&mh);
299
0
        msgpack_pack_str(&mp_pck, 7);
300
0
        msgpack_pack_str_body(&mp_pck, "fleetID", 7);
301
0
        len = flb_sds_len(ctx->fleet_id);
302
0
        msgpack_pack_str(&mp_pck, len);
303
0
        msgpack_pack_str_body(&mp_pck, ctx->fleet_id, len);
304
0
    }
305
306
    /* pack environment metadata */
307
0
    pack_env_metadata(config->env, &mh, &mp_pck);
308
309
    /* finalize */
310
0
    flb_mp_map_header_end(&mh);
311
312
    /* convert to json */
313
0
    meta = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
314
0
    msgpack_sbuffer_destroy(&mp_sbuf);
315
316
0
    return meta;
317
0
}
318
319
static int calyptia_http_do(struct flb_calyptia *ctx, struct flb_http_client *c,
320
                            int type)
321
0
{
322
0
    int ret;
323
0
    size_t b_sent;
324
325
0
    if( !ctx || !c ) {
326
0
        return FLB_ERROR;
327
0
    }
328
329
    /* Ensure agent_token is not empty when required */
330
0
    if ((type == CALYPTIA_ACTION_METRICS || type == CALYPTIA_ACTION_PATCH || type == CALYPTIA_ACTION_TRACE) &&
331
0
        !ctx->agent_token) {
332
0
        flb_plg_warn(ctx->ins, "agent_token is missing for action type %d", type);
333
0
        return FLB_ERROR;
334
0
    }
335
336
    /* append headers */
337
0
    if (type == CALYPTIA_ACTION_REGISTER) {
338
        // When registering a new agent api key is required
339
0
        if (!ctx->api_key) {
340
0
            flb_plg_error(ctx->ins, "api_key is missing");
341
0
            return FLB_ERROR;
342
0
        }
343
0
        flb_http_add_header(c,
344
0
                            CALYPTIA_HEADERS_CTYPE, sizeof(CALYPTIA_HEADERS_CTYPE) - 1,
345
0
                            CALYPTIA_HEADERS_CTYPE_JSON, sizeof(CALYPTIA_HEADERS_CTYPE_JSON) - 1);
346
347
0
        flb_http_add_header(c,
348
0
                            CALYPTIA_HEADERS_PROJECT, sizeof(CALYPTIA_HEADERS_PROJECT) - 1,
349
0
                            ctx->api_key, flb_sds_len(ctx->api_key));
350
0
    }
351
0
    else if (type == CALYPTIA_ACTION_PATCH) {
352
0
        flb_http_add_header(c,
353
0
                            CALYPTIA_HEADERS_CTYPE, sizeof(CALYPTIA_HEADERS_CTYPE) - 1,
354
0
                            CALYPTIA_HEADERS_CTYPE_JSON, sizeof(CALYPTIA_HEADERS_CTYPE_JSON) - 1);
355
356
0
        flb_http_add_header(c,
357
0
                            CALYPTIA_HEADERS_AGENT_TOKEN,
358
0
                            sizeof(CALYPTIA_HEADERS_AGENT_TOKEN) - 1,
359
0
                            ctx->agent_token, flb_sds_len(ctx->agent_token));
360
0
    }
361
0
    else if (type == CALYPTIA_ACTION_METRICS) {
362
0
        flb_http_add_header(c,
363
0
                            CALYPTIA_HEADERS_CTYPE, sizeof(CALYPTIA_HEADERS_CTYPE) - 1,
364
0
                            CALYPTIA_HEADERS_CTYPE_MSGPACK,
365
0
                            sizeof(CALYPTIA_HEADERS_CTYPE_MSGPACK) - 1);
366
367
0
        flb_http_add_header(c,
368
0
                            CALYPTIA_HEADERS_AGENT_TOKEN,
369
0
                            sizeof(CALYPTIA_HEADERS_AGENT_TOKEN) - 1,
370
0
                            ctx->agent_token, flb_sds_len(ctx->agent_token));
371
0
    }
372
0
#ifdef FLB_HAVE_CHUNK_TRACE
373
0
    else if (type == CALYPTIA_ACTION_TRACE)  {
374
0
        flb_http_add_header(c,
375
0
                            CALYPTIA_HEADERS_CTYPE, sizeof(CALYPTIA_HEADERS_CTYPE) - 1,
376
0
                            CALYPTIA_HEADERS_CTYPE_JSON, sizeof(CALYPTIA_HEADERS_CTYPE_JSON) - 1);
377
378
0
        flb_http_add_header(c,
379
0
                            CALYPTIA_HEADERS_AGENT_TOKEN,
380
0
                            sizeof(CALYPTIA_HEADERS_AGENT_TOKEN) - 1,
381
0
                            ctx->agent_token, flb_sds_len(ctx->agent_token));
382
0
    }
383
0
#endif
384
385
    /* Map debug callbacks */
386
0
    flb_http_client_debug(c, ctx->ins->callback);
387
388
    /* Perform HTTP request */
389
0
    ret = flb_http_do(c, &b_sent);
390
0
    if (ret != 0) {
391
0
        flb_plg_warn(ctx->ins, "http_do=%i", ret);
392
0
        return FLB_RETRY;
393
0
    }
394
395
0
    if (c->resp.status != 200 && c->resp.status != 201 && c->resp.status != 204) {
396
0
        if (c->resp.payload_size > 0) {
397
0
            flb_plg_warn(ctx->ins, "http_status=%i:\n%s",
398
0
                         c->resp.status, c->resp.payload);
399
0
        }
400
0
        else {
401
0
            flb_plg_warn(ctx->ins, "http_status=%i", c->resp.status);
402
0
        }
403
404
        /* invalid metrics */
405
0
        if (c->resp.status == 422) {
406
0
            return FLB_ERROR;
407
0
        }
408
0
        return FLB_RETRY;;
409
0
    }
410
411
0
    return FLB_OK;
412
0
}
413
414
static flb_sds_t get_agent_info(char *buf, size_t size, char *k)
415
0
{
416
0
    int i;
417
0
    int ret;
418
0
    int type;
419
0
    int len;
420
0
    char *out_buf;
421
0
    flb_sds_t v = NULL;
422
0
    size_t off = 0;
423
0
    size_t out_size;
424
0
    msgpack_unpacked result;
425
0
    msgpack_object root;
426
0
    msgpack_object key;
427
0
    msgpack_object val;
428
429
0
    len = strlen(k);
430
431
0
    ret = flb_pack_json(buf, size, &out_buf, &out_size, &type, NULL);
432
0
    if (ret != 0) {
433
0
        return NULL;
434
0
    }
435
436
0
    msgpack_unpacked_init(&result);
437
0
    ret = msgpack_unpack_next(&result, out_buf, out_size, &off);
438
0
    if (ret != MSGPACK_UNPACK_SUCCESS) {
439
0
        flb_free(out_buf);
440
0
        msgpack_unpacked_destroy(&result);
441
0
        return NULL;
442
0
    }
443
444
0
    root = result.data;
445
0
    if (root.type != MSGPACK_OBJECT_MAP) {
446
0
        flb_free(out_buf);
447
0
        msgpack_unpacked_destroy(&result);
448
0
        return NULL;
449
0
    }
450
451
0
    for (i = 0; i < root.via.map.size; i++) {
452
0
        key = root.via.map.ptr[i].key;
453
0
        val = root.via.map.ptr[i].val;
454
455
0
        if (key.type != MSGPACK_OBJECT_STR || val.type != MSGPACK_OBJECT_STR) {
456
0
            continue;
457
0
        }
458
459
0
        if (key.via.str.size != len) {
460
0
            continue;
461
0
        }
462
463
0
        if (strncmp(key.via.str.ptr, k, len) == 0) {
464
0
            v = flb_sds_create_len(val.via.str.ptr, val.via.str.size);
465
0
            break;
466
0
        }
467
0
    }
468
469
0
    flb_free(out_buf);
470
0
    msgpack_unpacked_destroy(&result);
471
0
    return v;
472
0
}
473
474
/* Set the session content */
475
static int store_session_set(struct flb_calyptia *ctx, char *buf, size_t size)
476
0
{
477
0
    int ret;
478
0
    int type;
479
0
    char *mp_buf;
480
0
    size_t mp_size;
481
482
    /* remove any previous session file */
483
0
    if (ctx->fs_file) {
484
0
        flb_fstore_file_delete(ctx->fs, ctx->fs_file);
485
0
    }
486
487
    /* create session file */
488
0
    ctx->fs_file = flb_fstore_file_create(ctx->fs, ctx->fs_stream,
489
0
                                          CALYPTIA_SESSION_FILE, 1024);
490
0
    if (!ctx->fs_file) {
491
0
        flb_plg_error(ctx->ins, "could not create new session file");
492
0
        return -1;
493
0
    }
494
495
    /* store meta */
496
0
    flb_fstore_file_meta_set(ctx->fs, ctx->fs_file,
497
0
                             FLB_VERSION_STR "\n", sizeof(FLB_VERSION_STR) - 1);
498
499
    /* encode */
500
0
    ret = flb_pack_json(buf, size, &mp_buf, &mp_size, &type, NULL);
501
0
    if (ret < 0) {
502
0
        flb_plg_error(ctx->ins, "could not encode session information");
503
0
        return -1;
504
0
    }
505
506
    /* store content */
507
0
    ret = flb_fstore_file_append(ctx->fs_file, mp_buf, mp_size);
508
0
    if (ret == -1) {
509
0
        flb_plg_error(ctx->ins, "could not store session information");
510
0
        flb_free(mp_buf);
511
0
        return -1;
512
0
    }
513
514
0
    flb_free(mp_buf);
515
0
    return 0;
516
0
}
517
518
/*
 * Read the stored session file and return its content as a JSON string.
 *
 * On success returns 0, sets '*out_buf' to a newly created flb_sds_t holding
 * the JSON text (the caller must release it with flb_sds_destroy()) and
 * '*out_size' to its length.
 *
 * Returns -1, leaving the output parameters untouched, when the content
 * cannot be read, the file is empty, or the content cannot be converted to
 * JSON.
 */
static int store_session_get(struct flb_calyptia *ctx,
                             void **out_buf, size_t *out_size)
{
    int ret;
    void *buf = NULL;
    size_t size = 0;
    flb_sds_t json;

    ret = flb_fstore_file_content_copy(ctx->fs, ctx->fs_file,
                                       &buf, &size);
    if (ret != 0) {
        /*
         * The copy failed: 'buf' and 'size' are not guaranteed to have been
         * written by the callee, so they must not be inspected.
         */
        return -1;
    }

    if (buf == NULL || size == 0) {
        /* empty session: release whatever the copy handed back */
        flb_free(buf);
        return -1;
    }

    /* decode: the file content is msgpack, callers expect JSON */
    json = flb_msgpack_raw_to_json_sds(buf, size);
    flb_free(buf);
    if (!json) {
        return -1;
    }

    *out_buf = json;
    *out_size = flb_sds_len(json);

    return 0;
}
545
546
static int store_init(struct flb_calyptia *ctx)
547
0
{
548
0
    int ret;
549
0
    struct flb_fstore *fs;
550
0
    struct flb_fstore_file *fsf;
551
0
    void *buf;
552
0
    size_t size;
553
554
    /* store context */
555
0
    fs = flb_fstore_create(ctx->store_path, FLB_FSTORE_FS);
556
0
    if (!fs) {
557
0
        flb_plg_error(ctx->ins,
558
0
                      "could not initialize 'store_path': %s",
559
0
                      ctx->store_path);
560
0
        return -1;
561
0
    }
562
0
    ctx->fs = fs;
563
564
    /* stream */
565
0
    ctx->fs_stream = flb_fstore_stream_create(ctx->fs, "calyptia");
566
0
    if (!ctx->fs_stream) {
567
0
        flb_plg_error(ctx->ins, "could not create storage stream");
568
0
        return -1;
569
0
    }
570
571
    /* lookup any previous file */
572
0
    fsf = flb_fstore_file_get(ctx->fs, ctx->fs_stream, CALYPTIA_SESSION_FILE,
573
0
                              sizeof(CALYPTIA_SESSION_FILE) - 1);
574
0
    if (!fsf) {
575
0
        flb_plg_debug(ctx->ins, "no session file was found");
576
0
        return 0;
577
0
    }
578
0
    ctx->fs_file = fsf;
579
580
    /* retrieve session info */
581
0
    ret = store_session_get(ctx, &buf, &size);
582
0
    if (ret == 0) {
583
        /* agent id */
584
0
        ctx->agent_id = get_agent_info(buf, size, "id");
585
586
        /* agent token */
587
0
        ctx->agent_token = get_agent_info(buf, size, "token");
588
589
0
        if (ctx->agent_id && ctx->agent_token) {
590
0
            flb_plg_info(ctx->ins, "session setup OK");
591
0
        }
592
0
        else {
593
0
            if (ctx->agent_id) {
594
0
                flb_sds_destroy(ctx->agent_id);
595
0
            }
596
0
            if (ctx->agent_token) {
597
0
                flb_sds_destroy(ctx->agent_token);
598
0
            }
599
0
        }
600
0
        flb_sds_destroy(buf);
601
0
    }
602
603
0
    return 0;
604
0
}
605
606
/* Agent creation is perform on initialization using a sync upstream connection */
607
static int api_agent_create(struct flb_config *config, struct flb_calyptia *ctx)
608
0
{
609
0
    int ret;
610
0
    int flb_ret;
611
0
    int flags;
612
0
    int action = CALYPTIA_ACTION_REGISTER;
613
0
    char uri[1024];
614
0
    flb_sds_t meta;
615
0
    struct flb_upstream *u;
616
0
    struct flb_connection *u_conn;
617
0
    struct flb_http_client *c;
618
619
    /* Meta */
620
0
    meta = get_agent_metadata(ctx);
621
0
    if (!meta) {
622
0
        flb_plg_error(ctx->ins, "could not retrieve metadata");
623
0
        return -1;
624
0
    }
625
626
    /* Upstream */
627
0
    flags = get_io_flags(ctx->ins);
628
0
    u = flb_upstream_create(ctx->config,
629
0
                            ctx->cloud_host, ctx->cloud_port,
630
0
                            flags, ctx->ins->tls);
631
0
    if (!u) {
632
0
        flb_plg_error(ctx->ins,
633
0
                      "could not create upstream connection on 'agent create'");
634
0
        flb_sds_destroy(meta);
635
0
        return -1;
636
0
    }
637
638
    /* Make it synchronous */
639
0
    flb_stream_disable_async_mode(&u->base);
640
641
    /* Get upstream connection */
642
0
    u_conn = flb_upstream_conn_get(u);
643
0
    if (!u_conn) {
644
0
        flb_upstream_destroy(u);
645
0
        flb_sds_destroy(meta);
646
0
        return -1;
647
0
    }
648
649
0
    if (ctx->agent_id && ctx->agent_token) {
650
        /* Patch */
651
0
        action = CALYPTIA_ACTION_PATCH;
652
0
        snprintf(uri, sizeof(uri) - 1, CALYPTIA_ENDPOINT_PATCH, ctx->agent_id);
653
0
        c = flb_http_client(u_conn, FLB_HTTP_PATCH, uri,
654
0
                            meta, flb_sds_len(meta), NULL, 0, NULL, 0);
655
0
    }
656
0
    else {
657
        /* Create */
658
0
        action = CALYPTIA_ACTION_REGISTER;
659
0
        c = flb_http_client(u_conn, FLB_HTTP_POST, CALYPTIA_ENDPOINT_CREATE,
660
0
                            meta, flb_sds_len(meta), NULL, 0, NULL, 0);
661
0
    }
662
663
0
    if (!c) {
664
0
        flb_upstream_conn_release(u_conn);
665
0
        flb_upstream_destroy(u);
666
0
        return -1;
667
0
    }
668
669
    /* perform request */
670
0
    flb_ret = calyptia_http_do(ctx, c, action);
671
0
    if (flb_ret == FLB_OK &&
672
0
        (c->resp.status == 200 || c->resp.status == 201 || c->resp.status == 204)) {
673
0
        if (c->resp.payload_size > 0) {
674
0
            if (action == CALYPTIA_ACTION_REGISTER) {
675
                /* agent id */
676
0
                ctx->agent_id = get_agent_info(c->resp.payload,
677
0
                                               c->resp.payload_size,
678
0
                                               "id");
679
680
                /* agent token */
681
0
                ctx->agent_token = get_agent_info(c->resp.payload,
682
0
                                                  c->resp.payload_size,
683
0
                                                  "token");
684
685
0
                if (ctx->agent_id && ctx->agent_token) {
686
0
                    flb_plg_info(ctx->ins, "connected to Calyptia, agent_id='%s'",
687
0
                                 ctx->agent_id);
688
689
0
                    if (ctx->store_path && ctx->fs) {
690
0
                        ret = store_session_set(ctx,
691
0
                                                c->resp.payload,
692
0
                                                c->resp.payload_size);
693
0
                        if (ret == -1) {
694
0
                            flb_plg_warn(ctx->ins,
695
0
                                         "could not store Calyptia session");
696
0
                        }
697
0
                    }
698
0
                }
699
0
            }
700
0
        }
701
702
0
        if (action == CALYPTIA_ACTION_PATCH) {
703
0
            flb_plg_info(ctx->ins, "known agent registration successful");
704
0
        }
705
0
    }
706
707
    /* release resources */
708
0
    flb_sds_destroy(meta);
709
0
    flb_http_client_destroy(c);
710
0
    flb_upstream_conn_release(u_conn);
711
0
    flb_upstream_destroy(u);
712
713
0
    return flb_ret;
714
0
}
715
716
static struct flb_calyptia *config_init(struct flb_output_instance *ins,
717
                                        struct flb_config *config)
718
0
{
719
0
    int ret;
720
0
    int flags;
721
0
    struct flb_calyptia *ctx;
722
723
    /* Calyptia plugin context */
724
0
    ctx = flb_calloc(1, sizeof(struct flb_calyptia));
725
0
    if (!ctx) {
726
0
        flb_errno();
727
0
        return NULL;
728
0
    }
729
0
    ctx->ins = ins;
730
0
    ctx->config = config;
731
0
    flb_kv_init(&ctx->kv_labels);
732
733
    /* Load the config map */
734
0
    ret = flb_output_config_map_set(ins, (void *) ctx);
735
0
    if (ret == -1) {
736
0
        flb_free(ctx);
737
0
        return NULL;
738
0
    }
739
740
0
    ctx->metrics_endpoint = flb_sds_create_size(256);
741
0
    if (!ctx->metrics_endpoint) {
742
0
        flb_free(ctx);
743
0
        return NULL;
744
0
    }
745
746
0
#ifdef FLB_HAVE_CHUNK_TRACE
747
0
    ctx->trace_endpoint = flb_sds_create_size(256);
748
0
    if (!ctx->trace_endpoint) {
749
0
        flb_sds_destroy(ctx->metrics_endpoint);
750
0
        flb_free(ctx);
751
0
        return NULL;
752
0
    }
753
0
#endif
754
755
    /* api_key */
756
0
    if (!ctx->api_key) {
757
0
        flb_plg_error(ctx->ins, "configuration 'api_key' is missing");
758
0
        flb_free(ctx);
759
0
        return NULL;
760
0
    }
761
762
    /* parse 'add_label' */
763
0
    ret = config_add_labels(ins, ctx);
764
0
    if (ret == -1) {
765
0
        return NULL;
766
0
    }
767
768
    /* env reader */
769
0
    ctx->env = flb_env_create();
770
771
    /* Set context */
772
0
    flb_output_set_context(ins, ctx);
773
774
    /* Initialize optional storage */
775
0
    if (ctx->store_path) {
776
0
        ret = store_init(ctx);
777
0
        if (ret == -1) {
778
0
            return NULL;
779
0
        }
780
0
    }
781
782
    /* the machine-id is provided by custom calyptia, which invokes this plugin. */
783
0
    if (!ctx->machine_id) {
784
0
        flb_plg_error(ctx->ins, "machine_id has not been set");
785
0
        return NULL;
786
0
    }
787
788
0
    flb_plg_debug(ctx->ins, "machine_id=%s", ctx->machine_id);
789
790
    /* Upstream */
791
0
    flags = get_io_flags(ctx->ins);
792
0
    ctx->u = flb_upstream_create(ctx->config,
793
0
                                 ctx->cloud_host, ctx->cloud_port,
794
0
                                 flags, ctx->ins->tls);
795
0
    if (!ctx->u) {
796
0
        return NULL;
797
0
    }
798
799
    /* Set instance flags into upstream */
800
0
    flb_output_upstream_set(ctx->u, ins);
801
802
0
    return ctx;
803
0
}
804
805
static int register_agent(struct flb_calyptia *ctx, struct flb_config *config)
806
0
{
807
0
    int ret;
808
809
    /* Try registration */
810
0
    ret = api_agent_create(config, ctx);
811
0
    if (ret != FLB_OK) {
812
0
        flb_plg_warn(ctx->ins, "agent registration failed");
813
0
        return FLB_ERROR;
814
0
    }
815
816
    /* Update endpoints */
817
0
    flb_sds_len_set(ctx->metrics_endpoint, 0);
818
0
    flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
819
0
                   ctx->agent_id);
820
821
0
#ifdef FLB_HAVE_CHUNK_TRACE
822
0
    if (ctx->pipeline_id) {
823
0
        flb_sds_len_set(ctx->trace_endpoint, 0);
824
0
        flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE,
825
0
                       ctx->pipeline_id);
826
0
    }
827
0
#endif
828
829
0
    flb_plg_info(ctx->ins, "agent registration successful");
830
0
    return FLB_OK;
831
0
}
832
833
static int cb_calyptia_init(struct flb_output_instance *ins,
834
                           struct flb_config *config, void *data)
835
{
836
    struct flb_calyptia *ctx;
837
    (void) data;
838
    int ret;
839
840
    /* create config context */
841
    ctx = config_init(ins, config);
842
    if (!ctx) {
843
        flb_plg_error(ins, "could not initialize configuration");
844
        return -1;
845
    }
846
847
    /*
848
     * This plugin instance uses the HTTP client interface, let's register
849
     * it debugging callbacks.
850
     */
851
    flb_output_set_http_debug_callbacks(ins);
852
853
    ret = register_agent(ctx, config);
854
    if (ret != FLB_OK && !ctx->register_retry_on_flush) {
855
        flb_plg_error(ins, "agent registration failed and register_retry_on_flush=false");
856
        return -1;
857
    }
858
859
    return 0;
860
}
861
862
/*
 * Decode a msgpack-encoded metrics payload and write its text
 * representation to the debug log. Decoding or encoding failures are
 * reported as warnings; nothing is returned.
 */
static void debug_payload(struct flb_calyptia *ctx, void *data, size_t bytes)
{
    int ret;
    size_t off = 0;
    struct cmt *cmt;
    cfl_sds_t out;

    ret = cmt_decode_msgpack_create(&cmt, (char *) data, bytes, &off);
    if (ret != CMT_DECODE_MSGPACK_SUCCESS) {
        flb_plg_warn(ctx->ins, "could not unpack debug payload");
        return;
    }

    out = cmt_encode_text_create(cmt);
    if (out == NULL) {
        /* never hand a NULL pointer to the '%s' conversion below */
        flb_plg_warn(ctx->ins, "could not encode debug payload");
        cmt_destroy(cmt);
        return;
    }

    flb_plg_debug(ctx->ins, "debug payload:\n%s", out);
    cmt_encode_text_destroy(out);
    cmt_destroy(cmt);
}
880
881
static int cb_calyptia_exit(void *data, struct flb_config *config)
882
{
883
    struct flb_calyptia *ctx = data;
884
885
    if (!ctx) {
886
        return 0;
887
    }
888
889
    if (ctx->u) {
890
        flb_upstream_destroy(ctx->u);
891
    }
892
893
    if (ctx->agent_id) {
894
        flb_sds_destroy(ctx->agent_id);
895
    }
896
897
    if (ctx->agent_token) {
898
        flb_sds_destroy(ctx->agent_token);
899
    }
900
901
    if (ctx->env) {
902
        flb_env_destroy(ctx->env);
903
    }
904
905
    if (ctx->metrics_endpoint) {
906
        flb_sds_destroy(ctx->metrics_endpoint);
907
    }
908
909
#ifdef FLB_HAVE_CHUNK_TRACE
910
    if (ctx->trace_endpoint) {
911
        flb_sds_destroy(ctx->trace_endpoint);
912
    }
913
#endif /* FLB_HAVE_CHUNK_TRACE */
914
915
    if (ctx->fs) {
916
        flb_fstore_destroy(ctx->fs);
917
    }
918
919
    flb_kv_release(&ctx->kv_labels);
920
    flb_free(ctx);
921
922
    return 0;
923
}
924
925
static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
926
                             struct flb_output_flush *out_flush,
927
                             struct flb_input_instance *i_ins,
928
                             void *out_context,
929
                             struct flb_config *config)
930
0
{
931
0
    int ret;
932
0
    size_t off = 0;
933
0
    size_t out_size = 0;
934
0
    char *out_buf = NULL;
935
0
    struct flb_connection *u_conn;
936
0
    struct flb_http_client *c = NULL;
937
0
    struct flb_calyptia *ctx = out_context;
938
0
    struct cmt *cmt;
939
0
    flb_sds_t json;
940
0
    (void) i_ins;
941
0
    (void) config;
942
943
0
    if ((!ctx->agent_id || !ctx->agent_token) && ctx->register_retry_on_flush) {
944
0
        flb_plg_info(ctx->ins, "missing agent_id or agent_token, attempting re-registration register_retry_on_flush=true");
945
0
        if (register_agent(ctx, config) != FLB_OK) {
946
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
947
0
        }
948
0
    }
949
0
    else if (!ctx->agent_id || !ctx->agent_token) {
950
0
        flb_plg_error(ctx->ins, "missing agent_id or agent_token, and register_retry_on_flush=false");
951
0
        FLB_OUTPUT_RETURN(FLB_ERROR);
952
0
    }
953
954
    /* Get upstream connection */
955
0
    u_conn = flb_upstream_conn_get(ctx->u);
956
0
    if (!u_conn) {
957
0
        FLB_OUTPUT_RETURN(FLB_RETRY);
958
0
    }
959
960
0
    if (event_chunk->type == FLB_EVENT_TYPE_METRICS) {
961
        /* if we have labels append them */
962
0
        if (ctx->add_labels && mk_list_size(ctx->add_labels) > 0) {
963
0
            ret = cmt_decode_msgpack_create(&cmt,
964
0
                                            (char *) event_chunk->data,
965
0
                                            event_chunk->size,
966
0
                                            &off);
967
0
            if (ret != CMT_DECODE_MSGPACK_SUCCESS) {
968
0
                flb_upstream_conn_release(u_conn);
969
0
                FLB_OUTPUT_RETURN(FLB_ERROR);
970
0
            }
971
972
            /* append labels set by config */
973
0
            append_labels(ctx, cmt);
974
975
            /* encode back to msgpack */
976
0
            ret = cmt_encode_msgpack_create(cmt, &out_buf, &out_size);
977
0
            if (ret != 0) {
978
0
                cmt_destroy(cmt);
979
0
                flb_upstream_conn_release(u_conn);
980
0
                FLB_OUTPUT_RETURN(FLB_ERROR);
981
0
            }
982
0
            cmt_destroy(cmt);
983
0
        }
984
0
        else {
985
0
            out_buf = (char *) event_chunk->data;
986
0
            out_size = event_chunk->size;
987
0
        }
988
989
        /* Compose HTTP Client request */
990
0
        c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->metrics_endpoint,
991
0
                           out_buf, out_size, NULL, 0, NULL, 0);
992
0
        if (!c) {
993
0
            if (out_buf != event_chunk->data) {
994
0
                cmt_encode_msgpack_destroy(out_buf);
995
0
            }
996
0
            flb_upstream_conn_release(u_conn);
997
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
998
0
        }
999
1000
        /* perform request */
1001
0
        ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_METRICS);
1002
0
        if (ret == FLB_OK) {
1003
0
            flb_plg_debug(ctx->ins, "metrics delivered OK");
1004
0
        }
1005
0
        else {
1006
0
            flb_plg_error(ctx->ins, "could not deliver metrics");
1007
0
            debug_payload(ctx, out_buf, out_size);
1008
0
        }
1009
1010
0
        if (out_buf != event_chunk->data) {
1011
0
            cmt_encode_msgpack_destroy(out_buf);
1012
0
        }
1013
0
    }
1014
1015
0
#ifdef FLB_HAVE_CHUNK_TRACE
1016
0
    if (event_chunk->type & FLB_EVENT_TYPE_LOGS &&
1017
0
        event_chunk->type & FLB_EVENT_TYPE_HAS_TRACE) {
1018
0
        json = flb_pack_msgpack_to_json_format(event_chunk->data,
1019
0
                                            event_chunk->size,
1020
0
                                            FLB_PACK_JSON_FORMAT_STREAM,
1021
0
                                            FLB_PACK_JSON_DATE_DOUBLE,
1022
0
                                            NULL);
1023
0
        if (json == NULL) {
1024
0
            flb_upstream_conn_release(u_conn);
1025
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
1026
0
        }
1027
1028
0
        c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->trace_endpoint,
1029
0
                           (char *) json, flb_sds_len(json),
1030
0
                           NULL, 0, NULL, 0);
1031
1032
0
        if (!c) {
1033
0
            flb_upstream_conn_release(u_conn);
1034
0
            flb_sds_destroy(json);
1035
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
1036
0
        }
1037
1038
0
        ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_TRACE);
1039
0
        if (ret == FLB_OK) {
1040
0
            flb_plg_debug(ctx->ins, "trace delivered OK");
1041
0
        }
1042
0
        else {
1043
0
            flb_plg_error(ctx->ins, "could not deliver trace");
1044
0
            debug_payload(ctx, (char *) json, flb_sds_len(json));
1045
0
        }
1046
0
        flb_sds_destroy(json);
1047
0
    }
1048
0
#endif /* FLB_HAVE_CHUNK_TRACE */
1049
1050
0
    flb_upstream_conn_release(u_conn);
1051
1052
0
    if (c) {
1053
0
        flb_http_client_destroy(c);
1054
0
    }
1055
1056
0
    FLB_OUTPUT_RETURN(ret);
1057
0
}
1058
1059
/* Configuration properties map */
1060
static struct flb_config_map config_map[] = {
1061
    {
1062
     FLB_CONFIG_MAP_STR, "cloud_host", DEFAULT_CALYPTIA_HOST,
1063
     0, FLB_TRUE, offsetof(struct flb_calyptia, cloud_host),
1064
     "",
1065
    },
1066
1067
    {
1068
     FLB_CONFIG_MAP_INT, "cloud_port", DEFAULT_CALYPTIA_PORT,
1069
     0, FLB_TRUE, offsetof(struct flb_calyptia, cloud_port),
1070
     "",
1071
    },
1072
1073
    {
1074
     FLB_CONFIG_MAP_STR, "api_key", NULL,
1075
     0, FLB_TRUE, offsetof(struct flb_calyptia, api_key),
1076
     "Calyptia Cloud API Key."
1077
    },
1078
    {
1079
     FLB_CONFIG_MAP_STR, "machine_id", NULL,
1080
     0, FLB_TRUE, offsetof(struct flb_calyptia, machine_id),
1081
     "Custom machine_id to be used when registering agent"
1082
    },
1083
    {
1084
     FLB_CONFIG_MAP_STR, "fleet_id", NULL,
1085
     0, FLB_TRUE, offsetof(struct flb_calyptia, fleet_id),
1086
     "Fleet ID for identifying as part of a managed fleet"
1087
    },
1088
1089
    {
1090
     FLB_CONFIG_MAP_STR, "store_path", NULL,
1091
     0, FLB_TRUE, offsetof(struct flb_calyptia, store_path),
1092
     ""
1093
    },
1094
1095
    {
1096
     FLB_CONFIG_MAP_SLIST_1, "add_label", NULL,
1097
     FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_calyptia, add_labels),
1098
     "Label to append to the generated metric."
1099
    },
1100
1101
#ifdef FLB_HAVE_CHUNK_TRACE
1102
    {
1103
     FLB_CONFIG_MAP_STR, "pipeline_id", NULL,
1104
     0, FLB_TRUE, offsetof(struct flb_calyptia, pipeline_id),
1105
     "Pipeline ID for calyptia core traces."
1106
    },
1107
#endif
1108
    {
1109
     FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true",
1110
     0, FLB_TRUE, offsetof(struct flb_calyptia, register_retry_on_flush),
1111
     "Retry agent registration on flush if failed on init."
1112
    },
1113
    /* EOF */
1114
    {0}
1115
};
1116
1117
struct flb_output_plugin out_calyptia_plugin = {
1118
    .name         = "calyptia",
1119
    .description  = "Calyptia Cloud",
1120
    .cb_init      = cb_calyptia_init,
1121
    .cb_flush     = cb_calyptia_flush,
1122
    .cb_exit      = cb_calyptia_exit,
1123
    .config_map   = config_map,
1124
    .flags        = FLB_OUTPUT_NET | FLB_OUTPUT_PRIVATE | FLB_IO_OPT_TLS,
1125
    .event_type   = FLB_OUTPUT_METRICS
1126
};