Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/boto3/s3/transfer.py: 33%


# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# https://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

13"""Abstractions over S3's upload/download operations. 

14 

15This module provides high level abstractions for efficient 

16uploads/downloads. It handles several things for the user: 

17 

18* Automatically switching to multipart transfers when 

19 a file is over a specific size threshold 

20* Uploading/downloading a file in parallel 

21* Progress callbacks to monitor transfers 

22* Retries. While botocore handles retries for streaming uploads, 

23 it is not possible for it to handle retries for streaming 

24 downloads. This module handles retries for both cases so 

25 you don't need to implement any retry logic yourself. 

26 

27This module has a reasonable set of defaults. It also allows you 

28to configure many aspects of the transfer process including: 

29 

30* Multipart threshold size 

31* Max parallel downloads 

32* Socket timeouts 

33* Retry amounts 

34 

35There is no support for s3->s3 multipart copies at this 

36time. 

37 

38 

39.. _ref_s3transfer_usage: 

40 

41Usage 

42===== 

43 

44The simplest way to use this module is: 

45 

46.. code-block:: python 

47 

48 client = boto3.client('s3', 'us-west-2') 

49 transfer = S3Transfer(client) 

50 # Upload /tmp/myfile to s3://bucket/key 

51 transfer.upload_file('/tmp/myfile', 'bucket', 'key') 

52 

53 # Download s3://bucket/key to /tmp/myfile 

54 transfer.download_file('bucket', 'key', '/tmp/myfile') 

55 

The ``upload_file`` and ``download_file`` methods also accept an optional
``extra_args`` dictionary, whose entries are forwarded through to the
corresponding client operation. Here are a few examples using
``upload_file``::

    # Making the object public
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'ACL': 'public-read'})

    # Setting metadata
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'Metadata': {'a': 'b', 'c': 'd'}})

    # Setting content type
    transfer.upload_file('/tmp/myfile.json', 'bucket', 'key',
                         extra_args={'ContentType': "application/json"})
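
A similar ``extra_args`` pattern applies to downloads. As an illustrative
sketch (the ``VersionId`` value is a placeholder), you could fetch a
specific version of an object with::

    transfer.download_file('bucket', 'key', '/tmp/myfile',
                           extra_args={'VersionId': 'my-version-id'})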

The ``S3Transfer`` class also supports progress callbacks so you can
provide transfer progress to users. Both the ``upload_file`` and
``download_file`` methods take an optional ``callback`` parameter.
Here's an example of how to print a simple progress percentage
to the user:

.. code-block:: python

    class ProgressPercentage(object):
        def __init__(self, filename):
            self._filename = filename
            self._size = float(os.path.getsize(filename))
            self._seen_so_far = 0
            self._lock = threading.Lock()

        def __call__(self, bytes_amount):
            # To simplify we'll assume this is hooked up
            # to a single filename.
            with self._lock:
                self._seen_so_far += bytes_amount
                percentage = (self._seen_so_far / self._size) * 100
                sys.stdout.write(
                    "\r%s %s / %s (%.2f%%)" % (
                        self._filename, self._seen_so_far, self._size,
                        percentage))
                sys.stdout.flush()


    transfer = S3Transfer(boto3.client('s3', 'us-west-2'))
    # Upload /tmp/myfile to s3://bucket/key and print upload progress.
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         callback=ProgressPercentage('/tmp/myfile'))

You can also provide a ``TransferConfig`` object to the ``S3Transfer``
object for more fine-grained control over the transfer. For example:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    config = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,
        max_concurrency=10,
        num_download_attempts=10,
    )
    transfer = S3Transfer(client, config)
    transfer.upload_file('/tmp/foo', 'bucket', 'key')
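
``S3Transfer`` can also be used as a context manager. On exit it shuts
down its underlying transfer manager (a minimal sketch based on the
``__enter__``/``__exit__`` methods defined below):

.. code-block:: python

    with S3Transfer(boto3.client('s3', 'us-west-2')) as transfer:
        transfer.download_file('bucket', 'key', '/tmp/myfile')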

"""

import logging
import threading
from os import PathLike, fspath, getpid

from botocore.compat import HAS_CRT
from botocore.exceptions import ClientError
from s3transfer.exceptions import (
    RetriesExceededError as S3TransferRetriesExceededError,
)
from s3transfer.futures import NonThreadedExecutor
from s3transfer.manager import TransferConfig as S3TransferConfig
from s3transfer.manager import TransferManager
from s3transfer.subscribers import BaseSubscriber
from s3transfer.utils import OSUtils

import boto3.s3.constants as constants
from boto3.exceptions import RetriesExceededError, S3UploadFailedError

if HAS_CRT:
    import awscrt.s3

    from boto3.crt import create_crt_transfer_manager

KB = 1024
MB = KB * KB

logger = logging.getLogger(__name__)


def create_transfer_manager(client, config, osutil=None):
    """Creates a transfer manager based on configuration

    :type client: boto3.client
    :param client: The S3 client to use

    :type config: boto3.s3.transfer.TransferConfig
    :param config: The transfer config to use

    :type osutil: s3transfer.utils.OSUtils
    :param osutil: The os utility to use

    :rtype: s3transfer.manager.TransferManager
    :returns: A transfer manager based on parameters provided
    """
    if _should_use_crt(config):
        crt_transfer_manager = create_crt_transfer_manager(client, config)
        if crt_transfer_manager is not None:
            logger.debug(
                f"Using CRT client. pid: {getpid()}, thread: {threading.get_ident()}"
            )
            return crt_transfer_manager

    # If we don't resolve something above, fall back to the default.
    logger.debug(
        f"Using default client. pid: {getpid()}, thread: {threading.get_ident()}"
    )
    return _create_default_transfer_manager(client, config, osutil)


def _should_use_crt(config):
    # This feature requires awscrt>=0.19.18
    if HAS_CRT and has_minimum_crt_version((0, 19, 18)):
        is_optimized_instance = awscrt.s3.is_optimized_for_system()
    else:
        is_optimized_instance = False
    pref_transfer_client = config.preferred_transfer_client.lower()

    if (
        is_optimized_instance
        and pref_transfer_client == constants.AUTO_RESOLVE_TRANSFER_CLIENT
    ):
        logger.debug(
            "Attempting to use CRTTransferManager. Config settings may be ignored."
        )
        return True

    logger.debug(
        "Opting out of CRT Transfer Manager. Preferred client: "
        f"{pref_transfer_client}, CRT available: {HAS_CRT}, "
        f"Instance Optimized: {is_optimized_instance}."
    )
    return False
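
# Illustrative sketch (not part of the library): the CRT path is chosen
# only when awscrt >= 0.19.18 is installed, the host is CRT-optimized,
# and preferred_transfer_client is left at its "auto" default.
#
#     config = TransferConfig(preferred_transfer_client='classic')
#     _should_use_crt(config)  # always False; CRT upgrade is disabled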


def has_minimum_crt_version(minimum_version):
    """Not intended for use outside boto3."""
    if not HAS_CRT:
        return False

    crt_version_str = awscrt.__version__
    try:
        crt_version_ints = map(int, crt_version_str.split("."))
        crt_version_tuple = tuple(crt_version_ints)
    except (TypeError, ValueError):
        return False

    return crt_version_tuple >= minimum_version
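
# Illustrative behavior (a sketch, not part of the library): with
# awscrt 0.19.18 installed, has_minimum_crt_version((0, 19, 18)) returns
# True, while a version string containing a non-numeric component makes
# the int() conversion raise and the function return False.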


def _create_default_transfer_manager(client, config, osutil):
    """Create the default TransferManager implementation for s3transfer."""
    executor_cls = None
    if not config.use_threads:
        executor_cls = NonThreadedExecutor
    return TransferManager(client, config, osutil, executor_cls)
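
# Illustrative sketch (not part of the library; s3_client is a
# placeholder): with use_threads=False, the manager is built with
# s3transfer's NonThreadedExecutor, so all transfer logic runs in the
# calling thread.
#
#     config = TransferConfig(use_threads=False)
#     manager = _create_default_transfer_manager(s3_client, config, OSUtils())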


class TransferConfig(S3TransferConfig):
    ALIAS = {
        'max_concurrency': 'max_request_concurrency',
        'max_io_queue': 'max_io_queue_size',
    }

    def __init__(
        self,
        multipart_threshold=8 * MB,
        max_concurrency=10,
        multipart_chunksize=8 * MB,
        num_download_attempts=5,
        max_io_queue=100,
        io_chunksize=256 * KB,
        use_threads=True,
        max_bandwidth=None,
        preferred_transfer_client=constants.AUTO_RESOLVE_TRANSFER_CLIENT,
    ):
        """Configuration object for managed S3 transfers

        :param multipart_threshold: The transfer size threshold for which
            multipart uploads, downloads, and copies will automatically be
            triggered.

        :param max_concurrency: The maximum number of threads that will be
            making requests to perform a transfer. If ``use_threads`` is
            set to ``False``, the value provided is ignored as the transfer
            will only ever use the current thread.

        :param multipart_chunksize: The partition size of each part for a
            multipart transfer.

        :param num_download_attempts: The number of download attempts that
            will be retried upon errors with downloading an object in S3.
            Note that these retries account for errors that occur when
            streaming down the data from S3 (i.e. socket errors and read
            timeouts that occur after receiving an OK response from S3).
            Other retryable exceptions such as throttling errors and 5xx
            errors are already retried by botocore (whose default is 5
            attempts). This value does not take into account the number
            of exceptions retried by botocore.

        :param max_io_queue: The maximum number of read parts that can be
            queued in memory to be written for a download. The size of each
            of these read parts is at most the size of ``io_chunksize``.

        :param io_chunksize: The max size of each chunk in the io queue.
            Currently, this is the size used when ``read`` is called on the
            downloaded stream as well.

        :param use_threads: If True, threads will be used when performing
            S3 transfers. If False, no threads will be used in
            performing transfers; all logic will be run in the current thread.

        :param max_bandwidth: The maximum bandwidth that will be consumed
            in uploading and downloading file content. The value is an integer
            in terms of bytes per second.

        :param preferred_transfer_client: String specifying the preferred
            transfer client for transfer operations.

            Currently supported settings are:
              * auto (default) - Use the CRTTransferManager when calls
                are made with supported environment and settings.
              * classic - Only use the original S3TransferManager with
                requests. Disables possible CRT upgrade on requests.
        """

        super().__init__(
            multipart_threshold=multipart_threshold,
            max_request_concurrency=max_concurrency,
            multipart_chunksize=multipart_chunksize,
            num_download_attempts=num_download_attempts,
            max_io_queue_size=max_io_queue,
            io_chunksize=io_chunksize,
            max_bandwidth=max_bandwidth,
        )

        # Some of the argument names differ from those of the inherited
        # S3TransferConfig, so we add aliases so that the old names still
        # work.
        for alias in self.ALIAS:
            setattr(self, alias, getattr(self, self.ALIAS[alias]))
        self.use_threads = use_threads
        self.preferred_transfer_client = preferred_transfer_client

    def __setattr__(self, name, value):
        # If the alias name is used, make sure we set the name that it points
        # to as that is what actually is used in governing the TransferManager.
        if name in self.ALIAS:
            super().__setattr__(self.ALIAS[name], value)
        # Always set the value of the actual name provided.
        super().__setattr__(name, value)
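
# Illustrative sketch (not part of the library): because of the ALIAS
# mapping, assigning to an aliased attribute also updates the underlying
# S3TransferConfig name.
#
#     config = TransferConfig()
#     config.max_concurrency = 20
#     assert config.max_request_concurrency == 20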


class S3Transfer:
    ALLOWED_DOWNLOAD_ARGS = TransferManager.ALLOWED_DOWNLOAD_ARGS
    ALLOWED_UPLOAD_ARGS = TransferManager.ALLOWED_UPLOAD_ARGS

    def __init__(self, client=None, config=None, osutil=None, manager=None):
        if not client and not manager:
            raise ValueError(
                'Either a boto3.Client or s3transfer.manager.TransferManager '
                'must be provided'
            )
        if manager and any([client, config, osutil]):
            raise ValueError(
                'Manager cannot be provided with client, config, '
                'or osutil. These parameters are mutually exclusive.'
            )
        if config is None:
            config = TransferConfig()
        if osutil is None:
            osutil = OSUtils()
        if manager:
            self._manager = manager
        else:
            self._manager = create_transfer_manager(client, config, osutil)

    def upload_file(
        self, filename, bucket, key, callback=None, extra_args=None
    ):
        """Upload a file to an S3 object.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.upload_file() directly.

        .. seealso::
            :py:meth:`S3.Client.upload_file`
            :py:meth:`S3.Client.upload_fileobj`
        """
        if isinstance(filename, PathLike):
            filename = fspath(filename)
        if not isinstance(filename, str):
            raise ValueError('Filename must be a string or a path-like object')

        subscribers = self._get_subscribers(callback)
        future = self._manager.upload(
            filename, bucket, key, extra_args, subscribers
        )
        try:
            future.result()
        # If a client error was raised, add the backwards compatibility layer
        # that raises an S3UploadFailedError. These specific errors were only
        # ever thrown for upload_parts but now can be thrown for any related
        # client error.
        except ClientError as e:
            raise S3UploadFailedError(
                "Failed to upload {} to {}: {}".format(
                    filename, '/'.join([bucket, key]), e
                )
            )

    def download_file(
        self, bucket, key, filename, extra_args=None, callback=None
    ):
        """Download an S3 object to a file.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.download_file() directly.

        .. seealso::
            :py:meth:`S3.Client.download_file`
            :py:meth:`S3.Client.download_fileobj`
        """
        if isinstance(filename, PathLike):
            filename = fspath(filename)
        if not isinstance(filename, str):
            raise ValueError('Filename must be a string or a path-like object')

        subscribers = self._get_subscribers(callback)
        future = self._manager.download(
            bucket, key, filename, extra_args, subscribers
        )
        try:
            future.result()
        # For backwards compatibility: when retries are exceeded, raise
        # boto3's RetriesExceededError instead of s3transfer's built-in
        # RetriesExceededError, as current users catch the boto3 exception
        # rather than the s3transfer one to do their own retries.
        except S3TransferRetriesExceededError as e:
            raise RetriesExceededError(e.last_exception)

    def _get_subscribers(self, callback):
        if not callback:
            return None
        return [ProgressCallbackInvoker(callback)]

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self._manager.__exit__(*args)


class ProgressCallbackInvoker(BaseSubscriber):
    """A back-compat wrapper to invoke a provided callback via a subscriber

    :param callback: A callable that takes a single positional argument for
        how many bytes were transferred.
    """

    def __init__(self, callback):
        self._callback = callback

    def on_progress(self, bytes_transferred, **kwargs):
        self._callback(bytes_transferred)
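
# Illustrative sketch (not part of the library): S3Transfer wraps a plain
# callable in a ProgressCallbackInvoker so the TransferManager can notify
# it through the subscriber interface.
#
#     invoker = ProgressCallbackInvoker(lambda n: print(f"{n} bytes"))
#     invoker.on_progress(bytes_transferred=1024)  # prints "1024 bytes"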