# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# https://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
13"""Abstractions over S3's upload/download operations.
15This module provides high level abstractions for efficient
16uploads/downloads. It handles several things for the user:
18* Automatically switching to multipart transfers when
19 a file is over a specific size threshold
20* Uploading/downloading a file in parallel
21* Progress callbacks to monitor transfers
22* Retries. While botocore handles retries for streaming uploads,
23 it is not possible for it to handle retries for streaming
24 downloads. This module handles retries for both cases so
25 you don't need to implement any retry logic yourself.
27This module has a reasonable set of defaults. It also allows you
28to configure many aspects of the transfer process including:
30* Multipart threshold size
31* Max parallel downloads
32* Socket timeouts
33* Retry amounts
35There is no support for s3->s3 multipart copies at this
36time.
39.. _ref_s3transfer_usage:
41Usage
42=====
44The simplest way to use this module is:
46.. code-block:: python
48 client = boto3.client('s3', 'us-west-2')
49 transfer = S3Transfer(client)
50 # Upload /tmp/myfile to s3://bucket/key
51 transfer.upload_file('/tmp/myfile', 'bucket', 'key')
53 # Download s3://bucket/key to /tmp/myfile
54 transfer.download_file('bucket', 'key', '/tmp/myfile')
56The ``upload_file`` and ``download_file`` methods also accept
57``**kwargs``, which will be forwarded through to the corresponding
58client operation. Here are a few examples using ``upload_file``::
60 # Making the object public
61 transfer.upload_file('/tmp/myfile', 'bucket', 'key',
62 extra_args={'ACL': 'public-read'})
64 # Setting metadata
65 transfer.upload_file('/tmp/myfile', 'bucket', 'key',
66 extra_args={'Metadata': {'a': 'b', 'c': 'd'}})
68 # Setting content type
69 transfer.upload_file('/tmp/myfile.json', 'bucket', 'key',
70 extra_args={'ContentType': "application/json"})
73The ``S3Transfer`` class also supports progress callbacks so you can
74provide transfer progress to users. Both the ``upload_file`` and
75``download_file`` methods take an optional ``callback`` parameter.
76Here's an example of how to print a simple progress percentage
77to the user:
79.. code-block:: python
81 class ProgressPercentage(object):
82 def __init__(self, filename):
83 self._filename = filename
84 self._size = float(os.path.getsize(filename))
85 self._seen_so_far = 0
86 self._lock = threading.Lock()
88 def __call__(self, bytes_amount):
89 # To simplify we'll assume this is hooked up
90 # to a single filename.
91 with self._lock:
92 self._seen_so_far += bytes_amount
93 percentage = (self._seen_so_far / self._size) * 100
94 sys.stdout.write(
95 "\r%s %s / %s (%.2f%%)" % (
96 self._filename, self._seen_so_far, self._size,
97 percentage))
98 sys.stdout.flush()
101 transfer = S3Transfer(boto3.client('s3', 'us-west-2'))
102 # Upload /tmp/myfile to s3://bucket/key and print upload progress.
103 transfer.upload_file('/tmp/myfile', 'bucket', 'key',
104 callback=ProgressPercentage('/tmp/myfile'))
108You can also provide a TransferConfig object to the S3Transfer
109object that gives you more fine grained control over the
110transfer. For example:
112.. code-block:: python
114 client = boto3.client('s3', 'us-west-2')
115 config = TransferConfig(
116 multipart_threshold=8 * 1024 * 1024,
117 max_concurrency=10,
118 num_download_attempts=10,
119 )
120 transfer = S3Transfer(client, config)
121 transfer.upload_file('/tmp/foo', 'bucket', 'key')
124"""

import logging
import threading
from os import PathLike, fspath, getpid

from botocore.compat import HAS_CRT
from botocore.exceptions import ClientError, MissingDependencyException
from s3transfer.exceptions import (
    RetriesExceededError as S3TransferRetriesExceededError,
)
from s3transfer.futures import NonThreadedExecutor
from s3transfer.manager import TransferConfig as S3TransferConfig
from s3transfer.manager import TransferManager
from s3transfer.subscribers import BaseSubscriber
from s3transfer.utils import OSUtils

import boto3.s3.constants as constants
from boto3.compat import TRANSFER_CONFIG_SUPPORTS_CRT
from boto3.exceptions import (
    RetriesExceededError,
    S3UploadFailedError,
)

if HAS_CRT:
    import awscrt.s3

    from boto3.crt import create_crt_transfer_manager

KB = 1024
MB = KB * KB

logger = logging.getLogger(__name__)


def create_transfer_manager(client, config, osutil=None):
    """Creates a transfer manager based on configuration

    :type client: boto3.client
    :param client: The S3 client to use

    :type config: boto3.s3.transfer.TransferConfig
    :param config: The transfer config to use

    :type osutil: s3transfer.utils.OSUtils
    :param osutil: The os utility to use

    :rtype: s3transfer.manager.TransferManager
    :returns: A transfer manager based on parameters provided
    """
    if _should_use_crt(config):
        crt_transfer_manager = create_crt_transfer_manager(client, config)
        if crt_transfer_manager is not None:
            logger.debug(
                "Using CRT client. pid: %s, thread: %s",
                getpid(),
                threading.get_ident(),
            )
            return crt_transfer_manager

    # If we don't resolve something above, fall back to the default.
    logger.debug(
        "Using default client. pid: %s, thread: %s",
        getpid(),
        threading.get_ident(),
    )
    return _create_default_transfer_manager(client, config, osutil)
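

# A minimal usage sketch for the factory above (illustrative only, not part
# of boto3's public API; it assumes AWS credentials are configured and that
# the bucket and file names below exist):
def _example_create_transfer_manager():
    import boto3

    client = boto3.client('s3', 'us-west-2')
    # Both manager implementations support the context manager protocol,
    # which shuts the manager down on exit.
    with create_transfer_manager(client, TransferConfig()) as manager:
        future = manager.upload('/tmp/myfile', 'bucket', 'key')
        future.result()  # Block until the upload completes.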


def _should_use_crt(config):
    # This feature requires awscrt>=0.19.18
    has_min_crt = HAS_CRT and has_minimum_crt_version((0, 19, 18))
    is_optimized_instance = has_min_crt and awscrt.s3.is_optimized_for_system()
    pref_transfer_client = config.preferred_transfer_client.lower()

    if (
        pref_transfer_client == constants.CRT_TRANSFER_CLIENT
        and not has_min_crt
    ):
        msg = (
            "CRT transfer client is configured but is missing minimum CRT "
            f"version. CRT installed: {HAS_CRT}"
        )
        if HAS_CRT:
            msg += f", with version: {awscrt.__version__}"
        raise MissingDependencyException(msg=msg)

    if (
        is_optimized_instance
        and pref_transfer_client == constants.AUTO_RESOLVE_TRANSFER_CLIENT
    ) or pref_transfer_client == constants.CRT_TRANSFER_CLIENT:
        logger.debug(
            "Attempting to use CRTTransferManager. Config settings may be ignored."
        )
        return True

    logger.debug(
        "Opting out of CRT Transfer Manager. "
        "Preferred client: %s, CRT available: %s, Instance Optimized: %s",
        pref_transfer_client,
        HAS_CRT,
        is_optimized_instance,
    )
    return False
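

# A short sketch of steering the resolution above through configuration
# (illustrative only; CLASSIC_TRANSFER_CLIENT is assumed to be defined in
# boto3.s3.constants alongside the constants used above):
def _example_preferred_transfer_client():
    # "classic" opts out of the CRT manager even on CRT-optimized hosts.
    classic_config = TransferConfig(
        preferred_transfer_client=constants.CLASSIC_TRANSFER_CLIENT
    )
    assert _should_use_crt(classic_config) is False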


def has_minimum_crt_version(minimum_version):
    """Not intended for use outside boto3."""
    if not HAS_CRT:
        return False

    crt_version_str = awscrt.__version__
    try:
        crt_version_ints = map(int, crt_version_str.split("."))
        crt_version_tuple = tuple(crt_version_ints)
    except (TypeError, ValueError):
        return False

    return crt_version_tuple >= minimum_version
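

# A small illustration of the tuple comparison above (illustrative only):
# version strings parse into integer tuples that compare element-wise, and
# a non-numeric component such as the "dev0" in "0.19.18.dev0" fails the
# int() conversion, so the function conservatively reports False for it.
def _example_version_tuple_comparison():
    assert tuple(map(int, "0.19.18".split("."))) >= (0, 19, 18)
    assert tuple(map(int, "0.20.0".split("."))) >= (0, 19, 18)
    assert not tuple(map(int, "0.19.17".split("."))) >= (0, 19, 18)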


def _create_default_transfer_manager(client, config, osutil):
    """Create the default TransferManager implementation for s3transfer."""
    executor_cls = None
    if not config.use_threads:
        executor_cls = NonThreadedExecutor
    return TransferManager(client, config, osutil, executor_cls)


class TransferConfig(S3TransferConfig):
    ALIAS = {
        'max_concurrency': 'max_request_concurrency',
        'max_io_queue': 'max_io_queue_size',
    }
    DEFAULTS = {
        'multipart_threshold': 8 * MB,
        'max_concurrency': 10,
        'max_request_concurrency': 10,
        'multipart_chunksize': 8 * MB,
        'num_download_attempts': 5,
        'max_io_queue': 100,
        'max_io_queue_size': 100,
        'io_chunksize': 256 * KB,
        'use_threads': True,
        'max_bandwidth': None,
        'preferred_transfer_client': constants.AUTO_RESOLVE_TRANSFER_CLIENT,
    }

    def __init__(
        self,
        multipart_threshold=None,
        max_concurrency=None,
        multipart_chunksize=None,
        num_download_attempts=None,
        max_io_queue=None,
        io_chunksize=None,
        use_threads=None,
        max_bandwidth=None,
        preferred_transfer_client=None,
    ):
        """Configuration object for managed S3 transfers

        :param multipart_threshold: The transfer size threshold for which
            multipart uploads, downloads, and copies will automatically be
            triggered.

        :param max_concurrency: The maximum number of threads that will be
            making requests to perform a transfer. If ``use_threads`` is
            set to ``False``, the value provided is ignored as the transfer
            will only ever use the current thread.

        :param multipart_chunksize: The partition size of each part for a
            multipart transfer.

        :param num_download_attempts: The number of download attempts that
            will be retried upon errors with downloading an object in S3.
            Note that these retries account for errors that occur when
            streaming down the data from S3 (i.e. socket errors and read
            timeouts that occur after receiving an OK response from S3).
            Other retryable exceptions such as throttling errors and 5xx
            errors are already retried by botocore; those retries are not
            counted against this value (which defaults to 5). Note: This
            value is ignored when the resolved transfer manager type is
            CRTTransferManager.

        :param max_io_queue: The maximum number of read parts that can be
            queued in memory to be written for a download. The size of each
            of these read parts is at most the size of ``io_chunksize``.
            Note: This value is ignored when the resolved transfer manager
            type is CRTTransferManager.

        :param io_chunksize: The max size of each chunk in the io queue.
            Currently, this is also the size used when ``read`` is called on
            the downloaded stream. Note: This value is ignored when the
            resolved transfer manager type is CRTTransferManager.

        :param use_threads: If True, threads will be used when performing
            S3 transfers. If False, no threads will be used in
            performing transfers; all logic will be run in the current thread.
            Note: This value is ignored when the resolved transfer manager
            type is CRTTransferManager.

        :param max_bandwidth: The maximum bandwidth that will be consumed
            in uploading and downloading file content. The value is an integer
            in terms of bytes per second. Note: This value is ignored when
            the resolved transfer manager type is CRTTransferManager.

        :param preferred_transfer_client: String specifying the preferred
            transfer client for transfer operations.

            Currently supported settings are:
              * auto (default) - Use the CRTTransferManager when calls
                are made with supported environment and settings.
              * classic - Only use the original S3TransferManager for
                requests. Disables possible CRT upgrade on requests.
              * crt - Only use the CRTTransferManager for requests.
        """
        init_args = {
            'multipart_threshold': multipart_threshold,
            'max_concurrency': max_concurrency,
            'multipart_chunksize': multipart_chunksize,
            'num_download_attempts': num_download_attempts,
            'max_io_queue': max_io_queue,
            'io_chunksize': io_chunksize,
            'use_threads': use_threads,
            'max_bandwidth': max_bandwidth,
            'preferred_transfer_client': preferred_transfer_client,
        }
        resolved = self._resolve_init_args(init_args)
        super().__init__(
            multipart_threshold=resolved['multipart_threshold'],
            max_request_concurrency=resolved['max_concurrency'],
            multipart_chunksize=resolved['multipart_chunksize'],
            num_download_attempts=resolved['num_download_attempts'],
            max_io_queue_size=resolved['max_io_queue'],
            io_chunksize=resolved['io_chunksize'],
            max_bandwidth=resolved['max_bandwidth'],
        )
        # Some of the argument names are not the same as in the inherited
        # S3TransferConfig, so we add aliases so you can still access the
        # old versions of the names.
        for alias in self.ALIAS:
            setattr(
                self,
                alias,
                object.__getattribute__(self, self.ALIAS[alias]),
            )
        self.use_threads = resolved['use_threads']
        self.preferred_transfer_client = resolved['preferred_transfer_client']

    def __setattr__(self, name, value):
        # If the alias name is used, make sure we set the name that it points
        # to, as that is what actually governs the TransferManager.
        if name in self.ALIAS:
            super().__setattr__(self.ALIAS[name], value)
        # Always set the value of the actual name provided.
        super().__setattr__(name, value)

    def __getattribute__(self, item):
        value = object.__getattribute__(self, item)
        if not TRANSFER_CONFIG_SUPPORTS_CRT:
            return value
        defaults = object.__getattribute__(self, 'DEFAULTS')
        if item not in defaults:
            return value
        if value is self.UNSET_DEFAULT:
            return defaults[item]
        return value

    def _resolve_init_args(self, init_args):
        resolved = {}
        for init_arg, val in init_args.items():
            if val is not None:
                resolved[init_arg] = val
            elif TRANSFER_CONFIG_SUPPORTS_CRT:
                resolved[init_arg] = self.UNSET_DEFAULT
            else:
                resolved[init_arg] = self.DEFAULTS[init_arg]
        return resolved
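

# A short sketch of the alias behavior above (illustrative only): reads and
# writes stay in sync through either name because __setattr__ mirrors an
# aliased name onto the s3transfer attribute the TransferManager consumes.
def _example_transfer_config_aliases():
    config = TransferConfig(max_concurrency=4)
    assert config.max_request_concurrency == 4
    config.max_io_queue = 50  # boto3-style name...
    assert config.max_io_queue_size == 50  # ...updates the s3transfer name.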


class S3Transfer:
    ALLOWED_DOWNLOAD_ARGS = TransferManager.ALLOWED_DOWNLOAD_ARGS
    ALLOWED_UPLOAD_ARGS = TransferManager.ALLOWED_UPLOAD_ARGS
    ALLOWED_COPY_ARGS = TransferManager.ALLOWED_COPY_ARGS

    def __init__(self, client=None, config=None, osutil=None, manager=None):
        if not client and not manager:
            raise ValueError(
                'Either a boto3.Client or s3transfer.manager.TransferManager '
                'must be provided'
            )
        if manager and any([client, config, osutil]):
            raise ValueError(
                'Manager cannot be provided with client, config, '
                'or osutil. These parameters are mutually exclusive.'
            )
        if config is None:
            config = TransferConfig()
        if osutil is None:
            osutil = OSUtils()
        if manager:
            self._manager = manager
        else:
            self._manager = create_transfer_manager(client, config, osutil)

    def upload_file(
        self, filename, bucket, key, callback=None, extra_args=None
    ):
        """Upload a file to an S3 object.

        Variants have also been injected into the S3 client, Bucket, and
        Object. You don't have to use S3Transfer.upload_file() directly.

        .. seealso::
            :py:meth:`S3.Client.upload_file`
            :py:meth:`S3.Client.upload_fileobj`
        """
        if isinstance(filename, PathLike):
            filename = fspath(filename)
        if not isinstance(filename, str):
            raise ValueError('Filename must be a string or a path-like object')

        subscribers = self._get_subscribers(callback)
        future = self._manager.upload(
            filename, bucket, key, extra_args, subscribers
        )
        try:
            future.result()
        # If a client error was raised, add the backwards-compatibility layer
        # that raises an S3UploadFailedError. These specific errors were only
        # ever thrown for upload_parts but now can be thrown for any related
        # client error.
        except ClientError as e:
            raise S3UploadFailedError(
                f"Failed to upload {filename} to {bucket}/{key}: {e}"
            )

    def download_file(
        self, bucket, key, filename, extra_args=None, callback=None
    ):
        """Download an S3 object to a file.

        Variants have also been injected into the S3 client, Bucket, and
        Object. You don't have to use S3Transfer.download_file() directly.

        .. seealso::
            :py:meth:`S3.Client.download_file`
            :py:meth:`S3.Client.download_fileobj`
        """
        if isinstance(filename, PathLike):
            filename = fspath(filename)
        if not isinstance(filename, str):
            raise ValueError('Filename must be a string or a path-like object')

        subscribers = self._get_subscribers(callback)
        future = self._manager.download(
            bucket, key, filename, extra_args, subscribers
        )
        try:
            future.result()
        # This is for backwards compatibility: when retries are exceeded, we
        # need to raise the boto3 RetriesExceededError instead of s3transfer's
        # built-in one, as existing users catch the boto3 exception rather
        # than the s3transfer exception to do their own retries.
        except S3TransferRetriesExceededError as e:
            raise RetriesExceededError(e.last_exception)

    def _get_subscribers(self, callback):
        if not callback:
            return None
        return [ProgressCallbackInvoker(callback)]

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self._manager.__exit__(*args)
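

# A minimal usage sketch of the context-manager support above (illustrative
# only; assumes credentials are configured and the bucket and key exist):
def _example_s3transfer_context_manager():
    import boto3

    # __exit__ delegates to the underlying transfer manager, shutting it
    # down when the block is left.
    with S3Transfer(boto3.client('s3', 'us-west-2')) as transfer:
        transfer.download_file('bucket', 'key', '/tmp/myfile')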


class ProgressCallbackInvoker(BaseSubscriber):
    """A back-compat wrapper to invoke a provided callback via a subscriber

    :param callback: A callable that takes a single positional argument for
        how many bytes were transferred.
    """

    def __init__(self, callback):
        self._callback = callback

    def on_progress(self, bytes_transferred, **kwargs):
        self._callback(bytes_transferred)