# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import math
from io import BytesIO

from s3transfer.compat import readable, seekable
from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS
from s3transfer.futures import IN_MEMORY_UPLOAD_TAG
from s3transfer.tasks import (
    CompleteMultipartUploadTask,
    CreateMultipartUploadTask,
    SubmissionTask,
    Task,
)
from s3transfer.utils import (
    ChunksizeAdjuster,
    DeferredOpenFile,
    get_callbacks,
    get_filtered_dict,
)


class AggregatedProgressCallback:
    def __init__(self, callbacks, threshold=1024 * 256):
35 """Aggregates progress updates for every provided progress callback
36
37 :type callbacks: A list of functions that accepts bytes_transferred
38 as a single argument
39 :param callbacks: The callbacks to invoke when threshold is reached
40
41 :type threshold: int
42 :param threshold: The progress threshold in which to take the
43 aggregated progress and invoke the progress callback with that
44 aggregated progress total
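
        A minimal usage sketch (``on_progress`` is a hypothetical callback;
        the 256 KiB default threshold comes from this signature)::

            def on_progress(bytes_transferred):
                print(f'{bytes_transferred} bytes')

            aggregated = AggregatedProgressCallback([on_progress])
            aggregated(100 * 1024)  # below the threshold, nothing fires yet
            aggregated(200 * 1024)  # total crosses 256 KiB, on_progress fires
            aggregated.flush()      # emits any remaining aggregated progress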
45 """
46 self._callbacks = callbacks
47 self._threshold = threshold
48 self._bytes_seen = 0
49
50 def __call__(self, bytes_transferred):
51 self._bytes_seen += bytes_transferred
52 if self._bytes_seen >= self._threshold:
53 self._trigger_callbacks()
54
55 def flush(self):
56 """Flushes out any progress that has not been sent to its callbacks"""
57 if self._bytes_seen > 0:
58 self._trigger_callbacks()
59
60 def _trigger_callbacks(self):
61 for callback in self._callbacks:
62 callback(bytes_transferred=self._bytes_seen)
63 self._bytes_seen = 0
64
65
66class InterruptReader:
67 """Wrapper that can interrupt reading using an error
68
69 It uses a transfer coordinator to propagate an error if it notices
70 that a read is being made while the file is being read from.
71
72 :type fileobj: file-like obj
73 :param fileobj: The file-like object to read from
74
75 :type transfer_coordinator: s3transfer.futures.TransferCoordinator
76 :param transfer_coordinator: The transfer coordinator to use if the
77 reader needs to be interrupted.
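
    A minimal usage sketch (``coordinator`` stands in for an existing
    s3transfer.futures.TransferCoordinator; the filename is hypothetical)::

        with InterruptReader(open('data.bin', 'rb'), coordinator) as reader:
            chunk = reader.read(8192)  # raises if an exception was set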
78 """
79
80 def __init__(self, fileobj, transfer_coordinator):
81 self._fileobj = fileobj
82 self._transfer_coordinator = transfer_coordinator
83
84 def read(self, amount=None):
85 # If there is an exception, then raise the exception.
86 # We raise an error instead of returning no bytes because for
87 # requests where the content length and md5 was sent, it will
88 # cause md5 mismatches and retries as there was no indication that
89 # the stream being read from encountered any issues.
90 if self._transfer_coordinator.exception:
91 raise self._transfer_coordinator.exception
92 return self._fileobj.read(amount)

    def seek(self, where, whence=0):
        self._fileobj.seek(where, whence)

    def tell(self):
        return self._fileobj.tell()

    def close(self):
        self._fileobj.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()


class UploadInputManager:
111 """Base manager class for handling various types of files for uploads
112
113 This class is typically used for the UploadSubmissionTask class to help
114 determine the following:
115
116 * How to determine the size of the file
117 * How to determine if a multipart upload is required
118 * How to retrieve the body for a PutObject
119 * How to retrieve the bodies for a set of UploadParts
120
121 The answers/implementations differ for the various types of file inputs
122 that may be accepted. All implementations must subclass and override
123 public methods from this class.
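
    A sketch of the dispatch this base class enables (mirroring
    UploadSubmissionTask._get_upload_input_manager_cls below)::

        for manager_cls in (UploadFilenameInputManager,
                            UploadSeekableInputManager,
                            UploadNonSeekableInputManager):
            if manager_cls.is_compatible(upload_source):
                return manager_cls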
124 """
125
126 def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
127 self._osutil = osutil
128 self._transfer_coordinator = transfer_coordinator
129 self._bandwidth_limiter = bandwidth_limiter
130
131 @classmethod
132 def is_compatible(cls, upload_source):
133 """Determines if the source for the upload is compatible with manager
134
135 :param upload_source: The source for which the upload will pull data
136 from.
137
138 :returns: True if the manager can handle the type of source specified
139 otherwise returns False.
140 """
141 raise NotImplementedError('must implement _is_compatible()')
142
143 def stores_body_in_memory(self, operation_name):
144 """Whether the body it provides are stored in-memory
145
146 :type operation_name: str
147 :param operation_name: The name of the client operation that the body
148 is being used for. Valid operation_names are ``put_object`` and
149 ``upload_part``.
150
151 :rtype: boolean
152 :returns: True if the body returned by the manager will be stored in
153 memory. False if the manager will not directly store the body in
154 memory.
155 """
156 raise NotImplementedError('must implement store_body_in_memory()')
157
158 def provide_transfer_size(self, transfer_future):
159 """Provides the transfer size of an upload
160
161 :type transfer_future: s3transfer.futures.TransferFuture
162 :param transfer_future: The future associated with upload request
163 """
164 raise NotImplementedError('must implement provide_transfer_size()')
165
166 def requires_multipart_upload(self, transfer_future, config):
167 """Determines where a multipart upload is required
168
169 :type transfer_future: s3transfer.futures.TransferFuture
170 :param transfer_future: The future associated with upload request
171
172 :type config: s3transfer.manager.TransferConfig
173 :param config: The config associated to the transfer manager
174
175 :rtype: boolean
176 :returns: True, if the upload should be multipart based on
177 configuration and size. False, otherwise.
178 """
179 raise NotImplementedError('must implement requires_multipart_upload()')
180
181 def get_put_object_body(self, transfer_future):
182 """Returns the body to use for PutObject
183
184 :type transfer_future: s3transfer.futures.TransferFuture
185 :param transfer_future: The future associated with upload request
186
187 :type config: s3transfer.manager.TransferConfig
188 :param config: The config associated to the transfer manager
189
190 :rtype: s3transfer.utils.ReadFileChunk
191 :returns: A ReadFileChunk including all progress callbacks
192 associated with the transfer future.
193 """
194 raise NotImplementedError('must implement get_put_object_body()')
195
196 def yield_upload_part_bodies(self, transfer_future, chunksize):
197 """Yields the part number and body to use for each UploadPart
198
199 :type transfer_future: s3transfer.futures.TransferFuture
200 :param transfer_future: The future associated with upload request
201
202 :type chunksize: int
203 :param chunksize: The chunksize to use for this upload.
204
205 :rtype: int, s3transfer.utils.ReadFileChunk
206 :returns: Yields the part number and the ReadFileChunk including all
207 progress callbacks associated with the transfer future for that
208 specific yielded part.
209 """
210 raise NotImplementedError('must implement yield_upload_part_bodies()')

    def _wrap_fileobj(self, fileobj):
        fileobj = InterruptReader(fileobj, self._transfer_coordinator)
        if self._bandwidth_limiter:
            fileobj = self._bandwidth_limiter.get_bandwith_limited_stream(
                fileobj, self._transfer_coordinator, enabled=False
            )
        return fileobj

    def _get_progress_callbacks(self, transfer_future):
        callbacks = get_callbacks(transfer_future, 'progress')
        # Only wrap the callbacks if there are any to invoke; otherwise
        # skip the unnecessary aggregation work.
        if callbacks:
            return [AggregatedProgressCallback(callbacks)]
        return []

    def _get_close_callbacks(self, aggregated_progress_callbacks):
        return [callback.flush for callback in aggregated_progress_callbacks]


class UploadFilenameInputManager(UploadInputManager):
    """Upload utility for filenames"""

    @classmethod
    def is_compatible(cls, upload_source):
        return isinstance(upload_source, str)

    def stores_body_in_memory(self, operation_name):
        return False

    def provide_transfer_size(self, transfer_future):
        transfer_future.meta.provide_transfer_size(
            self._osutil.get_file_size(transfer_future.meta.call_args.fileobj)
        )

    def requires_multipart_upload(self, transfer_future, config):
        return transfer_future.meta.size >= config.multipart_threshold

    def get_put_object_body(self, transfer_future):
        # Get a file-like object for the given input
        fileobj, full_size = self._get_put_object_fileobj_with_full_size(
            transfer_future
        )

        # Wrap fileobj with interrupt reader that will quickly cancel
        # uploads if needed instead of having to wait for the socket
        # to completely read all of the data.
        fileobj = self._wrap_fileobj(fileobj)

        callbacks = self._get_progress_callbacks(transfer_future)
        close_callbacks = self._get_close_callbacks(callbacks)
        size = transfer_future.meta.size
        # Return the file-like object wrapped into a ReadFileChunk to get
        # progress.
        return self._osutil.open_file_chunk_reader_from_fileobj(
            fileobj=fileobj,
            chunk_size=size,
            full_file_size=full_size,
            callbacks=callbacks,
            close_callbacks=close_callbacks,
        )

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        full_file_size = transfer_future.meta.size
        num_parts = self._get_num_parts(transfer_future, chunksize)
        for part_number in range(1, num_parts + 1):
            callbacks = self._get_progress_callbacks(transfer_future)
            close_callbacks = self._get_close_callbacks(callbacks)
            start_byte = chunksize * (part_number - 1)
            # Get a file-like object for that part, along with the full
            # size of the file-like object backing it.
            fileobj, full_size = self._get_upload_part_fileobj_with_full_size(
                transfer_future.meta.call_args.fileobj,
                start_byte=start_byte,
                part_size=chunksize,
                full_file_size=full_file_size,
            )

            # Wrap fileobj with interrupt reader that will quickly cancel
            # uploads if needed instead of having to wait for the socket
            # to completely read all of the data.
            fileobj = self._wrap_fileobj(fileobj)

            # Wrap the file-like object into a ReadFileChunk to get progress.
            read_file_chunk = self._osutil.open_file_chunk_reader_from_fileobj(
                fileobj=fileobj,
                chunk_size=chunksize,
                full_file_size=full_size,
                callbacks=callbacks,
                close_callbacks=close_callbacks,
            )
            yield part_number, read_file_chunk

    def _get_deferred_open_file(self, fileobj, start_byte):
        fileobj = DeferredOpenFile(
            fileobj, start_byte, open_function=self._osutil.open
        )
        return fileobj

    def _get_put_object_fileobj_with_full_size(self, transfer_future):
        fileobj = transfer_future.meta.call_args.fileobj
        size = transfer_future.meta.size
        return self._get_deferred_open_file(fileobj, 0), size

    def _get_upload_part_fileobj_with_full_size(self, fileobj, **kwargs):
        start_byte = kwargs['start_byte']
        full_size = kwargs['full_file_size']
        return self._get_deferred_open_file(fileobj, start_byte), full_size

    def _get_num_parts(self, transfer_future, part_size):
        return int(math.ceil(transfer_future.meta.size / float(part_size)))


class UploadSeekableInputManager(UploadFilenameInputManager):
    """Upload utility for an open file object"""

    @classmethod
    def is_compatible(cls, upload_source):
        return readable(upload_source) and seekable(upload_source)

    def stores_body_in_memory(self, operation_name):
        if operation_name == 'put_object':
            return False
        else:
            return True

    def provide_transfer_size(self, transfer_future):
        fileobj = transfer_future.meta.call_args.fileobj
        # To determine size, first determine the starting position.
        # Seek to the end and then find the difference in the length
        # between the end and start positions.
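        # For example, a fileobj positioned at byte 10 of a 110-byte
        # stream yields a transfer size of 100 bytes.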
        start_position = fileobj.tell()
        fileobj.seek(0, 2)
        end_position = fileobj.tell()
        fileobj.seek(start_position)
        transfer_future.meta.provide_transfer_size(
            end_position - start_position
        )

    def _get_upload_part_fileobj_with_full_size(self, fileobj, **kwargs):
        # Note: It is unfortunate that in order to do a multithreaded
        # multipart upload we cannot simply copy the filelike object
        # since there is not really a mechanism in python (e.g. os.dup
        # points to the same OS filehandle, which causes concurrency
        # issues). So instead we need to read from the fileobj and
        # chunk the data out to separate file-like objects in memory.
        data = fileobj.read(kwargs['part_size'])
        # We return the length of the data instead of the full_file_size
        # because we partitioned the data into separate BytesIO objects,
        # meaning the BytesIO object has no knowledge of its start position
        # relative to the input source nor access to the rest of the input
        # source. So we must treat it as its own standalone file.
        return BytesIO(data), len(data)

    def _get_put_object_fileobj_with_full_size(self, transfer_future):
        fileobj = transfer_future.meta.call_args.fileobj
        # The current position needs to be taken into account when retrieving
        # the full size of the file.
        size = fileobj.tell() + transfer_future.meta.size
        return fileobj, size


class UploadNonSeekableInputManager(UploadInputManager):
    """Upload utility for a file-like object that cannot seek."""

    def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
        super().__init__(osutil, transfer_coordinator, bandwidth_limiter)
        self._initial_data = b''

    @classmethod
    def is_compatible(cls, upload_source):
        return readable(upload_source)

    def stores_body_in_memory(self, operation_name):
        return True

    def provide_transfer_size(self, transfer_future):
        # No-op because there is no way to do this short of reading the entire
        # body into memory.
        return

    def requires_multipart_upload(self, transfer_future, config):
        # If the user has set the size, we can use that.
        if transfer_future.meta.size is not None:
            return transfer_future.meta.size >= config.multipart_threshold

        # This is tricky to determine in this case because we can't know how
        # large the input is. So to figure it out, we read data into memory
        # up until the threshold and compare how much data was actually read
        # against the threshold.
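        # For example, with an 8 MiB threshold, reading back only 5 MiB
        # proves the stream fits in a single PutObject, while reading a
        # full 8 MiB means the source may be larger and needs a multipart
        # upload.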
        fileobj = transfer_future.meta.call_args.fileobj
        threshold = config.multipart_threshold
        self._initial_data = self._read(fileobj, threshold, False)
        if len(self._initial_data) < threshold:
            return False
        else:
            return True

    def get_put_object_body(self, transfer_future):
        callbacks = self._get_progress_callbacks(transfer_future)
        close_callbacks = self._get_close_callbacks(callbacks)
        fileobj = transfer_future.meta.call_args.fileobj

        body = self._wrap_data(
            self._initial_data + fileobj.read(), callbacks, close_callbacks
        )

        # Zero out the stored data so we don't have additional copies
        # hanging around in memory.
        self._initial_data = None
        return body

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        file_object = transfer_future.meta.call_args.fileobj
        part_number = 0

        # Continue reading parts from the file-like object until it is empty.
        while True:
            callbacks = self._get_progress_callbacks(transfer_future)
            close_callbacks = self._get_close_callbacks(callbacks)
            part_number += 1
            part_content = self._read(file_object, chunksize)
            if not part_content:
                break
            part_object = self._wrap_data(
                part_content, callbacks, close_callbacks
            )

            # Zero out part_content to avoid hanging on to additional data.
            part_content = None
            yield part_number, part_object

    def _read(self, fileobj, amount, truncate=True):
        """
        Reads a specific amount of data from a stream and returns it. If there
        is any data in ``initial_data``, that will be popped out first.

        :type fileobj: A file-like object that implements read
        :param fileobj: The stream to read from.

        :type amount: int
        :param amount: The number of bytes to read from the stream.

        :type truncate: bool
        :param truncate: Whether or not to truncate ``initial_data`` after
            reading from it.

        :rtype: bytes
        :return: The data read from the stream, drawn from ``initial_data``
            first.
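
        For example, with ``initial_data = b'abcd'``, ``_read(f, 2)``
        returns ``b'ab'`` and leaves ``b'cd'`` behind, while ``_read(f, 6)``
        returns ``b'abcd'`` plus two bytes read from ``fileobj``.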
461 """
462 # If the the initial data is empty, we simply read from the fileobj
463 if len(self._initial_data) == 0:
464 return fileobj.read(amount)
465
466 # If the requested number of bytes is less than the amount of
467 # initial data, pull entirely from initial data.
468 if amount <= len(self._initial_data):
469 data = self._initial_data[:amount]
470 # Truncate initial data so we don't hang onto the data longer
471 # than we need.
472 if truncate:
473 self._initial_data = self._initial_data[amount:]
474 return data
475
476 # At this point there is some initial data left, but not enough to
477 # satisfy the number of bytes requested. Pull out the remaining
478 # initial data and read the rest from the fileobj.
479 amount_to_read = amount - len(self._initial_data)
480 data = self._initial_data + fileobj.read(amount_to_read)
481
482 # Zero out initial data so we don't hang onto the data any more.
483 if truncate:
484 self._initial_data = b''
485 return data

    def _wrap_data(self, data, callbacks, close_callbacks):
        """
        Wraps data with the interrupt reader and the file chunk reader.

        :type data: bytes
        :param data: The data to wrap.

        :type callbacks: list
        :param callbacks: The callbacks associated with the transfer future.

        :type close_callbacks: list
        :param close_callbacks: The callbacks to be called when closing the
            wrapper for the data.

        :return: The data wrapped in an InterruptReader and exposed through
            a ReadFileChunk.
        """
        fileobj = self._wrap_fileobj(BytesIO(data))
        return self._osutil.open_file_chunk_reader_from_fileobj(
            fileobj=fileobj,
            chunk_size=len(data),
            full_file_size=len(data),
            callbacks=callbacks,
            close_callbacks=close_callbacks,
        )


class UploadSubmissionTask(SubmissionTask):
    """Task for submitting tasks to execute an upload"""

    PUT_OBJECT_BLOCKLIST = ["ChecksumType", "MpuObjectSize"]

    CREATE_MULTIPART_BLOCKLIST = FULL_OBJECT_CHECKSUM_ARGS + ["MpuObjectSize"]

    UPLOAD_PART_ARGS = [
        'ChecksumAlgorithm',
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    COMPLETE_MULTIPART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
        'ChecksumType',
        'MpuObjectSize',
    ] + FULL_OBJECT_CHECKSUM_ARGS

    def _get_upload_input_manager_cls(self, transfer_future):
        """Retrieves a class for managing input for an upload based on file type

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future for the request

        :rtype: class of UploadInputManager
        :returns: The appropriate class to use for managing a specific type of
            input for uploads.
        """
        upload_manager_resolver_chain = [
            UploadFilenameInputManager,
            UploadSeekableInputManager,
            UploadNonSeekableInputManager,
        ]

        fileobj = transfer_future.meta.call_args.fileobj
        for upload_manager_cls in upload_manager_resolver_chain:
            if upload_manager_cls.is_compatible(fileobj):
                return upload_manager_cls
        raise RuntimeError(
            f'Input {fileobj} of type: {type(fileobj)} is not supported.'
        )

    def _submit(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        bandwidth_limiter=None,
    ):
        """
        :param client: The client associated with the transfer manager

        :type config: s3transfer.manager.TransferConfig
        :param config: The transfer config associated with the transfer
            manager

        :type osutil: s3transfer.utils.OSUtil
        :param osutil: The os utility associated with the transfer manager

        :type request_executor: s3transfer.futures.BoundedExecutor
        :param request_executor: The request executor associated with the
            transfer manager

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for
        """
        upload_input_manager = self._get_upload_input_manager_cls(
            transfer_future
        )(osutil, self._transfer_coordinator, bandwidth_limiter)

        # Determine the size if it was not provided
        if transfer_future.meta.size is None:
            upload_input_manager.provide_transfer_size(transfer_future)

        # Do a multipart upload if needed, otherwise do a regular put object.
        if not upload_input_manager.requires_multipart_upload(
            transfer_future, config
        ):
            self._submit_upload_request(
                client,
                config,
                osutil,
                request_executor,
                transfer_future,
                upload_input_manager,
            )
        else:
            self._submit_multipart_request(
                client,
                config,
                osutil,
                request_executor,
                transfer_future,
                upload_input_manager,
            )

    def _submit_upload_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        upload_input_manager,
    ):
        call_args = transfer_future.meta.call_args

        put_object_extra_args = self._extra_put_object_args(
            call_args.extra_args
        )

        # Get any tags that need to be associated with the put object task
        put_object_tag = self._get_upload_task_tag(
            upload_input_manager, 'put_object'
        )

        # Submit the request for a single upload.
        self._transfer_coordinator.submit(
            request_executor,
            PutObjectTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'fileobj': upload_input_manager.get_put_object_body(
                        transfer_future
                    ),
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': put_object_extra_args,
                },
                is_final=True,
            ),
            tag=put_object_tag,
        )

    def _submit_multipart_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        upload_input_manager,
    ):
        call_args = transfer_future.meta.call_args

        # When a user-provided checksum is passed, set "ChecksumType" to
        # "FULL_OBJECT" and "ChecksumAlgorithm" to the related algorithm.
        for checksum in FULL_OBJECT_CHECKSUM_ARGS:
            if checksum in call_args.extra_args:
                call_args.extra_args["ChecksumType"] = "FULL_OBJECT"
                call_args.extra_args["ChecksumAlgorithm"] = checksum.replace(
                    "Checksum", ""
                )
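        # For example, extra_args={'ChecksumCRC32': ...} results in
        # ChecksumType='FULL_OBJECT' and ChecksumAlgorithm='CRC32'.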

        create_multipart_extra_args = self._extra_create_multipart_args(
            call_args.extra_args
        )

        # Submit the request to create a multipart upload.
        create_multipart_future = self._transfer_coordinator.submit(
            request_executor,
            CreateMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': create_multipart_extra_args,
                },
            ),
        )

        # Submit requests to upload the parts of the file.
        part_futures = []
        extra_part_args = self._extra_upload_part_args(call_args.extra_args)

        # Get any tags that need to be associated with the submitted task
        # for uploading the data
        upload_part_tag = self._get_upload_task_tag(
            upload_input_manager, 'upload_part'
        )

        size = transfer_future.meta.size
        adjuster = ChunksizeAdjuster()
        chunksize = adjuster.adjust_chunksize(config.multipart_chunksize, size)
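        # For example, S3 allows at most 10,000 parts per multipart
        # upload, so for a very large file the adjuster scales the
        # chunksize up from the configured default.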
        part_iterator = upload_input_manager.yield_upload_part_bodies(
            transfer_future, chunksize
        )

        for part_number, fileobj in part_iterator:
            part_futures.append(
                self._transfer_coordinator.submit(
                    request_executor,
                    UploadPartTask(
                        transfer_coordinator=self._transfer_coordinator,
                        main_kwargs={
                            'client': client,
                            'fileobj': fileobj,
                            'bucket': call_args.bucket,
                            'key': call_args.key,
                            'part_number': part_number,
                            'extra_args': extra_part_args,
                        },
                        pending_main_kwargs={
                            'upload_id': create_multipart_future
                        },
                    ),
                    tag=upload_part_tag,
                )
            )

        complete_multipart_extra_args = self._extra_complete_multipart_args(
            call_args.extra_args
        )
        # Submit the request to complete the multipart upload.
        self._transfer_coordinator.submit(
            request_executor,
            CompleteMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': complete_multipart_extra_args,
                },
                pending_main_kwargs={
                    'upload_id': create_multipart_future,
                    'parts': part_futures,
                },
                is_final=True,
            ),
        )

    def _extra_upload_part_args(self, extra_args):
        # Only the args in UPLOAD_PART_ARGS actually need to be passed
        # onto the upload_part calls.
        return get_filtered_dict(extra_args, self.UPLOAD_PART_ARGS)

    def _extra_complete_multipart_args(self, extra_args):
        return get_filtered_dict(extra_args, self.COMPLETE_MULTIPART_ARGS)

    def _extra_create_multipart_args(self, extra_args):
        return get_filtered_dict(
            extra_args, blocklisted_keys=self.CREATE_MULTIPART_BLOCKLIST
        )

    def _extra_put_object_args(self, extra_args):
        return get_filtered_dict(
            extra_args, blocklisted_keys=self.PUT_OBJECT_BLOCKLIST
        )

    def _get_upload_task_tag(self, upload_input_manager, operation_name):
        tag = None
        if upload_input_manager.stores_body_in_memory(operation_name):
            tag = IN_MEMORY_UPLOAD_TAG
        return tag


class PutObjectTask(Task):
    """Task to do a nonmultipart upload"""

    def _main(self, client, fileobj, bucket, key, extra_args):
        """
        :param client: The client to use when calling PutObject
        :param fileobj: The file to upload.
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param extra_args: A dictionary of any extra arguments that may be
            used in the upload.
        """
        with fileobj as body:
            client.put_object(Bucket=bucket, Key=key, Body=body, **extra_args)

class UploadPartTask(Task):
    """Task to upload a part in a multipart upload"""

    def _main(
        self, client, fileobj, bucket, key, upload_id, part_number, extra_args
    ):
        """
        :param client: The client to use when calling UploadPart
        :param fileobj: The file to upload.
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param upload_id: The id of the upload
        :param part_number: The number representing the part of the multipart
            upload
        :param extra_args: A dictionary of any extra arguments that may be
            used in the upload.

        :rtype: dict
        :returns: A dictionary representing a part::

            {'ETag': etag_value, 'PartNumber': part_number}

            This value can be appended to a list to be used to complete
            the multipart upload.
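
            If a ``ChecksumAlgorithm`` was specified in ``extra_args``,
            the dictionary also carries the matching
            ``Checksum<Algorithm>`` member from the response.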
823 """
824 with fileobj as body:
825 response = client.upload_part(
826 Bucket=bucket,
827 Key=key,
828 UploadId=upload_id,
829 PartNumber=part_number,
830 Body=body,
831 **extra_args,
832 )
833 etag = response['ETag']
834 part_metadata = {'ETag': etag, 'PartNumber': part_number}
835 if 'ChecksumAlgorithm' in extra_args:
836 algorithm_name = extra_args['ChecksumAlgorithm'].upper()
837 checksum_member = f'Checksum{algorithm_name}'
838 if checksum_member in response:
839 part_metadata[checksum_member] = response[checksum_member]
840 return part_metadata