Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/upload.py: 30%
248 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import math
14from io import BytesIO
16from s3transfer.compat import readable, seekable
17from s3transfer.futures import IN_MEMORY_UPLOAD_TAG
18from s3transfer.tasks import (
19 CompleteMultipartUploadTask,
20 CreateMultipartUploadTask,
21 SubmissionTask,
22 Task,
23)
24from s3transfer.utils import (
25 ChunksizeAdjuster,
26 DeferredOpenFile,
27 get_callbacks,
28 get_filtered_dict,
29)
class AggregatedProgressCallback:
    """Batches progress updates before forwarding them to callbacks"""

    def __init__(self, callbacks, threshold=1024 * 256):
        """Aggregates progress updates for every provided progress callback

        :type callbacks: A list of functions that accepts bytes_transferred
            as a single argument
        :param callbacks: The callbacks to invoke once the accumulated
            progress reaches the threshold

        :type threshold: int
        :param threshold: The amount of accumulated progress at which the
            callbacks are invoked with the aggregated total
        """
        self._callbacks = callbacks
        self._threshold = threshold
        self._bytes_seen = 0

    def __call__(self, bytes_transferred):
        """Record progress and notify the callbacks once enough has built up"""
        self._bytes_seen += bytes_transferred
        if self._bytes_seen < self._threshold:
            # Not enough progress accumulated yet; keep aggregating.
            return
        self._trigger_callbacks()

    def flush(self):
        """Flushes out any progress that has not been sent to its callbacks"""
        if self._bytes_seen <= 0:
            return
        self._trigger_callbacks()

    def _trigger_callbacks(self):
        """Invoke every callback with the pending total, then reset it"""
        for notify in self._callbacks:
            notify(bytes_transferred=self._bytes_seen)
        self._bytes_seen = 0
class InterruptReader:
    """Wrapper that can interrupt reading using an error

    It uses a transfer coordinator to propagate an error if it notices
    that a read is being made after the transfer coordinator has recorded
    an exception.

    :type fileobj: file-like obj
    :param fileobj: The file-like object to read from

    :type transfer_coordinator: s3transfer.futures.TransferCoordinator
    :param transfer_coordinator: The transfer coordinator to use if the
        reader needs to be interrupted.
    """

    def __init__(self, fileobj, transfer_coordinator):
        self._fileobj = fileobj
        self._transfer_coordinator = transfer_coordinator

    def read(self, amount=None):
        """Read from the wrapped file unless the transfer has failed

        :type amount: int
        :param amount: The number of bytes to read; passed straight through
            to the wrapped file-like object.

        :returns: Whatever the wrapped file-like object's ``read`` returns.
        :raises: The transfer coordinator's exception, if one is set.
        """
        # If there is an exception, then raise the exception.
        # We raise an error instead of returning no bytes because for
        # requests where the content length and md5 was sent, it will
        # cause md5 mismatches and retries as there was no indication that
        # the stream being read from encountered any issues.
        if self._transfer_coordinator.exception:
            raise self._transfer_coordinator.exception
        return self._fileobj.read(amount)

    def seek(self, where, whence=0):
        """Seek the wrapped file and return what its ``seek`` returns

        Forwarding the return value keeps this wrapper in line with the
        file-object convention that ``seek`` reports the new position.
        """
        return self._fileobj.seek(where, whence)

    def tell(self):
        """Return the current position reported by the wrapped file"""
        return self._fileobj.tell()

    def close(self):
        """Close the wrapped file"""
        self._fileobj.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
class UploadInputManager:
    """Base manager class for handling various types of files for uploads

    This class is typically used for the UploadSubmissionTask class to help
    determine the following:

        * How to determine the size of the file
        * How to determine if a multipart upload is required
        * How to retrieve the body for a PutObject
        * How to retrieve the bodies for a set of UploadParts

    The answers/implementations differ for the various types of file inputs
    that may be accepted. All implementations must subclass and override
    public methods from this class.
    """

    def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
        self._osutil = osutil
        self._transfer_coordinator = transfer_coordinator
        self._bandwidth_limiter = bandwidth_limiter

    @classmethod
    def is_compatible(cls, upload_source):
        """Determines if the source for the upload is compatible with manager

        :param upload_source: The source for which the upload will pull data
            from.

        :returns: True if the manager can handle the type of source specified
            otherwise returns False.
        """
        # The message names the method subclasses must actually override.
        raise NotImplementedError('must implement is_compatible()')

    def stores_body_in_memory(self, operation_name):
        """Whether the body it provides are stored in-memory

        :type operation_name: str
        :param operation_name: The name of the client operation that the body
            is being used for. Valid operation_names are ``put_object`` and
            ``upload_part``.

        :rtype: boolean
        :returns: True if the body returned by the manager will be stored in
            memory. False if the manager will not directly store the body in
            memory.
        """
        # The message names the method subclasses must actually override.
        raise NotImplementedError('must implement stores_body_in_memory()')

    def provide_transfer_size(self, transfer_future):
        """Provides the transfer size of an upload

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with upload request
        """
        raise NotImplementedError('must implement provide_transfer_size()')

    def requires_multipart_upload(self, transfer_future, config):
        """Determines where a multipart upload is required

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with upload request

        :type config: s3transfer.manager.TransferConfig
        :param config: The config associated to the transfer manager

        :rtype: boolean
        :returns: True, if the upload should be multipart based on
            configuration and size. False, otherwise.
        """
        raise NotImplementedError('must implement requires_multipart_upload()')

    def get_put_object_body(self, transfer_future):
        """Returns the body to use for PutObject

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with upload request

        :rtype: s3transfer.utils.ReadFileChunk
        :returns: A ReadFileChunk including all progress callbacks
            associated with the transfer future.
        """
        raise NotImplementedError('must implement get_put_object_body()')

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        """Yields the part number and body to use for each UploadPart

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with upload request

        :type chunksize: int
        :param chunksize: The chunksize to use for this upload.

        :rtype: int, s3transfer.utils.ReadFileChunk
        :returns: Yields the part number and the ReadFileChunk including all
            progress callbacks associated with the transfer future for that
            specific yielded part.
        """
        raise NotImplementedError('must implement yield_upload_part_bodies()')

    def _wrap_fileobj(self, fileobj):
        """Wrap a file-like object so reads can be interrupted

        The object is always wrapped in an InterruptReader. When a bandwidth
        limiter was supplied, the result is additionally wrapped in the
        limiter's stream, requested with ``enabled=False``.
        """
        fileobj = InterruptReader(fileobj, self._transfer_coordinator)
        if self._bandwidth_limiter:
            # NOTE: "bandwith" is the spelling of the method this code calls
            # on the limiter; it must not be "corrected" here.
            fileobj = self._bandwidth_limiter.get_bandwith_limited_stream(
                fileobj, self._transfer_coordinator, enabled=False
            )
        return fileobj

    def _get_progress_callbacks(self, transfer_future):
        """Return the progress callbacks wrapped in a single aggregator"""
        callbacks = get_callbacks(transfer_future, 'progress')
        # We only want to be wrapping the callbacks if there are callbacks to
        # invoke because we do not want to be doing any unnecessary work if
        # there are no callbacks to invoke.
        if callbacks:
            return [AggregatedProgressCallback(callbacks)]
        return []

    def _get_close_callbacks(self, aggregated_progress_callbacks):
        """Return the ``flush`` method of each aggregated progress callback"""
        return [callback.flush for callback in aggregated_progress_callbacks]
class UploadFilenameInputManager(UploadInputManager):
    """Upload utility for filenames"""

    @classmethod
    def is_compatible(cls, upload_source):
        """Accept the source only when it is a ``str``"""
        return isinstance(upload_source, str)

    def stores_body_in_memory(self, operation_name):
        """Always False for this manager, whatever the operation"""
        return False

    def provide_transfer_size(self, transfer_future):
        """Report the size obtained from ``osutil.get_file_size``"""
        meta = transfer_future.meta
        file_size = self._osutil.get_file_size(meta.call_args.fileobj)
        meta.provide_transfer_size(file_size)

    def requires_multipart_upload(self, transfer_future, config):
        """True once the transfer size reaches ``multipart_threshold``"""
        return transfer_future.meta.size >= config.multipart_threshold

    def get_put_object_body(self, transfer_future):
        """Build the chunk reader used as the body of a single PutObject"""
        # Obtain the file-like object for the input and its full size.
        (
            source,
            total_size,
        ) = self._get_put_object_fileobj_with_full_size(transfer_future)

        # The interrupt reader lets an upload be cancelled quickly instead
        # of waiting for the socket to finish reading all of the data.
        source = self._wrap_fileobj(source)

        progress_callbacks = self._get_progress_callbacks(transfer_future)
        flush_callbacks = self._get_close_callbacks(progress_callbacks)
        transfer_size = transfer_future.meta.size
        # Wrapping into a chunk reader is what provides progress reporting.
        return self._osutil.open_file_chunk_reader_from_fileobj(
            fileobj=source,
            chunk_size=transfer_size,
            full_file_size=total_size,
            callbacks=progress_callbacks,
            close_callbacks=flush_callbacks,
        )

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        """Yield ``(part_number, chunk reader)`` for each part, from 1"""
        meta = transfer_future.meta
        total_size = meta.size
        part_count = self._get_num_parts(transfer_future, chunksize)
        for index in range(part_count):
            part_number = index + 1
            progress_callbacks = self._get_progress_callbacks(transfer_future)
            flush_callbacks = self._get_close_callbacks(progress_callbacks)
            offset = chunksize * index
            # Obtain the file-like object for this part together with the
            # full size of the file-like object backing it.
            (
                part_source,
                part_full_size,
            ) = self._get_upload_part_fileobj_with_full_size(
                meta.call_args.fileobj,
                start_byte=offset,
                part_size=chunksize,
                full_file_size=total_size,
            )

            # The interrupt reader lets an upload be cancelled quickly
            # instead of waiting for the socket to read all of the data.
            part_source = self._wrap_fileobj(part_source)

            # Wrapping into a chunk reader provides progress reporting.
            part_body = self._osutil.open_file_chunk_reader_from_fileobj(
                fileobj=part_source,
                chunk_size=chunksize,
                full_file_size=part_full_size,
                callbacks=progress_callbacks,
                close_callbacks=flush_callbacks,
            )
            yield part_number, part_body

    def _get_deferred_open_file(self, fileobj, start_byte):
        """Create a DeferredOpenFile that opens through ``osutil.open``"""
        return DeferredOpenFile(
            fileobj, start_byte, open_function=self._osutil.open
        )

    def _get_put_object_fileobj_with_full_size(self, transfer_future):
        """Return a deferred file at offset 0 and the transfer size"""
        meta = transfer_future.meta
        filename = meta.call_args.fileobj
        transfer_size = meta.size
        return self._get_deferred_open_file(filename, 0), transfer_size

    def _get_upload_part_fileobj_with_full_size(self, fileobj, **kwargs):
        """Return a deferred file at ``start_byte`` and ``full_file_size``"""
        offset = kwargs['start_byte']
        total_size = kwargs['full_file_size']
        return self._get_deferred_open_file(fileobj, offset), total_size

    def _get_num_parts(self, transfer_future, part_size):
        """Number of parts: transfer size over part size, rounded up"""
        ratio = transfer_future.meta.size / float(part_size)
        return math.ceil(ratio)
class UploadSeekableInputManager(UploadFilenameInputManager):
    """Upload utility for an open file object"""

    @classmethod
    def is_compatible(cls, upload_source):
        """Accept the source when it is both readable and seekable"""
        return readable(upload_source) and seekable(upload_source)

    def stores_body_in_memory(self, operation_name):
        """False for ``put_object``; True for any other operation name"""
        return operation_name != 'put_object'

    def provide_transfer_size(self, transfer_future):
        """Report the bytes between the current position and end of file"""
        stream = transfer_future.meta.call_args.fileobj
        # Remember where the stream is now, jump to the end to learn the
        # end offset, then restore the original position. The size is the
        # distance between the two offsets.
        origin = stream.tell()
        stream.seek(0, 2)
        end = stream.tell()
        stream.seek(origin)
        transfer_future.meta.provide_transfer_size(end - origin)

    def _get_upload_part_fileobj_with_full_size(self, fileobj, **kwargs):
        """Read one part into memory and return it with its own length"""
        # Note: It is unfortunate that in order to do a multithreaded
        # multipart upload we cannot simply copy the filelike object
        # since there is not really a mechanism in python (i.e. os.dup
        # points to the same OS filehandle which causes concurrency
        # issues). So instead we need to read from the fileobj and
        # chunk the data out to separate file-like objects in memory.
        part_bytes = fileobj.read(kwargs['part_size'])
        # The length of the data is returned rather than full_file_size:
        # each BytesIO is a standalone file with no knowledge of its start
        # position in the input source and no access to the rest of it.
        return BytesIO(part_bytes), len(part_bytes)

    def _get_put_object_fileobj_with_full_size(self, transfer_future):
        """Return the stream itself and the size including its offset"""
        meta = transfer_future.meta
        stream = meta.call_args.fileobj
        # The current position needs to be taken into account when
        # retrieving the full size of the file.
        full_size = stream.tell() + meta.size
        return stream, full_size
class UploadNonSeekableInputManager(UploadInputManager):
    """Upload utility for a file-like object that cannot seek."""

    def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
        super().__init__(osutil, transfer_coordinator, bandwidth_limiter)
        # Data already pulled off the stream while deciding whether a
        # multipart upload is needed. It is always a bytes object (never
        # None) so that _read() can take its length at any point.
        self._initial_data = b''

    @classmethod
    def is_compatible(cls, upload_source):
        """Accept the source when it is readable"""
        return readable(upload_source)

    def stores_body_in_memory(self, operation_name):
        """Always True: every body this manager provides wraps a BytesIO"""
        return True

    def provide_transfer_size(self, transfer_future):
        """Do nothing; the size of the stream cannot be determined here"""
        # No-op because there is no way to do this short of reading the entire
        # body into memory.
        return

    def requires_multipart_upload(self, transfer_future, config):
        """Determine whether the upload must be multipart

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with upload request

        :type config: s3transfer.manager.TransferConfig
        :param config: The config associated to the transfer manager

        :rtype: boolean
        :returns: True if the known size, or otherwise the amount of data
            that could be read up to the threshold, reaches
            ``config.multipart_threshold``. False, otherwise.
        """
        # If the user has set the size, we can use that.
        if transfer_future.meta.size is not None:
            return transfer_future.meta.size >= config.multipart_threshold

        # This is tricky to determine in this case because we can't know how
        # large the input is. So to figure it out, we read data into memory
        # up until the threshold and compare how much data was actually read
        # against the threshold.
        fileobj = transfer_future.meta.call_args.fileobj
        threshold = config.multipart_threshold
        self._initial_data = self._read(fileobj, threshold, False)
        return len(self._initial_data) >= threshold

    def get_put_object_body(self, transfer_future):
        """Returns the body to use for PutObject

        The body is the buffered initial data followed by everything that
        remains in the stream.

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with upload request

        :returns: The wrapped data, as returned by ``_wrap_data``.
        """
        callbacks = self._get_progress_callbacks(transfer_future)
        close_callbacks = self._get_close_callbacks(callbacks)
        fileobj = transfer_future.meta.call_args.fileobj

        body = self._wrap_data(
            self._initial_data + fileobj.read(), callbacks, close_callbacks
        )

        # Drop the stored data so we don't have additional copies hanging
        # around in memory. Reset to empty bytes rather than None so that
        # _read(), which takes len() of this attribute, keeps working.
        self._initial_data = b''
        return body

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        """Yields the part number and body to use for each UploadPart

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with upload request

        :type chunksize: int
        :param chunksize: The chunksize to use for this upload.

        :returns: Yields ``(part_number, body)`` pairs, numbered from 1,
            until a read returns no data.
        """
        file_object = transfer_future.meta.call_args.fileobj
        part_number = 0

        # Continue reading parts from the file-like object until it is empty.
        while True:
            part_number += 1
            part_content = self._read(file_object, chunksize)
            if not part_content:
                break
            # Callbacks are only built once we know there is a part to
            # send, so the terminating empty read does no extra work.
            callbacks = self._get_progress_callbacks(transfer_future)
            close_callbacks = self._get_close_callbacks(callbacks)
            part_object = self._wrap_data(
                part_content, callbacks, close_callbacks
            )

            # Zero out part_content to avoid hanging on to additional data.
            part_content = None
            yield part_number, part_object

    def _read(self, fileobj, amount, truncate=True):
        """
        Reads a specific amount of data from a stream and returns it. If there
        is any data in initial_data, that will be popped out first.

        :type fileobj: A file-like object that implements read
        :param fileobj: The stream to read from.

        :type amount: int
        :param amount: The number of bytes to read from the stream.

        :type truncate: bool
        :param truncate: Whether or not to truncate initial_data after
            reading from it.

        :return: The data read: initial data first, then data from the
            stream if the initial data alone is not enough.
        """
        # If the initial data is empty, we simply read from the fileobj
        if not self._initial_data:
            return fileobj.read(amount)

        # If the requested number of bytes is less than the amount of
        # initial data, pull entirely from initial data.
        if amount <= len(self._initial_data):
            data = self._initial_data[:amount]
            # Truncate initial data so we don't hang onto the data longer
            # than we need.
            if truncate:
                self._initial_data = self._initial_data[amount:]
            return data

        # At this point there is some initial data left, but not enough to
        # satisfy the number of bytes requested. Pull out the remaining
        # initial data and read the rest from the fileobj.
        amount_to_read = amount - len(self._initial_data)
        data = self._initial_data + fileobj.read(amount_to_read)

        # Zero out initial data so we don't hang onto the data any more.
        if truncate:
            self._initial_data = b''
        return data

    def _wrap_data(self, data, callbacks, close_callbacks):
        """
        Wraps data with the interrupt reader and the file chunk reader.

        :type data: bytes
        :param data: The data to wrap.

        :type callbacks: list
        :param callbacks: The callbacks associated with the transfer future.

        :type close_callbacks: list
        :param close_callbacks: The callbacks to be called when closing the
            wrapper for the data.

        :return: Fully wrapped data.
        """
        fileobj = self._wrap_fileobj(BytesIO(data))
        return self._osutil.open_file_chunk_reader_from_fileobj(
            fileobj=fileobj,
            chunk_size=len(data),
            full_file_size=len(data),
            callbacks=callbacks,
            close_callbacks=close_callbacks,
        )
class UploadSubmissionTask(SubmissionTask):
    """Task for submitting tasks to execute an upload"""

    # Extra args that are forwarded to the upload_part calls.
    UPLOAD_PART_ARGS = [
        'ChecksumAlgorithm',
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    # Extra args that are forwarded to the complete-multipart-upload task.
    COMPLETE_MULTIPART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    def _get_upload_input_manager_cls(self, transfer_future):
        """Retrieves a class for managing input for an upload based on file type

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future for the request

        :rtype: class of UploadInputManager
        :returns: The first manager class, in resolution order, that reports
            itself compatible with the upload source.
        :raises RuntimeError: If no manager class accepts the source.
        """
        source = transfer_future.meta.call_args.fileobj
        # Order matters: the first compatible class wins.
        candidates = (
            UploadFilenameInputManager,
            UploadSeekableInputManager,
            UploadNonSeekableInputManager,
        )
        for candidate in candidates:
            if candidate.is_compatible(source):
                return candidate
        raise RuntimeError(
            f'Input {source} of type: {type(source)} is not supported.'
        )

    def _submit(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        bandwidth_limiter=None,
    ):
        """
        :param client: The client associated with the transfer manager

        :type config: s3transfer.manager.TransferConfig
        :param config: The transfer config associated with the transfer
            manager

        :type osutil: s3transfer.utils.OSUtil
        :param osutil: The os utility associated to the transfer manager

        :type request_executor: s3transfer.futures.BoundedExecutor
        :param request_executor: The request executor associated with the
            transfer manager

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for

        :param bandwidth_limiter: Optional limiter handed to the upload
            input manager.
        """
        manager_cls = self._get_upload_input_manager_cls(transfer_future)
        input_manager = manager_cls(
            osutil, self._transfer_coordinator, bandwidth_limiter
        )

        # Ask the manager for the size when the caller did not supply one.
        if transfer_future.meta.size is None:
            input_manager.provide_transfer_size(transfer_future)

        # Pick the multipart path when required, else a single put object.
        if input_manager.requires_multipart_upload(transfer_future, config):
            submit_request = self._submit_multipart_request
        else:
            submit_request = self._submit_upload_request
        submit_request(
            client,
            config,
            osutil,
            request_executor,
            transfer_future,
            input_manager,
        )

    def _submit_upload_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        upload_input_manager,
    ):
        """Submit a single PutObjectTask as the final task of the transfer"""
        args = transfer_future.meta.call_args

        # Work out the tag (if any) to associate with the put object task.
        tag = self._get_upload_task_tag(upload_input_manager, 'put_object')

        task_kwargs = {
            'client': client,
            'fileobj': upload_input_manager.get_put_object_body(
                transfer_future
            ),
            'bucket': args.bucket,
            'key': args.key,
            'extra_args': args.extra_args,
        }
        put_object_task = PutObjectTask(
            transfer_coordinator=self._transfer_coordinator,
            main_kwargs=task_kwargs,
            is_final=True,
        )

        # Submit the request of a single upload.
        self._transfer_coordinator.submit(
            request_executor, put_object_task, tag=tag
        )

    def _submit_multipart_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        upload_input_manager,
    ):
        """Submit the create, upload-part and complete tasks of a multipart
        upload
        """
        args = transfer_future.meta.call_args
        coordinator = self._transfer_coordinator

        # First task: create the multipart upload. Its future supplies the
        # upload id to every later task.
        create_task = CreateMultipartUploadTask(
            transfer_coordinator=coordinator,
            main_kwargs={
                'client': client,
                'bucket': args.bucket,
                'key': args.key,
                'extra_args': args.extra_args,
            },
        )
        create_multipart_future = coordinator.submit(
            request_executor, create_task
        )

        part_extra_args = self._extra_upload_part_args(args.extra_args)

        # Work out the tag (if any) to associate with the tasks that upload
        # the data.
        part_tag = self._get_upload_task_tag(
            upload_input_manager, 'upload_part'
        )

        transfer_size = transfer_future.meta.size
        chunksize = ChunksizeAdjuster().adjust_chunksize(
            config.multipart_chunksize, transfer_size
        )
        part_bodies = upload_input_manager.yield_upload_part_bodies(
            transfer_future, chunksize
        )

        # One upload-part task per body yielded by the input manager.
        part_futures = [
            coordinator.submit(
                request_executor,
                UploadPartTask(
                    transfer_coordinator=coordinator,
                    main_kwargs={
                        'client': client,
                        'fileobj': part_body,
                        'bucket': args.bucket,
                        'key': args.key,
                        'part_number': part_number,
                        'extra_args': part_extra_args,
                    },
                    pending_main_kwargs={
                        'upload_id': create_multipart_future
                    },
                ),
                tag=part_tag,
            )
            for part_number, part_body in part_bodies
        ]

        complete_extra_args = self._extra_complete_multipart_args(
            args.extra_args
        )
        # Final task: complete the multipart upload once the upload id and
        # all of the parts are available.
        complete_task = CompleteMultipartUploadTask(
            transfer_coordinator=coordinator,
            main_kwargs={
                'client': client,
                'bucket': args.bucket,
                'key': args.key,
                'extra_args': complete_extra_args,
            },
            pending_main_kwargs={
                'upload_id': create_multipart_future,
                'parts': part_futures,
            },
            is_final=True,
        )
        coordinator.submit(request_executor, complete_task)

    def _extra_upload_part_args(self, extra_args):
        """Filter ``extra_args`` down to the keys in UPLOAD_PART_ARGS"""
        # Only the args in UPLOAD_PART_ARGS actually need to be passed
        # onto the upload_part calls.
        return get_filtered_dict(extra_args, self.UPLOAD_PART_ARGS)

    def _extra_complete_multipart_args(self, extra_args):
        """Filter ``extra_args`` down to the keys in COMPLETE_MULTIPART_ARGS"""
        return get_filtered_dict(extra_args, self.COMPLETE_MULTIPART_ARGS)

    def _get_upload_task_tag(self, upload_input_manager, operation_name):
        """Return IN_MEMORY_UPLOAD_TAG for in-memory bodies, else None"""
        if upload_input_manager.stores_body_in_memory(operation_name):
            return IN_MEMORY_UPLOAD_TAG
        return None
class PutObjectTask(Task):
    """Task to do a nonmultipart upload"""

    def _main(self, client, fileobj, bucket, key, extra_args):
        """Upload the whole file with one ``put_object`` call

        :param client: The client to use when calling PutObject
        :param fileobj: The file to upload. It is used as a context manager
            for the duration of the call.
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param extra_args: A dictionary of any extra arguments that may be
            used in the upload.
        """
        with fileobj as stream:
            client.put_object(
                Bucket=bucket, Key=key, Body=stream, **extra_args
            )
class UploadPartTask(Task):
    """Task to upload a part in a multipart upload"""

    def _main(
        self, client, fileobj, bucket, key, upload_id, part_number, extra_args
    ):
        """Upload one part and describe it for the completion request

        :param client: The client to use when calling UploadPart
        :param fileobj: The file to upload. It is used as a context manager
            for the duration of the call.
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param upload_id: The id of the upload
        :param part_number: The number representing the part of the multipart
            upload
        :param extra_args: A dictionary of any extra arguments that may be
            used in the upload.

        :rtype: dict
        :returns: A dictionary representing a part::

            {'ETag': etag_value, 'PartNumber': part_number}

            When ``extra_args`` has a ``ChecksumAlgorithm`` and the response
            contains the matching ``Checksum<ALGORITHM>`` member, that member
            is included too. This value can be appended to a list to be used
            to complete the multipart upload.
        """
        with fileobj as stream:
            response = client.upload_part(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=stream,
                **extra_args,
            )
        part_metadata = {'ETag': response['ETag'], 'PartNumber': part_number}
        if 'ChecksumAlgorithm' not in extra_args:
            return part_metadata

        # Carry over the checksum the response reported for this part, if
        # it reported one for the requested algorithm.
        checksum_member = f"Checksum{extra_args['ChecksumAlgorithm'].upper()}"
        if checksum_member in response:
            part_metadata[checksum_member] = response[checksum_member]
        return part_metadata