Coverage Report

Created: 2026-05-16 07:29

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/plugins/out_azure_kusto/azure_kusto.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2026 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_http_client.h>
21
#include <fluent-bit/flb_kv.h>
22
#include <fluent-bit/flb_oauth2.h>
23
#include <fluent-bit/flb_output_plugin.h>
24
#include <fluent-bit/flb_pack.h>
25
#include <fluent-bit/flb_log_event_decoder.h>
26
#include <fluent-bit/flb_scheduler.h>
27
#include <fluent-bit/flb_gzip.h>
28
#include <fluent-bit/flb_utils.h>
29
#include <stdio.h>
30
#include <fluent-bit/flb_sds.h>
31
#include <fluent-bit/flb_fstore.h>
32
#include <msgpack.h>
33
#include <fluent-bit/flb_version.h>
34
#include <inttypes.h>
35
36
#include "azure_kusto.h"
37
#include "azure_kusto_conf.h"
38
#include "azure_kusto_ingest.h"
39
#include "azure_msiauth.h"
40
#include "azure_kusto_store.h"
41
42
/**
43
 * Retrieve an OAuth2 access token using Managed Service Identity (MSI).
44
 *
45
 * @param ctx  Plugin's context containing the OAuth2 configuration.
46
 * @return int 0 on success, -1 on failure.
47
 */
48
static int azure_kusto_get_msi_token(struct flb_azure_kusto *ctx)
49
0
{
50
0
    char *token;
51
52
    /* Retrieve access token */
53
0
    token = flb_azure_msiauth_token_get(ctx->o);
54
0
    if (!token) {
55
0
        flb_plg_error(ctx->ins, "error retrieving oauth2 access token (MSI access token is NULL)");
56
0
        return -1;
57
0
    }
58
59
0
    return 0;
60
0
}
61
62
/**
63
 * Retrieve an OAuth2 access token using workload identity federation.
64
 *
65
 * @param ctx  Plugin's context containing workload identity configuration.
66
 * @return int 0 on success, -1 on failure.
67
 */
68
static int azure_kusto_get_workload_identity_token(struct flb_azure_kusto *ctx)
69
0
{
70
0
    int ret;
71
    
72
0
    ret = flb_azure_workload_identity_token_get(ctx->o, 
73
0
                                               ctx->workload_identity_token_file,
74
0
                                               ctx->client_id, 
75
0
                                               ctx->tenant_id);
76
0
    if (ret == -1) {
77
0
        flb_plg_error(ctx->ins, "error retrieving workload identity token");
78
0
        return -1;
79
0
    }
80
    
81
0
    flb_plg_debug(ctx->ins, "Workload identity token retrieved successfully");
82
0
    return 0;
83
0
}
84
85
/**
86
 * Retrieve an OAuth2 access token using service principal credentials.
87
 *
88
 * Constructs the OAuth2 payload with client credentials and requests
89
 * an access token from the configured OAuth2 endpoint.
90
 *
91
 * @param ctx  Plugin's context containing client credentials and OAuth2 config.
92
 * @return int 0 on success, -1 on failure.
93
 */
94
static int azure_kusto_get_service_principal_token(struct flb_azure_kusto *ctx)
95
0
{
96
0
    int ret;
97
    
98
    /* Clear any previous oauth2 payload content */
99
0
    flb_oauth2_payload_clear(ctx->o);
100
101
0
    ret = flb_oauth2_payload_append(ctx->o, "grant_type", 10, "client_credentials", 18);
102
0
    if (ret == -1) {
103
0
        flb_plg_error(ctx->ins, "error appending oauth2 params");
104
0
        return -1;
105
0
    }
106
107
0
    ret = flb_oauth2_payload_append(ctx->o, "scope", 5, FLB_AZURE_KUSTO_SCOPE, 39);
108
0
    if (ret == -1) {
109
0
        flb_plg_error(ctx->ins, "error appending oauth2 params");
110
0
        return -1;
111
0
    }
112
113
0
    ret = flb_oauth2_payload_append(ctx->o, "client_id", 9, ctx->client_id, -1);
114
0
    if (ret == -1) {
115
0
        flb_plg_error(ctx->ins, "error appending oauth2 params");
116
0
        return -1;
117
0
    }
118
119
0
    ret = flb_oauth2_payload_append(ctx->o, "client_secret", 13, ctx->client_secret, -1);
120
0
    if (ret == -1) {
121
0
        flb_plg_error(ctx->ins, "error appending oauth2 params");
122
0
        return -1;
123
0
    }
124
    /* Retrieve access token */
125
0
    char *token = flb_oauth2_token_get(ctx->o);
126
0
    if (!token) {
127
0
        flb_plg_error(ctx->ins, "error retrieving oauth2 access token - "
128
0
                      "check Fluent Bit logs for '[oauth2]' errors "
129
0
                      "(common causes: connection failure to '%s', invalid credentials, "
130
0
                      "or malformed response)", ctx->oauth_url ? ctx->oauth_url : "unknown");
131
0
        return -1;
132
0
    }
133
134
0
    flb_plg_debug(ctx->ins, "OAuth2 token retrieval process completed successfully");
135
0
    return 0;
136
0
}
137
138
/**
139
 * Obtain the current Azure Kusto bearer token as a formatted string.
140
 *
141
 * Acquires the token mutex, refreshes the token if expired based on the
142
 * configured authentication type, and returns a copy of the token string
143
 * in the format "<token_type> <access_token>".
144
 *
145
 * @param ctx  Plugin's context.
146
 * @return flb_sds_t  The bearer token string, or NULL on error.
147
 */
148
flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx)
149
0
{
150
0
    int ret = 0;
151
0
    flb_sds_t output = NULL;
152
153
0
    if (pthread_mutex_lock(&ctx->token_mutex)) {
154
0
        return NULL;
155
0
    }
156
157
0
    if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) {
158
0
        switch (ctx->auth_type) {
159
0
            case FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY:
160
0
                ret = azure_kusto_get_workload_identity_token(ctx);
161
0
                break;
162
0
            case FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM:
163
0
            case FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER:
164
0
                ret = azure_kusto_get_msi_token(ctx);
165
0
                break;
166
0
            case FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL:
167
0
            default:
168
0
                ret = azure_kusto_get_service_principal_token(ctx);
169
0
                break;
170
0
        }
171
0
    }
172
173
    /* Copy string to prevent race conditions (get_oauth2 can free the string) */
174
0
    if (ret == 0) {
175
0
        output = flb_sds_create_size(flb_sds_len(ctx->o->token_type) +
176
0
                                     flb_sds_len(ctx->o->access_token) + 2);
177
0
        if (!output) {
178
0
            flb_plg_error(ctx->ins, "error creating token buffer");
179
0
            if (pthread_mutex_unlock(&ctx->token_mutex)) {
180
0
                flb_plg_error(ctx->ins, "error unlocking mutex");
181
0
            }
182
0
            return NULL;
183
0
        }
184
0
        flb_sds_snprintf(&output, flb_sds_alloc(output), "%s %s", ctx->o->token_type,
185
0
                         ctx->o->access_token);
186
0
    }
187
188
0
    if (pthread_mutex_unlock(&ctx->token_mutex)) {
189
0
        flb_plg_error(ctx->ins, "error unlocking mutex");
190
0
        if (output) {
191
0
            flb_sds_destroy(output);
192
0
        }
193
0
        return NULL;
194
0
    }
195
196
0
    return output;
197
0
}
198
199
/**
200
 * Executes a control command against kusto's endpoint
201
 *
202
 * @param ctx       Plugin's context
203
 * @param csl       Kusto's control command
204
 * @return flb_sds_t      Returns the response or NULL on error.
205
 */
206
flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl)
207
0
{
208
0
    flb_sds_t token;
209
0
    flb_sds_t body;
210
0
    size_t b_sent;
211
0
    int ret;
212
0
    struct flb_connection *u_conn;
213
0
    struct flb_http_client *c;
214
0
    flb_sds_t resp = NULL;
215
216
0
    flb_plg_debug(ctx->ins, "before getting upstream connection");
217
218
0
    flb_plg_debug(ctx->ins, "Logging attributes of flb_azure_kusto_resources:");
219
0
    flb_plg_debug(ctx->ins, "blob_ha: %p", ctx->resources->blob_ha);
220
0
    flb_plg_debug(ctx->ins, "queue_ha: %p", ctx->resources->queue_ha);
221
0
    flb_plg_debug(ctx->ins, "load_time: %" PRIu64, ctx->resources->load_time);
222
223
0
    ctx->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout;
224
0
    if (ctx->buffering_enabled == FLB_TRUE){
225
0
        ctx->u->base.flags &= ~(FLB_IO_ASYNC);
226
0
    }
227
0
    flb_plg_debug(ctx->ins, "execute_ingest_csl_command -- async flag is %d", flb_stream_is_async(&ctx->u->base));
228
229
    /* Get upstream connection */
230
0
    u_conn = flb_upstream_conn_get(ctx->u);
231
232
0
    if (u_conn) {
233
0
        token = get_azure_kusto_token(ctx);
234
235
0
        if (token) {
236
            /* Compose request body */
237
0
            body = flb_sds_create_size(sizeof(FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE) - 1 +
238
0
                                       strlen(csl));
239
240
0
            if (body) {
241
0
                flb_sds_snprintf(&body, flb_sds_alloc(body),
242
0
                                 FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE, csl);
243
244
                /* Compose HTTP Client request */
245
0
                c = flb_http_client(u_conn, FLB_HTTP_POST, FLB_AZURE_KUSTO_MGMT_URI_PATH,
246
0
                                    body, flb_sds_len(body), NULL, 0, NULL, 0);
247
248
0
                if (c) {
249
                    /* Add headers */
250
0
                    flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
251
0
                    flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
252
0
                    flb_http_add_header(c, "Accept", 6, "application/json", 16);
253
0
                    flb_http_add_header(c, "Authorization", 13, token,
254
0
                                        flb_sds_len(token));
255
0
                    flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR));
256
0
                    flb_http_add_header(c, "x-ms-app", 8, "Fluent-Bit", 10);
257
0
                    flb_http_add_header(c, "x-ms-user", 9, "Fluent-Bit", 10);
258
0
                    flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10);
259
260
                    /* Send HTTP request */
261
0
                    ret = flb_http_do(c, &b_sent);
262
0
                    flb_plg_debug(
263
0
                            ctx->ins,
264
0
                            "Kusto ingestion command request http_do=%i, HTTP Status: %i",
265
0
                            ret, c->resp.status);
266
0
                    flb_plg_debug(ctx->ins, "Kusto ingestion command HTTP response payload: %.*s", (int)c->resp.payload_size, c->resp.payload);
267
268
0
                    if (ret == 0) {
269
0
                        if (c->resp.status == 200) {
270
                            /* Copy payload response to the response param */
271
0
                            resp = flb_sds_create_len(c->resp.payload, c->resp.payload_size);
272
0
                        }
273
0
                        else {
274
0
                            flb_plg_error(ctx->ins, "Kusto Ingestion Resources Request failed with HTTP Status: %i", c->resp.status);
275
0
                            if (c->resp.payload_size > 0) {
276
0
                                flb_plg_error(ctx->ins, "Kusto Ingestion Resources Response payload: \n%s", c->resp.payload);
277
0
                            }
278
0
                        }
279
0
                    }
280
0
                    else {
281
0
                        flb_plg_error(ctx->ins, "Kusto Ingestion Resources :: cannot send HTTP request");
282
0
                    }
283
284
0
                    flb_http_client_destroy(c);
285
0
                }
286
0
                else {
287
0
                    flb_plg_error(ctx->ins, "cannot create HTTP client context");
288
0
                }
289
290
0
                flb_sds_destroy(body);
291
0
            }
292
0
            else {
293
0
                flb_plg_error(ctx->ins, "cannot construct request body");
294
0
            }
295
296
0
            flb_sds_destroy(token);
297
0
        }
298
0
        else {
299
0
            flb_plg_error(ctx->ins, "cannot retrieve oauth2 token");
300
0
        }
301
302
0
        flb_upstream_conn_release(u_conn);
303
0
    }
304
0
    else {
305
0
        flb_plg_error(ctx->ins, "cannot create upstream connection");
306
0
    }
307
308
0
    return resp;
309
0
}
310
311
/**
312
 * construct_request_buffer - Constructs a request buffer for Azure Kusto ingestion.
313
 *
314
 * This function is responsible for preparing a data buffer that will be used
315
 * to send data to Azure Kusto. It handles both new incoming data and data
316
 * that has been previously buffered in a file. The function performs the
317
 * following tasks:
318
 *
319
 * 1. Validates Input: Checks if both `new_data` and `upload_file` are NULL,
320
 *    which would indicate an error since there is no data to process.
321
 *
322
 * 2. Reads Buffered Data: If an `upload_file` is provided, it reads the
323
 *    locally buffered data from the file and locks the file to prevent
324
 *    concurrent modifications.
325
 *
326
 * 3. Appends New Data: If `new_data` is provided, it appends this data to
327
 *    the buffered data, reallocating memory as necessary to accommodate the
328
 *    combined data size.
329
 *
330
 * 4. Outputs the Result: Sets the output parameters `out_buf` and `out_size`
331
 *    to point to the constructed buffer and its size, respectively.
332
 *
333
 * The function ensures that the buffer is correctly terminated if compression
334
 * is not enabled, and it handles memory allocation and error checking
335
 * throughout the process.
336
 *
337
 * Parameters:
338
 * @ctx:        The context containing configuration and state information.
339
 * @new_data:   The new data to be appended to the buffer, if any.
340
 * @upload_file: The file containing previously buffered data, if any.
341
 * @out_buf:    Pointer to the output buffer that will be constructed.
342
 * @out_size:   Pointer to the size of the constructed buffer.
343
 *
344
 * Returns:
345
 * 0 on success, or -1 on failure with an appropriate error message logged.
346
 */
347
static int construct_request_buffer(struct flb_azure_kusto *ctx, flb_sds_t new_data,
348
                                    struct azure_kusto_file *upload_file,
349
                                    char **out_buf, size_t *out_size)
350
0
{
351
0
    char *body;
352
0
    char *tmp;
353
0
    size_t body_size = 0;
354
0
    char *buffered_data = NULL;
355
0
    size_t buffer_size = 0;
356
0
    int ret;
357
358
0
    if (new_data == NULL && upload_file == NULL) {
359
0
        flb_plg_error(ctx->ins, "[construct_request_buffer] Something went wrong"
360
0
                                " both chunk and new_data are NULL");
361
0
        return -1;
362
0
    }
363
364
0
    if (upload_file) {
365
0
        ret = azure_kusto_store_file_upload_read(ctx, upload_file->fsf, &buffered_data, &buffer_size);
366
0
        if (ret < 0) {
367
0
            flb_plg_error(ctx->ins, "Could not read locally buffered data %s",
368
0
                          upload_file->fsf->name);
369
0
            return -1;
370
0
        }
371
372
        /*
373
         * lock the upload_file from buffer list
374
         */
375
0
        azure_kusto_store_file_lock(upload_file);
376
0
        body = buffered_data;
377
0
        body_size = buffer_size;
378
0
    }
379
380
0
    flb_plg_debug(ctx->ins, "[construct_request_buffer] size of buffer file read %zu", buffer_size);
381
382
    /*
383
     * If new data is arriving, increase the original 'buffered_data' size
384
     * to append the new one.
385
     */
386
0
    if (new_data) {
387
0
        body_size += flb_sds_len(new_data);
388
0
        flb_plg_debug(ctx->ins, "[construct_request_buffer] size of new_data %zu", body_size);
389
390
0
        tmp = flb_realloc(buffered_data, body_size + 1);
391
0
        if (!tmp) {
392
0
            flb_errno();
393
0
            flb_free(buffered_data);
394
0
            if (upload_file) {
395
0
                azure_kusto_store_file_unlock(upload_file);
396
0
            }
397
0
            return -1;
398
0
        }
399
0
        body = buffered_data = tmp;
400
0
        memcpy(body + buffer_size, new_data, flb_sds_len(new_data));
401
0
        if (ctx->compression_enabled == FLB_FALSE){
402
0
            body[body_size] = '\0';
403
0
        }
404
0
    }
405
406
0
    flb_plg_debug(ctx->ins, "[construct_request_buffer] final increased %zu", body_size);
407
408
0
    *out_buf = body;
409
0
    *out_size = body_size;
410
411
0
    return 0;
412
0
}
413
414
/**
415
 * Ingest all data chunks from the file storage streams into Azure Kusto.
416
 *
417
 * This function iterates over all file storage streams associated with the
418
 * given Azure Kusto context. For each
419
 * file in the stream, it checks if the file (chunk) is locked or has exceeded
420
 * the maximum number of retry attempts. If the chunk is eligible for processing,
421
 * it constructs a request buffer from the chunk data, optionally compresses
422
 * the payload, and attempts to ingest it into Azure Kusto.
423
 *
424
 * The function performs the following steps:
425
 * 1. Iterate over each file storage stream in the context.
426
 * 2. For each file in the stream, check if it is locked or has exceeded
427
 *    the maximum retry attempts. If so, skip processing.
428
 * 3. Construct a request buffer from the chunk data.
429
 * 4. Create a payload from the buffer and optionally compress it if
430
 *    compression is enabled.
431
 * 5. Load the necessary ingestion resources for Azure Kusto.
432
 * 6. Attempt to ingest the payload into Azure Kusto using queued ingestion.
433
 * 7. If ingestion is successful, clean up the local buffer file.
434
 * 8. Handle errors by unlocking the chunk, incrementing failure counts,
435
 *    and logging appropriate error messages.
436
 *
437
 * @param ctx    Pointer to the Azure Kusto context containing configuration
438
 *               and state information.
439
 * @param config Pointer to the Fluent Bit configuration structure.
440
 *
441
 * @return 0 on success, or -1 on failure.
442
 */
443
static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *config)
444
0
{
445
0
    struct azure_kusto_file *chunk;
446
0
    struct mk_list *tmp;
447
0
    struct mk_list *head;
448
0
    struct mk_list *f_head;
449
0
    struct mk_list *f_tmp;
450
0
    struct flb_fstore_file *fsf;
451
0
    struct flb_fstore_stream *fs_stream;
452
0
    flb_sds_t payload = NULL;
453
0
    void *final_payload = NULL;
454
0
    size_t final_payload_size = 0;
455
0
    char *buffer = NULL;
456
0
    size_t buffer_size;
457
0
    int ret;
458
0
    int is_compressed = FLB_FALSE;
459
0
    flb_sds_t tag_sds;
460
461
0
    mk_list_foreach_safe(head, tmp, &ctx->fs->streams) {
462
0
        fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head);
463
0
        if (fs_stream == ctx->stream_upload) {
464
0
            continue;
465
0
        }
466
467
0
        mk_list_foreach_safe(f_head, f_tmp, &fs_stream->files) {
468
0
            fsf = mk_list_entry(f_head, struct flb_fstore_file, _head);
469
0
            chunk = fsf->data;
470
471
            /* Skip files with no associated chunk data (may happen during shutdown) */
472
0
            if (chunk == NULL) {
473
0
                continue;
474
0
            }
475
476
            /* Locked chunks are being processed, skip */
477
0
            if (chunk->locked == FLB_TRUE) {
478
0
                continue;
479
0
            }
480
481
0
            if (chunk->failures >= ctx->scheduler_max_retries) {
482
0
                flb_plg_warn(ctx->ins,
483
0
                             "ingest_all_old_buffer_files :: Chunk for tag %s failed to send %i times, "
484
0
                             "will not retry",
485
0
                             (char *) fsf->meta_buf, ctx->scheduler_max_retries);
486
0
                if (ctx->delete_on_max_upload_error){
487
0
                    azure_kusto_store_file_delete(ctx, chunk);
488
0
                }
489
0
                else{
490
0
                    azure_kusto_store_file_inactive(ctx, chunk);
491
0
                }
492
0
                continue;
493
0
            }
494
495
0
            ret = construct_request_buffer(ctx, NULL, chunk,
496
0
                                           &buffer, &buffer_size);
497
0
            if (ret < 0) {
498
0
                flb_plg_error(ctx->ins,
499
0
                              "ingest_all_old_buffer_files :: Could not construct request buffer for %s",
500
0
                              chunk->file_path);
501
0
                return -1;
502
0
            }
503
504
0
            payload = flb_sds_create_len(buffer, buffer_size);
505
0
            tag_sds = flb_sds_create(fsf->meta_buf);
506
0
            flb_free(buffer);
507
508
            /* Compress the JSON payload */
509
0
            if (ctx->compression_enabled == FLB_TRUE) {
510
0
                ret = flb_gzip_compress((void *) payload, flb_sds_len(payload),
511
0
                                        &final_payload, &final_payload_size);
512
0
                if (ret != 0) {
513
0
                    flb_plg_error(ctx->ins,
514
0
                                  "ingest_all_old_buffer_files :: cannot gzip payload");
515
0
                    flb_sds_destroy(payload);
516
0
                    flb_sds_destroy(tag_sds);
517
0
                    return -1;
518
0
                }
519
0
                else {
520
0
                    is_compressed = FLB_TRUE;
521
0
                    flb_plg_debug(ctx->ins, "ingest_all_old_buffer_files :: enabled payload gzip compression");
522
0
                }
523
0
            }
524
0
            else {
525
0
                final_payload = payload;
526
0
                final_payload_size = flb_sds_len(payload);
527
0
            }
528
529
0
            ret = azure_kusto_load_ingestion_resources(ctx, config);
530
0
            if (ret != 0) {
531
0
                flb_plg_error(ctx->ins, "ingest_all_old_buffer_files :: cannot load ingestion resources");
532
0
                return -1;
533
0
            }
534
535
            /* Call azure_kusto_queued_ingestion to ingest the payload */
536
0
            ret = azure_kusto_queued_ingestion(ctx, tag_sds, flb_sds_len(tag_sds), final_payload, final_payload_size, chunk);
537
0
            if (ret != 0) {
538
0
                flb_plg_error(ctx->ins, "ingest_all_old_buffer_files :: Failed to ingest data to Azure Kusto");
539
0
                if (chunk){
540
0
                    azure_kusto_store_file_unlock(chunk);
541
0
                    chunk->failures += 1;
542
0
                }
543
0
                flb_sds_destroy(tag_sds);
544
0
                flb_sds_destroy(payload);
545
0
                if (is_compressed) {
546
0
                    flb_free(final_payload);
547
0
                }
548
0
                return -1;
549
0
            }
550
551
0
            flb_sds_destroy(tag_sds);
552
0
            flb_sds_destroy(payload);
553
0
            if (is_compressed) {
554
0
                flb_free(final_payload);
555
0
            }
556
557
            /* data was sent successfully- delete the local buffer */
558
0
            azure_kusto_store_file_cleanup(ctx, chunk);
559
0
        }
560
0
    }
561
562
0
    return 0;
563
0
}
564
565
/**
566
 * cb_azure_kusto_ingest - Callback function for ingesting data to Azure Kusto.
567
 *
568
 * Parameters:
569
 * @config: Pointer to the Fluent Bit configuration context.
570
 * @data: Pointer to the Kusto plugin context, which contains configuration and
571
 *        state information for the ingestion process.
572
 *
573
 * The function performs the following steps:
574
 * 1. Initializes a random seed for staggered refresh intervals.
575
 * 2. Logs the start of the upload timer callback.
576
 * 3. Iterates over all files in the active stream.
577
 * 4. Checks if each file has timed out and skips those that haven't.
578
 * 5. Skips files that are currently locked.
579
 * 6. For each eligible file, enters a retry loop to handle ingestion attempts:
580
 *    a. Constructs the request buffer for the file.
581
 *    b. Compresses the payload if compression is enabled.
582
 *    c. Loads necessary ingestion resources.
583
 *    d. Performs the queued ingestion to Azure Kusto.
584
 *    e. Deletes the file upon successful ingestion.
585
 * 7. Implements exponential backoff with jitter for retries.
586
 * 8. Logs errors and warnings for failed operations and retries.
587
 * 9. If the maximum number of retries is reached, logs an error and either
588
 *    deletes or marks the file as inactive based on configuration.
589
 * 10. Logs the end of the upload timer callback.
590
 */
591
static void cb_azure_kusto_ingest(struct flb_config *config, void *data)
592
0
{
593
0
    struct flb_azure_kusto *ctx = data;
594
0
    struct azure_kusto_file *file = NULL;
595
0
    struct flb_fstore_file *fsf;
596
0
    char *buffer = NULL;
597
0
    size_t buffer_size = 0;
598
0
    void *final_payload = NULL;
599
0
    size_t final_payload_size = 0;
600
0
    struct mk_list *tmp;
601
0
    struct mk_list *head;
602
0
    int ret;
603
0
    time_t now;
604
0
    flb_sds_t payload;
605
0
    flb_sds_t tag_sds;
606
0
    int is_compressed = FLB_FALSE;
607
0
    int retry_count;
608
0
    int backoff_time;
609
0
    int max_backoff_time = 64; /* Maximum backoff time in seconds */
610
611
    /* Initialize random seed for staggered refresh intervals */
612
0
    srand(time(NULL));
613
614
    /* Log the start of the upload timer callback */
615
0
    flb_plg_debug(ctx->ins, "Running upload timer callback (scheduler_kusto_ingest)..");
616
0
    now = time(NULL);
617
618
    /* Iterate over all files in the active stream */
619
0
    mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) {
620
0
        fsf = mk_list_entry(head, struct flb_fstore_file, _head);
621
0
        file = fsf->data;
622
0
        flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: Iterating files inside upload timer callback (cb_azure_kusto_ingest).. %s", file->fsf->name);
623
624
        /* Check if the file has timed out */
625
0
        if (now < (file->create_time + ctx->upload_timeout + ctx->retry_time)) {
626
0
            continue; /* Skip files that haven't timed out */
627
0
        }
628
629
0
        flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: Before file locked check %s", file->fsf->name);
630
631
        /* Skip locked files */
632
0
        if (file->locked == FLB_TRUE) {
633
0
            continue;
634
0
        }
635
636
0
        retry_count = 0;
637
0
        backoff_time = 2; /* Initial backoff time in seconds */
638
639
        /* Retry loop for handling retries */
640
0
        while (retry_count < ctx->scheduler_max_retries) {
641
0
            flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: Before construct_request_buffer %s", file->fsf->name);
642
643
            /* Construct the request buffer */
644
0
            ret = construct_request_buffer(ctx, NULL, file, &buffer, &buffer_size);
645
0
            if (ret < 0) {
646
0
                flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: Could not construct request buffer for %s", file->fsf->name);
647
0
                retry_count++;
648
                /* Add jitter: random value between 0 and backoff_time */
649
0
                int jitter = rand() % backoff_time;
650
0
                sleep(backoff_time + jitter); /* Exponential backoff with jitter */
651
0
                backoff_time = (backoff_time * 2 < max_backoff_time) ? backoff_time * 2 : max_backoff_time; /* Double the backoff time, but cap it */
652
0
                continue; /* Retry on failure */
653
0
            }
654
655
0
            payload = flb_sds_create_len(buffer, buffer_size);
656
0
            tag_sds = flb_sds_create(fsf->meta_buf);
657
658
            /* Compress the JSON payload if compression is enabled */
659
0
            if (ctx->compression_enabled == FLB_TRUE) {
660
0
                ret = flb_gzip_compress((void *) payload, flb_sds_len(payload), &final_payload, &final_payload_size);
661
0
                if (ret != 0) {
662
0
                    flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: cannot gzip payload");
663
0
                    flb_free(buffer);
664
0
                    flb_sds_destroy(payload);
665
0
                    flb_sds_destroy(tag_sds);
666
0
                    retry_count++;
667
0
                    if (file){
668
0
                        azure_kusto_store_file_unlock(file);
669
0
                        file->failures += 1;
670
0
                    }
671
                    /* Add jitter: random value between 0 and backoff_time */
672
0
                    int jitter = rand() % backoff_time;
673
0
                    flb_plg_warn(ctx->ins, "scheduler_kusto_ingest :: failed while compressing payload :: Retrying in %d seconds (attempt %d of %d) with jitter %d for file %s",
674
0
                                 backoff_time + jitter, retry_count, ctx->scheduler_max_retries, jitter, file->fsf->name);
675
0
                    sleep(backoff_time + jitter); /* Exponential backoff with jitter */
676
0
                    backoff_time = (backoff_time * 2 < max_backoff_time) ? backoff_time * 2 : max_backoff_time; /* Double the backoff time, but cap it */
677
0
                    continue; /* Retry on failure */
678
0
                }
679
0
                else {
680
0
                    is_compressed = FLB_TRUE;
681
0
                    flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: enabled payload gzip compression");
682
0
                }
683
0
            }
684
0
            else {
685
0
                final_payload = payload;
686
0
                final_payload_size = flb_sds_len(payload);
687
0
            }
688
689
0
            flb_plg_debug(ctx->ins, "scheduler_kusto_ingest ::: tag of the file %s", tag_sds);
690
691
            /* Load ingestion resources */
692
0
            ret = azure_kusto_load_ingestion_resources(ctx, config);
693
0
            if (ret != 0) {
694
0
                flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: cannot load ingestion resources");
695
696
                /* Free allocated resources */
697
0
                flb_free(buffer);
698
0
                flb_sds_destroy(payload);
699
0
                flb_sds_destroy(tag_sds);
700
0
                if (is_compressed) {
701
0
                    flb_free(final_payload);
702
0
                }
703
704
0
                retry_count++;
705
0
                if (file){
706
0
                    azure_kusto_store_file_unlock(file);
707
0
                    file->failures += 1;
708
0
                }
709
                /* Add jitter: random value between 0 and backoff_time */
710
0
                int jitter = rand() % backoff_time;
711
0
                flb_plg_warn(ctx->ins, "scheduler_kusto_ingest :: error loading ingestion resources :: Retrying in %d seconds (attempt %d of %d) with jitter %d for file %s",
712
0
                             backoff_time + jitter, retry_count, ctx->scheduler_max_retries, jitter, file->fsf->name);
713
0
                sleep(backoff_time + jitter); /* Exponential backoff with jitter */
714
0
                backoff_time = (backoff_time * 2 < max_backoff_time) ? backoff_time * 2 : max_backoff_time; /* Double the backoff time, but cap it */
715
0
                continue; /* Retry on failure */
716
0
            }
717
718
0
            flb_plg_debug(ctx->ins, "scheduler_kusto_ingest ::: before starting kusto queued ingestion %s", file->fsf->name);
719
720
            /* Perform the queued ingestion */
721
0
            ret = azure_kusto_queued_ingestion(ctx, tag_sds, flb_sds_len(tag_sds), final_payload, final_payload_size, NULL);
722
0
            if (ret != 0) {
723
0
                flb_plg_error(ctx->ins, "scheduler_kusto_ingest: Failed to ingest data to kusto");
724
725
                /* Free allocated resources */
726
0
                flb_free(buffer);
727
0
                flb_sds_destroy(payload);
728
0
                flb_sds_destroy(tag_sds);
729
0
                if (is_compressed) {
730
0
                    flb_free(final_payload);
731
0
                }
732
733
0
                retry_count++;
734
0
                if (file){
735
0
                    azure_kusto_store_file_unlock(file);
736
0
                    file->failures += 1;
737
0
                }
738
                /* Add jitter: random value between 0 and backoff_time */
739
0
                int jitter = rand() % backoff_time;
740
0
                flb_plg_warn(ctx->ins, "scheduler_kusto_ingest :: error while ingesting to kusto :: Retrying in %d seconds (attempt %d of %d) with jitter %d for file %s",
741
0
                             backoff_time + jitter, retry_count, ctx->scheduler_max_retries, jitter, file->fsf->name);
742
0
                sleep(backoff_time + jitter); /* Exponential backoff with jitter */
743
0
                backoff_time = (backoff_time * 2 < max_backoff_time) ? backoff_time * 2 : max_backoff_time; /* Double the backoff time, but cap it */
744
0
                continue; /* Retry on failure */
745
0
            }
746
747
            /* Delete the file after successful ingestion */
748
0
            ret = azure_kusto_store_file_delete(ctx, file);
749
0
            if (ret == 0) {
750
0
                flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: deleted successfully ingested file");
751
0
            }
752
0
            else {
753
0
                flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: failed to delete ingested file %s", fsf->name);
754
0
                if (file){
755
0
                    azure_kusto_store_file_unlock(file);
756
0
                    file->failures += 1;
757
0
                }
758
0
            }
759
760
            /* Free allocated resources */
761
0
            flb_free(buffer);
762
0
            flb_sds_destroy(payload);
763
0
            flb_sds_destroy(tag_sds);
764
0
            if (is_compressed) {
765
0
                flb_free(final_payload);
766
0
            }
767
768
            /* If all operations succeed, break out of the retry loop */
769
0
            break;
770
0
        }
771
772
        /* If the maximum number of retries is reached, log an error and move to the next file */
773
0
        if (retry_count >= ctx->scheduler_max_retries) {
774
0
            flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: Max retries reached for file %s", file->fsf->name);
775
0
            if (ctx->delete_on_max_upload_error){
776
0
                azure_kusto_store_file_delete(ctx, file);
777
0
            }
778
0
            else {
779
0
                azure_kusto_store_file_inactive(ctx, file);
780
0
            }
781
0
        }
782
0
    }
783
    /* Log the end of the upload timer callback */
784
0
    flb_plg_debug(ctx->ins, "Exited upload timer callback (cb_azure_kusto_ingest)..");
785
0
}
786
787
788
/**
789
 * Ingest data to Azure Kusto
790
 *
791
 * This function is responsible for preparing and sending data to Azure Kusto for ingestion.
792
 * It constructs a request buffer from the provided data, optionally compresses the payload,
793
 * and then sends it to Azure Kusto using a queued ingestion method.
794
 *
795
 * Parameters:
796
 * - out_context: A pointer to the output context, which is expected to be of type `struct flb_azure_kusto`.
797
 * - new_data: The new data to be ingested, represented as a flexible string descriptor (flb_sds_t).
798
 * - upload_file: A pointer to an `azure_kusto_file` structure that contains information about the file to be uploaded.
799
 * - tag: A constant character pointer representing the tag associated with the data.
800
 * - tag_len: An integer representing the length of the tag.
801
 *
802
 * Returns:
803
 * - 0 on successful ingestion.
804
 * - -1 if an error occurs during buffer construction, compression, or ingestion.
805
 *
806
 * The function performs the following steps:
807
 * 1. Constructs a request buffer from the provided data and upload file information.
808
 * 2. Creates a payload from the buffer and frees the buffer memory.
809
 * 3. Optionally compresses the payload using gzip if compression is enabled in the context.
810
 * 4. Calls the `azure_kusto_queued_ingestion` function to send the payload to Azure Kusto.
811
 * 5. Cleans up allocated resources, including destroying the payload and tag strings, and freeing the compressed payload if applicable.
812
 */
813
static int ingest_to_kusto(void *out_context, flb_sds_t new_data,
814
                               struct azure_kusto_file *upload_file,
815
                               const char *tag, int tag_len)
816
0
{
817
0
    int ret;
818
0
    char *buffer = NULL;
819
0
    size_t buffer_size;
820
0
    struct flb_azure_kusto *ctx = out_context;
821
0
    flb_sds_t payload = NULL;
822
0
    void *final_payload = NULL;
823
0
    size_t final_payload_size = 0;
824
0
    int is_compressed = FLB_FALSE;
825
0
    flb_sds_t tag_sds = flb_sds_create_len(tag, tag_len);
826
827
    /* Create buffer */
828
0
    ret = construct_request_buffer(ctx, new_data, upload_file, &buffer, &buffer_size);
829
0
    if (ret < 0) {
830
0
        flb_plg_error(ctx->ins, "Could not construct request buffer for %s",
831
0
                      upload_file->fsf->name);
832
0
        return -1;
833
0
    }
834
0
    payload = flb_sds_create_len(buffer, buffer_size);
835
0
    if (!payload) {
836
0
        flb_plg_error(ctx->ins, "Could not create payload SDS");
837
0
        flb_free(buffer);
838
0
        return -1;
839
0
    }
840
0
    flb_free(buffer);
841
842
    /* Compress the JSON payload */
843
0
    if (ctx->compression_enabled == FLB_TRUE) {
844
0
        ret = flb_gzip_compress((void *) payload, flb_sds_len(payload),
845
0
                                &final_payload, &final_payload_size);
846
0
        if (ret != 0) {
847
0
            flb_plg_error(ctx->ins,
848
0
                          "cannot gzip payload");
849
0
            flb_sds_destroy(payload);
850
0
            flb_sds_destroy(tag_sds);
851
0
            return -1;
852
0
        }
853
0
        else {
854
0
            is_compressed = FLB_TRUE;
855
0
            flb_plg_debug(ctx->ins, "enabled payload gzip compression");
856
            /* JSON buffer will be cleared at cleanup: */
857
0
        }
858
0
    }
859
0
    else {
860
0
        final_payload = payload;
861
0
        final_payload_size = flb_sds_len(payload);
862
0
    }
863
864
    /* Call azure_kusto_queued_ingestion to ingest the payload */
865
0
    ret = azure_kusto_queued_ingestion(ctx, tag_sds, tag_len, final_payload, final_payload_size, upload_file);
866
0
    if (ret != 0) {
867
0
        flb_plg_error(ctx->ins, "Failed to ingest data to Azure Kusto");
868
0
        flb_sds_destroy(tag_sds);
869
0
        flb_sds_destroy(payload);
870
0
        if (is_compressed) {
871
0
            flb_free(final_payload);
872
0
        }
873
0
        return -1;
874
0
    }
875
876
0
    flb_sds_destroy(tag_sds);
877
0
    flb_sds_destroy(payload);
878
0
    if (is_compressed) {
879
0
        flb_free(final_payload);
880
0
    }
881
882
0
    return 0;
883
0
}
884
885
/**
886
 * Initializes the Azure Kusto output plugin.
887
 *
888
 * This function sets up the necessary configurations and resources for the Azure Kusto
889
 * output plugin to function correctly. It performs the following tasks:
890
 *
891
 * 1. Creates a configuration context for the plugin using the provided instance and config.
892
 * 2. Initializes local storage if buffering is enabled, ensuring that the storage directory
893
 *    is set up and any existing buffered data is accounted for.
894
 * 3. Validates the configured file size for uploads, ensuring it meets the minimum and
895
 *    maximum constraints.
896
 * 4. Sets up network configurations, including enabling IPv6 if specified.
897
 * 5. Initializes mutexes for thread-safe operations related to OAuth tokens and resource
898
 *    management.
899
 * 6. Creates an upstream context for connecting to the Kusto Ingestion endpoint, configuring
900
 *    it for synchronous or asynchronous operation based on buffering settings.
901
 * 7. If IMDS (Instance Metadata Service) is used, creates an upstream context for it.
902
 * 8. Establishes an OAuth2 context for handling authentication with Azure services.
903
 * 9. Associates the upstream context with the output instance for data transmission.
904
 *
905
 * The function returns 0 on successful initialization or -1 if any step fails.
906
 *
907
 * @param ins    The output instance to initialize.
908
 * @param config The configuration context for Fluent Bit.
909
 * @param data   Additional data passed to the initialization function.
910
 *
911
 * @return 0 on success, -1 on failure.
912
 */
913
static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_config *config,
914
                               void *data)
915
0
{
916
0
    int io_flags = FLB_IO_TLS;
917
0
    struct flb_azure_kusto *ctx;
918
919
0
    flb_plg_debug(ins, "inside azure kusto init");
920
921
    /* Create config context */
922
0
    ctx = flb_azure_kusto_conf_create(ins, config);
923
0
    if (!ctx) {
924
0
        flb_plg_error(ins, "configuration failed");
925
0
        return -1;
926
0
    }
927
928
0
    if (ctx->buffering_enabled == FLB_TRUE) {
929
0
        ctx->ins = ins;
930
0
        ctx->retry_time = 0;
931
932
        /* Initialize local storage */
933
0
        int ret = azure_kusto_store_init(ctx);
934
0
        if (ret == -1) {
935
0
            flb_plg_error(ctx->ins, "Failed to initialize kusto storage: %s",
936
0
                          ctx->store_dir);
937
0
            flb_azure_kusto_conf_destroy(ctx);
938
0
            return -1;
939
0
        }
940
0
        ctx->has_old_buffers = azure_kusto_store_has_data(ctx);
941
942
        /* validate 'total_file_size' */
943
0
        if (ctx->file_size <= 0) {
944
0
            flb_plg_error(ctx->ins, "Failed to parse upload_file_size");
945
0
            azure_kusto_store_exit(ctx);
946
0
            flb_azure_kusto_conf_destroy(ctx);
947
0
            return -1;
948
0
        }
949
0
        if (ctx->file_size < 1000000) {
950
0
            flb_plg_error(ctx->ins, "upload_file_size must be at least 1MB");
951
0
            azure_kusto_store_exit(ctx);
952
0
            flb_azure_kusto_conf_destroy(ctx);
953
0
            return -1;
954
0
        }
955
0
        if (ctx->file_size > MAX_FILE_SIZE) {
956
0
            flb_plg_error(ctx->ins, "Max total_file_size must be lower than %ld bytes", MAX_FILE_SIZE);
957
0
            azure_kusto_store_exit(ctx);
958
0
            flb_azure_kusto_conf_destroy(ctx);
959
0
            return -1;
960
0
        }
961
962
0
        ctx->timer_created = FLB_FALSE;
963
0
        ctx->timer_ms = (int) (ctx->upload_timeout / 6) * 1000;
964
0
        flb_plg_info(ctx->ins, "Using upload size %lu bytes", ctx->file_size);
965
0
    }
966
967
0
    flb_output_set_context(ins, ctx);
968
969
    /* Network mode IPv6 */
970
0
    if (ins->host.ipv6 == FLB_TRUE) {
971
0
        io_flags |= FLB_IO_IPV6;
972
0
    }
973
974
    /* Create mutex for acquiring oauth tokens  and getting ingestion resources (they
975
     * are shared across flush coroutines)
976
     */
977
0
    pthread_mutex_init(&ctx->token_mutex, NULL);
978
0
    pthread_mutex_init(&ctx->resources_mutex, NULL);
979
0
    pthread_mutex_init(&ctx->blob_mutex, NULL);
980
981
    /*
982
     * Create upstream context for Kusto Ingestion endpoint
983
     */
984
0
    ctx->u = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
985
0
    if (!ctx->u) {
986
0
        flb_plg_error(ctx->ins, "upstream creation failed");
987
0
        if (ctx->buffering_enabled == FLB_TRUE) {
988
0
            azure_kusto_store_exit(ctx);
989
0
        }
990
0
        pthread_mutex_destroy(&ctx->resources_mutex);
991
0
        pthread_mutex_destroy(&ctx->token_mutex);
992
0
        pthread_mutex_destroy(&ctx->blob_mutex);
993
0
        flb_azure_kusto_conf_destroy(ctx);
994
0
        return -1;
995
0
    }    
996
0
    if (ctx->buffering_enabled ==  FLB_TRUE){
997
0
        flb_stream_disable_flags(&ctx->u->base, FLB_IO_ASYNC);
998
0
        ctx->u->base.net.io_timeout = ctx->io_timeout;
999
0
        ctx->has_old_buffers = azure_kusto_store_has_data(ctx);
1000
0
    }
1001
1002
0
    flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base));
1003
1004
    /* Create oauth2 context */
1005
0
    ctx->o =
1006
0
        flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH);
1007
0
    if (!ctx->o) {
1008
0
        flb_plg_error(ctx->ins, "cannot create oauth2 context");
1009
0
        if (ctx->buffering_enabled == FLB_TRUE) {
1010
0
            azure_kusto_store_exit(ctx);
1011
0
        }
1012
0
        flb_upstream_destroy(ctx->u);
1013
0
        pthread_mutex_destroy(&ctx->resources_mutex);
1014
0
        pthread_mutex_destroy(&ctx->token_mutex);
1015
0
        pthread_mutex_destroy(&ctx->blob_mutex);
1016
0
        flb_azure_kusto_conf_destroy(ctx);
1017
0
        return -1;
1018
0
    }
1019
0
    flb_output_upstream_set(ctx->u, ins);
1020
1021
0
    flb_plg_debug(ctx->ins, "azure kusto init completed");
1022
1023
0
    return 0;
1024
0
}
1025
1026
1027
/**
1028
     * This function formats log data for Azure Kusto ingestion.
1029
     * It processes a batch of log records, encodes them in a specific format,
1030
     * and outputs the formatted data.
1031
     *
1032
     * Parameters:
1033
     * - ctx: Context containing configuration and state for Azure Kusto.
1034
     * - tag: A string tag associated with the log data.
1035
     * - tag_len: Length of the tag string.
1036
     * - data: Pointer to the raw log data in msgpack format.
1037
     * - bytes: Size of the raw log data.
1038
     * - out_data: Pointer to store the formatted output data.
1039
     * - out_size: Pointer to store the size of the formatted output data.
1040
     *
1041
     * Returns:
1042
     * - 0 on success, or -1 on error.
1043
     */
1044
static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int tag_len,
1045
                              const void *data, size_t bytes, void **out_data,
1046
                              size_t *out_size,
1047
                              struct flb_config *config)
1048
0
{
1049
0
    int index;
1050
0
    int records = 0;
1051
0
    msgpack_sbuffer mp_sbuf;
1052
0
    msgpack_packer mp_pck;
1053
0
    struct tm tms;
1054
0
    char time_formatted[32];
1055
0
    size_t s;
1056
0
    int len;
1057
0
    struct flb_log_event_decoder log_decoder;
1058
0
    struct flb_log_event log_event;
1059
0
    int ret;
1060
0
    flb_sds_t out_buf;
1061
0
    flb_sds_t tmp_buf;
1062
0
    flb_sds_t json_record;
1063
0
    int map_size;
1064
1065
    /* Create array for all records */
1066
0
    records = flb_mp_count_log_records(data, bytes);
1067
0
    if (records <= 0) {
1068
0
        flb_plg_error(ctx->ins, "error counting msgpack entries");
1069
0
        return -1;
1070
0
    }
1071
1072
0
    ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
1073
0
    if (ret != FLB_EVENT_DECODER_SUCCESS) {
1074
0
        flb_plg_error(ctx->ins, "Log event decoder initialization error : %d", ret);
1075
0
        return -1;
1076
0
    }
1077
1078
    /* Initialize the output buffer */
1079
0
    out_buf = flb_sds_create_size(1024);
1080
0
    if (!out_buf) {
1081
0
        flb_plg_error(ctx->ins, "error creating output buffer");
1082
0
        flb_log_event_decoder_destroy(&log_decoder);
1083
0
        return -1;
1084
0
    }
1085
1086
    /* Create temporary msgpack buffer */
1087
0
    msgpack_sbuffer_init(&mp_sbuf);
1088
0
    msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
1089
1090
0
    while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
1091
0
        msgpack_sbuffer_clear(&mp_sbuf);
1092
1093
0
        map_size = 1;
1094
0
        if (ctx->include_time_key == FLB_TRUE) {
1095
0
            map_size++;
1096
0
        }
1097
0
        if (ctx->include_tag_key == FLB_TRUE) {
1098
0
            map_size++;
1099
0
        }
1100
1101
0
        msgpack_pack_map(&mp_pck, map_size);
1102
1103
        /* include_time_key */
1104
0
        if (ctx->include_time_key == FLB_TRUE) {
1105
0
            msgpack_pack_str(&mp_pck, flb_sds_len(ctx->time_key));
1106
0
            msgpack_pack_str_body(&mp_pck, ctx->time_key, flb_sds_len(ctx->time_key));
1107
1108
0
            gmtime_r(&log_event.timestamp.tm.tv_sec, &tms);
1109
0
            s = strftime(time_formatted, sizeof(time_formatted) - 1, FLB_PACK_JSON_DATE_ISO8601_FMT, &tms);
1110
0
            len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, ".%03" PRIu64 "Z",
1111
0
                    (uint64_t) log_event.timestamp.tm.tv_nsec / 1000000);
1112
0
            s += len;
1113
0
            msgpack_pack_str(&mp_pck, s);
1114
0
            msgpack_pack_str_body(&mp_pck, time_formatted, s);
1115
0
        }
1116
1117
        /* include_tag_key */
1118
0
        if (ctx->include_tag_key == FLB_TRUE) {
1119
0
            msgpack_pack_str(&mp_pck, flb_sds_len(ctx->tag_key));
1120
0
            msgpack_pack_str_body(&mp_pck, ctx->tag_key, flb_sds_len(ctx->tag_key));
1121
0
            msgpack_pack_str(&mp_pck, tag_len);
1122
0
            msgpack_pack_str_body(&mp_pck, tag, tag_len);
1123
0
        }
1124
1125
0
        msgpack_pack_str(&mp_pck, flb_sds_len(ctx->log_key));
1126
0
        msgpack_pack_str_body(&mp_pck, ctx->log_key, flb_sds_len(ctx->log_key));
1127
1128
0
        if (log_event.group_attributes != NULL && log_event.body != NULL) {
1129
0
            msgpack_pack_map(&mp_pck,
1130
0
                                 log_event.group_attributes->via.map.size +
1131
0
                                 log_event.metadata->via.map.size +
1132
0
                                 log_event.body->via.map.size);
1133
1134
0
            for (index = 0; index < log_event.group_attributes->via.map.size; index++) { 
1135
0
                msgpack_pack_object(&mp_pck, log_event.group_attributes->via.map.ptr[index].key);
1136
0
                msgpack_pack_object(&mp_pck, log_event.group_attributes->via.map.ptr[index].val);
1137
0
            }
1138
1139
0
            for (index = 0; index < log_event.metadata->via.map.size; index++) {
1140
0
                msgpack_pack_object(&mp_pck, log_event.metadata->via.map.ptr[index].key);
1141
0
                msgpack_pack_object(&mp_pck, log_event.metadata->via.map.ptr[index].val);
1142
0
            }
1143
1144
0
            for (index = 0; index < log_event.body->via.map.size; index++) {
1145
0
                msgpack_pack_object(&mp_pck, log_event.body->via.map.ptr[index].key);
1146
0
                msgpack_pack_object(&mp_pck, log_event.body->via.map.ptr[index].val);
1147
0
            }
1148
0
        }
1149
0
        else if (log_event.body != NULL) {
1150
0
            msgpack_pack_object(&mp_pck, *log_event.body);
1151
0
        }
1152
0
        else {
1153
0
            msgpack_pack_str(&mp_pck, 20);
1154
0
            msgpack_pack_str_body(&mp_pck, "log_attribute_missing", 20);
1155
0
        }
1156
1157
0
        json_record = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size,
1158
0
                                                  config->json_escape_unicode);
1159
0
        if (!json_record) {
1160
0
            flb_plg_error(ctx->ins, "error converting msgpack to JSON");
1161
0
            flb_sds_destroy(out_buf);
1162
0
            msgpack_sbuffer_destroy(&mp_sbuf);
1163
0
            flb_log_event_decoder_destroy(&log_decoder);
1164
0
            return -1;
1165
0
        }
1166
1167
        /* Concatenate the JSON record to the output buffer */
1168
0
        tmp_buf = flb_sds_cat(out_buf, json_record, flb_sds_len(json_record));
1169
0
        if (!tmp_buf) {
1170
0
            flb_plg_error(ctx->ins, "error appending JSON record");
1171
0
            flb_sds_destroy(json_record);
1172
0
            flb_sds_destroy(out_buf);
1173
0
            msgpack_sbuffer_destroy(&mp_sbuf);
1174
0
            flb_log_event_decoder_destroy(&log_decoder);
1175
0
            return -1;
1176
0
        }
1177
0
        out_buf = tmp_buf;
1178
1179
0
        tmp_buf = flb_sds_cat(out_buf, "\n", 1);
1180
0
        if (!tmp_buf) {
1181
0
            flb_plg_error(ctx->ins, "error appending JSON record delimiter");
1182
0
            flb_sds_destroy(json_record);
1183
0
            flb_sds_destroy(out_buf);
1184
0
            msgpack_sbuffer_destroy(&mp_sbuf);
1185
0
            flb_log_event_decoder_destroy(&log_decoder);
1186
0
            return -1;
1187
0
        }
1188
0
        out_buf = tmp_buf;
1189
1190
0
        flb_sds_destroy(json_record);
1191
0
    }
1192
1193
0
    msgpack_sbuffer_destroy(&mp_sbuf);
1194
0
    flb_log_event_decoder_destroy(&log_decoder);
1195
1196
0
    *out_data = out_buf;
1197
0
    *out_size = flb_sds_len(out_buf);
1198
1199
0
    return 0;
1200
0
}
1201
1202
/**
1203
 * Buffer a data chunk into the file storage for later ingestion.
1204
 *
1205
 * Writes the formatted chunk data to the upload file via the store layer.
1206
 *
1207
 * @param out_context  Plugin's context (cast to struct flb_azure_kusto).
1208
 * @param upload_file  Target file handle for buffered storage.
1209
 * @param chunk        Data chunk to buffer.
1210
 * @param chunk_size   Size of the data chunk.
1211
 * @param tag          Fluent Bit tag associated with the chunk.
1212
 * @param tag_len      Length of the tag string.
1213
 * @return int         0 on success, -1 on failure.
1214
 */
1215
static int buffer_chunk(void *out_context, struct azure_kusto_file *upload_file,
1216
                        flb_sds_t chunk, int chunk_size,
1217
                        flb_sds_t tag, size_t tag_len)
1218
0
{
1219
0
    int ret;
1220
0
    struct flb_azure_kusto *ctx = out_context;
1221
1222
0
    flb_plg_trace(ctx->ins, "Buffering chunk %d", chunk_size);
1223
1224
0
    ret = azure_kusto_store_buffer_put(ctx, upload_file, tag,
1225
0
                                       tag_len, chunk, chunk_size);
1226
0
    if (ret < 0) {
1227
0
        flb_plg_error(ctx->ins, "Could not buffer chunk. ");
1228
0
        return -1;
1229
0
    }
1230
0
    return 0;
1231
0
}
1232
1233
/**
1234
 * @brief Initialize the flush process for Azure Kusto output plugin.
1235
 *
1236
 * This function is responsible for setting up the initial conditions required
1237
 * for flushing data to Azure Kusto. It performs the following tasks:
1238
 *
1239
 * 1. **Old Buffer Cleanup**: Checks if there are any old buffers from previous
1240
 *    executions that need to be sent to Azure Kusto. If such buffers exist, it
1241
 *    attempts to ingest all chunks of data. If the ingestion fails, it logs an
1242
 *    error and marks the buffers to be retried later.
1243
 *
1244
 * 2. **Upload Timer Setup**: If not already created, it sets up a periodic timer
1245
 *    that checks for uploads ready for completion. This timer is crucial for
1246
 *    ensuring that data is uploaded at regular intervals.
1247
 *
1248
 * @param out_context Pointer to the output context, specifically the Azure Kusto context.
1249
 * @param config Pointer to the Fluent Bit configuration structure.
1250
 */
1251
static void flush_init(void *out_context, struct flb_config *config)
1252
0
{
1253
0
    int ret;
1254
0
    struct flb_azure_kusto *ctx = out_context;
1255
0
    struct flb_sched *sched;
1256
1257
0
    flb_plg_debug(ctx->ins,
1258
0
                  "inside flush_init with old_buffers as %d",
1259
0
                  ctx->has_old_buffers);
1260
1261
    /* clean up any old buffers found on startup */
1262
0
    if (ctx->has_old_buffers == FLB_TRUE) {
1263
0
        flb_plg_info(ctx->ins,
1264
0
                     "Sending locally buffered data from previous "
1265
0
                     "executions to kusto; buffer=%s",
1266
0
                     ctx->fs->root_path);
1267
0
        ctx->has_old_buffers = FLB_FALSE;
1268
0
        ret = ingest_all_chunks(ctx, config);
1269
0
        if (ret < 0) {
1270
0
            ctx->has_old_buffers = FLB_TRUE;
1271
0
            flb_plg_error(ctx->ins,
1272
0
                          "Failed to send locally buffered data left over "
1273
0
                          "from previous executions; will retry. Buffer=%s",
1274
0
                          ctx->fs->root_path);
1275
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
1276
0
        }
1277
0
    }
1278
0
    else {
1279
0
        flb_plg_debug(ctx->ins,
1280
0
                     "Did not find any local buffered data from previous "
1281
0
                     "executions to kusto; buffer=%s",
1282
0
                     ctx->fs->root_path);
1283
0
    }
1284
1285
    /*
1286
    * create a timer that will run periodically and check if uploads
1287
    * are ready for completion
1288
    * this is created once on the first flush
1289
    */
1290
0
    if (ctx->timer_created == FLB_FALSE) {
1291
0
        flb_plg_debug(ctx->ins,
1292
0
                      "Creating upload timer with frequency %ds",
1293
0
                      ctx->timer_ms / 1000);
1294
1295
0
        sched = flb_sched_ctx_get();
1296
1297
0
        ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
1298
0
                                        ctx->timer_ms, cb_azure_kusto_ingest, ctx, NULL);
1299
0
        if (ret == -1) {
1300
0
            flb_plg_error(ctx->ins, "Failed to create upload timer");
1301
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
1302
0
        }
1303
0
        ctx->timer_created = FLB_TRUE;
1304
0
    }
1305
0
}
1306
1307
/**
1308
 * This function handles the flushing of event data to Azure Kusto.
1309
 * It manages both buffered and non-buffered modes, handles JSON formatting,
1310
 * compression, and manages file uploads based on conditions like timeout and file size.
1311
 *
1312
 * @param event_chunk The event chunk containing the data to be flushed.
1313
 * @param out_flush The output flush context.
1314
 * @param i_ins The input instance (unused).
1315
 * @param out_context The output context, specifically for Azure Kusto.
1316
 * @param config The configuration context (unused).
1317
 */
1318
static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
1319
                                 struct flb_output_flush *out_flush,
1320
                                 struct flb_input_instance *i_ins, void *out_context,
1321
                                 struct flb_config *config)
1322
0
{
1323
0
    int ret;
1324
0
    flb_sds_t json = NULL;
1325
0
    size_t json_size;
1326
0
    size_t tag_len;
1327
0
    struct flb_azure_kusto *ctx = out_context;
1328
0
    int is_compressed = FLB_FALSE;
1329
0
    struct azure_kusto_file *upload_file = NULL;
1330
0
    int upload_timeout_check = FLB_FALSE;
1331
0
    int total_file_size_check = FLB_FALSE;
1332
0
    flb_sds_t tag_name = NULL;
1333
0
    size_t tag_name_len;
1334
1335
0
    (void)i_ins;
1336
0
    (void)config;
1337
1338
0
    void *final_payload = NULL;
1339
0
    size_t final_payload_size = 0;
1340
1341
0
    flb_plg_debug(ctx->ins, "flushing bytes for event tag %s and size %zu", event_chunk->tag ,event_chunk->size);
1342
1343
    /* Get the length of the event tag */
1344
0
    tag_len = flb_sds_len(event_chunk->tag);
1345
1346
0
    if (ctx->buffering_enabled == FLB_TRUE) {
1347
    /* Determine the tag name based on the unify_tag setting */
1348
0
        if (ctx->unify_tag == FLB_TRUE){
1349
0
            tag_name = flb_sds_create("fluentbit-buffer-file-unify-tag.log");
1350
0
        }
1351
0
        else {
1352
0
            tag_name = event_chunk->tag;
1353
0
        }
1354
0
        tag_name_len = flb_sds_len(tag_name);
1355
        /* Initialize the flush process */
1356
0
        flush_init(ctx,config);
1357
1358
        /* Reformat msgpack to JSON payload */
1359
0
        ret = azure_kusto_format(ctx, tag_name, tag_name_len, event_chunk->data,
1360
0
                                 event_chunk->size, (void **)&json, &json_size,
1361
0
                                 config);
1362
0
        if (ret != 0) {
1363
0
            flb_plg_error(ctx->ins, "cannot reformat data into json");
1364
0
            ret = FLB_RETRY;
1365
0
            goto error;
1366
0
        }
1367
1368
        /* Get a file candidate matching the given 'tag' */
1369
0
        upload_file = azure_kusto_store_file_get(ctx,
1370
0
                                                 tag_name,
1371
0
                                                 tag_name_len);
1372
1373
        /* Check if the file has failed to upload too many times */
1374
0
        if (upload_file != NULL && upload_file->failures >= ctx->scheduler_max_retries) {
1375
0
            flb_plg_warn(ctx->ins, "File with tag %s failed to send %d times, will not "
1376
0
                                   "retry", event_chunk->tag, ctx->scheduler_max_retries);
1377
0
            if (ctx->delete_on_max_upload_error){
1378
0
                azure_kusto_store_file_delete(ctx, upload_file);
1379
0
            }
1380
0
            else {
1381
0
                azure_kusto_store_file_inactive(ctx, upload_file);
1382
0
            }
1383
0
            upload_file = NULL;
1384
0
        }
1385
1386
        /* Check if the upload timeout has elapsed */
1387
0
        if (upload_file != NULL && time(NULL) >
1388
0
                                   (upload_file->create_time + ctx->upload_timeout)) {
1389
0
            upload_timeout_check = FLB_TRUE;
1390
0
            flb_plg_trace(ctx->ins, "upload_timeout reached for %s",
1391
0
                          event_chunk->tag);
1392
0
        }
1393
1394
        /* Check if the total file size has been exceeded */
1395
0
        if (upload_file && upload_file->size + json_size > ctx->file_size) {
1396
0
            flb_plg_trace(ctx->ins, "total_file_size exceeded %s",
1397
0
                          event_chunk->tag);
1398
0
            total_file_size_check = FLB_TRUE;
1399
0
        }
1400
1401
        /* If the file is ready for upload */
1402
0
        if ((upload_file != NULL) && (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE)) {
1403
0
            flb_plg_debug(ctx->ins, "uploading file %s with size %zu", upload_file->fsf->name, upload_file->size);
1404
            /* Load or refresh ingestion resources */
1405
0
            ret = azure_kusto_load_ingestion_resources(ctx, config);
1406
0
            if (ret != 0) {
1407
0
                flb_plg_error(ctx->ins, "cannot load ingestion resources");
1408
0
                ret = FLB_RETRY;
1409
0
                goto error;
1410
0
            }
1411
1412
            /* Ingest data to kusto */
1413
0
            ret = ingest_to_kusto(ctx, json, upload_file,
1414
0
                                      tag_name,
1415
0
                                      tag_name_len);
1416
1417
0
            if (ret == 0){
1418
0
                if (ctx->buffering_enabled == FLB_TRUE && ctx->buffer_file_delete_early == FLB_TRUE){
1419
0
                    flb_plg_debug(ctx->ins, "buffer file already deleted after blob creation");
1420
0
                    ret = FLB_OK;
1421
0
                    goto cleanup;
1422
0
                }
1423
0
                else{
1424
0
                    ret = azure_kusto_store_file_delete(ctx, upload_file);
1425
0
                    if (ret != 0){
1426
                        /* File couldn't be deleted */
1427
0
                        ret = FLB_RETRY;
1428
0
                        if (upload_file){
1429
0
                            azure_kusto_store_file_unlock(upload_file);
1430
0
                            upload_file->failures += 1;
1431
0
                        }
1432
0
                        goto error;
1433
0
                    }
1434
0
                    else{
1435
                        /* File deleted successfully */
1436
0
                        ret = FLB_OK;
1437
0
                        goto cleanup;
1438
0
                    }
1439
0
                }
1440
0
            }
1441
0
            else{
1442
0
                flb_plg_error(ctx->ins, "azure_kusto:: unable to ingest data into kusto : retrying");
1443
0
                ret = FLB_RETRY;
1444
0
                if (upload_file){
1445
0
                    azure_kusto_store_file_unlock(upload_file);
1446
0
                    upload_file->failures += 1;
1447
0
                }
1448
0
                goto cleanup;
1449
0
            }
1450
0
        }
1451
1452
        /* Buffer the current chunk in the filesystem */
1453
0
        ret = buffer_chunk(ctx, upload_file, json, json_size,
1454
0
                           tag_name, tag_name_len);
1455
1456
0
        if (ret == 0) {
1457
0
            flb_plg_debug(ctx->ins, "buffered chunk %s", event_chunk->tag);
1458
0
            ret = FLB_OK;
1459
0
        }
1460
0
        else {
1461
0
            flb_plg_error(ctx->ins, "failed to buffer chunk %s", event_chunk->tag);
1462
0
            ret = FLB_RETRY;
1463
0
        }
1464
0
        goto cleanup;
1465
1466
0
    }
1467
0
    else {
1468
        /* Buffering mode is disabled, proceed with regular flush */
1469
1470
        /* Reformat msgpack data to JSON payload */
1471
0
        ret = azure_kusto_format(ctx, event_chunk->tag, tag_len, event_chunk->data,
1472
0
                                 event_chunk->size, (void **)&json, &json_size,
1473
0
                                 config);
1474
0
        if (ret != 0) {
1475
0
            flb_plg_error(ctx->ins, "cannot reformat data into json");
1476
0
            ret = FLB_RETRY;
1477
0
            goto error;
1478
0
        }
1479
1480
0
        flb_plg_debug(ctx->ins, "payload size before compression %zu", json_size);
1481
        /* Map buffer */
1482
0
        final_payload = json;
1483
0
        final_payload_size = json_size;
1484
        /* Check if compression is enabled */
1485
0
        if (ctx->compression_enabled == FLB_TRUE) {
1486
0
            ret = flb_gzip_compress((void *) json, json_size,
1487
0
                                    &final_payload, &final_payload_size);
1488
0
            if (ret != 0) {
1489
0
                flb_plg_error(ctx->ins,
1490
0
                              "cannot gzip payload");
1491
0
                ret = FLB_ERROR;
1492
0
                goto error;
1493
0
            }
1494
0
            else {
1495
0
                is_compressed = FLB_TRUE;
1496
0
                flb_plg_debug(ctx->ins, "enabled payload gzip compression");
1497
                /* JSON buffer will be cleared at cleanup: */
1498
0
            }
1499
0
        }
1500
0
        flb_plg_trace(ctx->ins, "payload size after compression %zu", final_payload_size);
1501
1502
        /* Load or refresh ingestion resources */
1503
0
        ret = azure_kusto_load_ingestion_resources(ctx, config);
1504
0
        flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret);
1505
0
        if (ret != 0) {
1506
0
            flb_plg_error(ctx->ins, "cannot load ingestion resources");
1507
0
            ret = FLB_RETRY;
1508
0
            goto error;
1509
0
        }
1510
1511
        /* Perform queued ingestion to Kusto */
1512
0
        ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL);
1513
0
        flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
1514
0
        if (ret != 0) {
1515
0
            flb_plg_error(ctx->ins, "cannot perform queued ingestion");
1516
0
            ret = FLB_RETRY;
1517
0
            goto error;
1518
0
        }
1519
1520
0
        ret = FLB_OK;
1521
0
        goto cleanup;
1522
0
    }
1523
1524
0
    cleanup:
1525
    /* Cleanup resources */
1526
0
    if (json) {
1527
0
        flb_sds_destroy(json);
1528
0
    }
1529
0
    if (is_compressed && final_payload) {
1530
0
        flb_free(final_payload);
1531
0
    }
1532
0
    if (tag_name) {
1533
0
        flb_sds_destroy(tag_name);
1534
0
    }
1535
0
    FLB_OUTPUT_RETURN(ret);
1536
1537
0
    error:
1538
    /* Error handling and cleanup */
1539
0
    if (json) {
1540
0
        flb_sds_destroy(json);
1541
0
    } 
1542
0
    if (is_compressed && final_payload) {
1543
0
        flb_free(final_payload);
1544
0
    }
1545
0
    if (tag_name) {
1546
0
        flb_sds_destroy(tag_name);
1547
0
    }
1548
0
    FLB_OUTPUT_RETURN(ret);
1549
0
}
1550
1551
/**
1552
 * cb_azure_kusto_exit - Clean up and finalize the Azure Kusto plugin context.
1553
 *
1554
 * This function is responsible for performing cleanup operations when the
1555
 * Azure Kusto plugin is exiting. It ensures that all resources are properly
1556
 * released and any remaining data is sent to Azure Kusto before the plugin
1557
 * shuts down.
1558
 *
1559
 * Functionality:
1560
 * - Checks if the plugin context (`ctx`) is valid. If not, it returns an error.
1561
 * - If there is locally buffered data, it attempts to send all chunks to Azure
1562
 *   Kusto using the `ingest_all_chunks` function. Logs an error if the operation
1563
 *   fails.
1564
 * - Destroys any active upstream connections (`ctx->u` and `ctx->imds_upstream`)
1565
 *   to free network resources.
1566
 * - Destroys mutexes (`resources_mutex`, `token_mutex`, `blob_mutex`) to ensure
1567
 *   proper synchronization cleanup.
1568
 * - Calls `azure_kusto_store_exit` to perform any additional storage-related
1569
 *   cleanup operations.
1570
 * - Finally, it calls `flb_azure_kusto_conf_destroy` to free the plugin context
1571
 *   and its associated resources.
1572
 *
1573
 * Parameters:
1574
 * - data: A pointer to the plugin context (`struct flb_azure_kusto`).
1575
 * - config: A pointer to the Fluent Bit configuration (`struct flb_config`).
1576
 *
1577
 * Returns:
1578
 * - 0 on successful cleanup.
1579
 * - -1 if the context is invalid or if an error occurs during cleanup.
1580
 */
1581
static int cb_azure_kusto_exit(void *data, struct flb_config *config)
1582
0
{
1583
0
    struct flb_azure_kusto *ctx = data;
1584
0
    int ret = -1;
1585
1586
0
    if (!ctx) {
1587
0
        return -1;
1588
0
    }
1589
1590
1591
0
    if (ctx->buffering_enabled == FLB_TRUE){
1592
0
        if (azure_kusto_store_has_data(ctx) == FLB_TRUE) {
1593
0
            flb_plg_info(ctx->ins, "Sending all locally buffered data to Kusto");
1594
0
            ret = ingest_all_chunks(ctx, config);
1595
0
            if (ret < 0) {
1596
0
                flb_plg_error(ctx->ins, "Could not send all chunks on exit");
1597
0
            }
1598
0
        }
1599
0
        azure_kusto_store_exit(ctx);
1600
0
    }
1601
1602
0
    if (ctx->u) {
1603
0
        flb_upstream_destroy(ctx->u);
1604
0
        ctx->u = NULL;
1605
0
    }
1606
1607
0
    pthread_mutex_destroy(&ctx->resources_mutex);
1608
0
    pthread_mutex_destroy(&ctx->token_mutex);
1609
0
    pthread_mutex_destroy(&ctx->blob_mutex);
1610
1611
0
    flb_azure_kusto_conf_destroy(ctx);
1612
1613
0
    return 0;
1614
0
}
1615
1616
static struct flb_config_map config_map[] = {
1617
    {FLB_CONFIG_MAP_STR, "tenant_id", (char *)NULL, 0, FLB_TRUE,
1618
     offsetof(struct flb_azure_kusto, tenant_id),
1619
     "Set the tenant ID of the AAD application used for authentication"},
1620
    {FLB_CONFIG_MAP_STR, "client_id", (char *)NULL, 0, FLB_TRUE,
1621
     offsetof(struct flb_azure_kusto, client_id),
1622
     "Set the client ID (Application ID) of the AAD application or the user-assigned managed identity's client ID when using managed identity authentication"},
1623
    {FLB_CONFIG_MAP_STR, "client_secret", (char *)NULL, 0, FLB_TRUE,
1624
     offsetof(struct flb_azure_kusto, client_secret),
1625
     "Set the client secret (Application Password) of the AAD application used for "
1626
     "authentication"},
1627
    {FLB_CONFIG_MAP_STR, "workload_identity_token_file", (char *)NULL, 0, FLB_TRUE,
1628
     offsetof(struct flb_azure_kusto, workload_identity_token_file),
1629
     "Set the token file path for workload identity authentication"},
1630
    {FLB_CONFIG_MAP_STR, "auth_type", "service_principal", 0, FLB_TRUE,
1631
     offsetof(struct flb_azure_kusto, auth_type_str),
1632
     "Set the authentication type: 'service_principal', 'managed_identity', or 'workload_identity'. "
1633
     "For managed_identity, use 'system' as client_id for system-assigned identity, or specify the managed identity's client ID"},
1634
    {FLB_CONFIG_MAP_STR, "ingestion_endpoint", (char *)NULL, 0, FLB_TRUE,
1635
     offsetof(struct flb_azure_kusto, ingestion_endpoint),
1636
     "Set the Kusto cluster's ingestion endpoint URL (e.g. "
1637
     "https://ingest-mycluster.eastus.kusto.windows.net)"},
1638
    {FLB_CONFIG_MAP_STR, "database_name", (char *)NULL, 0, FLB_TRUE,
1639
     offsetof(struct flb_azure_kusto, database_name), "Set the database name"},
1640
    {FLB_CONFIG_MAP_STR, "table_name", (char *)NULL, 0, FLB_TRUE,
1641
     offsetof(struct flb_azure_kusto, table_name), "Set the table name"},
1642
    {FLB_CONFIG_MAP_STR, "ingestion_mapping_reference", (char *)NULL, 0, FLB_TRUE,
1643
     offsetof(struct flb_azure_kusto, ingestion_mapping_reference),
1644
     "Set the ingestion mapping reference"},
1645
    {FLB_CONFIG_MAP_STR, "log_key", FLB_AZURE_KUSTO_DEFAULT_LOG_KEY, 0, FLB_TRUE,
1646
     offsetof(struct flb_azure_kusto, log_key), "The key name of event payload"},
1647
    {FLB_CONFIG_MAP_BOOL, "include_tag_key", "true", 0, FLB_TRUE,
1648
     offsetof(struct flb_azure_kusto, include_tag_key),
1649
     "If enabled, tag is appended to output. "
1650
     "The key name is used 'tag_key' property."},
1651
    {FLB_CONFIG_MAP_STR, "tag_key", FLB_AZURE_KUSTO_DEFAULT_TAG_KEY, 0, FLB_TRUE,
1652
     offsetof(struct flb_azure_kusto, tag_key),
1653
     "The key name of tag. If 'include_tag_key' is false, "
1654
     "This property is ignored"},
1655
    {FLB_CONFIG_MAP_BOOL, "include_time_key", "true", 0, FLB_TRUE,
1656
     offsetof(struct flb_azure_kusto, include_time_key),
1657
     "If enabled, time is appended to output. "
1658
     "The key name is used 'time_key' property."},
1659
    {FLB_CONFIG_MAP_STR, "time_key", FLB_AZURE_KUSTO_DEFAULT_TIME_KEY, 0, FLB_TRUE,
1660
     offsetof(struct flb_azure_kusto, time_key),
1661
     "The key name of the time. If 'include_time_key' is false, "
1662
     "This property is ignored"},
1663
    {FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE,
1664
     offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout),
1665
    "Set the connection timeout of various kusto endpoints (kusto ingest endpoint, kusto ingestion blob endpoint, kusto ingestion queue endpoint) in seconds."
1666
    "The default is 60 seconds."},
1667
    {FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE,
1668
     offsetof(struct flb_azure_kusto, compression_enabled),
1669
    "Enable HTTP payload compression (gzip)."
1670
    "The default is true."},
1671
    {FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE,
1672
     offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval),
1673
    "Set the azure kusto ingestion resources refresh interval"
1674
    "The default is 3600 seconds."},
1675
    {FLB_CONFIG_MAP_BOOL, "buffering_enabled", "false", 0, FLB_TRUE,
1676
     offsetof(struct flb_azure_kusto, buffering_enabled), "Enable buffering into disk before ingesting into Azure Kusto."
1677
    },
1678
    {FLB_CONFIG_MAP_STR, "buffer_dir", "/tmp/fluent-bit/azure-kusto/", 0, FLB_TRUE,
1679
     offsetof(struct flb_azure_kusto, buffer_dir), "Specifies the location of directory where the buffered data will be stored."
1680
    },
1681
    {FLB_CONFIG_MAP_TIME, "upload_timeout", "30m",
1682
     0, FLB_TRUE, offsetof(struct flb_azure_kusto, upload_timeout),
1683
    "Optionally specify a timeout for uploads. "
1684
    "Fluent Bit will start ingesting buffer files which have been created more than x minutes and haven't reached upload_file_size limit yet.  "
1685
    " Default is 30m."
1686
    },
1687
    {FLB_CONFIG_MAP_SIZE, "upload_file_size", "200M",
1688
     0, FLB_TRUE, offsetof(struct flb_azure_kusto, file_size),
1689
    "Specifies the size of files to be uploaded in MBs. Default is 200MB"
1690
    },
1691
    {FLB_CONFIG_MAP_STR, "azure_kusto_buffer_key", "key",0, FLB_TRUE,
1692
     offsetof(struct flb_azure_kusto, azure_kusto_buffer_key),
1693
    "Set the azure kusto buffer key which needs to be specified when using multiple instances of azure kusto output plugin and buffering is enabled"
1694
    },
1695
    {FLB_CONFIG_MAP_SIZE, "store_dir_limit_size", FLB_AZURE_KUSTO_BUFFER_DIR_MAX_SIZE,0, FLB_TRUE,
1696
     offsetof(struct flb_azure_kusto, store_dir_limit_size),
1697
    "Set the max size of the buffer directory. Default is 8GB"
1698
    },
1699
    {FLB_CONFIG_MAP_BOOL, "buffer_file_delete_early", "false",0, FLB_TRUE,
1700
     offsetof(struct flb_azure_kusto, buffer_file_delete_early),
1701
    "Whether to delete the buffered file early after successful blob creation. Default is false"
1702
    },
1703
    {FLB_CONFIG_MAP_BOOL, "unify_tag", "true",0, FLB_TRUE,
1704
     offsetof(struct flb_azure_kusto, unify_tag),
1705
    "This creates a single buffer file when the buffering mode is ON. Default is true"
1706
    },
1707
    {FLB_CONFIG_MAP_INT, "blob_uri_length", "64",0, FLB_TRUE,
1708
     offsetof(struct flb_azure_kusto, blob_uri_length),
1709
    "Set the length of generated blob uri before ingesting to kusto. Default is 64"
1710
    },
1711
    {FLB_CONFIG_MAP_INT, "scheduler_max_retries", "3",0, FLB_TRUE,
1712
     offsetof(struct flb_azure_kusto, scheduler_max_retries),
1713
    "Set the maximum number of retries for ingestion using the scheduler. Default is 3"
1714
    },
1715
    {FLB_CONFIG_MAP_BOOL, "delete_on_max_upload_error", "false",0, FLB_TRUE,
1716
     offsetof(struct flb_azure_kusto, delete_on_max_upload_error),
1717
    "Whether to delete the buffer file on maximum upload errors. Default is false"
1718
    },
1719
    {FLB_CONFIG_MAP_TIME, "io_timeout", "60s",0, FLB_TRUE,
1720
     offsetof(struct flb_azure_kusto, io_timeout),
1721
    "HTTP IO timeout. Default is 60s"
1722
    },
1723
    /* EOF */
1724
    {0}};
1725
1726
struct flb_output_plugin out_azure_kusto_plugin = {
1727
    .name = "azure_kusto",
1728
    .description = "Send events to Kusto (Azure Data Explorer)",
1729
    .cb_init = cb_azure_kusto_init,
1730
    .cb_flush = cb_azure_kusto_flush,
1731
    .cb_exit = cb_azure_kusto_exit,
1732
    .config_map = config_map,
1733
    /* Plugin flags */
1734
    .flags = FLB_OUTPUT_NET | FLB_IO_TLS,
1735
};