# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# https://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
13"""Abstractions over S3's upload/download operations.
14
15This module provides high level abstractions for efficient
16uploads/downloads. It handles several things for the user:
17
18* Automatically switching to multipart transfers when
19 a file is over a specific size threshold
20* Uploading/downloading a file in parallel
21* Progress callbacks to monitor transfers
22* Retries. While botocore handles retries for streaming uploads,
23 it is not possible for it to handle retries for streaming
24 downloads. This module handles retries for both cases so
25 you don't need to implement any retry logic yourself.
26
27This module has a reasonable set of defaults. It also allows you
28to configure many aspects of the transfer process including:
29
30* Multipart threshold size
31* Max parallel downloads
32* Socket timeouts
33* Retry amounts
34
35There is no support for s3->s3 multipart copies at this
36time.


.. _ref_s3transfer_usage:

Usage
=====

The simplest way to use this module is:

.. code-block:: python

    import boto3
    from boto3.s3.transfer import S3Transfer

    client = boto3.client('s3', 'us-west-2')
    transfer = S3Transfer(client)
    # Upload /tmp/myfile to s3://bucket/key
    transfer.upload_file('/tmp/myfile', 'bucket', 'key')

    # Download s3://bucket/key to /tmp/myfile
    transfer.download_file('bucket', 'key', '/tmp/myfile')

The ``upload_file`` and ``download_file`` methods also accept an
optional ``extra_args`` dictionary, whose entries are forwarded to the
corresponding client operation. Here are a few examples using
``upload_file``::

    # Making the object public
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'ACL': 'public-read'})

    # Setting metadata
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'Metadata': {'a': 'b', 'c': 'd'}})

    # Setting content type
    transfer.upload_file('/tmp/myfile.json', 'bucket', 'key',
                         extra_args={'ContentType': "application/json"})
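
``download_file`` accepts ``extra_args`` in the same way, restricted to
the parameters listed in ``S3Transfer.ALLOWED_DOWNLOAD_ARGS``. For
example, to download a specific object version (the version ID below is
a placeholder)::

    # Downloading a particular version of the object
    transfer.download_file('bucket', 'key', '/tmp/myfile',
                           extra_args={'VersionId': 'my-version-id'})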


The ``S3Transfer`` class also supports progress callbacks so you can
provide transfer progress to users. Both the ``upload_file`` and
``download_file`` methods take an optional ``callback`` parameter.
Here's an example of how to print a simple progress percentage
to the user:

.. code-block:: python

    import os
    import sys
    import threading

    import boto3
    from boto3.s3.transfer import S3Transfer


    class ProgressPercentage:
        def __init__(self, filename):
            self._filename = filename
            self._size = float(os.path.getsize(filename))
            self._seen_so_far = 0
            self._lock = threading.Lock()

        def __call__(self, bytes_amount):
            # To simplify, we'll assume this is hooked up
            # to a single filename.
            with self._lock:
                self._seen_so_far += bytes_amount
                percentage = (self._seen_so_far / self._size) * 100
                sys.stdout.write(
                    "\r%s %s / %s (%.2f%%)" % (
                        self._filename, self._seen_so_far, self._size,
                        percentage))
                sys.stdout.flush()


    transfer = S3Transfer(boto3.client('s3', 'us-west-2'))
    # Upload /tmp/myfile to s3://bucket/key and print upload progress.
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         callback=ProgressPercentage('/tmp/myfile'))
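
``download_file`` takes the same ``callback`` parameter. Note that a
percentage-style callback for a download would need the object's size
up front (for example via ``head_object``), since the local file does
not exist yet. A minimal sketch::

    # Print the number of bytes transferred by each chunk.
    transfer.download_file('bucket', 'key', '/tmp/myfile',
                           callback=lambda n: print(n, 'bytes'))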


You can also provide a ``TransferConfig`` object to the ``S3Transfer``
object to get more fine-grained control over the transfer. For example:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    config = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,
        max_concurrency=10,
        num_download_attempts=10,
    )
    transfer = S3Transfer(client, config)
    transfer.upload_file('/tmp/foo', 'bucket', 'key')
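
``S3Transfer`` also supports the context manager protocol, which shuts
down the underlying transfer manager when the block exits:

.. code-block:: python

    with S3Transfer(boto3.client('s3', 'us-west-2')) as transfer:
        transfer.upload_file('/tmp/myfile', 'bucket', 'key')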


"""

import logging
import threading
from os import PathLike, fspath, getpid

from botocore.compat import HAS_CRT
from botocore.exceptions import ClientError
from s3transfer.exceptions import (
    RetriesExceededError as S3TransferRetriesExceededError,
)
from s3transfer.futures import NonThreadedExecutor
from s3transfer.manager import TransferConfig as S3TransferConfig
from s3transfer.manager import TransferManager
from s3transfer.subscribers import BaseSubscriber
from s3transfer.utils import OSUtils

import boto3.s3.constants as constants
from boto3.exceptions import RetriesExceededError, S3UploadFailedError

if HAS_CRT:
    import awscrt.s3

    from boto3.crt import create_crt_transfer_manager

KB = 1024
MB = KB * KB

logger = logging.getLogger(__name__)


def create_transfer_manager(client, config, osutil=None):
    """Creates a transfer manager based on configuration

    :type client: boto3.client
    :param client: The S3 client to use

    :type config: boto3.s3.transfer.TransferConfig
    :param config: The transfer config to use

    :type osutil: s3transfer.utils.OSUtils
    :param osutil: The os utility to use

    :rtype: s3transfer.manager.TransferManager
    :returns: A transfer manager based on the parameters provided
    """
    if _should_use_crt(config):
        crt_transfer_manager = create_crt_transfer_manager(client, config)
        if crt_transfer_manager is not None:
            logger.debug(
                f"Using CRT client. pid: {getpid()}, thread: {threading.get_ident()}"
            )
            return crt_transfer_manager

    # If we didn't resolve to the CRT transfer manager above, fall back
    # to the default.
    logger.debug(
        f"Using default client. pid: {getpid()}, thread: {threading.get_ident()}"
    )
    return _create_default_transfer_manager(client, config, osutil)


def _should_use_crt(config):
    # This feature requires awscrt>=0.19.18
    if HAS_CRT and has_minimum_crt_version((0, 19, 18)):
        is_optimized_instance = awscrt.s3.is_optimized_for_system()
    else:
        is_optimized_instance = False
    pref_transfer_client = config.preferred_transfer_client.lower()

    if (
        is_optimized_instance
        and pref_transfer_client == constants.AUTO_RESOLVE_TRANSFER_CLIENT
    ):
        logger.debug(
            "Attempting to use CRTTransferManager. Config settings may be ignored."
        )
        return True

    logger.debug(
        "Opting out of CRT Transfer Manager. Preferred client: "
        f"{pref_transfer_client}, CRT available: {HAS_CRT}, "
        f"Instance Optimized: {is_optimized_instance}."
    )
    return False


def has_minimum_crt_version(minimum_version):
    """Not intended for use outside boto3."""
    if not HAS_CRT:
        return False

    crt_version_str = awscrt.__version__
    try:
        crt_version_ints = map(int, crt_version_str.split("."))
        crt_version_tuple = tuple(crt_version_ints)
    except (TypeError, ValueError):
        return False

    return crt_version_tuple >= minimum_version


def _create_default_transfer_manager(client, config, osutil):
    """Create the default TransferManager implementation for s3transfer."""
    executor_cls = None
    if not config.use_threads:
        executor_cls = NonThreadedExecutor
    return TransferManager(client, config, osutil, executor_cls)


class TransferConfig(S3TransferConfig):
    ALIAS = {
        'max_concurrency': 'max_request_concurrency',
        'max_io_queue': 'max_io_queue_size',
    }

    def __init__(
        self,
        multipart_threshold=8 * MB,
        max_concurrency=10,
        multipart_chunksize=8 * MB,
        num_download_attempts=5,
        max_io_queue=100,
        io_chunksize=256 * KB,
        use_threads=True,
        max_bandwidth=None,
        preferred_transfer_client=constants.AUTO_RESOLVE_TRANSFER_CLIENT,
    ):
        """Configuration object for managed S3 transfers

        :param multipart_threshold: The transfer size threshold for which
            multipart uploads, downloads, and copies will automatically be
            triggered.

        :param max_concurrency: The maximum number of threads that will be
            making requests to perform a transfer. If ``use_threads`` is
            set to ``False``, the value provided is ignored as the transfer
            will only ever use the current thread.

        :param multipart_chunksize: The partition size of each part for a
            multipart transfer.

        :param num_download_attempts: The number of download attempts that
            will be retried upon errors with downloading an object in S3.
            Note that these retries account for errors that occur when
            streaming down the data from S3 (i.e. socket errors and read
            timeouts that occur after receiving an OK response from S3).
            Other retryable exceptions such as throttling errors and 5xx
            errors are already retried by botocore (five times by default);
            those botocore retries do not count toward this value.

        :param max_io_queue: The maximum number of read parts that can be
            queued in memory to be written for a download. The size of each
            of these read parts is at most the size of ``io_chunksize``.

        :param io_chunksize: The max size of each chunk in the io queue.
            Currently, this is also the size used when ``read`` is called on
            the downloaded stream.

        :param use_threads: If True, threads will be used when performing
            S3 transfers. If False, no threads will be used in
            performing transfers; all logic will be run in the current thread.

        :param max_bandwidth: The maximum bandwidth that will be consumed
            in uploading and downloading file content. The value is an
            integer in terms of bytes per second.

        :param preferred_transfer_client: String specifying the preferred
            transfer client for transfer operations.

            Currently supported settings are:
              * auto (default) - Use the CRTTransferManager when calls
                  are made with a supported environment and settings.
              * classic - Only use the original S3TransferManager for
                  requests. Disables any possible CRT upgrade on requests.
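
            For example, to always use the classic transfer manager and
            opt out of any CRT-based upgrade (the bucket and key below
            are placeholders)::

                config = TransferConfig(preferred_transfer_client='classic')
                transfer = S3Transfer(boto3.client('s3'), config)
                transfer.upload_file('/tmp/myfile', 'bucket', 'key')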
299 """
300 super().__init__(
301 multipart_threshold=multipart_threshold,
302 max_request_concurrency=max_concurrency,
303 multipart_chunksize=multipart_chunksize,
304 num_download_attempts=num_download_attempts,
305 max_io_queue_size=max_io_queue,
306 io_chunksize=io_chunksize,
307 max_bandwidth=max_bandwidth,
308 )
309 # Some of the argument names are not the same as the inherited
310 # S3TransferConfig so we add aliases so you can still access the
311 # old version of the names.
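        # For example, assigning to ``max_concurrency`` also updates
        # ``max_request_concurrency`` via ``__setattr__`` below.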
        for alias in self.ALIAS:
            setattr(self, alias, getattr(self, self.ALIAS[alias]))
        self.use_threads = use_threads
        self.preferred_transfer_client = preferred_transfer_client

    def __setattr__(self, name, value):
        # If an alias name is used, make sure we also set the name it
        # points to, as that is what actually governs the TransferManager.
        if name in self.ALIAS:
            super().__setattr__(self.ALIAS[name], value)
        # Always set the value of the actual name provided.
        super().__setattr__(name, value)


class S3Transfer:
    ALLOWED_DOWNLOAD_ARGS = TransferManager.ALLOWED_DOWNLOAD_ARGS
    ALLOWED_UPLOAD_ARGS = TransferManager.ALLOWED_UPLOAD_ARGS

    def __init__(self, client=None, config=None, osutil=None, manager=None):
        if not client and not manager:
            raise ValueError(
                'Either a boto3.Client or s3transfer.manager.TransferManager '
                'must be provided'
            )
        if manager and any([client, config, osutil]):
            raise ValueError(
                'Manager cannot be provided with client, config, '
                'nor osutil. These parameters are mutually exclusive.'
            )
        if config is None:
            config = TransferConfig()
        if osutil is None:
            osutil = OSUtils()
        if manager:
            self._manager = manager
        else:
            self._manager = create_transfer_manager(client, config, osutil)

    def upload_file(
        self, filename, bucket, key, callback=None, extra_args=None
    ):
        """Upload a file to an S3 object.

        Variants have also been injected into the S3 client, Bucket, and
        Object classes, so you don't have to use S3Transfer.upload_file()
        directly.

        .. seealso::
            :py:meth:`S3.Client.upload_file`
            :py:meth:`S3.Client.upload_fileobj`
        """
        if isinstance(filename, PathLike):
            filename = fspath(filename)
        if not isinstance(filename, str):
            raise ValueError('Filename must be a string or a path-like object')

        subscribers = self._get_subscribers(callback)
        future = self._manager.upload(
            filename, bucket, key, extra_args, subscribers
        )
        try:
            future.result()
        # If a client error was raised, add the backwards-compatibility layer
        # that raises an S3UploadFailedError. These specific errors were only
        # ever thrown for upload_parts, but can now be thrown for any related
        # client error.
        except ClientError as e:
            raise S3UploadFailedError(
                "Failed to upload {} to {}: {}".format(
                    filename, '/'.join([bucket, key]), e
                )
            )

    def download_file(
        self, bucket, key, filename, extra_args=None, callback=None
    ):
        """Download an S3 object to a file.

        Variants have also been injected into the S3 client, Bucket, and
        Object classes, so you don't have to use S3Transfer.download_file()
        directly.

        .. seealso::
            :py:meth:`S3.Client.download_file`
            :py:meth:`S3.Client.download_fileobj`
        """
        if isinstance(filename, PathLike):
            filename = fspath(filename)
        if not isinstance(filename, str):
            raise ValueError('Filename must be a string or a path-like object')

        subscribers = self._get_subscribers(callback)
        future = self._manager.download(
            bucket, key, filename, extra_args, subscribers
        )
        try:
            future.result()
        # For backwards compatibility: when retries are exceeded, raise the
        # boto3 error instead of s3transfer's built-in RetriesExceededError,
        # as existing users catch the boto3 exception to do their own
        # retries.
        except S3TransferRetriesExceededError as e:
            raise RetriesExceededError(e.last_exception)

    def _get_subscribers(self, callback):
        if not callback:
            return None
        return [ProgressCallbackInvoker(callback)]

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self._manager.__exit__(*args)


class ProgressCallbackInvoker(BaseSubscriber):
    """A back-compat wrapper to invoke a provided callback via a subscriber

    :param callback: A callable that takes a single positional argument for
        how many bytes were transferred.
    """

    def __init__(self, callback):
        self._callback = callback

    def on_progress(self, bytes_transferred, **kwargs):
        self._callback(bytes_transferred)