Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/boto3/s3/transfer.py: 33%

115 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# https://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13"""Abstractions over S3's upload/download operations. 

14 

15This module provides high level abstractions for efficient 

16uploads/downloads. It handles several things for the user: 

17 

18* Automatically switching to multipart transfers when 

19 a file is over a specific size threshold 

20* Uploading/downloading a file in parallel 

21* Progress callbacks to monitor transfers 

22* Retries. While botocore handles retries for streaming uploads, 

23 it is not possible for it to handle retries for streaming 

24 downloads. This module handles retries for both cases so 

25 you don't need to implement any retry logic yourself. 

26 

27This module has a reasonable set of defaults. It also allows you 

28to configure many aspects of the transfer process including: 

29 

30* Multipart threshold size 

31* Max parallel downloads 

32* Socket timeouts 

33* Retry amounts 

34 

35There is no support for s3->s3 multipart copies at this 

36time. 

37 

38 

39.. _ref_s3transfer_usage: 

40 

41Usage 

42===== 

43 

44The simplest way to use this module is: 

45 

46.. code-block:: python 

47 

48 client = boto3.client('s3', 'us-west-2') 

49 transfer = S3Transfer(client) 

50 # Upload /tmp/myfile to s3://bucket/key 

51 transfer.upload_file('/tmp/myfile', 'bucket', 'key') 

52 

53 # Download s3://bucket/key to /tmp/myfile 

54 transfer.download_file('bucket', 'key', '/tmp/myfile') 

55 

56The ``upload_file`` and ``download_file`` methods also accept 

57``**kwargs``, which will be forwarded through to the corresponding 

58client operation. Here are a few examples using ``upload_file``:: 

59 

60 # Making the object public 

61 transfer.upload_file('/tmp/myfile', 'bucket', 'key', 

62 extra_args={'ACL': 'public-read'}) 

63 

64 # Setting metadata 

65 transfer.upload_file('/tmp/myfile', 'bucket', 'key', 

66 extra_args={'Metadata': {'a': 'b', 'c': 'd'}}) 

67 

68 # Setting content type 

69 transfer.upload_file('/tmp/myfile.json', 'bucket', 'key', 

70 extra_args={'ContentType': "application/json"}) 

71 

72 

73The ``S3Transfer`` class also supports progress callbacks so you can 

74provide transfer progress to users. Both the ``upload_file`` and 

75``download_file`` methods take an optional ``callback`` parameter. 

76Here's an example of how to print a simple progress percentage 

77to the user: 

78 

79.. code-block:: python 

80 

81 class ProgressPercentage(object): 

82 def __init__(self, filename): 

83 self._filename = filename 

84 self._size = float(os.path.getsize(filename)) 

85 self._seen_so_far = 0 

86 self._lock = threading.Lock() 

87 

88 def __call__(self, bytes_amount): 

89 # To simplify we'll assume this is hooked up 

90 # to a single filename. 

91 with self._lock: 

92 self._seen_so_far += bytes_amount 

93 percentage = (self._seen_so_far / self._size) * 100 

94 sys.stdout.write( 

95 "\r%s %s / %s (%.2f%%)" % ( 

96 self._filename, self._seen_so_far, self._size, 

97 percentage)) 

98 sys.stdout.flush() 

99 

100 

101 transfer = S3Transfer(boto3.client('s3', 'us-west-2')) 

102 # Upload /tmp/myfile to s3://bucket/key and print upload progress. 

103 transfer.upload_file('/tmp/myfile', 'bucket', 'key', 

104 callback=ProgressPercentage('/tmp/myfile')) 

105 

106 

107 

108You can also provide a TransferConfig object to the S3Transfer 

109object that gives you more fine grained control over the 

110transfer. For example: 

111 

112.. code-block:: python 

113 

114 client = boto3.client('s3', 'us-west-2') 

115 config = TransferConfig( 

116 multipart_threshold=8 * 1024 * 1024, 

117 max_concurrency=10, 

118 num_download_attempts=10, 

119 ) 

120 transfer = S3Transfer(client, config) 

121 transfer.upload_file('/tmp/foo', 'bucket', 'key') 

122 

123 

124""" 

125import logging 

126import threading 

127from os import PathLike, fspath, getpid 

128 

129from botocore.compat import HAS_CRT 

130from botocore.exceptions import ClientError 

131from s3transfer.exceptions import ( 

132 RetriesExceededError as S3TransferRetriesExceededError, 

133) 

134from s3transfer.futures import NonThreadedExecutor 

135from s3transfer.manager import TransferConfig as S3TransferConfig 

136from s3transfer.manager import TransferManager 

137from s3transfer.subscribers import BaseSubscriber 

138from s3transfer.utils import OSUtils 

139 

140import boto3.s3.constants as constants 

141from boto3.exceptions import RetriesExceededError, S3UploadFailedError 

142 

143if HAS_CRT: 

144 import awscrt.s3 

145 

146 from boto3.crt import create_crt_transfer_manager 

147 

148KB = 1024 

149MB = KB * KB 

150 

151logger = logging.getLogger(__name__) 

152 

153 

154def create_transfer_manager(client, config, osutil=None): 

155 """Creates a transfer manager based on configuration 

156 

157 :type client: boto3.client 

158 :param client: The S3 client to use 

159 

160 :type config: boto3.s3.transfer.TransferConfig 

161 :param config: The transfer config to use 

162 

163 :type osutil: s3transfer.utils.OSUtils 

164 :param osutil: The os utility to use 

165 

166 :rtype: s3transfer.manager.TransferManager 

167 :returns: A transfer manager based on parameters provided 

168 """ 

169 if _should_use_crt(config): 

170 crt_transfer_manager = create_crt_transfer_manager(client, config) 

171 if crt_transfer_manager is not None: 

172 logger.debug( 

173 f"Using CRT client. pid: {getpid()}, thread: {threading.get_ident()}" 

174 ) 

175 return crt_transfer_manager 

176 

177 # If we don't resolve something above, fallback to the default. 

178 logger.debug( 

179 f"Using default client. pid: {getpid()}, thread: {threading.get_ident()}" 

180 ) 

181 return _create_default_transfer_manager(client, config, osutil) 

182 

183 

184def _should_use_crt(config): 

185 # This feature requires awscrt>=0.19.17 

186 if HAS_CRT and has_minimum_crt_version((0, 19, 17)): 

187 is_optimized_instance = awscrt.s3.is_optimized_for_system() 

188 else: 

189 is_optimized_instance = False 

190 pref_transfer_client = config.preferred_transfer_client.lower() 

191 

192 if ( 

193 is_optimized_instance 

194 and pref_transfer_client == constants.AUTO_RESOLVE_TRANSFER_CLIENT 

195 ): 

196 logger.debug( 

197 "Attempting to use CRTTransferManager. Config settings may be ignored." 

198 ) 

199 return True 

200 

201 logger.debug( 

202 "Opting out of CRT Transfer Manager. Preferred client: " 

203 f"{pref_transfer_client}, CRT available: {HAS_CRT}, " 

204 f"Instance Optimized: {is_optimized_instance}." 

205 ) 

206 return False 

207 

208 

209def has_minimum_crt_version(minimum_version): 

210 """Not intended for use outside boto3.""" 

211 if not HAS_CRT: 

212 return False 

213 

214 crt_version_str = awscrt.__version__ 

215 try: 

216 crt_version_ints = map(int, crt_version_str.split(".")) 

217 crt_version_tuple = tuple(crt_version_ints) 

218 except (TypeError, ValueError): 

219 return False 

220 

221 return crt_version_tuple >= minimum_version 

222 

223 

224def _create_default_transfer_manager(client, config, osutil): 

225 """Create the default TransferManager implementation for s3transfer.""" 

226 executor_cls = None 

227 if not config.use_threads: 

228 executor_cls = NonThreadedExecutor 

229 return TransferManager(client, config, osutil, executor_cls) 

230 

231 

232class TransferConfig(S3TransferConfig): 

233 ALIAS = { 

234 'max_concurrency': 'max_request_concurrency', 

235 'max_io_queue': 'max_io_queue_size', 

236 } 

237 

238 def __init__( 

239 self, 

240 multipart_threshold=8 * MB, 

241 max_concurrency=10, 

242 multipart_chunksize=8 * MB, 

243 num_download_attempts=5, 

244 max_io_queue=100, 

245 io_chunksize=256 * KB, 

246 use_threads=True, 

247 max_bandwidth=None, 

248 preferred_transfer_client=constants.AUTO_RESOLVE_TRANSFER_CLIENT, 

249 ): 

250 """Configuration object for managed S3 transfers 

251 

252 :param multipart_threshold: The transfer size threshold for which 

253 multipart uploads, downloads, and copies will automatically be 

254 triggered. 

255 

256 :param max_concurrency: The maximum number of threads that will be 

257 making requests to perform a transfer. If ``use_threads`` is 

258 set to ``False``, the value provided is ignored as the transfer 

259 will only ever use the main thread. 

260 

261 :param multipart_chunksize: The partition size of each part for a 

262 multipart transfer. 

263 

264 :param num_download_attempts: The number of download attempts that 

265 will be retried upon errors with downloading an object in S3. 

266 Note that these retries account for errors that occur when 

267 streaming down the data from s3 (i.e. socket errors and read 

268 timeouts that occur after receiving an OK response from s3). 

269 Other retryable exceptions such as throttling errors and 5xx 

270 errors are already retried by botocore (this default is 5). This 

271 does not take into account the number of exceptions retried by 

272 botocore. 

273 

274 :param max_io_queue: The maximum amount of read parts that can be 

275 queued in memory to be written for a download. The size of each 

276 of these read parts is at most the size of ``io_chunksize``. 

277 

278 :param io_chunksize: The max size of each chunk in the io queue. 

279 Currently, this is size used when ``read`` is called on the 

280 downloaded stream as well. 

281 

282 :param use_threads: If True, threads will be used when performing 

283 S3 transfers. If False, no threads will be used in 

284 performing transfers; all logic will be run in the main thread. 

285 

286 :param max_bandwidth: The maximum bandwidth that will be consumed 

287 in uploading and downloading file content. The value is an integer 

288 in terms of bytes per second. 

289 

290 :param preferred_transfer_client: String specifying preferred transfer 

291 client for transfer operations. 

292 

293 Current supported settings are: 

294 * auto (default) - Use the CRTTransferManager when calls 

295 are made with supported environment and settings. 

296 * classic - Only use the origin S3TransferManager with 

297 requests. Disables possible CRT upgrade on requests. 

298 """ 

299 super().__init__( 

300 multipart_threshold=multipart_threshold, 

301 max_request_concurrency=max_concurrency, 

302 multipart_chunksize=multipart_chunksize, 

303 num_download_attempts=num_download_attempts, 

304 max_io_queue_size=max_io_queue, 

305 io_chunksize=io_chunksize, 

306 max_bandwidth=max_bandwidth, 

307 ) 

308 # Some of the argument names are not the same as the inherited 

309 # S3TransferConfig so we add aliases so you can still access the 

310 # old version of the names. 

311 for alias in self.ALIAS: 

312 setattr(self, alias, getattr(self, self.ALIAS[alias])) 

313 self.use_threads = use_threads 

314 self.preferred_transfer_client = preferred_transfer_client 

315 

316 def __setattr__(self, name, value): 

317 # If the alias name is used, make sure we set the name that it points 

318 # to as that is what actually is used in governing the TransferManager. 

319 if name in self.ALIAS: 

320 super().__setattr__(self.ALIAS[name], value) 

321 # Always set the value of the actual name provided. 

322 super().__setattr__(name, value) 

323 

324 

325class S3Transfer: 

326 ALLOWED_DOWNLOAD_ARGS = TransferManager.ALLOWED_DOWNLOAD_ARGS 

327 ALLOWED_UPLOAD_ARGS = TransferManager.ALLOWED_UPLOAD_ARGS 

328 

329 def __init__(self, client=None, config=None, osutil=None, manager=None): 

330 if not client and not manager: 

331 raise ValueError( 

332 'Either a boto3.Client or s3transfer.manager.TransferManager ' 

333 'must be provided' 

334 ) 

335 if manager and any([client, config, osutil]): 

336 raise ValueError( 

337 'Manager cannot be provided with client, config, ' 

338 'nor osutil. These parameters are mutually exclusive.' 

339 ) 

340 if config is None: 

341 config = TransferConfig() 

342 if osutil is None: 

343 osutil = OSUtils() 

344 if manager: 

345 self._manager = manager 

346 else: 

347 self._manager = create_transfer_manager(client, config, osutil) 

348 

349 def upload_file( 

350 self, filename, bucket, key, callback=None, extra_args=None 

351 ): 

352 """Upload a file to an S3 object. 

353 

354 Variants have also been injected into S3 client, Bucket and Object. 

355 You don't have to use S3Transfer.upload_file() directly. 

356 

357 .. seealso:: 

358 :py:meth:`S3.Client.upload_file` 

359 :py:meth:`S3.Client.upload_fileobj` 

360 """ 

361 if isinstance(filename, PathLike): 

362 filename = fspath(filename) 

363 if not isinstance(filename, str): 

364 raise ValueError('Filename must be a string or a path-like object') 

365 

366 subscribers = self._get_subscribers(callback) 

367 future = self._manager.upload( 

368 filename, bucket, key, extra_args, subscribers 

369 ) 

370 try: 

371 future.result() 

372 # If a client error was raised, add the backwards compatibility layer 

373 # that raises a S3UploadFailedError. These specific errors were only 

374 # ever thrown for upload_parts but now can be thrown for any related 

375 # client error. 

376 except ClientError as e: 

377 raise S3UploadFailedError( 

378 "Failed to upload {} to {}: {}".format( 

379 filename, '/'.join([bucket, key]), e 

380 ) 

381 ) 

382 

383 def download_file( 

384 self, bucket, key, filename, extra_args=None, callback=None 

385 ): 

386 """Download an S3 object to a file. 

387 

388 Variants have also been injected into S3 client, Bucket and Object. 

389 You don't have to use S3Transfer.download_file() directly. 

390 

391 .. seealso:: 

392 :py:meth:`S3.Client.download_file` 

393 :py:meth:`S3.Client.download_fileobj` 

394 """ 

395 if isinstance(filename, PathLike): 

396 filename = fspath(filename) 

397 if not isinstance(filename, str): 

398 raise ValueError('Filename must be a string or a path-like object') 

399 

400 subscribers = self._get_subscribers(callback) 

401 future = self._manager.download( 

402 bucket, key, filename, extra_args, subscribers 

403 ) 

404 try: 

405 future.result() 

406 # This is for backwards compatibility where when retries are 

407 # exceeded we need to throw the same error from boto3 instead of 

408 # s3transfer's built in RetriesExceededError as current users are 

409 # catching the boto3 one instead of the s3transfer exception to do 

410 # their own retries. 

411 except S3TransferRetriesExceededError as e: 

412 raise RetriesExceededError(e.last_exception) 

413 

414 def _get_subscribers(self, callback): 

415 if not callback: 

416 return None 

417 return [ProgressCallbackInvoker(callback)] 

418 

419 def __enter__(self): 

420 return self 

421 

422 def __exit__(self, *args): 

423 self._manager.__exit__(*args) 

424 

425 

426class ProgressCallbackInvoker(BaseSubscriber): 

427 """A back-compat wrapper to invoke a provided callback via a subscriber 

428 

429 :param callback: A callable that takes a single positional argument for 

430 how many bytes were transferred. 

431 """ 

432 

433 def __init__(self, callback): 

434 self._callback = callback 

435 

436 def on_progress(self, bytes_transferred, **kwargs): 

437 self._callback(bytes_transferred)