# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
13"""Abstractions over S3's upload/download operations.
14
15This module provides high level abstractions for efficient
16uploads/downloads. It handles several things for the user:
17
18* Automatically switching to multipart transfers when
19 a file is over a specific size threshold
20* Uploading/downloading a file in parallel
21* Throttling based on max bandwidth
22* Progress callbacks to monitor transfers
23* Retries. While botocore handles retries for streaming uploads,
24 it is not possible for it to handle retries for streaming
25 downloads. This module handles retries for both cases so
26 you don't need to implement any retry logic yourself.
27
28This module has a reasonable set of defaults. It also allows you
29to configure many aspects of the transfer process including:
30
31* Multipart threshold size
32* Max parallel downloads
33* Max bandwidth
34* Socket timeouts
35* Retry amounts
36
37There is no support for s3->s3 multipart copies at this
38time.
39
40
41.. _ref_s3transfer_usage:
42
43Usage
44=====
45
The simplest way to use this module is:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    transfer = S3Transfer(client)
    # Upload /tmp/myfile to s3://bucket/key
    transfer.upload_file('/tmp/myfile', 'bucket', 'key')

    # Download s3://bucket/key to /tmp/myfile
    transfer.download_file('bucket', 'key', '/tmp/myfile')

The ``upload_file`` and ``download_file`` methods also accept
``**kwargs``, which will be forwarded through to the corresponding
client operation. Here are a few examples using ``upload_file``::

    # Making the object public
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'ACL': 'public-read'})

    # Setting metadata
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'Metadata': {'a': 'b', 'c': 'd'}})

    # Setting content type
    transfer.upload_file('/tmp/myfile.json', 'bucket', 'key',
                         extra_args={'ContentType': "application/json"})

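``download_file`` accepts ``extra_args`` as well, restricted to the
download-related parameters (``VersionId``, the SSE-C keys, and
``RequestPayer``). A minimal sketch, where the version id shown is only a
placeholder::

    # Downloading a specific version of an object
    transfer.download_file('bucket', 'key', '/tmp/myfile',
                           extra_args={'VersionId': 'my-version-id'})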

The ``S3Transfer`` class also supports progress callbacks so you can
provide transfer progress to users. Both the ``upload_file`` and
``download_file`` methods take an optional ``callback`` parameter.
Here's an example of how to print a simple progress percentage
to the user:

.. code-block:: python

    class ProgressPercentage(object):
        def __init__(self, filename):
            self._filename = filename
            self._size = float(os.path.getsize(filename))
            self._seen_so_far = 0
            self._lock = threading.Lock()

        def __call__(self, bytes_amount):
            # To simplify we'll assume this is hooked up
            # to a single filename.
            with self._lock:
                self._seen_so_far += bytes_amount
                percentage = (self._seen_so_far / self._size) * 100
                sys.stdout.write(
                    "\r%s %s / %s (%.2f%%)" % (self._filename, self._seen_so_far,
                                               self._size, percentage))
                sys.stdout.flush()


    transfer = S3Transfer(boto3.client('s3', 'us-west-2'))
    # Upload /tmp/myfile to s3://bucket/key and print upload progress.
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         callback=ProgressPercentage('/tmp/myfile'))



You can also provide a TransferConfig object to the S3Transfer
object that gives you more fine-grained control over the
transfer. For example:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    config = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,
        max_concurrency=10,
        num_download_attempts=10,
    )
    transfer = S3Transfer(client, config)
    transfer.upload_file('/tmp/foo', 'bucket', 'key')


"""
import concurrent.futures
import functools
import logging
import math
import os
import queue
import random
import socket
import string
import threading

from botocore.compat import six  # noqa: F401
from botocore.exceptions import IncompleteReadError, ResponseStreamingError
from botocore.vendored.requests.packages.urllib3.exceptions import (
    ReadTimeoutError,
)

import s3transfer.compat
from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError

__author__ = 'Amazon Web Services'
__version__ = '0.10.2'


class NullHandler(logging.Handler):
    def emit(self, record):
        pass


logger = logging.getLogger(__name__)
logger.addHandler(NullHandler())

MB = 1024 * 1024
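# Sentinel placed on the download IO queue to tell the writer thread that
# all parts have been queued and it can shut down.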
SHUTDOWN_SENTINEL = object()


def random_file_extension(num_digits=8):
    return ''.join(random.choice(string.hexdigits) for _ in range(num_digits))


def disable_upload_callbacks(request, operation_name, **kwargs):
    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
        request.body, 'disable_callback'
    ):
        request.body.disable_callback()


def enable_upload_callbacks(request, operation_name, **kwargs):
    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
        request.body, 'enable_callback'
    ):
        request.body.enable_callback()


class QueueShutdownError(Exception):
    pass


class ReadFileChunk:
    def __init__(
        self,
        fileobj,
        start_byte,
        chunk_size,
        full_file_size,
        callback=None,
        enable_callback=True,
    ):
        """

        Given a file object shown below:

          |___________________________________________________|
          0          |                 |                 full_file_size
                     |----chunk_size---|
                  start_byte

        :type fileobj: file
        :param fileobj: File-like object

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave as if you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        """
        self._fileobj = fileobj
        self._start_byte = start_byte
        self._size = self._calculate_file_size(
            self._fileobj,
            requested_size=chunk_size,
            start_byte=start_byte,
            actual_file_size=full_file_size,
        )
        self._fileobj.seek(self._start_byte)
        self._amount_read = 0
        self._callback = callback
        self._callback_enabled = enable_callback

    @classmethod
    def from_filename(
        cls,
        filename,
        start_byte,
        chunk_size,
        callback=None,
        enable_callback=True,
    ):
        """Convenience factory function to create from a filename.

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave as if you've
            reached the end of the file.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        :type enable_callback: bool
        :param enable_callback: Indicate whether to invoke callback
            during read() calls.

        :rtype: ``ReadFileChunk``
        :return: A new instance of ``ReadFileChunk``
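
        For example, a minimal sketch that reads the second 8 MB chunk
        of a local file (``/tmp/largefile`` is just an illustrative
        path)::

            chunk = ReadFileChunk.from_filename(
                '/tmp/largefile',
                start_byte=8 * 1024 * 1024,
                chunk_size=8 * 1024 * 1024,
            )
            data = chunk.read()  # At most 8 MB, clamped to the file size.
            chunk.close()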

        """
        f = open(filename, 'rb')
        file_size = os.fstat(f.fileno()).st_size
        return cls(
            f, start_byte, chunk_size, file_size, callback, enable_callback
        )

    def _calculate_file_size(
        self, fileobj, requested_size, start_byte, actual_file_size
    ):
        max_chunk_size = actual_file_size - start_byte
        return min(max_chunk_size, requested_size)

    def read(self, amount=None):
        if amount is None:
            amount_to_read = self._size - self._amount_read
        else:
            amount_to_read = min(self._size - self._amount_read, amount)
        data = self._fileobj.read(amount_to_read)
        self._amount_read += len(data)
        if self._callback is not None and self._callback_enabled:
            self._callback(len(data))
        return data

    def enable_callback(self):
        self._callback_enabled = True

    def disable_callback(self):
        self._callback_enabled = False

    def seek(self, where):
        self._fileobj.seek(self._start_byte + where)
        if self._callback is not None and self._callback_enabled:
            # To also rewind the callback() for an accurate progress report
            self._callback(where - self._amount_read)
        self._amount_read = where

    def close(self):
        self._fileobj.close()

    def tell(self):
        return self._amount_read

    def __len__(self):
        # __len__ is defined because requests will try to determine the length
        # of the stream to set a content length. In the normal case
        # of a file it will just stat the file, but we need to change that
        # behavior. By providing a __len__, requests will use that instead
        # of stat'ing the file.
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def __iter__(self):
        # This is a workaround for http://bugs.python.org/issue17575
        # Basically httplib will try to iterate over the contents, even
        # if it's a file-like object. This wasn't noticed because we've
        # already exhausted the stream so iterating over the file immediately
        # stops, which is what we're simulating here.
        return iter([])


class StreamReaderProgress:
    """Wrapper for a read-only stream that adds progress callbacks."""

    def __init__(self, stream, callback=None):
        self._stream = stream
        self._callback = callback

    def read(self, *args, **kwargs):
        value = self._stream.read(*args, **kwargs)
        if self._callback is not None:
            self._callback(len(value))
        return value


class OSUtils:
    def get_file_size(self, filename):
        return os.path.getsize(filename)

    def open_file_chunk_reader(self, filename, start_byte, size, callback):
        return ReadFileChunk.from_filename(
            filename, start_byte, size, callback, enable_callback=False
        )

    def open(self, filename, mode):
        return open(filename, mode)

    def remove_file(self, filename):
        """Remove a file, noop if file does not exist."""
        # Unlike os.remove, if the file does not exist,
        # then this method does nothing.
        try:
            os.remove(filename)
        except OSError:
            pass

    def rename_file(self, current_filename, new_filename):
        s3transfer.compat.rename_file(current_filename, new_filename)


class MultipartUploader:
    # These are the extra_args that need to be forwarded on to
    # subsequent upload_part calls.
    UPLOAD_PART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls

    def _extra_upload_part_args(self, extra_args):
        # Only the args in UPLOAD_PART_ARGS actually need to be passed
        # on to the upload_part calls.
        upload_parts_args = {}
        for key, value in extra_args.items():
            if key in self.UPLOAD_PART_ARGS:
                upload_parts_args[key] = value
        return upload_parts_args

    def upload_file(self, filename, bucket, key, callback, extra_args):
        response = self._client.create_multipart_upload(
            Bucket=bucket, Key=key, **extra_args
        )
        upload_id = response['UploadId']
        try:
            parts = self._upload_parts(
                upload_id, filename, bucket, key, callback, extra_args
            )
        except Exception as e:
            logger.debug(
                "Exception raised while uploading parts, "
                "aborting multipart upload.",
                exc_info=True,
            )
            self._client.abort_multipart_upload(
                Bucket=bucket, Key=key, UploadId=upload_id
            )
            raise S3UploadFailedError(
                "Failed to upload {} to {}: {}".format(
                    filename, '/'.join([bucket, key]), e
                )
            )
        self._client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts},
        )

    def _upload_parts(
        self, upload_id, filename, bucket, key, callback, extra_args
    ):
        upload_parts_extra_args = self._extra_upload_part_args(extra_args)
        parts = []
        part_size = self._config.multipart_chunksize
        num_parts = int(
            math.ceil(self._os.get_file_size(filename) / float(part_size))
        )
        max_workers = self._config.max_concurrency
        with self._executor_cls(max_workers=max_workers) as executor:
            upload_partial = functools.partial(
                self._upload_one_part,
                filename,
                bucket,
                key,
                upload_id,
                part_size,
                upload_parts_extra_args,
                callback,
            )
            for part in executor.map(upload_partial, range(1, num_parts + 1)):
                parts.append(part)
        return parts

    def _upload_one_part(
        self,
        filename,
        bucket,
        key,
        upload_id,
        part_size,
        extra_args,
        callback,
        part_number,
    ):
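        # PartNumber is 1-based, so part N starts at byte offset
        # part_size * (N - 1); the chunk reader clamps the final part to
        # the actual file size.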
        open_chunk_reader = self._os.open_file_chunk_reader
        with open_chunk_reader(
            filename, part_size * (part_number - 1), part_size, callback
        ) as body:
            response = self._client.upload_part(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=body,
                **extra_args,
            )
            etag = response['ETag']
            return {'ETag': etag, 'PartNumber': part_number}


class ShutdownQueue(queue.Queue):
    """A queue implementation that can be shut down.

    This class adds a ``trigger_shutdown`` method; once it has been
    called, all subsequent calls to ``put()`` fail with a
    ``QueueShutdownError``.

    It purposefully deviates from queue.Queue, and is *not* meant
    to be a drop-in replacement for ``queue.Queue``.

    """

    def _init(self, maxsize):
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        # queue.Queue is an old-style class so we don't use super().
        return queue.Queue._init(self, maxsize)

    def trigger_shutdown(self):
        with self._shutdown_lock:
            self._shutdown = True
            logger.debug("The IO queue is now shut down.")

    def put(self, item):
        # Note: this is not sufficient, it's still possible to deadlock!
        # Need to hook into the condition vars used by this class.
        with self._shutdown_lock:
            if self._shutdown:
                raise QueueShutdownError(
                    "Cannot put item to queue when queue has been shut down."
                )
            return queue.Queue.put(self, item)


class MultipartDownloader:
    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls
        self._ioqueue = ShutdownQueue(self._config.max_io_queue)

    def download_file(
        self, bucket, key, filename, object_size, extra_args, callback=None
    ):
        with self._executor_cls(max_workers=2) as controller:
            # 1 thread for the future that manages downloading the parts
            # 1 thread for the future that manages IO writes.
            download_parts_handler = functools.partial(
                self._download_file_as_future,
                bucket,
                key,
                filename,
                object_size,
                callback,
            )
            parts_future = controller.submit(download_parts_handler)

            io_writes_handler = functools.partial(
                self._perform_io_writes, filename
            )
            io_future = controller.submit(io_writes_handler)
            results = concurrent.futures.wait(
                [parts_future, io_future],
                return_when=concurrent.futures.FIRST_EXCEPTION,
            )
            self._process_future_results(results)

    def _process_future_results(self, futures):
        finished, unfinished = futures
        for future in finished:
            future.result()

    def _download_file_as_future(
        self, bucket, key, filename, object_size, callback
    ):
        part_size = self._config.multipart_chunksize
        num_parts = int(math.ceil(object_size / float(part_size)))
        max_workers = self._config.max_concurrency
        download_partial = functools.partial(
            self._download_range,
            bucket,
            key,
            filename,
            part_size,
            num_parts,
            callback,
        )
        try:
            with self._executor_cls(max_workers=max_workers) as executor:
                list(executor.map(download_partial, range(num_parts)))
        finally:
            self._ioqueue.put(SHUTDOWN_SENTINEL)

    def _calculate_range_param(self, part_size, part_index, num_parts):
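        # HTTP Range values are inclusive, so part i covers bytes
        # [i * part_size, (i + 1) * part_size - 1]. The last part is left
        # open-ended ('bytes=N-') so it reads through the end of the object.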
        start_range = part_index * part_size
        if part_index == num_parts - 1:
            end_range = ''
        else:
            end_range = start_range + part_size - 1
        range_param = f'bytes={start_range}-{end_range}'
        return range_param

    def _download_range(
        self, bucket, key, filename, part_size, num_parts, callback, part_index
    ):
        try:
            range_param = self._calculate_range_param(
                part_size, part_index, num_parts
            )

            max_attempts = self._config.num_download_attempts
            last_exception = None
            for i in range(max_attempts):
                try:
                    logger.debug("Making get_object call.")
                    response = self._client.get_object(
                        Bucket=bucket, Key=key, Range=range_param
                    )
                    streaming_body = StreamReaderProgress(
                        response['Body'], callback
                    )
                    buffer_size = 1024 * 16
                    current_index = part_size * part_index
                    for chunk in iter(
                        lambda: streaming_body.read(buffer_size), b''
                    ):
                        self._ioqueue.put((current_index, chunk))
                        current_index += len(chunk)
                    return
                except (
                    socket.timeout,
                    OSError,
                    ReadTimeoutError,
                    IncompleteReadError,
                    ResponseStreamingError,
                ) as e:
                    logger.debug(
                        "Retrying exception caught (%s), "
                        "retrying request, (attempt %s / %s)",
                        e,
                        i,
                        max_attempts,
                        exc_info=True,
                    )
                    last_exception = e
                    continue
            raise RetriesExceededError(last_exception)
        finally:
            logger.debug("EXITING _download_range for part: %s", part_index)

    def _perform_io_writes(self, filename):
        with self._os.open(filename, 'wb') as f:
            while True:
                task = self._ioqueue.get()
                if task is SHUTDOWN_SENTINEL:
                    logger.debug(
                        "Shutdown sentinel received in IO handler, "
                        "shutting down IO handler."
                    )
                    return
                else:
                    try:
                        offset, data = task
                        f.seek(offset)
                        f.write(data)
                    except Exception as e:
                        logger.debug(
                            "Caught exception in IO thread: %s",
                            e,
                            exc_info=True,
                        )
                        self._ioqueue.trigger_shutdown()
                        raise


class TransferConfig:
    def __init__(
        self,
        multipart_threshold=8 * MB,
        max_concurrency=10,
        multipart_chunksize=8 * MB,
        num_download_attempts=5,
        max_io_queue=100,
    ):
        self.multipart_threshold = multipart_threshold
        self.max_concurrency = max_concurrency
        self.multipart_chunksize = multipart_chunksize
        self.num_download_attempts = num_download_attempts
        self.max_io_queue = max_io_queue


class S3Transfer:
    ALLOWED_DOWNLOAD_ARGS = [
        'VersionId',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    ALLOWED_UPLOAD_ARGS = [
        'ACL',
        'CacheControl',
        'ContentDisposition',
        'ContentEncoding',
        'ContentLanguage',
        'ContentType',
        'Expires',
        'GrantFullControl',
        'GrantRead',
        'GrantReadACP',
        'GrantWriteACL',
        'Metadata',
        'RequestPayer',
        'ServerSideEncryption',
        'StorageClass',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'SSEKMSKeyId',
        'SSEKMSEncryptionContext',
        'Tagging',
    ]

    def __init__(self, client, config=None, osutil=None):
        self._client = client
        if config is None:
            config = TransferConfig()
        self._config = config
        if osutil is None:
            osutil = OSUtils()
        self._osutil = osutil

    def upload_file(
        self, filename, bucket, key, callback=None, extra_args=None
    ):
        """Upload a file to an S3 object.

        Variants have also been injected into the S3 client, Bucket, and
        Object classes. You don't have to use S3Transfer.upload_file()
        directly.
        """
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
        events = self._client.meta.events
        events.register_first(
            'request-created.s3',
            disable_upload_callbacks,
            unique_id='s3upload-callback-disable',
        )
        events.register_last(
            'request-created.s3',
            enable_upload_callbacks,
            unique_id='s3upload-callback-enable',
        )
        if (
            self._osutil.get_file_size(filename)
            >= self._config.multipart_threshold
        ):
            self._multipart_upload(filename, bucket, key, callback, extra_args)
        else:
            self._put_object(filename, bucket, key, callback, extra_args)

    def _put_object(self, filename, bucket, key, callback, extra_args):
        # We're using open_file_chunk_reader so we can take advantage of the
        # progress callback functionality.
        open_chunk_reader = self._osutil.open_file_chunk_reader
        with open_chunk_reader(
            filename,
            0,
            self._osutil.get_file_size(filename),
            callback=callback,
        ) as body:
            self._client.put_object(
                Bucket=bucket, Key=key, Body=body, **extra_args
            )

    def download_file(
        self, bucket, key, filename, extra_args=None, callback=None
    ):
        """Download an S3 object to a file.

        Variants have also been injected into the S3 client, Bucket, and
        Object classes. You don't have to use S3Transfer.download_file()
        directly.
        """
        # This method will issue a ``head_object`` request to determine
        # the size of the S3 object. This is used to determine if the
        # object is downloaded in parallel.
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
        object_size = self._object_size(bucket, key, extra_args)
        temp_filename = filename + os.extsep + random_file_extension()
        try:
            self._download_file(
                bucket, key, temp_filename, object_size, extra_args, callback
            )
        except Exception:
            logger.debug(
                "Exception caught in download_file, removing partial "
                "file: %s",
                temp_filename,
                exc_info=True,
            )
            self._osutil.remove_file(temp_filename)
            raise
        else:
            self._osutil.rename_file(temp_filename, filename)

    def _download_file(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        if object_size >= self._config.multipart_threshold:
            self._ranged_download(
                bucket, key, filename, object_size, extra_args, callback
            )
        else:
            self._get_object(bucket, key, filename, extra_args, callback)

    def _validate_all_known_args(self, actual, allowed):
        for kwarg in actual:
            if kwarg not in allowed:
                raise ValueError(
                    "Invalid extra_args key '%s', "
                    "must be one of: %s" % (kwarg, ', '.join(allowed))
                )

    def _ranged_download(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        downloader = MultipartDownloader(
            self._client, self._config, self._osutil
        )
        downloader.download_file(
            bucket, key, filename, object_size, extra_args, callback
        )

    def _get_object(self, bucket, key, filename, extra_args, callback):
        # precondition: num_download_attempts > 0
        max_attempts = self._config.num_download_attempts
        last_exception = None
        for i in range(max_attempts):
            try:
                return self._do_get_object(
                    bucket, key, filename, extra_args, callback
                )
            except (
                socket.timeout,
                OSError,
                ReadTimeoutError,
                IncompleteReadError,
                ResponseStreamingError,
            ) as e:
                # TODO: we need a way to reset the callback if the
                # download failed.
                logger.debug(
                    "Retrying exception caught (%s), "
                    "retrying request, (attempt %s / %s)",
                    e,
                    i,
                    max_attempts,
                    exc_info=True,
                )
                last_exception = e
                continue
        raise RetriesExceededError(last_exception)

    def _do_get_object(self, bucket, key, filename, extra_args, callback):
        response = self._client.get_object(
            Bucket=bucket, Key=key, **extra_args
        )
        streaming_body = StreamReaderProgress(response['Body'], callback)
        with self._osutil.open(filename, 'wb') as f:
            for chunk in iter(lambda: streaming_body.read(8192), b''):
                f.write(chunk)

    def _object_size(self, bucket, key, extra_args):
        return self._client.head_object(Bucket=bucket, Key=key, **extra_args)[
            'ContentLength'
        ]

    def _multipart_upload(self, filename, bucket, key, callback, extra_args):
        uploader = MultipartUploader(self._client, self._config, self._osutil)
        uploader.upload_file(filename, bucket, key, callback, extra_args)