# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
13"""Abstractions over S3's upload/download operations.
15This module provides high level abstractions for efficient
16uploads/downloads. It handles several things for the user:
18* Automatically switching to multipart transfers when
19 a file is over a specific size threshold
20* Uploading/downloading a file in parallel
21* Throttling based on max bandwidth
22* Progress callbacks to monitor transfers
23* Retries. While botocore handles retries for streaming uploads,
24 it is not possible for it to handle retries for streaming
25 downloads. This module handles retries for both cases so
26 you don't need to implement any retry logic yourself.
28This module has a reasonable set of defaults. It also allows you
29to configure many aspects of the transfer process including:
31* Multipart threshold size
32* Max parallel downloads
33* Max bandwidth
34* Socket timeouts
35* Retry amounts
37There is no support for s3->s3 multipart copies at this
38time.
41.. _ref_s3transfer_usage:
43Usage
44=====
46The simplest way to use this module is:
48.. code-block:: python
50 client = boto3.client('s3', 'us-west-2')
51 transfer = S3Transfer(client)
52 # Upload /tmp/myfile to s3://bucket/key
53 transfer.upload_file('/tmp/myfile', 'bucket', 'key')
55 # Download s3://bucket/key to /tmp/myfile
56 transfer.download_file('bucket', 'key', '/tmp/myfile')
58The ``upload_file`` and ``download_file`` methods also accept
59``**kwargs``, which will be forwarded through to the corresponding
60client operation. Here are a few examples using ``upload_file``::
62 # Making the object public
63 transfer.upload_file('/tmp/myfile', 'bucket', 'key',
64 extra_args={'ACL': 'public-read'})
66 # Setting metadata
67 transfer.upload_file('/tmp/myfile', 'bucket', 'key',
68 extra_args={'Metadata': {'a': 'b', 'c': 'd'}})
70 # Setting content type
71 transfer.upload_file('/tmp/myfile.json', 'bucket', 'key',
72 extra_args={'ContentType': "application/json"})
75The ``S3Transfer`` class also supports progress callbacks so you can
76provide transfer progress to users. Both the ``upload_file`` and
77``download_file`` methods take an optional ``callback`` parameter.
78Here's an example of how to print a simple progress percentage
79to the user:
81.. code-block:: python
83 class ProgressPercentage(object):
84 def __init__(self, filename):
85 self._filename = filename
86 self._size = float(os.path.getsize(filename))
87 self._seen_so_far = 0
88 self._lock = threading.Lock()
90 def __call__(self, bytes_amount):
91 # To simplify we'll assume this is hooked up
92 # to a single filename.
93 with self._lock:
94 self._seen_so_far += bytes_amount
95 percentage = (self._seen_so_far / self._size) * 100
96 sys.stdout.write(
97 "\r%s %s / %s (%.2f%%)" % (self._filename, self._seen_so_far,
98 self._size, percentage))
99 sys.stdout.flush()
102 transfer = S3Transfer(boto3.client('s3', 'us-west-2'))
103 # Upload /tmp/myfile to s3://bucket/key and print upload progress.
104 transfer.upload_file('/tmp/myfile', 'bucket', 'key',
105 callback=ProgressPercentage('/tmp/myfile'))
109You can also provide a TransferConfig object to the S3Transfer
110object that gives you more fine grained control over the
111transfer. For example:
113.. code-block:: python
115 client = boto3.client('s3', 'us-west-2')
116 config = TransferConfig(
117 multipart_threshold=8 * 1024 * 1024,
118 max_concurrency=10,
119 num_download_attempts=10,
120 )
121 transfer = S3Transfer(client, config)
122 transfer.upload_file('/tmp/foo', 'bucket', 'key')
125"""
import concurrent.futures
import functools
import logging
import math
import os
import queue
import random
import socket
import string
import threading

from botocore.compat import six  # noqa: F401
from botocore.exceptions import IncompleteReadError
from botocore.vendored.requests.packages.urllib3.exceptions import (
    ReadTimeoutError,
)

import s3transfer.compat
from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError

__author__ = 'Amazon Web Services'
__version__ = '0.8.2'


class NullHandler(logging.Handler):
    def emit(self, record):
        pass


logger = logging.getLogger(__name__)
logger.addHandler(NullHandler())

MB = 1024 * 1024
SHUTDOWN_SENTINEL = object()


def random_file_extension(num_digits=8):
    return ''.join(random.choice(string.hexdigits) for _ in range(num_digits))
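

# These two handlers are registered on botocore's 'request-created.s3' event
# in S3Transfer.upload_file (disable first, enable last). The apparent intent
# is that any reads of the request body performed while the request is being
# created and signed are not reported as upload progress; callbacks are
# switched back on once request creation has finished, before the body is
# actually sent.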
def disable_upload_callbacks(request, operation_name, **kwargs):
    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
        request.body, 'disable_callback'
    ):
        request.body.disable_callback()


def enable_upload_callbacks(request, operation_name, **kwargs):
    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
        request.body, 'enable_callback'
    ):
        request.body.enable_callback()


class QueueShutdownError(Exception):
    pass


class ReadFileChunk:
    def __init__(
        self,
        fileobj,
        start_byte,
        chunk_size,
        full_file_size,
        callback=None,
        enable_callback=True,
    ):
        """
        Given a file object shown below:

            |___________________________________________________|
            0          |                 |                       full_file_size
                       |----chunk_size---|
                    start_byte

        :type fileobj: file
        :param fileobj: File like object

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.
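
        For illustration only (the file path and sizes below are placeholders,
        not part of the library): a chunk created over bytes 4-11 of a larger
        file behaves like a small 8-byte file of its own:

        .. code-block:: python

            chunk = ReadFileChunk.from_filename(
                '/tmp/example', start_byte=4, chunk_size=8
            )
            assert len(chunk) == 8   # assuming the file has at least 12 bytes
            data = chunk.read()      # at most 8 bytes, starting at offset 4
            chunk.read()             # b'' -- the chunk is exhausted
            chunk.close()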
221 """
222 self._fileobj = fileobj
223 self._start_byte = start_byte
224 self._size = self._calculate_file_size(
225 self._fileobj,
226 requested_size=chunk_size,
227 start_byte=start_byte,
228 actual_file_size=full_file_size,
229 )
230 self._fileobj.seek(self._start_byte)
231 self._amount_read = 0
232 self._callback = callback
233 self._callback_enabled = enable_callback

    @classmethod
    def from_filename(
        cls,
        filename,
        start_byte,
        chunk_size,
        callback=None,
        enable_callback=True,
    ):
        """Convenience factory function to create from a filename.

        :type filename: str
        :param filename: The name of the file to read from. The full file
            size is determined automatically (via ``os.fstat``) and used as
            the ``full_file_size`` of the underlying ``ReadFileChunk``.

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        :type enable_callback: bool
        :param enable_callback: Indicate whether to invoke callback
            during read() calls.

        :rtype: ``ReadFileChunk``
        :return: A new instance of ``ReadFileChunk``

        """
        f = open(filename, 'rb')
        file_size = os.fstat(f.fileno()).st_size
        return cls(
            f, start_byte, chunk_size, file_size, callback, enable_callback
        )

    def _calculate_file_size(
        self, fileobj, requested_size, start_byte, actual_file_size
    ):
        max_chunk_size = actual_file_size - start_byte
        return min(max_chunk_size, requested_size)

    def read(self, amount=None):
        if amount is None:
            amount_to_read = self._size - self._amount_read
        else:
            amount_to_read = min(self._size - self._amount_read, amount)
        data = self._fileobj.read(amount_to_read)
        self._amount_read += len(data)
        if self._callback is not None and self._callback_enabled:
            self._callback(len(data))
        return data

    def enable_callback(self):
        self._callback_enabled = True

    def disable_callback(self):
        self._callback_enabled = False

    def seek(self, where):
        self._fileobj.seek(self._start_byte + where)
        if self._callback is not None and self._callback_enabled:
            # To also rewind the callback() for an accurate progress report
            self._callback(where - self._amount_read)
        self._amount_read = where

    def close(self):
        self._fileobj.close()

    def tell(self):
        return self._amount_read

    def __len__(self):
        # __len__ is defined because requests will try to determine the length
        # of the stream to set a content length. In the normal case
        # of the file it will just stat the file, but we need to change that
        # behavior. By providing a __len__, requests will use that instead
        # of stat'ing the file.
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def __iter__(self):
        # This is a workaround for http://bugs.python.org/issue17575
        # Basically httplib will try to iterate over the contents, even
        # if it's a file-like object. This wasn't noticed because we've
        # already exhausted the stream so iterating over the file immediately
        # stops, which is what we're simulating here.
        return iter([])


class StreamReaderProgress:
    """Wrapper for a read-only stream that adds progress callbacks."""

    def __init__(self, stream, callback=None):
        self._stream = stream
        self._callback = callback

    def read(self, *args, **kwargs):
        value = self._stream.read(*args, **kwargs)
        if self._callback is not None:
            self._callback(len(value))
        return value


class OSUtils:
    def get_file_size(self, filename):
        return os.path.getsize(filename)

    def open_file_chunk_reader(self, filename, start_byte, size, callback):
        return ReadFileChunk.from_filename(
            filename, start_byte, size, callback, enable_callback=False
        )

    def open(self, filename, mode):
        return open(filename, mode)

    def remove_file(self, filename):
        """Remove a file, noop if file does not exist."""
        # Unlike os.remove, if the file does not exist,
        # then this method does nothing.
        try:
            os.remove(filename)
        except OSError:
            pass

    def rename_file(self, current_filename, new_filename):
        s3transfer.compat.rename_file(current_filename, new_filename)


class MultipartUploader:
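    """Uploads a file to S3 using the multipart upload API.

    ``S3Transfer`` creates and drives this class internally, so the sketch
    below is illustrative only (the client, bucket, key, and file path are
    placeholders, not part of the library):

    .. code-block:: python

        client = boto3.client('s3', 'us-west-2')
        uploader = MultipartUploader(client, TransferConfig(), OSUtils())
        uploader.upload_file('/tmp/bigfile', 'bucket', 'key',
                             callback=None, extra_args={})

    The file is split into ``multipart_chunksize``-sized parts that are
    uploaded in parallel by a thread pool; on any error the multipart upload
    is aborted and ``S3UploadFailedError`` is raised.
    """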

    # These are the extra_args that need to be forwarded onto
    # subsequent upload_parts.
    UPLOAD_PART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls

    def _extra_upload_part_args(self, extra_args):
        # Only the args in UPLOAD_PART_ARGS actually need to be passed
        # onto the upload_part calls.
        upload_parts_args = {}
        for key, value in extra_args.items():
            if key in self.UPLOAD_PART_ARGS:
                upload_parts_args[key] = value
        return upload_parts_args

    def upload_file(self, filename, bucket, key, callback, extra_args):
        response = self._client.create_multipart_upload(
            Bucket=bucket, Key=key, **extra_args
        )
        upload_id = response['UploadId']
        try:
            parts = self._upload_parts(
                upload_id, filename, bucket, key, callback, extra_args
            )
        except Exception as e:
            logger.debug(
                "Exception raised while uploading parts, "
                "aborting multipart upload.",
                exc_info=True,
            )
            self._client.abort_multipart_upload(
                Bucket=bucket, Key=key, UploadId=upload_id
            )
            raise S3UploadFailedError(
                "Failed to upload {} to {}: {}".format(
                    filename, '/'.join([bucket, key]), e
                )
            )
        self._client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts},
        )

    def _upload_parts(
        self, upload_id, filename, bucket, key, callback, extra_args
    ):
        upload_parts_extra_args = self._extra_upload_part_args(extra_args)
        parts = []
        part_size = self._config.multipart_chunksize
        num_parts = int(
            math.ceil(self._os.get_file_size(filename) / float(part_size))
        )
        max_workers = self._config.max_concurrency
        with self._executor_cls(max_workers=max_workers) as executor:
            upload_partial = functools.partial(
                self._upload_one_part,
                filename,
                bucket,
                key,
                upload_id,
                part_size,
                upload_parts_extra_args,
                callback,
            )
            for part in executor.map(upload_partial, range(1, num_parts + 1)):
                parts.append(part)
        return parts

    def _upload_one_part(
        self,
        filename,
        bucket,
        key,
        upload_id,
        part_size,
        extra_args,
        callback,
        part_number,
    ):
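        # Each part covers a contiguous slice of the file: part N starts at
        # byte (N - 1) * part_size and is at most part_size bytes long (the
        # final part may be shorter). For example, with an 8 MB part size,
        # part 2 starts at byte 8388608.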
        open_chunk_reader = self._os.open_file_chunk_reader
        with open_chunk_reader(
            filename, part_size * (part_number - 1), part_size, callback
        ) as body:
            response = self._client.upload_part(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=body,
                **extra_args,
            )
            etag = response['ETag']
            return {'ETag': etag, 'PartNumber': part_number}


class ShutdownQueue(queue.Queue):
    """A queue implementation that can be shut down.

    Shutting down the queue (via ``trigger_shutdown``) causes all
    subsequent calls to ``put()`` to fail with a ``QueueShutdownError``.

    It purposefully deviates from ``queue.Queue`` and is *not* meant
    to be a drop-in replacement for ``queue.Queue``.
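
    For illustration only (not part of the library's public usage):

    .. code-block:: python

        q = ShutdownQueue(maxsize=0)
        q.put(('some', 'item'))
        q.trigger_shutdown()
        q.put('anything else')  # raises QueueShutdownError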
496 """
498 def _init(self, maxsize):
499 self._shutdown = False
500 self._shutdown_lock = threading.Lock()
501 # queue.Queue is an old style class so we don't use super().
502 return queue.Queue._init(self, maxsize)
504 def trigger_shutdown(self):
505 with self._shutdown_lock:
506 self._shutdown = True
507 logger.debug("The IO queue is now shutdown.")
509 def put(self, item):
510 # Note: this is not sufficient, it's still possible to deadlock!
511 # Need to hook into the condition vars used by this class.
512 with self._shutdown_lock:
513 if self._shutdown:
514 raise QueueShutdownError(
515 "Cannot put item to queue when " "queue has been shutdown."
516 )
517 return queue.Queue.put(self, item)


class MultipartDownloader:
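    """Downloads an S3 object using ranged GET requests issued in parallel.

    Download threads fetch byte ranges and push ``(offset, data)`` tuples
    onto an internal IO queue; a single writer thread pops from that queue
    and writes the chunks to disk. ``S3Transfer`` normally drives this class,
    so the sketch below is illustrative only (client, bucket, key, and path
    are placeholders):

    .. code-block:: python

        client = boto3.client('s3', 'us-west-2')
        downloader = MultipartDownloader(client, TransferConfig(), OSUtils())
        object_size = client.head_object(
            Bucket='bucket', Key='key')['ContentLength']
        downloader.download_file('bucket', 'key', '/tmp/myfile',
                                 object_size, extra_args={})
    """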

    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls
        self._ioqueue = ShutdownQueue(self._config.max_io_queue)

    def download_file(
        self, bucket, key, filename, object_size, extra_args, callback=None
    ):
        with self._executor_cls(max_workers=2) as controller:
            # 1 thread for the future that manages downloading the parts
            # 1 thread for the future that manages IO writes.
            download_parts_handler = functools.partial(
                self._download_file_as_future,
                bucket,
                key,
                filename,
                object_size,
                callback,
            )
            parts_future = controller.submit(download_parts_handler)

            io_writes_handler = functools.partial(
                self._perform_io_writes, filename
            )
            io_future = controller.submit(io_writes_handler)
            results = concurrent.futures.wait(
                [parts_future, io_future],
                return_when=concurrent.futures.FIRST_EXCEPTION,
            )
            self._process_future_results(results)

    def _process_future_results(self, futures):
        finished, unfinished = futures
        for future in finished:
            future.result()

    def _download_file_as_future(
        self, bucket, key, filename, object_size, callback
    ):
        part_size = self._config.multipart_chunksize
        num_parts = int(math.ceil(object_size / float(part_size)))
        max_workers = self._config.max_concurrency
        download_partial = functools.partial(
            self._download_range,
            bucket,
            key,
            filename,
            part_size,
            num_parts,
            callback,
        )
        try:
            with self._executor_cls(max_workers=max_workers) as executor:
                list(executor.map(download_partial, range(num_parts)))
        finally:
            self._ioqueue.put(SHUTDOWN_SENTINEL)

    def _calculate_range_param(self, part_size, part_index, num_parts):
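        # Builds the HTTP Range header value for one part. For example, with
        # part_size=8388608 (8 MB) and part_index=1, this yields
        # 'bytes=8388608-16777215'; the last part uses an open-ended range
        # such as 'bytes=16777216-' so it runs to the end of the object.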
        start_range = part_index * part_size
        if part_index == num_parts - 1:
            end_range = ''
        else:
            end_range = start_range + part_size - 1
        range_param = f'bytes={start_range}-{end_range}'
        return range_param

    def _download_range(
        self, bucket, key, filename, part_size, num_parts, callback, part_index
    ):
        try:
            range_param = self._calculate_range_param(
                part_size, part_index, num_parts
            )

            max_attempts = self._config.num_download_attempts
            last_exception = None
            for i in range(max_attempts):
                try:
                    logger.debug("Making get_object call.")
                    response = self._client.get_object(
                        Bucket=bucket, Key=key, Range=range_param
                    )
                    streaming_body = StreamReaderProgress(
                        response['Body'], callback
                    )
                    buffer_size = 1024 * 16
                    current_index = part_size * part_index
                    for chunk in iter(
                        lambda: streaming_body.read(buffer_size), b''
                    ):
                        self._ioqueue.put((current_index, chunk))
                        current_index += len(chunk)
                    return
                except (
                    socket.timeout,
                    OSError,
                    ReadTimeoutError,
                    IncompleteReadError,
                ) as e:
                    logger.debug(
                        "Retrying exception caught (%s), "
                        "retrying request, (attempt %s / %s)",
                        e,
                        i,
                        max_attempts,
                        exc_info=True,
                    )
                    last_exception = e
                    continue
            raise RetriesExceededError(last_exception)
        finally:
            logger.debug("EXITING _download_range for part: %s", part_index)

    def _perform_io_writes(self, filename):
        with self._os.open(filename, 'wb') as f:
            while True:
                task = self._ioqueue.get()
                if task is SHUTDOWN_SENTINEL:
                    logger.debug(
                        "Shutdown sentinel received in IO handler, "
                        "shutting down IO handler."
                    )
                    return
                else:
                    try:
                        offset, data = task
                        f.seek(offset)
                        f.write(data)
                    except Exception as e:
                        logger.debug(
                            "Caught exception in IO thread: %s",
                            e,
                            exc_info=True,
                        )
                        self._ioqueue.trigger_shutdown()
                        raise


class TransferConfig:
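    """Runtime configuration for ``S3Transfer``.

    :param multipart_threshold: File/object size (in bytes) at or above which
        multipart uploads and ranged downloads are used.
    :param max_concurrency: Maximum number of worker threads used for
        parallel part uploads/downloads.
    :param multipart_chunksize: Size (in bytes) of each part in a multipart
        transfer.
    :param num_download_attempts: Number of attempts made for a download
        before ``RetriesExceededError`` is raised.
    :param max_io_queue: Maximum number of chunks buffered in the download
        IO queue before the download threads block.
    """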

    def __init__(
        self,
        multipart_threshold=8 * MB,
        max_concurrency=10,
        multipart_chunksize=8 * MB,
        num_download_attempts=5,
        max_io_queue=100,
    ):
        self.multipart_threshold = multipart_threshold
        self.max_concurrency = max_concurrency
        self.multipart_chunksize = multipart_chunksize
        self.num_download_attempts = num_download_attempts
        self.max_io_queue = max_io_queue


class S3Transfer:
    ALLOWED_DOWNLOAD_ARGS = [
        'VersionId',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    ALLOWED_UPLOAD_ARGS = [
        'ACL',
        'CacheControl',
        'ContentDisposition',
        'ContentEncoding',
        'ContentLanguage',
        'ContentType',
        'Expires',
        'GrantFullControl',
        'GrantRead',
        'GrantReadACP',
        'GrantWriteACL',
        'Metadata',
        'RequestPayer',
        'ServerSideEncryption',
        'StorageClass',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'SSEKMSKeyId',
        'SSEKMSEncryptionContext',
        'Tagging',
    ]

    def __init__(self, client, config=None, osutil=None):
        self._client = client
        if config is None:
            config = TransferConfig()
        self._config = config
        if osutil is None:
            osutil = OSUtils()
        self._osutil = osutil

    def upload_file(
        self, filename, bucket, key, callback=None, extra_args=None
    ):
        """Upload a file to an S3 object.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.upload_file() directly.
        """
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
        events = self._client.meta.events
        events.register_first(
            'request-created.s3',
            disable_upload_callbacks,
            unique_id='s3upload-callback-disable',
        )
        events.register_last(
            'request-created.s3',
            enable_upload_callbacks,
            unique_id='s3upload-callback-enable',
        )
        if (
            self._osutil.get_file_size(filename)
            >= self._config.multipart_threshold
        ):
            self._multipart_upload(filename, bucket, key, callback, extra_args)
        else:
            self._put_object(filename, bucket, key, callback, extra_args)

    def _put_object(self, filename, bucket, key, callback, extra_args):
        # We're using open_file_chunk_reader so we can take advantage of the
        # progress callback functionality.
        open_chunk_reader = self._osutil.open_file_chunk_reader
        with open_chunk_reader(
            filename,
            0,
            self._osutil.get_file_size(filename),
            callback=callback,
        ) as body:
            self._client.put_object(
                Bucket=bucket, Key=key, Body=body, **extra_args
            )

    def download_file(
        self, bucket, key, filename, extra_args=None, callback=None
    ):
        """Download an S3 object to a file.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.download_file() directly.
        """
        # This method will issue a ``head_object`` request to determine
        # the size of the S3 object. This is used to determine if the
        # object is downloaded in parallel.
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
        object_size = self._object_size(bucket, key, extra_args)
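        # Download into a temporary file alongside the target (the random
        # extension avoids clobbering an existing file); on success the temp
        # file is renamed into place, on failure it is removed below.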
        temp_filename = filename + os.extsep + random_file_extension()
        try:
            self._download_file(
                bucket, key, temp_filename, object_size, extra_args, callback
            )
        except Exception:
            logger.debug(
                "Exception caught in download_file, removing partial "
                "file: %s",
                temp_filename,
                exc_info=True,
            )
            self._osutil.remove_file(temp_filename)
            raise
        else:
            self._osutil.rename_file(temp_filename, filename)

    def _download_file(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        if object_size >= self._config.multipart_threshold:
            self._ranged_download(
                bucket, key, filename, object_size, extra_args, callback
            )
        else:
            self._get_object(bucket, key, filename, extra_args, callback)

    def _validate_all_known_args(self, actual, allowed):
        for kwarg in actual:
            if kwarg not in allowed:
                raise ValueError(
                    "Invalid extra_args key '%s', "
                    "must be one of: %s" % (kwarg, ', '.join(allowed))
                )

    def _ranged_download(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        downloader = MultipartDownloader(
            self._client, self._config, self._osutil
        )
        downloader.download_file(
            bucket, key, filename, object_size, extra_args, callback
        )

    def _get_object(self, bucket, key, filename, extra_args, callback):
        # precondition: num_download_attempts > 0
        max_attempts = self._config.num_download_attempts
        last_exception = None
        for i in range(max_attempts):
            try:
                return self._do_get_object(
                    bucket, key, filename, extra_args, callback
                )
            except (
                socket.timeout,
                OSError,
                ReadTimeoutError,
                IncompleteReadError,
            ) as e:
                # TODO: we need a way to reset the callback if the
                # download failed.
                logger.debug(
                    "Retrying exception caught (%s), "
                    "retrying request, (attempt %s / %s)",
                    e,
                    i,
                    max_attempts,
                    exc_info=True,
                )
                last_exception = e
                continue
        raise RetriesExceededError(last_exception)

    def _do_get_object(self, bucket, key, filename, extra_args, callback):
        response = self._client.get_object(
            Bucket=bucket, Key=key, **extra_args
        )
        streaming_body = StreamReaderProgress(response['Body'], callback)
        with self._osutil.open(filename, 'wb') as f:
            for chunk in iter(lambda: streaming_body.read(8192), b''):
                f.write(chunk)

    def _object_size(self, bucket, key, extra_args):
        return self._client.head_object(Bucket=bucket, Key=key, **extra_args)[
            'ContentLength'
        ]

    def _multipart_upload(self, filename, bucket, key, callback, extra_args):
        uploader = MultipartUploader(self._client, self._config, self._osutil)
        uploader.upload_file(filename, bucket, key, callback, extra_args)