Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/boto3/s3/transfer.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

144 statements  

1# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# https://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13"""Abstractions over S3's upload/download operations. 

14 

15This module provides high level abstractions for efficient 

16uploads/downloads. It handles several things for the user: 

17 

18* Automatically switching to multipart transfers when 

19 a file is over a specific size threshold 

20* Uploading/downloading a file in parallel 

21* Progress callbacks to monitor transfers 

22* Retries. While botocore handles retries for streaming uploads, 

23 it is not possible for it to handle retries for streaming 

24 downloads. This module handles retries for both cases so 

25 you don't need to implement any retry logic yourself. 

26 

27This module has a reasonable set of defaults. It also allows you 

28to configure many aspects of the transfer process including: 

29 

30* Multipart threshold size 

31* Max parallel downloads 

32* Socket timeouts 

33* Retry amounts 

34 

35There is no support for s3->s3 multipart copies at this 

36time. 

37 

38 

39.. _ref_s3transfer_usage: 

40 

41Usage 

42===== 

43 

44The simplest way to use this module is: 

45 

46.. code-block:: python 

47 

48 client = boto3.client('s3', 'us-west-2') 

49 transfer = S3Transfer(client) 

50 # Upload /tmp/myfile to s3://bucket/key 

51 transfer.upload_file('/tmp/myfile', 'bucket', 'key') 

52 

53 # Download s3://bucket/key to /tmp/myfile 

54 transfer.download_file('bucket', 'key', '/tmp/myfile') 

55 

56The ``upload_file`` and ``download_file`` methods also accept 

57``**kwargs``, which will be forwarded through to the corresponding 

58client operation. Here are a few examples using ``upload_file``:: 

59 

60 # Making the object public 

61 transfer.upload_file('/tmp/myfile', 'bucket', 'key', 

62 extra_args={'ACL': 'public-read'}) 

63 

64 # Setting metadata 

65 transfer.upload_file('/tmp/myfile', 'bucket', 'key', 

66 extra_args={'Metadata': {'a': 'b', 'c': 'd'}}) 

67 

68 # Setting content type 

69 transfer.upload_file('/tmp/myfile.json', 'bucket', 'key', 

70 extra_args={'ContentType': "application/json"}) 

71 

72 

73The ``S3Transfer`` class also supports progress callbacks so you can 

74provide transfer progress to users. Both the ``upload_file`` and 

75``download_file`` methods take an optional ``callback`` parameter. 

76Here's an example of how to print a simple progress percentage 

77to the user: 

78 

79.. code-block:: python 

80 

81 class ProgressPercentage(object): 

82 def __init__(self, filename): 

83 self._filename = filename 

84 self._size = float(os.path.getsize(filename)) 

85 self._seen_so_far = 0 

86 self._lock = threading.Lock() 

87 

88 def __call__(self, bytes_amount): 

89 # To simplify we'll assume this is hooked up 

90 # to a single filename. 

91 with self._lock: 

92 self._seen_so_far += bytes_amount 

93 percentage = (self._seen_so_far / self._size) * 100 

94 sys.stdout.write( 

95 "\r%s %s / %s (%.2f%%)" % ( 

96 self._filename, self._seen_so_far, self._size, 

97 percentage)) 

98 sys.stdout.flush() 

99 

100 

101 transfer = S3Transfer(boto3.client('s3', 'us-west-2')) 

102 # Upload /tmp/myfile to s3://bucket/key and print upload progress. 

103 transfer.upload_file('/tmp/myfile', 'bucket', 'key', 

104 callback=ProgressPercentage('/tmp/myfile')) 

105 

106 

107 

108You can also provide a TransferConfig object to the S3Transfer 

109object that gives you more fine grained control over the 

110transfer. For example: 

111 

112.. code-block:: python 

113 

114 client = boto3.client('s3', 'us-west-2') 

115 config = TransferConfig( 

116 multipart_threshold=8 * 1024 * 1024, 

117 max_concurrency=10, 

118 num_download_attempts=10, 

119 ) 

120 transfer = S3Transfer(client, config) 

121 transfer.upload_file('/tmp/foo', 'bucket', 'key') 

122 

123 

124""" 

125 

126import logging 

127import threading 

128from os import PathLike, fspath, getpid 

129 

130from botocore.compat import HAS_CRT 

131from botocore.exceptions import ClientError, MissingDependencyException 

132from s3transfer.exceptions import ( 

133 RetriesExceededError as S3TransferRetriesExceededError, 

134) 

135from s3transfer.futures import NonThreadedExecutor 

136from s3transfer.manager import TransferConfig as S3TransferConfig 

137from s3transfer.manager import TransferManager 

138from s3transfer.subscribers import BaseSubscriber 

139from s3transfer.utils import OSUtils 

140 

141import boto3.s3.constants as constants 

142from boto3.compat import TRANSFER_CONFIG_SUPPORTS_CRT 

143from boto3.exceptions import ( 

144 RetriesExceededError, 

145 S3UploadFailedError, 

146) 

147 

148if HAS_CRT: 

149 import awscrt.s3 

150 

151 from boto3.crt import create_crt_transfer_manager 

152 

153KB = 1024 

154MB = KB * KB 

155 

156logger = logging.getLogger(__name__) 

157 

158 

def create_transfer_manager(client, config, osutil=None):
    """Creates a transfer manager based on configuration

    :type client: boto3.client
    :param client: The S3 client to use

    :type config: boto3.s3.transfer.TransferConfig
    :param config: The transfer config to use

    :type osutil: s3transfer.utils.OSUtils
    :param osutil: The os utility to use

    :rtype: s3transfer.manager.TransferManager
    :returns: A transfer manager based on parameters provided
    """
    # Prefer the CRT-backed manager when configuration allows it; the CRT
    # factory may still decline (return None), in which case we fall through.
    if _should_use_crt(config):
        crt_manager = create_crt_transfer_manager(client, config)
        if crt_manager is not None:
            logger.debug(
                "Using CRT client. pid: %s, thread: %s",
                getpid(),
                threading.get_ident(),
            )
            return crt_manager

    # Nothing above resolved, so fall back to the classic implementation.
    logger.debug(
        "Using default client. pid: %s, thread: %s",
        getpid(),
        threading.get_ident(),
    )
    return _create_default_transfer_manager(client, config, osutil)

191 

192 

def _should_use_crt(config):
    """Return True when the CRT transfer manager should handle transfers.

    Raises ``MissingDependencyException`` when the user explicitly asked for
    the CRT client but a sufficiently new ``awscrt`` is not installed.
    """
    # This feature requires awscrt>=0.19.18
    crt_available = HAS_CRT and has_minimum_crt_version((0, 19, 18))
    optimized_instance = (
        crt_available and awscrt.s3.is_optimized_for_system()
    )
    preferred = config.preferred_transfer_client.lower()

    # An explicit "crt" preference without a usable CRT is a hard error
    # rather than a silent fallback.
    if preferred == constants.CRT_TRANSFER_CLIENT and not crt_available:
        msg = (
            "CRT transfer client is configured but is missing minimum CRT "
            f"version. CRT installed: {HAS_CRT}"
        )
        if HAS_CRT:
            msg += f", with version: {awscrt.__version__}"
        raise MissingDependencyException(msg=msg)

    explicitly_crt = preferred == constants.CRT_TRANSFER_CLIENT
    auto_upgrade = (
        optimized_instance
        and preferred == constants.AUTO_RESOLVE_TRANSFER_CLIENT
    )
    if explicitly_crt or auto_upgrade:
        logger.debug(
            "Attempting to use CRTTransferManager. Config settings may be ignored."
        )
        return True

    logger.debug(
        "Opting out of CRT Transfer Manager. "
        "Preferred client: %s, CRT available: %s, Instance Optimized: %s",
        preferred,
        HAS_CRT,
        optimized_instance,
    )
    return False

228 

229 

def has_minimum_crt_version(minimum_version):
    """Not intended for use outside boto3.

    Return True when the installed ``awscrt`` version is at least
    ``minimum_version`` (a tuple of ints); False when awscrt is absent or
    its version string cannot be parsed.
    """
    if not HAS_CRT:
        return False

    try:
        installed = tuple(
            int(part) for part in awscrt.__version__.split(".")
        )
    except (TypeError, ValueError):
        # Unparseable version string (e.g. a dev build) — treat as too old.
        return False

    return installed >= minimum_version

243 

244 

def _create_default_transfer_manager(client, config, osutil):
    """Create the default TransferManager implementation for s3transfer."""
    # A None executor class means "use the default threaded executor".
    executor_cls = None if config.use_threads else NonThreadedExecutor
    return TransferManager(client, config, osutil, executor_cls)

251 

252 

class TransferConfig(S3TransferConfig):
    """Configuration for managed S3 transfers, layered on S3TransferConfig.

    This subclass renames a couple of the inherited settings (see ``ALIAS``)
    and, when the installed s3transfer supports CRT defaults
    (``TRANSFER_CONFIG_SUPPORTS_CRT``), defers unset values so the CRT
    transfer manager can apply its own defaults.
    """

    # Maps the boto3-facing attribute name to the attribute name actually
    # used by the inherited S3TransferConfig.
    ALIAS = {
        'max_concurrency': 'max_request_concurrency',
        'max_io_queue': 'max_io_queue_size',
    }
    # Default values applied when an argument is left unset and CRT-aware
    # deferral is in effect; both alias spellings are present so
    # __getattribute__ can resolve either name.
    DEFAULTS = {
        'multipart_threshold': 8 * MB,
        'max_concurrency': 10,
        'max_request_concurrency': 10,
        'multipart_chunksize': 8 * MB,
        'num_download_attempts': 5,
        'max_io_queue': 100,
        'max_io_queue_size': 100,
        'io_chunksize': 256 * KB,
        'use_threads': True,
        'max_bandwidth': None,
        'preferred_transfer_client': constants.AUTO_RESOLVE_TRANSFER_CLIENT,
    }

    def __init__(
        self,
        multipart_threshold=None,
        max_concurrency=None,
        multipart_chunksize=None,
        num_download_attempts=None,
        max_io_queue=None,
        io_chunksize=None,
        use_threads=None,
        max_bandwidth=None,
        preferred_transfer_client=None,
    ):
        """Configuration object for managed S3 transfers

        :param multipart_threshold: The transfer size threshold for which
            multipart uploads, downloads, and copies will automatically be
            triggered.

        :param max_concurrency: The maximum number of threads that will be
            making requests to perform a transfer. If ``use_threads`` is
            set to ``False``, the value provided is ignored as the transfer
            will only ever use the current thread.

        :param multipart_chunksize: The partition size of each part for a
            multipart transfer.

        :param num_download_attempts: The number of download attempts that
            will be retried upon errors with downloading an object in S3.
            Note that these retries account for errors that occur when
            streaming down the data from s3 (i.e. socket errors and read
            timeouts that occur after receiving an OK response from s3).
            Other retryable exceptions such as throttling errors and 5xx
            errors are already retried by botocore (this default is 5). This
            does not take into account the number of exceptions retried by
            botocore. Note: This value is ignored when resolved transfer
            manager type is CRTTransferManager.

        :param max_io_queue: The maximum amount of read parts that can be
            queued in memory to be written for a download. The size of each
            of these read parts is at most the size of ``io_chunksize``.
            Note: This value is ignored when resolved transfer manager type
            is CRTTransferManager.

        :param io_chunksize: The max size of each chunk in the io queue.
            Currently, this is size used when ``read`` is called on the
            downloaded stream as well. Note: This value is ignored when
            resolved transfer manager type is CRTTransferManager.

        :param use_threads: If True, threads will be used when performing
            S3 transfers. If False, no threads will be used in
            performing transfers; all logic will be run in the current thread.
            Note: This value is ignored when resolved transfer manager type is
            CRTTransferManager.

        :param max_bandwidth: The maximum bandwidth that will be consumed
            in uploading and downloading file content. The value is an integer
            in terms of bytes per second. Note: This value is ignored when
            resolved transfer manager type is CRTTransferManager.

        :param preferred_transfer_client: String specifying preferred transfer
            client for transfer operations.

            Current supported settings are:
              * auto (default) - Use the CRTTransferManager when calls
                  are made with supported environment and settings.
              * classic - Only use the origin S3TransferManager with
                  requests. Disables possible CRT upgrade on requests.
              * crt - Only use the CRTTransferManager with requests.
        """
        init_args = {
            'multipart_threshold': multipart_threshold,
            'max_concurrency': max_concurrency,
            'multipart_chunksize': multipart_chunksize,
            'num_download_attempts': num_download_attempts,
            'max_io_queue': max_io_queue,
            'io_chunksize': io_chunksize,
            'use_threads': use_threads,
            'max_bandwidth': max_bandwidth,
            'preferred_transfer_client': preferred_transfer_client,
        }
        # Replace unset (None) values with either UNSET_DEFAULT sentinels
        # (CRT-aware deferral) or the concrete DEFAULTS.
        resolved = self._resolve_init_args(init_args)
        super().__init__(
            multipart_threshold=resolved['multipart_threshold'],
            max_request_concurrency=resolved['max_concurrency'],
            multipart_chunksize=resolved['multipart_chunksize'],
            num_download_attempts=resolved['num_download_attempts'],
            max_io_queue_size=resolved['max_io_queue'],
            io_chunksize=resolved['io_chunksize'],
            max_bandwidth=resolved['max_bandwidth'],
        )
        # Some of the argument names are not the same as the inherited
        # S3TransferConfig so we add aliases so you can still access the
        # old version of the names.
        # object.__getattribute__ is used to bypass this class's own
        # __getattribute__ override and read the raw stored value.
        for alias in self.ALIAS:
            setattr(
                self,
                alias,
                object.__getattribute__(self, self.ALIAS[alias]),
            )
        self.use_threads = resolved['use_threads']
        self.preferred_transfer_client = resolved['preferred_transfer_client']

    def __setattr__(self, name, value):
        # If the alias name is used, make sure we set the name that it points
        # to as that is what actually is used in governing the TransferManager.
        if name in self.ALIAS:
            super().__setattr__(self.ALIAS[name], value)
        # Always set the value of the actual name provided.
        super().__setattr__(name, value)

    def __getattribute__(self, item):
        # Resolve UNSET_DEFAULT sentinels to concrete DEFAULTS on read, but
        # only when CRT-aware deferral is supported by s3transfer.
        value = object.__getattribute__(self, item)
        if not TRANSFER_CONFIG_SUPPORTS_CRT:
            return value
        defaults = object.__getattribute__(self, 'DEFAULTS')
        if item not in defaults:
            return value
        if value is self.UNSET_DEFAULT:
            return defaults[item]
        return value

    def _resolve_init_args(self, init_args):
        # For each unset (None) argument: leave it as the UNSET_DEFAULT
        # sentinel when CRT-aware deferral is available (so the CRT manager
        # can apply its own defaults), otherwise fill in the classic default.
        resolved = {}
        for init_arg, val in init_args.items():
            if val is not None:
                resolved[init_arg] = val
            elif TRANSFER_CONFIG_SUPPORTS_CRT:
                resolved[init_arg] = self.UNSET_DEFAULT
            else:
                resolved[init_arg] = self.DEFAULTS[init_arg]
        return resolved

403 

404 

class S3Transfer:
    """High level interface for uploading/downloading files to/from S3.

    Wraps an s3transfer ``TransferManager`` and exposes the legacy
    ``upload_file``/``download_file`` API, translating s3transfer exceptions
    into the boto3 exceptions callers historically caught.
    """

    ALLOWED_DOWNLOAD_ARGS = TransferManager.ALLOWED_DOWNLOAD_ARGS
    ALLOWED_UPLOAD_ARGS = TransferManager.ALLOWED_UPLOAD_ARGS
    ALLOWED_COPY_ARGS = TransferManager.ALLOWED_COPY_ARGS

    def __init__(self, client=None, config=None, osutil=None, manager=None):
        """
        :param client: The S3 client to use; required unless ``manager``
            is provided.
        :param config: Optional TransferConfig; defaults to TransferConfig().
        :param osutil: Optional OSUtils instance; defaults to OSUtils().
        :param manager: A pre-built TransferManager. Mutually exclusive with
            the other three parameters.
        :raises ValueError: If neither client nor manager is given, or if
            manager is combined with client/config/osutil.
        """
        if not client and not manager:
            raise ValueError(
                'Either a boto3.Client or s3transfer.manager.TransferManager '
                'must be provided'
            )
        if manager and any([client, config, osutil]):
            raise ValueError(
                'Manager cannot be provided with client, config, '
                'nor osutil. These parameters are mutually exclusive.'
            )
        if config is None:
            config = TransferConfig()
        if osutil is None:
            osutil = OSUtils()
        if manager:
            self._manager = manager
        else:
            self._manager = create_transfer_manager(client, config, osutil)

    def upload_file(
        self, filename, bucket, key, callback=None, extra_args=None
    ):
        """Upload a file to an S3 object.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.upload_file() directly.

        .. seealso::
            :py:meth:`S3.Client.upload_file`
            :py:meth:`S3.Client.upload_fileobj`
        """
        if isinstance(filename, PathLike):
            filename = fspath(filename)
        if not isinstance(filename, str):
            raise ValueError('Filename must be a string or a path-like object')

        subscribers = self._get_subscribers(callback)
        future = self._manager.upload(
            filename, bucket, key, extra_args, subscribers
        )
        try:
            future.result()
        # If a client error was raised, add the backwards compatibility layer
        # that raises a S3UploadFailedError. These specific errors were only
        # ever thrown for upload_parts but now can be thrown for any related
        # client error.
        except ClientError as e:
            # Include the actual source filename in the error so failures
            # are actionable (previously it was reported as "(unknown)").
            raise S3UploadFailedError(
                f"Failed to upload {filename} to {bucket}/{key}: {e}"
            )

    def download_file(
        self, bucket, key, filename, extra_args=None, callback=None
    ):
        """Download an S3 object to a file.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.download_file() directly.

        .. seealso::
            :py:meth:`S3.Client.download_file`
            :py:meth:`S3.Client.download_fileobj`
        """
        if isinstance(filename, PathLike):
            filename = fspath(filename)
        if not isinstance(filename, str):
            raise ValueError('Filename must be a string or a path-like object')

        subscribers = self._get_subscribers(callback)
        future = self._manager.download(
            bucket, key, filename, extra_args, subscribers
        )
        try:
            future.result()
        # This is for backwards compatibility where when retries are
        # exceeded we need to throw the same error from boto3 instead of
        # s3transfer's built in RetriesExceededError as current users are
        # catching the boto3 one instead of the s3transfer exception to do
        # their own retries.
        except S3TransferRetriesExceededError as e:
            raise RetriesExceededError(e.last_exception)

    def _get_subscribers(self, callback):
        # Wrap a bare progress callable in the subscriber interface the
        # transfer manager expects; None means "no progress reporting".
        if not callback:
            return None
        return [ProgressCallbackInvoker(callback)]

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Delegate shutdown/cleanup to the underlying transfer manager.
        self._manager.__exit__(*args)

503 

504 

class ProgressCallbackInvoker(BaseSubscriber):
    """A back-compat wrapper to invoke a provided callback via a subscriber

    :param callback: A callable that takes a single positional argument for
        how many bytes were transferred.
    """

    def __init__(self, callback):
        # Stash the user-supplied callable; it is invoked once per progress
        # notification from the transfer manager.
        self._progress_callback = callback

    def on_progress(self, bytes_transferred, **kwargs):
        # Extra subscriber kwargs are intentionally ignored; the legacy
        # callback contract is a single byte-count argument.
        self._progress_callback(bytes_transferred)