# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# https://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
13"""Abstractions over S3's upload/download operations.
15This module provides high level abstractions for efficient
16uploads/downloads. It handles several things for the user:
18* Automatically switching to multipart transfers when
19 a file is over a specific size threshold
20* Uploading/downloading a file in parallel
21* Progress callbacks to monitor transfers
22* Retries. While botocore handles retries for streaming uploads,
23 it is not possible for it to handle retries for streaming
24 downloads. This module handles retries for both cases so
25 you don't need to implement any retry logic yourself.
27This module has a reasonable set of defaults. It also allows you
28to configure many aspects of the transfer process including:
30* Multipart threshold size
31* Max parallel downloads
32* Socket timeouts
33* Retry amounts
35There is no support for s3->s3 multipart copies at this
36time.


.. _ref_s3transfer_usage:

Usage
=====

The simplest way to use this module is:

.. code-block:: python

    import boto3

    client = boto3.client('s3', 'us-west-2')
    transfer = S3Transfer(client)
    # Upload /tmp/myfile to s3://bucket/key
    transfer.upload_file('/tmp/myfile', 'bucket', 'key')

    # Download s3://bucket/key to /tmp/myfile
    transfer.download_file('bucket', 'key', '/tmp/myfile')

The ``upload_file`` and ``download_file`` methods also accept an
optional ``extra_args`` dictionary, whose entries are forwarded
through to the corresponding client operation. Here are a few
examples using ``upload_file``::

    # Making the object public
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'ACL': 'public-read'})

    # Setting metadata
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'Metadata': {'a': 'b', 'c': 'd'}})

    # Setting content type
    transfer.upload_file('/tmp/myfile.json', 'bucket', 'key',
                         extra_args={'ContentType': "application/json"})

The ``S3Transfer`` class also supports progress callbacks so you can
provide transfer progress to users. Both the ``upload_file`` and
``download_file`` methods take an optional ``callback`` parameter.
Here's an example of how to print a simple progress percentage
to the user:

.. code-block:: python

    import os
    import sys
    import threading

    class ProgressPercentage(object):
        def __init__(self, filename):
            self._filename = filename
            self._size = float(os.path.getsize(filename))
            self._seen_so_far = 0
            self._lock = threading.Lock()

        def __call__(self, bytes_amount):
            # To simplify, we'll assume this is hooked up
            # to a single filename.
            with self._lock:
                self._seen_so_far += bytes_amount
                percentage = (self._seen_so_far / self._size) * 100
                sys.stdout.write(
                    "\r%s %s / %s (%.2f%%)" % (
                        self._filename, self._seen_so_far, self._size,
                        percentage))
                sys.stdout.flush()

    transfer = S3Transfer(boto3.client('s3', 'us-west-2'))
    # Upload /tmp/myfile to s3://bucket/key and print upload progress.
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         callback=ProgressPercentage('/tmp/myfile'))

You can also provide a ``TransferConfig`` object to the ``S3Transfer``
object to get more fine-grained control over the transfer.
For example:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    config = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,
        max_concurrency=10,
        num_download_attempts=10,
    )
    transfer = S3Transfer(client, config)
    transfer.upload_file('/tmp/foo', 'bucket', 'key')
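
``S3Transfer`` can also be used as a context manager, which closes the
underlying transfer manager on exit (a minimal sketch based on the
``__enter__``/``__exit__`` methods defined below):

.. code-block:: python

    with S3Transfer(boto3.client('s3', 'us-west-2')) as transfer:
        transfer.upload_file('/tmp/myfile', 'bucket', 'key')
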
124"""
import logging
import threading
from os import PathLike, fspath, getpid

from botocore.compat import HAS_CRT
from botocore.exceptions import ClientError
from s3transfer.exceptions import (
    RetriesExceededError as S3TransferRetriesExceededError,
)
from s3transfer.futures import NonThreadedExecutor
from s3transfer.manager import TransferConfig as S3TransferConfig
from s3transfer.manager import TransferManager
from s3transfer.subscribers import BaseSubscriber
from s3transfer.utils import OSUtils

import boto3.s3.constants as constants
from boto3.exceptions import RetriesExceededError, S3UploadFailedError

if HAS_CRT:
    import awscrt.s3

    from boto3.crt import create_crt_transfer_manager

KB = 1024
MB = KB * KB

logger = logging.getLogger(__name__)


def create_transfer_manager(client, config, osutil=None):
    """Creates a transfer manager based on configuration

    :type client: boto3.client
    :param client: The S3 client to use

    :type config: boto3.s3.transfer.TransferConfig
    :param config: The transfer config to use

    :type osutil: s3transfer.utils.OSUtils
    :param osutil: The os utility to use

    :rtype: s3transfer.manager.TransferManager
    :returns: A transfer manager based on parameters provided
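
    A minimal usage sketch (illustrative; assumes an S3 client has been
    created with ``boto3.client('s3')``):

    .. code-block:: python

        manager = create_transfer_manager(client, TransferConfig())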
168 """
169 if _should_use_crt(config):
170 crt_transfer_manager = create_crt_transfer_manager(client, config)
171 if crt_transfer_manager is not None:
172 logger.debug(
173 f"Using CRT client. pid: {getpid()}, thread: {threading.get_ident()}"
174 )
175 return crt_transfer_manager
177 # If we don't resolve something above, fallback to the default.
178 logger.debug(
179 f"Using default client. pid: {getpid()}, thread: {threading.get_ident()}"
180 )
181 return _create_default_transfer_manager(client, config, osutil)


def _should_use_crt(config):
    # This feature requires awscrt>=0.19.17
    if HAS_CRT and has_minimum_crt_version((0, 19, 17)):
        is_optimized_instance = awscrt.s3.is_optimized_for_system()
    else:
        is_optimized_instance = False
    pref_transfer_client = config.preferred_transfer_client.lower()
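
    # Use the CRT transfer manager only when the host is detected as
    # CRT-optimized and the user left preferred_transfer_client at "auto".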
    if (
        is_optimized_instance
        and pref_transfer_client == constants.AUTO_RESOLVE_TRANSFER_CLIENT
    ):
        logger.debug(
            "Attempting to use CRTTransferManager. Config settings may be ignored."
        )
        return True

    logger.debug(
        "Opting out of CRT Transfer Manager. Preferred client: "
        f"{pref_transfer_client}, CRT available: {HAS_CRT}, "
        f"Instance Optimized: {is_optimized_instance}."
    )
    return False


def has_minimum_crt_version(minimum_version):
    """Not intended for use outside boto3."""
    if not HAS_CRT:
        return False

    crt_version_str = awscrt.__version__
    try:
        crt_version_ints = map(int, crt_version_str.split("."))
        crt_version_tuple = tuple(crt_version_ints)
    except (TypeError, ValueError):
        return False

    return crt_version_tuple >= minimum_version
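
# Illustrative behavior of has_minimum_crt_version (not executed here):
#   "0.19.18"     -> (0, 19, 18) >= (0, 19, 17) -> True
#   "0.20.0.dev0" -> int("dev0") raises ValueError -> False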


def _create_default_transfer_manager(client, config, osutil):
    """Create the default TransferManager implementation for s3transfer."""
    executor_cls = None
    if not config.use_threads:
        executor_cls = NonThreadedExecutor
    return TransferManager(client, config, osutil, executor_cls)


class TransferConfig(S3TransferConfig):
    ALIAS = {
        'max_concurrency': 'max_request_concurrency',
        'max_io_queue': 'max_io_queue_size',
    }

    def __init__(
        self,
        multipart_threshold=8 * MB,
        max_concurrency=10,
        multipart_chunksize=8 * MB,
        num_download_attempts=5,
        max_io_queue=100,
        io_chunksize=256 * KB,
        use_threads=True,
        max_bandwidth=None,
        preferred_transfer_client=constants.AUTO_RESOLVE_TRANSFER_CLIENT,
    ):
250 """Configuration object for managed S3 transfers
252 :param multipart_threshold: The transfer size threshold for which
253 multipart uploads, downloads, and copies will automatically be
254 triggered.
256 :param max_concurrency: The maximum number of threads that will be
257 making requests to perform a transfer. If ``use_threads`` is
258 set to ``False``, the value provided is ignored as the transfer
259 will only ever use the main thread.
261 :param multipart_chunksize: The partition size of each part for a
262 multipart transfer.
264 :param num_download_attempts: The number of download attempts that
265 will be retried upon errors with downloading an object in S3.
266 Note that these retries account for errors that occur when
267 streaming down the data from s3 (i.e. socket errors and read
268 timeouts that occur after receiving an OK response from s3).
269 Other retryable exceptions such as throttling errors and 5xx
270 errors are already retried by botocore (this default is 5). This
271 does not take into account the number of exceptions retried by
272 botocore.
274 :param max_io_queue: The maximum amount of read parts that can be
275 queued in memory to be written for a download. The size of each
276 of these read parts is at most the size of ``io_chunksize``.
278 :param io_chunksize: The max size of each chunk in the io queue.
279 Currently, this is size used when ``read`` is called on the
280 downloaded stream as well.
282 :param use_threads: If True, threads will be used when performing
283 S3 transfers. If False, no threads will be used in
284 performing transfers; all logic will be run in the main thread.
286 :param max_bandwidth: The maximum bandwidth that will be consumed
287 in uploading and downloading file content. The value is an integer
288 in terms of bytes per second.
290 :param preferred_transfer_client: String specifying preferred transfer
291 client for transfer operations.
293 Current supported settings are:
294 * auto (default) - Use the CRTTransferManager when calls
295 are made with supported environment and settings.
296 * classic - Only use the origin S3TransferManager with
297 requests. Disables possible CRT upgrade on requests.
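
            For example, to opt out of the CRT upgrade entirely (a minimal
            sketch; ``"classic"`` is the supported setting listed above):

            .. code-block:: python

                config = TransferConfig(preferred_transfer_client="classic")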
298 """
        super().__init__(
            multipart_threshold=multipart_threshold,
            max_request_concurrency=max_concurrency,
            multipart_chunksize=multipart_chunksize,
            num_download_attempts=num_download_attempts,
            max_io_queue_size=max_io_queue,
            io_chunksize=io_chunksize,
            max_bandwidth=max_bandwidth,
        )
        # Some of the argument names are not the same as in the inherited
        # S3TransferConfig, so we add aliases so the old names can still
        # be used.
        for alias in self.ALIAS:
            setattr(self, alias, getattr(self, self.ALIAS[alias]))
        self.use_threads = use_threads
        self.preferred_transfer_client = preferred_transfer_client

    def __setattr__(self, name, value):
        # If an alias name is used, make sure we set the attribute it points
        # to, as that is what actually governs the TransferManager.
        if name in self.ALIAS:
            super().__setattr__(self.ALIAS[name], value)
        # Always set the value of the actual name provided.
        super().__setattr__(name, value)
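
    # Illustrative alias behavior (not executed here):
    #   cfg = TransferConfig()
    #   cfg.max_concurrency = 20   # also sets cfg.max_request_concurrency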


class S3Transfer:
    ALLOWED_DOWNLOAD_ARGS = TransferManager.ALLOWED_DOWNLOAD_ARGS
    ALLOWED_UPLOAD_ARGS = TransferManager.ALLOWED_UPLOAD_ARGS

    def __init__(self, client=None, config=None, osutil=None, manager=None):
        if not client and not manager:
            raise ValueError(
                'Either a boto3.Client or s3transfer.manager.TransferManager '
                'must be provided'
            )
        if manager and any([client, config, osutil]):
            raise ValueError(
                'Manager cannot be provided with client, config, '
                'nor osutil. These parameters are mutually exclusive.'
            )
        if config is None:
            config = TransferConfig()
        if osutil is None:
            osutil = OSUtils()
        if manager:
            self._manager = manager
        else:
            self._manager = create_transfer_manager(client, config, osutil)

    def upload_file(
        self, filename, bucket, key, callback=None, extra_args=None
    ):
        """Upload a file to an S3 object.

        Variants have also been injected into the S3 client, Bucket, and
        Object, so you don't have to use S3Transfer.upload_file() directly.

        .. seealso::
            :py:meth:`S3.Client.upload_file`
            :py:meth:`S3.Client.upload_fileobj`
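
        Usage sketch (illustrative; mirrors the module-level example):

        .. code-block:: python

            transfer = S3Transfer(boto3.client('s3'))
            transfer.upload_file('/tmp/myfile', 'bucket', 'key')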
360 """
        if isinstance(filename, PathLike):
            filename = fspath(filename)
        if not isinstance(filename, str):
            raise ValueError('Filename must be a string or a path-like object')

        subscribers = self._get_subscribers(callback)
        future = self._manager.upload(
            filename, bucket, key, extra_args, subscribers
        )
        try:
            future.result()
        # If a client error was raised, add the backwards compatibility layer
        # that raises an S3UploadFailedError. These specific errors were only
        # ever thrown for upload_parts but now can be thrown for any related
        # client error.
        except ClientError as e:
            raise S3UploadFailedError(
                "Failed to upload {} to {}: {}".format(
                    filename, '/'.join([bucket, key]), e
                )
            )

    def download_file(
        self, bucket, key, filename, extra_args=None, callback=None
    ):
        """Download an S3 object to a file.

        Variants have also been injected into the S3 client, Bucket, and
        Object, so you don't have to use S3Transfer.download_file() directly.

        .. seealso::
            :py:meth:`S3.Client.download_file`
            :py:meth:`S3.Client.download_fileobj`
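
        Usage sketch (illustrative; mirrors the module-level example):

        .. code-block:: python

            transfer = S3Transfer(boto3.client('s3'))
            transfer.download_file('bucket', 'key', '/tmp/myfile')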
394 """
        if isinstance(filename, PathLike):
            filename = fspath(filename)
        if not isinstance(filename, str):
            raise ValueError('Filename must be a string or a path-like object')

        subscribers = self._get_subscribers(callback)
        future = self._manager.download(
            bucket, key, filename, extra_args, subscribers
        )
        try:
            future.result()
        # This is for backwards compatibility: when retries are exceeded, we
        # need to raise the same error from boto3 instead of s3transfer's
        # built-in RetriesExceededError, as current users are catching the
        # boto3 one instead of the s3transfer exception to do their own
        # retries.
        except S3TransferRetriesExceededError as e:
            raise RetriesExceededError(e.last_exception)

    def _get_subscribers(self, callback):
        if not callback:
            return None
        return [ProgressCallbackInvoker(callback)]

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self._manager.__exit__(*args)


class ProgressCallbackInvoker(BaseSubscriber):
    """A back-compat wrapper to invoke a provided callback via a subscriber.

    :param callback: A callable that takes a single positional argument for
        how many bytes were transferred.
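
    Example (illustrative): ``ProgressCallbackInvoker(print)`` would call
    ``print(bytes_transferred)`` on each progress notification.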
431 """
433 def __init__(self, callback):
434 self._callback = callback
436 def on_progress(self, bytes_transferred, **kwargs):
437 self._callback(bytes_transferred)