# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# https://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
13"""Abstractions over S3's upload/download operations.
15This module provides high level abstractions for efficient
16uploads/downloads. It handles several things for the user:
18* Automatically switching to multipart transfers when
19 a file is over a specific size threshold
20* Uploading/downloading a file in parallel
21* Progress callbacks to monitor transfers
22* Retries. While botocore handles retries for streaming uploads,
23 it is not possible for it to handle retries for streaming
24 downloads. This module handles retries for both cases so
25 you don't need to implement any retry logic yourself.
27This module has a reasonable set of defaults. It also allows you
28to configure many aspects of the transfer process including:
30* Multipart threshold size
31* Max parallel downloads
32* Socket timeouts
33* Retry amounts
35There is no support for s3->s3 multipart copies at this
36time.
39.. _ref_s3transfer_usage:
41Usage
42=====
44The simplest way to use this module is:
46.. code-block:: python
48 client = boto3.client('s3', 'us-west-2')
49 transfer = S3Transfer(client)
50 # Upload /tmp/myfile to s3://bucket/key
51 transfer.upload_file('/tmp/myfile', 'bucket', 'key')
53 # Download s3://bucket/key to /tmp/myfile
54 transfer.download_file('bucket', 'key', '/tmp/myfile')

The ``upload_file`` and ``download_file`` methods also accept an
``extra_args`` dictionary, whose entries are forwarded through to the
corresponding client operation. Here are a few examples using
``upload_file``::

    # Making the object public
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'ACL': 'public-read'})

    # Setting metadata
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'Metadata': {'a': 'b', 'c': 'd'}})

    # Setting content type
    transfer.upload_file('/tmp/myfile.json', 'bucket', 'key',
                         extra_args={'ContentType': "application/json"})
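
``download_file`` accepts ``extra_args`` in the same way; a minimal
sketch (the version id here is a placeholder)::

    # Downloading a specific version of an object
    transfer.download_file('bucket', 'key', '/tmp/myfile',
                           extra_args={'VersionId': 'my-version-id'})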

The ``S3Transfer`` class also supports progress callbacks so you can
provide transfer progress to users. Both the ``upload_file`` and
``download_file`` methods take an optional ``callback`` parameter.
Here's an example of how to print a simple progress percentage
to the user:

.. code-block:: python

    class ProgressPercentage(object):
        def __init__(self, filename):
            self._filename = filename
            self._size = float(os.path.getsize(filename))
            self._seen_so_far = 0
            self._lock = threading.Lock()

        def __call__(self, bytes_amount):
            # To simplify we'll assume this is hooked up
            # to a single filename.
            with self._lock:
                self._seen_so_far += bytes_amount
                percentage = (self._seen_so_far / self._size) * 100
                sys.stdout.write(
                    "\r%s %s / %s (%.2f%%)" % (
                        self._filename, self._seen_so_far, self._size,
                        percentage))
                sys.stdout.flush()


    transfer = S3Transfer(boto3.client('s3', 'us-west-2'))
    # Upload /tmp/myfile to s3://bucket/key and print upload progress.
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         callback=ProgressPercentage('/tmp/myfile'))

You can also provide a TransferConfig object to the S3Transfer
object that gives you more fine-grained control over the
transfer. For example:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    config = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,
        max_concurrency=10,
        num_download_attempts=10,
    )
    transfer = S3Transfer(client, config)
    transfer.upload_file('/tmp/foo', 'bucket', 'key')
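
``S3Transfer`` also implements the context manager protocol (see
``__enter__``/``__exit__`` below), so you can tie the lifetime of its
underlying transfer manager to a ``with`` block. A minimal sketch:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    with S3Transfer(client) as transfer:
        transfer.download_file('bucket', 'key', '/tmp/myfile')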
124"""

import logging
import threading
from os import PathLike, fspath, getpid

from botocore.compat import HAS_CRT
from botocore.exceptions import ClientError, MissingDependencyException
from s3transfer.exceptions import (
    RetriesExceededError as S3TransferRetriesExceededError,
)
from s3transfer.futures import NonThreadedExecutor
from s3transfer.manager import TransferConfig as S3TransferConfig
from s3transfer.manager import TransferManager
from s3transfer.subscribers import BaseSubscriber
from s3transfer.utils import OSUtils

import boto3.s3.constants as constants
from boto3.compat import TRANSFER_CONFIG_SUPPORTS_CRT
from boto3.exceptions import (
    RetriesExceededError,
    S3UploadFailedError,
)

if HAS_CRT:
    import awscrt.s3

    from boto3.crt import create_crt_transfer_manager

KB = 1024
MB = KB * KB

logger = logging.getLogger(__name__)


def create_transfer_manager(client, config, osutil=None):
    """Creates a transfer manager based on configuration

    :type client: boto3.client
    :param client: The S3 client to use

    :type config: boto3.s3.transfer.TransferConfig
    :param config: The transfer config to use

    :type osutil: s3transfer.utils.OSUtils
    :param osutil: The os utility to use

    :rtype: s3transfer.manager.TransferManager
    :returns: A transfer manager based on parameters provided
    """
    if _should_use_crt(config):
        crt_transfer_manager = create_crt_transfer_manager(client, config)
        if crt_transfer_manager is not None:
            logger.debug(
                f"Using CRT client. pid: {getpid()}, thread: {threading.get_ident()}"
            )
            return crt_transfer_manager

    # If we don't resolve something above, fall back to the default.
    logger.debug(
        f"Using default client. pid: {getpid()}, thread: {threading.get_ident()}"
    )
    return _create_default_transfer_manager(client, config, osutil)


def _should_use_crt(config):
    # This feature requires awscrt>=0.19.18
    has_min_crt = HAS_CRT and has_minimum_crt_version((0, 19, 18))
    is_optimized_instance = has_min_crt and awscrt.s3.is_optimized_for_system()
    pref_transfer_client = config.preferred_transfer_client.lower()

    if (
        pref_transfer_client == constants.CRT_TRANSFER_CLIENT
        and not has_min_crt
    ):
        msg = (
            "CRT transfer client is configured but is missing minimum CRT "
            f"version. CRT installed: {HAS_CRT}"
        )
        if HAS_CRT:
            msg += f", with version: {awscrt.__version__}"
        raise MissingDependencyException(msg=msg)

    if (
        is_optimized_instance
        and pref_transfer_client == constants.AUTO_RESOLVE_TRANSFER_CLIENT
    ) or pref_transfer_client == constants.CRT_TRANSFER_CLIENT:
        logger.debug(
            "Attempting to use CRTTransferManager. Config settings may be ignored."
        )
        return True

    logger.debug(
        "Opting out of CRT Transfer Manager. Preferred client: "
        f"{pref_transfer_client}, CRT available: {HAS_CRT}, "
        f"Instance Optimized: {is_optimized_instance}."
    )
    return False
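
# Illustrative note (not part of the original module): putting the branches
# above together, the CRT path is chosen when preferred_transfer_client is
# "crt", or when it is "auto" and the host is CRT-optimized with
# awscrt >= 0.19.18 installed; asking for "crt" without a sufficient awscrt
# raises MissingDependencyException instead.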


def has_minimum_crt_version(minimum_version):
    """Not intended for use outside boto3."""
    if not HAS_CRT:
        return False

    crt_version_str = awscrt.__version__
    try:
        crt_version_ints = map(int, crt_version_str.split("."))
        crt_version_tuple = tuple(crt_version_ints)
    except (TypeError, ValueError):
        return False

    return crt_version_tuple >= minimum_version
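
# Illustrative note (not part of the original module): the parse above turns
# a release string like "0.19.18" into the tuple (0, 19, 18), so the ordered
# tuple comparison makes has_minimum_crt_version((0, 19, 18)) return True for
# awscrt 0.19.18 or newer, while version strings with non-numeric components
# (e.g. dev builds) fail int() inside the try block and safely return False.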


def _create_default_transfer_manager(client, config, osutil):
    """Create the default TransferManager implementation for s3transfer."""
    executor_cls = None
    if not config.use_threads:
        executor_cls = NonThreadedExecutor
    return TransferManager(client, config, osutil, executor_cls)


class TransferConfig(S3TransferConfig):
    ALIAS = {
        'max_concurrency': 'max_request_concurrency',
        'max_io_queue': 'max_io_queue_size',
    }
    DEFAULTS = {
        'multipart_threshold': 8 * MB,
        'max_concurrency': 10,
        'max_request_concurrency': 10,
        'multipart_chunksize': 8 * MB,
        'num_download_attempts': 5,
        'max_io_queue': 100,
        'max_io_queue_size': 100,
        'io_chunksize': 256 * KB,
        'use_threads': True,
        'max_bandwidth': None,
        'preferred_transfer_client': constants.AUTO_RESOLVE_TRANSFER_CLIENT,
    }

    def __init__(
        self,
        multipart_threshold=None,
        max_concurrency=None,
        multipart_chunksize=None,
        num_download_attempts=None,
        max_io_queue=None,
        io_chunksize=None,
        use_threads=None,
        max_bandwidth=None,
        preferred_transfer_client=None,
    ):
        """Configuration object for managed S3 transfers

        :param multipart_threshold: The transfer size threshold for which
            multipart uploads, downloads, and copies will automatically be
            triggered.

        :param max_concurrency: The maximum number of threads that will be
            making requests to perform a transfer. If ``use_threads`` is
            set to ``False``, the value provided is ignored as the transfer
            will only ever use the current thread.

        :param multipart_chunksize: The partition size of each part for a
            multipart transfer.

        :param num_download_attempts: The number of download attempts that
            will be retried upon errors with downloading an object in S3.
            Note that these retries account for errors that occur when
            streaming down the data from S3 (i.e. socket errors and read
            timeouts that occur after receiving an OK response from S3).
            Other retryable exceptions such as throttling errors and 5xx
            errors are already retried by botocore (where the default is 5
            attempts). This value does not take into account the number of
            exceptions retried by botocore. Note: This value is ignored when
            the resolved transfer manager type is CRTTransferManager.

        :param max_io_queue: The maximum number of read parts that can be
            queued in memory to be written for a download. The size of each
            of these read parts is at most the size of ``io_chunksize``.
            Note: This value is ignored when the resolved transfer manager
            type is CRTTransferManager.

        :param io_chunksize: The max size of each chunk in the io queue.
            Currently, this is the size used when ``read`` is called on the
            downloaded stream as well. Note: This value is ignored when the
            resolved transfer manager type is CRTTransferManager.

        :param use_threads: If True, threads will be used when performing
            S3 transfers. If False, no threads will be used in
            performing transfers; all logic will be run in the current thread.
            Note: This value is ignored when the resolved transfer manager
            type is CRTTransferManager.

        :param max_bandwidth: The maximum bandwidth that will be consumed
            in uploading and downloading file content. The value is an integer
            in terms of bytes per second. Note: This value is ignored when the
            resolved transfer manager type is CRTTransferManager.

        :param preferred_transfer_client: String specifying preferred transfer
            client for transfer operations.

            Current supported settings are:
              * auto (default) - Use the CRTTransferManager when calls
                are made with supported environment and settings.
              * classic - Only use the original S3TransferManager with
                requests. Disables possible CRT upgrade on requests.
              * crt - Only use the CRTTransferManager with requests.
        """
        init_args = {
            'multipart_threshold': multipart_threshold,
            'max_concurrency': max_concurrency,
            'multipart_chunksize': multipart_chunksize,
            'num_download_attempts': num_download_attempts,
            'max_io_queue': max_io_queue,
            'io_chunksize': io_chunksize,
            'use_threads': use_threads,
            'max_bandwidth': max_bandwidth,
            'preferred_transfer_client': preferred_transfer_client,
        }
        resolved = self._resolve_init_args(init_args)
        super().__init__(
            multipart_threshold=resolved['multipart_threshold'],
            max_request_concurrency=resolved['max_concurrency'],
            multipart_chunksize=resolved['multipart_chunksize'],
            num_download_attempts=resolved['num_download_attempts'],
            max_io_queue_size=resolved['max_io_queue'],
            io_chunksize=resolved['io_chunksize'],
            max_bandwidth=resolved['max_bandwidth'],
        )
        # Some of the argument names are not the same as the inherited
        # S3TransferConfig so we add aliases so you can still access the
        # old version of the names.
        for alias in self.ALIAS:
            setattr(
                self,
                alias,
                object.__getattribute__(self, self.ALIAS[alias]),
            )
        self.use_threads = resolved['use_threads']
        self.preferred_transfer_client = resolved['preferred_transfer_client']

    def __setattr__(self, name, value):
        # If the alias name is used, make sure we set the name that it points
        # to as that is what actually is used in governing the TransferManager.
        if name in self.ALIAS:
            super().__setattr__(self.ALIAS[name], value)
        # Always set the value of the actual name provided.
        super().__setattr__(name, value)

    def __getattribute__(self, item):
        value = object.__getattribute__(self, item)
        if not TRANSFER_CONFIG_SUPPORTS_CRT:
            return value
        defaults = object.__getattribute__(self, 'DEFAULTS')
        if item not in defaults:
            return value
        if value is self.UNSET_DEFAULT:
            return defaults[item]
        return value

    def _resolve_init_args(self, init_args):
        resolved = {}
        for init_arg, val in init_args.items():
            if val is not None:
                resolved[init_arg] = val
            elif TRANSFER_CONFIG_SUPPORTS_CRT:
                resolved[init_arg] = self.UNSET_DEFAULT
            else:
                resolved[init_arg] = self.DEFAULTS[init_arg]
        return resolved
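
# Illustrative note (not part of the original module): the ALIAS machinery
# above keeps both spellings of a setting in sync, so under these assumptions:
#
#     cfg = TransferConfig(max_concurrency=2)
#     assert cfg.max_request_concurrency == 2  # s3transfer's name
#     cfg.max_io_queue = 50
#     assert cfg.max_io_queue_size == 50       # alias target updated too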


class S3Transfer:
    ALLOWED_DOWNLOAD_ARGS = TransferManager.ALLOWED_DOWNLOAD_ARGS
    ALLOWED_UPLOAD_ARGS = TransferManager.ALLOWED_UPLOAD_ARGS
    ALLOWED_COPY_ARGS = TransferManager.ALLOWED_COPY_ARGS

    def __init__(self, client=None, config=None, osutil=None, manager=None):
        if not client and not manager:
            raise ValueError(
                'Either a boto3.Client or s3transfer.manager.TransferManager '
                'must be provided'
            )
        if manager and any([client, config, osutil]):
            raise ValueError(
                'Manager cannot be provided with client, config, '
                'nor osutil. These parameters are mutually exclusive.'
            )
        if config is None:
            config = TransferConfig()
        if osutil is None:
            osutil = OSUtils()
        if manager:
            self._manager = manager
        else:
            self._manager = create_transfer_manager(client, config, osutil)

    def upload_file(
        self, filename, bucket, key, callback=None, extra_args=None
    ):
        """Upload a file to an S3 object.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.upload_file() directly.

        .. seealso::
            :py:meth:`S3.Client.upload_file`
            :py:meth:`S3.Client.upload_fileobj`
        """
        if isinstance(filename, PathLike):
            filename = fspath(filename)
        if not isinstance(filename, str):
            raise ValueError('Filename must be a string or a path-like object')

        subscribers = self._get_subscribers(callback)
        future = self._manager.upload(
            filename, bucket, key, extra_args, subscribers
        )
        try:
            future.result()
        # If a client error was raised, add the backwards compatibility layer
        # that raises a S3UploadFailedError. These specific errors were only
        # ever thrown for upload_parts but now can be thrown for any related
        # client error.
        except ClientError as e:
            raise S3UploadFailedError(
                "Failed to upload {} to {}: {}".format(
                    filename, '/'.join([bucket, key]), e
                )
            )

    def download_file(
        self, bucket, key, filename, extra_args=None, callback=None
    ):
        """Download an S3 object to a file.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.download_file() directly.

        .. seealso::
            :py:meth:`S3.Client.download_file`
            :py:meth:`S3.Client.download_fileobj`
        """
        if isinstance(filename, PathLike):
            filename = fspath(filename)
        if not isinstance(filename, str):
            raise ValueError('Filename must be a string or a path-like object')

        subscribers = self._get_subscribers(callback)
        future = self._manager.download(
            bucket, key, filename, extra_args, subscribers
        )
        try:
            future.result()
        # This is for backwards compatibility where when retries are
        # exceeded we need to throw the same error from boto3 instead of
        # s3transfer's built in RetriesExceededError as current users are
        # catching the boto3 one instead of the s3transfer exception to do
        # their own retries.
        except S3TransferRetriesExceededError as e:
            raise RetriesExceededError(e.last_exception)

    def _get_subscribers(self, callback):
        if not callback:
            return None
        return [ProgressCallbackInvoker(callback)]

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self._manager.__exit__(*args)


class ProgressCallbackInvoker(BaseSubscriber):
    """A back-compat wrapper to invoke a provided callback via a subscriber

    :param callback: A callable that takes a single positional argument for
        how many bytes were transferred.
    """

    def __init__(self, callback):
        self._callback = callback

    def on_progress(self, bytes_transferred, **kwargs):
        self._callback(bytes_transferred)
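
# Illustrative note (not part of the original module): ProgressCallbackInvoker
# is how S3Transfer adapts a plain callable to s3transfer's subscriber model.
# A minimal sketch of the equivalent wiring:
#
#     subscriber = ProgressCallbackInvoker(lambda n: print(f"{n} bytes"))
#     subscriber.on_progress(bytes_transferred=1024)  # prints "1024 bytes"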