Coverage Report

Created: 2025-10-14 08:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/plugins/out_s3/s3_multipart.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2024 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_output_plugin.h>
21
#include <fluent-bit/flb_utils.h>
22
#include <fluent-bit/flb_slist.h>
23
#include <fluent-bit/flb_time.h>
24
#include <fluent-bit/flb_pack.h>
25
#include <fluent-bit/flb_config_map.h>
26
#include <fluent-bit/flb_aws_util.h>
27
#include <fluent-bit/flb_signv4.h>
28
#include <fluent-bit/flb_fstore.h>
29
#include <ctype.h>
30
31
#include "s3.h"
32
#include "s3_store.h"
33
34
0
#define COMPLETE_MULTIPART_UPLOAD_BASE_LEN 100
35
0
#define COMPLETE_MULTIPART_UPLOAD_PART_LEN 124
36
37
flb_sds_t get_etag(char *response, size_t size);
38
39
static inline int try_to_write(char *buf, int *off, size_t left,
40
                               const char *str, size_t str_len)
41
0
{
42
0
    if (str_len <= 0){
43
0
        str_len = strlen(str);
44
0
    }
45
0
    if (left <= *off+str_len) {
46
0
        return FLB_FALSE;
47
0
    }
48
0
    memcpy(buf+*off, str, str_len);
49
0
    *off += str_len;
50
0
    return FLB_TRUE;
51
0
}
52
53
54
/* the 'tag' or key in the upload_dir is s3_key + \n + upload_id */
55
static flb_sds_t upload_key(struct multipart_upload *m_upload)
56
0
{
57
0
    flb_sds_t key;
58
0
    flb_sds_t tmp;
59
60
0
    key = flb_sds_create_size(64);
61
62
0
    tmp = flb_sds_printf(&key, "%s\n%s", m_upload->s3_key, m_upload->upload_id);
63
0
    if (!tmp) {
64
0
        flb_errno();
65
0
        flb_sds_destroy(key);
66
0
        return NULL;
67
0
    }
68
0
    key = tmp;
69
70
0
    return key;
71
0
}
72
73
/* the 'tag' or key in the upload_dir is s3_key + \n + upload_id */
74
static int upload_data_from_key(struct multipart_upload *m_upload, char *key)
75
0
{
76
0
    flb_sds_t tmp_sds;
77
0
    int len = 0;
78
0
    int original_len;
79
0
    char *tmp;
80
81
0
    original_len = strlen(key);
82
83
0
    tmp = strchr(key, '\n');
84
0
    if (!tmp) {
85
0
        return -1;
86
0
    }
87
88
0
    len = tmp - key;
89
0
    tmp_sds = flb_sds_create_len(key, len);
90
0
    if (!tmp_sds) {
91
0
        flb_errno();
92
0
        return -1;
93
0
    }
94
0
    m_upload->s3_key = tmp_sds;
95
96
0
    tmp++;
97
0
    original_len -= (len + 1);
98
99
0
    tmp_sds = flb_sds_create_len(tmp, original_len);
100
0
    if (!tmp_sds) {
101
0
        flb_errno();
102
0
        return -1;
103
0
    }
104
0
    m_upload->upload_id = tmp_sds;
105
106
0
    return 0;
107
0
}
108
109
/* parse etags from file data */
110
static void parse_etags(struct multipart_upload *m_upload, char *data)
111
0
{
112
0
    char *line = data;
113
0
    char *start;
114
0
    char *end;
115
0
    flb_sds_t etag;
116
0
    int part_num;
117
0
    int len;
118
119
0
    if (!data) {
120
0
        return;
121
0
    }
122
123
0
    line = strtok(data, "\n");
124
125
0
    if (!line) {
126
0
        return;
127
0
    }
128
129
0
    do {
130
0
        start = strstr(line, "part_number=");
131
0
        if (!start) {
132
0
            return;
133
0
        }
134
0
        start += 12;
135
0
        end = strchr(start, '\t');
136
0
        if (!end) {
137
0
            flb_debug("[s3 restart parser] Did not find tab separator in line %s", start);
138
0
            return;
139
0
        }
140
0
        *end = '\0';
141
0
        part_num = atoi(start);
142
0
        if (part_num <= 0) {
143
0
            flb_debug("[s3 restart parser] Could not parse part_number from %s", start);
144
0
            return;
145
0
        }
146
0
        m_upload->part_number = part_num;
147
0
        *end = '\t';
148
149
0
        start = strstr(line, "tag=");
150
0
        if (!start) {
151
0
            flb_debug("[s3 restart parser] Could not find 'etag=' %s", line);
152
0
            return;
153
0
        }
154
155
0
        start += 4;
156
0
        len = strlen(start);
157
158
0
        if (len <= 0) {
159
0
            flb_debug("[s3 restart parser] Could not find etag %s", line);
160
0
            return;
161
0
        }
162
163
0
        etag = flb_sds_create_len(start, len);
164
0
        if (!etag) {
165
0
            flb_debug("[s3 restart parser] Could create etag");
166
0
            return;
167
0
        }
168
0
        flb_debug("[s3 restart parser] found part number %d=%s", part_num, etag);
169
0
        m_upload->etags[part_num - 1] = etag;
170
171
0
        line = strtok(NULL, "\n");
172
0
    } while (line != NULL);
173
0
}
174
175
static struct multipart_upload *upload_from_file(struct flb_s3 *ctx,
176
                                                 struct flb_fstore_file *fsf)
177
0
{
178
0
    struct multipart_upload *m_upload = NULL;
179
0
    char *buffered_data = NULL;
180
0
    size_t buffer_size = 0;
181
0
    int ret;
182
183
0
    ret = s3_store_file_upload_read(ctx, fsf, &buffered_data, &buffer_size);
184
0
    if (ret < 0) {
185
0
        flb_plg_error(ctx->ins, "Could not read locally buffered data %s",
186
0
                      fsf->name);
187
0
        return NULL;
188
0
    }
189
190
    /* always make sure we have a fresh copy of metadata */
191
0
    ret = s3_store_file_meta_get(ctx, fsf);
192
0
    if (ret == -1) {
193
0
        flb_plg_error(ctx->ins, "Could not read file metadata: %s",
194
0
                      fsf->name);
195
0
        flb_free(buffered_data);
196
0
        return NULL;
197
0
    }
198
199
0
    m_upload = flb_calloc(1, sizeof(struct multipart_upload));
200
0
    if (!m_upload) {
201
0
        flb_errno();
202
0
        flb_free(buffered_data);
203
0
        return NULL;
204
0
    }
205
0
    m_upload->init_time = time(NULL);
206
0
    m_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS;
207
208
0
    ret = upload_data_from_key(m_upload, fsf->meta_buf);
209
0
    if (ret < 0) {
210
0
        flb_plg_error(ctx->ins, "Could not extract upload data from: %s",
211
0
                      fsf->name);
212
0
        flb_free(buffered_data);
213
0
        multipart_upload_destroy(m_upload);
214
0
        return NULL;
215
0
    }
216
217
0
    parse_etags(m_upload, buffered_data);
218
0
    flb_free(buffered_data);
219
0
    if (m_upload->part_number == 0) {
220
0
        flb_plg_error(ctx->ins, "Could not extract upload data from %s",
221
0
                      fsf->name);
222
0
        multipart_upload_destroy(m_upload);
223
0
        return NULL;
224
0
    }
225
226
    /* code expects it to be 1 more than the last part read */
227
0
    m_upload->part_number++;
228
229
0
    return m_upload;
230
0
}
231
232
void multipart_read_uploads_from_fs(struct flb_s3 *ctx)
233
0
{
234
0
    struct mk_list *tmp;
235
0
    struct mk_list *head;
236
0
    struct multipart_upload *m_upload = NULL;
237
0
    struct flb_fstore_file *fsf;
238
239
0
    mk_list_foreach_safe(head, tmp, &ctx->stream_upload->files) {
240
0
        fsf = mk_list_entry(head, struct flb_fstore_file, _head);
241
0
        m_upload = upload_from_file(ctx, fsf);
242
0
        if (!m_upload) {
243
0
            flb_plg_error(ctx->ins,
244
0
                          "Could not process multipart upload data in %s",
245
0
                          fsf->name);
246
0
            continue;
247
0
        }
248
0
        mk_list_add(&m_upload->_head, &ctx->uploads);
249
0
        flb_plg_info(ctx->ins,
250
0
                     "Successfully read existing upload from file system, s3_key=%s",
251
0
                     m_upload->s3_key);
252
0
    }
253
0
}
254
255
/* store list of part number and etag */
256
static flb_sds_t upload_data(flb_sds_t etag, int part_num)
257
0
{
258
0
    flb_sds_t data;
259
0
    flb_sds_t tmp;
260
261
0
    data = flb_sds_create_size(64);
262
263
0
    tmp = flb_sds_printf(&data, "part_number=%d\tetag=%s\n", part_num, etag);
264
0
    if (!tmp) {
265
0
        flb_errno();
266
0
        flb_sds_destroy(data);
267
0
        return NULL;
268
0
    }
269
0
    data = tmp;
270
271
0
    return data;
272
0
}
273
274
/* persists upload data to the file system */
275
static int save_upload(struct flb_s3 *ctx, struct multipart_upload *m_upload,
276
                       flb_sds_t etag)
277
0
{
278
0
    int ret;
279
0
    flb_sds_t key;
280
0
    flb_sds_t data;
281
0
    struct flb_fstore_file *fsf;
282
283
0
    key = upload_key(m_upload);
284
0
    if (!key) {
285
0
        flb_plg_debug(ctx->ins, "Could not constuct upload key for buffer dir");
286
0
        return -1;
287
0
    }
288
289
0
    data = upload_data(etag, m_upload->part_number);
290
0
    if (!data) {
291
0
        flb_plg_debug(ctx->ins, "Could not constuct upload key for buffer dir");
292
0
        return -1;
293
0
    }
294
295
0
    fsf = s3_store_file_upload_get(ctx, key, flb_sds_len(key));
296
297
    /* Write the key to the file */
298
0
    ret = s3_store_file_upload_put(ctx, fsf, key, data);
299
300
0
    flb_sds_destroy(key);
301
0
    flb_sds_destroy(data);
302
303
0
    return ret;
304
0
}
305
306
static int remove_upload_from_fs(struct flb_s3 *ctx, struct multipart_upload *m_upload)
307
0
{
308
0
    flb_sds_t key;
309
0
    struct flb_fstore_file *fsf;
310
311
0
    key = upload_key(m_upload);
312
0
    if (!key) {
313
0
        flb_plg_debug(ctx->ins, "Could not construct upload key");
314
0
        return -1;
315
0
    }
316
317
0
    fsf = s3_store_file_upload_get(ctx, key, flb_sds_len(key));
318
0
    if (fsf) {
319
0
        s3_store_file_upload_delete(ctx, fsf);
320
0
    }
321
0
    flb_sds_destroy(key);
322
0
    return 0;
323
0
}
324
325
/*
326
 * https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
327
 */
328
static int complete_multipart_upload_payload(struct flb_s3 *ctx,
329
                                             struct multipart_upload *m_upload,
330
                                             char **out_buf, size_t *out_size)
331
0
{
332
0
    char *buf;
333
0
    int i;
334
0
    int offset = 0;
335
0
    flb_sds_t etag;
336
0
    size_t size = COMPLETE_MULTIPART_UPLOAD_BASE_LEN;
337
0
    char part_num[11];
338
339
0
    size = size + (COMPLETE_MULTIPART_UPLOAD_PART_LEN * m_upload->part_number);
340
341
0
    buf = flb_malloc(size + 1);
342
0
    if (!buf) {
343
0
        flb_errno();
344
0
        return -1;
345
0
    }
346
347
0
    if (!try_to_write(buf, &offset, size,
348
0
                      "<CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">", 73)) {
349
0
        goto error;
350
0
    }
351
352
0
    for (i = 0; i < m_upload->part_number; i++) {
353
0
        etag = m_upload->etags[i];
354
0
        if (etag == NULL) {
355
0
            continue;
356
0
        }
357
0
        if (!try_to_write(buf, &offset, size,
358
0
                          "<Part><ETag>", 12)) {
359
0
            goto error;
360
0
        }
361
362
0
        if (!try_to_write(buf, &offset, size,
363
0
                          etag, 0)) {
364
0
            goto error;
365
0
        }
366
367
0
        if (!try_to_write(buf, &offset, size,
368
0
                          "</ETag><PartNumber>", 19)) {
369
0
            goto error;
370
0
        }
371
372
0
        if (!sprintf(part_num, "%d", i + 1)) {
373
0
            goto error;
374
0
        }
375
376
0
        if (!try_to_write(buf, &offset, size,
377
0
                          part_num, 0)) {
378
0
            goto error;
379
0
        }
380
381
0
        if (!try_to_write(buf, &offset, size,
382
0
                          "</PartNumber></Part>", 20)) {
383
0
            goto error;
384
0
        }
385
0
    }
386
387
0
    if (!try_to_write(buf, &offset, size,
388
0
                      "</CompleteMultipartUpload>", 26)) {
389
0
        goto error;
390
0
    }
391
392
0
    buf[offset] = '\0';
393
394
0
    *out_buf = buf;
395
0
    *out_size = offset;
396
0
    return 0;
397
398
0
error:
399
0
    flb_free(buf);
400
0
    flb_plg_error(ctx->ins, "Failed to construct CompleteMultipartUpload "
401
0
                  "request body");
402
0
    return -1;
403
0
}
404
405
int complete_multipart_upload(struct flb_s3 *ctx,
406
                              struct multipart_upload *m_upload,
407
                              char *pre_signed_url)
408
0
{
409
0
    char *body;
410
0
    size_t size;
411
0
    flb_sds_t uri = NULL;
412
0
    flb_sds_t tmp;
413
0
    int ret;
414
0
    struct flb_http_client *c = NULL;
415
0
    struct flb_aws_client *s3_client;
416
417
0
    if (!m_upload->upload_id) {
418
0
        flb_plg_error(ctx->ins, "Cannot complete multipart upload for key %s: "
419
0
                      "upload ID is unset ", m_upload->s3_key);
420
0
        return -1;
421
0
    }
422
423
0
    uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 11 +
424
0
                              flb_sds_len(m_upload->upload_id));
425
0
    if (!uri) {
426
0
        flb_errno();
427
0
        return -1;
428
0
    }
429
430
0
    if (pre_signed_url != NULL) {
431
0
        tmp = flb_sds_copy(uri, pre_signed_url, strlen(pre_signed_url));
432
0
    }
433
0
    else {
434
0
        tmp = flb_sds_printf(&uri, "/%s%s?uploadId=%s", ctx->bucket,
435
0
                            m_upload->s3_key, m_upload->upload_id);
436
0
    }
437
438
0
    if (!tmp) {
439
0
        flb_sds_destroy(uri);
440
0
        return -1;
441
0
    }
442
0
    uri = tmp;
443
444
0
    ret = complete_multipart_upload_payload(ctx, m_upload, &body, &size);
445
0
    if (ret < 0) {
446
0
        flb_sds_destroy(uri);
447
0
        return -1;
448
0
    }
449
450
0
    s3_client = ctx->s3_client;
451
0
    if (s3_plugin_under_test() == FLB_TRUE) {
452
0
        c = mock_s3_call("TEST_COMPLETE_MULTIPART_UPLOAD_ERROR", "CompleteMultipartUpload");
453
0
    }
454
0
    else {
455
0
        c = s3_client->client_vtable->request(s3_client, FLB_HTTP_POST,
456
0
                                              uri, body, size,
457
0
                                              NULL, 0);
458
0
    }
459
0
    flb_sds_destroy(uri);
460
0
    flb_free(body);
461
0
    if (c) {
462
0
        flb_plg_debug(ctx->ins, "CompleteMultipartUpload http status=%d",
463
0
                      c->resp.status);
464
0
        if (c->resp.status == 200) {
465
0
            flb_plg_info(ctx->ins, "Successfully completed multipart upload "
466
0
                         "for %s, UploadId=%s", m_upload->s3_key,
467
0
                         m_upload->upload_id);
468
0
            flb_http_client_destroy(c);
469
            /* remove this upload from the file system */
470
0
            remove_upload_from_fs(ctx, m_upload);
471
0
            return 0;
472
0
        }
473
0
        flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size,
474
0
                                "CompleteMultipartUpload", ctx->ins);
475
0
        if (c->resp.payload != NULL) {
476
0
            flb_plg_debug(ctx->ins, "Raw CompleteMultipartUpload response: %s",
477
0
                          c->resp.payload);
478
0
        }
479
0
        flb_http_client_destroy(c);
480
0
    }
481
482
0
    flb_plg_error(ctx->ins, "CompleteMultipartUpload request failed");
483
0
    return -1;
484
0
}
485
486
int abort_multipart_upload(struct flb_s3 *ctx,
487
                           struct multipart_upload *m_upload,
488
                           char *pre_signed_url)
489
0
{
490
0
    flb_sds_t uri = NULL;
491
0
    flb_sds_t tmp;
492
0
    struct flb_http_client *c = NULL;
493
0
    struct flb_aws_client *s3_client;
494
495
0
    if (!m_upload->upload_id) {
496
0
        flb_plg_error(ctx->ins, "Cannot complete multipart upload for key %s: "
497
0
                      "upload ID is unset ", m_upload->s3_key);
498
0
        return -1;
499
0
    }
500
501
0
    uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 11 +
502
0
                              flb_sds_len(m_upload->upload_id));
503
0
    if (!uri) {
504
0
        flb_errno();
505
0
        return -1;
506
0
    }
507
508
0
    if (pre_signed_url != NULL) {
509
0
        tmp = flb_sds_copy(uri, pre_signed_url, strlen(pre_signed_url));
510
0
    }
511
0
    else {
512
0
        tmp = flb_sds_printf(&uri, "/%s%s?uploadId=%s", ctx->bucket,
513
0
                            m_upload->s3_key, m_upload->upload_id);
514
0
    }
515
516
0
    if (!tmp) {
517
0
        flb_sds_destroy(uri);
518
0
        return -1;
519
0
    }
520
0
    uri = tmp;
521
522
0
    s3_client = ctx->s3_client;
523
0
    if (s3_plugin_under_test() == FLB_TRUE) {
524
0
        c = mock_s3_call("TEST_ABORT_MULTIPART_UPLOAD_ERROR", "AbortMultipartUpload");
525
0
    }
526
0
    else {
527
0
        c = s3_client->client_vtable->request(s3_client, FLB_HTTP_DELETE,
528
0
                                              uri, NULL, 0,
529
0
                                              NULL, 0);
530
0
    }
531
0
    flb_sds_destroy(uri);
532
533
0
    if (c) {
534
0
        flb_plg_debug(ctx->ins, "AbortMultipartUpload http status=%d",
535
0
                      c->resp.status);
536
0
        if (c->resp.status == 204) {
537
0
            flb_plg_info(ctx->ins, "Successfully completed multipart upload "
538
0
                         "for %s, UploadId=%s", m_upload->s3_key,
539
0
                         m_upload->upload_id);
540
0
            flb_http_client_destroy(c);
541
            /* remove this upload from the file system */
542
0
            remove_upload_from_fs(ctx, m_upload);
543
0
            return 0;
544
0
        }
545
0
        flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size,
546
0
                                "AbortMultipartUpload", ctx->ins);
547
0
        if (c->resp.payload != NULL) {
548
0
            flb_plg_debug(ctx->ins, "Raw AbortMultipartUpload response: %s",
549
0
                          c->resp.payload);
550
0
        }
551
0
        flb_http_client_destroy(c);
552
0
    }
553
554
0
    flb_plg_error(ctx->ins, "AbortMultipartUpload request failed");
555
0
    return -1;
556
0
}
557
558
int create_multipart_upload(struct flb_s3 *ctx,
559
                            struct multipart_upload *m_upload,
560
                            char *pre_signed_url)
561
0
{
562
0
    flb_sds_t uri = NULL;
563
0
    flb_sds_t tmp;
564
0
    struct flb_http_client *c = NULL;
565
0
    struct flb_aws_client *s3_client;
566
0
    struct flb_aws_header *headers = NULL;
567
0
    int num_headers = 0;
568
0
    int ret;
569
570
0
    uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 8);
571
0
    if (!uri) {
572
0
        flb_errno();
573
0
        return -1;
574
0
    }
575
576
0
    if (pre_signed_url != NULL) {
577
0
        tmp = flb_sds_copy(uri, pre_signed_url, strlen(pre_signed_url));
578
0
    }
579
0
    else {
580
0
        tmp = flb_sds_printf(&uri, "/%s%s?uploads=", ctx->bucket, m_upload->s3_key);
581
0
    }
582
583
0
    if (!tmp) {
584
0
        flb_sds_destroy(uri);
585
0
        return -1;
586
0
    }
587
0
    uri = tmp;
588
589
0
    s3_client = ctx->s3_client;
590
0
    if (s3_plugin_under_test() == FLB_TRUE) {
591
0
        c = mock_s3_call("TEST_CREATE_MULTIPART_UPLOAD_ERROR", "CreateMultipartUpload");
592
0
    }
593
0
    else {
594
0
        ret = create_headers(ctx, NULL, &headers, &num_headers, FLB_TRUE);
595
0
        if (ret == -1) {
596
0
            flb_plg_error(ctx->ins, "Failed to create headers");
597
0
            flb_sds_destroy(uri);
598
0
            return -1;
599
0
        }
600
0
        c = s3_client->client_vtable->request(s3_client, FLB_HTTP_POST,
601
0
                                              uri, NULL, 0, headers, num_headers);
602
0
        if (headers) {
603
0
           flb_free(headers);
604
0
        }
605
0
    }
606
0
    flb_sds_destroy(uri);
607
0
    if (c) {
608
0
        flb_plg_debug(ctx->ins, "CreateMultipartUpload http status=%d",
609
0
                      c->resp.status);
610
0
        if (c->resp.status == 200) {
611
0
            tmp = flb_aws_xml_get_val(c->resp.payload, c->resp.payload_size,
612
0
                                  "<UploadId>", "</UploadId>");
613
0
            if (!tmp) {
614
0
                flb_plg_error(ctx->ins, "Could not find upload ID in "
615
0
                              "CreateMultipartUpload response");
616
0
                flb_plg_debug(ctx->ins, "Raw CreateMultipartUpload response: %s",
617
0
                              c->resp.payload);
618
0
                flb_http_client_destroy(c);
619
0
                return -1;
620
0
            }
621
0
            m_upload->upload_id = tmp;
622
0
            flb_plg_info(ctx->ins, "Successfully initiated multipart upload "
623
0
                         "for %s, UploadId=%s", m_upload->s3_key,
624
0
                         m_upload->upload_id);
625
0
            flb_http_client_destroy(c);
626
0
            return 0;
627
0
        }
628
0
        flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size,
629
0
                                "CreateMultipartUpload", ctx->ins);
630
0
        if (c->resp.payload != NULL) {
631
0
            flb_plg_debug(ctx->ins, "Raw CreateMultipartUpload response: %s",
632
0
                          c->resp.payload);
633
0
        }
634
0
        flb_http_client_destroy(c);
635
0
    }
636
637
0
    flb_plg_error(ctx->ins, "CreateMultipartUpload request failed");
638
0
    return -1;
639
0
}
640
641
/* gets the ETag value from response headers */
642
flb_sds_t get_etag(char *response, size_t size)
643
0
{
644
0
    char *tmp;
645
0
    int start;
646
0
    int end;
647
0
    int len;
648
0
    int i = 0;
649
0
    flb_sds_t etag;
650
651
0
    if (response == NULL) {
652
0
        return NULL;
653
0
    }
654
655
0
    tmp = strstr(response, "ETag:");
656
0
    if (!tmp) {
657
0
        return NULL;
658
0
    }
659
0
    i = tmp - response;
660
661
    /* advance to end of ETag key */
662
0
    i += 5;
663
664
    /* advance across any whitespace and the opening quote */
665
0
    while (i < size && (response[i] == '\"' || isspace(response[i]) != 0)) {
666
0
        i++;
667
0
    }
668
0
    start = i;
669
    /* advance until we hit whitespace or the end quote */
670
0
    while (i < size && (response[i] != '\"' && isspace(response[i]) == 0)) {
671
0
        i++;
672
0
    }
673
0
    end = i;
674
0
    len = end - start;
675
676
0
    etag = flb_sds_create_len(response + start, len);
677
0
    if (!etag) {
678
0
        flb_errno();
679
0
        return NULL;
680
0
    }
681
682
0
    return etag;
683
0
}
684
685
int upload_part(struct flb_s3 *ctx, struct multipart_upload *m_upload,
686
                char *body, size_t body_size, char *pre_signed_url)
687
0
{
688
0
    flb_sds_t uri = NULL;
689
0
    flb_sds_t tmp;
690
0
    int ret;
691
0
    struct flb_http_client *c = NULL;
692
0
    struct flb_aws_client *s3_client;
693
0
    struct flb_aws_header *headers = NULL;
694
0
    int num_headers = 0;
695
0
    char body_md5[25];
696
697
0
    uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 8);
698
0
    if (!uri) {
699
0
        flb_errno();
700
0
        return -1;
701
0
    }
702
703
0
    if (pre_signed_url != NULL) {
704
0
        tmp = flb_sds_copy(uri, pre_signed_url, strlen(pre_signed_url));
705
0
    }
706
0
    else {
707
0
        tmp = flb_sds_printf(&uri, "/%s%s?partNumber=%d&uploadId=%s",
708
0
                            ctx->bucket, m_upload->s3_key, m_upload->part_number,
709
0
                            m_upload->upload_id);
710
0
    }
711
712
0
    if (!tmp) {
713
0
        flb_errno();
714
0
        flb_sds_destroy(uri);
715
0
        return -1;
716
0
    }
717
0
    uri = tmp;
718
719
0
    memset(body_md5, 0, sizeof(body_md5));
720
0
    if (ctx->send_content_md5 == FLB_TRUE) {
721
0
        ret = get_md5_base64(body, body_size, body_md5, sizeof(body_md5));
722
0
        if (ret != 0) {
723
0
            flb_plg_error(ctx->ins, "Failed to create Content-MD5 header");
724
0
            flb_sds_destroy(uri);
725
0
            return -1;
726
0
        }
727
728
0
        num_headers = 1;
729
0
        headers = flb_malloc(sizeof(struct flb_aws_header) * num_headers);
730
0
        if (headers == NULL) {
731
0
            flb_errno();
732
0
            flb_sds_destroy(uri);
733
0
            return -1;
734
0
        }
735
736
0
        headers[0].key = "Content-MD5";
737
0
        headers[0].key_len = 11;
738
0
        headers[0].val = body_md5;
739
0
        headers[0].val_len = strlen(body_md5);
740
0
    }
741
742
0
    s3_client = ctx->s3_client;
743
0
    if (s3_plugin_under_test() == FLB_TRUE) {
744
0
        c = mock_s3_call("TEST_UPLOAD_PART_ERROR", "UploadPart");
745
0
    }
746
0
    else {
747
0
        c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT,
748
0
                                              uri, body, body_size,
749
0
                                              headers, num_headers);
750
0
    }
751
0
    flb_free(headers);
752
0
    flb_sds_destroy(uri);
753
0
    if (c) {
754
0
        flb_plg_info(ctx->ins, "UploadPart http status=%d",
755
0
                      c->resp.status);
756
0
        if (c->resp.status == 200) {
757
0
            tmp = get_etag(c->resp.data, c->resp.data_size);
758
0
            if (!tmp) {
759
0
                flb_plg_error(ctx->ins, "Could not find ETag in "
760
0
                              "UploadPart response");
761
0
                flb_plg_debug(ctx->ins, "Raw UploadPart response: %s",
762
0
                              c->resp.payload);
763
0
                flb_http_client_destroy(c);
764
0
                return -1;
765
0
            }
766
0
            m_upload->etags[m_upload->part_number - 1] = tmp;
767
0
            flb_plg_info(ctx->ins, "Successfully uploaded part #%d "
768
0
                         "for %s, UploadId=%s, ETag=%s", m_upload->part_number,
769
0
                         m_upload->s3_key, m_upload->upload_id, tmp);
770
0
            flb_http_client_destroy(c);
771
            /* track how many bytes are have gone toward this upload */
772
0
            m_upload->bytes += body_size;
773
774
            /* finally, attempt to persist the data for this upload */
775
0
            ret = save_upload(ctx, m_upload, tmp);
776
0
            if (ret == 0) {
777
0
                flb_plg_debug(ctx->ins, "Successfully persisted upload data, UploadId=%s",
778
0
                              m_upload->upload_id);
779
0
            }
780
0
            else {
781
0
                flb_plg_warn(ctx->ins, "Was not able to persisted upload data to disk; "
782
0
                            "if fluent bit dies without completing this upload the part "
783
0
                            "could be lost, UploadId=%s, ETag=%s",
784
0
                            m_upload->upload_id, tmp);
785
0
            }
786
0
            return 0;
787
0
        }
788
0
        flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size,
789
0
                                "UploadPart", ctx->ins);
790
0
        if (c->resp.payload != NULL) {
791
0
            flb_plg_debug(ctx->ins, "Raw UploadPart response: %s",
792
0
                          c->resp.payload);
793
0
        }
794
0
        flb_http_client_destroy(c);
795
0
    }
796
797
0
    flb_plg_error(ctx->ins, "UploadPart request failed");
798
0
    return -1;
799
0
}