# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
13"""Abstractions over S3's upload/download operations.
14
15This module provides high level abstractions for efficient
16uploads/downloads. It handles several things for the user:
17
18* Automatically switching to multipart transfers when
19 a file is over a specific size threshold
20* Uploading/downloading a file in parallel
21* Throttling based on max bandwidth
22* Progress callbacks to monitor transfers
23* Retries. While botocore handles retries for streaming uploads,
24 it is not possible for it to handle retries for streaming
25 downloads. This module handles retries for both cases so
26 you don't need to implement any retry logic yourself.
27
28This module has a reasonable set of defaults. It also allows you
29to configure many aspects of the transfer process including:
30
31* Multipart threshold size
32* Max parallel downloads
33* Max bandwidth
34* Socket timeouts
35* Retry amounts
36
37There is no support for s3->s3 multipart copies at this
38time.
39
40
41.. _ref_s3transfer_usage:
42
43Usage
44=====
45
46The simplest way to use this module is:
47
48.. code-block:: python
49
50 client = boto3.client('s3', 'us-west-2')
51 transfer = S3Transfer(client)
52 # Upload /tmp/myfile to s3://bucket/key
53 transfer.upload_file('/tmp/myfile', 'bucket', 'key')
54
55 # Download s3://bucket/key to /tmp/myfile
56 transfer.download_file('bucket', 'key', '/tmp/myfile')
57
The ``upload_file`` and ``download_file`` methods also accept an optional
``extra_args`` dictionary, whose contents are forwarded through to the
corresponding client operation. Here are a few examples using
``upload_file``::

    # Making the object public
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'ACL': 'public-read'})

    # Setting metadata
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'Metadata': {'a': 'b', 'c': 'd'}})

    # Setting content type
    transfer.upload_file('/tmp/myfile.json', 'bucket', 'key',
                         extra_args={'ContentType': "application/json"})


The ``S3Transfer`` class also supports progress callbacks so you can
provide transfer progress to users. Both the ``upload_file`` and
``download_file`` methods take an optional ``callback`` parameter.
Here's an example of how to print a simple progress percentage
to the user:

.. code-block:: python

    class ProgressPercentage(object):
        def __init__(self, filename):
            self._filename = filename
            self._size = float(os.path.getsize(filename))
            self._seen_so_far = 0
            self._lock = threading.Lock()

        def __call__(self, bytes_amount):
            # To simplify we'll assume this is hooked up
            # to a single filename.
            with self._lock:
                self._seen_so_far += bytes_amount
                percentage = (self._seen_so_far / self._size) * 100
                sys.stdout.write(
                    "\r%s %s / %s (%.2f%%)" % (self._filename, self._seen_so_far,
                                               self._size, percentage))
                sys.stdout.flush()


    transfer = S3Transfer(boto3.client('s3', 'us-west-2'))
    # Upload /tmp/myfile to s3://bucket/key and print upload progress.
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         callback=ProgressPercentage('/tmp/myfile'))



You can also provide a TransferConfig object to the S3Transfer
object that gives you more fine grained control over the
transfer. For example:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    config = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,
        max_concurrency=10,
        num_download_attempts=10,
    )
    transfer = S3Transfer(client, config)
    transfer.upload_file('/tmp/foo', 'bucket', 'key')

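The ``download_file`` method accepts ``extra_args`` as well, restricted to
the keys listed in ``S3Transfer.ALLOWED_DOWNLOAD_ARGS``. A minimal sketch
(the version id below is a made-up placeholder)::

    # Download a specific version of an object
    transfer.download_file('bucket', 'key', '/tmp/myfile',
                           extra_args={'VersionId': 'my-version-id'})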

"""

import concurrent.futures
import functools
import logging
import math
import os
import queue
import random
import socket
import string
import threading

from botocore.compat import six  # noqa: F401
from botocore.exceptions import IncompleteReadError, ResponseStreamingError
from botocore.vendored.requests.packages.urllib3.exceptions import (
    ReadTimeoutError,
)

import s3transfer.compat
from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError

__author__ = 'Amazon Web Services'
__version__ = '0.13.0'


class NullHandler(logging.Handler):
    def emit(self, record):
        pass


logger = logging.getLogger(__name__)
logger.addHandler(NullHandler())

MB = 1024 * 1024
SHUTDOWN_SENTINEL = object()


def random_file_extension(num_digits=8):
    return ''.join(random.choice(string.hexdigits) for _ in range(num_digits))


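# These handlers are registered first and last, respectively, on the
# 'request-created.s3' event by S3Transfer.upload_file(). They toggle the
# progress callback attached to streaming request bodies (PutObject and
# UploadPart) around any other 'request-created' handlers.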
def disable_upload_callbacks(request, operation_name, **kwargs):
    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
        request.body, 'disable_callback'
    ):
        request.body.disable_callback()


def enable_upload_callbacks(request, operation_name, **kwargs):
    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
        request.body, 'enable_callback'
    ):
        request.body.enable_callback()


class QueueShutdownError(Exception):
    pass


class ReadFileChunk:
    def __init__(
        self,
        fileobj,
        start_byte,
        chunk_size,
        full_file_size,
        callback=None,
        enable_callback=True,
    ):
        """

        Given a file object shown below:

            |___________________________________________________|
            0          |                 |    full_file_size
                       |----chunk_size---|
                    start_byte

        :type fileobj: file
        :param fileobj: File like object

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        :type enable_callback: bool
        :param enable_callback: Indicate whether to invoke callback
            during read() calls.

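        A minimal illustrative sketch (the path and sizes are made up)::

            fileobj = open('/tmp/foo', 'rb')
            # Expose bytes 10-19 of a 100 byte file as a stream of its own.
            chunk = ReadFileChunk(fileobj, start_byte=10, chunk_size=10,
                                  full_file_size=100)
            chunk.read()    # Returns at most 10 bytes.
            len(chunk)      # 10
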
        """
        self._fileobj = fileobj
        self._start_byte = start_byte
        self._size = self._calculate_file_size(
            self._fileobj,
            requested_size=chunk_size,
            start_byte=start_byte,
            actual_file_size=full_file_size,
        )
        self._fileobj.seek(self._start_byte)
        self._amount_read = 0
        self._callback = callback
        self._callback_enabled = enable_callback

    @classmethod
    def from_filename(
        cls,
        filename,
        start_byte,
        chunk_size,
        callback=None,
        enable_callback=True,
    ):
        """Convenience factory function to create from a filename.

        :type filename: str
        :param filename: The name of the file to read from.

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        :type enable_callback: bool
        :param enable_callback: Indicate whether to invoke callback
            during read() calls.

        :rtype: ``ReadFileChunk``
        :return: A new instance of ``ReadFileChunk``

        """
        f = open(filename, 'rb')
        file_size = os.fstat(f.fileno()).st_size
        return cls(
            f, start_byte, chunk_size, file_size, callback, enable_callback
        )

    def _calculate_file_size(
        self, fileobj, requested_size, start_byte, actual_file_size
    ):
        max_chunk_size = actual_file_size - start_byte
        return min(max_chunk_size, requested_size)

    def read(self, amount=None):
        if amount is None:
            amount_to_read = self._size - self._amount_read
        else:
            amount_to_read = min(self._size - self._amount_read, amount)
        data = self._fileobj.read(amount_to_read)
        self._amount_read += len(data)
        if self._callback is not None and self._callback_enabled:
            self._callback(len(data))
        return data

    def enable_callback(self):
        self._callback_enabled = True

    def disable_callback(self):
        self._callback_enabled = False

    def seek(self, where):
        self._fileobj.seek(self._start_byte + where)
        if self._callback is not None and self._callback_enabled:
            # To also rewind the callback() for an accurate progress report
            self._callback(where - self._amount_read)
        self._amount_read = where

    def close(self):
        self._fileobj.close()

    def tell(self):
        return self._amount_read

    def __len__(self):
        # __len__ is defined because requests will try to determine the length
        # of the stream to set a content length. In the normal case
        # of the file it will just stat the file, but we need to change that
        # behavior. By providing a __len__, requests will use that instead
        # of stat'ing the file.
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def __iter__(self):
        # This is a workaround for http://bugs.python.org/issue17575
        # Basically httplib will try to iterate over the contents, even
        # if it's a file like object. This wasn't noticed because we've
        # already exhausted the stream so iterating over the file immediately
        # stops, which is what we're simulating here.
        return iter([])


class StreamReaderProgress:
    """Wrapper for a read only stream that adds progress callbacks."""

    def __init__(self, stream, callback=None):
        self._stream = stream
        self._callback = callback

    def read(self, *args, **kwargs):
        value = self._stream.read(*args, **kwargs)
        if self._callback is not None:
            self._callback(len(value))
        return value


class OSUtils:
    def get_file_size(self, filename):
        return os.path.getsize(filename)

    def open_file_chunk_reader(self, filename, start_byte, size, callback):
        return ReadFileChunk.from_filename(
            filename, start_byte, size, callback, enable_callback=False
        )

    def open(self, filename, mode):
        return open(filename, mode)

    def remove_file(self, filename):
        """Remove a file, noop if file does not exist."""
        # Unlike os.remove, if the file does not exist,
        # then this method does nothing.
        try:
            os.remove(filename)
        except OSError:
            pass

    def rename_file(self, current_filename, new_filename):
        s3transfer.compat.rename_file(current_filename, new_filename)


class MultipartUploader:
    # These are the extra_args that need to be forwarded onto
    # subsequent upload_parts.
    UPLOAD_PART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls

    def _extra_upload_part_args(self, extra_args):
        # Only the args in UPLOAD_PART_ARGS actually need to be passed
        # onto the upload_part calls.
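        # For example (illustrative values only): an extra_args of
        # {'ACL': 'public-read', 'RequestPayer': 'requester'} is filtered
        # down to {'RequestPayer': 'requester'} for the upload_part calls.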
        upload_parts_args = {}
        for key, value in extra_args.items():
            if key in self.UPLOAD_PART_ARGS:
                upload_parts_args[key] = value
        return upload_parts_args

    def upload_file(self, filename, bucket, key, callback, extra_args):
        response = self._client.create_multipart_upload(
            Bucket=bucket, Key=key, **extra_args
        )
        upload_id = response['UploadId']
        try:
            parts = self._upload_parts(
                upload_id, filename, bucket, key, callback, extra_args
            )
        except Exception as e:
            logger.debug(
                "Exception raised while uploading parts, "
                "aborting multipart upload.",
                exc_info=True,
            )
            self._client.abort_multipart_upload(
                Bucket=bucket, Key=key, UploadId=upload_id
            )
            raise S3UploadFailedError(
                "Failed to upload {} to {}: {}".format(
                    filename, '/'.join([bucket, key]), e
                )
            )
        self._client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts},
        )

    def _upload_parts(
        self, upload_id, filename, bucket, key, callback, extra_args
    ):
        upload_parts_extra_args = self._extra_upload_part_args(extra_args)
        parts = []
        part_size = self._config.multipart_chunksize
        num_parts = int(
            math.ceil(self._os.get_file_size(filename) / float(part_size))
        )
        max_workers = self._config.max_concurrency
        with self._executor_cls(max_workers=max_workers) as executor:
            upload_partial = functools.partial(
                self._upload_one_part,
                filename,
                bucket,
                key,
                upload_id,
                part_size,
                upload_parts_extra_args,
                callback,
            )
            for part in executor.map(upload_partial, range(1, num_parts + 1)):
                parts.append(part)
        return parts

    def _upload_one_part(
        self,
        filename,
        bucket,
        key,
        upload_id,
        part_size,
        extra_args,
        callback,
        part_number,
    ):
        open_chunk_reader = self._os.open_file_chunk_reader
        with open_chunk_reader(
            filename, part_size * (part_number - 1), part_size, callback
        ) as body:
            response = self._client.upload_part(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=body,
                **extra_args,
            )
            etag = response['ETag']
            return {'ETag': etag, 'PartNumber': part_number}


class ShutdownQueue(queue.Queue):
    """A queue implementation that can be shut down.

    This class adds a ``trigger_shutdown`` method; once it has been
    called, all subsequent calls to ``put()`` fail with a
    ``QueueShutdownError``.

    It purposefully deviates from ``queue.Queue`` and is *not* meant
    to be a drop-in replacement for it.
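
    A minimal sketch of the intended interaction (values are illustrative)::

        q = ShutdownQueue(100)
        q.put((0, b'data'))      # Succeeds.
        q.trigger_shutdown()
        q.put((1, b'more'))      # Raises QueueShutdownError.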

    """

    def _init(self, maxsize):
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        # queue.Queue._init is called directly; historically Queue was an
        # old style class (Python 2), so super() was not used here.
        return queue.Queue._init(self, maxsize)

    def trigger_shutdown(self):
        with self._shutdown_lock:
            self._shutdown = True
            logger.debug("The IO queue is now shutdown.")

    def put(self, item):
        # Note: this is not sufficient, it's still possible to deadlock!
        # Need to hook into the condition vars used by this class.
        with self._shutdown_lock:
            if self._shutdown:
                raise QueueShutdownError(
                    "Cannot put item to queue when queue has been shutdown."
                )
            return queue.Queue.put(self, item)


class MultipartDownloader:
    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls
        self._ioqueue = ShutdownQueue(self._config.max_io_queue)

    def download_file(
        self, bucket, key, filename, object_size, extra_args, callback=None
    ):
        with self._executor_cls(max_workers=2) as controller:
            # 1 thread for the future that manages downloading the parts
            # 1 thread for the future that manages IO writes.
            download_parts_handler = functools.partial(
                self._download_file_as_future,
                bucket,
                key,
                filename,
                object_size,
                callback,
            )
            parts_future = controller.submit(download_parts_handler)

            io_writes_handler = functools.partial(
                self._perform_io_writes, filename
            )
            io_future = controller.submit(io_writes_handler)
            results = concurrent.futures.wait(
                [parts_future, io_future],
                return_when=concurrent.futures.FIRST_EXCEPTION,
            )
            self._process_future_results(results)

    def _process_future_results(self, futures):
        finished, unfinished = futures
        for future in finished:
            future.result()

    def _download_file_as_future(
        self, bucket, key, filename, object_size, callback
    ):
        part_size = self._config.multipart_chunksize
        num_parts = int(math.ceil(object_size / float(part_size)))
        max_workers = self._config.max_concurrency
        download_partial = functools.partial(
            self._download_range,
            bucket,
            key,
            filename,
            part_size,
            num_parts,
            callback,
        )
        try:
            with self._executor_cls(max_workers=max_workers) as executor:
                list(executor.map(download_partial, range(num_parts)))
        finally:
            self._ioqueue.put(SHUTDOWN_SENTINEL)

    def _calculate_range_param(self, part_size, part_index, num_parts):
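        # Illustrative example: with part_size=8 * MB and num_parts=3,
        # part_index=0 yields 'bytes=0-8388607', while the final part
        # (part_index=2) yields 'bytes=16777216-' so that S3 returns the
        # remainder of the object.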
        start_range = part_index * part_size
        if part_index == num_parts - 1:
            end_range = ''
        else:
            end_range = start_range + part_size - 1
        range_param = f'bytes={start_range}-{end_range}'
        return range_param

    def _download_range(
        self, bucket, key, filename, part_size, num_parts, callback, part_index
    ):
        try:
            range_param = self._calculate_range_param(
                part_size, part_index, num_parts
            )

            max_attempts = self._config.num_download_attempts
            last_exception = None
            for i in range(max_attempts):
                try:
                    logger.debug("Making get_object call.")
                    response = self._client.get_object(
                        Bucket=bucket, Key=key, Range=range_param
                    )
                    streaming_body = StreamReaderProgress(
                        response['Body'], callback
                    )
                    buffer_size = 1024 * 16
                    current_index = part_size * part_index
                    for chunk in iter(
                        lambda: streaming_body.read(buffer_size), b''
                    ):
                        self._ioqueue.put((current_index, chunk))
                        current_index += len(chunk)
                    return
                except (
                    socket.timeout,
                    OSError,
                    ReadTimeoutError,
                    IncompleteReadError,
                    ResponseStreamingError,
                ) as e:
                    logger.debug(
                        "Retryable exception caught (%s), "
                        "retrying request (attempt %s / %s)",
                        e,
                        i,
                        max_attempts,
                        exc_info=True,
                    )
                    last_exception = e
                    continue
            raise RetriesExceededError(last_exception)
        finally:
            logger.debug("EXITING _download_range for part: %s", part_index)

    def _perform_io_writes(self, filename):
        with self._os.open(filename, 'wb') as f:
            while True:
                task = self._ioqueue.get()
                if task is SHUTDOWN_SENTINEL:
                    logger.debug(
                        "Shutdown sentinel received in IO handler, "
                        "shutting down IO handler."
                    )
                    return
                else:
                    try:
                        offset, data = task
                        f.seek(offset)
                        f.write(data)
                    except Exception as e:
                        logger.debug(
                            "Caught exception in IO thread: %s",
                            e,
                            exc_info=True,
                        )
                        self._ioqueue.trigger_shutdown()
                        raise


class TransferConfig:
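    """Configuration for S3Transfer.

    The values below describe the knobs this module exposes; the defaults
    shown in ``__init__`` are used when an argument is not provided.

    :param multipart_threshold: Minimum size (in bytes) at which uploads
        switch to multipart and downloads switch to ranged GETs.
    :param max_concurrency: Maximum number of worker threads used to
        upload parts or download ranges in parallel.
    :param multipart_chunksize: Size in bytes of each uploaded part or
        downloaded range.
    :param num_download_attempts: Number of attempts made for a streaming
        download before a ``RetriesExceededError`` is raised.
    :param max_io_queue: Maximum number of pending chunks in the IO queue
        used to write a ranged download to disk.
    """
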
    def __init__(
        self,
        multipart_threshold=8 * MB,
        max_concurrency=10,
        multipart_chunksize=8 * MB,
        num_download_attempts=5,
        max_io_queue=100,
    ):
        self.multipart_threshold = multipart_threshold
        self.max_concurrency = max_concurrency
        self.multipart_chunksize = multipart_chunksize
        self.num_download_attempts = num_download_attempts
        self.max_io_queue = max_io_queue


class S3Transfer:
    ALLOWED_DOWNLOAD_ARGS = [
        'VersionId',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    ALLOWED_UPLOAD_ARGS = [
        'ACL',
        'CacheControl',
        'ContentDisposition',
        'ContentEncoding',
        'ContentLanguage',
        'ContentType',
        'Expires',
        'GrantFullControl',
        'GrantRead',
        'GrantReadACP',
        'GrantWriteACP',
        'Metadata',
        'RequestPayer',
        'ServerSideEncryption',
        'StorageClass',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'SSEKMSKeyId',
        'SSEKMSEncryptionContext',
        'Tagging',
    ]

    def __init__(self, client, config=None, osutil=None):
        self._client = client
        self._client.meta.events.register(
            'before-call.s3.*', self._update_checksum_context
        )
        if config is None:
            config = TransferConfig()
        self._config = config
        if osutil is None:
            osutil = OSUtils()
        self._osutil = osutil

    def _update_checksum_context(self, params, **kwargs):
        request_context = params.get("context", {})
        checksum_context = request_context.get("checksum", {})
        if "request_algorithm" in checksum_context:
            # Force request checksum algorithm in the header if specified.
            checksum_context["request_algorithm"]["in"] = "header"

    def upload_file(
        self, filename, bucket, key, callback=None, extra_args=None
    ):
        """Upload a file to an S3 object.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.upload_file() directly.
        """
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
        events = self._client.meta.events
        events.register_first(
            'request-created.s3',
            disable_upload_callbacks,
            unique_id='s3upload-callback-disable',
        )
        events.register_last(
            'request-created.s3',
            enable_upload_callbacks,
            unique_id='s3upload-callback-enable',
        )
        if (
            self._osutil.get_file_size(filename)
            >= self._config.multipart_threshold
        ):
            self._multipart_upload(filename, bucket, key, callback, extra_args)
        else:
            self._put_object(filename, bucket, key, callback, extra_args)

    def _put_object(self, filename, bucket, key, callback, extra_args):
        # We're using open_file_chunk_reader so we can take advantage of the
        # progress callback functionality.
        open_chunk_reader = self._osutil.open_file_chunk_reader
        with open_chunk_reader(
            filename,
            0,
            self._osutil.get_file_size(filename),
            callback=callback,
        ) as body:
            self._client.put_object(
                Bucket=bucket, Key=key, Body=body, **extra_args
            )

    def download_file(
        self, bucket, key, filename, extra_args=None, callback=None
    ):
        """Download an S3 object to a file.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.download_file() directly.
        """
        # This method will issue a ``head_object`` request to determine
        # the size of the S3 object. This is used to determine if the
        # object is downloaded in parallel.
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
        object_size = self._object_size(bucket, key, extra_args)
        temp_filename = filename + os.extsep + random_file_extension()
        try:
            self._download_file(
                bucket, key, temp_filename, object_size, extra_args, callback
            )
        except Exception:
            logger.debug(
                "Exception caught in download_file, removing partial "
                "file: %s",
                temp_filename,
                exc_info=True,
            )
            self._osutil.remove_file(temp_filename)
            raise
        else:
            self._osutil.rename_file(temp_filename, filename)

    def _download_file(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        if object_size >= self._config.multipart_threshold:
            self._ranged_download(
                bucket, key, filename, object_size, extra_args, callback
            )
        else:
            self._get_object(bucket, key, filename, extra_args, callback)

    def _validate_all_known_args(self, actual, allowed):
        for kwarg in actual:
            if kwarg not in allowed:
                raise ValueError(
                    f"Invalid extra_args key '{kwarg}', "
                    f"must be one of: {', '.join(allowed)}"
                )

    def _ranged_download(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        downloader = MultipartDownloader(
            self._client, self._config, self._osutil
        )
        downloader.download_file(
            bucket, key, filename, object_size, extra_args, callback
        )

    def _get_object(self, bucket, key, filename, extra_args, callback):
        # precondition: num_download_attempts > 0
        max_attempts = self._config.num_download_attempts
        last_exception = None
        for i in range(max_attempts):
            try:
                return self._do_get_object(
                    bucket, key, filename, extra_args, callback
                )
            except (
                socket.timeout,
                OSError,
                ReadTimeoutError,
                IncompleteReadError,
                ResponseStreamingError,
            ) as e:
                # TODO: we need a way to reset the callback if the
                # download failed.
                logger.debug(
                    "Retryable exception caught (%s), "
                    "retrying request (attempt %s / %s)",
                    e,
                    i,
                    max_attempts,
                    exc_info=True,
                )
                last_exception = e
                continue
        raise RetriesExceededError(last_exception)

    def _do_get_object(self, bucket, key, filename, extra_args, callback):
        response = self._client.get_object(
            Bucket=bucket, Key=key, **extra_args
        )
        streaming_body = StreamReaderProgress(response['Body'], callback)
        with self._osutil.open(filename, 'wb') as f:
            for chunk in iter(lambda: streaming_body.read(8192), b''):
                f.write(chunk)

    def _object_size(self, bucket, key, extra_args):
        return self._client.head_object(Bucket=bucket, Key=key, **extra_args)[
            'ContentLength'
        ]

    def _multipart_upload(self, filename, bucket, key, callback, extra_args):
        uploader = MultipartUploader(self._client, self._config, self._osutil)
        uploader.upload_file(filename, bucket, key, callback, extra_args)