Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/boto3/s3/transfer.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

144 statements  

1# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# https://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13"""Abstractions over S3's upload/download operations. 

14 

15This module provides high level abstractions for efficient 

16uploads/downloads. It handles several things for the user: 

17 

18* Automatically switching to multipart transfers when 

19 a file is over a specific size threshold 

20* Uploading/downloading a file in parallel 

21* Progress callbacks to monitor transfers 

22* Retries. While botocore handles retries for streaming uploads, 

23 it is not possible for it to handle retries for streaming 

24 downloads. This module handles retries for both cases so 

25 you don't need to implement any retry logic yourself. 

26 

27This module has a reasonable set of defaults. It also allows you 

28to configure many aspects of the transfer process including: 

29 

30* Multipart threshold size 

31* Max parallel downloads 

32* Socket timeouts 

33* Retry amounts 

34 

35There is no support for s3->s3 multipart copies at this 

36time. 

37 

38 

39.. _ref_s3transfer_usage: 

40 

41Usage 

42===== 

43 

44The simplest way to use this module is: 

45 

46.. code-block:: python 

47 

48 client = boto3.client('s3', 'us-west-2') 

49 transfer = S3Transfer(client) 

50 # Upload /tmp/myfile to s3://bucket/key 

51 transfer.upload_file('/tmp/myfile', 'bucket', 'key') 

52 

53 # Download s3://bucket/key to /tmp/myfile 

54 transfer.download_file('bucket', 'key', '/tmp/myfile') 

55 

56The ``upload_file`` and ``download_file`` methods also accept 

57``**kwargs``, which will be forwarded through to the corresponding 

58client operation. Here are a few examples using ``upload_file``:: 

59 

60 # Making the object public 

61 transfer.upload_file('/tmp/myfile', 'bucket', 'key', 

62 extra_args={'ACL': 'public-read'}) 

63 

64 # Setting metadata 

65 transfer.upload_file('/tmp/myfile', 'bucket', 'key', 

66 extra_args={'Metadata': {'a': 'b', 'c': 'd'}}) 

67 

68 # Setting content type 

69 transfer.upload_file('/tmp/myfile.json', 'bucket', 'key', 

70 extra_args={'ContentType': "application/json"}) 

71 

72 

73The ``S3Transfer`` class also supports progress callbacks so you can 

74provide transfer progress to users. Both the ``upload_file`` and 

75``download_file`` methods take an optional ``callback`` parameter. 

76Here's an example of how to print a simple progress percentage 

77to the user: 

78 

79.. code-block:: python 

80 

81 class ProgressPercentage(object): 

82 def __init__(self, filename): 

83 self._filename = filename 

84 self._size = float(os.path.getsize(filename)) 

85 self._seen_so_far = 0 

86 self._lock = threading.Lock() 

87 

88 def __call__(self, bytes_amount): 

89 # To simplify we'll assume this is hooked up 

90 # to a single filename. 

91 with self._lock: 

92 self._seen_so_far += bytes_amount 

93 percentage = (self._seen_so_far / self._size) * 100 

94 sys.stdout.write( 

95 "\r%s %s / %s (%.2f%%)" % ( 

96 self._filename, self._seen_so_far, self._size, 

97 percentage)) 

98 sys.stdout.flush() 

99 

100 

101 transfer = S3Transfer(boto3.client('s3', 'us-west-2')) 

102 # Upload /tmp/myfile to s3://bucket/key and print upload progress. 

103 transfer.upload_file('/tmp/myfile', 'bucket', 'key', 

104 callback=ProgressPercentage('/tmp/myfile')) 

105 

106 

107 

108You can also provide a TransferConfig object to the S3Transfer 

109object that gives you more fine grained control over the 

110transfer. For example: 

111 

112.. code-block:: python 

113 

114 client = boto3.client('s3', 'us-west-2') 

115 config = TransferConfig( 

116 multipart_threshold=8 * 1024 * 1024, 

117 max_concurrency=10, 

118 num_download_attempts=10, 

119 ) 

120 transfer = S3Transfer(client, config) 

121 transfer.upload_file('/tmp/foo', 'bucket', 'key') 

122 

123 

124""" 

125 

126import logging 

127import threading 

128from os import PathLike, fspath, getpid 

129 

130from botocore.compat import HAS_CRT 

131from botocore.exceptions import ClientError, MissingDependencyException 

132from s3transfer.exceptions import ( 

133 RetriesExceededError as S3TransferRetriesExceededError, 

134) 

135from s3transfer.futures import NonThreadedExecutor 

136from s3transfer.manager import TransferConfig as S3TransferConfig 

137from s3transfer.manager import TransferManager 

138from s3transfer.subscribers import BaseSubscriber 

139from s3transfer.utils import OSUtils 

140 

141import boto3.s3.constants as constants 

142from boto3.compat import TRANSFER_CONFIG_SUPPORTS_CRT 

143from boto3.exceptions import ( 

144 RetriesExceededError, 

145 S3UploadFailedError, 

146) 

147 

148if HAS_CRT: 

149 import awscrt.s3 

150 

151 from boto3.crt import create_crt_transfer_manager 

152 

# Size constants used throughout the transfer defaults below.
KB = 1024
MB = KB * KB

# Module-level logger; handlers/levels are configured by the application.
logger = logging.getLogger(__name__)

157 

158 

def create_transfer_manager(client, config, osutil=None):
    """Creates a transfer manager based on configuration

    Prefers the CRT-based manager when the configuration and environment
    allow it (see ``_should_use_crt``), otherwise falls back to the
    classic s3transfer ``TransferManager``.

    :type client: boto3.client
    :param client: The S3 client to use

    :type config: boto3.s3.transfer.TransferConfig
    :param config: The transfer config to use

    :type osutil: s3transfer.utils.OSUtils
    :param osutil: The os utility to use

    :rtype: s3transfer.manager.TransferManager
    :returns: A transfer manager based on parameters provided
    """
    manager = None
    if _should_use_crt(config):
        # May still return None if the CRT manager cannot be created.
        manager = create_crt_transfer_manager(client, config)

    if manager is not None:
        logger.debug(
            f"Using CRT client. pid: {getpid()}, thread: {threading.get_ident()}"
        )
        return manager

    # If we don't resolve something above, fallback to the default.
    logger.debug(
        f"Using default client. pid: {getpid()}, thread: {threading.get_ident()}"
    )
    return _create_default_transfer_manager(client, config, osutil)

187 

188 

def _should_use_crt(config):
    """Return True when the CRT transfer manager should be attempted.

    :raises MissingDependencyException: when the CRT client was explicitly
        requested but the minimum awscrt version is not installed.
    """
    # The CRT path requires awscrt >= 0.19.18.
    crt_available = HAS_CRT and has_minimum_crt_version((0, 19, 18))
    optimized_system = crt_available and awscrt.s3.is_optimized_for_system()
    preference = config.preferred_transfer_client.lower()
    wants_crt = preference == constants.CRT_TRANSFER_CLIENT

    if wants_crt and not crt_available:
        # An explicit "crt" preference cannot be silently downgraded.
        msg = (
            "CRT transfer client is configured but is missing minimum CRT "
            f"version. CRT installed: {HAS_CRT}"
        )
        if HAS_CRT:
            msg += f", with version: {awscrt.__version__}"
        raise MissingDependencyException(msg=msg)

    # "auto" upgrades to CRT only on systems CRT reports as optimized.
    auto_upgrade = (
        optimized_system
        and preference == constants.AUTO_RESOLVE_TRANSFER_CLIENT
    )
    if auto_upgrade or wants_crt:
        logger.debug(
            "Attempting to use CRTTransferManager. Config settings may be ignored."
        )
        return True

    logger.debug(
        "Opting out of CRT Transfer Manager. Preferred client: "
        f"{preference}, CRT available: {HAS_CRT}, "
        f"Instance Optimized: {optimized_system}."
    )
    return False

222 

223 

def has_minimum_crt_version(minimum_version):
    """Not intended for use outside boto3.

    Compare the installed awscrt version (as a tuple of ints) against
    ``minimum_version``; returns False when awscrt is absent or its
    version string cannot be parsed numerically.
    """
    # Without awscrt installed there is no version to compare against.
    if not HAS_CRT:
        return False

    try:
        installed = tuple(int(part) for part in awscrt.__version__.split("."))
    except (TypeError, ValueError):
        # Presumably non-numeric version components (e.g. pre-release
        # tags) land here; treat them as not meeting the minimum.
        return False

    return installed >= minimum_version

237 

238 

def _create_default_transfer_manager(client, config, osutil):
    """Create the default TransferManager implementation for s3transfer."""
    # Honor the use_threads toggle by swapping in the serial executor.
    executor_cls = None if config.use_threads else NonThreadedExecutor
    return TransferManager(client, config, osutil, executor_cls)

245 

246 

class TransferConfig(S3TransferConfig):
    # Maps boto3-facing attribute names to the names the inherited
    # S3TransferConfig actually uses; __setattr__/__init__ keep both in sync.
    ALIAS = {
        'max_concurrency': 'max_request_concurrency',
        'max_io_queue': 'max_io_queue_size',
    }
    # Default values served by __getattribute__ when an attribute was left
    # unset (only when TRANSFER_CONFIG_SUPPORTS_CRT is true; see below).
    DEFAULTS = {
        'multipart_threshold': 8 * MB,
        'max_concurrency': 10,
        'max_request_concurrency': 10,
        'multipart_chunksize': 8 * MB,
        'num_download_attempts': 5,
        'max_io_queue': 100,
        'max_io_queue_size': 100,
        'io_chunksize': 256 * KB,
        'use_threads': True,
        'max_bandwidth': None,
        'preferred_transfer_client': constants.AUTO_RESOLVE_TRANSFER_CLIENT,
    }

    def __init__(
        self,
        multipart_threshold=None,
        max_concurrency=None,
        multipart_chunksize=None,
        num_download_attempts=None,
        max_io_queue=None,
        io_chunksize=None,
        use_threads=None,
        max_bandwidth=None,
        preferred_transfer_client=None,
    ):
        """Configuration object for managed S3 transfers

        :param multipart_threshold: The transfer size threshold for which
            multipart uploads, downloads, and copies will automatically be
            triggered.

        :param max_concurrency: The maximum number of threads that will be
            making requests to perform a transfer. If ``use_threads`` is
            set to ``False``, the value provided is ignored as the transfer
            will only ever use the current thread.

        :param multipart_chunksize: The partition size of each part for a
            multipart transfer.

        :param num_download_attempts: The number of download attempts that
            will be retried upon errors with downloading an object in S3.
            Note that these retries account for errors that occur when
            streaming down the data from s3 (i.e. socket errors and read
            timeouts that occur after receiving an OK response from s3).
            Other retryable exceptions such as throttling errors and 5xx
            errors are already retried by botocore (this default is 5). This
            does not take into account the number of exceptions retried by
            botocore. Note: This value is ignored when resolved transfer
            manager type is CRTTransferManager.

        :param max_io_queue: The maximum amount of read parts that can be
            queued in memory to be written for a download. The size of each
            of these read parts is at most the size of ``io_chunksize``.
            Note: This value is ignored when resolved transfer manager type
            is CRTTransferManager.

        :param io_chunksize: The max size of each chunk in the io queue.
            Currently, this is size used when ``read`` is called on the
            downloaded stream as well. Note: This value is ignored when
            resolved transfer manager type is CRTTransferManager.

        :param use_threads: If True, threads will be used when performing
            S3 transfers. If False, no threads will be used in
            performing transfers; all logic will be run in the current thread.
            Note: This value is ignored when resolved transfer manager type is
            CRTTransferManager.

        :param max_bandwidth: The maximum bandwidth that will be consumed
            in uploading and downloading file content. The value is an integer
            in terms of bytes per second. Note: This value is ignored when
            resolved transfer manager type is CRTTransferManager.

        :param preferred_transfer_client: String specifying preferred transfer
            client for transfer operations.

            Current supported settings are:
              * auto (default) - Use the CRTTransferManager when calls
                  are made with supported environment and settings.
              * classic - Only use the origin S3TransferManager with
                  requests. Disables possible CRT upgrade on requests.
              * crt - Only use the CRTTransferManager with requests.
        """
        init_args = {
            'multipart_threshold': multipart_threshold,
            'max_concurrency': max_concurrency,
            'multipart_chunksize': multipart_chunksize,
            'num_download_attempts': num_download_attempts,
            'max_io_queue': max_io_queue,
            'io_chunksize': io_chunksize,
            'use_threads': use_threads,
            'max_bandwidth': max_bandwidth,
            'preferred_transfer_client': preferred_transfer_client,
        }
        # Unsupplied args resolve to either the UNSET_DEFAULT sentinel or a
        # concrete default, depending on TRANSFER_CONFIG_SUPPORTS_CRT.
        resolved = self._resolve_init_args(init_args)
        super().__init__(
            multipart_threshold=resolved['multipart_threshold'],
            max_request_concurrency=resolved['max_concurrency'],
            multipart_chunksize=resolved['multipart_chunksize'],
            num_download_attempts=resolved['num_download_attempts'],
            max_io_queue_size=resolved['max_io_queue'],
            io_chunksize=resolved['io_chunksize'],
            max_bandwidth=resolved['max_bandwidth'],
        )
        # Some of the argument names are not the same as the inherited
        # S3TransferConfig so we add aliases so you can still access the
        # old version of the names.
        for alias in self.ALIAS:
            setattr(
                self,
                alias,
                object.__getattribute__(self, self.ALIAS[alias]),
            )
        self.use_threads = resolved['use_threads']
        self.preferred_transfer_client = resolved['preferred_transfer_client']

    def __setattr__(self, name, value):
        """Keep aliased attribute pairs (ALIAS) in sync on every write."""
        # If the alias name is used, make sure we set the name that it points
        # to as that is what actually is used in governing the TransferManager.
        if name in self.ALIAS:
            super().__setattr__(self.ALIAS[name], value)
        # Always set the value of the actual name provided.
        super().__setattr__(name, value)

    def __getattribute__(self, item):
        """Substitute DEFAULTS for attributes still holding UNSET_DEFAULT.

        Only active when TRANSFER_CONFIG_SUPPORTS_CRT; otherwise the stored
        value is returned unchanged. ``object.__getattribute__`` is used to
        avoid re-entering this override.
        """
        value = object.__getattribute__(self, item)
        if not TRANSFER_CONFIG_SUPPORTS_CRT:
            return value
        defaults = object.__getattribute__(self, 'DEFAULTS')
        if item not in defaults:
            return value
        if value is self.UNSET_DEFAULT:
            return defaults[item]
        return value

    def _resolve_init_args(self, init_args):
        """Map each constructor arg to its value, sentinel, or default.

        Explicitly provided (non-None) values pass through. Omitted values
        become UNSET_DEFAULT when the CRT-aware base config supports it
        (so __getattribute__ serves DEFAULTS lazily), otherwise they are
        resolved eagerly from DEFAULTS.
        """
        resolved = {}
        for init_arg, val in init_args.items():
            if val is not None:
                resolved[init_arg] = val
            elif TRANSFER_CONFIG_SUPPORTS_CRT:
                resolved[init_arg] = self.UNSET_DEFAULT
            else:
                resolved[init_arg] = self.DEFAULTS[init_arg]
        return resolved

397 

398 

class S3Transfer:
    """High level interface for managed S3 uploads and downloads.

    Wraps an s3transfer ``TransferManager`` and translates its exceptions
    into the boto3 exceptions existing callers catch.
    """

    ALLOWED_DOWNLOAD_ARGS = TransferManager.ALLOWED_DOWNLOAD_ARGS
    ALLOWED_UPLOAD_ARGS = TransferManager.ALLOWED_UPLOAD_ARGS
    ALLOWED_COPY_ARGS = TransferManager.ALLOWED_COPY_ARGS

    def __init__(self, client=None, config=None, osutil=None, manager=None):
        """Create an S3Transfer.

        :param client: An S3 client; required unless ``manager`` is given.
        :param config: Optional TransferConfig; defaults are used if omitted.
        :param osutil: Optional OSUtils instance.
        :param manager: A pre-built TransferManager; mutually exclusive with
            the other three parameters.
        :raises ValueError: if neither client nor manager is provided, or if
            manager is combined with any of the other parameters.
        """
        if not client and not manager:
            raise ValueError(
                'Either a boto3.Client or s3transfer.manager.TransferManager '
                'must be provided'
            )
        if manager and any([client, config, osutil]):
            raise ValueError(
                'Manager cannot be provided with client, config, '
                'nor osutil. These parameters are mutually exclusive.'
            )
        if config is None:
            config = TransferConfig()
        if osutil is None:
            osutil = OSUtils()
        if manager:
            self._manager = manager
        else:
            self._manager = create_transfer_manager(client, config, osutil)

    @staticmethod
    def _validate_filename(filename):
        """Coerce a path-like filename to ``str``, rejecting other types.

        Shared by upload_file and download_file so both apply identical
        validation.

        :raises ValueError: if filename is neither a string nor path-like.
        """
        if isinstance(filename, PathLike):
            filename = fspath(filename)
        if not isinstance(filename, str):
            raise ValueError('Filename must be a string or a path-like object')
        return filename

    def upload_file(
        self, filename, bucket, key, callback=None, extra_args=None
    ):
        """Upload a file to an S3 object.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.upload_file() directly.

        .. seealso::
            :py:meth:`S3.Client.upload_file`
            :py:meth:`S3.Client.upload_fileobj`
        """
        filename = self._validate_filename(filename)

        subscribers = self._get_subscribers(callback)
        future = self._manager.upload(
            filename, bucket, key, extra_args, subscribers
        )
        try:
            future.result()
        # If a client error was raised, add the backwards compatibility layer
        # that raises a S3UploadFailedError. These specific errors were only
        # ever thrown for upload_parts but now can be thrown for any related
        # client error.
        except ClientError as e:
            # Chain explicitly so the original ClientError is preserved as
            # the direct cause in tracebacks.
            raise S3UploadFailedError(
                "Failed to upload {} to {}: {}".format(
                    filename, '/'.join([bucket, key]), e
                )
            ) from e

    def download_file(
        self, bucket, key, filename, extra_args=None, callback=None
    ):
        """Download an S3 object to a file.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.download_file() directly.

        .. seealso::
            :py:meth:`S3.Client.download_file`
            :py:meth:`S3.Client.download_fileobj`
        """
        filename = self._validate_filename(filename)

        subscribers = self._get_subscribers(callback)
        future = self._manager.download(
            bucket, key, filename, extra_args, subscribers
        )
        try:
            future.result()
        # This is for backwards compatibility where when retries are
        # exceeded we need to throw the same error from boto3 instead of
        # s3transfer's built in RetriesExceededError as current users are
        # catching the boto3 one instead of the s3transfer exception to do
        # their own retries.
        except S3TransferRetriesExceededError as e:
            raise RetriesExceededError(e.last_exception) from e

    def _get_subscribers(self, callback):
        """Wrap a bare callback in a subscriber list, or return None."""
        if not callback:
            return None
        return [ProgressCallbackInvoker(callback)]

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Delegate shutdown/cleanup to the underlying manager.
        self._manager.__exit__(*args)

499 

500 

class ProgressCallbackInvoker(BaseSubscriber):
    """Adapt a bare progress callable to the s3transfer subscriber API.

    Kept for backwards compatibility: ``S3Transfer`` accepts a plain
    ``callback``, while the transfer manager expects subscriber objects.

    :param callback: A callable that takes a single positional argument for
        how many bytes were transferred.
    """

    def __init__(self, callback):
        # The user-supplied progress callable being wrapped.
        self._user_callback = callback

    def on_progress(self, bytes_transferred, **kwargs):
        # Forward only the byte count; any extra keyword arguments from
        # s3transfer are intentionally dropped.
        self._user_callback(bytes_transferred)