Coverage Report

Created: 2026-03-22 07:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/plugins/out_azure_blob/azure_blob.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2026 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_output_plugin.h>
21
#include <fluent-bit/flb_sds.h>
22
#include <fluent-bit/flb_kv.h>
23
#include <fluent-bit/flb_utils.h>
24
#include <fluent-bit/flb_pack.h>
25
#include <fluent-bit/flb_config_map.h>
26
#include <fluent-bit/flb_gzip.h>
27
#include <fluent-bit/flb_base64.h>
28
#include <fluent-bit/flb_sqldb.h>
29
#include <fluent-bit/flb_input_blob.h>
30
#include <fluent-bit/flb_log_event_decoder.h>
31
#include <fluent-bit/flb_plugin.h>
32
#include <fluent-bit/flb_notification.h>
33
#include <fluent-bit/flb_scheduler.h>
34
35
#include <msgpack.h>
36
37
#include "azure_blob.h"
38
#include "azure_blob_db.h"
39
#include "azure_blob_uri.h"
40
#include "azure_blob_conf.h"
41
#include "azure_blob_appendblob.h"
42
#include "azure_blob_blockblob.h"
43
#include "azure_blob_http.h"
44
#include "azure_blob_store.h"
45
46
0
/*
 * Internal return code used by http_send_blob(): it asks the caller to create
 * the blob first (send_blob() reacts by calling create_blob() and then sending
 * the payload again).
 */
#define CREATE_BLOB  1337
47
48
/* thread_local_storage for workers */
49
50
/* State read through FLB_TLS_GET(worker_info) in cb_azb_blob_file_upload(). */
struct worker_info {
51
    /*
     * Set to FLB_TRUE by cb_azb_blob_file_upload(); while it is non-zero that
     * callback logs "upload already in progress" and calls
     * flb_sched_timer_cb_coro_return().
     */
    int active_upload;
52
};
53
54
FLB_TLS_DEFINE(struct worker_info, worker_info);
55
56
static int azure_blob_format(struct flb_config *config,
57
                             struct flb_input_instance *ins,
58
                             void *plugin_context,
59
                             void *flush_ctx,
60
                             int event_type,
61
                             const char *tag, int tag_len,
62
                             const void *data, size_t bytes,
63
                             void **out_data, size_t *out_size)
64
0
{
65
0
    flb_sds_t out_buf;
66
0
    struct flb_azure_blob *ctx = plugin_context;
67
68
0
    out_buf = flb_pack_msgpack_to_json_format(data, bytes,
69
0
                                              FLB_PACK_JSON_FORMAT_LINES,
70
0
                                              FLB_PACK_JSON_DATE_ISO8601,
71
0
                                              ctx->date_key,
72
0
                                              config->json_escape_unicode);
73
0
    if (!out_buf) {
74
0
        return -1;
75
0
    }
76
77
0
    *out_data = out_buf;
78
0
    *out_size = flb_sds_len(out_buf);
79
0
    return 0;
80
0
}
81
82
/*
83
 * Either new_data or chunk can be NULL, but not both
84
 */
85
static int construct_request_buffer(struct flb_azure_blob *ctx, flb_sds_t new_data,
86
                                    struct azure_blob_file *upload_file,
87
                                    char **out_buf, size_t *out_size)
88
0
{
89
0
    char *body;
90
0
    char *tmp;
91
0
    size_t body_size = 0;
92
0
    char *buffered_data = NULL;
93
0
    size_t buffer_size = 0;
94
0
    int ret;
95
96
0
    if (new_data == NULL && upload_file == NULL) {
97
0
        flb_plg_error(ctx->ins, "[construct_request_buffer] Something went wrong"
98
0
                                " both chunk and new_data are NULL");
99
0
        return -1;
100
0
    }
101
102
0
    if (upload_file) {
103
0
        ret = azure_blob_store_file_upload_read(ctx, upload_file->fsf, &buffered_data, &buffer_size);
104
0
        if (ret < 0) {
105
0
            flb_plg_error(ctx->ins, "Could not read locally buffered data %s",
106
0
                          upload_file->fsf->name);
107
0
            return -1;
108
0
        }
109
110
        /*
111
         * lock the upload_file from buffer list
112
         */
113
0
        azure_blob_store_file_lock(upload_file);
114
0
        body = buffered_data;
115
0
        body_size = buffer_size;
116
0
    }
117
118
0
    flb_plg_debug(ctx->ins, "[construct_request_buffer] size of buffer file read %zu", buffer_size);
119
120
    /*
121
     * If new data is arriving, increase the original 'buffered_data' size
122
     * to append the new one.
123
     */
124
0
    if (new_data) {
125
0
        body_size += flb_sds_len(new_data);
126
0
        flb_plg_debug(ctx->ins, "[construct_request_buffer] size of new_data %zu", body_size);
127
128
0
        tmp = flb_realloc(buffered_data, body_size + 1);
129
0
        if (!tmp) {
130
0
            flb_errno();
131
0
            flb_free(buffered_data);
132
0
            if (upload_file) {
133
0
                azure_blob_store_file_unlock(upload_file);
134
0
            }
135
0
            return -1;
136
0
        }
137
0
        body = buffered_data = tmp;
138
0
        memcpy(body + buffer_size, new_data, flb_sds_len(new_data));
139
0
        if (ctx->compress_gzip == FLB_FALSE){
140
0
            body[body_size] = '\0';
141
0
        }
142
0
    }
143
144
0
    flb_plg_debug(ctx->ins, "[construct_request_buffer] final increased %zu", body_size);
145
146
0
    *out_buf = body;
147
0
    *out_size = body_size;
148
149
0
    return 0;
150
0
}
151
152
/*
 * Fill 'str' with 'length' random characters taken from [a-zA-Z0-9] and
 * terminate it with a NUL byte.
 *
 * 'str' must have room for at least length + 1 bytes. A NULL 'str' is ignored.
 *
 * The generator is seeded only on the first call. Re-seeding on every call
 * with time/clock/pid makes two calls issued within the same clock tick start
 * from the same seed and therefore return the same string.
 *
 * NOTE(review): rand()/srand() and the 'seeded' flag are process-wide state
 * and are not synchronized here.
 */
void generate_random_string_blob(char *str, size_t length)
{
    static const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    static int seeded = 0;
    const size_t charset_size = sizeof(charset) - 1;
    size_t i;
    size_t index;

    if (str == NULL) {
        return;
    }

    if (!seeded) {
        /* Seed the random number generator with multiple sources of entropy */
        srand((unsigned int) (time(NULL) ^ clock() ^ getpid()));
        seeded = 1;
    }

    for (i = 0; i < length; ++i) {
        index = (size_t) rand() % charset_size;
        str[i] = charset[index];
    }

    str[length] = '\0';
}
170
171
static int create_blob(struct flb_azure_blob *ctx, char *name)
172
0
{
173
0
    int ret;
174
0
    size_t b_sent;
175
0
    flb_sds_t uri = NULL;
176
0
    struct flb_http_client *c;
177
0
    struct flb_connection *u_conn;
178
179
0
    uri = azb_uri_create_blob(ctx, name);
180
0
    if (!uri) {
181
0
        return FLB_RETRY;
182
0
    }
183
184
0
    if (ctx->buffering_enabled == FLB_TRUE){
185
0
        ctx->u->base.flags &= ~(FLB_IO_ASYNC);
186
0
        ctx->u->base.net.io_timeout = ctx->io_timeout;
187
0
    }
188
189
    /* Get upstream connection */
190
0
    u_conn = flb_upstream_conn_get(ctx->u);
191
0
    if (!u_conn) {
192
0
        flb_plg_error(ctx->ins,
193
0
                      "cannot create upstream connection for create_append_blob");
194
0
        flb_sds_destroy(uri);
195
0
        return FLB_RETRY;
196
0
    }
197
198
    /* Create HTTP client context */
199
0
    c = flb_http_client(u_conn, FLB_HTTP_PUT,
200
0
                        uri,
201
0
                        NULL, 0, NULL, 0, NULL, 0);
202
0
    if (!c) {
203
0
        flb_plg_error(ctx->ins, "cannot create HTTP client context");
204
0
        flb_upstream_conn_release(u_conn);
205
0
        flb_sds_destroy(uri);
206
0
        return FLB_RETRY;
207
0
    }
208
209
    /* Prepare headers and authentication */
210
0
    azb_http_client_setup(ctx, c, -1, FLB_TRUE,
211
0
                          AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
212
213
    /* Send HTTP request */
214
0
    ret = flb_http_do(c, &b_sent);
215
0
    flb_sds_destroy(uri);
216
217
0
    if (ret == -1) {
218
0
        flb_plg_error(ctx->ins, "error sending append_blob");
219
0
        flb_http_client_destroy(c);
220
0
        flb_upstream_conn_release(u_conn);
221
0
        return FLB_RETRY;
222
0
    }
223
224
0
    if (c->resp.status == 201) {
225
        /* delete "&sig=..." in the c->uri for security */
226
0
        char *p = strstr(c->uri, "&sig=");
227
0
        if (p) {
228
0
            *p = '\0';
229
0
        }
230
0
        flb_plg_info(ctx->ins, "blob created successfully: %s", c->uri);
231
0
    }
232
0
    else {
233
0
        if (c->resp.payload_size > 0) {
234
0
            flb_plg_error(ctx->ins, "http_status=%i cannot create append blob\n%s",
235
0
                          c->resp.status, c->resp.payload);
236
0
        }
237
0
        else {
238
0
            flb_plg_error(ctx->ins, "http_status=%i cannot create append blob",
239
0
                          c->resp.status);
240
0
        }
241
0
        flb_http_client_destroy(c);
242
0
        flb_upstream_conn_release(u_conn);
243
0
        return FLB_RETRY;
244
0
    }
245
246
0
    flb_http_client_destroy(c);
247
0
    flb_upstream_conn_release(u_conn);
248
0
    return FLB_OK;
249
0
}
250
251
static int delete_blob(struct flb_azure_blob *ctx, char *name)
252
0
{
253
0
    int ret;
254
0
    size_t b_sent;
255
0
    flb_sds_t uri = NULL;
256
0
    struct flb_http_client *c;
257
0
    struct flb_connection *u_conn;
258
259
0
    uri = azb_uri_create_blob(ctx, name);
260
0
    if (!uri) {
261
0
        return FLB_RETRY;
262
0
    }
263
264
    /* Get upstream connection */
265
0
    u_conn = flb_upstream_conn_get(ctx->u);
266
0
    if (!u_conn) {
267
0
        flb_plg_error(ctx->ins,
268
0
                      "cannot create upstream connection for create_append_blob");
269
0
        flb_sds_destroy(uri);
270
0
        return FLB_RETRY;
271
0
    }
272
273
    /* Create HTTP client context */
274
0
    c = flb_http_client(u_conn, FLB_HTTP_DELETE,
275
0
                        uri,
276
0
                        NULL, 0, NULL, 0, NULL, 0);
277
0
    if (!c) {
278
0
        flb_plg_error(ctx->ins, "cannot create HTTP client context");
279
0
        flb_upstream_conn_release(u_conn);
280
0
        flb_sds_destroy(uri);
281
0
        return FLB_RETRY;
282
0
    }
283
284
    /* Prepare headers and authentication */
285
0
    azb_http_client_setup(ctx, c, -1, FLB_TRUE,
286
0
                          AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
287
288
    /* Send HTTP request */
289
0
    ret = flb_http_do(c, &b_sent);
290
0
    flb_sds_destroy(uri);
291
292
0
    if (ret == -1) {
293
0
        flb_plg_error(ctx->ins, "error sending append_blob");
294
0
        flb_http_client_destroy(c);
295
0
        flb_upstream_conn_release(u_conn);
296
0
        return FLB_RETRY;
297
0
    }
298
299
0
    if (c->resp.status == 201) {
300
        /* delete "&sig=..." in the c->uri for security */
301
0
        char *p = strstr(c->uri, "&sig=");
302
0
        if (p) {
303
0
            *p = '\0';
304
0
        }
305
0
        flb_plg_info(ctx->ins, "blob deleted successfully: %s", c->uri);
306
0
    }
307
0
    else {
308
0
        if (c->resp.payload_size > 0) {
309
0
            flb_plg_error(ctx->ins, "http_status=%i cannot delete append blob\n%s",
310
0
                          c->resp.status, c->resp.payload);
311
0
        }
312
0
        else {
313
0
            flb_plg_error(ctx->ins, "http_status=%i cannot delete append blob",
314
0
                          c->resp.status);
315
0
        }
316
0
        flb_http_client_destroy(c);
317
0
        flb_upstream_conn_release(u_conn);
318
0
        return FLB_RETRY;
319
0
    }
320
321
0
    flb_http_client_destroy(c);
322
0
    flb_upstream_conn_release(u_conn);
323
0
    return FLB_OK;
324
0
}
325
326
static int http_send_blob(struct flb_config *config, struct flb_azure_blob *ctx,
327
                          flb_sds_t ref_name,
328
                          flb_sds_t uri,
329
                          flb_sds_t block_id,
330
                          int event_type,
331
                          void *data, size_t bytes)
332
0
{
333
0
    int ret;
334
0
    int compressed = FLB_FALSE;
335
0
    int content_encoding = FLB_FALSE;
336
0
    int content_type = FLB_FALSE;
337
0
    size_t b_sent;
338
0
    void *payload_buf;
339
0
    size_t payload_size;
340
0
    struct flb_http_client *c;
341
0
    struct flb_connection *u_conn;
342
343
0
    flb_plg_debug(ctx->ins, "generated blob uri ::: %s", uri);
344
345
0
    if (ctx->buffering_enabled == FLB_TRUE){
346
0
        ctx->u->base.flags &= ~(FLB_IO_ASYNC);
347
0
        ctx->u->base.net.io_timeout = ctx->io_timeout;
348
0
    }
349
350
    /* Get upstream connection */
351
0
    u_conn = flb_upstream_conn_get(ctx->u);
352
0
    if (!u_conn) {
353
0
        flb_plg_error(ctx->ins,
354
0
                      "cannot create TCP upstream connection");
355
0
        return FLB_RETRY;
356
0
    }
357
358
0
    payload_buf = data;
359
0
    payload_size = bytes;
360
361
    /* Handle compression requests */
362
0
    if (ctx->compress_gzip == FLB_TRUE || ctx->compress_blob == FLB_TRUE) {
363
0
        ret = flb_gzip_compress((void *) data, bytes, &payload_buf, &payload_size);
364
0
        if (ret == 0) {
365
0
            compressed = FLB_TRUE;
366
0
        }
367
0
        else {
368
0
            flb_plg_warn(ctx->ins,
369
0
                        "cannot gzip payload, disabling compression");
370
0
            payload_buf = data;
371
0
            payload_size = bytes;
372
0
        }
373
0
    }
374
375
    /* set http header flags */
376
0
    if (ctx->compress_blob == FLB_TRUE) {
377
0
        content_encoding = AZURE_BLOB_CE_NONE;
378
0
        content_type = AZURE_BLOB_CT_GZIP;
379
0
    }
380
0
    else if (compressed == FLB_TRUE) {
381
0
        content_encoding = AZURE_BLOB_CE_GZIP;
382
0
        content_type = AZURE_BLOB_CT_JSON;
383
0
    }
384
385
    /* Create HTTP client context */
386
0
    c = flb_http_client(u_conn, FLB_HTTP_PUT,
387
0
                        uri,
388
0
                        payload_buf, payload_size, NULL, 0, NULL, 0);
389
0
    if (!c) {
390
0
        flb_plg_error(ctx->ins, "cannot create HTTP client context");
391
0
        if (compressed == FLB_TRUE) {
392
0
            flb_free(payload_buf);
393
0
        }
394
0
        flb_upstream_conn_release(u_conn);
395
0
        return FLB_RETRY;
396
0
    }
397
398
    /* Prepare headers and authentication */
399
0
    azb_http_client_setup(ctx, c, (ssize_t) payload_size, FLB_FALSE,
400
0
                          content_type, content_encoding);
401
402
    /* Send HTTP request */
403
0
    ret = flb_http_do(c, &b_sent);
404
405
    /* Release compressed buffer */
406
0
    if (compressed == FLB_TRUE) {
407
0
        flb_free(payload_buf);
408
0
    }
409
410
0
    flb_upstream_conn_release(u_conn);
411
412
    /* Validate HTTP status */
413
0
    if (ret == -1) {
414
0
        flb_plg_error(ctx->ins, "error sending append_blob for %s", ref_name);
415
0
        return FLB_RETRY;
416
0
    }
417
418
0
    if (c->resp.status == 201) {
419
0
        flb_plg_info(ctx->ins, "content uploaded successfully: %s", ref_name);
420
0
        flb_http_client_destroy(c);
421
0
        return FLB_OK;
422
0
    }
423
0
    else if (c->resp.status == 404) {
424
        /* delete "&sig=..." in the c->uri for security */
425
0
        char *p = strstr(c->uri, "&sig=");
426
0
        if (p) {
427
0
            *p = '\0';
428
0
        }
429
430
0
        flb_plg_info(ctx->ins, "blob not found: %s", c->uri);
431
0
        flb_http_client_destroy(c);
432
0
        return CREATE_BLOB;
433
0
    }
434
0
    else if (c->resp.payload_size > 0) {
435
0
        flb_plg_error(ctx->ins, "http_status=%i cannot append content to blob\n%s",
436
0
                      c->resp.status, c->resp.payload);
437
0
        if (strstr(c->resp.payload, "must be 0 for Create Append")) {
438
0
            flb_http_client_destroy(c);
439
0
            return CREATE_BLOB;
440
0
        }
441
0
    }
442
0
    else {
443
0
        flb_plg_error(ctx->ins, "cannot upload %s content to blob (http_status=%i)",
444
0
                      ref_name, c->resp.status);
445
0
    }
446
0
    flb_http_client_destroy(c);
447
448
0
    return FLB_RETRY;
449
0
}
450
451
static int send_blob(struct flb_config *config,
452
                     struct flb_input_instance *i_ins,
453
                     struct flb_azure_blob *ctx,
454
                     int event_type,
455
                     int blob_type, char *name, uint64_t part_id,
456
                     char *tag, int tag_len, void *data, size_t bytes)
457
0
{
458
0
    int ret;
459
0
    uint64_t ms = 0;
460
0
    flb_sds_t uri = NULL;
461
0
    flb_sds_t block_id = NULL;
462
0
    flb_sds_t ref_name = NULL;
463
0
    void *payload_buf = data;
464
0
    size_t payload_size = bytes;
465
0
    char *generated_random_string;
466
467
0
    ref_name = flb_sds_create_size(256);
468
0
    if (!ref_name) {
469
0
        return FLB_RETRY;
470
0
    }
471
472
    /* Allocate memory for the random string dynamically */
473
0
    generated_random_string = flb_malloc(ctx->blob_uri_length + 1);
474
0
    if (!generated_random_string) {
475
0
        flb_errno();
476
0
        flb_plg_error(ctx->ins, "cannot allocate memory for random string");
477
0
        flb_sds_destroy(ref_name);
478
0
        return FLB_RETRY;
479
0
    }
480
481
0
    if (blob_type == AZURE_BLOB_APPENDBLOB) {
482
0
        uri = azb_append_blob_uri(ctx, tag);
483
0
    }
484
0
    else if (blob_type == AZURE_BLOB_BLOCKBLOB) {
485
0
        generate_random_string_blob(generated_random_string, ctx->blob_uri_length); /* Generate the random string */
486
0
        if (event_type == FLB_EVENT_TYPE_LOGS) {
487
0
            block_id = azb_block_blob_id_logs(&ms);
488
0
            if (!block_id) {
489
0
                flb_plg_error(ctx->ins, "could not generate block id");
490
0
                flb_free(generated_random_string);
491
0
                cfl_sds_destroy(ref_name);
492
0
                return FLB_RETRY;
493
0
            }
494
0
            uri = azb_block_blob_uri(ctx, tag, block_id, ms, generated_random_string);
495
0
            ref_name = flb_sds_printf(&ref_name, "file=%s.%" PRIu64, name, ms);
496
0
        }
497
0
        else if (event_type == FLB_EVENT_TYPE_BLOBS) {
498
0
            block_id = azb_block_blob_id_blob(ctx, name, part_id);
499
0
            uri = azb_block_blob_uri(ctx, name, block_id, 0, generated_random_string);
500
0
            ref_name = flb_sds_printf(&ref_name, "file=%s:%" PRIu64, name, part_id);
501
0
        }
502
0
    }
503
504
0
    if (!uri) {
505
0
        flb_free(generated_random_string);
506
0
        if (block_id != NULL) {
507
0
            flb_free(block_id);
508
0
        }
509
0
        flb_sds_destroy(ref_name);
510
0
        return FLB_RETRY;
511
0
    }
512
513
    /* Map buffer */
514
0
    payload_buf = data;
515
0
    payload_size = bytes;
516
517
0
    ret = http_send_blob(config, ctx, ref_name, uri, block_id, event_type, payload_buf, payload_size);
518
0
    flb_plg_debug(ctx->ins, "http_send_blob()=%i", ret);
519
520
0
    if (ret == FLB_OK) {
521
        /* For Logs type, we need to commit the block right away */
522
0
        if (event_type == FLB_EVENT_TYPE_LOGS) {
523
0
            ret = azb_block_blob_commit_block(ctx, block_id, tag, ms, generated_random_string);
524
0
        }
525
0
    }
526
0
    else if (ret == CREATE_BLOB) {
527
0
        ret = create_blob(ctx, name);
528
0
        if (ret == FLB_OK) {
529
0
            ret = http_send_blob(config, ctx, ref_name, uri, block_id, event_type, payload_buf, payload_size);
530
0
        }
531
0
    }
532
0
    flb_sds_destroy(ref_name);
533
534
0
    if (payload_buf != data) {
535
0
        flb_sds_destroy(payload_buf);
536
0
    }
537
538
0
    flb_sds_destroy(uri);
539
0
    flb_free(generated_random_string);
540
541
0
    if (block_id != NULL) {
542
0
        flb_free(block_id);
543
0
    }
544
545
0
    return ret;
546
0
}
547
548
static int create_container(struct flb_azure_blob *ctx, char *name)
549
0
{
550
0
    int ret;
551
0
    size_t b_sent;
552
0
    flb_sds_t uri;
553
0
    struct flb_http_client *c;
554
0
    struct flb_connection *u_conn;
555
556
0
    if (ctx->buffering_enabled == FLB_TRUE){
557
0
        ctx->u->base.flags &= ~(FLB_IO_ASYNC);
558
0
        ctx->u->base.net.io_timeout = ctx->io_timeout;
559
0
    }
560
561
    /* Get upstream connection */
562
0
    u_conn = flb_upstream_conn_get(ctx->u);
563
0
    if (!u_conn) {
564
0
        flb_plg_error(ctx->ins,
565
0
                      "cannot create upstream connection for container creation");
566
0
        return FLB_FALSE;
567
0
    }
568
569
    /* URI */
570
0
    uri = azb_uri_ensure_or_create_container(ctx);
571
0
    if (!uri) {
572
0
        flb_upstream_conn_release(u_conn);
573
0
        return FLB_FALSE;
574
0
    }
575
576
    /* Create HTTP client context */
577
0
    c = flb_http_client(u_conn, FLB_HTTP_PUT,
578
0
                        uri,
579
0
                        NULL, 0, NULL, 0, NULL, 0);
580
0
    if (!c) {
581
0
        flb_plg_error(ctx->ins, "cannot create HTTP client context");
582
0
        flb_upstream_conn_release(u_conn);
583
0
        return FLB_FALSE;
584
0
    }
585
586
    /* Prepare headers and authentication */
587
0
    azb_http_client_setup(ctx, c, -1, FLB_FALSE,
588
0
                          AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
589
590
    /* Send HTTP request */
591
0
    ret = flb_http_do(c, &b_sent);
592
593
    /* Release URI */
594
0
    flb_sds_destroy(uri);
595
596
    /* Validate http response */
597
0
    if (ret == -1) {
598
0
        flb_plg_error(ctx->ins, "error requesting container creation");
599
0
        flb_http_client_destroy(c);
600
0
        flb_upstream_conn_release(u_conn);
601
0
        return FLB_FALSE;
602
0
    }
603
604
0
    if (c->resp.status == 201) {
605
0
        flb_plg_info(ctx->ins, "container '%s' created sucessfully", name);
606
0
    }
607
0
    else {
608
0
        if (c->resp.payload_size > 0) {
609
0
            flb_plg_error(ctx->ins, "cannot create container '%s'\n%s",
610
0
                          name, c->resp.payload);
611
0
        }
612
0
        else {
613
0
            flb_plg_error(ctx->ins, "cannot create container '%s'\n%s",
614
0
                          name, c->resp.payload);
615
0
        }
616
0
        flb_http_client_destroy(c);
617
0
        flb_upstream_conn_release(u_conn);
618
0
        return FLB_FALSE;
619
0
    }
620
621
0
    flb_http_client_destroy(c);
622
0
    flb_upstream_conn_release(u_conn);
623
0
    return FLB_TRUE;
624
0
}
625
626
/*
627
 * Check that the container exists, if it doesn't and the configuration property
628
 * auto_create_container is enabled, it will send a request to create it. If it
629
 * could not be created, it returns FLB_FALSE.
630
 * If auto_create_container is disabled, it will return FLB_TRUE assuming the container
631
 * already exists.
632
 */
633
static int ensure_container(struct flb_azure_blob *ctx)
634
0
{
635
0
    int ret;
636
0
    int status;
637
0
    size_t b_sent;
638
0
    flb_sds_t uri;
639
0
    struct flb_http_client *c;
640
0
    struct flb_connection *u_conn;
641
642
0
    if (!ctx->auto_create_container) {
643
0
        flb_plg_info(ctx->ins, "auto_create_container is disabled, assuming container '%s' already exists",
644
0
                     ctx->container_name);
645
0
        return FLB_TRUE;
646
0
    }
647
648
0
    uri = azb_uri_ensure_or_create_container(ctx);
649
0
    if (!uri) {
650
0
        flb_plg_error(ctx->ins, "cannot create container URI");
651
0
        return FLB_FALSE;
652
0
    }
653
654
0
    if (ctx->buffering_enabled == FLB_TRUE){
655
0
        ctx->u->base.flags &= ~(FLB_IO_ASYNC);
656
0
        ctx->u->base.net.io_timeout = ctx->io_timeout;
657
0
    }
658
659
    /* Get upstream connection */
660
0
    u_conn = flb_upstream_conn_get(ctx->u);
661
0
    if (!u_conn) {
662
0
        flb_plg_error(ctx->ins,
663
0
                      "cannot create upstream connection for container check");
664
0
        flb_sds_destroy(uri);
665
0
        return FLB_FALSE;
666
0
    }
667
668
    /* Create HTTP client context */
669
0
    c = flb_http_client(u_conn, FLB_HTTP_GET,
670
0
                        uri,
671
0
                        NULL, 0, NULL, 0, NULL, 0);
672
0
    if (!c) {
673
0
        flb_plg_error(ctx->ins, "cannot create HTTP client context");
674
0
        flb_upstream_conn_release(u_conn);
675
0
        return FLB_FALSE;
676
0
    }
677
0
    flb_http_strip_port_from_host(c);
678
679
    /* Prepare headers and authentication */
680
0
    azb_http_client_setup(ctx, c, -1, FLB_FALSE,
681
0
                          AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
682
683
    /* Send HTTP request */
684
0
    ret = flb_http_do(c, &b_sent);
685
0
    flb_sds_destroy(uri);
686
687
0
    if (ret == -1) {
688
0
        flb_plg_error(ctx->ins, "error requesting container properties");
689
0
        flb_upstream_conn_release(u_conn);
690
0
        return FLB_FALSE;
691
0
    }
692
693
0
    status = c->resp.status;
694
0
    flb_http_client_destroy(c);
695
696
    /* Release connection */
697
0
    flb_upstream_conn_release(u_conn);
698
699
    /* Request was successful, validate HTTP status code */
700
0
    if (status == 404) {
701
        /* The container was not found, try to create it */
702
0
        flb_plg_info(ctx->ins, "container '%s' not found, trying to create it",
703
0
                     ctx->container_name);
704
0
        ret = create_container(ctx, ctx->container_name);
705
0
        return ret;
706
0
    }
707
0
    else if (status == 200) {
708
0
        flb_plg_info(ctx->ins, "container '%s' already exists", ctx->container_name);
709
0
        return FLB_TRUE;
710
0
    }
711
0
    else if (status == 403) {
712
0
        flb_plg_error(ctx->ins, "failed getting container '%s', access denied",
713
0
                      ctx->container_name);
714
0
        return FLB_FALSE;
715
0
    }
716
    
717
0
    flb_plg_error(ctx->ins, "get container request failed, status=%i",
718
0
                  status);
719
720
0
    return FLB_FALSE;
721
0
}
722
723
static int cb_azure_blob_init(struct flb_output_instance *ins,
724
                              struct flb_config *config, void *data)
725
0
{
726
0
    struct flb_azure_blob *ctx = NULL;
727
0
    (void) ins;
728
0
    (void) config;
729
0
    (void) data;
730
731
0
    FLB_TLS_INIT(worker_info);
732
733
0
    ctx = flb_azure_blob_conf_create(ins, config);
734
0
    if (!ctx) {
735
0
        return -1;
736
0
    }
737
738
0
    if (ctx->buffering_enabled == FLB_TRUE) {
739
0
        ctx->ins = ins;
740
0
        ctx->retry_time = 0;
741
742
        /* Initialize local storage */
743
0
        int ret = azure_blob_store_init(ctx);
744
0
        if (ret == -1) {
745
0
            flb_plg_error(ctx->ins, "Failed to initialize kusto storage: %s",
746
0
                          ctx->store_dir);
747
0
            return -1;
748
0
        }
749
750
        /* validate 'total_file_size' */
751
0
        if (ctx->file_size <= 0) {
752
0
            flb_plg_error(ctx->ins, "Failed to parse upload_file_size");
753
0
            return -1;
754
0
        }
755
0
        if (ctx->file_size < 1000000) {
756
0
            flb_plg_error(ctx->ins, "upload_file_size must be at least 1MB");
757
0
            return -1;
758
0
        }
759
0
        if (ctx->file_size > MAX_FILE_SIZE) {
760
0
            flb_plg_error(ctx->ins, "Max total_file_size must be lower than %ld bytes", MAX_FILE_SIZE);
761
0
            return -1;
762
0
        }
763
0
        ctx->has_old_buffers = azure_blob_store_has_data(ctx);
764
0
        ctx->timer_created = FLB_FALSE;
765
0
        ctx->timer_ms = (int) (ctx->upload_timeout / 6) * 1000;
766
0
        flb_plg_info(ctx->ins, "Using upload size %lu bytes", ctx->file_size);
767
0
    }
768
769
0
    flb_output_set_context(ins, ctx);
770
771
0
    flb_output_set_http_debug_callbacks(ins);
772
0
    return 0;
773
0
}
774
775
static int blob_chunk_register_parts(struct flb_azure_blob *ctx, uint64_t file_id, size_t total_size)
776
0
{
777
0
    int ret;
778
0
    int64_t parts = 0;
779
0
    int64_t id;
780
0
    size_t offset_start = 0;
781
0
    size_t offset_end = 0;
782
783
    /* generate file parts */
784
0
    while (offset_start < total_size) {
785
0
        offset_end = offset_start + ctx->part_size;
786
787
        /* do not exceed maximum size */
788
0
        if (offset_end > total_size) {
789
0
            offset_end = total_size;
790
0
        }
791
792
        /* insert part */
793
0
        ret = azb_db_file_part_insert(ctx, file_id, parts, offset_start, offset_end, &id);
794
0
        if (ret == -1) {
795
0
            flb_plg_error(ctx->ins, "cannot insert blob file part into database");
796
0
            return -1;
797
0
        }
798
0
        offset_start = offset_end;
799
0
        parts++;
800
0
    }
801
802
0
    return parts;
803
0
}
804
805
static int process_blob_chunk(struct flb_azure_blob *ctx, struct flb_event_chunk *event_chunk)
806
0
{
807
0
    int64_t ret;
808
0
    int64_t file_id;
809
0
    cfl_sds_t file_path = NULL;
810
0
    cfl_sds_t source = NULL;
811
0
    size_t file_size;
812
0
    msgpack_object map;
813
814
0
    struct flb_log_event_decoder log_decoder;
815
0
    struct flb_log_event         log_event;
816
817
0
    if (ctx->db == NULL) {
818
0
        flb_plg_error(ctx->ins, "Cannot process blob because this operation requires a database.");
819
820
0
        return -1;
821
0
    }
822
823
0
    ret = flb_log_event_decoder_init(&log_decoder,
824
0
                                    (char *) event_chunk->data,
825
0
                                     event_chunk->size);
826
827
0
    if (ret != FLB_EVENT_DECODER_SUCCESS) {
828
0
        flb_plg_error(ctx->ins,
829
0
                    "Log event decoder initialization error : %i", (int) ret);
830
0
        return -1;
831
832
0
    }
833
834
0
    while (flb_log_event_decoder_next(&log_decoder, &log_event) == FLB_EVENT_DECODER_SUCCESS) {
835
0
        map = *log_event.body;
836
0
        ret = flb_input_blob_file_get_info(map, &source, &file_path, &file_size);
837
0
        if (ret == -1) {
838
0
            flb_plg_error(ctx->ins, "cannot get file info from blob record, skipping");
839
0
            continue;
840
0
        }
841
842
0
        ret = azb_db_file_insert(ctx, source, ctx->real_endpoint, file_path, file_size);
843
844
0
        if (ret == -1) {
845
0
            flb_plg_error(ctx->ins, "cannot insert blob file into database: %s (size=%lu)",
846
0
                          file_path, file_size);
847
0
            cfl_sds_destroy(file_path);
848
0
            cfl_sds_destroy(source);
849
0
            continue;
850
0
        }
851
0
        cfl_sds_destroy(file_path);
852
0
        cfl_sds_destroy(source);
853
854
        /* generate the parts by using the newest id created (ret) */
855
0
        file_id = ret;
856
0
        ret = blob_chunk_register_parts(ctx, file_id, file_size);
857
0
        if (ret == -1) {
858
0
            flb_plg_error(ctx->ins, "cannot register blob file '%s 'parts into database",
859
0
                            file_path);
860
0
            return -1;
861
0
        }
862
863
0
        flb_plg_debug(ctx->ins, "blob file '%s' (id=%zu) registered with %zu parts",
864
0
                      file_path, file_id, ret);
865
0
    }
866
867
0
    flb_log_event_decoder_destroy(&log_decoder);
868
0
    return 0;
869
0
}
870
871
static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context)
872
0
{
873
0
    int ret;
874
0
    char *out_buf = NULL;
875
0
    size_t out_size;
876
0
    uint64_t id;
877
0
    uint64_t file_id;
878
0
    uint64_t part_id;
879
0
    uint64_t part_delivery_attempts;
880
0
    uint64_t file_delivery_attempts;
881
0
    off_t offset_start;
882
0
    off_t offset_end;
883
0
    cfl_sds_t file_destination = NULL;
884
0
    cfl_sds_t file_path = NULL;
885
0
    cfl_sds_t part_ids = NULL;
886
0
    cfl_sds_t source = NULL;
887
0
    struct flb_azure_blob *ctx = out_context;
888
0
    struct worker_info *info;
889
0
    struct flb_blob_delivery_notification *notification;
890
891
0
    info = FLB_TLS_GET(worker_info);
892
893
0
    if (info->active_upload) {
894
0
        flb_plg_trace(ctx->ins, "[worker: file upload] upload already in progress...");
895
0
        flb_sched_timer_cb_coro_return();
896
0
    }
897
898
0
    if (ctx->db == NULL) {
899
0
        flb_sched_timer_cb_coro_return();
900
0
    }
901
902
0
    info->active_upload = FLB_TRUE;
903
904
    /*
905
     * Check if is there any file which has been fully uploaded and we need to commit it with
906
     * the Put Block List operation
907
     */
908
909
0
    pthread_mutex_lock(&ctx->file_upload_commit_file_parts);
910
911
0
    while (1) {
912
0
        ret = azb_db_file_get_next_stale(ctx,
913
0
                                         &file_id,
914
0
                                         &file_path);
915
916
0
        if (ret == 1) {
917
0
            delete_blob(ctx, file_path);
918
919
0
            azb_db_file_reset_upload_states(ctx, file_id, file_path);
920
0
            azb_db_file_set_aborted_state(ctx, file_id, file_path, 0);
921
922
0
            cfl_sds_destroy(file_path);
923
924
0
            file_path = NULL;
925
0
        }
926
0
        else {
927
0
            break;
928
0
        }
929
0
    }
930
931
0
    while (1) {
932
0
        ret = azb_db_file_get_next_aborted(ctx,
933
0
                                           &file_id,
934
0
                                           &file_delivery_attempts,
935
0
                                           &file_path,
936
0
                                           &source);
937
938
0
        if (ret == 1) {
939
0
            ret = delete_blob(ctx, file_path);
940
941
0
            if (ctx->file_delivery_attempt_limit != FLB_OUT_RETRY_UNLIMITED &&
942
0
                file_delivery_attempts < ctx->file_delivery_attempt_limit) {
943
0
                azb_db_file_reset_upload_states(ctx, file_id, file_path);
944
0
                azb_db_file_set_aborted_state(ctx, file_id, file_path, 0);
945
0
            }
946
0
            else {
947
0
                ret = azb_db_file_delete(ctx, file_id, file_path);
948
949
0
                notification = flb_calloc(1,
950
0
                                        sizeof(
951
0
                                            struct flb_blob_delivery_notification));
952
953
0
                if (notification != NULL) {
954
0
                    notification->base.dynamically_allocated = FLB_TRUE;
955
0
                    notification->base.notification_type = FLB_NOTIFICATION_TYPE_BLOB_DELIVERY;
956
0
                    notification->base.destructor = flb_input_blob_delivery_notification_destroy;
957
0
                    notification->success = FLB_FALSE;
958
0
                    notification->path = cfl_sds_create(file_path);
959
960
0
                    ret = flb_notification_enqueue(FLB_PLUGIN_INPUT,
961
0
                                                source,
962
0
                                                &notification->base,
963
0
                                                config);
964
965
0
                    if (ret != 0) {
966
0
                        flb_plg_error(ctx->ins,
967
0
                                    "blob file '%s' (id=%" PRIu64 ") notification " \
968
0
                                    "delivery error %d", file_path, file_id, ret);
969
970
0
                        flb_notification_cleanup(&notification->base);
971
0
                    }
972
0
                }
973
0
            }
974
975
0
            cfl_sds_destroy(file_path);
976
0
            cfl_sds_destroy(source);
977
978
0
            file_path = NULL;
979
0
            source = NULL;
980
0
        }
981
0
        else {
982
0
            break;
983
0
        }
984
0
    }
985
986
0
    ret = azb_db_file_oldest_ready(ctx, &file_id, &file_path, &part_ids, &source);
987
0
    if (ret == 0) {
988
0
        flb_plg_trace(ctx->ins, "no blob files ready to commit");
989
0
    }
990
0
    else if (ret == -1) {
991
0
        flb_plg_error(ctx->ins, "cannot get oldest blob file ready to upload");
992
0
    }
993
0
    else if (ret == 1) {
994
        /* one file is ready to be committed */
995
0
        flb_plg_debug(ctx->ins, "blob file '%s' (id=%" PRIu64 ") ready to upload", file_path, file_id);
996
0
        ret = azb_block_blob_commit_file_parts(ctx, file_id, file_path, part_ids);
997
0
        if (ret == -1) {
998
0
            flb_plg_error(ctx->ins, "cannot commit blob file parts for file id=%" PRIu64 " path=%s",
999
0
                          file_id, file_path);
1000
0
        }
1001
0
        else {
1002
0
            flb_plg_info(ctx->ins, "blob file '%s' (id=%" PRIu64 ") committed successfully", file_path, file_id);
1003
            /* notify the engine the blob file has been processed */
1004
            /* FIXME! */
1005
1006
0
            notification = flb_calloc(1,
1007
0
                                    sizeof(
1008
0
                                        struct flb_blob_delivery_notification));
1009
1010
0
            if (notification != NULL) {
1011
0
                notification->base.dynamically_allocated = FLB_TRUE;
1012
0
                notification->base.notification_type = FLB_NOTIFICATION_TYPE_BLOB_DELIVERY;
1013
0
                notification->base.destructor = flb_input_blob_delivery_notification_destroy;
1014
0
                notification->success = FLB_TRUE;
1015
0
                notification->path = cfl_sds_create(file_path);
1016
1017
0
                ret = flb_notification_enqueue(FLB_PLUGIN_INPUT,
1018
0
                                               source,
1019
0
                                               &notification->base,
1020
0
                                               config);
1021
1022
0
                if (ret != 0) {
1023
0
                    flb_plg_error(ctx->ins,
1024
0
                                "blob file '%s' (id=%" PRIu64 ") notification " \
1025
0
                                "delivery error %d", file_path, file_id, ret);
1026
1027
0
                    flb_notification_cleanup(&notification->base);
1028
0
                }
1029
0
            }
1030
1031
            /* remove the file entry from the database */
1032
0
            ret = azb_db_file_delete(ctx, file_id, file_path);
1033
0
            if (ret == -1) {
1034
0
                flb_plg_error(ctx->ins, "cannot delete blob file '%s' (id=%" PRIu64 ") from the database",
1035
0
                              file_path, file_id);
1036
0
            }
1037
0
        }
1038
0
    }
1039
0
    pthread_mutex_unlock(&ctx->file_upload_commit_file_parts);
1040
1041
0
    if (file_path) {
1042
0
        cfl_sds_destroy(file_path);
1043
0
    }
1044
0
    if (part_ids) {
1045
0
        cfl_sds_destroy(part_ids);
1046
0
    }
1047
0
    if (source) {
1048
0
        cfl_sds_destroy(source);
1049
0
    }
1050
1051
    /* check for a next part file and lock it */
1052
0
    ret = azb_db_file_part_get_next(ctx, &id, &file_id, &part_id,
1053
0
                                    &offset_start, &offset_end,
1054
0
                                    &part_delivery_attempts,
1055
0
                                    &file_delivery_attempts,
1056
0
                                    &file_path,
1057
0
                                    &file_destination);
1058
0
    if (ret == -1) {
1059
0
        flb_plg_error(ctx->ins, "cannot get next blob file part");
1060
0
        info->active_upload = FLB_FALSE;
1061
0
        flb_sched_timer_cb_coro_return();
1062
0
    }
1063
0
    else if (ret == 0) {
1064
0
        flb_plg_trace(ctx->ins, "no more blob file parts to process");
1065
0
        info->active_upload = FLB_FALSE;
1066
0
        flb_sched_timer_cb_coro_return();
1067
0
    }
1068
0
    else if (ret == 1) {
1069
        /* just continue, the row info was retrieved */
1070
0
    }
1071
1072
1073
0
    if (strcmp(file_destination, ctx->real_endpoint) != 0) {
1074
0
        flb_plg_info(ctx->ins,
1075
0
                     "endpoint change detected, restarting file : %s\n%s\n%s",
1076
0
                     file_path,
1077
0
                     file_destination,
1078
0
                     ctx->real_endpoint);
1079
1080
0
        info->active_upload = FLB_FALSE;
1081
1082
        /* we need to set the aborted state flag to wait for existing uploads
1083
         * to finish and then wipe the slate and start again but we don't want
1084
         * to increment the failure count in this case.
1085
         */
1086
0
        azb_db_file_set_aborted_state(ctx, file_id, file_path, 1);
1087
1088
0
        cfl_sds_destroy(file_path);
1089
0
        cfl_sds_destroy(file_destination);
1090
1091
0
        flb_sched_timer_cb_coro_return();
1092
0
    }
1093
1094
    /* since this is the first part we want to increment the files
1095
     * delivery attempt counter.
1096
     */
1097
0
    if (part_id == 0) {
1098
0
        ret = azb_db_file_delivery_attempts(ctx, file_id, ++file_delivery_attempts);
1099
0
    }
1100
1101
    /* read the file content */
1102
0
    ret = flb_utils_read_file_offset(file_path, offset_start, offset_end, &out_buf, &out_size);
1103
0
    if (ret == -1) {
1104
0
        flb_plg_error(ctx->ins, "cannot read file part %s", file_path);
1105
1106
0
        info->active_upload = FLB_FALSE;
1107
1108
0
        cfl_sds_destroy(file_path);
1109
0
        cfl_sds_destroy(file_destination);
1110
1111
0
        flb_sched_timer_cb_coro_return();
1112
0
    }
1113
1114
0
    azb_db_file_part_delivery_attempts(ctx, file_id, part_id, ++part_delivery_attempts);
1115
1116
0
    flb_plg_debug(ctx->ins, "sending part file %s (id=%" PRIu64 " part_id=%" PRIu64 ")", file_path, id, part_id);
1117
1118
0
    ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_BLOBS,
1119
0
                    AZURE_BLOB_BLOCKBLOB, file_path, part_id, NULL, 0, out_buf, out_size);
1120
1121
0
    if (ret == FLB_OK) {
1122
0
        ret = azb_db_file_part_uploaded(ctx, id);
1123
1124
0
        if (ret == -1) {
1125
0
            info->active_upload = FLB_FALSE;
1126
1127
0
            cfl_sds_destroy(file_path);
1128
0
            cfl_sds_destroy(file_destination);
1129
1130
0
            flb_sched_timer_cb_coro_return();
1131
0
        }
1132
0
    }
1133
0
    else if (ret == FLB_RETRY) {
1134
0
        azb_db_file_part_in_progress(ctx, 0, id);
1135
1136
0
        if (ctx->part_delivery_attempt_limit != FLB_OUT_RETRY_UNLIMITED &&
1137
0
            part_delivery_attempts >= ctx->part_delivery_attempt_limit) {
1138
0
            azb_db_file_set_aborted_state(ctx, file_id, file_path, 1);
1139
0
        }
1140
0
    }
1141
1142
0
    info->active_upload = FLB_FALSE;
1143
1144
0
    if (out_buf) {
1145
0
        flb_free(out_buf);
1146
0
    }
1147
1148
0
    cfl_sds_destroy(file_path);
1149
0
    cfl_sds_destroy(file_destination);
1150
1151
0
    flb_sched_timer_cb_coro_return();
1152
0
}
1153
1154
static int azb_timer_create(struct flb_azure_blob *ctx)
1155
0
{
1156
0
    int ret;
1157
0
    int64_t ms;
1158
0
    struct flb_sched *sched;
1159
1160
0
    sched = flb_sched_ctx_get();
1161
1162
    /* convert from seconds to milliseconds (scheduler needs ms) */
1163
0
    ms = ctx->upload_parts_timeout * 1000;
1164
1165
0
    ret = flb_sched_timer_coro_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, ms,
1166
0
                                         cb_azb_blob_file_upload, ctx, NULL);
1167
0
    if (ret == -1) {
1168
0
        flb_plg_error(ctx->ins, "failed to create upload timer");
1169
0
        return -1;
1170
0
    }
1171
1172
0
    return 0;
1173
0
}
1174
1175
/**
1176
 * Azure Blob Storage ingestion callback function
1177
 * This function handles the upload of data chunks to Azure Blob Storage with retry mechanism
1178
 * @param config: Fluent Bit configuration
1179
 * @param data: Azure Blob context data
1180
 */
1181
0
static void cb_azure_blob_ingest(struct flb_config *config, void *data) {
1182
    /* Initialize context and file handling variables */
1183
0
    struct flb_azure_blob *ctx = data;
1184
0
    struct azure_blob_file *file = NULL;
1185
0
    struct flb_fstore_file *fsf;
1186
0
    char *buffer = NULL;
1187
0
    size_t buffer_size = 0;
1188
0
    struct mk_list *tmp;
1189
0
    struct mk_list *head;
1190
0
    int ret;
1191
0
    time_t now;
1192
0
    flb_sds_t payload;
1193
0
    flb_sds_t tag_sds;
1194
1195
    /* Retry mechanism configuration */
1196
0
    int retry_count;
1197
0
    int backoff_time;
1198
0
    const int max_backoff_time = 64;  /* Maximum backoff time in seconds */
1199
1200
    /* Log entry point and container information */
1201
0
    flb_plg_debug(ctx->ins, "Running upload timer callback (cb_azure_blob_ingest)..");
1202
1203
    /* Initialize jitter for retry mechanism */
1204
0
    srand(time(NULL));
1205
0
    now = time(NULL);
1206
1207
    /* Iterate through all chunks in the active stream */
1208
0
    mk_list_foreach_safe(head, tmp, &ctx->stream_active->files) {
1209
0
        fsf = mk_list_entry(head, struct flb_fstore_file, _head);
1210
0
        file = fsf->data;
1211
1212
        /* Debug logging for current file processing */
1213
0
        flb_plg_debug(ctx->ins, "Iterating files inside upload timer callback (cb_azure_blob_ingest).. %s",
1214
0
                      file->fsf->name);
1215
1216
        /* Skip if chunk hasn't timed out yet */
1217
0
        if (now < (file->create_time + ctx->upload_timeout + ctx->retry_time)) {
1218
0
            continue;
1219
0
        }
1220
1221
        /* Skip if file is already being processed */
1222
0
        flb_plg_debug(ctx->ins, "cb_azure_blob_ingest :: Before file locked check %s", file->fsf->name);
1223
0
        if (file->locked == FLB_TRUE) {
1224
0
            continue;
1225
0
        }
1226
1227
        /* Initialize retry mechanism parameters */
1228
0
        retry_count = 0;
1229
0
        backoff_time = 2;  /* Initial backoff time in seconds */
1230
1231
        /* Retry loop for upload attempts */
1232
0
        while (retry_count < ctx->scheduler_max_retries) {
1233
            /* Construct request buffer for upload */
1234
0
            flb_plg_debug(ctx->ins, "cb_azure_blob_ingest :: Before construct_request_buffer %s", file->fsf->name);
1235
0
            ret = construct_request_buffer(ctx, NULL, file, &buffer, &buffer_size);
1236
1237
            /* Handle request buffer construction failure */
1238
0
            if (ret < 0) {
1239
0
                flb_plg_error(ctx->ins, "cb_azure_blob_ingest :: Could not construct request buffer for %s",
1240
0
                              file->fsf->name);
1241
0
                retry_count++;
1242
1243
                /* Implement exponential backoff with jitter */
1244
0
                int jitter = rand() % backoff_time;
1245
0
                flb_plg_warn(ctx->ins, "cb_azure_blob_ingest :: failure in construct_request_buffer :: Retrying in %d seconds (attempt %d of %d) with jitter %d for file %s",
1246
0
                             backoff_time + jitter, retry_count, ctx->scheduler_max_retries, jitter, file->fsf->name);
1247
0
                sleep(backoff_time + jitter);
1248
0
                backoff_time = (backoff_time * 2 < max_backoff_time) ? backoff_time * 2 : max_backoff_time;
1249
0
                continue;
1250
0
            }
1251
1252
            /* Create payload and tags for blob upload */
1253
0
            payload = flb_sds_create_len(buffer, buffer_size);
1254
0
            tag_sds = flb_sds_create(fsf->meta_buf);
1255
0
            flb_plg_debug(ctx->ins, "cb_azure_blob_ingest ::: tag of the file %s", tag_sds);
1256
1257
            /* Attempt to send blob */
1258
0
            ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_LOGS,ctx->btype , (char *) tag_sds,0, (char *) tag_sds,
1259
0
                            flb_sds_len(tag_sds), payload, flb_sds_len(payload));
1260
1261
            /* Handle blob creation if necessary */
1262
0
            if (ret == CREATE_BLOB) {
1263
0
                ret = create_blob(ctx, tag_sds);
1264
0
                if (ret == FLB_OK) {
1265
0
                    ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_LOGS,ctx->btype, (char *) tag_sds, 0, (char *) tag_sds,
1266
0
                                    flb_sds_len(tag_sds), payload, flb_sds_len(payload));
1267
0
                }
1268
0
            }
1269
1270
            /* Handle blob send failure */
1271
0
            if (ret != FLB_OK) {
1272
                /* Clean up resources and update failure count */
1273
0
                flb_plg_error(ctx->ins, "cb_azure_blob_ingest :: Failed to ingest data to Azure Blob Storage (attempt %d of %d)",
1274
0
                              retry_count + 1, ctx->scheduler_max_retries);
1275
0
                flb_free(buffer);
1276
0
                flb_sds_destroy(payload);
1277
0
                flb_sds_destroy(tag_sds);
1278
1279
0
                if (file) {
1280
0
                    azure_blob_store_file_unlock(file);
1281
0
                    file->failures += 1;
1282
0
                }
1283
1284
0
                retry_count++;
1285
1286
                /* Implement exponential backoff with jitter for retry */
1287
0
                int jitter = rand() % backoff_time;
1288
0
                flb_plg_warn(ctx->ins, "cb_azure_blob_ingest :: error sending blob :: Retrying in %d seconds (attempt %d of %d) with jitter %d for file %s",
1289
0
                             backoff_time + jitter, retry_count, ctx->scheduler_max_retries, jitter, file->fsf->name);
1290
0
                sleep(backoff_time + jitter);
1291
0
                backoff_time = (backoff_time * 2 < max_backoff_time) ? backoff_time * 2 : max_backoff_time;
1292
0
                continue;
1293
0
            }
1294
1295
            /* Handle successful upload */
1296
0
            ret = azure_blob_store_file_delete(ctx, file);
1297
0
            if (ret == 0) {
1298
0
                flb_plg_debug(ctx->ins, "cb_azure_blob_ingest :: deleted successfully ingested file %s", fsf->name);
1299
0
            }
1300
0
            else {
1301
0
                flb_plg_error(ctx->ins, "cb_azure_blob_ingest :: failed to delete ingested file %s", fsf->name);
1302
0
                if (file) {
1303
0
                    azure_blob_store_file_unlock(file);
1304
0
                    file->failures += 1;
1305
0
                }
1306
0
            }
1307
1308
            /* Clean up resources */
1309
0
            flb_free(buffer);
1310
0
            flb_sds_destroy(payload);
1311
0
            flb_sds_destroy(tag_sds);
1312
0
            break;
1313
0
        }
1314
1315
        /* Ensure file is unlocked if max retries reached */
1316
0
        if (retry_count >= ctx->scheduler_max_retries) {
1317
0
            flb_plg_error(ctx->ins, "cb_azure_blob_ingest :: Max retries reached for file :: attempting to delete/marking inactive %s",
1318
0
                          file->fsf->name);
1319
0
            if (ctx->delete_on_max_upload_error){
1320
0
                azure_blob_store_file_delete(ctx, file);
1321
0
            }
1322
0
            else {
1323
0
                azure_blob_store_file_inactive(ctx, file);
1324
0
            }
1325
0
        }
1326
1327
0
        flb_plg_debug(ctx->ins, "Exited upload timer callback (cb_azure_blob_ingest)..");
1328
0
    }
1329
0
}
1330
1331
1332
static int ingest_all_chunks(struct flb_azure_blob *ctx, struct flb_config *config)
1333
0
{
1334
0
    struct azure_blob_file *chunk;
1335
0
    struct mk_list *tmp;
1336
0
    struct mk_list *head;
1337
0
    struct mk_list *f_head;
1338
0
    struct flb_fstore_file *fsf;
1339
0
    struct flb_fstore_stream *fs_stream;
1340
0
    flb_sds_t payload = NULL;
1341
0
    char *buffer = NULL;
1342
0
    size_t buffer_size;
1343
0
    int ret;
1344
0
    flb_sds_t tag_sds;
1345
1346
0
    mk_list_foreach(head, &ctx->fs->streams) {
1347
        /* skip multi upload stream */
1348
0
        fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head);
1349
0
        if (fs_stream == ctx->stream_upload) {
1350
0
            continue;
1351
0
        }
1352
1353
0
        mk_list_foreach_safe(f_head, tmp, &fs_stream->files) {
1354
0
            fsf = mk_list_entry(f_head, struct flb_fstore_file, _head);
1355
0
            chunk = fsf->data;
1356
1357
            /* Locked chunks are being processed, skip */
1358
0
            if (chunk->locked == FLB_TRUE) {
1359
0
                continue;
1360
0
            }
1361
1362
0
            if (chunk->failures >= ctx->scheduler_max_retries) {
1363
0
                flb_plg_warn(ctx->ins,
1364
0
                             "ingest_all_chunks :: Chunk for tag %s failed to send %i times, "
1365
0
                             "will not retry",
1366
0
                             (char *) fsf->meta_buf, ctx->scheduler_max_retries);
1367
0
                if (ctx->delete_on_max_upload_error){
1368
0
                    azure_blob_store_file_delete(ctx, chunk);
1369
0
                }
1370
0
                else {
1371
0
                    azure_blob_store_file_inactive(ctx, chunk);
1372
0
                }
1373
0
                continue;
1374
0
            }
1375
1376
0
            ret = construct_request_buffer(ctx, NULL, chunk,
1377
0
                                           &buffer, &buffer_size);
1378
0
            if (ret < 0) {
1379
0
                flb_plg_error(ctx->ins,
1380
0
                              "ingest_all_chunks :: Could not construct request buffer for %s",
1381
0
                              chunk->file_path);
1382
0
                return -1;
1383
0
            }
1384
1385
0
            payload = flb_sds_create_len(buffer, buffer_size);
1386
0
            tag_sds = flb_sds_create(fsf->meta_buf);
1387
0
            flb_free(buffer);
1388
1389
0
            ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype, (char *)tag_sds, 0, (char *)tag_sds, flb_sds_len(tag_sds), payload, flb_sds_len(payload));
1390
1391
0
            if (ret == CREATE_BLOB) {
1392
0
                ret = create_blob(ctx, tag_sds);
1393
0
                if (ret == FLB_OK) {
1394
0
                    ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype, (char *)tag_sds, 0, (char *)tag_sds, flb_sds_len(tag_sds), payload, flb_sds_len(payload));
1395
0
                }
1396
0
            }
1397
1398
0
            if (ret != FLB_OK) {
1399
0
                flb_plg_error(ctx->ins, "ingest_all_chunks :: Failed to ingest data to Azure Blob Storage");
1400
0
                if (chunk){
1401
0
                    azure_blob_store_file_unlock(chunk);
1402
0
                    chunk->failures += 1;
1403
0
                }
1404
0
                flb_sds_destroy(tag_sds);
1405
0
                flb_sds_destroy(payload);
1406
0
                return -1;
1407
0
            }
1408
1409
0
            flb_sds_destroy(tag_sds);
1410
0
            flb_sds_destroy(payload);
1411
1412
            /* data was sent successfully- delete the local buffer */
1413
0
            azure_blob_store_file_cleanup(ctx, chunk);
1414
0
        }
1415
0
    }
1416
1417
0
    return 0;
1418
0
}
1419
1420
static void flush_init(void *out_context, struct flb_config *config)
1421
0
{
1422
0
    int ret;
1423
0
    struct flb_azure_blob *ctx = out_context;
1424
0
    struct flb_sched *sched;
1425
1426
    /* clean up any old buffers found on startup */
1427
0
    if (ctx->has_old_buffers == FLB_TRUE) {
1428
0
        flb_plg_info(ctx->ins,
1429
0
                     "Sending locally buffered data from previous "
1430
0
                     "executions to azure blob; buffer=%s",
1431
0
                     ctx->fs->root_path);
1432
0
        ctx->has_old_buffers = FLB_FALSE;
1433
0
        ret = ingest_all_chunks(ctx, config);
1434
0
        if (ret < 0) {
1435
0
            ctx->has_old_buffers = FLB_TRUE;
1436
0
            flb_plg_error(ctx->ins,
1437
0
                          "Failed to send locally buffered data left over "
1438
0
                          "from previous executions; will retry. Buffer=%s",
1439
0
                          ctx->fs->root_path);
1440
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
1441
0
        }
1442
0
    }
1443
0
    else {
1444
0
        flb_plg_debug(ctx->ins,
1445
0
                      "Did not find any local buffered data from previous "
1446
0
                      "executions to azure blob; buffer=%s",
1447
0
                      ctx->fs->root_path);
1448
0
    }
1449
1450
    /*
1451
    * create a timer that will run periodically and check if uploads
1452
    * are ready for completion
1453
    * this is created once on the first flush
1454
    */
1455
0
    if (ctx->timer_created == FLB_FALSE) {
1456
0
        flb_plg_debug(ctx->ins,
1457
0
                      "Creating upload timer with frequency %ds",
1458
0
                      ctx->timer_ms / 1000);
1459
1460
0
        sched = flb_sched_ctx_get();
1461
1462
0
        ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
1463
0
                                        ctx->timer_ms, cb_azure_blob_ingest, ctx, NULL);
1464
0
        if (ret == -1) {
1465
0
            flb_plg_error(ctx->ins, "Failed to create upload timer");
1466
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
1467
0
        }
1468
0
        ctx->timer_created = FLB_TRUE;
1469
0
    }
1470
0
}
1471
1472
static void cb_azure_blob_flush(struct flb_event_chunk *event_chunk,
1473
                                struct flb_output_flush *out_flush,
1474
                                struct flb_input_instance *i_ins,
1475
                                void *out_context,
1476
                                struct flb_config *config)
1477
0
{
1478
0
    int ret = FLB_OK;
1479
0
    struct flb_azure_blob *ctx = out_context;
1480
0
    (void) i_ins;
1481
0
    (void) config;
1482
0
    flb_sds_t json = NULL;
1483
0
    size_t json_size;
1484
1485
0
    if (event_chunk->type == FLB_EVENT_TYPE_LOGS) {
1486
0
        if (ctx->buffering_enabled == FLB_TRUE) {
1487
0
            size_t tag_len;
1488
0
            struct azure_blob_file *upload_file = NULL;
1489
0
            int upload_timeout_check = FLB_FALSE;
1490
0
            int total_file_size_check = FLB_FALSE;
1491
1492
0
            char *final_payload = NULL;
1493
0
            size_t final_payload_size = 0;
1494
0
            flb_sds_t tag_name = NULL;
1495
1496
0
            flb_plg_trace(ctx->ins, "flushing bytes for event tag %s and size %zu", event_chunk->tag, event_chunk->size);
1497
1498
0
            if (ctx->unify_tag == FLB_TRUE) {
1499
0
                tag_name = flb_sds_create("fluentbit-buffer-file-unify-tag.log");
1500
0
            }
1501
0
            else {
1502
0
                tag_name = event_chunk->tag;
1503
0
            }
1504
0
            tag_len = flb_sds_len(tag_name);
1505
1506
0
            flush_init(ctx, config);
1507
            /* Reformat msgpack to JSON payload */
1508
0
            ret = azure_blob_format(config, i_ins, ctx, NULL, FLB_EVENT_TYPE_LOGS, tag_name, tag_len, event_chunk->data, event_chunk->size, (void **)&json, &json_size);
1509
0
            if (ret != 0) {
1510
0
                flb_plg_error(ctx->ins, "cannot reformat data into json");
1511
0
                goto error;
1512
0
            }
1513
1514
            /* Get a file candidate matching the given 'tag' */
1515
0
            upload_file = azure_blob_store_file_get(ctx, tag_name, tag_len);
1516
1517
            /* Handle upload timeout or file size limits */
1518
0
            if (upload_file != NULL) {
1519
0
                if (upload_file->failures >= ctx->scheduler_max_retries) {
1520
0
                    flb_plg_warn(ctx->ins, "File with tag %s failed to send %d times, will not retry", event_chunk->tag, ctx->scheduler_max_retries);
1521
0
                    if (ctx->delete_on_max_upload_error) {
1522
0
                        azure_blob_store_file_delete(ctx, upload_file);
1523
0
                    } else {
1524
0
                        azure_blob_store_file_inactive(ctx, upload_file);
1525
0
                    }
1526
0
                    upload_file = NULL;
1527
0
                } else if (time(NULL) > (upload_file->create_time + ctx->upload_timeout)) {
1528
0
                    upload_timeout_check = FLB_TRUE;
1529
0
                } else if (upload_file->size + json_size > ctx->file_size) {
1530
0
                    total_file_size_check = FLB_TRUE;
1531
0
                }
1532
0
            }
1533
1534
0
            if (upload_file != NULL && (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE)) {
1535
0
                flb_plg_debug(ctx->ins, "uploading file %s with size %zu", upload_file->fsf->name, upload_file->size);
1536
1537
                /* Construct the payload for upload */
1538
0
                ret = construct_request_buffer(ctx, json, upload_file, &final_payload, &final_payload_size);
1539
0
                if (ret != 0) {
1540
0
                    flb_plg_error(ctx->ins, "error constructing request buffer for %s", event_chunk->tag);
1541
0
                    flb_sds_destroy(json);
1542
0
                    upload_file->failures += 1;
1543
0
                    FLB_OUTPUT_RETURN(FLB_RETRY);
1544
0
                }
1545
1546
                /*
1547
                * Azure blob requires a container. The following function validate that the container exists,
1548
                * otherwise it will be created. Note that that container name is specified by the user
1549
                * in the configuration file.
1550
                *
1551
                * https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blob-container-create#about-container-naming
1552
                */
1553
0
                ret = ensure_container(ctx);
1554
0
                if (ret == FLB_FALSE) {
1555
0
                    FLB_OUTPUT_RETURN(FLB_RETRY);
1556
0
                }
1557
1558
                /* Upload the file */
1559
0
                ret = send_blob(config, i_ins, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype,(char *)tag_name, 0, (char *)tag_name, tag_len, final_payload, final_payload_size);
1560
1561
0
                if (ret == CREATE_BLOB) {
1562
0
                    ret = create_blob(ctx, upload_file->fsf->name);
1563
0
                    if (ret == FLB_OK) {
1564
0
                        ret = send_blob(config, i_ins, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype,(char *)tag_name, 0, (char *)tag_name, tag_len, final_payload, final_payload_size);
1565
0
                    }
1566
0
                }
1567
1568
0
                if (ret == FLB_OK) {
1569
0
                    flb_plg_debug(ctx->ins, "uploaded file %s successfully", upload_file->fsf->name);
1570
0
                    azure_blob_store_file_delete(ctx, upload_file);
1571
0
                    goto cleanup;
1572
0
                }
1573
0
                else {
1574
0
                    flb_plg_error(ctx->ins, "error uploading file %s", upload_file->fsf->name);
1575
0
                    if (upload_file) {
1576
0
                        azure_blob_store_file_unlock(upload_file);
1577
0
                        upload_file->failures += 1;
1578
0
                    }
1579
0
                    goto error;
1580
0
                }
1581
0
            }
1582
0
            else {
1583
                /* Buffer current chunk */
1584
0
                ret = azure_blob_store_buffer_put(ctx, upload_file, tag_name, tag_len, json, json_size);
1585
0
                if (ret == 0) {
1586
0
                    flb_plg_debug(ctx->ins, "buffered chunk %s", event_chunk->tag);
1587
0
                    goto cleanup;
1588
0
                }
1589
0
                else {
1590
0
                    flb_plg_error(ctx->ins, "failed to buffer chunk %s", event_chunk->tag);
1591
0
                    goto error;
1592
0
                }
1593
0
            }
1594
1595
0
            cleanup:
1596
0
            if (json) {
1597
0
                flb_sds_destroy(json);
1598
0
            }
1599
0
            if (tag_name && ctx->unify_tag == FLB_TRUE) {
1600
0
                flb_sds_destroy(tag_name);
1601
0
            }
1602
0
            if (final_payload) {
1603
0
                flb_free(final_payload);
1604
0
            }
1605
0
            FLB_OUTPUT_RETURN(FLB_OK);
1606
1607
0
            error:
1608
0
            if (json) {
1609
0
                flb_sds_destroy(json);
1610
0
            }
1611
0
            if (tag_name && ctx->unify_tag == FLB_TRUE) {
1612
0
                flb_sds_destroy(tag_name);
1613
0
            }
1614
0
            if (final_payload) {
1615
0
                flb_free(final_payload);
1616
0
            }
1617
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
1618
0
        }
1619
0
        else {
1620
1621
            /*
1622
            * Azure blob requires a container. The following function validates that the container exists,
1623
            * otherwise it will be created. Note that the container name is specified by the user
1624
            * in the configuration file.
1625
            *
1626
            * https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blob-container-create#about-container-naming
1627
            */
1628
0
            ret = ensure_container(ctx);
1629
0
            if (ret == FLB_FALSE) {
1630
0
                FLB_OUTPUT_RETURN(FLB_RETRY);
1631
0
            }
1632
1633
0
            ret = azure_blob_format(config, i_ins, ctx, NULL, FLB_EVENT_TYPE_LOGS,(char *) event_chunk->tag, flb_sds_len(event_chunk->tag), (char *) event_chunk->data ,event_chunk->size, (void **)&json, &json_size);
1634
0
            if (ret != 0) {
1635
0
                flb_plg_error(ctx->ins, "cannot reformat data into json");
1636
0
                ret = FLB_RETRY;
1637
0
            }
1638
            /* Buffering mode is disabled, proceed with regular flow */
1639
0
            ret = send_blob(config, i_ins, ctx,
1640
0
                            FLB_EVENT_TYPE_LOGS,
1641
0
                            ctx->btype, /* blob type per user configuration  */
1642
0
                            (char *) event_chunk->tag,  /* use tag as 'name' */
1643
0
                            0,  /* part id */
1644
0
                            (char *) event_chunk->tag, flb_sds_len(event_chunk->tag),
1645
0
                            json, json_size);
1646
1647
0
            if (ret == CREATE_BLOB) {
1648
0
                ret = create_blob(ctx, event_chunk->tag);
1649
0
                if (ret == FLB_OK) {
1650
0
                    ret = send_blob(config, i_ins, ctx,
1651
0
                                    FLB_EVENT_TYPE_LOGS,
1652
0
                                    ctx->btype, /* blob type per user configuration  */
1653
0
                                    (char *) event_chunk->tag,  /* use tag as 'name' */
1654
0
                                    0,  /* part id */
1655
0
                                    (char *) event_chunk->tag,  /* use tag as 'name' */
1656
0
                                    flb_sds_len(event_chunk->tag),
1657
0
                                    json, json_size);
1658
0
                }
1659
0
            }
1660
0
        }
1661
0
    }
1662
0
    else if (event_chunk->type == FLB_EVENT_TYPE_BLOBS) {
1663
        /*
1664
         * For Blob types, we use the flush callback to enqueue the file, then cb_azb_blob_file_upload()
1665
         * takes care of the rest like reading the file and uploading it to Azure.
1666
         */
1667
0
        ret = process_blob_chunk(ctx, event_chunk);
1668
0
        if (ret == -1) {
1669
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
1670
0
        }
1671
0
    }
1672
1673
0
    if (json){
1674
0
        flb_sds_destroy(json);
1675
0
    }
1676
1677
    /* FLB_RETRY, FLB_OK, FLB_ERROR */
1678
0
    FLB_OUTPUT_RETURN(ret);
1679
0
}
1680
1681
static int cb_azure_blob_exit(void *data, struct flb_config *config)
1682
0
{
1683
0
    struct flb_azure_blob *ctx = data;
1684
0
    int ret = -1;
1685
1686
0
    if (!ctx) {
1687
0
        return 0;
1688
0
    }
1689
1690
0
    if (ctx->buffering_enabled == FLB_TRUE){
1691
0
        if (azure_blob_store_has_data(ctx) == FLB_TRUE) {
1692
0
            flb_plg_info(ctx->ins, "Sending all locally buffered data to Azure Blob");
1693
0
            ret = ingest_all_chunks(ctx, config);
1694
0
            if (ret < 0) {
1695
0
                flb_plg_error(ctx->ins, "Could not send all chunks on exit");
1696
0
            }
1697
0
        }
1698
0
        azure_blob_store_exit(ctx);
1699
0
    }
1700
1701
0
    if (ctx->u) {
1702
0
        flb_upstream_destroy(ctx->u);
1703
0
        ctx->u = NULL;
1704
0
    }
1705
1706
0
    flb_azure_blob_conf_destroy(ctx);
1707
0
    return 0;
1708
0
}
1709
1710
/* worker initialization, used for our internal timers */
1711
static int cb_worker_init(void *data, struct flb_config *config)
1712
0
{
1713
0
    int ret;
1714
0
    struct worker_info *info;
1715
0
    struct flb_azure_blob *ctx = data;
1716
1717
0
    flb_plg_info(ctx->ins, "initializing worker");
1718
1719
0
    info = FLB_TLS_GET(worker_info);
1720
0
    if (!info) {
1721
        /* initialize worker global info */
1722
0
        info = flb_malloc(sizeof(struct worker_info));
1723
0
        if (!info) {
1724
0
            flb_errno();
1725
0
            return -1;
1726
0
        }
1727
0
        info->active_upload = FLB_FALSE;
1728
0
        FLB_TLS_SET(worker_info, info);
1729
0
    }
1730
1731
0
    ret = azb_timer_create(ctx);
1732
0
    if (ret == -1) {
1733
0
        flb_plg_error(ctx->ins, "failed to create upload timer");
1734
0
        return -1;
1735
0
    }
1736
1737
0
    return 0;
1738
0
}
1739
1740
/* worker teardown */
1741
static int cb_worker_exit(void *data, struct flb_config *config)
1742
0
{
1743
0
    struct worker_info *info;
1744
0
    struct flb_azure_blob *ctx = data;
1745
1746
0
    flb_plg_info(ctx->ins, "initializing worker");
1747
1748
0
    info = FLB_TLS_GET(worker_info);
1749
0
    if (info != NULL) {
1750
0
        flb_free(info);
1751
0
        FLB_TLS_SET(worker_info, NULL);
1752
0
    }
1753
1754
0
    return 0;
1755
0
}
1756
1757
/* Configuration properties map */
1758
static struct flb_config_map config_map[] = {
1759
    {
1760
     FLB_CONFIG_MAP_STR, "account_name", NULL,
1761
     0, FLB_TRUE, offsetof(struct flb_azure_blob, account_name),
1762
     "Azure account name (mandatory)"
1763
    },
1764
1765
    {
1766
     FLB_CONFIG_MAP_STR, "container_name", NULL,
1767
     0, FLB_TRUE, offsetof(struct flb_azure_blob, container_name),
1768
     "Container name (mandatory)"
1769
    },
1770
1771
    {
1772
     FLB_CONFIG_MAP_BOOL, "auto_create_container", "true",
1773
     0, FLB_TRUE, offsetof(struct flb_azure_blob, auto_create_container),
1774
     "Auto create container if it don't exists"
1775
    },
1776
1777
    {
1778
     FLB_CONFIG_MAP_STR, "blob_type", "appendblob",
1779
     0, FLB_TRUE, offsetof(struct flb_azure_blob, blob_type),
1780
     "Set the block type: appendblob or blockblob"
1781
    },
1782
1783
    {
1784
     FLB_CONFIG_MAP_STR, "compress", NULL,
1785
     0, FLB_FALSE, 0,
1786
     "Set payload compression in network transfer. Option available is 'gzip'"
1787
    },
1788
1789
    {
1790
     FLB_CONFIG_MAP_BOOL, "compress_blob", "false",
1791
     0, FLB_TRUE, offsetof(struct flb_azure_blob, compress_blob),
1792
     "Enable block blob GZIP compression in the final blob file. This option is "
1793
     "not compatible with 'appendblob' block type"
1794
    },
1795
1796
    {
1797
     FLB_CONFIG_MAP_BOOL, "emulator_mode", "false",
1798
     0, FLB_TRUE, offsetof(struct flb_azure_blob, emulator_mode),
1799
     "Use emulator mode, enable it if you want to use Azurite"
1800
    },
1801
1802
    {
1803
     FLB_CONFIG_MAP_STR, "shared_key", NULL,
1804
     0, FLB_TRUE, offsetof(struct flb_azure_blob, shared_key),
1805
     "Azure shared key"
1806
    },
1807
1808
    {
1809
     FLB_CONFIG_MAP_STR, "endpoint", NULL,
1810
     0, FLB_TRUE, offsetof(struct flb_azure_blob, endpoint),
1811
     "Custom full URL endpoint to use an emulator"
1812
    },
1813
1814
    {
1815
     FLB_CONFIG_MAP_STR, "path", NULL,
1816
     0, FLB_TRUE, offsetof(struct flb_azure_blob, path),
1817
     "Set a path for your blob"
1818
    },
1819
1820
    {
1821
     FLB_CONFIG_MAP_STR, "date_key", "@timestamp",
1822
     0, FLB_TRUE, offsetof(struct flb_azure_blob, date_key),
1823
     "Name of the key that will have the record timestamp"
1824
    },
1825
1826
    {
1827
     FLB_CONFIG_MAP_STR, "auth_type", "key",
1828
     0, FLB_TRUE, offsetof(struct flb_azure_blob, auth_type),
1829
     "Set the auth type: key or sas"
1830
    },
1831
1832
    {
1833
     FLB_CONFIG_MAP_STR, "sas_token", NULL,
1834
     0, FLB_TRUE, offsetof(struct flb_azure_blob, sas_token),
1835
     "Azure Blob SAS token"
1836
    },
1837
1838
    {
1839
     FLB_CONFIG_MAP_STR, "database_file", NULL,
1840
     0, FLB_TRUE, offsetof(struct flb_azure_blob, database_file),
1841
     "Absolute path to a database file to be used to store blob files contexts"
1842
    },
1843
1844
    {
1845
     FLB_CONFIG_MAP_SIZE, "part_size", "25M",
1846
     0, FLB_TRUE, offsetof(struct flb_azure_blob, part_size),
1847
     "Size of each part when uploading blob files"
1848
    },
1849
1850
    {
1851
     FLB_CONFIG_MAP_INT, "file_delivery_attempt_limit", "1",
1852
     0, FLB_TRUE, offsetof(struct flb_azure_blob, file_delivery_attempt_limit),
1853
     "File delivery attempt limit"
1854
    },
1855
1856
    {
1857
     FLB_CONFIG_MAP_INT, "part_delivery_attempt_limit", "1",
1858
     0, FLB_TRUE, offsetof(struct flb_azure_blob, part_delivery_attempt_limit),
1859
     "File part delivery attempt limit"
1860
    },
1861
1862
    {
1863
     FLB_CONFIG_MAP_TIME, "upload_parts_timeout", "10M",
1864
     0, FLB_TRUE, offsetof(struct flb_azure_blob, upload_parts_timeout),
1865
     "Timeout to upload parts of a blob file"
1866
    },
1867
1868
    {
1869
     FLB_CONFIG_MAP_TIME, "upload_part_freshness_limit", "6D",
1870
     0, FLB_TRUE, offsetof(struct flb_azure_blob, upload_parts_freshness_threshold),
1871
     "Maximum lifespan of an uncommitted file part"
1872
    },
1873
1874
    {
1875
     FLB_CONFIG_MAP_STR, "configuration_endpoint_url", NULL,
1876
     0, FLB_TRUE, offsetof(struct flb_azure_blob, configuration_endpoint_url),
1877
     "Configuration endpoint URL"
1878
    },
1879
1880
    {
1881
     FLB_CONFIG_MAP_STR, "configuration_endpoint_username", NULL,
1882
     0, FLB_TRUE, offsetof(struct flb_azure_blob, configuration_endpoint_username),
1883
     "Configuration endpoint basic authentication username"
1884
    },
1885
1886
    {
1887
     FLB_CONFIG_MAP_STR, "configuration_endpoint_password", NULL,
1888
     0, FLB_TRUE, offsetof(struct flb_azure_blob, configuration_endpoint_password),
1889
     "Configuration endpoint basic authentication password"
1890
    },
1891
1892
    {
1893
     FLB_CONFIG_MAP_STR, "configuration_endpoint_bearer_token", NULL,
1894
     0, FLB_TRUE, offsetof(struct flb_azure_blob, configuration_endpoint_bearer_token),
1895
     "Configuration endpoint bearer token"
1896
    },
1897
1898
    {
1899
     FLB_CONFIG_MAP_BOOL, "buffering_enabled", "false",
1900
     0, FLB_TRUE, offsetof(struct flb_azure_blob, buffering_enabled),
1901
     "Enable buffering into disk before ingesting into Azure Blob"
1902
    },
1903
1904
    {
1905
     FLB_CONFIG_MAP_STR, "buffer_dir", "/tmp/fluent-bit/azure-blob/",
1906
     0, FLB_TRUE, offsetof(struct flb_azure_blob, buffer_dir),
1907
     "Specifies the location of directory where the buffered data will be stored"
1908
    },
1909
1910
    {
1911
     FLB_CONFIG_MAP_TIME, "upload_timeout", "30m",
1912
     0, FLB_TRUE, offsetof(struct flb_azure_blob, upload_timeout),
1913
     "Optionally specify a timeout for uploads. "
1914
           "Fluent Bit will start ingesting buffer files which have been created more than x minutes and haven't reached upload_file_size limit yet"
1915
           "Default is 30m."
1916
    },
1917
1918
    {
1919
     FLB_CONFIG_MAP_SIZE, "upload_file_size", "200M",
1920
     0, FLB_TRUE, offsetof(struct flb_azure_blob, file_size),
1921
     "Specifies the size of files to be uploaded in MBs. Default is 200MB"
1922
    },
1923
1924
    {
1925
     FLB_CONFIG_MAP_STR, "azure_blob_buffer_key", "key",
1926
     0, FLB_TRUE, offsetof(struct flb_azure_blob, azure_blob_buffer_key),
1927
     "Set the azure blob buffer key which needs to be specified when using multiple instances of azure blob output plugin and buffering is enabled"
1928
    },
1929
1930
    {
1931
     FLB_CONFIG_MAP_SIZE, "store_dir_limit_size", "8G",
1932
     0, FLB_TRUE, offsetof(struct flb_azure_blob, store_dir_limit_size),
1933
     "Set the max size of the buffer directory. Default is 8GB"
1934
    },
1935
1936
    {
1937
     FLB_CONFIG_MAP_BOOL, "buffer_file_delete_early", "false",
1938
     0, FLB_TRUE, offsetof(struct flb_azure_blob, buffer_file_delete_early),
1939
     "Whether to delete the buffered file early after successful blob creation. Default is false"
1940
    },
1941
1942
    { 
1943
     FLB_CONFIG_MAP_INT, "blob_uri_length", "64",
1944
     0, FLB_TRUE, offsetof(struct flb_azure_blob, blob_uri_length),
1945
     "Set the length of generated blob uri before ingesting to Azure Kusto. Default is 64"
1946
    },
1947
1948
    {
1949
     FLB_CONFIG_MAP_BOOL, "unify_tag", "false",
1950
     0, FLB_TRUE, offsetof(struct flb_azure_blob, unify_tag),
1951
     "Whether to create a single buffer file when buffering mode is enabled. Default is false"
1952
    },
1953
1954
    {
1955
     FLB_CONFIG_MAP_INT, "scheduler_max_retries", "3",
1956
     0, FLB_TRUE, offsetof(struct flb_azure_blob, scheduler_max_retries),
1957
     "Maximum number of retries for the scheduler send blob. Default is 3"
1958
    },
1959
1960
    {
1961
     FLB_CONFIG_MAP_BOOL, "delete_on_max_upload_error", "false",
1962
     0, FLB_TRUE, offsetof(struct flb_azure_blob, delete_on_max_upload_error),
1963
     "Whether to delete the buffer file on maximum upload errors. Default is false"
1964
    },
1965
1966
    {
1967
     FLB_CONFIG_MAP_TIME, "io_timeout", "60s",0, FLB_TRUE, offsetof(struct flb_azure_blob, io_timeout),
1968
     "HTTP IO timeout. Default is 60s"
1969
    },
1970
1971
    /* EOF */
1972
    {0}
1973
};
1974
1975
/* Plugin registration */
1976
struct flb_output_plugin out_azure_blob_plugin = {
1977
    .name           = "azure_blob",
1978
    .description    = "Azure Blob Storage",
1979
    .cb_init        = cb_azure_blob_init,
1980
    .cb_flush       = cb_azure_blob_flush,
1981
    .cb_exit        = cb_azure_blob_exit,
1982
    .cb_worker_init = cb_worker_init,
1983
    .cb_worker_exit = cb_worker_exit,
1984
1985
    /* Test */
1986
    .test_formatter.callback = azure_blob_format,
1987
1988
    .flags        = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
1989
    .event_type   = FLB_OUTPUT_LOGS | FLB_OUTPUT_BLOBS,
1990
    .config_map   = config_map,
1991
    .workers      = 1,
1992
};