Coverage Report

Created: 2026-03-09 07:04

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/plugins/out_azure_kusto/azure_kusto.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2026 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_http_client.h>
21
#include <fluent-bit/flb_kv.h>
22
#include <fluent-bit/flb_oauth2.h>
23
#include <fluent-bit/flb_output_plugin.h>
24
#include <fluent-bit/flb_pack.h>
25
#include <fluent-bit/flb_log_event_decoder.h>
26
#include <fluent-bit/flb_scheduler.h>
27
#include <fluent-bit/flb_gzip.h>
28
#include <fluent-bit/flb_utils.h>
29
#include <stdio.h>
30
#include <fluent-bit/flb_sds.h>
31
#include <fluent-bit/flb_fstore.h>
32
#include <msgpack.h>
33
#include <fluent-bit/flb_version.h>
34
#include <inttypes.h>
35
36
#include "azure_kusto.h"
37
#include "azure_kusto_conf.h"
38
#include "azure_kusto_ingest.h"
39
#include "azure_msiauth.h"
40
#include "azure_kusto_store.h"
41
42
static int azure_kusto_get_msi_token(struct flb_azure_kusto *ctx)
43
0
{
44
0
    char *token;
45
46
    /* Retrieve access token */
47
0
    token = flb_azure_msiauth_token_get(ctx->o);
48
0
    if (!token) {
49
0
        flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
50
0
        return -1;
51
0
    }
52
53
0
    return 0;
54
0
}
55
56
static int azure_kusto_get_workload_identity_token(struct flb_azure_kusto *ctx)
57
0
{
58
0
    int ret;
59
    
60
0
    ret = flb_azure_workload_identity_token_get(ctx->o, 
61
0
                                               ctx->workload_identity_token_file,
62
0
                                               ctx->client_id, 
63
0
                                               ctx->tenant_id);
64
0
    if (ret == -1) {
65
0
        flb_plg_error(ctx->ins, "error retrieving workload identity token");
66
0
        return -1;
67
0
    }
68
    
69
0
    flb_plg_debug(ctx->ins, "Workload identity token retrieved successfully");
70
0
    return 0;
71
0
}
72
73
static int azure_kusto_get_service_principal_token(struct flb_azure_kusto *ctx)
74
0
{
75
0
    int ret;
76
    
77
    /* Clear any previous oauth2 payload content */
78
0
    flb_oauth2_payload_clear(ctx->o);
79
80
0
    ret = flb_oauth2_payload_append(ctx->o, "grant_type", 10, "client_credentials", 18);
81
0
    if (ret == -1) {
82
0
        flb_plg_error(ctx->ins, "error appending oauth2 params");
83
0
        return -1;
84
0
    }
85
86
0
    ret = flb_oauth2_payload_append(ctx->o, "scope", 5, FLB_AZURE_KUSTO_SCOPE, 39);
87
0
    if (ret == -1) {
88
0
        flb_plg_error(ctx->ins, "error appending oauth2 params");
89
0
        return -1;
90
0
    }
91
92
0
    ret = flb_oauth2_payload_append(ctx->o, "client_id", 9, ctx->client_id, -1);
93
0
    if (ret == -1) {
94
0
        flb_plg_error(ctx->ins, "error appending oauth2 params");
95
0
        return -1;
96
0
    }
97
98
0
    ret = flb_oauth2_payload_append(ctx->o, "client_secret", 13, ctx->client_secret, -1);
99
0
    if (ret == -1) {
100
0
        flb_plg_error(ctx->ins, "error appending oauth2 params");
101
0
        return -1;
102
0
    }
103
104
    /* Retrieve access token */
105
0
    char *token = flb_oauth2_token_get(ctx->o);
106
0
    if (!token) {
107
0
        flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
108
0
        return -1;
109
0
    }
110
111
0
    flb_plg_debug(ctx->ins, "OAuth2 token retrieval process completed successfully");
112
0
    return 0;
113
0
}
114
115
flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx)
116
0
{
117
0
    int ret = 0;
118
0
    flb_sds_t output = NULL;
119
120
0
    if (pthread_mutex_lock(&ctx->token_mutex)) {
121
0
        flb_plg_error(ctx->ins, "error locking mutex");
122
0
        return NULL;
123
0
    }
124
125
0
    if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) {
126
0
        switch (ctx->auth_type) {
127
0
            case FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY:
128
0
                ret = azure_kusto_get_workload_identity_token(ctx);
129
0
                break;
130
0
            case FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM:
131
0
            case FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER:
132
0
                ret = azure_kusto_get_msi_token(ctx);
133
0
                break;
134
0
            case FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL:
135
0
            default:
136
0
                ret = azure_kusto_get_service_principal_token(ctx);
137
0
                break;
138
0
        }
139
0
    }
140
141
    /* Copy string to prevent race conditions (get_oauth2 can free the string) */
142
0
    if (ret == 0) {
143
0
        output = flb_sds_create_size(flb_sds_len(ctx->o->token_type) +
144
0
                                     flb_sds_len(ctx->o->access_token) + 2);
145
0
        if (!output) {
146
0
            flb_plg_error(ctx->ins, "error creating token buffer");
147
0
            return NULL;
148
0
        }
149
0
        flb_sds_snprintf(&output, flb_sds_alloc(output), "%s %s", ctx->o->token_type,
150
0
                         ctx->o->access_token);
151
0
    }
152
153
0
    if (pthread_mutex_unlock(&ctx->token_mutex)) {
154
0
        flb_plg_error(ctx->ins, "error unlocking mutex");
155
0
        if (output) {
156
0
            flb_sds_destroy(output);
157
0
        }
158
0
        return NULL;
159
0
    }
160
161
0
    return output;
162
0
}
163
164
/**
165
 * Executes a control command against kusto's endpoint
166
 *
167
 * @param ctx       Plugin's context
168
 * @param csl       Kusto's control command
169
 * @return flb_sds_t      Returns the response or NULL on error.
170
 */
171
flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl)
172
0
{
173
0
    flb_sds_t token;
174
0
    flb_sds_t body;
175
0
    size_t b_sent;
176
0
    int ret;
177
0
    struct flb_connection *u_conn;
178
0
    struct flb_http_client *c;
179
0
    flb_sds_t resp = NULL;
180
181
0
    flb_plg_debug(ctx->ins, "before getting upstream connection");
182
183
0
    flb_plg_debug(ctx->ins, "Logging attributes of flb_azure_kusto_resources:");
184
0
    flb_plg_debug(ctx->ins, "blob_ha: %p", ctx->resources->blob_ha);
185
0
    flb_plg_debug(ctx->ins, "queue_ha: %p", ctx->resources->queue_ha);
186
0
    flb_plg_debug(ctx->ins, "load_time: %" PRIu64, ctx->resources->load_time);
187
188
0
    ctx->u->base.net.connect_timeout = ctx->ingestion_endpoint_connect_timeout;
189
0
    if (ctx->buffering_enabled == FLB_TRUE){
190
0
        ctx->u->base.flags &= ~(FLB_IO_ASYNC);
191
0
    }
192
0
    flb_plg_debug(ctx->ins, "execute_ingest_csl_command -- async flag is %d", flb_stream_is_async(&ctx->u->base));
193
194
    /* Get upstream connection */
195
0
    u_conn = flb_upstream_conn_get(ctx->u);
196
197
0
    if (u_conn) {
198
0
        token = get_azure_kusto_token(ctx);
199
200
0
        if (token) {
201
            /* Compose request body */
202
0
            body = flb_sds_create_size(sizeof(FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE) - 1 +
203
0
                                       strlen(csl));
204
205
0
            if (body) {
206
0
                flb_sds_snprintf(&body, flb_sds_alloc(body),
207
0
                                 FLB_AZURE_KUSTO_MGMT_BODY_TEMPLATE, csl);
208
209
                /* Compose HTTP Client request */
210
0
                c = flb_http_client(u_conn, FLB_HTTP_POST, FLB_AZURE_KUSTO_MGMT_URI_PATH,
211
0
                                    body, flb_sds_len(body), NULL, 0, NULL, 0);
212
213
0
                if (c) {
214
                    /* Add headers */
215
0
                    flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
216
0
                    flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
217
0
                    flb_http_add_header(c, "Accept", 6, "application/json", 16);
218
0
                    flb_http_add_header(c, "Authorization", 13, token,
219
0
                                        flb_sds_len(token));
220
0
                    flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR));
221
0
                    flb_http_add_header(c, "x-ms-app", 8, "Fluent-Bit", 10);
222
0
                    flb_http_add_header(c, "x-ms-user", 9, "Fluent-Bit", 10);
223
0
                    flb_http_buffer_size(c, FLB_HTTP_DATA_SIZE_MAX * 10);
224
225
                    /* Send HTTP request */
226
0
                    ret = flb_http_do(c, &b_sent);
227
0
                    flb_plg_debug(
228
0
                            ctx->ins,
229
0
                            "Kusto ingestion command request http_do=%i, HTTP Status: %i",
230
0
                            ret, c->resp.status);
231
0
                    flb_plg_debug(ctx->ins, "Kusto ingestion command HTTP response payload: %.*s", (int)c->resp.payload_size, c->resp.payload);
232
233
0
                    if (ret == 0) {
234
0
                        if (c->resp.status == 200) {
235
                            /* Copy payload response to the response param */
236
0
                            resp = flb_sds_create_len(c->resp.payload, c->resp.payload_size);
237
0
                        }
238
0
                        else {
239
0
                            flb_plg_error(ctx->ins, "Kusto Ingestion Resources Request failed with HTTP Status: %i", c->resp.status);
240
0
                            if (c->resp.payload_size > 0) {
241
0
                                flb_plg_error(ctx->ins, "Kusto Ingestion Resources Response payload: \n%s", c->resp.payload);
242
0
                            }
243
0
                        }
244
0
                    }
245
0
                    else {
246
0
                        flb_plg_error(ctx->ins, "Kusto Ingestion Resources :: cannot send HTTP request");
247
0
                    }
248
249
0
                    flb_http_client_destroy(c);
250
0
                }
251
0
                else {
252
0
                    flb_plg_error(ctx->ins, "cannot create HTTP client context");
253
0
                }
254
255
0
                flb_sds_destroy(body);
256
0
            }
257
0
            else {
258
0
                flb_plg_error(ctx->ins, "cannot construct request body");
259
0
            }
260
261
0
            flb_sds_destroy(token);
262
0
        }
263
0
        else {
264
0
            flb_plg_error(ctx->ins, "cannot retrieve oauth2 token");
265
0
        }
266
267
0
        flb_upstream_conn_release(u_conn);
268
0
    }
269
0
    else {
270
0
        flb_plg_error(ctx->ins, "cannot create upstream connection");
271
0
    }
272
273
0
    return resp;
274
0
}
275
276
/**
277
 * construct_request_buffer - Constructs a request buffer for Azure Kusto ingestion.
278
 *
279
 * This function is responsible for preparing a data buffer that will be used
280
 * to send data to Azure Kusto. It handles both new incoming data and data
281
 * that has been previously buffered in a file. The function performs the
282
 * following tasks:
283
 *
284
 * 1. Validates Input: Checks if both `new_data` and `upload_file` are NULL,
285
 *    which would indicate an error since there is no data to process.
286
 *
287
 * 2. Reads Buffered Data: If an `upload_file` is provided, it reads the
288
 *    locally buffered data from the file and locks the file to prevent
289
 *    concurrent modifications.
290
 *
291
 * 3. Appends New Data: If `new_data` is provided, it appends this data to
292
 *    the buffered data, reallocating memory as necessary to accommodate the
293
 *    combined data size.
294
 *
295
 * 4. Outputs the Result: Sets the output parameters `out_buf` and `out_size`
296
 *    to point to the constructed buffer and its size, respectively.
297
 *
298
 * The function ensures that the buffer is correctly terminated if compression
299
 * is not enabled, and it handles memory allocation and error checking
300
 * throughout the process.
301
 *
302
 * Parameters:
303
 * @ctx:        The context containing configuration and state information.
304
 * @new_data:   The new data to be appended to the buffer, if any.
305
 * @upload_file: The file containing previously buffered data, if any.
306
 * @out_buf:    Pointer to the output buffer that will be constructed.
307
 * @out_size:   Pointer to the size of the constructed buffer.
308
 *
309
 * Returns:
310
 * 0 on success, or -1 on failure with an appropriate error message logged.
311
 */
312
static int construct_request_buffer(struct flb_azure_kusto *ctx, flb_sds_t new_data,
313
                                    struct azure_kusto_file *upload_file,
314
                                    char **out_buf, size_t *out_size)
315
0
{
316
0
    char *body;
317
0
    char *tmp;
318
0
    size_t body_size = 0;
319
0
    char *buffered_data = NULL;
320
0
    size_t buffer_size = 0;
321
0
    int ret;
322
323
0
    if (new_data == NULL && upload_file == NULL) {
324
0
        flb_plg_error(ctx->ins, "[construct_request_buffer] Something went wrong"
325
0
                                " both chunk and new_data are NULL");
326
0
        return -1;
327
0
    }
328
329
0
    if (upload_file) {
330
0
        ret = azure_kusto_store_file_upload_read(ctx, upload_file->fsf, &buffered_data, &buffer_size);
331
0
        if (ret < 0) {
332
0
            flb_plg_error(ctx->ins, "Could not read locally buffered data %s",
333
0
                          upload_file->fsf->name);
334
0
            return -1;
335
0
        }
336
337
        /*
338
         * lock the upload_file from buffer list
339
         */
340
0
        azure_kusto_store_file_lock(upload_file);
341
0
        body = buffered_data;
342
0
        body_size = buffer_size;
343
0
    }
344
345
0
    flb_plg_debug(ctx->ins, "[construct_request_buffer] size of buffer file read %zu", buffer_size);
346
347
    /*
348
     * If new data is arriving, increase the original 'buffered_data' size
349
     * to append the new one.
350
     */
351
0
    if (new_data) {
352
0
        body_size += flb_sds_len(new_data);
353
0
        flb_plg_debug(ctx->ins, "[construct_request_buffer] size of new_data %zu", body_size);
354
355
0
        tmp = flb_realloc(buffered_data, body_size + 1);
356
0
        if (!tmp) {
357
0
            flb_errno();
358
0
            flb_free(buffered_data);
359
0
            if (upload_file) {
360
0
                azure_kusto_store_file_unlock(upload_file);
361
0
            }
362
0
            return -1;
363
0
        }
364
0
        body = buffered_data = tmp;
365
0
        memcpy(body + buffer_size, new_data, flb_sds_len(new_data));
366
0
        if (ctx->compression_enabled == FLB_FALSE){
367
0
            body[body_size] = '\0';
368
0
        }
369
0
    }
370
371
0
    flb_plg_debug(ctx->ins, "[construct_request_buffer] final increased %zu", body_size);
372
373
0
    *out_buf = body;
374
0
    *out_size = body_size;
375
376
0
    return 0;
377
0
}
378
379
/**
380
 * Ingest all data chunks from the file storage streams into Azure Kusto.
381
 *
382
 * This function iterates over all file storage streams associated with the
383
 * given Azure Kusto context. For each
384
 * file in the stream, it checks if the file (chunk) is locked or has exceeded
385
 * the maximum number of retry attempts. If the chunk is eligible for processing,
386
 * it constructs a request buffer from the chunk data, optionally compresses
387
 * the payload, and attempts to ingest it into Azure Kusto.
388
 *
389
 * The function performs the following steps:
390
 * 1. Iterate over each file storage stream in the context.
391
 * 2. For each file in the stream, check if it is locked or has exceeded
392
 *    the maximum retry attempts. If so, skip processing.
393
 * 3. Construct a request buffer from the chunk data.
394
 * 4. Create a payload from the buffer and optionally compress it if
395
 *    compression is enabled.
396
 * 5. Load the necessary ingestion resources for Azure Kusto.
397
 * 6. Attempt to ingest the payload into Azure Kusto using queued ingestion.
398
 * 7. If ingestion is successful, clean up the local buffer file.
399
 * 8. Handle errors by unlocking the chunk, incrementing failure counts,
400
 *    and logging appropriate error messages.
401
 *
402
 * @param ctx    Pointer to the Azure Kusto context containing configuration
403
 *               and state information.
404
 * @param config Pointer to the Fluent Bit configuration structure.
405
 *
406
 * @return 0 on success, or -1 on failure.
407
 */
408
static int ingest_all_chunks(struct flb_azure_kusto *ctx, struct flb_config *config)
409
0
{
410
0
    struct azure_kusto_file *chunk;
411
0
    struct mk_list *tmp;
412
0
    struct mk_list *head;
413
0
    struct mk_list *f_head;
414
0
    struct flb_fstore_file *fsf;
415
0
    struct flb_fstore_stream *fs_stream;
416
0
    flb_sds_t payload = NULL;
417
0
    void *final_payload = NULL;
418
0
    size_t final_payload_size = 0;
419
0
    char *buffer = NULL;
420
0
    size_t buffer_size;
421
0
    int ret;
422
0
    int is_compressed = FLB_FALSE;
423
0
    flb_sds_t tag_sds;
424
425
0
    mk_list_foreach_safe(head, tmp, &ctx->fs->streams) {
426
0
        fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head);
427
0
        if (fs_stream == ctx->stream_upload) {
428
0
            continue;
429
0
        }
430
431
0
        mk_list_foreach_safe(f_head, tmp, &fs_stream->files) {
432
0
            fsf = mk_list_entry(f_head, struct flb_fstore_file, _head);
433
0
            chunk = fsf->data;
434
435
            /* Locked chunks are being processed, skip */
436
0
            if (chunk->locked == FLB_TRUE) {
437
0
                continue;
438
0
            }
439
440
0
            if (chunk->failures >= ctx->scheduler_max_retries) {
441
0
                flb_plg_warn(ctx->ins,
442
0
                             "ingest_all_old_buffer_files :: Chunk for tag %s failed to send %i times, "
443
0
                             "will not retry",
444
0
                             (char *) fsf->meta_buf, ctx->scheduler_max_retries);
445
0
                if (ctx->delete_on_max_upload_error){
446
0
                    azure_kusto_store_file_delete(ctx, chunk);
447
0
                }
448
0
                else{
449
0
                    azure_kusto_store_file_inactive(ctx, chunk);
450
0
                }
451
0
                continue;
452
0
            }
453
454
0
            ret = construct_request_buffer(ctx, NULL, chunk,
455
0
                                           &buffer, &buffer_size);
456
0
            if (ret < 0) {
457
0
                flb_plg_error(ctx->ins,
458
0
                              "ingest_all_old_buffer_files :: Could not construct request buffer for %s",
459
0
                              chunk->file_path);
460
0
                return -1;
461
0
            }
462
463
0
            payload = flb_sds_create_len(buffer, buffer_size);
464
0
            tag_sds = flb_sds_create(fsf->meta_buf);
465
0
            flb_free(buffer);
466
467
            /* Compress the JSON payload */
468
0
            if (ctx->compression_enabled == FLB_TRUE) {
469
0
                ret = flb_gzip_compress((void *) payload, flb_sds_len(payload),
470
0
                                        &final_payload, &final_payload_size);
471
0
                if (ret != 0) {
472
0
                    flb_plg_error(ctx->ins,
473
0
                                  "ingest_all_old_buffer_files :: cannot gzip payload");
474
0
                    flb_sds_destroy(payload);
475
0
                    flb_sds_destroy(tag_sds);
476
0
                    return -1;
477
0
                }
478
0
                else {
479
0
                    is_compressed = FLB_TRUE;
480
0
                    flb_plg_debug(ctx->ins, "ingest_all_old_buffer_files :: enabled payload gzip compression");
481
0
                }
482
0
            }
483
0
            else {
484
0
                final_payload = payload;
485
0
                final_payload_size = flb_sds_len(payload);
486
0
            }
487
488
0
            ret = azure_kusto_load_ingestion_resources(ctx, config);
489
0
            if (ret != 0) {
490
0
                flb_plg_error(ctx->ins, "ingest_all_old_buffer_files :: cannot load ingestion resources");
491
0
                return -1;
492
0
            }
493
494
            /* Call azure_kusto_queued_ingestion to ingest the payload */
495
0
            ret = azure_kusto_queued_ingestion(ctx, tag_sds, flb_sds_len(tag_sds), final_payload, final_payload_size, chunk);
496
0
            if (ret != 0) {
497
0
                flb_plg_error(ctx->ins, "ingest_all_old_buffer_files :: Failed to ingest data to Azure Kusto");
498
0
                if (chunk){
499
0
                    azure_kusto_store_file_unlock(chunk);
500
0
                    chunk->failures += 1;
501
0
                }
502
0
                flb_sds_destroy(tag_sds);
503
0
                flb_sds_destroy(payload);
504
0
                if (is_compressed) {
505
0
                    flb_free(final_payload);
506
0
                }
507
0
                return -1;
508
0
            }
509
510
0
            flb_sds_destroy(tag_sds);
511
0
            flb_sds_destroy(payload);
512
0
            if (is_compressed) {
513
0
                flb_free(final_payload);
514
0
            }
515
516
            /* data was sent successfully- delete the local buffer */
517
0
            azure_kusto_store_file_cleanup(ctx, chunk);
518
0
        }
519
0
    }
520
521
0
    return 0;
522
0
}
523
524
/**
525
 * cb_azure_kusto_ingest - Callback function for ingesting data to Azure Kusto.
526
 *
527
 * Parameters:
528
 * @config: Pointer to the Fluent Bit configuration context.
529
 * @data: Pointer to the Kusto plugin context, which contains configuration and
530
 *        state information for the ingestion process.
531
 *
532
 * The function performs the following steps:
533
 * 1. Initializes a random seed for staggered refresh intervals.
534
 * 2. Logs the start of the upload timer callback.
535
 * 3. Iterates over all files in the active stream.
536
 * 4. Checks if each file has timed out and skips those that haven't.
537
 * 5. Skips files that are currently locked.
538
 * 6. For each eligible file, enters a retry loop to handle ingestion attempts:
539
 *    a. Constructs the request buffer for the file.
540
 *    b. Compresses the payload if compression is enabled.
541
 *    c. Loads necessary ingestion resources.
542
 *    d. Performs the queued ingestion to Azure Kusto.
543
 *    e. Deletes the file upon successful ingestion.
544
 * 7. Implements exponential backoff with jitter for retries.
545
 * 8. Logs errors and warnings for failed operations and retries.
546
 * 9. If the maximum number of retries is reached, logs an error and either
547
 *    deletes or marks the file as inactive based on configuration.
548
 * 10. Logs the end of the upload timer callback.
549
 */
550
static void cb_azure_kusto_ingest(struct flb_config *config, void *data)
551
0
{
552
0
    struct flb_azure_kusto *ctx = data;
553
0
    struct azure_kusto_file *file = NULL;
554
0
    struct flb_fstore_file *fsf;
555
0
    char *buffer = NULL;
556
0
    size_t buffer_size = 0;
557
0
    void *final_payload = NULL;
558
0
    size_t final_payload_size = 0;
559
0
    struct mk_list *tmp;
560
0
    struct mk_list *head;
561
0
    int ret;
562
0
    time_t now;
563
0
    flb_sds_t payload;
564
0
    flb_sds_t tag_sds;
565
0
    int is_compressed = FLB_FALSE;
566
0
    int retry_count;
567
0
    int backoff_time;
568
0
    int max_backoff_time = 64; /* Maximum backoff time in seconds */
569
570
    /* Initialize random seed for staggered refresh intervals */
571
0
    srand(time(NULL));
572
573
    /* Log the start of the upload timer callback */
574
0
    flb_plg_debug(ctx->ins, "Running upload timer callback (scheduler_kusto_ingest)..");
575
0
    now = time(NULL);
576
577
    /* Iterate over all files in the active stream */
578
0
    mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) {
579
0
        fsf = mk_list_entry(head, struct flb_fstore_file, _head);
580
0
        file = fsf->data;
581
0
        flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: Iterating files inside upload timer callback (cb_azure_kusto_ingest).. %s", file->fsf->name);
582
583
        /* Check if the file has timed out */
584
0
        if (now < (file->create_time + ctx->upload_timeout + ctx->retry_time)) {
585
0
            continue; /* Skip files that haven't timed out */
586
0
        }
587
588
0
        flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: Before file locked check %s", file->fsf->name);
589
590
        /* Skip locked files */
591
0
        if (file->locked == FLB_TRUE) {
592
0
            continue;
593
0
        }
594
595
0
        retry_count = 0;
596
0
        backoff_time = 2; /* Initial backoff time in seconds */
597
598
        /* Retry loop for handling retries */
599
0
        while (retry_count < ctx->scheduler_max_retries) {
600
0
            flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: Before construct_request_buffer %s", file->fsf->name);
601
602
            /* Construct the request buffer */
603
0
            ret = construct_request_buffer(ctx, NULL, file, &buffer, &buffer_size);
604
0
            if (ret < 0) {
605
0
                flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: Could not construct request buffer for %s", file->fsf->name);
606
0
                retry_count++;
607
                /* Add jitter: random value between 0 and backoff_time */
608
0
                int jitter = rand() % backoff_time;
609
0
                sleep(backoff_time + jitter); /* Exponential backoff with jitter */
610
0
                backoff_time = (backoff_time * 2 < max_backoff_time) ? backoff_time * 2 : max_backoff_time; /* Double the backoff time, but cap it */
611
0
                continue; /* Retry on failure */
612
0
            }
613
614
0
            payload = flb_sds_create_len(buffer, buffer_size);
615
0
            tag_sds = flb_sds_create(fsf->meta_buf);
616
617
            /* Compress the JSON payload if compression is enabled */
618
0
            if (ctx->compression_enabled == FLB_TRUE) {
619
0
                ret = flb_gzip_compress((void *) payload, flb_sds_len(payload), &final_payload, &final_payload_size);
620
0
                if (ret != 0) {
621
0
                    flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: cannot gzip payload");
622
0
                    flb_free(buffer);
623
0
                    flb_sds_destroy(payload);
624
0
                    flb_sds_destroy(tag_sds);
625
0
                    retry_count++;
626
0
                    if (file){
627
0
                        azure_kusto_store_file_unlock(file);
628
0
                        file->failures += 1;
629
0
                    }
630
                    /* Add jitter: random value between 0 and backoff_time */
631
0
                    int jitter = rand() % backoff_time;
632
0
                    flb_plg_warn(ctx->ins, "scheduler_kusto_ingest :: failed while compressing payload :: Retrying in %d seconds (attempt %d of %d) with jitter %d for file %s",
633
0
                                 backoff_time + jitter, retry_count, ctx->scheduler_max_retries, jitter, file->fsf->name);
634
0
                    sleep(backoff_time + jitter); /* Exponential backoff with jitter */
635
0
                    backoff_time = (backoff_time * 2 < max_backoff_time) ? backoff_time * 2 : max_backoff_time; /* Double the backoff time, but cap it */
636
0
                    continue; /* Retry on failure */
637
0
                }
638
0
                else {
639
0
                    is_compressed = FLB_TRUE;
640
0
                    flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: enabled payload gzip compression");
641
0
                }
642
0
            }
643
0
            else {
644
0
                final_payload = payload;
645
0
                final_payload_size = flb_sds_len(payload);
646
0
            }
647
648
0
            flb_plg_debug(ctx->ins, "scheduler_kusto_ingest ::: tag of the file %s", tag_sds);
649
650
            /* Load ingestion resources */
651
0
            ret = azure_kusto_load_ingestion_resources(ctx, config);
652
0
            if (ret != 0) {
653
0
                flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: cannot load ingestion resources");
654
655
                /* Free allocated resources */
656
0
                flb_free(buffer);
657
0
                flb_sds_destroy(payload);
658
0
                flb_sds_destroy(tag_sds);
659
0
                if (is_compressed) {
660
0
                    flb_free(final_payload);
661
0
                }
662
663
0
                retry_count++;
664
0
                if (file){
665
0
                    azure_kusto_store_file_unlock(file);
666
0
                    file->failures += 1;
667
0
                }
668
                /* Add jitter: random value between 0 and backoff_time */
669
0
                int jitter = rand() % backoff_time;
670
0
                flb_plg_warn(ctx->ins, "scheduler_kusto_ingest :: error loading ingestion resources :: Retrying in %d seconds (attempt %d of %d) with jitter %d for file %s",
671
0
                             backoff_time + jitter, retry_count, ctx->scheduler_max_retries, jitter, file->fsf->name);
672
0
                sleep(backoff_time + jitter); /* Exponential backoff with jitter */
673
0
                backoff_time = (backoff_time * 2 < max_backoff_time) ? backoff_time * 2 : max_backoff_time; /* Double the backoff time, but cap it */
674
0
                continue; /* Retry on failure */
675
0
            }
676
677
0
            flb_plg_debug(ctx->ins, "scheduler_kusto_ingest ::: before starting kusto queued ingestion %s", file->fsf->name);
678
679
            /* Perform the queued ingestion */
680
0
            ret = azure_kusto_queued_ingestion(ctx, tag_sds, flb_sds_len(tag_sds), final_payload, final_payload_size, NULL);
681
0
            if (ret != 0) {
682
0
                flb_plg_error(ctx->ins, "scheduler_kusto_ingest: Failed to ingest data to kusto");
683
684
                /* Free allocated resources */
685
0
                flb_free(buffer);
686
0
                flb_sds_destroy(payload);
687
0
                flb_sds_destroy(tag_sds);
688
0
                if (is_compressed) {
689
0
                    flb_free(final_payload);
690
0
                }
691
692
0
                retry_count++;
693
0
                if (file){
694
0
                    azure_kusto_store_file_unlock(file);
695
0
                    file->failures += 1;
696
0
                }
697
                /* Add jitter: random value between 0 and backoff_time */
698
0
                int jitter = rand() % backoff_time;
699
0
                flb_plg_warn(ctx->ins, "scheduler_kusto_ingest :: error while ingesting to kusto :: Retrying in %d seconds (attempt %d of %d) with jitter %d for file %s",
700
0
                             backoff_time + jitter, retry_count, ctx->scheduler_max_retries, jitter, file->fsf->name);
701
0
                sleep(backoff_time + jitter); /* Exponential backoff with jitter */
702
0
                backoff_time = (backoff_time * 2 < max_backoff_time) ? backoff_time * 2 : max_backoff_time; /* Double the backoff time, but cap it */
703
0
                continue; /* Retry on failure */
704
0
            }
705
706
            /* Delete the file after successful ingestion */
707
0
            ret = azure_kusto_store_file_delete(ctx, file);
708
0
            if (ret == 0) {
709
0
                flb_plg_debug(ctx->ins, "scheduler_kusto_ingest :: deleted successfully ingested file");
710
0
            }
711
0
            else {
712
0
                flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: failed to delete ingested file %s", fsf->name);
713
0
                if (file){
714
0
                    azure_kusto_store_file_unlock(file);
715
0
                    file->failures += 1;
716
0
                }
717
0
            }
718
719
            /* Free allocated resources */
720
0
            flb_free(buffer);
721
0
            flb_sds_destroy(payload);
722
0
            flb_sds_destroy(tag_sds);
723
0
            if (is_compressed) {
724
0
                flb_free(final_payload);
725
0
            }
726
727
            /* If all operations succeed, break out of the retry loop */
728
0
            break;
729
0
        }
730
731
        /* If the maximum number of retries is reached, log an error and move to the next file */
732
0
        if (retry_count >= ctx->scheduler_max_retries) {
733
0
            flb_plg_error(ctx->ins, "scheduler_kusto_ingest :: Max retries reached for file %s", file->fsf->name);
734
0
            if (ctx->delete_on_max_upload_error){
735
0
                azure_kusto_store_file_delete(ctx, file);
736
0
            }
737
0
            else {
738
0
                azure_kusto_store_file_inactive(ctx, file);
739
0
            }
740
0
        }
741
0
    }
742
    /* Log the end of the upload timer callback */
743
0
    flb_plg_debug(ctx->ins, "Exited upload timer callback (cb_azure_kusto_ingest)..");
744
0
}
745
746
747
/**
748
 * Ingest data to Azure Kusto
749
 *
750
 * This function is responsible for preparing and sending data to Azure Kusto for ingestion.
751
 * It constructs a request buffer from the provided data, optionally compresses the payload,
752
 * and then sends it to Azure Kusto using a queued ingestion method.
753
 *
754
 * Parameters:
755
 * - out_context: A pointer to the output context, which is expected to be of type `struct flb_azure_kusto`.
756
 * - new_data: The new data to be ingested, represented as a flexible string descriptor (flb_sds_t).
757
 * - upload_file: A pointer to an `azure_kusto_file` structure that contains information about the file to be uploaded.
758
 * - tag: A constant character pointer representing the tag associated with the data.
759
 * - tag_len: An integer representing the length of the tag.
760
 *
761
 * Returns:
762
 * - 0 on successful ingestion.
763
 * - -1 if an error occurs during buffer construction, compression, or ingestion.
764
 *
765
 * The function performs the following steps:
766
 * 1. Constructs a request buffer from the provided data and upload file information.
767
 * 2. Creates a payload from the buffer and frees the buffer memory.
768
 * 3. Optionally compresses the payload using gzip if compression is enabled in the context.
769
 * 4. Calls the `azure_kusto_queued_ingestion` function to send the payload to Azure Kusto.
770
 * 5. Cleans up allocated resources, including destroying the payload and tag strings, and freeing the compressed payload if applicable.
771
 */
772
static int ingest_to_kusto(void *out_context, flb_sds_t new_data,
773
                               struct azure_kusto_file *upload_file,
774
                               const char *tag, int tag_len)
775
0
{
776
0
    int ret;
777
0
    char *buffer = NULL;
778
0
    size_t buffer_size;
779
0
    struct flb_azure_kusto *ctx = out_context;
780
0
    flb_sds_t payload = NULL;
781
0
    void *final_payload = NULL;
782
0
    size_t final_payload_size = 0;
783
0
    int is_compressed = FLB_FALSE;
784
0
    flb_sds_t tag_sds = flb_sds_create_len(tag, tag_len);
785
786
    /* Create buffer */
787
0
    ret = construct_request_buffer(ctx, new_data, upload_file, &buffer, &buffer_size);
788
0
    if (ret < 0) {
789
0
        flb_plg_error(ctx->ins, "Could not construct request buffer for %s",
790
0
                      upload_file->fsf->name);
791
0
        return -1;
792
0
    }
793
0
    payload = flb_sds_create_len(buffer, buffer_size);
794
0
    if (!payload) {
795
0
        flb_plg_error(ctx->ins, "Could not create payload SDS");
796
0
        flb_free(buffer);
797
0
        return -1;
798
0
    }
799
0
    flb_free(buffer);
800
801
    /* Compress the JSON payload */
802
0
    if (ctx->compression_enabled == FLB_TRUE) {
803
0
        ret = flb_gzip_compress((void *) payload, flb_sds_len(payload),
804
0
                                &final_payload, &final_payload_size);
805
0
        if (ret != 0) {
806
0
            flb_plg_error(ctx->ins,
807
0
                          "cannot gzip payload");
808
0
            flb_sds_destroy(payload);
809
0
            flb_sds_destroy(tag_sds);
810
0
            return -1;
811
0
        }
812
0
        else {
813
0
            is_compressed = FLB_TRUE;
814
0
            flb_plg_debug(ctx->ins, "enabled payload gzip compression");
815
            /* JSON buffer will be cleared at cleanup: */
816
0
        }
817
0
    }
818
0
    else {
819
0
        final_payload = payload;
820
0
        final_payload_size = flb_sds_len(payload);
821
0
    }
822
823
    /* Call azure_kusto_queued_ingestion to ingest the payload */
824
0
    ret = azure_kusto_queued_ingestion(ctx, tag_sds, tag_len, final_payload, final_payload_size, upload_file);
825
0
    if (ret != 0) {
826
0
        flb_plg_error(ctx->ins, "Failed to ingest data to Azure Kusto");
827
0
        flb_sds_destroy(tag_sds);
828
0
        flb_sds_destroy(payload);
829
0
        if (is_compressed) {
830
0
            flb_free(final_payload);
831
0
        }
832
0
        return -1;
833
0
    }
834
835
0
    flb_sds_destroy(tag_sds);
836
0
    flb_sds_destroy(payload);
837
0
    if (is_compressed) {
838
0
        flb_free(final_payload);
839
0
    }
840
841
0
    return 0;
842
0
}
843
844
/**
845
 * Initializes the Azure Kusto output plugin.
846
 *
847
 * This function sets up the necessary configurations and resources for the Azure Kusto
848
 * output plugin to function correctly. It performs the following tasks:
849
 *
850
 * 1. Creates a configuration context for the plugin using the provided instance and config.
851
 * 2. Initializes local storage if buffering is enabled, ensuring that the storage directory
852
 *    is set up and any existing buffered data is accounted for.
853
 * 3. Validates the configured file size for uploads, ensuring it meets the minimum and
854
 *    maximum constraints.
855
 * 4. Sets up network configurations, including enabling IPv6 if specified.
856
 * 5. Initializes mutexes for thread-safe operations related to OAuth tokens and resource
857
 *    management.
858
 * 6. Creates an upstream context for connecting to the Kusto Ingestion endpoint, configuring
859
 *    it for synchronous or asynchronous operation based on buffering settings.
860
 * 7. If IMDS (Instance Metadata Service) is used, creates an upstream context for it.
861
 * 8. Establishes an OAuth2 context for handling authentication with Azure services.
862
 * 9. Associates the upstream context with the output instance for data transmission.
863
 *
864
 * The function returns 0 on successful initialization or -1 if any step fails.
865
 *
866
 * @param ins    The output instance to initialize.
867
 * @param config The configuration context for Fluent Bit.
868
 * @param data   Additional data passed to the initialization function.
869
 *
870
 * @return 0 on success, -1 on failure.
871
 */
872
static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_config *config,
873
                               void *data)
874
0
{
875
0
    int io_flags = FLB_IO_TLS;
876
0
    struct flb_azure_kusto *ctx;
877
878
0
    flb_plg_debug(ins, "inside azure kusto init");
879
880
    /* Create config context */
881
0
    ctx = flb_azure_kusto_conf_create(ins, config);
882
0
    if (!ctx) {
883
0
        flb_plg_error(ins, "configuration failed");
884
0
        return -1;
885
0
    }
886
887
0
    if (ctx->buffering_enabled == FLB_TRUE) {
888
0
        ctx->ins = ins;
889
0
        ctx->retry_time = 0;
890
891
        /* Initialize local storage */
892
0
        int ret = azure_kusto_store_init(ctx);
893
0
        if (ret == -1) {
894
0
            flb_plg_error(ctx->ins, "Failed to initialize kusto storage: %s",
895
0
                          ctx->store_dir);
896
0
            return -1;
897
0
        }
898
0
        ctx->has_old_buffers = azure_kusto_store_has_data(ctx);
899
900
        /* validate 'total_file_size' */
901
0
        if (ctx->file_size <= 0) {
902
0
            flb_plg_error(ctx->ins, "Failed to parse upload_file_size");
903
0
            return -1;
904
0
        }
905
0
        if (ctx->file_size < 1000000) {
906
0
            flb_plg_error(ctx->ins, "upload_file_size must be at least 1MB");
907
0
            return -1;
908
0
        }
909
0
        if (ctx->file_size > MAX_FILE_SIZE) {
910
0
            flb_plg_error(ctx->ins, "Max total_file_size must be lower than %ld bytes", MAX_FILE_SIZE);
911
0
            return -1;
912
0
        }
913
914
0
        ctx->timer_created = FLB_FALSE;
915
0
        ctx->timer_ms = (int) (ctx->upload_timeout / 6) * 1000;
916
0
        flb_plg_info(ctx->ins, "Using upload size %lu bytes", ctx->file_size);
917
0
    }
918
919
0
    flb_output_set_context(ins, ctx);
920
921
    /* Network mode IPv6 */
922
0
    if (ins->host.ipv6 == FLB_TRUE) {
923
0
        io_flags |= FLB_IO_IPV6;
924
0
    }
925
926
    /* Create mutex for acquiring oauth tokens  and getting ingestion resources (they
927
     * are shared across flush coroutines)
928
     */
929
0
    pthread_mutex_init(&ctx->token_mutex, NULL);
930
0
    pthread_mutex_init(&ctx->resources_mutex, NULL);
931
0
    pthread_mutex_init(&ctx->blob_mutex, NULL);
932
933
    /*
934
     * Create upstream context for Kusto Ingestion endpoint
935
     */
936
0
    ctx->u = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
937
0
    if (ctx->buffering_enabled ==  FLB_TRUE){
938
0
        flb_stream_disable_flags(&ctx->u->base, FLB_IO_ASYNC);
939
0
        ctx->u->base.net.io_timeout = ctx->io_timeout;
940
0
        ctx->has_old_buffers = azure_kusto_store_has_data(ctx);
941
0
    }
942
0
    if (!ctx->u) {
943
0
        flb_plg_error(ctx->ins, "upstream creation failed");
944
0
        return -1;
945
0
    }
946
947
0
    flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base));
948
949
    /* Create oauth2 context */
950
0
    ctx->o =
951
0
        flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH);
952
0
    if (!ctx->o) {
953
0
        flb_plg_error(ctx->ins, "cannot create oauth2 context");
954
0
        return -1;
955
0
    }
956
0
    flb_output_upstream_set(ctx->u, ins);
957
958
0
    flb_plg_debug(ctx->ins, "azure kusto init completed");
959
960
0
    return 0;
961
0
}
962
963
964
/**
965
     * This function formats log data for Azure Kusto ingestion.
966
     * It processes a batch of log records, encodes them in a specific format,
967
     * and outputs the formatted data.
968
     *
969
     * Parameters:
970
     * - ctx: Context containing configuration and state for Azure Kusto.
971
     * - tag: A string tag associated with the log data.
972
     * - tag_len: Length of the tag string.
973
     * - data: Pointer to the raw log data in msgpack format.
974
     * - bytes: Size of the raw log data.
975
     * - out_data: Pointer to store the formatted output data.
976
     * - out_size: Pointer to store the size of the formatted output data.
977
     *
978
     * Returns:
979
     * - 0 on success, or -1 on error.
980
     */
981
static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int tag_len,
982
                              const void *data, size_t bytes, void **out_data,
983
                              size_t *out_size,
984
                              struct flb_config *config)
985
0
{
986
0
    int index;
987
0
    int records = 0;
988
0
    msgpack_sbuffer mp_sbuf;
989
0
    msgpack_packer mp_pck;
990
0
    struct tm tms;
991
0
    char time_formatted[32];
992
0
    size_t s;
993
0
    int len;
994
0
    struct flb_log_event_decoder log_decoder;
995
0
    struct flb_log_event log_event;
996
0
    int ret;
997
0
    flb_sds_t out_buf;
998
999
    /* Create array for all records */
1000
0
    records = flb_mp_count(data, bytes);
1001
0
    if (records <= 0) {
1002
0
        flb_plg_error(ctx->ins, "error counting msgpack entries");
1003
0
        return -1;
1004
0
    }
1005
1006
0
    ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
1007
0
    if (ret != FLB_EVENT_DECODER_SUCCESS) {
1008
0
        flb_plg_error(ctx->ins, "Log event decoder initialization error : %d", ret);
1009
0
        return -1;
1010
0
    }
1011
1012
    /* Initialize the output buffer */
1013
0
    out_buf = flb_sds_create_size(1024);
1014
0
    if (!out_buf) {
1015
0
        flb_plg_error(ctx->ins, "error creating output buffer");
1016
0
        flb_log_event_decoder_destroy(&log_decoder);
1017
0
        return -1;
1018
0
    }
1019
1020
    /* Create temporary msgpack buffer */
1021
0
    msgpack_sbuffer_init(&mp_sbuf);
1022
0
    msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
1023
1024
0
    while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) {
1025
0
        msgpack_sbuffer_clear(&mp_sbuf);
1026
1027
0
        int map_size = 1;
1028
0
        if (ctx->include_time_key == FLB_TRUE) {
1029
0
            map_size++;
1030
0
        }
1031
0
        if (ctx->include_tag_key == FLB_TRUE) {
1032
0
            map_size++;
1033
0
        }
1034
1035
0
        msgpack_pack_map(&mp_pck, map_size);
1036
1037
        /* include_time_key */
1038
0
        if (ctx->include_time_key == FLB_TRUE) {
1039
0
            msgpack_pack_str(&mp_pck, flb_sds_len(ctx->time_key));
1040
0
            msgpack_pack_str_body(&mp_pck, ctx->time_key, flb_sds_len(ctx->time_key));
1041
1042
0
            gmtime_r(&log_event.timestamp.tm.tv_sec, &tms);
1043
0
            s = strftime(time_formatted, sizeof(time_formatted) - 1, FLB_PACK_JSON_DATE_ISO8601_FMT, &tms);
1044
0
            len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, ".%03" PRIu64 "Z",
1045
0
                    (uint64_t) log_event.timestamp.tm.tv_nsec / 1000000);
1046
0
            s += len;
1047
0
            msgpack_pack_str(&mp_pck, s);
1048
0
            msgpack_pack_str_body(&mp_pck, time_formatted, s);
1049
0
        }
1050
1051
        /* include_tag_key */
1052
0
        if (ctx->include_tag_key == FLB_TRUE) {
1053
0
            msgpack_pack_str(&mp_pck, flb_sds_len(ctx->tag_key));
1054
0
            msgpack_pack_str_body(&mp_pck, ctx->tag_key, flb_sds_len(ctx->tag_key));
1055
0
            msgpack_pack_str(&mp_pck, tag_len);
1056
0
            msgpack_pack_str_body(&mp_pck, tag, tag_len);
1057
0
        }
1058
1059
0
        msgpack_pack_str(&mp_pck, flb_sds_len(ctx->log_key));
1060
0
        msgpack_pack_str_body(&mp_pck, ctx->log_key, flb_sds_len(ctx->log_key));
1061
1062
0
        if (log_event.group_attributes != NULL && log_event.body != NULL) {
1063
0
            msgpack_pack_map(&mp_pck,
1064
0
                                 log_event.group_attributes->via.map.size +
1065
0
                                 log_event.metadata->via.map.size +
1066
0
                                 log_event.body->via.map.size);
1067
1068
0
            for (index = 0; index < log_event.group_attributes->via.map.size; index++) { 
1069
0
                msgpack_pack_object(&mp_pck, log_event.group_attributes->via.map.ptr[index].key);
1070
0
                msgpack_pack_object(&mp_pck, log_event.group_attributes->via.map.ptr[index].val);
1071
0
            }
1072
1073
0
            for (index = 0; index < log_event.metadata->via.map.size; index++) {
1074
0
                msgpack_pack_object(&mp_pck, log_event.metadata->via.map.ptr[index].key);
1075
0
                msgpack_pack_object(&mp_pck, log_event.metadata->via.map.ptr[index].val);
1076
0
            }
1077
1078
0
            for (index = 0; index < log_event.body->via.map.size; index++) {
1079
0
                msgpack_pack_object(&mp_pck, log_event.body->via.map.ptr[index].key);
1080
0
                msgpack_pack_object(&mp_pck, log_event.body->via.map.ptr[index].val);
1081
0
            }
1082
0
        }
1083
0
        else if (log_event.body != NULL) {
1084
0
            msgpack_pack_object(&mp_pck, *log_event.body);
1085
0
        }
1086
0
        else {
1087
0
            msgpack_pack_str(&mp_pck, 20);
1088
0
            msgpack_pack_str_body(&mp_pck, "log_attribute_missing", 20);
1089
0
        }
1090
1091
0
        flb_sds_t json_record = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size,
1092
0
                                                            config->json_escape_unicode);
1093
0
        if (!json_record) {
1094
0
            flb_plg_error(ctx->ins, "error converting msgpack to JSON");
1095
0
            flb_sds_destroy(out_buf);
1096
0
            msgpack_sbuffer_destroy(&mp_sbuf);
1097
0
            flb_log_event_decoder_destroy(&log_decoder);
1098
0
            return -1;
1099
0
        }
1100
1101
        /* Concatenate the JSON record to the output buffer */
1102
0
        out_buf = flb_sds_cat(out_buf, json_record, flb_sds_len(json_record));
1103
0
        out_buf = flb_sds_cat(out_buf, "\n", 1);
1104
1105
0
        flb_sds_destroy(json_record);
1106
0
    }
1107
1108
0
    msgpack_sbuffer_destroy(&mp_sbuf);
1109
0
    flb_log_event_decoder_destroy(&log_decoder);
1110
1111
0
    *out_data = out_buf;
1112
0
    *out_size = flb_sds_len(out_buf);
1113
1114
0
    return 0;
1115
0
}
1116
1117
static int buffer_chunk(void *out_context, struct azure_kusto_file *upload_file,
1118
                        flb_sds_t chunk, int chunk_size,
1119
                        flb_sds_t tag, size_t tag_len)
1120
0
{
1121
0
    int ret;
1122
0
    struct flb_azure_kusto *ctx = out_context;
1123
1124
0
    flb_plg_trace(ctx->ins, "Buffering chunk %d", chunk_size);
1125
1126
0
    ret = azure_kusto_store_buffer_put(ctx, upload_file, tag,
1127
0
                                       tag_len, chunk, chunk_size);
1128
0
    if (ret < 0) {
1129
0
        flb_plg_error(ctx->ins, "Could not buffer chunk. ");
1130
0
        return -1;
1131
0
    }
1132
0
    return 0;
1133
0
}
1134
1135
/**
1136
 * @brief Initialize the flush process for Azure Kusto output plugin.
1137
 *
1138
 * This function is responsible for setting up the initial conditions required
1139
 * for flushing data to Azure Kusto. It performs the following tasks:
1140
 *
1141
 * 1. **Old Buffer Cleanup**: Checks if there are any old buffers from previous
1142
 *    executions that need to be sent to Azure Kusto. If such buffers exist, it
1143
 *    attempts to ingest all chunks of data. If the ingestion fails, it logs an
1144
 *    error and marks the buffers to be retried later.
1145
 *
1146
 * 2. **Upload Timer Setup**: If not already created, it sets up a periodic timer
1147
 *    that checks for uploads ready for completion. This timer is crucial for
1148
 *    ensuring that data is uploaded at regular intervals.
1149
 *
1150
 * @param out_context Pointer to the output context, specifically the Azure Kusto context.
1151
 * @param config Pointer to the Fluent Bit configuration structure.
1152
 */
1153
static void flush_init(void *out_context, struct flb_config *config)
1154
0
{
1155
0
    int ret;
1156
0
    struct flb_azure_kusto *ctx = out_context;
1157
0
    struct flb_sched *sched;
1158
1159
0
    flb_plg_debug(ctx->ins,
1160
0
                  "inside flush_init with old_buffers as %d",
1161
0
                  ctx->has_old_buffers);
1162
1163
    /* clean up any old buffers found on startup */
1164
0
    if (ctx->has_old_buffers == FLB_TRUE) {
1165
0
        flb_plg_info(ctx->ins,
1166
0
                     "Sending locally buffered data from previous "
1167
0
                     "executions to kusto; buffer=%s",
1168
0
                     ctx->fs->root_path);
1169
0
        ctx->has_old_buffers = FLB_FALSE;
1170
0
        ret = ingest_all_chunks(ctx, config);
1171
0
        if (ret < 0) {
1172
0
            ctx->has_old_buffers = FLB_TRUE;
1173
0
            flb_plg_error(ctx->ins,
1174
0
                          "Failed to send locally buffered data left over "
1175
0
                          "from previous executions; will retry. Buffer=%s",
1176
0
                          ctx->fs->root_path);
1177
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
1178
0
        }
1179
0
    }
1180
0
    else {
1181
0
        flb_plg_debug(ctx->ins,
1182
0
                     "Did not find any local buffered data from previous "
1183
0
                     "executions to kusto; buffer=%s",
1184
0
                     ctx->fs->root_path);
1185
0
    }
1186
1187
    /*
1188
    * create a timer that will run periodically and check if uploads
1189
    * are ready for completion
1190
    * this is created once on the first flush
1191
    */
1192
0
    if (ctx->timer_created == FLB_FALSE) {
1193
0
        flb_plg_debug(ctx->ins,
1194
0
                      "Creating upload timer with frequency %ds",
1195
0
                      ctx->timer_ms / 1000);
1196
1197
0
        sched = flb_sched_ctx_get();
1198
1199
0
        ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
1200
0
                                        ctx->timer_ms, cb_azure_kusto_ingest, ctx, NULL);
1201
0
        if (ret == -1) {
1202
0
            flb_plg_error(ctx->ins, "Failed to create upload timer");
1203
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
1204
0
        }
1205
0
        ctx->timer_created = FLB_TRUE;
1206
0
    }
1207
0
}
1208
1209
/**
1210
 * This function handles the flushing of event data to Azure Kusto.
1211
 * It manages both buffered and non-buffered modes, handles JSON formatting,
1212
 * compression, and manages file uploads based on conditions like timeout and file size.
1213
 *
1214
 * @param event_chunk The event chunk containing the data to be flushed.
1215
 * @param out_flush The output flush context.
1216
 * @param i_ins The input instance (unused).
1217
 * @param out_context The output context, specifically for Azure Kusto.
1218
 * @param config The configuration context (unused).
1219
 */
1220
static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
1221
                                 struct flb_output_flush *out_flush,
1222
                                 struct flb_input_instance *i_ins, void *out_context,
1223
                                 struct flb_config *config)
1224
0
{
1225
0
    int ret;
1226
0
    flb_sds_t json = NULL;
1227
0
    size_t json_size;
1228
0
    size_t tag_len;
1229
0
    struct flb_azure_kusto *ctx = out_context;
1230
0
    int is_compressed = FLB_FALSE;
1231
0
    struct azure_kusto_file *upload_file = NULL;
1232
0
    int upload_timeout_check = FLB_FALSE;
1233
0
    int total_file_size_check = FLB_FALSE;
1234
0
    flb_sds_t tag_name = NULL;
1235
0
    size_t tag_name_len;
1236
1237
0
    (void)i_ins;
1238
0
    (void)config;
1239
1240
0
    void *final_payload = NULL;
1241
0
    size_t final_payload_size = 0;
1242
1243
0
    flb_plg_debug(ctx->ins, "flushing bytes for event tag %s and size %zu", event_chunk->tag ,event_chunk->size);
1244
1245
    /* Get the length of the event tag */
1246
0
    tag_len = flb_sds_len(event_chunk->tag);
1247
1248
0
    if (ctx->buffering_enabled == FLB_TRUE) {
1249
    /* Determine the tag name based on the unify_tag setting */
1250
0
        if (ctx->unify_tag == FLB_TRUE){
1251
0
            tag_name = flb_sds_create("fluentbit-buffer-file-unify-tag.log");
1252
0
        }
1253
0
        else {
1254
0
            tag_name = event_chunk->tag;
1255
0
        }
1256
0
        tag_name_len = flb_sds_len(tag_name);
1257
        /* Initialize the flush process */
1258
0
        flush_init(ctx,config);
1259
1260
        /* Reformat msgpack to JSON payload */
1261
0
        ret = azure_kusto_format(ctx, tag_name, tag_name_len, event_chunk->data,
1262
0
                                 event_chunk->size, (void **)&json, &json_size,
1263
0
                                 config);
1264
0
        if (ret != 0) {
1265
0
            flb_plg_error(ctx->ins, "cannot reformat data into json");
1266
0
            ret = FLB_RETRY;
1267
0
            goto error;
1268
0
        }
1269
1270
        /* Get a file candidate matching the given 'tag' */
1271
0
        upload_file = azure_kusto_store_file_get(ctx,
1272
0
                                                 tag_name,
1273
0
                                                 tag_name_len);
1274
1275
        /* Check if the file has failed to upload too many times */
1276
0
        if (upload_file != NULL && upload_file->failures >= ctx->scheduler_max_retries) {
1277
0
            flb_plg_warn(ctx->ins, "File with tag %s failed to send %d times, will not "
1278
0
                                   "retry", event_chunk->tag, ctx->scheduler_max_retries);
1279
0
            if (ctx->delete_on_max_upload_error){
1280
0
                azure_kusto_store_file_delete(ctx, upload_file);
1281
0
            }
1282
0
            else {
1283
0
                azure_kusto_store_file_inactive(ctx, upload_file);
1284
0
            }
1285
0
            upload_file = NULL;
1286
0
        }
1287
1288
        /* Check if the upload timeout has elapsed */
1289
0
        if (upload_file != NULL && time(NULL) >
1290
0
                                   (upload_file->create_time + ctx->upload_timeout)) {
1291
0
            upload_timeout_check = FLB_TRUE;
1292
0
            flb_plg_trace(ctx->ins, "upload_timeout reached for %s",
1293
0
                          event_chunk->tag);
1294
0
        }
1295
1296
        /* Check if the total file size has been exceeded */
1297
0
        if (upload_file && upload_file->size + json_size > ctx->file_size) {
1298
0
            flb_plg_trace(ctx->ins, "total_file_size exceeded %s",
1299
0
                          event_chunk->tag);
1300
0
            total_file_size_check = FLB_TRUE;
1301
0
        }
1302
1303
        /* If the file is ready for upload */
1304
0
        if ((upload_file != NULL) && (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE)) {
1305
0
            flb_plg_debug(ctx->ins, "uploading file %s with size %zu", upload_file->fsf->name, upload_file->size);
1306
            /* Load or refresh ingestion resources */
1307
0
            ret = azure_kusto_load_ingestion_resources(ctx, config);
1308
0
            if (ret != 0) {
1309
0
                flb_plg_error(ctx->ins, "cannot load ingestion resources");
1310
0
                ret = FLB_RETRY;
1311
0
                goto error;
1312
0
            }
1313
1314
            /* Ingest data to kusto */
1315
0
            ret = ingest_to_kusto(ctx, json, upload_file,
1316
0
                                      tag_name,
1317
0
                                      tag_name_len);
1318
1319
0
            if (ret == 0){
1320
0
                if (ctx->buffering_enabled == FLB_TRUE && ctx->buffer_file_delete_early == FLB_TRUE){
1321
0
                    flb_plg_debug(ctx->ins, "buffer file already deleted after blob creation");
1322
0
                    ret = FLB_OK;
1323
0
                    goto cleanup;
1324
0
                }
1325
0
                else{
1326
0
                    ret = azure_kusto_store_file_delete(ctx, upload_file);
1327
0
                    if (ret != 0){
1328
                        /* File couldn't be deleted */
1329
0
                        ret = FLB_RETRY;
1330
0
                        if (upload_file){
1331
0
                            azure_kusto_store_file_unlock(upload_file);
1332
0
                            upload_file->failures += 1;
1333
0
                        }
1334
0
                        goto error;
1335
0
                    }
1336
0
                    else{
1337
                        /* File deleted successfully */
1338
0
                        ret = FLB_OK;
1339
0
                        goto cleanup;
1340
0
                    }
1341
0
                }
1342
0
            }
1343
0
            else{
1344
0
                flb_plg_error(ctx->ins, "azure_kusto:: unable to ingest data into kusto : retrying");
1345
0
                ret = FLB_RETRY;
1346
0
                if (upload_file){
1347
0
                    azure_kusto_store_file_unlock(upload_file);
1348
0
                    upload_file->failures += 1;
1349
0
                }
1350
0
                goto cleanup;
1351
0
            }
1352
0
        }
1353
1354
        /* Buffer the current chunk in the filesystem */
1355
0
        ret = buffer_chunk(ctx, upload_file, json, json_size,
1356
0
                           tag_name, tag_name_len);
1357
1358
0
        if (ret == 0) {
1359
0
            flb_plg_debug(ctx->ins, "buffered chunk %s", event_chunk->tag);
1360
0
            ret = FLB_OK;
1361
0
        }
1362
0
        else {
1363
0
            flb_plg_error(ctx->ins, "failed to buffer chunk %s", event_chunk->tag);
1364
0
            ret = FLB_RETRY;
1365
0
        }
1366
0
        goto cleanup;
1367
1368
0
    }
1369
0
    else {
1370
        /* Buffering mode is disabled, proceed with regular flush */
1371
1372
        /* Reformat msgpack data to JSON payload */
1373
0
        ret = azure_kusto_format(ctx, event_chunk->tag, tag_len, event_chunk->data,
1374
0
                                 event_chunk->size, (void **)&json, &json_size,
1375
0
                                 config);
1376
0
        if (ret != 0) {
1377
0
            flb_plg_error(ctx->ins, "cannot reformat data into json");
1378
0
            ret = FLB_RETRY;
1379
0
            goto error;
1380
0
        }
1381
1382
0
        flb_plg_debug(ctx->ins, "payload size before compression %zu", json_size);
1383
        /* Map buffer */
1384
0
        final_payload = json;
1385
0
        final_payload_size = json_size;
1386
        /* Check if compression is enabled */
1387
0
        if (ctx->compression_enabled == FLB_TRUE) {
1388
0
            ret = flb_gzip_compress((void *) json, json_size,
1389
0
                                    &final_payload, &final_payload_size);
1390
0
            if (ret != 0) {
1391
0
                flb_plg_error(ctx->ins,
1392
0
                              "cannot gzip payload");
1393
0
                ret = FLB_ERROR;
1394
0
                goto error;
1395
0
            }
1396
0
            else {
1397
0
                is_compressed = FLB_TRUE;
1398
0
                flb_plg_debug(ctx->ins, "enabled payload gzip compression");
1399
                /* JSON buffer will be cleared at cleanup: */
1400
0
            }
1401
0
        }
1402
0
        flb_plg_trace(ctx->ins, "payload size after compression %zu", final_payload_size);
1403
1404
        /* Load or refresh ingestion resources */
1405
0
        ret = azure_kusto_load_ingestion_resources(ctx, config);
1406
0
        flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret);
1407
0
        if (ret != 0) {
1408
0
            flb_plg_error(ctx->ins, "cannot load ingestion resources");
1409
0
            ret = FLB_RETRY;
1410
0
            goto error;
1411
0
        }
1412
1413
        /* Perform queued ingestion to Kusto */
1414
0
        ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL);
1415
0
        flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
1416
0
        if (ret != 0) {
1417
0
            flb_plg_error(ctx->ins, "cannot perform queued ingestion");
1418
0
            ret = FLB_RETRY;
1419
0
            goto error;
1420
0
        }
1421
1422
0
        ret = FLB_OK;
1423
0
        goto cleanup;
1424
0
    }
1425
1426
0
    cleanup:
1427
    /* Cleanup resources */
1428
0
    if (json) {
1429
0
        flb_sds_destroy(json);
1430
0
    }
1431
0
    if (is_compressed && final_payload) {
1432
0
        flb_free(final_payload);
1433
0
    }
1434
0
    if (tag_name) {
1435
0
        flb_sds_destroy(tag_name);
1436
0
    }
1437
0
    FLB_OUTPUT_RETURN(ret);
1438
1439
0
    error:
1440
    /* Error handling and cleanup */
1441
0
    if (json) {
1442
0
        flb_sds_destroy(json);
1443
0
    } 
1444
0
    if (is_compressed && final_payload) {
1445
0
        flb_free(final_payload);
1446
0
    }
1447
0
    if (tag_name) {
1448
0
        flb_sds_destroy(tag_name);
1449
0
    }
1450
0
    FLB_OUTPUT_RETURN(ret);
1451
0
}
1452
1453
/**
1454
 * cb_azure_kusto_exit - Clean up and finalize the Azure Kusto plugin context.
1455
 *
1456
 * This function is responsible for performing cleanup operations when the
1457
 * Azure Kusto plugin is exiting. It ensures that all resources are properly
1458
 * released and any remaining data is sent to Azure Kusto before the plugin
1459
 * shuts down.
1460
 *
1461
 * Functionality:
1462
 * - Checks if the plugin context (`ctx`) is valid. If not, it returns an error.
1463
 * - If there is locally buffered data, it attempts to send all chunks to Azure
1464
 *   Kusto using the `ingest_all_chunks` function. Logs an error if the operation
1465
 *   fails.
1466
 * - Destroys any active upstream connections (`ctx->u` and `ctx->imds_upstream`)
1467
 *   to free network resources.
1468
 * - Destroys mutexes (`resources_mutex`, `token_mutex`, `blob_mutex`) to ensure
1469
 *   proper synchronization cleanup.
1470
 * - Calls `azure_kusto_store_exit` to perform any additional storage-related
1471
 *   cleanup operations.
1472
 * - Finally, it calls `flb_azure_kusto_conf_destroy` to free the plugin context
1473
 *   and its associated resources.
1474
 *
1475
 * Parameters:
1476
 * - data: A pointer to the plugin context (`struct flb_azure_kusto`).
1477
 * - config: A pointer to the Fluent Bit configuration (`struct flb_config`).
1478
 *
1479
 * Returns:
1480
 * - 0 on successful cleanup.
1481
 * - -1 if the context is invalid or if an error occurs during cleanup.
1482
 */
1483
static int cb_azure_kusto_exit(void *data, struct flb_config *config)
1484
0
{
1485
0
    struct flb_azure_kusto *ctx = data;
1486
0
    int ret = -1;
1487
1488
0
    if (!ctx) {
1489
0
        return -1;
1490
0
    }
1491
1492
1493
0
    if (ctx->buffering_enabled == FLB_TRUE){
1494
0
        if (azure_kusto_store_has_data(ctx) == FLB_TRUE) {
1495
0
            flb_plg_info(ctx->ins, "Sending all locally buffered data to Kusto");
1496
0
            ret = ingest_all_chunks(ctx, config);
1497
0
            if (ret < 0) {
1498
0
                flb_plg_error(ctx->ins, "Could not send all chunks on exit");
1499
0
            }
1500
0
        }
1501
0
        azure_kusto_store_exit(ctx);
1502
0
    }
1503
1504
0
    if (ctx->u) {
1505
0
        flb_upstream_destroy(ctx->u);
1506
0
        ctx->u = NULL;
1507
0
    }
1508
1509
0
    pthread_mutex_destroy(&ctx->resources_mutex);
1510
0
    pthread_mutex_destroy(&ctx->token_mutex);
1511
0
    pthread_mutex_destroy(&ctx->blob_mutex);
1512
1513
0
    flb_azure_kusto_conf_destroy(ctx);
1514
1515
0
    return 0;
1516
0
}
1517
1518
static struct flb_config_map config_map[] = {
1519
    {FLB_CONFIG_MAP_STR, "tenant_id", (char *)NULL, 0, FLB_TRUE,
1520
     offsetof(struct flb_azure_kusto, tenant_id),
1521
     "Set the tenant ID of the AAD application used for authentication"},
1522
    {FLB_CONFIG_MAP_STR, "client_id", (char *)NULL, 0, FLB_TRUE,
1523
     offsetof(struct flb_azure_kusto, client_id),
1524
     "Set the client ID (Application ID) of the AAD application or the user-assigned managed identity's client ID when using managed identity authentication"},
1525
    {FLB_CONFIG_MAP_STR, "client_secret", (char *)NULL, 0, FLB_TRUE,
1526
     offsetof(struct flb_azure_kusto, client_secret),
1527
     "Set the client secret (Application Password) of the AAD application used for "
1528
     "authentication"},
1529
    {FLB_CONFIG_MAP_STR, "workload_identity_token_file", (char *)NULL, 0, FLB_TRUE,
1530
     offsetof(struct flb_azure_kusto, workload_identity_token_file),
1531
     "Set the token file path for workload identity authentication"},
1532
    {FLB_CONFIG_MAP_STR, "auth_type", "service_principal", 0, FLB_TRUE,
1533
     offsetof(struct flb_azure_kusto, auth_type_str),
1534
     "Set the authentication type: 'service_principal', 'managed_identity', or 'workload_identity'. "
1535
     "For managed_identity, use 'system' as client_id for system-assigned identity, or specify the managed identity's client ID"},
1536
    {FLB_CONFIG_MAP_STR, "ingestion_endpoint", (char *)NULL, 0, FLB_TRUE,
1537
     offsetof(struct flb_azure_kusto, ingestion_endpoint),
1538
     "Set the Kusto cluster's ingestion endpoint URL (e.g. "
1539
     "https://ingest-mycluster.eastus.kusto.windows.net)"},
1540
    {FLB_CONFIG_MAP_STR, "database_name", (char *)NULL, 0, FLB_TRUE,
1541
     offsetof(struct flb_azure_kusto, database_name), "Set the database name"},
1542
    {FLB_CONFIG_MAP_STR, "table_name", (char *)NULL, 0, FLB_TRUE,
1543
     offsetof(struct flb_azure_kusto, table_name), "Set the table name"},
1544
    {FLB_CONFIG_MAP_STR, "ingestion_mapping_reference", (char *)NULL, 0, FLB_TRUE,
1545
     offsetof(struct flb_azure_kusto, ingestion_mapping_reference),
1546
     "Set the ingestion mapping reference"},
1547
    {FLB_CONFIG_MAP_STR, "log_key", FLB_AZURE_KUSTO_DEFAULT_LOG_KEY, 0, FLB_TRUE,
1548
     offsetof(struct flb_azure_kusto, log_key), "The key name of event payload"},
1549
    {FLB_CONFIG_MAP_BOOL, "include_tag_key", "true", 0, FLB_TRUE,
1550
     offsetof(struct flb_azure_kusto, include_tag_key),
1551
     "If enabled, tag is appended to output. "
1552
     "The key name is used 'tag_key' property."},
1553
    {FLB_CONFIG_MAP_STR, "tag_key", FLB_AZURE_KUSTO_DEFAULT_TAG_KEY, 0, FLB_TRUE,
1554
     offsetof(struct flb_azure_kusto, tag_key),
1555
     "The key name of tag. If 'include_tag_key' is false, "
1556
     "This property is ignored"},
1557
    {FLB_CONFIG_MAP_BOOL, "include_time_key", "true", 0, FLB_TRUE,
1558
     offsetof(struct flb_azure_kusto, include_time_key),
1559
     "If enabled, time is appended to output. "
1560
     "The key name is used 'time_key' property."},
1561
    {FLB_CONFIG_MAP_STR, "time_key", FLB_AZURE_KUSTO_DEFAULT_TIME_KEY, 0, FLB_TRUE,
1562
     offsetof(struct flb_azure_kusto, time_key),
1563
     "The key name of the time. If 'include_time_key' is false, "
1564
     "This property is ignored"},
1565
    {FLB_CONFIG_MAP_TIME, "ingestion_endpoint_connect_timeout", FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT, 0, FLB_TRUE,
1566
     offsetof(struct flb_azure_kusto, ingestion_endpoint_connect_timeout),
1567
    "Set the connection timeout of various kusto endpoints (kusto ingest endpoint, kusto ingestion blob endpoint, kusto ingestion queue endpoint) in seconds."
1568
    "The default is 60 seconds."},
1569
    {FLB_CONFIG_MAP_BOOL, "compression_enabled", "true", 0, FLB_TRUE,
1570
     offsetof(struct flb_azure_kusto, compression_enabled),
1571
    "Enable HTTP payload compression (gzip)."
1572
    "The default is true."},
1573
    {FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE,
1574
     offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval),
1575
    "Set the azure kusto ingestion resources refresh interval"
1576
    "The default is 3600 seconds."},
1577
    {FLB_CONFIG_MAP_BOOL, "buffering_enabled", "false", 0, FLB_TRUE,
1578
     offsetof(struct flb_azure_kusto, buffering_enabled), "Enable buffering into disk before ingesting into Azure Kusto."
1579
    },
1580
    {FLB_CONFIG_MAP_STR, "buffer_dir", "/tmp/fluent-bit/azure-kusto/", 0, FLB_TRUE,
1581
     offsetof(struct flb_azure_kusto, buffer_dir), "Specifies the location of directory where the buffered data will be stored."
1582
    },
1583
    {FLB_CONFIG_MAP_TIME, "upload_timeout", "30m",
1584
     0, FLB_TRUE, offsetof(struct flb_azure_kusto, upload_timeout),
1585
    "Optionally specify a timeout for uploads. "
1586
    "Fluent Bit will start ingesting buffer files which have been created more than x minutes and haven't reached upload_file_size limit yet.  "
1587
    " Default is 30m."
1588
    },
1589
    {FLB_CONFIG_MAP_SIZE, "upload_file_size", "200M",
1590
     0, FLB_TRUE, offsetof(struct flb_azure_kusto, file_size),
1591
    "Specifies the size of files to be uploaded in MBs. Default is 200MB"
1592
    },
1593
    {FLB_CONFIG_MAP_STR, "azure_kusto_buffer_key", "key",0, FLB_TRUE,
1594
     offsetof(struct flb_azure_kusto, azure_kusto_buffer_key),
1595
    "Set the azure kusto buffer key which needs to be specified when using multiple instances of azure kusto output plugin and buffering is enabled"
1596
    },
1597
    {FLB_CONFIG_MAP_SIZE, "store_dir_limit_size", FLB_AZURE_KUSTO_BUFFER_DIR_MAX_SIZE,0, FLB_TRUE,
1598
     offsetof(struct flb_azure_kusto, store_dir_limit_size),
1599
    "Set the max size of the buffer directory. Default is 8GB"
1600
    },
1601
    {FLB_CONFIG_MAP_BOOL, "buffer_file_delete_early", "false",0, FLB_TRUE,
1602
     offsetof(struct flb_azure_kusto, buffer_file_delete_early),
1603
    "Whether to delete the buffered file early after successful blob creation. Default is false"
1604
    },
1605
    {FLB_CONFIG_MAP_BOOL, "unify_tag", "true",0, FLB_TRUE,
1606
     offsetof(struct flb_azure_kusto, unify_tag),
1607
    "This creates a single buffer file when the buffering mode is ON. Default is true"
1608
    },
1609
    {FLB_CONFIG_MAP_INT, "blob_uri_length", "64",0, FLB_TRUE,
1610
     offsetof(struct flb_azure_kusto, blob_uri_length),
1611
    "Set the length of generated blob uri before ingesting to kusto. Default is 64"
1612
    },
1613
    {FLB_CONFIG_MAP_INT, "scheduler_max_retries", "3",0, FLB_TRUE,
1614
     offsetof(struct flb_azure_kusto, scheduler_max_retries),
1615
    "Set the maximum number of retries for ingestion using the scheduler. Default is 3"
1616
    },
1617
    {FLB_CONFIG_MAP_BOOL, "delete_on_max_upload_error", "false",0, FLB_TRUE,
1618
     offsetof(struct flb_azure_kusto, delete_on_max_upload_error),
1619
    "Whether to delete the buffer file on maximum upload errors. Default is false"
1620
    },
1621
    {FLB_CONFIG_MAP_TIME, "io_timeout", "60s",0, FLB_TRUE,
1622
     offsetof(struct flb_azure_kusto, io_timeout),
1623
    "HTTP IO timeout. Default is 60s"
1624
    },
1625
    /* EOF */
1626
    {0}};
1627
1628
struct flb_output_plugin out_azure_kusto_plugin = {
1629
    .name = "azure_kusto",
1630
    .description = "Send events to Kusto (Azure Data Explorer)",
1631
    .cb_init = cb_azure_kusto_init,
1632
    .cb_flush = cb_azure_kusto_flush,
1633
    .cb_exit = cb_azure_kusto_exit,
1634
    .config_map = config_map,
1635
    /* Plugin flags */
1636
    .flags = FLB_OUTPUT_NET | FLB_IO_TLS,
1637
};