Coverage Report

Created: 2025-01-28 07:34

/src/fluent-bit/plugins/out_azure_blob/azure_blob.c
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_output_plugin.h>
21
#include <fluent-bit/flb_sds.h>
22
#include <fluent-bit/flb_kv.h>
23
#include <fluent-bit/flb_utils.h>
24
#include <fluent-bit/flb_pack.h>
25
#include <fluent-bit/flb_config_map.h>
26
#include <fluent-bit/flb_gzip.h>
27
#include <fluent-bit/flb_base64.h>
28
#include <fluent-bit/flb_sqldb.h>
29
#include <fluent-bit/flb_input_blob.h>
30
#include <fluent-bit/flb_log_event_decoder.h>
31
#include <fluent-bit/flb_plugin.h>
32
#include <fluent-bit/flb_notification.h>
33
34
#include <msgpack.h>
35
36
#include "azure_blob.h"
37
#include "azure_blob_db.h"
38
#include "azure_blob_uri.h"
39
#include "azure_blob_conf.h"
40
#include "azure_blob_appendblob.h"
41
#include "azure_blob_blockblob.h"
42
#include "azure_blob_http.h"
43
44
0
#define CREATE_BLOB  1337
45
46
/* thread_local_storage for workers */
47
48
struct worker_info {
49
    int active_upload;
50
};
51
52
FLB_TLS_DEFINE(struct worker_info, worker_info);
53
54
static int azure_blob_format(struct flb_config *config,
55
                             struct flb_input_instance *ins,
56
                             void *plugin_context,
57
                             void *flush_ctx,
58
                             int event_type,
59
                             const char *tag, int tag_len,
60
                             const void *data, size_t bytes,
61
                             void **out_data, size_t *out_size)
62
0
{
63
0
    flb_sds_t out_buf;
64
0
    struct flb_azure_blob *ctx = plugin_context;
65
66
0
    out_buf = flb_pack_msgpack_to_json_format(data, bytes,
67
0
                                              FLB_PACK_JSON_FORMAT_LINES,
68
0
                                              FLB_PACK_JSON_DATE_ISO8601,
69
0
                                              ctx->date_key);
70
0
    if (!out_buf) {
71
0
        return -1;
72
0
    }
73
74
0
    *out_data = out_buf;
75
0
    *out_size = flb_sds_len(out_buf);
76
0
    return 0;
77
0
}
78
79
static int create_blob(struct flb_azure_blob *ctx, char *name)
80
0
{
81
0
    int ret;
82
0
    size_t b_sent;
83
0
    flb_sds_t uri = NULL;
84
0
    struct flb_http_client *c;
85
0
    struct flb_connection *u_conn;
86
87
0
    uri = azb_uri_create_blob(ctx, name);
88
0
    if (!uri) {
89
0
        return FLB_RETRY;
90
0
    }
91
92
    /* Get upstream connection */
93
0
    u_conn = flb_upstream_conn_get(ctx->u);
94
0
    if (!u_conn) {
95
0
        flb_plg_error(ctx->ins,
96
0
                      "cannot create upstream connection for create_append_blob");
97
0
        flb_sds_destroy(uri);
98
0
        return FLB_RETRY;
99
0
    }
100
101
    /* Create HTTP client context */
102
0
    c = flb_http_client(u_conn, FLB_HTTP_PUT,
103
0
                        uri,
104
0
                        NULL, 0, NULL, 0, NULL, 0);
105
0
    if (!c) {
106
0
        flb_plg_error(ctx->ins, "cannot create HTTP client context");
107
0
        flb_upstream_conn_release(u_conn);
108
0
        flb_sds_destroy(uri);
109
0
        return FLB_RETRY;
110
0
    }
111
112
    /* Prepare headers and authentication */
113
0
    azb_http_client_setup(ctx, c, -1, FLB_TRUE,
114
0
                          AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
115
116
    /* Send HTTP request */
117
0
    ret = flb_http_do(c, &b_sent);
118
0
    flb_sds_destroy(uri);
119
120
0
    if (ret == -1) {
121
0
        flb_plg_error(ctx->ins, "error sending append_blob");
122
0
        flb_http_client_destroy(c);
123
0
        flb_upstream_conn_release(u_conn);
124
0
        return FLB_RETRY;
125
0
    }
126
127
0
    if (c->resp.status == 201) {
128
        /* delete "&sig=..." in the c->uri for security */
129
0
        char *p = strstr(c->uri, "&sig=");
130
0
        if (p) {
131
0
            *p = '\0';
132
0
        }
133
0
        flb_plg_info(ctx->ins, "blob created successfully: %s", c->uri);
134
0
    }
135
0
    else {
136
0
        if (c->resp.payload_size > 0) {
137
0
            flb_plg_error(ctx->ins, "http_status=%i cannot create append blob\n%s",
138
0
                          c->resp.status, c->resp.payload);
139
0
        }
140
0
        else {
141
0
            flb_plg_error(ctx->ins, "http_status=%i cannot create append blob",
142
0
                          c->resp.status);
143
0
        }
144
0
        flb_http_client_destroy(c);
145
0
        flb_upstream_conn_release(u_conn);
146
0
        return FLB_RETRY;
147
0
    }
148
149
0
    flb_http_client_destroy(c);
150
0
    flb_upstream_conn_release(u_conn);
151
0
    return FLB_OK;
152
0
}
153
154
static int delete_blob(struct flb_azure_blob *ctx, char *name)
155
0
{
156
0
    int ret;
157
0
    size_t b_sent;
158
0
    flb_sds_t uri = NULL;
159
0
    struct flb_http_client *c;
160
0
    struct flb_connection *u_conn;
161
162
0
    uri = azb_uri_create_blob(ctx, name);
163
0
    if (!uri) {
164
0
        return FLB_RETRY;
165
0
    }
166
167
    /* Get upstream connection */
168
0
    u_conn = flb_upstream_conn_get(ctx->u);
169
0
    if (!u_conn) {
170
0
        flb_plg_error(ctx->ins,
171
0
                      "cannot create upstream connection for create_append_blob");
172
0
        flb_sds_destroy(uri);
173
0
        return FLB_RETRY;
174
0
    }
175
176
    /* Create HTTP client context */
177
0
    c = flb_http_client(u_conn, FLB_HTTP_DELETE,
178
0
                        uri,
179
0
                        NULL, 0, NULL, 0, NULL, 0);
180
0
    if (!c) {
181
0
        flb_plg_error(ctx->ins, "cannot create HTTP client context");
182
0
        flb_upstream_conn_release(u_conn);
183
0
        flb_sds_destroy(uri);
184
0
        return FLB_RETRY;
185
0
    }
186
187
    /* Prepare headers and authentication */
188
0
    azb_http_client_setup(ctx, c, -1, FLB_TRUE,
189
0
                          AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
190
191
    /* Send HTTP request */
192
0
    ret = flb_http_do(c, &b_sent);
193
0
    flb_sds_destroy(uri);
194
195
0
    if (ret == -1) {
196
0
        flb_plg_error(ctx->ins, "error sending append_blob");
197
0
        flb_http_client_destroy(c);
198
0
        flb_upstream_conn_release(u_conn);
199
0
        return FLB_RETRY;
200
0
    }
201
202
0
    if (c->resp.status == 201) {
203
        /* delete "&sig=..." in the c->uri for security */
204
0
        char *p = strstr(c->uri, "&sig=");
205
0
        if (p) {
206
0
            *p = '\0';
207
0
        }
208
0
        flb_plg_info(ctx->ins, "blob deleted successfully: %s", c->uri);
209
0
    }
210
0
    else {
211
0
        if (c->resp.payload_size > 0) {
212
0
            flb_plg_error(ctx->ins, "http_status=%i cannot delete append blob\n%s",
213
0
                          c->resp.status, c->resp.payload);
214
0
        }
215
0
        else {
216
0
            flb_plg_error(ctx->ins, "http_status=%i cannot delete append blob",
217
0
                          c->resp.status);
218
0
        }
219
0
        flb_http_client_destroy(c);
220
0
        flb_upstream_conn_release(u_conn);
221
0
        return FLB_RETRY;
222
0
    }
223
224
0
    flb_http_client_destroy(c);
225
0
    flb_upstream_conn_release(u_conn);
226
0
    return FLB_OK;
227
0
}
228
229
static int http_send_blob(struct flb_config *config, struct flb_azure_blob *ctx,
230
                          flb_sds_t ref_name,
231
                          flb_sds_t uri,
232
                          flb_sds_t block_id,
233
                          int event_type,
234
                          void *data, size_t bytes)
235
0
{
236
0
    int ret;
237
0
    int compressed = FLB_FALSE;
238
0
    int content_encoding = FLB_FALSE;
239
0
    int content_type = FLB_FALSE;
240
0
    size_t b_sent;
241
0
    void *payload_buf;
242
0
    size_t payload_size;
243
0
    struct flb_http_client *c;
244
0
    struct flb_connection *u_conn;
245
246
    /* Get upstream connection */
247
0
    u_conn = flb_upstream_conn_get(ctx->u);
248
0
    if (!u_conn) {
249
0
        flb_plg_error(ctx->ins,
250
0
                      "cannot create TCP upstream connection");
251
0
        return FLB_RETRY;
252
0
    }
253
254
0
    payload_buf = data;
255
0
    payload_size = bytes;
256
257
    /* Handle compression requests */
258
0
    if (ctx->compress_gzip == FLB_TRUE || ctx->compress_blob == FLB_TRUE) {
259
0
        ret = flb_gzip_compress((void *) data, bytes, &payload_buf, &payload_size);
260
0
        if (ret == 0) {
261
0
            compressed = FLB_TRUE;
262
0
        }
263
0
        else {
264
0
            flb_plg_warn(ctx->ins,
265
0
                        "cannot gzip payload, disabling compression");
266
0
            payload_buf = data;
267
0
            payload_size = bytes;
268
0
        }
269
0
    }
270
271
    /* set http header flags */
272
0
    if (ctx->compress_blob == FLB_TRUE) {
273
0
        content_encoding = AZURE_BLOB_CE_NONE;
274
0
        content_type = AZURE_BLOB_CT_GZIP;
275
0
    }
276
0
    else if (compressed == FLB_TRUE) {
277
0
        content_encoding = AZURE_BLOB_CE_GZIP;
278
0
        content_type = AZURE_BLOB_CT_JSON;
279
0
    }
280
281
    /* Create HTTP client context */
282
0
    c = flb_http_client(u_conn, FLB_HTTP_PUT,
283
0
                        uri,
284
0
                        payload_buf, payload_size, NULL, 0, NULL, 0);
285
0
    if (!c) {
286
0
        flb_plg_error(ctx->ins, "cannot create HTTP client context");
287
0
        if (compressed == FLB_TRUE) {
288
0
            flb_free(payload_buf);
289
0
        }
290
0
        flb_upstream_conn_release(u_conn);
291
0
        return FLB_RETRY;
292
0
    }
293
294
    /* Prepare headers and authentication */
295
0
    azb_http_client_setup(ctx, c, (ssize_t) payload_size, FLB_FALSE,
296
0
                          content_type, content_encoding);
297
298
    /* Send HTTP request */
299
0
    ret = flb_http_do(c, &b_sent);
300
301
    /* Release compressed buffer */
302
0
    if (compressed == FLB_TRUE) {
303
0
        flb_free(payload_buf);
304
0
    }
305
306
0
    flb_upstream_conn_release(u_conn);
307
308
    /* Validate HTTP status */
309
0
    if (ret == -1) {
310
0
        flb_plg_error(ctx->ins, "error sending append_blob for %s", ref_name);
311
0
        return FLB_RETRY;
312
0
    }
313
314
0
    if (c->resp.status == 201) {
315
0
        flb_plg_info(ctx->ins, "content uploaded successfully: %s", ref_name);
316
0
        flb_http_client_destroy(c);
317
0
        return FLB_OK;
318
0
    }
319
0
    else if (c->resp.status == 404) {
320
        /* delete "&sig=..." in the c->uri for security */
321
0
        char *p = strstr(c->uri, "&sig=");
322
0
        if (p) {
323
0
            *p = '\0';
324
0
        }
325
326
0
        flb_plg_info(ctx->ins, "blob not found: %s", c->uri);
327
0
        flb_http_client_destroy(c);
328
0
        return CREATE_BLOB;
329
0
    }
330
0
    else if (c->resp.payload_size > 0) {
331
0
        flb_plg_error(ctx->ins, "http_status=%i cannot append content to blob\n%s",
332
0
                      c->resp.status, c->resp.payload);
333
0
        if (strstr(c->resp.payload, "must be 0 for Create Append")) {
334
0
            flb_http_client_destroy(c);
335
0
            return CREATE_BLOB;
336
0
        }
337
0
    }
338
0
    else {
339
0
        flb_plg_error(ctx->ins, "cannot upload %s content to blob (http_status=%i)",
340
0
                      ref_name, c->resp.status);
341
0
    }
342
0
    flb_http_client_destroy(c);
343
344
0
    return FLB_RETRY;
345
0
}
346
347
static int send_blob(struct flb_config *config,
348
                     struct flb_input_instance *i_ins,
349
                     struct flb_azure_blob *ctx,
350
                     int event_type,
351
                     int blob_type, char *name, uint64_t part_id,
352
                     char *tag, int tag_len, void *data, size_t bytes)
353
0
{
354
0
    int ret;
355
0
    uint64_t ms = 0;
356
0
    flb_sds_t uri = NULL;
357
0
    flb_sds_t block_id = NULL;
358
0
    flb_sds_t ref_name = NULL;
359
0
    void *payload_buf = data;
360
0
    size_t payload_size = bytes;
361
362
0
    ref_name = flb_sds_create_size(256);
363
0
    if (!ref_name) {
364
0
        return FLB_RETRY;
365
0
    }
366
367
0
    if (blob_type == AZURE_BLOB_APPENDBLOB) {
368
0
        uri = azb_append_blob_uri(ctx, tag);
369
0
    }
370
0
    else if (blob_type == AZURE_BLOB_BLOCKBLOB) {
371
0
        if (event_type == FLB_EVENT_TYPE_LOGS) {
372
0
            block_id = azb_block_blob_id_logs(&ms);
373
0
            if (!block_id) {
374
0
                flb_plg_error(ctx->ins, "could not generate block id");
375
376
0
                cfl_sds_destroy(ref_name);
377
378
0
                return FLB_RETRY;
379
0
            }
380
0
            uri = azb_block_blob_uri(ctx, tag, block_id, ms);
381
0
            ref_name = flb_sds_printf(&ref_name, "file=%s.%" PRIu64, name, ms);
382
0
        }
383
0
        else if (event_type == FLB_EVENT_TYPE_BLOBS) {
384
0
            block_id = azb_block_blob_id_blob(ctx, name, part_id);
385
0
            uri = azb_block_blob_uri(ctx, name, block_id, 0);
386
0
            ref_name = flb_sds_printf(&ref_name, "file=%s:%" PRIu64, name, part_id);
387
0
        }
388
0
    }
389
390
0
    if (!uri) {
391
0
        flb_free(block_id);
392
0
        flb_sds_destroy(ref_name);
393
0
        return FLB_RETRY;
394
0
    }
395
396
    /* Logs: Format the data (msgpack -> JSON) */
397
0
    if (event_type == FLB_EVENT_TYPE_LOGS) {
398
0
        ret = azure_blob_format(config, i_ins,
399
0
                                ctx, NULL,
400
0
                                FLB_EVENT_TYPE_LOGS,
401
0
                                tag, tag_len,
402
0
                                data, bytes,
403
0
                                &payload_buf, &payload_size);
404
0
        if (ret != 0) {
405
0
            flb_sds_destroy(uri);
406
0
            flb_free(block_id);
407
0
            flb_sds_destroy(ref_name);
408
0
            return FLB_ERROR;
409
0
        }
410
0
    }
411
0
    else if (event_type == FLB_EVENT_TYPE_BLOBS) {
412
0
        payload_buf = data;
413
0
        payload_size = bytes;
414
0
    }
415
416
0
    ret = http_send_blob(config, ctx, ref_name, uri, block_id, event_type, payload_buf, payload_size);
417
0
    flb_plg_debug(ctx->ins, "http_send_blob()=%i", ret);
418
419
0
    if (ret == FLB_OK) {
420
        /* For Logs type, we need to commit the block right away */
421
0
        if (event_type == FLB_EVENT_TYPE_LOGS) {
422
0
            ret = azb_block_blob_commit_block(ctx, block_id, tag, ms);
423
0
            flb_free(block_id);
424
0
        }
425
0
    }
426
0
    else if (ret == CREATE_BLOB) {
427
0
        ret = create_blob(ctx, name);
428
0
        if (ret == FLB_OK) {
429
0
            ret = http_send_blob(config, ctx, ref_name, uri, block_id, event_type, payload_buf, payload_size);
430
0
        }
431
0
    }
432
0
    flb_sds_destroy(ref_name);
433
434
0
    if (payload_buf != data) {
435
0
        flb_sds_destroy(payload_buf);
436
0
    }
437
438
0
    flb_sds_destroy(uri);
439
0
    flb_free(block_id);
440
441
0
    return ret;
442
0
}
443
444
static int create_container(struct flb_azure_blob *ctx, char *name)
445
0
{
446
0
    int ret;
447
0
    size_t b_sent;
448
0
    flb_sds_t uri;
449
0
    struct flb_http_client *c;
450
0
    struct flb_connection *u_conn;
451
452
    /* Get upstream connection */
453
0
    u_conn = flb_upstream_conn_get(ctx->u);
454
0
    if (!u_conn) {
455
0
        flb_plg_error(ctx->ins,
456
0
                      "cannot create upstream connection for container creation");
457
0
        return FLB_FALSE;
458
0
    }
459
460
    /* URI */
461
0
    uri = azb_uri_ensure_or_create_container(ctx);
462
0
    if (!uri) {
463
0
        flb_upstream_conn_release(u_conn);
464
0
        return FLB_FALSE;
465
0
    }
466
467
    /* Create HTTP client context */
468
0
    c = flb_http_client(u_conn, FLB_HTTP_PUT,
469
0
                        uri,
470
0
                        NULL, 0, NULL, 0, NULL, 0);
471
0
    if (!c) {
472
0
        flb_plg_error(ctx->ins, "cannot create HTTP client context");
473
0
        flb_upstream_conn_release(u_conn);
474
0
        return FLB_FALSE;
475
0
    }
476
477
    /* Prepare headers and authentication */
478
0
    azb_http_client_setup(ctx, c, -1, FLB_FALSE,
479
0
                          AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
480
481
    /* Send HTTP request */
482
0
    ret = flb_http_do(c, &b_sent);
483
484
    /* Release URI */
485
0
    flb_sds_destroy(uri);
486
487
    /* Validate http response */
488
0
    if (ret == -1) {
489
0
        flb_plg_error(ctx->ins, "error requesting container creation");
490
0
        flb_http_client_destroy(c);
491
0
        flb_upstream_conn_release(u_conn);
492
0
        return FLB_FALSE;
493
0
    }
494
495
0
    if (c->resp.status == 201) {
496
0
        flb_plg_info(ctx->ins, "container '%s' created sucessfully", name);
497
0
    }
498
0
    else {
499
0
        if (c->resp.payload_size > 0) {
500
0
            flb_plg_error(ctx->ins, "cannot create container '%s'\n%s",
501
0
                          name, c->resp.payload);
502
0
        }
503
0
        else {
504
0
            flb_plg_error(ctx->ins, "cannot create container '%s'\n%s",
505
0
                          name, c->resp.payload);
506
0
        }
507
0
        flb_http_client_destroy(c);
508
0
        flb_upstream_conn_release(u_conn);
509
0
        return FLB_FALSE;
510
0
    }
511
512
0
    flb_http_client_destroy(c);
513
0
    flb_upstream_conn_release(u_conn);
514
0
    return FLB_TRUE;
515
0
}
516
517
/*
518
 * Check that the container exists, if it doesn't and the configuration property
519
 * auto_create_container is enabled, it will send a request to create it. If it
520
 * could not be created, it returns FLB_FALSE.
521
 * If auto_create_container is disabled, it will return FLB_TRUE assuming the container
522
 * already exists.
523
 */
524
static int ensure_container(struct flb_azure_blob *ctx)
525
0
{
526
0
    int ret;
527
0
    int status;
528
0
    size_t b_sent;
529
0
    flb_sds_t uri;
530
0
    struct flb_http_client *c;
531
0
    struct flb_connection *u_conn;
532
533
0
    if (!ctx->auto_create_container) {
534
0
        flb_plg_info(ctx->ins, "auto_create_container is disabled, assuming container '%s' already exists",
535
0
                     ctx->container_name);
536
0
        return FLB_TRUE;
537
0
    }
538
539
0
    uri = azb_uri_ensure_or_create_container(ctx);
540
0
    if (!uri) {
541
0
        flb_plg_error(ctx->ins, "cannot create container URI");
542
0
        return FLB_FALSE;
543
0
    }
544
545
    /* Get upstream connection */
546
0
    u_conn = flb_upstream_conn_get(ctx->u);
547
0
    if (!u_conn) {
548
0
        flb_plg_error(ctx->ins,
549
0
                      "cannot create upstream connection for container check");
550
0
        flb_sds_destroy(uri);
551
0
        return FLB_FALSE;
552
0
    }
553
554
    /* Create HTTP client context */
555
0
    c = flb_http_client(u_conn, FLB_HTTP_GET,
556
0
                        uri,
557
0
                        NULL, 0, NULL, 0, NULL, 0);
558
0
    if (!c) {
559
0
        flb_plg_error(ctx->ins, "cannot create HTTP client context");
560
0
        flb_upstream_conn_release(u_conn);
561
0
        return FLB_FALSE;
562
0
    }
563
0
    flb_http_strip_port_from_host(c);
564
565
    /* Prepare headers and authentication */
566
0
    azb_http_client_setup(ctx, c, -1, FLB_FALSE,
567
0
                          AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
568
569
    /* Send HTTP request */
570
0
    ret = flb_http_do(c, &b_sent);
571
0
    flb_sds_destroy(uri);
572
573
0
    if (ret == -1) {
574
0
        flb_plg_error(ctx->ins, "error requesting container properties");
575
0
        flb_upstream_conn_release(u_conn);
576
0
        return FLB_FALSE;
577
0
    }
578
579
0
    status = c->resp.status;
580
0
    flb_http_client_destroy(c);
581
582
    /* Release connection */
583
0
    flb_upstream_conn_release(u_conn);
584
585
    /* Request was successful, validate HTTP status code */
586
0
    if (status == 404) {
587
        /* The container was not found, try to create it */
588
0
        flb_plg_info(ctx->ins, "container '%s' not found, trying to create it",
589
0
                     ctx->container_name);
590
0
        ret = create_container(ctx, ctx->container_name);
591
0
        return ret;
592
0
    }
593
0
    else if (status == 200) {
594
0
        flb_plg_info(ctx->ins, "container '%s' already exists", ctx->container_name);
595
0
        return FLB_TRUE;
596
0
    } 
597
0
    else if (status == 403) {
598
0
        flb_plg_error(ctx->ins, "failed getting container '%s', access denied",
599
0
                      ctx->container_name);
600
0
        return FLB_FALSE;
601
0
    }
602
    
603
0
    flb_plg_error(ctx->ins, "get container request failed, status=%i",
604
0
                  status);
605
606
0
    return FLB_FALSE;
607
0
}
608
609
static int cb_azure_blob_init(struct flb_output_instance *ins,
610
                              struct flb_config *config, void *data)
611
0
{
612
0
    struct flb_azure_blob *ctx = NULL;
613
0
    (void) ins;
614
0
    (void) config;
615
0
    (void) data;
616
617
0
    FLB_TLS_INIT(worker_info);
618
619
0
    ctx = flb_azure_blob_conf_create(ins, config);
620
0
    if (!ctx) {
621
0
        return -1;
622
0
    }
623
624
0
    flb_output_set_http_debug_callbacks(ins);
625
0
    return 0;
626
0
}
627
628
static int blob_chunk_register_parts(struct flb_azure_blob *ctx, uint64_t file_id, size_t total_size)
629
0
{
630
0
    int ret;
631
0
    int64_t parts = 0;
632
0
    int64_t id;
633
0
    size_t offset_start = 0;
634
0
    size_t offset_end = 0;
635
636
    /* generate file parts */
637
0
    while (offset_start < total_size) {
638
0
        offset_end = offset_start + ctx->part_size;
639
640
        /* do not exceed maximum size */
641
0
        if (offset_end > total_size) {
642
0
            offset_end = total_size;
643
0
        }
644
645
        /* insert part */
646
0
        ret = azb_db_file_part_insert(ctx, file_id, parts, offset_start, offset_end, &id);
647
0
        if (ret == -1) {
648
0
            flb_plg_error(ctx->ins, "cannot insert blob file part into database");
649
0
            return -1;
650
0
        }
651
0
        offset_start = offset_end;
652
0
        parts++;
653
0
    }
654
655
0
    return parts;
656
0
}
657
658
static int process_blob_chunk(struct flb_azure_blob *ctx, struct flb_event_chunk *event_chunk)
659
0
{
660
0
    int64_t ret;
661
0
    int64_t file_id;
662
0
    cfl_sds_t file_path = NULL;
663
0
    cfl_sds_t source = NULL;
664
0
    size_t file_size;
665
0
    msgpack_object map;
666
667
0
    struct flb_log_event_decoder log_decoder;
668
0
    struct flb_log_event         log_event;
669
670
0
    if (ctx->db == NULL) {
671
0
        flb_plg_error(ctx->ins, "Cannot process blob because this operation requires a database.");
672
673
0
        return -1;
674
0
    }
675
676
0
    ret = flb_log_event_decoder_init(&log_decoder,
677
0
                                    (char *) event_chunk->data,
678
0
                                     event_chunk->size);
679
680
0
    if (ret != FLB_EVENT_DECODER_SUCCESS) {
681
0
        flb_plg_error(ctx->ins,
682
0
                    "Log event decoder initialization error : %i", (int) ret);
683
0
        return -1;
684
685
0
    }
686
687
0
    while (flb_log_event_decoder_next(&log_decoder, &log_event) == FLB_EVENT_DECODER_SUCCESS) {
688
0
        map = *log_event.body;
689
0
        ret = flb_input_blob_file_get_info(map, &source, &file_path, &file_size);
690
0
        if (ret == -1) {
691
0
            flb_plg_error(ctx->ins, "cannot get file info from blob record, skipping");
692
0
            continue;
693
0
        }
694
695
0
        ret = azb_db_file_insert(ctx, source, ctx->real_endpoint, file_path, file_size);
696
697
0
        if (ret == -1) {
698
0
            flb_plg_error(ctx->ins, "cannot insert blob file into database: %s (size=%lu)",
699
0
                          file_path, file_size);
700
0
            cfl_sds_destroy(file_path);
701
0
            cfl_sds_destroy(source);
702
0
            continue;
703
0
        }
704
0
        cfl_sds_destroy(file_path);
705
0
        cfl_sds_destroy(source);
706
707
        /* generate the parts by using the newest id created (ret) */
708
0
        file_id = ret;
709
0
        ret = blob_chunk_register_parts(ctx, file_id, file_size);
710
0
        if (ret == -1) {
711
0
            flb_plg_error(ctx->ins, "cannot register blob file '%s 'parts into database",
712
0
                            file_path);
713
0
            return -1;
714
0
        }
715
716
0
        flb_plg_debug(ctx->ins, "blob file '%s' (id=%zu) registered with %zu parts",
717
0
                      file_path, file_id, ret);
718
0
    }
719
720
0
    flb_log_event_decoder_destroy(&log_decoder);
721
0
    return 0;
722
0
}
723
724
static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context)
725
0
{
726
0
    int ret;
727
0
    char *out_buf = NULL;
728
0
    size_t out_size;
729
0
    uint64_t id;
730
0
    uint64_t file_id;
731
0
    uint64_t part_id;
732
0
    uint64_t part_delivery_attempts;
733
0
    uint64_t file_delivery_attempts;
734
0
    off_t offset_start;
735
0
    off_t offset_end;
736
0
    cfl_sds_t file_destination = NULL;
737
0
    cfl_sds_t file_path = NULL;
738
0
    cfl_sds_t part_ids = NULL;
739
0
    cfl_sds_t source = NULL;
740
0
    struct flb_azure_blob *ctx = out_context;
741
0
    struct worker_info *info;
742
0
    struct flb_blob_delivery_notification *notification;
743
744
0
    info = FLB_TLS_GET(worker_info);
745
746
0
    if (info->active_upload) {
747
0
        flb_plg_trace(ctx->ins, "[worker: file upload] upload already in progress...");
748
0
        flb_sched_timer_cb_coro_return();
749
0
    }
750
751
0
    if (ctx->db == NULL) {
752
0
        flb_sched_timer_cb_coro_return();
753
0
    }
754
755
0
    info->active_upload = FLB_TRUE;
756
757
    /*
758
     * Check if is there any file which has been fully uploaded and we need to commit it with
759
     * the Put Block List operation
760
     */
761
762
0
    pthread_mutex_lock(&ctx->file_upload_commit_file_parts);
763
764
0
    while (1) {
765
0
        ret = azb_db_file_get_next_stale(ctx,
766
0
                                         &file_id,
767
0
                                         &file_path);
768
769
0
        if (ret == 1) {
770
0
            delete_blob(ctx, file_path);
771
772
0
            azb_db_file_reset_upload_states(ctx, file_id, file_path);
773
0
            azb_db_file_set_aborted_state(ctx, file_id, file_path, 0);
774
775
0
            cfl_sds_destroy(file_path);
776
777
0
            file_path = NULL;
778
0
        }
779
0
        else {
780
0
            break;
781
0
        }
782
0
    }
783
784
0
    while (1) {
785
0
        ret = azb_db_file_get_next_aborted(ctx,
786
0
                                           &file_id,
787
0
                                           &file_delivery_attempts,
788
0
                                           &file_path,
789
0
                                           &source);
790
791
0
        if (ret == 1) {
792
0
            ret = delete_blob(ctx, file_path);
793
794
0
            if (ctx->file_delivery_attempt_limit != FLB_OUT_RETRY_UNLIMITED &&
795
0
                file_delivery_attempts < ctx->file_delivery_attempt_limit) {
796
0
                azb_db_file_reset_upload_states(ctx, file_id, file_path);
797
0
                azb_db_file_set_aborted_state(ctx, file_id, file_path, 0);
798
0
            }
799
0
            else {
800
0
                ret = azb_db_file_delete(ctx, file_id, file_path);
801
802
0
                notification = flb_calloc(1,
803
0
                                        sizeof(
804
0
                                            struct flb_blob_delivery_notification));
805
806
0
                if (notification != NULL) {
807
0
                    notification->base.dynamically_allocated = FLB_TRUE;
808
0
                    notification->base.notification_type = FLB_NOTIFICATION_TYPE_BLOB_DELIVERY;
809
0
                    notification->base.destructor = flb_input_blob_delivery_notification_destroy;
810
0
                    notification->success = FLB_FALSE;
811
0
                    notification->path = cfl_sds_create(file_path);
812
813
0
                    ret = flb_notification_enqueue(FLB_PLUGIN_INPUT,
814
0
                                                source,
815
0
                                                &notification->base,
816
0
                                                config);
817
818
0
                    if (ret != 0) {
819
0
                        flb_plg_error(ctx->ins,
820
0
                                    "blob file '%s' (id=%" PRIu64 ") notification " \
821
0
                                    "delivery error %d", file_path, file_id, ret);
822
823
0
                        flb_notification_cleanup(&notification->base);
824
0
                    }
825
0
                }
826
0
            }
827
828
0
            cfl_sds_destroy(file_path);
829
0
            cfl_sds_destroy(source);
830
831
0
            file_path = NULL;
832
0
            source = NULL;
833
0
        }
834
0
        else {
835
0
            break;
836
0
        }
837
0
    }
838
839
0
    ret = azb_db_file_oldest_ready(ctx, &file_id, &file_path, &part_ids, &source);
840
0
    if (ret == 0) {
841
0
        flb_plg_trace(ctx->ins, "no blob files ready to commit");
842
0
    }
843
0
    else if (ret == -1) {
844
0
        flb_plg_error(ctx->ins, "cannot get oldest blob file ready to upload");
845
0
    }
846
0
    else if (ret == 1) {
847
        /* one file is ready to be committed */
848
0
        flb_plg_debug(ctx->ins, "blob file '%s' (id=%" PRIu64 ") ready to upload", file_path, file_id);
849
0
        ret = azb_block_blob_commit_file_parts(ctx, file_id, file_path, part_ids);
850
0
        if (ret == -1) {
851
0
            flb_plg_error(ctx->ins, "cannot commit blob file parts for file id=%" PRIu64 " path=%s",
852
0
                          file_id, file_path);
853
0
        }
854
0
        else {
855
0
            flb_plg_info(ctx->ins, "blob file '%s' (id=%" PRIu64 ") committed successfully", file_path, file_id);
856
            /* notify the engine the blob file has been processed */
857
            /* FIXME! */
858
859
0
            notification = flb_calloc(1,
860
0
                                    sizeof(
861
0
                                        struct flb_blob_delivery_notification));
862
863
0
            if (notification != NULL) {
864
0
                notification->base.dynamically_allocated = FLB_TRUE;
865
0
                notification->base.notification_type = FLB_NOTIFICATION_TYPE_BLOB_DELIVERY;
866
0
                notification->base.destructor = flb_input_blob_delivery_notification_destroy;
867
0
                notification->success = FLB_TRUE;
868
0
                notification->path = cfl_sds_create(file_path);
869
870
0
                ret = flb_notification_enqueue(FLB_PLUGIN_INPUT,
871
0
                                               source,
872
0
                                               &notification->base,
873
0
                                               config);
874
875
0
                if (ret != 0) {
876
0
                    flb_plg_error(ctx->ins,
877
0
                                "blob file '%s' (id=%" PRIu64 ") notification " \
878
0
                                "delivery error %d", file_path, file_id, ret);
879
880
0
                    flb_notification_cleanup(&notification->base);
881
0
                }
882
0
            }
883
884
            /* remove the file entry from the database */
885
0
            ret = azb_db_file_delete(ctx, file_id, file_path);
886
0
            if (ret == -1) {
887
0
                flb_plg_error(ctx->ins, "cannot delete blob file '%s' (id=%" PRIu64 ") from the database",
888
0
                              file_path, file_id);
889
0
            }
890
0
        }
891
0
    }
892
0
    pthread_mutex_unlock(&ctx->file_upload_commit_file_parts);
893
894
0
    if (file_path) {
895
0
        cfl_sds_destroy(file_path);
896
0
    }
897
0
    if (part_ids) {
898
0
        cfl_sds_destroy(part_ids);
899
0
    }
900
0
    if (source) {
901
0
        cfl_sds_destroy(source);
902
0
    }
903
904
    /* check for a next part file and lock it */
905
0
    ret = azb_db_file_part_get_next(ctx, &id, &file_id, &part_id,
906
0
                                    &offset_start, &offset_end,
907
0
                                    &part_delivery_attempts,
908
0
                                    &file_delivery_attempts,
909
0
                                    &file_path,
910
0
                                    &file_destination);
911
0
    if (ret == -1) {
912
0
        flb_plg_error(ctx->ins, "cannot get next blob file part");
913
0
        info->active_upload = FLB_FALSE;
914
0
        flb_sched_timer_cb_coro_return();
915
0
    }
916
0
    else if (ret == 0) {
917
0
        flb_plg_trace(ctx->ins, "no more blob file parts to process");
918
0
        info->active_upload = FLB_FALSE;
919
0
        flb_sched_timer_cb_coro_return();
920
0
    }
921
0
    else if (ret == 1) {
922
        /* just continue, the row info was retrieved */
923
0
    }
924
925
926
0
    if (strcmp(file_destination, ctx->real_endpoint) != 0) {
927
0
        flb_plg_info(ctx->ins,
928
0
                     "endpoint change detected, restarting file : %s\n%s\n%s",
929
0
                     file_path,
930
0
                     file_destination,
931
0
                     ctx->real_endpoint);
932
933
0
        info->active_upload = FLB_FALSE;
934
935
        /* we need to set the aborted state flag to wait for existing uploads
936
         * to finish and then wipe the slate and start again but we don't want
937
         * to increment the failure count in this case.
938
         */
939
0
        azb_db_file_set_aborted_state(ctx, file_id, file_path, 1);
940
941
0
        cfl_sds_destroy(file_path);
942
0
        cfl_sds_destroy(file_destination);
943
944
0
        flb_sched_timer_cb_coro_return();
945
0
    }
946
947
    /* since this is the first part we want to increment the files
948
     * delivery attempt counter.
949
     */
950
0
    if (part_id == 0) {
951
0
        ret = azb_db_file_delivery_attempts(ctx, file_id, ++file_delivery_attempts);
952
0
    }
953
954
    /* read the file content */
955
0
    ret = flb_utils_read_file_offset(file_path, offset_start, offset_end, &out_buf, &out_size);
956
0
    if (ret == -1) {
957
0
        flb_plg_error(ctx->ins, "cannot read file part %s", file_path);
958
959
0
        info->active_upload = FLB_FALSE;
960
961
0
        cfl_sds_destroy(file_path);
962
0
        cfl_sds_destroy(file_destination);
963
964
0
        flb_sched_timer_cb_coro_return();
965
0
    }
966
967
0
    azb_db_file_part_delivery_attempts(ctx, file_id, part_id, ++part_delivery_attempts);
968
969
0
    flb_plg_debug(ctx->ins, "sending part file %s (id=%" PRIu64 " part_id=%" PRIu64 ")", file_path, id, part_id);
970
971
0
    ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_BLOBS,
972
0
                    AZURE_BLOB_BLOCKBLOB, file_path, part_id, NULL, 0, out_buf, out_size);
973
974
0
    if (ret == FLB_OK) {
975
0
        ret = azb_db_file_part_uploaded(ctx, id);
976
977
0
        if (ret == -1) {
978
0
            info->active_upload = FLB_FALSE;
979
980
0
            cfl_sds_destroy(file_path);
981
0
            cfl_sds_destroy(file_destination);
982
983
0
            flb_sched_timer_cb_coro_return();
984
0
        }
985
0
    }
986
0
    else if (ret == FLB_RETRY) {
987
0
        azb_db_file_part_in_progress(ctx, 0, id);
988
989
0
        if (ctx->part_delivery_attempt_limit != FLB_OUT_RETRY_UNLIMITED &&
990
0
            part_delivery_attempts >= ctx->part_delivery_attempt_limit) {
991
0
            azb_db_file_set_aborted_state(ctx, file_id, file_path, 1);
992
0
        }
993
0
    }
994
995
0
    info->active_upload = FLB_FALSE;
996
997
0
    if (out_buf) {
998
0
        flb_free(out_buf);
999
0
    }
1000
1001
0
    cfl_sds_destroy(file_path);
1002
0
    cfl_sds_destroy(file_destination);
1003
1004
0
    flb_sched_timer_cb_coro_return();
1005
0
}
1006
1007
static int azb_timer_create(struct flb_azure_blob *ctx)
1008
0
{
1009
0
    int ret;
1010
0
    int64_t ms;
1011
0
    struct flb_sched *sched;
1012
1013
0
    sched = flb_sched_ctx_get();
1014
1015
    /* convert from seconds to milliseconds (scheduler needs ms) */
1016
0
    ms = ctx->upload_parts_timeout * 1000;
1017
1018
0
    ret = flb_sched_timer_coro_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, ms,
1019
0
                                         cb_azb_blob_file_upload, ctx, NULL);
1020
0
    if (ret == -1) {
1021
0
        flb_plg_error(ctx->ins, "failed to create upload timer");
1022
0
        return -1;
1023
0
    }
1024
1025
0
    return 0;
1026
0
}
1027
1028
static void cb_azure_blob_flush(struct flb_event_chunk *event_chunk,
1029
                                struct flb_output_flush *out_flush,
1030
                                struct flb_input_instance *i_ins,
1031
                                void *out_context,
1032
                                struct flb_config *config)
1033
0
{
1034
0
    int ret = FLB_OK;
1035
0
    struct flb_azure_blob *ctx = out_context;
1036
0
    (void) i_ins;
1037
0
    (void) config;
1038
1039
    /*
1040
     * Azure blob requires a container. The following function validate that the container exists,
1041
     * otherwise it will be created. Note that that container name is specified by the user
1042
     * in the configuration file.
1043
     *
1044
     * https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blob-container-create#about-container-naming
1045
     */
1046
0
    ret = ensure_container(ctx);
1047
0
    if (ret == FLB_FALSE) {
1048
0
        FLB_OUTPUT_RETURN(FLB_RETRY);
1049
0
    }
1050
1051
0
    if (event_chunk->type == FLB_EVENT_TYPE_LOGS) {
1052
0
        ret = send_blob(config, i_ins, ctx,
1053
0
                        FLB_EVENT_TYPE_LOGS,
1054
0
                        ctx->btype, /* blob type per user configuration  */
1055
0
                        (char *) event_chunk->tag,  /* use tag as 'name' */
1056
0
                        0,  /* part id */
1057
0
                        (char *) event_chunk->tag, flb_sds_len(event_chunk->tag),
1058
0
                        (char *) event_chunk->data, event_chunk->size);
1059
1060
0
        if (ret == CREATE_BLOB) {
1061
0
            ret = create_blob(ctx, event_chunk->tag);
1062
0
            if (ret == FLB_OK) {
1063
0
                ret = send_blob(config, i_ins, ctx,
1064
0
                                FLB_EVENT_TYPE_LOGS,
1065
0
                                ctx->btype, /* blob type per user configuration  */
1066
0
                                (char *) event_chunk->tag,  /* use tag as 'name' */
1067
0
                                0,  /* part id */
1068
0
                                (char *) event_chunk->tag,  /* use tag as 'name' */
1069
0
                                flb_sds_len(event_chunk->tag),
1070
0
                                (char *) event_chunk->data, event_chunk->size);
1071
0
            }
1072
0
        }
1073
0
    }
1074
0
    else if (event_chunk->type == FLB_EVENT_TYPE_BLOBS) {
1075
        /*
1076
         * For Blob types, we use the flush callback to enqueue the file, then cb_azb_blob_file_upload()
1077
         * takes care of the rest like reading the file and uploading it to Azure.
1078
         */
1079
0
        ret = process_blob_chunk(ctx, event_chunk);
1080
0
        if (ret == -1) {
1081
0
            FLB_OUTPUT_RETURN(FLB_RETRY);
1082
0
        }
1083
0
    }
1084
1085
    /* FLB_RETRY, FLB_OK, FLB_ERROR */
1086
0
    FLB_OUTPUT_RETURN(ret);
1087
0
}
1088
1089
static int cb_azure_blob_exit(void *data, struct flb_config *config)
1090
0
{
1091
0
    struct flb_azure_blob *ctx = data;
1092
1093
0
    if (!ctx) {
1094
0
        return 0;
1095
0
    }
1096
1097
0
    flb_azure_blob_conf_destroy(ctx);
1098
0
    return 0;
1099
0
}
1100
1101
/* worker initialization, used for our internal timers */
1102
static int cb_worker_init(void *data, struct flb_config *config)
1103
0
{
1104
0
    int ret;
1105
0
    struct worker_info *info;
1106
0
    struct flb_azure_blob *ctx = data;
1107
1108
0
    flb_plg_info(ctx->ins, "initializing worker");
1109
1110
0
    info = FLB_TLS_GET(worker_info);
1111
0
    if (!info) {
1112
        /* initialize worker global info */
1113
0
        info = flb_malloc(sizeof(struct worker_info));
1114
0
        if (!info) {
1115
0
            flb_errno();
1116
0
            return -1;
1117
0
        }
1118
0
        info->active_upload = FLB_FALSE;
1119
0
        FLB_TLS_SET(worker_info, info);
1120
0
    }
1121
1122
0
    ret = azb_timer_create(ctx);
1123
0
    if (ret == -1) {
1124
0
        flb_plg_error(ctx->ins, "failed to create upload timer");
1125
0
        return -1;
1126
0
    }
1127
1128
0
    return 0;
1129
0
}
1130
1131
/* worker teardown */
1132
static int cb_worker_exit(void *data, struct flb_config *config)
1133
0
{
1134
0
    struct worker_info *info;
1135
0
    struct flb_azure_blob *ctx = data;
1136
1137
0
    flb_plg_info(ctx->ins, "initializing worker");
1138
1139
0
    info = FLB_TLS_GET(worker_info);
1140
0
    if (info != NULL) {
1141
0
        flb_free(info);
1142
0
        FLB_TLS_SET(worker_info, NULL);
1143
0
    }
1144
1145
0
    return 0;
1146
0
}
1147
1148
/* Configuration properties map */
1149
static struct flb_config_map config_map[] = {
1150
    {
1151
     FLB_CONFIG_MAP_STR, "account_name", NULL,
1152
     0, FLB_TRUE, offsetof(struct flb_azure_blob, account_name),
1153
     "Azure account name (mandatory)"
1154
    },
1155
1156
    {
1157
     FLB_CONFIG_MAP_STR, "container_name", NULL,
1158
     0, FLB_TRUE, offsetof(struct flb_azure_blob, container_name),
1159
     "Container name (mandatory)"
1160
    },
1161
1162
    {
1163
     FLB_CONFIG_MAP_BOOL, "auto_create_container", "true",
1164
     0, FLB_TRUE, offsetof(struct flb_azure_blob, auto_create_container),
1165
     "Auto create container if it don't exists"
1166
    },
1167
1168
    {
1169
     FLB_CONFIG_MAP_STR, "blob_type", "appendblob",
1170
     0, FLB_TRUE, offsetof(struct flb_azure_blob, blob_type),
1171
     "Set the block type: appendblob or blockblob"
1172
    },
1173
1174
    {
1175
     FLB_CONFIG_MAP_STR, "compress", NULL,
1176
     0, FLB_FALSE, 0,
1177
     "Set payload compression in network transfer. Option available is 'gzip'"
1178
    },
1179
1180
    {
1181
     FLB_CONFIG_MAP_BOOL, "compress_blob", "false",
1182
     0, FLB_TRUE, offsetof(struct flb_azure_blob, compress_blob),
1183
     "Enable block blob GZIP compression in the final blob file. This option is "
1184
     "not compatible with 'appendblob' block type"
1185
    },
1186
1187
    {
1188
     FLB_CONFIG_MAP_BOOL, "emulator_mode", "false",
1189
     0, FLB_TRUE, offsetof(struct flb_azure_blob, emulator_mode),
1190
     "Use emulator mode, enable it if you want to use Azurite"
1191
    },
1192
1193
    {
1194
     FLB_CONFIG_MAP_STR, "shared_key", NULL,
1195
     0, FLB_TRUE, offsetof(struct flb_azure_blob, shared_key),
1196
     "Azure shared key"
1197
    },
1198
1199
    {
1200
     FLB_CONFIG_MAP_STR, "endpoint", NULL,
1201
     0, FLB_TRUE, offsetof(struct flb_azure_blob, endpoint),
1202
     "Custom full URL endpoint to use an emulator"
1203
    },
1204
1205
    {
1206
     FLB_CONFIG_MAP_STR, "path", NULL,
1207
     0, FLB_TRUE, offsetof(struct flb_azure_blob, path),
1208
     "Set a path for your blob"
1209
    },
1210
1211
    {
1212
     FLB_CONFIG_MAP_STR, "date_key", "@timestamp",
1213
     0, FLB_TRUE, offsetof(struct flb_azure_blob, date_key),
1214
     "Name of the key that will have the record timestamp"
1215
    },
1216
1217
    {
1218
     FLB_CONFIG_MAP_STR, "auth_type", "key",
1219
     0, FLB_TRUE, offsetof(struct flb_azure_blob, auth_type),
1220
     "Set the auth type: key or sas"
1221
    },
1222
1223
    {
1224
     FLB_CONFIG_MAP_STR, "sas_token", NULL,
1225
     0, FLB_TRUE, offsetof(struct flb_azure_blob, sas_token),
1226
     "Azure Blob SAS token"
1227
    },
1228
1229
    {
1230
     FLB_CONFIG_MAP_STR, "database_file", NULL,
1231
     0, FLB_TRUE, offsetof(struct flb_azure_blob, database_file),
1232
     "Absolute path to a database file to be used to store blob files contexts"
1233
    },
1234
1235
    {
1236
     FLB_CONFIG_MAP_SIZE, "part_size", "25M",
1237
     0, FLB_TRUE, offsetof(struct flb_azure_blob, part_size),
1238
     "Size of each part when uploading blob files"
1239
    },
1240
1241
    {
1242
     FLB_CONFIG_MAP_INT, "file_delivery_attempt_limit", "1",
1243
     0, FLB_TRUE, offsetof(struct flb_azure_blob, file_delivery_attempt_limit),
1244
     "File delivery attempt limit"
1245
    },
1246
1247
    {
1248
     FLB_CONFIG_MAP_INT, "part_delivery_attempt_limit", "1",
1249
     0, FLB_TRUE, offsetof(struct flb_azure_blob, part_delivery_attempt_limit),
1250
     "File part delivery attempt limit"
1251
    },
1252
1253
    {
1254
     FLB_CONFIG_MAP_TIME, "upload_parts_timeout", "10M",
1255
     0, FLB_TRUE, offsetof(struct flb_azure_blob, upload_parts_timeout),
1256
     "Timeout to upload parts of a blob file"
1257
    },
1258
1259
    {
1260
     FLB_CONFIG_MAP_TIME, "upload_part_freshness_limit", "6D",
1261
     0, FLB_TRUE, offsetof(struct flb_azure_blob, upload_parts_freshness_threshold),
1262
     "Maximum lifespan of an uncommitted file part"
1263
    },
1264
1265
    {
1266
     FLB_CONFIG_MAP_STR, "configuration_endpoint_url", NULL,
1267
     0, FLB_TRUE, offsetof(struct flb_azure_blob, configuration_endpoint_url),
1268
     "Configuration endpoint URL"
1269
    },
1270
1271
    {
1272
     FLB_CONFIG_MAP_STR, "configuration_endpoint_username", NULL,
1273
     0, FLB_TRUE, offsetof(struct flb_azure_blob, configuration_endpoint_username),
1274
     "Configuration endpoint basic authentication username"
1275
    },
1276
1277
    {
1278
     FLB_CONFIG_MAP_STR, "configuration_endpoint_password", NULL,
1279
     0, FLB_TRUE, offsetof(struct flb_azure_blob, configuration_endpoint_password),
1280
     "Configuration endpoint basic authentication password"
1281
    },
1282
1283
    {
1284
     FLB_CONFIG_MAP_STR, "configuration_endpoint_bearer_token", NULL,
1285
     0, FLB_TRUE, offsetof(struct flb_azure_blob, configuration_endpoint_bearer_token),
1286
     "Configuration endpoint bearer token"
1287
    },
1288
1289
    /* EOF */
1290
    {0}
1291
};
1292
1293
/* Plugin registration */
1294
struct flb_output_plugin out_azure_blob_plugin = {
1295
    .name           = "azure_blob",
1296
    .description    = "Azure Blob Storage",
1297
    .cb_init        = cb_azure_blob_init,
1298
    .cb_flush       = cb_azure_blob_flush,
1299
    .cb_exit        = cb_azure_blob_exit,
1300
    .cb_worker_init = cb_worker_init,
1301
    .cb_worker_exit = cb_worker_exit,
1302
1303
    /* Test */
1304
    .test_formatter.callback = azure_blob_format,
1305
1306
    .flags        = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
1307
    .event_type   = FLB_OUTPUT_LOGS | FLB_OUTPUT_BLOBS,
1308
    .config_map   = config_map,
1309
    .workers      = 1,
1310
};