Coverage Report

Created: 2023-03-10 07:33

/src/fluent-bit/plugins/out_logdna/logdna.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2022 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_output_plugin.h>
21
#include <fluent-bit/flb_mp.h>
22
#include <fluent-bit/flb_pack.h>
23
#include <fluent-bit/flb_env.h>
24
#include <fluent-bit/flb_http_client.h>
25
26
#include "logdna.h"
27
28
/*
 * Tell whether the msgpack object 'k' is a string whose content is exactly
 * the 'len' bytes found at 'name'.
 *
 * Returns FLB_TRUE on a match, FLB_FALSE otherwise (including when 'k' is
 * not a string object).
 */
static inline int primary_key_check(msgpack_object k, char *name, int len)
{
    /* Reject anything that is not a string of the expected length */
    if (k.type != MSGPACK_OBJECT_STR || k.via.str.size != len) {
        return FLB_FALSE;
    }

    return (memcmp(k.via.str.ptr, name, len) == 0) ? FLB_TRUE : FLB_FALSE;
}
44
45
/*
46
 * This function looks for the following keys and add them to the buffer
47
 *
48
 * - level or severity
49
 * - file
50
 * - app
51
 * - meta
52
 */
53
static int record_append_primary_keys(struct flb_logdna *ctx,
54
                                      msgpack_object *map,
55
                                      msgpack_packer *mp_sbuf)
56
0
{
57
0
    int i;
58
0
    int c = 0;
59
0
    msgpack_object *level = NULL;
60
0
    msgpack_object *file = NULL;
61
0
    msgpack_object *app = NULL;
62
0
    msgpack_object *meta = NULL;
63
0
    msgpack_object k;
64
0
    msgpack_object v;
65
66
0
    for (i = 0; i < map->via.array.size; i++) {
67
0
        k = map->via.map.ptr[i].key;
68
0
        v = map->via.map.ptr[i].val;
69
70
        /* Level - optional */
71
0
        if (!level &&
72
0
            (primary_key_check(k, "level", 5) == FLB_TRUE ||
73
0
             primary_key_check(k, "severity", 8) == FLB_TRUE)) {
74
0
            level = &k;
75
0
            msgpack_pack_str(mp_sbuf, 5);
76
0
            msgpack_pack_str_body(mp_sbuf, "level", 5);
77
0
            msgpack_pack_object(mp_sbuf, v);
78
0
            c++;
79
0
        }
80
81
        /* Meta - optional */
82
0
        if (!meta && primary_key_check(k, "meta", 4) == FLB_TRUE) {
83
0
            meta = &k;
84
0
            msgpack_pack_str(mp_sbuf, 4);
85
0
            msgpack_pack_str_body(mp_sbuf, "meta", 4);
86
0
            msgpack_pack_object(mp_sbuf, v);
87
0
            c++;
88
0
        }
89
90
        /* File */
91
0
        if (!file && primary_key_check(k, "file", 4) == FLB_TRUE) {
92
0
            file = &k;
93
0
            msgpack_pack_str(mp_sbuf, 4);
94
0
            msgpack_pack_str_body(mp_sbuf, "file", 4);
95
0
            msgpack_pack_object(mp_sbuf, v);
96
0
            c++;
97
0
        }
98
99
        /* App */
100
0
        if (primary_key_check(k, "app", 3) == FLB_TRUE) {
101
0
            app = &k;
102
0
            msgpack_pack_str(mp_sbuf, 3);
103
0
            msgpack_pack_str_body(mp_sbuf, "app", 3);
104
0
            msgpack_pack_object(mp_sbuf, v);
105
0
            c++;
106
0
        }
107
0
    }
108
109
    /* Set the global file name if the record did not provided one */
110
0
    if (!file && ctx->file) {
111
0
        msgpack_pack_str(mp_sbuf, 4);
112
0
        msgpack_pack_str_body(mp_sbuf, "file", 4);
113
0
        msgpack_pack_str(mp_sbuf, flb_sds_len(ctx->file));
114
0
        msgpack_pack_str_body(mp_sbuf, ctx->file, flb_sds_len(ctx->file));
115
0
        c++;
116
0
    }
117
118
119
    /* If no application name is set, set the default */
120
0
    if (!app) {
121
0
        msgpack_pack_str(mp_sbuf, 3);
122
0
        msgpack_pack_str_body(mp_sbuf, "app", 3);
123
0
        msgpack_pack_str(mp_sbuf, flb_sds_len(ctx->app));
124
0
        msgpack_pack_str_body(mp_sbuf, ctx->app, flb_sds_len(ctx->app));
125
0
        c++;
126
0
    }
127
128
0
    return c;
129
0
}
130
131
static flb_sds_t logdna_compose_payload(struct flb_logdna *ctx,
132
                                        const void *data, size_t bytes,
133
                                        const char *tag, int tag_len)
134
0
{
135
0
    int ret;
136
0
    int len;
137
0
    int mp_ok = MSGPACK_UNPACK_SUCCESS;
138
0
    int total_lines;
139
0
    int array_size = 0;
140
0
    size_t off = 0;
141
0
    off_t map_off;
142
0
    char *line_json;
143
0
    flb_sds_t json;
144
0
    struct flb_time tms;
145
0
    msgpack_object *obj;
146
0
    msgpack_unpacked result;
147
0
    msgpack_packer mp_pck;
148
0
    msgpack_sbuffer mp_sbuf;
149
150
    /* Count number of records */
151
0
    total_lines = flb_mp_count(data, bytes);
152
153
    /* Initialize msgpack buffers */
154
0
    msgpack_unpacked_init(&result);
155
0
    msgpack_sbuffer_init(&mp_sbuf);
156
0
    msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
157
158
0
    msgpack_pack_map(&mp_pck, 1);
159
160
0
    msgpack_pack_str(&mp_pck, 5);
161
0
    msgpack_pack_str_body(&mp_pck, "lines", 5);
162
163
0
    msgpack_pack_array(&mp_pck, total_lines);
164
165
0
    while (msgpack_unpack_next(&result, data, bytes, &off) == mp_ok) {
166
0
        flb_time_pop_from_msgpack(&tms, &result, &obj);
167
168
0
        map_off = mp_sbuf.size;
169
170
0
        array_size = 2;
171
0
        msgpack_pack_map(&mp_pck, array_size);
172
173
        /*
174
         * Append primary keys found, the return values is the number of appended
175
         * keys to the record, we use that to adjust the map header size.
176
         */
177
0
        ret = record_append_primary_keys(ctx, obj, &mp_pck);
178
0
        array_size += ret;
179
180
        /* Timestamp */
181
0
        msgpack_pack_str(&mp_pck, 9);
182
0
        msgpack_pack_str_body(&mp_pck, "timestamp", 9);
183
0
        msgpack_pack_int(&mp_pck, (int) flb_time_to_double(&tms));
184
185
        /* Line */
186
0
        msgpack_pack_str(&mp_pck, 4);
187
0
        msgpack_pack_str_body(&mp_pck, "line", 4);
188
189
0
        line_json = flb_msgpack_to_json_str(1024, obj);
190
0
        len = strlen(line_json);
191
0
        msgpack_pack_str(&mp_pck, len);
192
0
        msgpack_pack_str_body(&mp_pck, line_json, len);
193
0
        flb_free(line_json);
194
195
        /* Adjust map header size */
196
0
        flb_mp_set_map_header_size(mp_sbuf.data + map_off, array_size);
197
0
    }
198
0
    msgpack_unpacked_destroy(&result);
199
200
0
    json = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
201
0
    msgpack_sbuffer_destroy(&mp_sbuf);
202
203
0
    return json;
204
0
}
205
206
static void logdna_config_destroy(struct flb_logdna *ctx)
207
0
{
208
0
    if (ctx->u) {
209
0
        flb_upstream_destroy(ctx->u);
210
0
    }
211
212
0
    if (ctx->tags_formatted) {
213
0
        flb_sds_destroy(ctx->tags_formatted);
214
0
    }
215
216
0
    flb_free(ctx);
217
0
}
218
219
static struct flb_logdna *logdna_config_create(struct flb_output_instance *ins,
220
                                               struct flb_config *config)
221
0
{
222
0
    int ret;
223
0
    int len = 0;
224
0
    char *hostname;
225
0
    flb_sds_t tmp;
226
0
    flb_sds_t encoded;
227
0
    struct mk_list *head;
228
0
    struct flb_slist_entry *tag_entry;
229
0
    struct flb_logdna *ctx;
230
0
    struct flb_upstream *upstream;
231
232
    /* Create context */
233
0
    ctx = flb_calloc(1, sizeof(struct flb_logdna));
234
0
    if (!ctx) {
235
0
        flb_errno();
236
0
        return NULL;
237
0
    }
238
0
    ctx->ins = ins;
239
240
    /* Load config map */
241
0
    ret = flb_output_config_map_set(ins, (void *) ctx);
242
0
    if (ret == -1) {
243
0
        logdna_config_destroy(ctx);
244
0
        return NULL;
245
0
    }
246
247
    /* validate API key */
248
0
    if (!ctx->api_key) {
249
0
        flb_plg_error(ins, "no `api_key` was set, this is a mandatory property");
250
0
        logdna_config_destroy(ctx);
251
0
        return NULL;
252
0
    }
253
254
    /*
255
     * Tags: this value is a linked list of values created by the config map
256
     * reader.
257
     */
258
0
    if (ctx->tags) {
259
        /* For every tag, make sure no empty spaces exists */
260
0
        mk_list_foreach(head, ctx->tags) {
261
0
            tag_entry = mk_list_entry(head, struct flb_slist_entry, _head);
262
0
            len += flb_sds_len(tag_entry->str) + 1;
263
0
        }
264
265
        /* Compose a full tag for URI request */
266
0
        ctx->tags_formatted = flb_sds_create_size(len);
267
0
        if (!ctx->tags_formatted) {
268
0
            logdna_config_destroy(ctx);
269
0
            return NULL;
270
0
        }
271
272
0
        mk_list_foreach(head, ctx->tags) {
273
0
            tag_entry = mk_list_entry(head, struct flb_slist_entry, _head);
274
275
0
            encoded = flb_uri_encode(tag_entry->str,
276
0
                                     flb_sds_len(tag_entry->str));
277
0
            tmp = flb_sds_cat(ctx->tags_formatted,
278
0
                              encoded, flb_sds_len(encoded));
279
0
            ctx->tags_formatted = tmp;
280
0
            flb_sds_destroy(encoded);
281
282
0
            if (tag_entry->_head.next != ctx->tags) {
283
0
                tmp = flb_sds_cat(ctx->tags_formatted, ",", 1);
284
0
                ctx->tags_formatted = tmp;
285
0
            }
286
0
        }
287
0
    }
288
289
    /*
290
     * Hostname: if the hostname was not set manually, try to get it from the
291
     * environment variable.
292
     *
293
     * Note that hostname is populated by a config map, and config maps are
294
     * immutable so we use an internal variable to do a final composition
295
     * if required.
296
     */
297
0
    if (!ctx->hostname) {
298
0
        tmp = NULL;
299
0
        hostname = (char *) flb_env_get(config->env, "HOSTNAME");
300
0
        if (hostname) {
301
0
            len = strlen(hostname);
302
0
            ctx->_hostname = flb_sds_create(hostname);
303
0
        }
304
0
        else {
305
0
            ctx->_hostname = flb_sds_create("unknown");
306
0
        }
307
0
        if (!ctx->_hostname) {
308
0
            flb_free(ctx);
309
0
            return NULL;
310
0
        }
311
0
    }
312
0
    else {
313
0
        ctx->_hostname = flb_sds_create(ctx->hostname);
314
0
    }
315
316
    /* Create Upstream connection context */
317
0
    upstream = flb_upstream_create(config,
318
0
                                   ctx->logdna_host,
319
0
                                   ctx->logdna_port,
320
0
                                   FLB_IO_TLS, ins->tls);
321
0
    if (!upstream) {
322
0
        flb_free(ctx);
323
0
        return NULL;
324
0
    }
325
0
    ctx->u = upstream;
326
0
    flb_output_upstream_set(ctx->u, ins);
327
328
    /* Set networking defaults */
329
0
    flb_output_net_default(FLB_LOGDNA_HOST, atoi(FLB_LOGDNA_PORT), ins);
330
0
    return ctx;
331
0
}
332
333
static int cb_logdna_init(struct flb_output_instance *ins,
334
                          struct flb_config *config, void *data)
335
0
{
336
0
    struct flb_logdna *ctx;
337
338
0
    ctx = logdna_config_create(ins, config);
339
0
    if (!ctx) {
340
0
        flb_plg_error(ins, "cannot initialize configuration");
341
0
        return -1;
342
0
    }
343
344
0
    flb_output_set_context(ins, ctx);
345
346
    /*
347
     * This plugin instance uses the HTTP client interface, let's register
348
     * it debugging callbacks.
349
     */
350
0
    flb_output_set_http_debug_callbacks(ins);
351
352
0
    flb_plg_info(ins, "configured, hostname=%s", ctx->hostname);
353
0
    return 0;
354
0
}
355
356
static void cb_logdna_flush(struct flb_event_chunk *event_chunk,
357
                            struct flb_output_flush *out_flush,
358
                            struct flb_input_instance *i_ins,
359
                            void *out_context,
360
                            struct flb_config *config)
361
0
{
362
0
    int ret;
363
0
    int out_ret = FLB_OK;
364
0
    size_t b_sent;
365
0
    flb_sds_t uri;
366
0
    flb_sds_t tmp;
367
0
    flb_sds_t payload;
368
0
    struct flb_logdna *ctx = out_context;
369
0
    struct flb_connection *u_conn;
370
0
    struct flb_http_client *c;
371
372
    /* Format the data to the expected LogDNA Payload */
373
0
    payload = logdna_compose_payload(ctx,
374
0
                                     event_chunk->data,
375
0
                                     event_chunk->size,
376
0
                                     event_chunk->tag,
377
0
                                     flb_sds_len(event_chunk->tag));
378
0
    if (!payload) {
379
0
        flb_plg_error(ctx->ins, "cannot compose request payload");
380
0
        FLB_OUTPUT_RETURN(FLB_RETRY);
381
0
    }
382
383
    /* Lookup an available connection context */
384
0
    u_conn = flb_upstream_conn_get(ctx->u);
385
0
    if (!u_conn) {
386
0
        flb_plg_error(ctx->ins, "no upstream connections available");
387
0
        flb_sds_destroy(payload);
388
0
        FLB_OUTPUT_RETURN(FLB_RETRY);
389
0
    }
390
391
    /* Compose the HTTP URI */
392
0
    uri = flb_sds_create_size(256);
393
0
    if (!uri) {
394
0
        flb_plg_error(ctx->ins, "cannot allocate buffer for URI");
395
0
        flb_sds_destroy(payload);
396
0
        flb_free(ctx);
397
0
        FLB_OUTPUT_RETURN(FLB_RETRY);
398
0
    }
399
0
    tmp = flb_sds_printf(&uri,
400
0
                         "/logs/ingest?hostname=%s&mac=%s&ip=%s&now=%lu&tags=%s",
401
0
                         ctx->_hostname,
402
0
                         ctx->mac_addr,
403
0
                         ctx->ip_addr,
404
0
                         time(NULL),
405
0
                         ctx->tags_formatted);
406
0
    if (!tmp) {
407
0
        flb_plg_error(ctx->ins, "error formatting URI");
408
0
        flb_sds_destroy(payload);
409
0
        flb_free(ctx);
410
0
        FLB_OUTPUT_RETURN(FLB_RETRY);
411
0
    }
412
413
    /* Create HTTP client context */
414
0
    c = flb_http_client(u_conn, FLB_HTTP_POST, uri,
415
0
                        payload, flb_sds_len(payload),
416
0
                        ctx->logdna_host, ctx->logdna_port,
417
0
                        NULL, 0);
418
0
    if (!c) {
419
0
        flb_plg_error(ctx->ins, "cannot create HTTP client context");
420
0
        flb_sds_destroy(uri);
421
0
        flb_sds_destroy(payload);
422
0
        flb_upstream_conn_release(u_conn);
423
0
        FLB_OUTPUT_RETURN(FLB_RETRY);
424
0
    }
425
426
    /* Set callback context to the HTTP client context */
427
0
    flb_http_set_callback_context(c, ctx->ins->callback);
428
429
    /* User Agent */
430
0
    flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
431
432
    /* Add Content-Type header */
433
0
    flb_http_add_header(c,
434
0
                        FLB_LOGDNA_CT, sizeof(FLB_LOGDNA_CT) - 1,
435
0
                        FLB_LOGDNA_CT_JSON, sizeof(FLB_LOGDNA_CT_JSON) - 1);
436
437
    /* Add auth */
438
0
    flb_http_basic_auth(c, ctx->api_key, "");
439
440
0
    flb_http_strip_port_from_host(c);
441
442
    /* Send HTTP request */
443
0
    ret = flb_http_do(c, &b_sent);
444
445
    /* Destroy buffers */
446
0
    flb_sds_destroy(uri);
447
0
    flb_sds_destroy(payload);
448
449
    /* Validate HTTP client return status */
450
0
    if (ret == 0) {
451
        /*
452
         * Only allow the following HTTP status:
453
         *
454
         * - 200: OK
455
         * - 201: Created
456
         * - 202: Accepted
457
         * - 203: no authorative resp
458
         * - 204: No Content
459
         * - 205: Reset content
460
         *
461
         */
462
0
        if (c->resp.status < 200 || c->resp.status > 205) {
463
0
            if (c->resp.payload) {
464
0
                flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i\n%s",
465
0
                              ctx->logdna_host, ctx->logdna_port, c->resp.status,
466
0
                              c->resp.payload);
467
0
            }
468
0
            else {
469
0
                flb_plg_error(ctx->ins, "%s:%i, HTTP status=%i",
470
0
                              ctx->logdna_host, ctx->logdna_port, c->resp.status);
471
0
            }
472
0
            out_ret = FLB_RETRY;
473
0
        }
474
0
        else {
475
0
            if (c->resp.payload) {
476
0
                flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%s",
477
0
                             ctx->logdna_host, ctx->logdna_port,
478
0
                             c->resp.status, c->resp.payload);
479
0
            }
480
0
            else {
481
0
                flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i",
482
0
                             ctx->logdna_host, ctx->logdna_port,
483
0
                             c->resp.status);
484
0
            }
485
0
        }
486
0
    }
487
0
    else {
488
0
        flb_plg_error(ctx->ins, "could not flush records to %s:%s (http_do=%i)",
489
0
                      FLB_LOGDNA_HOST, FLB_LOGDNA_PORT, ret);
490
0
        out_ret = FLB_RETRY;
491
0
    }
492
493
0
    flb_http_client_destroy(c);
494
0
    flb_upstream_conn_release(u_conn);
495
0
    FLB_OUTPUT_RETURN(out_ret);
496
0
}
497
498
static int cb_logdna_exit(void *data, struct flb_config *config)
499
0
{
500
0
    struct flb_logdna *ctx = data;
501
502
0
    if (!ctx) {
503
0
        return 0;
504
0
    }
505
506
0
    if (ctx->_hostname) {
507
0
        flb_sds_destroy(ctx->_hostname);
508
0
    }
509
0
    logdna_config_destroy(ctx);
510
0
    return 0;
511
0
}
512
513
/* Configuration properties map */
514
static struct flb_config_map config_map[] = {
515
    {
516
     FLB_CONFIG_MAP_STR, "logdna_host", FLB_LOGDNA_HOST,
517
     0, FLB_TRUE, offsetof(struct flb_logdna, logdna_host),
518
     "LogDNA Host address"
519
    },
520
521
    {
522
     FLB_CONFIG_MAP_INT, "logdna_port", FLB_LOGDNA_PORT,
523
     0, FLB_TRUE, offsetof(struct flb_logdna, logdna_port),
524
     "LogDNA TCP port"
525
    },
526
527
    {
528
     FLB_CONFIG_MAP_STR, "api_key", NULL,
529
     0, FLB_TRUE, offsetof(struct flb_logdna, api_key),
530
     "Logdna API key"
531
    },
532
533
    {
534
     FLB_CONFIG_MAP_STR, "hostname", NULL,
535
     0, FLB_TRUE, offsetof(struct flb_logdna, hostname),
536
     "Local Server or device host name"
537
    },
538
539
    {
540
     FLB_CONFIG_MAP_STR, "mac", "",
541
     0, FLB_TRUE, offsetof(struct flb_logdna, mac_addr),
542
     "MAC address (optional)"
543
    },
544
545
    {
546
     FLB_CONFIG_MAP_STR, "ip", "",
547
     0, FLB_TRUE, offsetof(struct flb_logdna, ip_addr),
548
     "IP address (optional)"
549
    },
550
551
    {
552
     FLB_CONFIG_MAP_CLIST, "tags", "",
553
     0, FLB_TRUE, offsetof(struct flb_logdna, tags),
554
     "Tags (optional)"
555
    },
556
557
    {
558
     FLB_CONFIG_MAP_STR, "file", NULL,
559
     0, FLB_TRUE, offsetof(struct flb_logdna, file),
560
     "Name of the monitored file (optional)"
561
    },
562
563
    {
564
     FLB_CONFIG_MAP_STR, "app", "Fluent Bit",
565
     0, FLB_TRUE, offsetof(struct flb_logdna, app),
566
     "Name of the application generating the data (optional)"
567
    },
568
569
    /* EOF */
570
    {0}
571
572
};
573
574
/* Plugin reference */
575
struct flb_output_plugin out_logdna_plugin = {
576
    .name        = "logdna",
577
    .description = "LogDNA",
578
    .cb_init     = cb_logdna_init,
579
    .cb_flush    = cb_logdna_flush,
580
    .cb_exit     = cb_logdna_exit,
581
    .config_map  = config_map,
582
    .flags       = FLB_OUTPUT_NET | FLB_IO_TLS,
583
};