/src/fluent-bit/plugins/out_s3/s3_multipart.c
Line | Count | Source |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2024 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <fluent-bit/flb_output_plugin.h> |
21 | | #include <fluent-bit/flb_utils.h> |
22 | | #include <fluent-bit/flb_slist.h> |
23 | | #include <fluent-bit/flb_time.h> |
24 | | #include <fluent-bit/flb_pack.h> |
25 | | #include <fluent-bit/flb_config_map.h> |
26 | | #include <fluent-bit/flb_aws_util.h> |
27 | | #include <fluent-bit/flb_signv4.h> |
28 | | #include <fluent-bit/flb_fstore.h> |
29 | | #include <ctype.h> |
30 | | |
31 | | #include "s3.h" |
32 | | #include "s3_store.h" |
33 | | |
34 | 0 | #define COMPLETE_MULTIPART_UPLOAD_BASE_LEN 100 |
35 | 0 | #define COMPLETE_MULTIPART_UPLOAD_PART_LEN 124 |
36 | | |
37 | | flb_sds_t get_etag(char *response, size_t size); |
38 | | |
39 | | static inline int try_to_write(char *buf, int *off, size_t left, |
40 | | const char *str, size_t str_len) |
41 | 0 | { |
42 | 0 | if (str_len <= 0){ |
43 | 0 | str_len = strlen(str); |
44 | 0 | } |
45 | 0 | if (left <= *off+str_len) { |
46 | 0 | return FLB_FALSE; |
47 | 0 | } |
48 | 0 | memcpy(buf+*off, str, str_len); |
49 | 0 | *off += str_len; |
50 | 0 | return FLB_TRUE; |
51 | 0 | } |
52 | | |
53 | | |
54 | | /* the 'tag' or key in the upload_dir is s3_key + \n + upload_id */ |
55 | | static flb_sds_t upload_key(struct multipart_upload *m_upload) |
56 | 0 | { |
57 | 0 | flb_sds_t key; |
58 | 0 | flb_sds_t tmp; |
59 | |
|
60 | 0 | key = flb_sds_create_size(64); |
61 | |
|
62 | 0 | tmp = flb_sds_printf(&key, "%s\n%s", m_upload->s3_key, m_upload->upload_id); |
63 | 0 | if (!tmp) { |
64 | 0 | flb_errno(); |
65 | 0 | flb_sds_destroy(key); |
66 | 0 | return NULL; |
67 | 0 | } |
68 | 0 | key = tmp; |
69 | |
|
70 | 0 | return key; |
71 | 0 | } |
72 | | |
73 | | /* the 'tag' or key in the upload_dir is s3_key + \n + upload_id */ |
74 | | static int upload_data_from_key(struct multipart_upload *m_upload, char *key) |
75 | 0 | { |
76 | 0 | flb_sds_t tmp_sds; |
77 | 0 | int len = 0; |
78 | 0 | int original_len; |
79 | 0 | char *tmp; |
80 | |
|
81 | 0 | original_len = strlen(key); |
82 | |
|
83 | 0 | tmp = strchr(key, '\n'); |
84 | 0 | if (!tmp) { |
85 | 0 | return -1; |
86 | 0 | } |
87 | | |
88 | 0 | len = tmp - key; |
89 | 0 | tmp_sds = flb_sds_create_len(key, len); |
90 | 0 | if (!tmp_sds) { |
91 | 0 | flb_errno(); |
92 | 0 | return -1; |
93 | 0 | } |
94 | 0 | m_upload->s3_key = tmp_sds; |
95 | |
|
96 | 0 | tmp++; |
97 | 0 | original_len -= (len + 1); |
98 | |
|
99 | 0 | tmp_sds = flb_sds_create_len(tmp, original_len); |
100 | 0 | if (!tmp_sds) { |
101 | 0 | flb_errno(); |
102 | 0 | return -1; |
103 | 0 | } |
104 | 0 | m_upload->upload_id = tmp_sds; |
105 | |
|
106 | 0 | return 0; |
107 | 0 | } |
108 | | |
109 | | /* parse etags from file data */ |
110 | | static void parse_etags(struct multipart_upload *m_upload, char *data) |
111 | 0 | { |
112 | 0 | char *line = data; |
113 | 0 | char *start; |
114 | 0 | char *end; |
115 | 0 | flb_sds_t etag; |
116 | 0 | int part_num; |
117 | 0 | int len; |
118 | |
|
119 | 0 | if (!data) { |
120 | 0 | return; |
121 | 0 | } |
122 | | |
123 | 0 | line = strtok(data, "\n"); |
124 | |
|
125 | 0 | if (!line) { |
126 | 0 | return; |
127 | 0 | } |
128 | | |
129 | 0 | do { |
130 | 0 | start = strstr(line, "part_number="); |
131 | 0 | if (!start) { |
132 | 0 | return; |
133 | 0 | } |
134 | 0 | start += 12; |
135 | 0 | end = strchr(start, '\t'); |
136 | 0 | if (!end) { |
137 | 0 | flb_debug("[s3 restart parser] Did not find tab separator in line %s", start); |
138 | 0 | return; |
139 | 0 | } |
140 | 0 | *end = '\0'; |
141 | 0 | part_num = atoi(start); |
142 | 0 | if (part_num <= 0) { |
143 | 0 | flb_debug("[s3 restart parser] Could not parse part_number from %s", start); |
144 | 0 | return; |
145 | 0 | } |
146 | 0 | m_upload->part_number = part_num; |
147 | 0 | *end = '\t'; |
148 | |
|
149 | 0 | start = strstr(line, "tag="); |
150 | 0 | if (!start) { |
151 | 0 | flb_debug("[s3 restart parser] Could not find 'etag=' %s", line); |
152 | 0 | return; |
153 | 0 | } |
154 | | |
155 | 0 | start += 4; |
156 | 0 | len = strlen(start); |
157 | |
|
158 | 0 | if (len <= 0) { |
159 | 0 | flb_debug("[s3 restart parser] Could not find etag %s", line); |
160 | 0 | return; |
161 | 0 | } |
162 | | |
163 | 0 | etag = flb_sds_create_len(start, len); |
164 | 0 | if (!etag) { |
165 | 0 | flb_debug("[s3 restart parser] Could create etag"); |
166 | 0 | return; |
167 | 0 | } |
168 | 0 | flb_debug("[s3 restart parser] found part number %d=%s", part_num, etag); |
169 | 0 | m_upload->etags[part_num - 1] = etag; |
170 | |
|
171 | 0 | line = strtok(NULL, "\n"); |
172 | 0 | } while (line != NULL); |
173 | 0 | } |
174 | | |
175 | | static struct multipart_upload *upload_from_file(struct flb_s3 *ctx, |
176 | | struct flb_fstore_file *fsf) |
177 | 0 | { |
178 | 0 | struct multipart_upload *m_upload = NULL; |
179 | 0 | char *buffered_data = NULL; |
180 | 0 | size_t buffer_size = 0; |
181 | 0 | int ret; |
182 | |
|
183 | 0 | ret = s3_store_file_upload_read(ctx, fsf, &buffered_data, &buffer_size); |
184 | 0 | if (ret < 0) { |
185 | 0 | flb_plg_error(ctx->ins, "Could not read locally buffered data %s", |
186 | 0 | fsf->name); |
187 | 0 | return NULL; |
188 | 0 | } |
189 | | |
190 | | /* always make sure we have a fresh copy of metadata */ |
191 | 0 | ret = s3_store_file_meta_get(ctx, fsf); |
192 | 0 | if (ret == -1) { |
193 | 0 | flb_plg_error(ctx->ins, "Could not read file metadata: %s", |
194 | 0 | fsf->name); |
195 | 0 | flb_free(buffered_data); |
196 | 0 | return NULL; |
197 | 0 | } |
198 | | |
199 | 0 | m_upload = flb_calloc(1, sizeof(struct multipart_upload)); |
200 | 0 | if (!m_upload) { |
201 | 0 | flb_errno(); |
202 | 0 | flb_free(buffered_data); |
203 | 0 | return NULL; |
204 | 0 | } |
205 | 0 | m_upload->init_time = time(NULL); |
206 | 0 | m_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS; |
207 | |
|
208 | 0 | ret = upload_data_from_key(m_upload, fsf->meta_buf); |
209 | 0 | if (ret < 0) { |
210 | 0 | flb_plg_error(ctx->ins, "Could not extract upload data from: %s", |
211 | 0 | fsf->name); |
212 | 0 | flb_free(buffered_data); |
213 | 0 | multipart_upload_destroy(m_upload); |
214 | 0 | return NULL; |
215 | 0 | } |
216 | | |
217 | 0 | parse_etags(m_upload, buffered_data); |
218 | 0 | flb_free(buffered_data); |
219 | 0 | if (m_upload->part_number == 0) { |
220 | 0 | flb_plg_error(ctx->ins, "Could not extract upload data from %s", |
221 | 0 | fsf->name); |
222 | 0 | multipart_upload_destroy(m_upload); |
223 | 0 | return NULL; |
224 | 0 | } |
225 | | |
226 | | /* code expects it to be 1 more than the last part read */ |
227 | 0 | m_upload->part_number++; |
228 | |
|
229 | 0 | return m_upload; |
230 | 0 | } |
231 | | |
232 | | void multipart_read_uploads_from_fs(struct flb_s3 *ctx) |
233 | 0 | { |
234 | 0 | struct mk_list *tmp; |
235 | 0 | struct mk_list *head; |
236 | 0 | struct multipart_upload *m_upload = NULL; |
237 | 0 | struct flb_fstore_file *fsf; |
238 | |
|
239 | 0 | mk_list_foreach_safe(head, tmp, &ctx->stream_upload->files) { |
240 | 0 | fsf = mk_list_entry(head, struct flb_fstore_file, _head); |
241 | 0 | m_upload = upload_from_file(ctx, fsf); |
242 | 0 | if (!m_upload) { |
243 | 0 | flb_plg_error(ctx->ins, |
244 | 0 | "Could not process multipart upload data in %s", |
245 | 0 | fsf->name); |
246 | 0 | continue; |
247 | 0 | } |
248 | 0 | mk_list_add(&m_upload->_head, &ctx->uploads); |
249 | 0 | flb_plg_info(ctx->ins, |
250 | 0 | "Successfully read existing upload from file system, s3_key=%s", |
251 | 0 | m_upload->s3_key); |
252 | 0 | } |
253 | 0 | } |
254 | | |
255 | | /* store list of part number and etag */ |
256 | | static flb_sds_t upload_data(flb_sds_t etag, int part_num) |
257 | 0 | { |
258 | 0 | flb_sds_t data; |
259 | 0 | flb_sds_t tmp; |
260 | |
|
261 | 0 | data = flb_sds_create_size(64); |
262 | |
|
263 | 0 | tmp = flb_sds_printf(&data, "part_number=%d\tetag=%s\n", part_num, etag); |
264 | 0 | if (!tmp) { |
265 | 0 | flb_errno(); |
266 | 0 | flb_sds_destroy(data); |
267 | 0 | return NULL; |
268 | 0 | } |
269 | 0 | data = tmp; |
270 | |
|
271 | 0 | return data; |
272 | 0 | } |
273 | | |
274 | | /* persists upload data to the file system */ |
275 | | static int save_upload(struct flb_s3 *ctx, struct multipart_upload *m_upload, |
276 | | flb_sds_t etag) |
277 | 0 | { |
278 | 0 | int ret; |
279 | 0 | flb_sds_t key; |
280 | 0 | flb_sds_t data; |
281 | 0 | struct flb_fstore_file *fsf; |
282 | |
|
283 | 0 | key = upload_key(m_upload); |
284 | 0 | if (!key) { |
285 | 0 | flb_plg_debug(ctx->ins, "Could not constuct upload key for buffer dir"); |
286 | 0 | return -1; |
287 | 0 | } |
288 | | |
289 | 0 | data = upload_data(etag, m_upload->part_number); |
290 | 0 | if (!data) { |
291 | 0 | flb_plg_debug(ctx->ins, "Could not constuct upload key for buffer dir"); |
292 | 0 | return -1; |
293 | 0 | } |
294 | | |
295 | 0 | fsf = s3_store_file_upload_get(ctx, key, flb_sds_len(key)); |
296 | | |
297 | | /* Write the key to the file */ |
298 | 0 | ret = s3_store_file_upload_put(ctx, fsf, key, data); |
299 | |
|
300 | 0 | flb_sds_destroy(key); |
301 | 0 | flb_sds_destroy(data); |
302 | |
|
303 | 0 | return ret; |
304 | 0 | } |
305 | | |
306 | | static int remove_upload_from_fs(struct flb_s3 *ctx, struct multipart_upload *m_upload) |
307 | 0 | { |
308 | 0 | flb_sds_t key; |
309 | 0 | struct flb_fstore_file *fsf; |
310 | |
|
311 | 0 | key = upload_key(m_upload); |
312 | 0 | if (!key) { |
313 | 0 | flb_plg_debug(ctx->ins, "Could not construct upload key"); |
314 | 0 | return -1; |
315 | 0 | } |
316 | | |
317 | 0 | fsf = s3_store_file_upload_get(ctx, key, flb_sds_len(key)); |
318 | 0 | if (fsf) { |
319 | 0 | s3_store_file_upload_delete(ctx, fsf); |
320 | 0 | } |
321 | 0 | flb_sds_destroy(key); |
322 | 0 | return 0; |
323 | 0 | } |
324 | | |
325 | | /* |
326 | | * https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html |
327 | | */ |
328 | | static int complete_multipart_upload_payload(struct flb_s3 *ctx, |
329 | | struct multipart_upload *m_upload, |
330 | | char **out_buf, size_t *out_size) |
331 | 0 | { |
332 | 0 | char *buf; |
333 | 0 | int i; |
334 | 0 | int offset = 0; |
335 | 0 | flb_sds_t etag; |
336 | 0 | size_t size = COMPLETE_MULTIPART_UPLOAD_BASE_LEN; |
337 | 0 | char part_num[11]; |
338 | |
|
339 | 0 | size = size + (COMPLETE_MULTIPART_UPLOAD_PART_LEN * m_upload->part_number); |
340 | |
|
341 | 0 | buf = flb_malloc(size + 1); |
342 | 0 | if (!buf) { |
343 | 0 | flb_errno(); |
344 | 0 | return -1; |
345 | 0 | } |
346 | | |
347 | 0 | if (!try_to_write(buf, &offset, size, |
348 | 0 | "<CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">", 73)) { |
349 | 0 | goto error; |
350 | 0 | } |
351 | | |
352 | 0 | for (i = 0; i < m_upload->part_number; i++) { |
353 | 0 | etag = m_upload->etags[i]; |
354 | 0 | if (etag == NULL) { |
355 | 0 | continue; |
356 | 0 | } |
357 | 0 | if (!try_to_write(buf, &offset, size, |
358 | 0 | "<Part><ETag>", 12)) { |
359 | 0 | goto error; |
360 | 0 | } |
361 | | |
362 | 0 | if (!try_to_write(buf, &offset, size, |
363 | 0 | etag, 0)) { |
364 | 0 | goto error; |
365 | 0 | } |
366 | | |
367 | 0 | if (!try_to_write(buf, &offset, size, |
368 | 0 | "</ETag><PartNumber>", 19)) { |
369 | 0 | goto error; |
370 | 0 | } |
371 | | |
372 | 0 | if (!sprintf(part_num, "%d", i + 1)) { |
373 | 0 | goto error; |
374 | 0 | } |
375 | | |
376 | 0 | if (!try_to_write(buf, &offset, size, |
377 | 0 | part_num, 0)) { |
378 | 0 | goto error; |
379 | 0 | } |
380 | | |
381 | 0 | if (!try_to_write(buf, &offset, size, |
382 | 0 | "</PartNumber></Part>", 20)) { |
383 | 0 | goto error; |
384 | 0 | } |
385 | 0 | } |
386 | | |
387 | 0 | if (!try_to_write(buf, &offset, size, |
388 | 0 | "</CompleteMultipartUpload>", 26)) { |
389 | 0 | goto error; |
390 | 0 | } |
391 | | |
392 | 0 | buf[offset] = '\0'; |
393 | |
|
394 | 0 | *out_buf = buf; |
395 | 0 | *out_size = offset; |
396 | 0 | return 0; |
397 | | |
398 | 0 | error: |
399 | 0 | flb_free(buf); |
400 | 0 | flb_plg_error(ctx->ins, "Failed to construct CompleteMultipartUpload " |
401 | 0 | "request body"); |
402 | 0 | return -1; |
403 | 0 | } |
404 | | |
405 | | int complete_multipart_upload(struct flb_s3 *ctx, |
406 | | struct multipart_upload *m_upload, |
407 | | char *pre_signed_url) |
408 | 0 | { |
409 | 0 | char *body; |
410 | 0 | size_t size; |
411 | 0 | flb_sds_t uri = NULL; |
412 | 0 | flb_sds_t tmp; |
413 | 0 | int ret; |
414 | 0 | struct flb_http_client *c = NULL; |
415 | 0 | struct flb_aws_client *s3_client; |
416 | |
|
417 | 0 | if (!m_upload->upload_id) { |
418 | 0 | flb_plg_error(ctx->ins, "Cannot complete multipart upload for key %s: " |
419 | 0 | "upload ID is unset ", m_upload->s3_key); |
420 | 0 | return -1; |
421 | 0 | } |
422 | | |
423 | 0 | uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 11 + |
424 | 0 | flb_sds_len(m_upload->upload_id)); |
425 | 0 | if (!uri) { |
426 | 0 | flb_errno(); |
427 | 0 | return -1; |
428 | 0 | } |
429 | | |
430 | 0 | if (pre_signed_url != NULL) { |
431 | 0 | tmp = flb_sds_copy(uri, pre_signed_url, strlen(pre_signed_url)); |
432 | 0 | } |
433 | 0 | else { |
434 | 0 | tmp = flb_sds_printf(&uri, "/%s%s?uploadId=%s", ctx->bucket, |
435 | 0 | m_upload->s3_key, m_upload->upload_id); |
436 | 0 | } |
437 | |
|
438 | 0 | if (!tmp) { |
439 | 0 | flb_sds_destroy(uri); |
440 | 0 | return -1; |
441 | 0 | } |
442 | 0 | uri = tmp; |
443 | |
|
444 | 0 | ret = complete_multipart_upload_payload(ctx, m_upload, &body, &size); |
445 | 0 | if (ret < 0) { |
446 | 0 | flb_sds_destroy(uri); |
447 | 0 | return -1; |
448 | 0 | } |
449 | | |
450 | 0 | s3_client = ctx->s3_client; |
451 | 0 | if (s3_plugin_under_test() == FLB_TRUE) { |
452 | 0 | c = mock_s3_call("TEST_COMPLETE_MULTIPART_UPLOAD_ERROR", "CompleteMultipartUpload"); |
453 | 0 | } |
454 | 0 | else { |
455 | 0 | c = s3_client->client_vtable->request(s3_client, FLB_HTTP_POST, |
456 | 0 | uri, body, size, |
457 | 0 | NULL, 0); |
458 | 0 | } |
459 | 0 | flb_sds_destroy(uri); |
460 | 0 | flb_free(body); |
461 | 0 | if (c) { |
462 | 0 | flb_plg_debug(ctx->ins, "CompleteMultipartUpload http status=%d", |
463 | 0 | c->resp.status); |
464 | 0 | if (c->resp.status == 200) { |
465 | 0 | flb_plg_info(ctx->ins, "Successfully completed multipart upload " |
466 | 0 | "for %s, UploadId=%s", m_upload->s3_key, |
467 | 0 | m_upload->upload_id); |
468 | 0 | flb_http_client_destroy(c); |
469 | | /* remove this upload from the file system */ |
470 | 0 | remove_upload_from_fs(ctx, m_upload); |
471 | 0 | return 0; |
472 | 0 | } |
473 | 0 | flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size, |
474 | 0 | "CompleteMultipartUpload", ctx->ins); |
475 | 0 | if (c->resp.payload != NULL) { |
476 | 0 | flb_plg_debug(ctx->ins, "Raw CompleteMultipartUpload response: %s", |
477 | 0 | c->resp.payload); |
478 | 0 | } |
479 | 0 | flb_http_client_destroy(c); |
480 | 0 | } |
481 | | |
482 | 0 | flb_plg_error(ctx->ins, "CompleteMultipartUpload request failed"); |
483 | 0 | return -1; |
484 | 0 | } |
485 | | |
486 | | int abort_multipart_upload(struct flb_s3 *ctx, |
487 | | struct multipart_upload *m_upload, |
488 | | char *pre_signed_url) |
489 | 0 | { |
490 | 0 | flb_sds_t uri = NULL; |
491 | 0 | flb_sds_t tmp; |
492 | 0 | struct flb_http_client *c = NULL; |
493 | 0 | struct flb_aws_client *s3_client; |
494 | |
|
495 | 0 | if (!m_upload->upload_id) { |
496 | 0 | flb_plg_error(ctx->ins, "Cannot complete multipart upload for key %s: " |
497 | 0 | "upload ID is unset ", m_upload->s3_key); |
498 | 0 | return -1; |
499 | 0 | } |
500 | | |
501 | 0 | uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 11 + |
502 | 0 | flb_sds_len(m_upload->upload_id)); |
503 | 0 | if (!uri) { |
504 | 0 | flb_errno(); |
505 | 0 | return -1; |
506 | 0 | } |
507 | | |
508 | 0 | if (pre_signed_url != NULL) { |
509 | 0 | tmp = flb_sds_copy(uri, pre_signed_url, strlen(pre_signed_url)); |
510 | 0 | } |
511 | 0 | else { |
512 | 0 | tmp = flb_sds_printf(&uri, "/%s%s?uploadId=%s", ctx->bucket, |
513 | 0 | m_upload->s3_key, m_upload->upload_id); |
514 | 0 | } |
515 | |
|
516 | 0 | if (!tmp) { |
517 | 0 | flb_sds_destroy(uri); |
518 | 0 | return -1; |
519 | 0 | } |
520 | 0 | uri = tmp; |
521 | |
|
522 | 0 | s3_client = ctx->s3_client; |
523 | 0 | if (s3_plugin_under_test() == FLB_TRUE) { |
524 | 0 | c = mock_s3_call("TEST_ABORT_MULTIPART_UPLOAD_ERROR", "AbortMultipartUpload"); |
525 | 0 | } |
526 | 0 | else { |
527 | 0 | c = s3_client->client_vtable->request(s3_client, FLB_HTTP_DELETE, |
528 | 0 | uri, NULL, 0, |
529 | 0 | NULL, 0); |
530 | 0 | } |
531 | 0 | flb_sds_destroy(uri); |
532 | |
|
533 | 0 | if (c) { |
534 | 0 | flb_plg_debug(ctx->ins, "AbortMultipartUpload http status=%d", |
535 | 0 | c->resp.status); |
536 | 0 | if (c->resp.status == 204) { |
537 | 0 | flb_plg_info(ctx->ins, "Successfully completed multipart upload " |
538 | 0 | "for %s, UploadId=%s", m_upload->s3_key, |
539 | 0 | m_upload->upload_id); |
540 | 0 | flb_http_client_destroy(c); |
541 | | /* remove this upload from the file system */ |
542 | 0 | remove_upload_from_fs(ctx, m_upload); |
543 | 0 | return 0; |
544 | 0 | } |
545 | 0 | flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size, |
546 | 0 | "AbortMultipartUpload", ctx->ins); |
547 | 0 | if (c->resp.payload != NULL) { |
548 | 0 | flb_plg_debug(ctx->ins, "Raw AbortMultipartUpload response: %s", |
549 | 0 | c->resp.payload); |
550 | 0 | } |
551 | 0 | flb_http_client_destroy(c); |
552 | 0 | } |
553 | | |
554 | 0 | flb_plg_error(ctx->ins, "AbortMultipartUpload request failed"); |
555 | 0 | return -1; |
556 | 0 | } |
557 | | |
558 | | int create_multipart_upload(struct flb_s3 *ctx, |
559 | | struct multipart_upload *m_upload, |
560 | | char *pre_signed_url) |
561 | 0 | { |
562 | 0 | flb_sds_t uri = NULL; |
563 | 0 | flb_sds_t tmp; |
564 | 0 | struct flb_http_client *c = NULL; |
565 | 0 | struct flb_aws_client *s3_client; |
566 | 0 | struct flb_aws_header *headers = NULL; |
567 | 0 | int num_headers = 0; |
568 | 0 | int ret; |
569 | |
|
570 | 0 | uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 8); |
571 | 0 | if (!uri) { |
572 | 0 | flb_errno(); |
573 | 0 | return -1; |
574 | 0 | } |
575 | | |
576 | 0 | if (pre_signed_url != NULL) { |
577 | 0 | tmp = flb_sds_copy(uri, pre_signed_url, strlen(pre_signed_url)); |
578 | 0 | } |
579 | 0 | else { |
580 | 0 | tmp = flb_sds_printf(&uri, "/%s%s?uploads=", ctx->bucket, m_upload->s3_key); |
581 | 0 | } |
582 | |
|
583 | 0 | if (!tmp) { |
584 | 0 | flb_sds_destroy(uri); |
585 | 0 | return -1; |
586 | 0 | } |
587 | 0 | uri = tmp; |
588 | |
|
589 | 0 | s3_client = ctx->s3_client; |
590 | 0 | if (s3_plugin_under_test() == FLB_TRUE) { |
591 | 0 | c = mock_s3_call("TEST_CREATE_MULTIPART_UPLOAD_ERROR", "CreateMultipartUpload"); |
592 | 0 | } |
593 | 0 | else { |
594 | 0 | ret = create_headers(ctx, NULL, &headers, &num_headers, FLB_TRUE); |
595 | 0 | if (ret == -1) { |
596 | 0 | flb_plg_error(ctx->ins, "Failed to create headers"); |
597 | 0 | flb_sds_destroy(uri); |
598 | 0 | return -1; |
599 | 0 | } |
600 | 0 | c = s3_client->client_vtable->request(s3_client, FLB_HTTP_POST, |
601 | 0 | uri, NULL, 0, headers, num_headers); |
602 | 0 | if (headers) { |
603 | 0 | flb_free(headers); |
604 | 0 | } |
605 | 0 | } |
606 | 0 | flb_sds_destroy(uri); |
607 | 0 | if (c) { |
608 | 0 | flb_plg_debug(ctx->ins, "CreateMultipartUpload http status=%d", |
609 | 0 | c->resp.status); |
610 | 0 | if (c->resp.status == 200) { |
611 | 0 | tmp = flb_aws_xml_get_val(c->resp.payload, c->resp.payload_size, |
612 | 0 | "<UploadId>", "</UploadId>"); |
613 | 0 | if (!tmp) { |
614 | 0 | flb_plg_error(ctx->ins, "Could not find upload ID in " |
615 | 0 | "CreateMultipartUpload response"); |
616 | 0 | flb_plg_debug(ctx->ins, "Raw CreateMultipartUpload response: %s", |
617 | 0 | c->resp.payload); |
618 | 0 | flb_http_client_destroy(c); |
619 | 0 | return -1; |
620 | 0 | } |
621 | 0 | m_upload->upload_id = tmp; |
622 | 0 | flb_plg_info(ctx->ins, "Successfully initiated multipart upload " |
623 | 0 | "for %s, UploadId=%s", m_upload->s3_key, |
624 | 0 | m_upload->upload_id); |
625 | 0 | flb_http_client_destroy(c); |
626 | 0 | return 0; |
627 | 0 | } |
628 | 0 | flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size, |
629 | 0 | "CreateMultipartUpload", ctx->ins); |
630 | 0 | if (c->resp.payload != NULL) { |
631 | 0 | flb_plg_debug(ctx->ins, "Raw CreateMultipartUpload response: %s", |
632 | 0 | c->resp.payload); |
633 | 0 | } |
634 | 0 | flb_http_client_destroy(c); |
635 | 0 | } |
636 | | |
637 | 0 | flb_plg_error(ctx->ins, "CreateMultipartUpload request failed"); |
638 | 0 | return -1; |
639 | 0 | } |
640 | | |
641 | | /* gets the ETag value from response headers */ |
642 | | flb_sds_t get_etag(char *response, size_t size) |
643 | 0 | { |
644 | 0 | char *tmp; |
645 | 0 | int start; |
646 | 0 | int end; |
647 | 0 | int len; |
648 | 0 | int i = 0; |
649 | 0 | flb_sds_t etag; |
650 | |
|
651 | 0 | if (response == NULL) { |
652 | 0 | return NULL; |
653 | 0 | } |
654 | | |
655 | 0 | tmp = strstr(response, "ETag:"); |
656 | 0 | if (!tmp) { |
657 | 0 | return NULL; |
658 | 0 | } |
659 | 0 | i = tmp - response; |
660 | | |
661 | | /* advance to end of ETag key */ |
662 | 0 | i += 5; |
663 | | |
664 | | /* advance across any whitespace and the opening quote */ |
665 | 0 | while (i < size && (response[i] == '\"' || isspace(response[i]) != 0)) { |
666 | 0 | i++; |
667 | 0 | } |
668 | 0 | start = i; |
669 | | /* advance until we hit whitespace or the end quote */ |
670 | 0 | while (i < size && (response[i] != '\"' && isspace(response[i]) == 0)) { |
671 | 0 | i++; |
672 | 0 | } |
673 | 0 | end = i; |
674 | 0 | len = end - start; |
675 | |
|
676 | 0 | etag = flb_sds_create_len(response + start, len); |
677 | 0 | if (!etag) { |
678 | 0 | flb_errno(); |
679 | 0 | return NULL; |
680 | 0 | } |
681 | | |
682 | 0 | return etag; |
683 | 0 | } |
684 | | |
685 | | int upload_part(struct flb_s3 *ctx, struct multipart_upload *m_upload, |
686 | | char *body, size_t body_size, char *pre_signed_url) |
687 | 0 | { |
688 | 0 | flb_sds_t uri = NULL; |
689 | 0 | flb_sds_t tmp; |
690 | 0 | int ret; |
691 | 0 | struct flb_http_client *c = NULL; |
692 | 0 | struct flb_aws_client *s3_client; |
693 | 0 | struct flb_aws_header *headers = NULL; |
694 | 0 | int num_headers = 0; |
695 | 0 | char body_md5[25]; |
696 | |
|
697 | 0 | uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 8); |
698 | 0 | if (!uri) { |
699 | 0 | flb_errno(); |
700 | 0 | return -1; |
701 | 0 | } |
702 | | |
703 | 0 | if (pre_signed_url != NULL) { |
704 | 0 | tmp = flb_sds_copy(uri, pre_signed_url, strlen(pre_signed_url)); |
705 | 0 | } |
706 | 0 | else { |
707 | 0 | tmp = flb_sds_printf(&uri, "/%s%s?partNumber=%d&uploadId=%s", |
708 | 0 | ctx->bucket, m_upload->s3_key, m_upload->part_number, |
709 | 0 | m_upload->upload_id); |
710 | 0 | } |
711 | |
|
712 | 0 | if (!tmp) { |
713 | 0 | flb_errno(); |
714 | 0 | flb_sds_destroy(uri); |
715 | 0 | return -1; |
716 | 0 | } |
717 | 0 | uri = tmp; |
718 | |
|
719 | 0 | memset(body_md5, 0, sizeof(body_md5)); |
720 | 0 | if (ctx->send_content_md5 == FLB_TRUE) { |
721 | 0 | ret = get_md5_base64(body, body_size, body_md5, sizeof(body_md5)); |
722 | 0 | if (ret != 0) { |
723 | 0 | flb_plg_error(ctx->ins, "Failed to create Content-MD5 header"); |
724 | 0 | flb_sds_destroy(uri); |
725 | 0 | return -1; |
726 | 0 | } |
727 | | |
728 | 0 | num_headers = 1; |
729 | 0 | headers = flb_malloc(sizeof(struct flb_aws_header) * num_headers); |
730 | 0 | if (headers == NULL) { |
731 | 0 | flb_errno(); |
732 | 0 | flb_sds_destroy(uri); |
733 | 0 | return -1; |
734 | 0 | } |
735 | | |
736 | 0 | headers[0].key = "Content-MD5"; |
737 | 0 | headers[0].key_len = 11; |
738 | 0 | headers[0].val = body_md5; |
739 | 0 | headers[0].val_len = strlen(body_md5); |
740 | 0 | } |
741 | | |
742 | 0 | s3_client = ctx->s3_client; |
743 | 0 | if (s3_plugin_under_test() == FLB_TRUE) { |
744 | 0 | c = mock_s3_call("TEST_UPLOAD_PART_ERROR", "UploadPart"); |
745 | 0 | } |
746 | 0 | else { |
747 | 0 | c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT, |
748 | 0 | uri, body, body_size, |
749 | 0 | headers, num_headers); |
750 | 0 | } |
751 | 0 | flb_free(headers); |
752 | 0 | flb_sds_destroy(uri); |
753 | 0 | if (c) { |
754 | 0 | flb_plg_info(ctx->ins, "UploadPart http status=%d", |
755 | 0 | c->resp.status); |
756 | 0 | if (c->resp.status == 200) { |
757 | 0 | tmp = get_etag(c->resp.data, c->resp.data_size); |
758 | 0 | if (!tmp) { |
759 | 0 | flb_plg_error(ctx->ins, "Could not find ETag in " |
760 | 0 | "UploadPart response"); |
761 | 0 | flb_plg_debug(ctx->ins, "Raw UploadPart response: %s", |
762 | 0 | c->resp.payload); |
763 | 0 | flb_http_client_destroy(c); |
764 | 0 | return -1; |
765 | 0 | } |
766 | 0 | m_upload->etags[m_upload->part_number - 1] = tmp; |
767 | 0 | flb_plg_info(ctx->ins, "Successfully uploaded part #%d " |
768 | 0 | "for %s, UploadId=%s, ETag=%s", m_upload->part_number, |
769 | 0 | m_upload->s3_key, m_upload->upload_id, tmp); |
770 | 0 | flb_http_client_destroy(c); |
771 | | /* track how many bytes are have gone toward this upload */ |
772 | 0 | m_upload->bytes += body_size; |
773 | | |
774 | | /* finally, attempt to persist the data for this upload */ |
775 | 0 | ret = save_upload(ctx, m_upload, tmp); |
776 | 0 | if (ret == 0) { |
777 | 0 | flb_plg_debug(ctx->ins, "Successfully persisted upload data, UploadId=%s", |
778 | 0 | m_upload->upload_id); |
779 | 0 | } |
780 | 0 | else { |
781 | 0 | flb_plg_warn(ctx->ins, "Was not able to persisted upload data to disk; " |
782 | 0 | "if fluent bit dies without completing this upload the part " |
783 | 0 | "could be lost, UploadId=%s, ETag=%s", |
784 | 0 | m_upload->upload_id, tmp); |
785 | 0 | } |
786 | 0 | return 0; |
787 | 0 | } |
788 | 0 | flb_aws_print_xml_error(c->resp.payload, c->resp.payload_size, |
789 | 0 | "UploadPart", ctx->ins); |
790 | 0 | if (c->resp.payload != NULL) { |
791 | 0 | flb_plg_debug(ctx->ins, "Raw UploadPart response: %s", |
792 | 0 | c->resp.payload); |
793 | 0 | } |
794 | 0 | flb_http_client_destroy(c); |
795 | 0 | } |
796 | | |
797 | 0 | flb_plg_error(ctx->ins, "UploadPart request failed"); |
798 | 0 | return -1; |
799 | 0 | } |