1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13"""Abstractions over S3's upload/download operations.
14
15This module provides high level abstractions for efficient
16uploads/downloads. It handles several things for the user:
17
18* Automatically switching to multipart transfers when
19 a file is over a specific size threshold
20* Uploading/downloading a file in parallel
21* Throttling based on max bandwidth
22* Progress callbacks to monitor transfers
23* Retries. While botocore handles retries for streaming uploads,
24 it is not possible for it to handle retries for streaming
25 downloads. This module handles retries for both cases so
26 you don't need to implement any retry logic yourself.
27
28This module has a reasonable set of defaults. It also allows you
29to configure many aspects of the transfer process including:
30
31* Multipart threshold size
32* Max parallel downloads
33* Max bandwidth
34* Socket timeouts
35* Retry amounts
36
37There is no support for s3->s3 multipart copies at this
38time.
39
40
41.. _ref_s3transfer_usage:
42
43Usage
44=====
45
46The simplest way to use this module is:
47
48.. code-block:: python
49
50 client = boto3.client('s3', 'us-west-2')
51 transfer = S3Transfer(client)
52 # Upload /tmp/myfile to s3://bucket/key
53 transfer.upload_file('/tmp/myfile', 'bucket', 'key')
54
55 # Download s3://bucket/key to /tmp/myfile
56 transfer.download_file('bucket', 'key', '/tmp/myfile')
57
58The ``upload_file`` and ``download_file`` methods also accept
59``**kwargs``, which will be forwarded through to the corresponding
60client operation. Here are a few examples using ``upload_file``::
61
62 # Making the object public
63 transfer.upload_file('/tmp/myfile', 'bucket', 'key',
64 extra_args={'ACL': 'public-read'})
65
66 # Setting metadata
67 transfer.upload_file('/tmp/myfile', 'bucket', 'key',
68 extra_args={'Metadata': {'a': 'b', 'c': 'd'}})
69
70 # Setting content type
71 transfer.upload_file('/tmp/myfile.json', 'bucket', 'key',
72 extra_args={'ContentType': "application/json"})
73
74
75The ``S3Transfer`` class also supports progress callbacks so you can
76provide transfer progress to users. Both the ``upload_file`` and
77``download_file`` methods take an optional ``callback`` parameter.
78Here's an example of how to print a simple progress percentage
79to the user:
80
81.. code-block:: python
82
83 class ProgressPercentage(object):
84 def __init__(self, filename):
85 self._filename = filename
86 self._size = float(os.path.getsize(filename))
87 self._seen_so_far = 0
88 self._lock = threading.Lock()
89
90 def __call__(self, bytes_amount):
91 # To simplify we'll assume this is hooked up
92 # to a single filename.
93 with self._lock:
94 self._seen_so_far += bytes_amount
95 percentage = (self._seen_so_far / self._size) * 100
96 sys.stdout.write(
97 "\r%s %s / %s (%.2f%%)" % (self._filename, self._seen_so_far,
98 self._size, percentage))
99 sys.stdout.flush()
100
101
102 transfer = S3Transfer(boto3.client('s3', 'us-west-2'))
103 # Upload /tmp/myfile to s3://bucket/key and print upload progress.
104 transfer.upload_file('/tmp/myfile', 'bucket', 'key',
105 callback=ProgressPercentage('/tmp/myfile'))
106
107
108
109You can also provide a TransferConfig object to the S3Transfer
110object that gives you more fine grained control over the
111transfer. For example:
112
113.. code-block:: python
114
115 client = boto3.client('s3', 'us-west-2')
116 config = TransferConfig(
117 multipart_threshold=8 * 1024 * 1024,
118 max_concurrency=10,
119 num_download_attempts=10,
120 )
121 transfer = S3Transfer(client, config)
122 transfer.upload_file('/tmp/foo', 'bucket', 'key')
123
124
125"""
126
127import concurrent.futures
128import functools
129import logging
130import math
131import os
132import queue
133import random
134import socket
135import string
136import threading
137from logging import NullHandler
138
139from botocore.compat import six # noqa: F401
140from botocore.exceptions import IncompleteReadError, ResponseStreamingError
141from botocore.vendored.requests.packages.urllib3.exceptions import (
142 ReadTimeoutError,
143)
144
145import s3transfer.compat
146from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError
147
# Package metadata.
__author__ = 'Amazon Web Services'
__version__ = '0.13.1'


# Library loggers attach a NullHandler so applications that have not
# configured logging don't get "no handlers could be found" warnings.
logger = logging.getLogger(__name__)
logger.addHandler(NullHandler())

# Bytes in one mebibyte; used for transfer-size defaults in TransferConfig.
MB = 1024 * 1024
# Unique marker object put on the IO queue to tell the writer thread to stop.
SHUTDOWN_SENTINEL = object()
157
158
def random_file_extension(num_digits=8):
    """Return ``num_digits`` random hexadecimal characters.

    Used to build unique temporary filenames for in-progress downloads.
    """
    return ''.join(random.choices(string.hexdigits, k=num_digits))
161
162
def disable_upload_callbacks(request, operation_name, **kwargs):
    """Event handler that pauses progress callbacks on an upload body.

    Only acts on upload operations whose body supports toggling
    callbacks (e.g. a ``ReadFileChunk``); all other requests pass
    through untouched.
    """
    if operation_name not in ('PutObject', 'UploadPart'):
        return
    body = request.body
    if hasattr(body, 'disable_callback'):
        body.disable_callback()
168
169
def enable_upload_callbacks(request, operation_name, **kwargs):
    """Event handler that re-enables progress callbacks on an upload body.

    Mirror image of ``disable_upload_callbacks``; acts only on upload
    operations whose body supports toggling callbacks.
    """
    if operation_name not in ('PutObject', 'UploadPart'):
        return
    body = request.body
    if hasattr(body, 'enable_callback'):
        body.enable_callback()
175
176
class QueueShutdownError(Exception):
    """Raised when an item is put on a queue that has been shut down."""
179
180
class ReadFileChunk:
    """A file-like view over a fixed-size window of an underlying file.

    Reads are clamped so they never run past ``chunk_size`` bytes (or
    the true end of the file, whichever comes first), which lets one
    part of a multipart transfer be sent as if it were a complete,
    standalone body.  Optionally reports the number of bytes read to a
    progress callback.
    """

    def __init__(
        self,
        fileobj,
        start_byte,
        chunk_size,
        full_file_size,
        callback=None,
        enable_callback=True,
    ):
        """
        Given a file object shown below:

            |___________________________________________________|
            0          |                 |                full_file_size
                       |----chunk_size---|
                    start_byte

        :type fileobj: file
        :param fileobj: File like object

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read.  Trying to read
            past the end of the chunk size behaves as if the end of the
            file was reached.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        :type enable_callback: bool
        :param enable_callback: Whether the callback fires during
            ``read()`` calls.
        """
        self._fileobj = fileobj
        self._start_byte = start_byte
        self._size = self._calculate_file_size(
            fileobj,
            requested_size=chunk_size,
            start_byte=start_byte,
            actual_file_size=full_file_size,
        )
        # Position the underlying file at the beginning of our window.
        fileobj.seek(start_byte)
        self._amount_read = 0
        self._callback = callback
        self._callback_enabled = enable_callback

    @classmethod
    def from_filename(
        cls,
        filename,
        start_byte,
        chunk_size,
        callback=None,
        enable_callback=True,
    ):
        """Convenience factory: open ``filename`` and wrap it in a chunk.

        The full file size is taken from ``fstat`` so the chunk is
        clamped to the real end of the file.

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        :type enable_callback: bool
        :param enable_callback: Whether the callback fires during
            ``read()`` calls.

        :rtype: ``ReadFileChunk``
        :return: A new instance of ``ReadFileChunk``
        """
        fileobj = open(filename, 'rb')
        size = os.fstat(fileobj.fileno()).st_size
        return cls(
            fileobj,
            start_byte,
            chunk_size,
            size,
            callback,
            enable_callback,
        )

    def _calculate_file_size(
        self, fileobj, requested_size, start_byte, actual_file_size
    ):
        # Never let the window extend past the end of the real file.
        return min(requested_size, actual_file_size - start_byte)

    def read(self, amount=None):
        """Read up to ``amount`` bytes, never crossing the chunk boundary."""
        remaining = self._size - self._amount_read
        if amount is not None:
            remaining = min(remaining, amount)
        data = self._fileobj.read(remaining)
        self._amount_read += len(data)
        if self._callback_enabled and self._callback is not None:
            self._callback(len(data))
        return data

    def enable_callback(self):
        """Resume reporting reads to the progress callback."""
        self._callback_enabled = True

    def disable_callback(self):
        """Stop reporting reads to the progress callback."""
        self._callback_enabled = False

    def seek(self, where):
        """Seek to ``where``, relative to the start of the chunk."""
        self._fileobj.seek(self._start_byte + where)
        if self._callback_enabled and self._callback is not None:
            # Report the (possibly negative) delta so progress stays
            # accurate when the stream is rewound, e.g. on a retry.
            self._callback(where - self._amount_read)
        self._amount_read = where

    def close(self):
        """Close the underlying file object."""
        self._fileobj.close()

    def tell(self):
        """Return the current position relative to the chunk start."""
        return self._amount_read

    def __len__(self):
        # requests uses len() to set the Content-Length header instead of
        # stat'ing the file, so report the chunk size, not the file size.
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def __iter__(self):
        # Workaround for http://bugs.python.org/issue17575: httplib will
        # iterate a file-like body; returning an exhausted iterator
        # simulates an already-consumed stream so nothing is re-sent.
        return iter([])
329
330
class StreamReaderProgress:
    """Wrap a read-only stream and report each read's size to a callback."""

    def __init__(self, stream, callback=None):
        self._stream = stream
        self._callback = callback

    def read(self, *args, **kwargs):
        """Delegate to the wrapped stream's ``read`` and report progress."""
        chunk = self._stream.read(*args, **kwargs)
        if self._callback is not None:
            self._callback(len(chunk))
        return chunk
343
344
class OSUtils:
    """Thin wrapper around filesystem operations.

    Kept as a class so transfer logic can be exercised with a stub
    implementation instead of touching the real filesystem.
    """

    def get_file_size(self, filename):
        """Return the size of ``filename`` in bytes."""
        return os.path.getsize(filename)

    def open_file_chunk_reader(self, filename, start_byte, size, callback):
        """Return a ``ReadFileChunk`` over a window of ``filename``.

        The callback starts disabled; callers enable it once the
        request is actually in flight.
        """
        return ReadFileChunk.from_filename(
            filename,
            start_byte,
            size,
            callback,
            enable_callback=False,
        )

    def open(self, filename, mode):
        """Open ``filename``; indirection point for testing."""
        return open(filename, mode)

    def remove_file(self, filename):
        """Delete ``filename``; a no-op when the file does not exist."""
        try:
            os.remove(filename)
        except OSError:
            # Deliberately swallow "file not found" style errors so
            # cleanup of partial downloads is best-effort.
            pass

    def rename_file(self, current_filename, new_filename):
        """Rename, delegating to the compat shim for platform quirks."""
        s3transfer.compat.rename_file(current_filename, new_filename)
368
369
class MultipartUploader:
    """Upload one file as an S3 multipart upload using a thread pool.

    Creates the multipart upload, fans the parts out over
    ``max_concurrency`` workers, then completes (or aborts on failure).
    """

    # extra_args entries that must be replayed on every upload_part call;
    # everything else applies only to create_multipart_upload.
    UPLOAD_PART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls

    def _extra_upload_part_args(self, extra_args):
        """Filter ``extra_args`` down to those upload_part understands."""
        return {
            key: value
            for key, value in extra_args.items()
            if key in self.UPLOAD_PART_ARGS
        }

    def upload_file(self, filename, bucket, key, callback, extra_args):
        """Upload ``filename`` to s3://bucket/key in parts.

        :param callback: Optional progress callback, invoked with the
            number of bytes transferred.
        :param extra_args: Extra arguments forwarded to
            ``create_multipart_upload`` (and, where applicable, to each
            ``upload_part`` call).
        :raises S3UploadFailedError: If any part fails; the multipart
            upload is aborted first so no orphaned parts remain.
        """
        response = self._client.create_multipart_upload(
            Bucket=bucket, Key=key, **extra_args
        )
        upload_id = response['UploadId']
        try:
            parts = self._upload_parts(
                upload_id, filename, bucket, key, callback, extra_args
            )
        except Exception as e:
            logger.debug(
                "Exception raised while uploading parts, "
                "aborting multipart upload.",
                exc_info=True,
            )
            # Abort so S3 does not keep (and bill for) the parts.
            self._client.abort_multipart_upload(
                Bucket=bucket, Key=key, UploadId=upload_id
            )
            raise S3UploadFailedError(
                "Failed to upload {} to {}: {}".format(
                    filename, '/'.join([bucket, key]), e
                )
            )
        self._client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts},
        )

    def _upload_parts(
        self, upload_id, filename, bucket, key, callback, extra_args
    ):
        """Upload every part concurrently; return the Parts manifest."""
        part_args = self._extra_upload_part_args(extra_args)
        part_size = self._config.multipart_chunksize
        file_size = self._os.get_file_size(filename)
        num_parts = int(math.ceil(file_size / float(part_size)))
        upload_one = functools.partial(
            self._upload_one_part,
            filename,
            bucket,
            key,
            upload_id,
            part_size,
            part_args,
            callback,
        )
        with self._executor_cls(
            max_workers=self._config.max_concurrency
        ) as executor:
            # executor.map yields results in submission order, so the
            # manifest is already sorted by part number.
            return list(executor.map(upload_one, range(1, num_parts + 1)))

    def _upload_one_part(
        self,
        filename,
        bucket,
        key,
        upload_id,
        part_size,
        extra_args,
        callback,
        part_number,
    ):
        """Upload a single 1-indexed part and return its manifest entry."""
        offset = part_size * (part_number - 1)
        with self._os.open_file_chunk_reader(
            filename, offset, part_size, callback
        ) as body:
            response = self._client.upload_part(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=body,
                **extra_args,
            )
            return {'ETag': response['ETag'], 'PartNumber': part_number}
481
482
class ShutdownQueue(queue.Queue):
    """A ``queue.Queue`` whose ``put()`` can be permanently disabled.

    After ``trigger_shutdown()`` is called, every subsequent ``put()``
    raises ``QueueShutdownError``.  This intentionally deviates from
    the stock ``queue.Queue`` API and is *not* a drop-in replacement.
    """

    def _init(self, maxsize):
        # Queue.__init__ calls _init for us, so this is where per-queue
        # state is set up.
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        return super()._init(maxsize)

    def trigger_shutdown(self):
        """Mark the queue as shut down; later ``put()`` calls will raise."""
        with self._shutdown_lock:
            self._shutdown = True
            logger.debug("The IO queue is now shutdown.")

    def put(self, item):
        # NOTE: checking the flag under the lock is not a complete fix;
        # a producer already blocked inside Queue.put() can still
        # deadlock.  Fully solving that would require hooking the
        # queue's internal condition variables.
        with self._shutdown_lock:
            if self._shutdown:
                raise QueueShutdownError(
                    "Cannot put item to queue when queue has been shutdown."
                )
        return super().put(item)
515
516
class MultipartDownloader:
    """Download a single S3 object using parallel ranged ``GetObject`` calls.

    Two futures run under a small controller pool: one fans ranged
    downloads out across ``max_concurrency`` worker threads, the other
    drains ``self._ioqueue`` and writes each chunk to the destination
    file at its offset.  ``SHUTDOWN_SENTINEL`` on the queue tells the
    IO future to exit.
    """

    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls
        # Bounded so that fast downloads cannot buffer unbounded amounts
        # of data in memory while the disk writer catches up.
        self._ioqueue = ShutdownQueue(self._config.max_io_queue)

    def download_file(
        self, bucket, key, filename, object_size, extra_args, callback=None
    ):
        """Download s3://bucket/key to ``filename`` via ranged GETs.

        :param object_size: Total object size in bytes (from a prior
            ``HeadObject``); used to compute the number of parts.
        :param callback: Optional progress callback invoked with the
            number of bytes read from each response.

        NOTE(review): ``extra_args`` is accepted but never forwarded to
        the ``get_object`` calls below — confirm whether download extra
        args (e.g. SSE-C keys) should apply to ranged downloads.
        """
        with self._executor_cls(max_workers=2) as controller:
            # 1 thread for the future that manages the uploading of files
            # 1 thread for the future that manages IO writes.
            download_parts_handler = functools.partial(
                self._download_file_as_future,
                bucket,
                key,
                filename,
                object_size,
                callback,
            )
            parts_future = controller.submit(download_parts_handler)

            io_writes_handler = functools.partial(
                self._perform_io_writes, filename
            )
            io_future = controller.submit(io_writes_handler)
            # Stop waiting as soon as either future fails, then surface
            # the first exception (if any) via future.result().
            results = concurrent.futures.wait(
                [parts_future, io_future],
                return_when=concurrent.futures.FIRST_EXCEPTION,
            )
            self._process_future_results(results)

    def _process_future_results(self, futures):
        # Re-raise the first exception captured by any finished future.
        finished, unfinished = futures
        for future in finished:
            future.result()

    def _download_file_as_future(
        self, bucket, key, filename, object_size, callback
    ):
        # Fan the ranged downloads out over a worker pool.  The sentinel
        # is enqueued in a finally block so the IO thread terminates even
        # when a part download raises.
        part_size = self._config.multipart_chunksize
        num_parts = int(math.ceil(object_size / float(part_size)))
        max_workers = self._config.max_concurrency
        download_partial = functools.partial(
            self._download_range,
            bucket,
            key,
            filename,
            part_size,
            num_parts,
            callback,
        )
        try:
            with self._executor_cls(max_workers=max_workers) as executor:
                list(executor.map(download_partial, range(num_parts)))
        finally:
            self._ioqueue.put(SHUTDOWN_SENTINEL)

    def _calculate_range_param(self, part_size, part_index, num_parts):
        """Return the HTTP ``Range`` header value for a 0-indexed part."""
        start_range = part_index * part_size
        if part_index == num_parts - 1:
            # Last part: open-ended range so any remainder is included.
            end_range = ''
        else:
            end_range = start_range + part_size - 1
        range_param = f'bytes={start_range}-{end_range}'
        return range_param

    def _download_range(
        self, bucket, key, filename, part_size, num_parts, callback, part_index
    ):
        """Fetch one range with retries and enqueue its chunks for IO.

        Retries the whole range on streaming/socket errors up to
        ``num_download_attempts`` times; raises ``RetriesExceededError``
        wrapping the last failure when attempts are exhausted.
        """
        try:
            range_param = self._calculate_range_param(
                part_size, part_index, num_parts
            )

            max_attempts = self._config.num_download_attempts
            last_exception = None
            for i in range(max_attempts):
                try:
                    logger.debug("Making get_object call.")
                    response = self._client.get_object(
                        Bucket=bucket, Key=key, Range=range_param
                    )
                    streaming_body = StreamReaderProgress(
                        response['Body'], callback
                    )
                    buffer_size = 1024 * 16
                    current_index = part_size * part_index
                    # Each chunk is tagged with its absolute file offset so
                    # the IO thread can write parts in any arrival order.
                    for chunk in iter(
                        lambda: streaming_body.read(buffer_size), b''
                    ):
                        self._ioqueue.put((current_index, chunk))
                        current_index += len(chunk)
                    return
                except (
                    socket.timeout,
                    OSError,
                    ReadTimeoutError,
                    IncompleteReadError,
                    ResponseStreamingError,
                ) as e:
                    # Retryable streaming failure; restart this range from
                    # the beginning.  (Attempt count in the log is 0-based.)
                    logger.debug(
                        "Retrying exception caught (%s), "
                        "retrying request, (attempt %s / %s)",
                        e,
                        i,
                        max_attempts,
                        exc_info=True,
                    )
                    last_exception = e
                    continue
            raise RetriesExceededError(last_exception)
        finally:
            logger.debug("EXITING _download_range for part: %s", part_index)

    def _perform_io_writes(self, filename):
        """Drain the IO queue, writing (offset, data) chunks to ``filename``.

        Runs until the shutdown sentinel is received.  On a write error
        the queue is shut down so producers fail fast instead of
        blocking on a full queue.
        """
        with self._os.open(filename, 'wb') as f:
            while True:
                task = self._ioqueue.get()
                if task is SHUTDOWN_SENTINEL:
                    logger.debug(
                        "Shutdown sentinel received in IO handler, "
                        "shutting down IO handler."
                    )
                    return
                else:
                    try:
                        offset, data = task
                        f.seek(offset)
                        f.write(data)
                    except Exception as e:
                        logger.debug(
                            "Caught exception in IO thread: %s",
                            e,
                            exc_info=True,
                        )
                        self._ioqueue.trigger_shutdown()
                        raise
663
664
class TransferConfig:
    """Configuration knobs for ``S3Transfer``.

    :param multipart_threshold: File/object size in bytes at or above
        which transfers switch to the multipart code paths.
    :param max_concurrency: Maximum number of worker threads used for a
        single multipart transfer.
    :param multipart_chunksize: Size in bytes of each uploaded part /
        downloaded range.
    :param num_download_attempts: Total attempts (not additional
        retries) made for a download before giving up.
    :param max_io_queue: Maximum number of in-memory chunks queued for
        the download IO thread before producers block.
    """

    def __init__(
        self,
        multipart_threshold=8 * MB,
        max_concurrency=10,
        multipart_chunksize=8 * MB,
        num_download_attempts=5,
        max_io_queue=100,
    ):
        self.multipart_threshold = multipart_threshold
        self.max_concurrency = max_concurrency
        self.multipart_chunksize = multipart_chunksize
        self.num_download_attempts = num_download_attempts
        self.max_io_queue = max_io_queue
679
680
class S3Transfer:
    """High level interface for uploading/downloading files to/from S3.

    Chooses between a single request and a multipart transfer based on
    the file/object size relative to
    ``TransferConfig.multipart_threshold``.
    """

    # extra_args keys accepted by download_file; forwarded to
    # HeadObject/GetObject.
    ALLOWED_DOWNLOAD_ARGS = [
        'VersionId',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    # extra_args keys accepted by upload_file; forwarded to
    # PutObject/CreateMultipartUpload.
    ALLOWED_UPLOAD_ARGS = [
        'ACL',
        'CacheControl',
        'ContentDisposition',
        'ContentEncoding',
        'ContentLanguage',
        'ContentType',
        'Expires',
        'GrantFullControl',
        'GrantRead',
        'GrantReadACP',
        # 'GrantWriteACL' is not a parameter S3 PutObject accepts (botocore
        # rejects it); it is kept only so callers that already pass it keep
        # failing at the same place as before.  The correct name follows.
        'GrantWriteACL',
        'GrantWriteACP',
        'Metadata',
        'RequestPayer',
        'ServerSideEncryption',
        'StorageClass',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'SSEKMSKeyId',
        'SSEKMSEncryptionContext',
        'Tagging',
    ]

    def __init__(self, client, config=None, osutil=None):
        """
        :param client: A botocore/boto3 S3 client.
        :param config: Optional ``TransferConfig``; defaults are used
            when omitted.
        :param osutil: Optional ``OSUtils`` override, mainly for testing.
        """
        self._client = client
        self._client.meta.events.register(
            'before-call.s3.*', self._update_checksum_context
        )
        if config is None:
            config = TransferConfig()
        self._config = config
        if osutil is None:
            osutil = OSUtils()
        self._osutil = osutil

    def _update_checksum_context(self, params, **kwargs):
        """before-call handler: pin request checksums to headers.

        Streaming uploads cannot use trailing checksums here, so when a
        request checksum algorithm is present in the request context,
        force it to be sent as a header.
        """
        request_context = params.get("context", {})
        checksum_context = request_context.get("checksum", {})
        if "request_algorithm" in checksum_context:
            # Force request checksum algorithm in the header if specified.
            checksum_context["request_algorithm"]["in"] = "header"

    def upload_file(
        self, filename, bucket, key, callback=None, extra_args=None
    ):
        """Upload a file to an S3 object.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.upload_file() directly.

        :param callback: Optional callable invoked with the number of
            bytes transferred as the upload progresses.
        :param extra_args: Mapping of additional client arguments; keys
            must come from ``ALLOWED_UPLOAD_ARGS``.
        :raises ValueError: If ``extra_args`` contains an unknown key.
        """
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
        events = self._client.meta.events
        # Progress callbacks are disabled while the request is being
        # prepared and re-enabled once it is actually sent, so retries
        # and signing-related body reads are not double counted.
        events.register_first(
            'request-created.s3',
            disable_upload_callbacks,
            unique_id='s3upload-callback-disable',
        )
        events.register_last(
            'request-created.s3',
            enable_upload_callbacks,
            unique_id='s3upload-callback-enable',
        )
        if (
            self._osutil.get_file_size(filename)
            >= self._config.multipart_threshold
        ):
            self._multipart_upload(filename, bucket, key, callback, extra_args)
        else:
            self._put_object(filename, bucket, key, callback, extra_args)

    def _put_object(self, filename, bucket, key, callback, extra_args):
        # We're using open_file_chunk_reader so we can take advantage of
        # the progress callback functionality.
        open_chunk_reader = self._osutil.open_file_chunk_reader
        with open_chunk_reader(
            filename,
            0,
            self._osutil.get_file_size(filename),
            callback=callback,
        ) as body:
            self._client.put_object(
                Bucket=bucket, Key=key, Body=body, **extra_args
            )

    def download_file(
        self, bucket, key, filename, extra_args=None, callback=None
    ):
        """Download an S3 object to a file.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.download_file() directly.

        :param extra_args: Mapping of additional client arguments; keys
            must come from ``ALLOWED_DOWNLOAD_ARGS``.
        :param callback: Optional callable invoked with the number of
            bytes transferred as the download progresses.
        :raises ValueError: If ``extra_args`` contains an unknown key.
        """
        # This method will issue a ``head_object`` request to determine
        # the size of the S3 object. This is used to determine if the
        # object is downloaded in parallel.
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
        object_size = self._object_size(bucket, key, extra_args)
        # Download into a temp file and rename on success so a failed
        # download never leaves a truncated file at the destination.
        temp_filename = filename + os.extsep + random_file_extension()
        try:
            self._download_file(
                bucket, key, temp_filename, object_size, extra_args, callback
            )
        except Exception:
            logger.debug(
                "Exception caught in download_file, removing partial file: %s",
                temp_filename,
                exc_info=True,
            )
            self._osutil.remove_file(temp_filename)
            raise
        else:
            self._osutil.rename_file(temp_filename, filename)

    def _download_file(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        # Pick single-request vs ranged download based on object size.
        if object_size >= self._config.multipart_threshold:
            self._ranged_download(
                bucket, key, filename, object_size, extra_args, callback
            )
        else:
            self._get_object(bucket, key, filename, extra_args, callback)

    def _validate_all_known_args(self, actual, allowed):
        """Raise ``ValueError`` for any extra_args key not in ``allowed``."""
        for kwarg in actual:
            if kwarg not in allowed:
                raise ValueError(
                    f"Invalid extra_args key '{kwarg}', "
                    f"must be one of: {', '.join(allowed)}"
                )

    def _ranged_download(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        downloader = MultipartDownloader(
            self._client, self._config, self._osutil
        )
        downloader.download_file(
            bucket, key, filename, object_size, extra_args, callback
        )

    def _get_object(self, bucket, key, filename, extra_args, callback):
        # precondition: num_download_attempts > 0
        max_attempts = self._config.num_download_attempts
        last_exception = None
        for i in range(max_attempts):
            try:
                return self._do_get_object(
                    bucket, key, filename, extra_args, callback
                )
            except (
                socket.timeout,
                OSError,
                ReadTimeoutError,
                IncompleteReadError,
                ResponseStreamingError,
            ) as e:
                # TODO: we need a way to reset the callback if the
                # download failed.
                logger.debug(
                    "Retrying exception caught (%s), "
                    "retrying request, (attempt %s / %s)",
                    e,
                    i,
                    max_attempts,
                    exc_info=True,
                )
                last_exception = e
                continue
        raise RetriesExceededError(last_exception)

    def _do_get_object(self, bucket, key, filename, extra_args, callback):
        # Stream the body to disk in fixed-size chunks, reporting
        # progress through the StreamReaderProgress wrapper.
        response = self._client.get_object(
            Bucket=bucket, Key=key, **extra_args
        )
        streaming_body = StreamReaderProgress(response['Body'], callback)
        with self._osutil.open(filename, 'wb') as f:
            for chunk in iter(lambda: streaming_body.read(8192), b''):
                f.write(chunk)

    def _object_size(self, bucket, key, extra_args):
        """Return the object's ContentLength via ``head_object``."""
        return self._client.head_object(Bucket=bucket, Key=key, **extra_args)[
            'ContentLength'
        ]

    def _multipart_upload(self, filename, bucket, key, callback, extra_args):
        uploader = MultipartUploader(self._client, self._config, self._osutil)
        uploader.upload_file(filename, bucket, key, callback, extra_args)