Coverage Report

Created: 2023-03-26 07:01

/src/fluent-bit/plugins/out_azure_kusto/azure_kusto.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2022 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_http_client.h>
21
#include <fluent-bit/flb_kv.h>
22
#include <fluent-bit/flb_oauth2.h>
23
#include <fluent-bit/flb_output_plugin.h>
24
#include <fluent-bit/flb_pack.h>
25
#include <fluent-bit/flb_signv4.h>
26
27
#include "azure_kusto.h"
28
#include "azure_kusto_conf.h"
29
#include "azure_kusto_ingest.h"
30
31
/* Create a new oauth2 context and get a oauth2 token */
32
static int azure_kusto_get_oauth2_token(struct flb_azure_kusto *ctx)
33
0
{
34
0
    int ret;
35
0
    char *token;
36
37
    /* Clear any previous oauth2 payload content */
38
0
    flb_oauth2_payload_clear(ctx->o);
39
40
0
    ret = flb_oauth2_payload_append(ctx->o, "grant_type", 10, "client_credentials", 18);
41
0
    if (ret == -1) {
42
0
        flb_plg_error(ctx->ins, "error appending oauth2 params");
43
0
        return -1;
44
0
    }
45
46
0
    ret = flb_oauth2_payload_append(ctx->o, "scope", 5, FLB_AZURE_KUSTO_SCOPE, 39);
47
0
    if (ret == -1) {
48
0
        flb_plg_error(ctx->ins, "error appending oauth2 params");
49
0
        return -1;
50
0
    }
51
52
0
    ret = flb_oauth2_payload_append(ctx->o, "client_id", 9, ctx->client_id, -1);
53
0
    if (ret == -1) {
54
0
        flb_plg_error(ctx->ins, "error appending oauth2 params");
55
0
        return -1;
56
0
    }
57
58
0
    ret = flb_oauth2_payload_append(ctx->o, "client_secret", 13, ctx->client_secret, -1);
59
0
    if (ret == -1) {
60
0
        flb_plg_error(ctx->ins, "error appending oauth2 params");
61
0
        return -1;
62
0
    }
63
64
    /* Retrieve access token */
65
0
    token = flb_oauth2_token_get(ctx->o);
66
0
    if (!token) {
67
0
        flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
68
0
        return -1;
69
0
    }
70
71
0
    return 0;
72
0
}
73
74
flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx)
75
0
{
76
0
    int ret = 0;
77
0
    flb_sds_t output = NULL;
78
79
0
    if (pthread_mutex_lock(&ctx->token_mutex)) {
80
0
        flb_plg_error(ctx->ins, "error locking mutex");
81
0
        return NULL;
82
0
    }
83
84
0
    if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) {
85
0
        ret = azure_kusto_get_oauth2_token(ctx);
86
0
    }
87
88
    /* Copy string to prevent race conditions (get_oauth2 can free the string) */
89
0
    if (ret == 0) {
90
0
        output = flb_sds_create_size(flb_sds_len(ctx->o->token_type) +
91
0
                                     flb_sds_len(ctx->o->access_token) + 2);
92
0
        if (!output) {
93
0
            flb_plg_error(ctx->ins, "error creating token buffer");
94
0
            return NULL;
95
0
        }
96
0
        flb_sds_snprintf(&output, flb_sds_alloc(output), "%s %s", ctx->o->token_type,
97
0
                         ctx->o->access_token);
98
0
    }
99
100
0
    if (pthread_mutex_unlock(&ctx->token_mutex)) {
101
0
        flb_plg_error(ctx->ins, "error unlocking mutex");
102
0
        if (output) {
103
0
            flb_sds_destroy(output);
104
0
        }
105
0
        return NULL;
106
0
    }
107
108
0
    return output;
109
0
}
110
111
/**
112
 * Executes a control command against kusto's endpoint
113
 *
114
 * @param ctx       Plugin's context
115
 * @param csl       Kusto's control command
116
 * @return flb_sds_t      Returns the response or NULL on error.
117
 */
118
flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl)
119
0
{
120
0
    flb_sds_t token;
121
0
    flb_sds_t body;
122
0
    size_t b_sent;
123
0
    int ret;
124
0
    struct flb_connection *u_conn;
125
0
    struct flb_http_client *c;
126
0
    flb_sds_t resp = NULL;
127
128
    /* Get upstream connection */
129
0
    u_conn = flb_upstream_conn_get(ctx->u);
130
131
0
    if (u_conn) {
132
0
        token = get_azure_kusto_token(ctx);
133
134
0
        if (token) {
135
            /* Compose request body */
136
0
            body = flb_sds_create_size(sizeof(FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE) - 1 +
137
0
                                       strlen(csl));
138
139
0
            if (body) {
140
0
                flb_sds_snprintf(&body, flb_sds_alloc(body),
141
0
                                 FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE, csl);
142
143
                /* Compose HTTP Client request */
144
0
                c = flb_http_client(u_conn, FLB_HTTP_POST, FLB_AZURE_KUSTO_MGMT_URI_PATH,
145
0
                                    body, flb_sds_len(body), NULL, 0, NULL, 0);
146
147
0
                if (c) {
148
                    /* Add headers */
149
0
                    flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
150
0
                    flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
151
0
                    flb_http_add_header(c, "Accept", 6, "application/json", 16);
152
0
                    flb_http_add_header(c, "Authorization", 13, token,
153
0
                                        flb_sds_len(token));
154
0
                    flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10);
155
156
                    /* Send HTTP request */
157
0
                    ret = flb_http_do(c, &b_sent);
158
0
                    flb_plg_debug(
159
0
                        ctx->ins,
160
0
                        "Kusto ingestion command request http_do=%i, HTTP Status: %i",
161
0
                        ret, c->resp.status);
162
163
0
                    if (ret == 0) {
164
0
                        if (c->resp.status == 200) {
165
                            /* Copy payload response to the response param */
166
0
                            resp =
167
0
                                flb_sds_create_len(c->resp.payload, c->resp.payload_size);
168
0
                        }
169
0
                        else if (c->resp.payload_size > 0) {
170
0
                            flb_plg_debug(ctx->ins, "Request failed and returned: \n%s",
171
0
                                          c->resp.payload);
172
0
                        }
173
0
                        else {
174
0
                            flb_plg_debug(ctx->ins, "Request failed");
175
0
                        }
176
0
                    }
177
0
                    else {
178
0
                        flb_plg_error(ctx->ins, "cannot send HTTP request");
179
0
                    }
180
181
0
                    flb_http_client_destroy(c);
182
0
                }
183
0
                else {
184
0
                    flb_plg_error(ctx->ins, "cannot create HTTP client context");
185
0
                }
186
187
0
                flb_sds_destroy(body);
188
0
            }
189
0
            else {
190
0
                flb_plg_error(ctx->ins, "cannot construct request body");
191
0
            }
192
193
0
            flb_sds_destroy(token);
194
0
        }
195
0
        else {
196
0
            flb_plg_error(ctx->ins, "cannot retrieve oauth2 token");
197
0
        }
198
199
0
        flb_upstream_conn_release(u_conn);
200
0
    }
201
0
    else {
202
0
        flb_plg_error(ctx->ins, "cannot create upstream connection");
203
0
    }
204
205
0
    return resp;
206
0
}
207
208
static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_config *config,
209
                               void *data)
210
0
{
211
0
    int io_flags = FLB_IO_TLS;
212
0
    struct flb_azure_kusto *ctx;
213
214
    /* Create config context */
215
0
    ctx = flb_azure_kusto_conf_create(ins, config);
216
0
    if (!ctx) {
217
0
        flb_plg_error(ins, "configuration failed");
218
0
        return -1;
219
0
    }
220
221
0
    flb_output_set_context(ins, ctx);
222
223
    /* Network mode IPv6 */
224
0
    if (ins->host.ipv6 == FLB_TRUE) {
225
0
        io_flags |= FLB_IO_IPV6;
226
0
    }
227
228
    /* Create mutex for acquiring oauth tokens  and getting ingestion resources (they
229
     * are shared across flush coroutines)
230
     */
231
0
    pthread_mutex_init(&ctx->token_mutex, NULL);
232
0
    pthread_mutex_init(&ctx->resources_mutex, NULL);
233
234
    /*
235
     * Create upstream context for Kusto Ingestion endpoint
236
     */
237
0
    ctx->u = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
238
0
    if (!ctx->u) {
239
0
        flb_plg_error(ctx->ins, "upstream creation failed");
240
0
        return -1;
241
0
    }
242
243
    /* Create oauth2 context */
244
0
    ctx->o =
245
0
        flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH);
246
0
    if (!ctx->o) {
247
0
        flb_plg_error(ctx->ins, "cannot create oauth2 context");
248
0
        return -1;
249
0
    }
250
0
    flb_output_upstream_set(ctx->u, ins);
251
252
0
    return 0;
253
0
}
254
255
static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int tag_len,
256
                              const void *data, size_t bytes, void **out_data,
257
                              size_t *out_size)
258
0
{
259
0
    int records = 0;
260
0
    size_t off = 0;
261
0
    msgpack_unpacked result;
262
0
    msgpack_sbuffer mp_sbuf;
263
0
    msgpack_packer mp_pck;
264
    /* for sub msgpack objs */
265
0
    int map_size;
266
0
    struct flb_time tm;
267
0
    struct tm tms;
268
0
    msgpack_object root;
269
0
    msgpack_object *obj;
270
0
    char time_formatted[32];
271
0
    size_t s;
272
0
    int len;
273
274
    /* output buffer */
275
0
    flb_sds_t out_buf;
276
277
    /* Create array for all records */
278
0
    records = flb_mp_count(data, bytes);
279
0
    if (records <= 0) {
280
0
        flb_plg_error(ctx->ins, "error counting msgpack entries");
281
0
        return -1;
282
0
    }
283
284
    /* Create temporary msgpack buffer */
285
0
    msgpack_sbuffer_init(&mp_sbuf);
286
0
    msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
287
288
0
    msgpack_pack_array(&mp_pck, records);
289
290
0
    off = 0;
291
0
    msgpack_unpacked_init(&result);
292
0
    while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
293
0
        root = result.data;
294
        /* Each array must have two entries: time and record */
295
0
        if (root.type != MSGPACK_OBJECT_ARRAY) {
296
0
            flb_plg_debug(ctx->ins, "unexpected msgpack object type: %d", root.type);
297
0
            continue;
298
0
        }
299
0
        if (root.via.array.size != 2) {
300
0
            flb_plg_debug(ctx->ins, "unexpected msgpack array size: %d",
301
0
                          root.via.array.size);
302
0
            continue;
303
0
        }
304
305
        /* Get timestamp and object */
306
0
        flb_time_pop_from_msgpack(&tm, &result, &obj);
307
308
0
        map_size = 1;
309
0
        if (ctx->include_time_key == FLB_TRUE) {
310
0
            map_size++;
311
0
        }
312
313
0
        if (ctx->include_tag_key == FLB_TRUE) {
314
0
            map_size++;
315
0
        }
316
317
0
        msgpack_pack_map(&mp_pck, map_size);
318
319
        /* include_time_key */
320
0
        if (ctx->include_time_key == FLB_TRUE) {
321
0
            msgpack_pack_str(&mp_pck, flb_sds_len(ctx->time_key));
322
0
            msgpack_pack_str_body(&mp_pck, ctx->time_key, flb_sds_len(ctx->time_key));
323
324
            /* Append the time value as ISO 8601 */
325
0
            gmtime_r(&tm.tm.tv_sec, &tms);
326
0
            s = strftime(time_formatted, sizeof(time_formatted) - 1,
327
0
                         FLB_PACK_JSON_DATE_ISO8601_FMT, &tms);
328
329
0
            len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
330
0
                           ".%03" PRIu64 "Z", (uint64_t)tm.tm.tv_nsec / 1000000);
331
0
            s += len;
332
0
            msgpack_pack_str(&mp_pck, s);
333
0
            msgpack_pack_str_body(&mp_pck, time_formatted, s);
334
0
        }
335
336
        /* include_tag_key */
337
0
        if (ctx->include_tag_key == FLB_TRUE) {
338
0
            msgpack_pack_str(&mp_pck, flb_sds_len(ctx->tag_key));
339
0
            msgpack_pack_str_body(&mp_pck, ctx->tag_key, flb_sds_len(ctx->tag_key));
340
0
            msgpack_pack_str(&mp_pck, tag_len);
341
0
            msgpack_pack_str_body(&mp_pck, tag, tag_len);
342
0
        }
343
344
0
        msgpack_pack_str(&mp_pck, flb_sds_len(ctx->log_key));
345
0
        msgpack_pack_str_body(&mp_pck, ctx->log_key, flb_sds_len(ctx->log_key));
346
0
        msgpack_pack_object(&mp_pck, *obj);
347
0
    }
348
349
    /* Convert from msgpack to JSON */
350
0
    out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size);
351
352
    /* Cleanup */
353
0
    msgpack_sbuffer_destroy(&mp_sbuf);
354
0
    msgpack_unpacked_destroy(&result);
355
356
0
    if (!out_buf) {
357
0
        flb_plg_error(ctx->ins, "error formatting JSON payload");
358
0
        return -1;
359
0
    }
360
361
0
    *out_data = out_buf;
362
0
    *out_size = flb_sds_len(out_buf);
363
364
0
    return 0;
365
0
}
366
367
static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
368
                                 struct flb_output_flush *out_flush,
369
                                 struct flb_input_instance *i_ins, void *out_context,
370
                                 struct flb_config *config)
371
0
{
372
0
    int ret;
373
0
    flb_sds_t json;
374
0
    size_t json_size;
375
0
    size_t tag_len;
376
0
    struct flb_azure_kusto *ctx = out_context;
377
378
0
    (void)i_ins;
379
0
    (void)config;
380
381
0
    flb_plg_trace(ctx->ins, "flushing bytes %zu", event_chunk->size);
382
383
0
    tag_len = flb_sds_len(event_chunk->tag);
384
385
    /* Load or refresh ingestion resources */
386
0
    ret = azure_kusto_load_ingestion_resources(ctx, config);
387
0
    if (ret != 0) {
388
0
        flb_plg_error(ctx->ins, "cannot load ingestion resources");
389
0
        FLB_OUTPUT_RETURN(FLB_RETRY);
390
0
    }
391
392
    /* Reformat msgpack to JSON payload */
393
0
    ret = azure_kusto_format(ctx, event_chunk->tag, tag_len, event_chunk->data,
394
0
                             event_chunk->size, (void **)&json, &json_size);
395
0
    if (ret != 0) {
396
0
        flb_plg_error(ctx->ins, "cannot reformat data into json");
397
0
        FLB_OUTPUT_RETURN(FLB_RETRY);
398
0
    }
399
400
0
    ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, json, json_size);
401
0
    if (ret != 0) {
402
0
        flb_plg_error(ctx->ins, "cannot perform queued ingestion");
403
0
        flb_sds_destroy(json);
404
0
        FLB_OUTPUT_RETURN(FLB_RETRY);
405
0
    }
406
407
    /* Cleanup */
408
0
    flb_sds_destroy(json);
409
410
    /* Done */
411
0
    FLB_OUTPUT_RETURN(FLB_OK);
412
0
}
413
414
static int cb_azure_kusto_exit(void *data, struct flb_config *config)
415
0
{
416
0
    struct flb_azure_kusto *ctx = data;
417
418
0
    if (!ctx) {
419
0
        return -1;
420
0
    }
421
422
0
    if (ctx->u) {
423
0
        flb_upstream_destroy(ctx->u);
424
0
        ctx->u = NULL;
425
0
    }
426
427
0
    flb_azure_kusto_conf_destroy(ctx);
428
429
0
    return 0;
430
0
}
431
432
static struct flb_config_map config_map[] = {
433
    {FLB_CONFIG_MAP_STR, "tenant_id", (char *)NULL, 0, FLB_TRUE,
434
     offsetof(struct flb_azure_kusto, tenant_id),
435
     "Set the tenant ID of the AAD application used for authentication"},
436
    {FLB_CONFIG_MAP_STR, "client_id", (char *)NULL, 0, FLB_TRUE,
437
     offsetof(struct flb_azure_kusto, client_id),
438
     "Set the client ID (Application ID) of the AAD application used for authentication"},
439
    {FLB_CONFIG_MAP_STR, "client_secret", (char *)NULL, 0, FLB_TRUE,
440
     offsetof(struct flb_azure_kusto, client_secret),
441
     "Set the client secret (Application Password) of the AAD application used for "
442
     "authentication"},
443
    {FLB_CONFIG_MAP_STR, "ingestion_endpoint", (char *)NULL, 0, FLB_TRUE,
444
     offsetof(struct flb_azure_kusto, ingestion_endpoint),
445
     "Set the Kusto cluster's ingestion endpoint URL (e.g. "
446
     "https://ingest-mycluster.eastus.kusto.windows.net)"},
447
    {FLB_CONFIG_MAP_STR, "database_name", (char *)NULL, 0, FLB_TRUE,
448
     offsetof(struct flb_azure_kusto, database_name), "Set the database name"},
449
    {FLB_CONFIG_MAP_STR, "table_name", (char *)NULL, 0, FLB_TRUE,
450
     offsetof(struct flb_azure_kusto, table_name), "Set the table name"},
451
    {FLB_CONFIG_MAP_STR, "ingestion_mapping_reference", (char *)NULL, 0, FLB_TRUE,
452
     offsetof(struct flb_azure_kusto, ingestion_mapping_reference),
453
     "Set the ingestion mapping reference"},
454
    {FLB_CONFIG_MAP_STR, "log_key", FLB_AZURE_KUSTO_DEFAULT_LOG_KEY, 0, FLB_TRUE,
455
     offsetof(struct flb_azure_kusto, log_key), "The key name of event payload"},
456
    {FLB_CONFIG_MAP_BOOL, "include_tag_key", "true", 0, FLB_TRUE,
457
     offsetof(struct flb_azure_kusto, include_tag_key),
458
     "If enabled, tag is appended to output. "
459
     "The key name is used 'tag_key' property."},
460
    {FLB_CONFIG_MAP_STR, "tag_key", FLB_AZURE_KUSTO_DEFAULT_TAG_KEY, 0, FLB_TRUE,
461
     offsetof(struct flb_azure_kusto, tag_key),
462
     "The key name of tag. If 'include_tag_key' is false, "
463
     "This property is ignored"},
464
    {FLB_CONFIG_MAP_BOOL, "include_time_key", "true", 0, FLB_TRUE,
465
     offsetof(struct flb_azure_kusto, include_time_key),
466
     "If enabled, time is appended to output. "
467
     "The key name is used 'time_key' property."},
468
    {FLB_CONFIG_MAP_STR, "time_key", FLB_AZURE_KUSTO_DEFAULT_TIME_KEY, 0, FLB_TRUE,
469
     offsetof(struct flb_azure_kusto, time_key),
470
     "The key name of the time. If 'include_time_key' is false, "
471
     "This property is ignored"},
472
    /* EOF */
473
    {0}};
474
475
struct flb_output_plugin out_azure_kusto_plugin = {
476
    .name = "azure_kusto",
477
    .description = "Send events to Kusto (Azure Data Explorer)",
478
    .cb_init = cb_azure_kusto_init,
479
    .cb_flush = cb_azure_kusto_flush,
480
    .cb_exit = cb_azure_kusto_exit,
481
    .config_map = config_map,
482
    /* Plugin flags */
483
    .flags = FLB_OUTPUT_NET | FLB_IO_TLS,
484
};