# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
13"""Abstractions over S3's upload/download operations. 

14 

15This module provides high level abstractions for efficient 

16uploads/downloads. It handles several things for the user: 

17 

18* Automatically switching to multipart transfers when 

19 a file is over a specific size threshold 

20* Uploading/downloading a file in parallel 

21* Throttling based on max bandwidth 

22* Progress callbacks to monitor transfers 

23* Retries. While botocore handles retries for streaming uploads, 

24 it is not possible for it to handle retries for streaming 

25 downloads. This module handles retries for both cases so 

26 you don't need to implement any retry logic yourself. 

27 

28This module has a reasonable set of defaults. It also allows you 

29to configure many aspects of the transfer process including: 

30 

31* Multipart threshold size 

32* Max parallel downloads 

33* Max bandwidth 

34* Socket timeouts 

35* Retry amounts 

36 

37There is no support for s3->s3 multipart copies at this 

38time. 

39 

40 

41.. _ref_s3transfer_usage: 

42 

43Usage 

44===== 

45 

46The simplest way to use this module is: 

47 

48.. code-block:: python 

49 

50 client = boto3.client('s3', 'us-west-2') 

51 transfer = S3Transfer(client) 

52 # Upload /tmp/myfile to s3://bucket/key 

53 transfer.upload_file('/tmp/myfile', 'bucket', 'key') 

54 

55 # Download s3://bucket/key to /tmp/myfile 

56 transfer.download_file('bucket', 'key', '/tmp/myfile') 

57 

The ``upload_file`` and ``download_file`` methods also accept an
``extra_args`` dictionary, whose entries are forwarded through to the
corresponding client operation. Here are a few examples using
``upload_file``::

    # Making the object public
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'ACL': 'public-read'})

    # Setting metadata
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'Metadata': {'a': 'b', 'c': 'd'}})

    # Setting content type
    transfer.upload_file('/tmp/myfile.json', 'bucket', 'key',
                         extra_args={'ContentType': "application/json"})
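
Note that the keys allowed in ``extra_args`` are validated: uploads are
checked against ``S3Transfer.ALLOWED_UPLOAD_ARGS`` and downloads against
``S3Transfer.ALLOWED_DOWNLOAD_ARGS``; an unknown key raises a ``ValueError``.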

The ``S3Transfer`` class also supports progress callbacks so you can
provide transfer progress to users. Both the ``upload_file`` and
``download_file`` methods take an optional ``callback`` parameter.
Here's an example of how to print a simple progress percentage
to the user:

.. code-block:: python

    class ProgressPercentage(object):
        def __init__(self, filename):
            self._filename = filename
            self._size = float(os.path.getsize(filename))
            self._seen_so_far = 0
            self._lock = threading.Lock()

        def __call__(self, bytes_amount):
            # To simplify we'll assume this is hooked up
            # to a single filename.
            with self._lock:
                self._seen_so_far += bytes_amount
                percentage = (self._seen_so_far / self._size) * 100
                sys.stdout.write(
                    "\r%s %s / %s (%.2f%%)" % (self._filename, self._seen_so_far,
                                               self._size, percentage))
                sys.stdout.flush()


    transfer = S3Transfer(boto3.client('s3', 'us-west-2'))
    # Upload /tmp/myfile to s3://bucket/key and print upload progress.
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         callback=ProgressPercentage('/tmp/myfile'))


You can also provide a TransferConfig object to the S3Transfer
object that gives you more fine-grained control over the
transfer. For example:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    config = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,
        max_concurrency=10,
        num_download_attempts=10,
    )
    transfer = S3Transfer(client, config)
    transfer.upload_file('/tmp/foo', 'bucket', 'key')
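
Downloads can be customized the same way. As an illustrative sketch (the
bucket, key, filename, and version id below are placeholders), an
``extra_args`` dictionary and a ``callback`` may also be passed to
``download_file``:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    transfer = S3Transfer(client)
    # Download a specific version of s3://bucket/key to /tmp/myfile
    transfer.download_file('bucket', 'key', '/tmp/myfile',
                           extra_args={'VersionId': 'my-version-id'})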

"""
import concurrent.futures
import functools
import logging
import math
import os
import queue
import random
import socket
import string
import threading

from botocore.compat import six  # noqa: F401
from botocore.exceptions import IncompleteReadError
from botocore.vendored.requests.packages.urllib3.exceptions import (
    ReadTimeoutError,
)

import s3transfer.compat
from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError

__author__ = 'Amazon Web Services'
__version__ = '0.8.2'


class NullHandler(logging.Handler):
    def emit(self, record):
        pass


logger = logging.getLogger(__name__)
logger.addHandler(NullHandler())

MB = 1024 * 1024
SHUTDOWN_SENTINEL = object()


def random_file_extension(num_digits=8):
    return ''.join(random.choice(string.hexdigits) for _ in range(num_digits))


def disable_upload_callbacks(request, operation_name, **kwargs):
    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
        request.body, 'disable_callback'
    ):
        request.body.disable_callback()


def enable_upload_callbacks(request, operation_name, **kwargs):
    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
        request.body, 'enable_callback'
    ):
        request.body.enable_callback()


class QueueShutdownError(Exception):
    pass


class ReadFileChunk:
    def __init__(
        self,
        fileobj,
        start_byte,
        chunk_size,
        full_file_size,
        callback=None,
        enable_callback=True,
    ):
        """

        Given a file object shown below:

          |___________________________________________________|
          0          |                 |    full_file_size
                     |----chunk_size---|
                  start_byte
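
        For example (illustrative values, not from the original source), with
        ``start_byte=1024``, ``chunk_size=4096`` and ``full_file_size=10240``,
        reads through this object are confined to bytes 1024-5119 of
        ``fileobj``.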

        :type fileobj: file
        :param fileobj: File-like object

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        """
        self._fileobj = fileobj
        self._start_byte = start_byte
        self._size = self._calculate_file_size(
            self._fileobj,
            requested_size=chunk_size,
            start_byte=start_byte,
            actual_file_size=full_file_size,
        )
        self._fileobj.seek(self._start_byte)
        self._amount_read = 0
        self._callback = callback
        self._callback_enabled = enable_callback

    @classmethod
    def from_filename(
        cls,
        filename,
        start_byte,
        chunk_size,
        callback=None,
        enable_callback=True,
    ):
        """Convenience factory function to create from a filename.

        :type filename: str
        :param filename: The name of the file to read from.

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        :type enable_callback: bool
        :param enable_callback: Indicate whether to invoke callback
            during read() calls.

        :rtype: ``ReadFileChunk``
        :return: A new instance of ``ReadFileChunk``

        """
        f = open(filename, 'rb')
        file_size = os.fstat(f.fileno()).st_size
        return cls(
            f, start_byte, chunk_size, file_size, callback, enable_callback
        )

    def _calculate_file_size(
        self, fileobj, requested_size, start_byte, actual_file_size
    ):
        max_chunk_size = actual_file_size - start_byte
        return min(max_chunk_size, requested_size)

    def read(self, amount=None):
        if amount is None:
            amount_to_read = self._size - self._amount_read
        else:
            amount_to_read = min(self._size - self._amount_read, amount)
        data = self._fileobj.read(amount_to_read)
        self._amount_read += len(data)
        if self._callback is not None and self._callback_enabled:
            self._callback(len(data))
        return data

    def enable_callback(self):
        self._callback_enabled = True

    def disable_callback(self):
        self._callback_enabled = False

    def seek(self, where):
        self._fileobj.seek(self._start_byte + where)
        if self._callback is not None and self._callback_enabled:
            # To also rewind the callback() for an accurate progress report
            self._callback(where - self._amount_read)
        self._amount_read = where

    def close(self):
        self._fileobj.close()

    def tell(self):
        return self._amount_read

    def __len__(self):
        # __len__ is defined because requests will try to determine the length
        # of the stream to set a content length. In the normal case
        # of the file it will just stat the file, but we need to change that
        # behavior. By providing a __len__, requests will use that instead
        # of stat'ing the file.
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def __iter__(self):
        # This is a workaround for http://bugs.python.org/issue17575
        # Basically httplib will try to iterate over the contents, even
        # if it's a file-like object. This wasn't noticed because we've
        # already exhausted the stream so iterating over the file immediately
        # stops, which is what we're simulating here.
        return iter([])


class StreamReaderProgress:
    """Wrapper for a read only stream that adds progress callbacks."""

    def __init__(self, stream, callback=None):
        self._stream = stream
        self._callback = callback

    def read(self, *args, **kwargs):
        value = self._stream.read(*args, **kwargs)
        if self._callback is not None:
            self._callback(len(value))
        return value


class OSUtils:
    def get_file_size(self, filename):
        return os.path.getsize(filename)

    def open_file_chunk_reader(self, filename, start_byte, size, callback):
        return ReadFileChunk.from_filename(
            filename, start_byte, size, callback, enable_callback=False
        )

    def open(self, filename, mode):
        return open(filename, mode)

    def remove_file(self, filename):
        """Remove a file, noop if file does not exist."""
        # Unlike os.remove, if the file does not exist,
        # then this method does nothing.
        try:
            os.remove(filename)
        except OSError:
            pass

    def rename_file(self, current_filename, new_filename):
        s3transfer.compat.rename_file(current_filename, new_filename)


class MultipartUploader:
    # These are the extra_args that need to be forwarded onto
    # subsequent upload_parts.
    UPLOAD_PART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls

    def _extra_upload_part_args(self, extra_args):
        # Only the args in UPLOAD_PART_ARGS actually need to be passed
        # onto the upload_part calls.
        upload_parts_args = {}
        for key, value in extra_args.items():
            if key in self.UPLOAD_PART_ARGS:
                upload_parts_args[key] = value
        return upload_parts_args

    def upload_file(self, filename, bucket, key, callback, extra_args):
        response = self._client.create_multipart_upload(
            Bucket=bucket, Key=key, **extra_args
        )
        upload_id = response['UploadId']
        try:
            parts = self._upload_parts(
                upload_id, filename, bucket, key, callback, extra_args
            )
        except Exception as e:
            logger.debug(
                "Exception raised while uploading parts, "
                "aborting multipart upload.",
                exc_info=True,
            )
            self._client.abort_multipart_upload(
                Bucket=bucket, Key=key, UploadId=upload_id
            )
            raise S3UploadFailedError(
                "Failed to upload {} to {}: {}".format(
                    filename, '/'.join([bucket, key]), e
                )
            )
        self._client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts},
        )

    def _upload_parts(
        self, upload_id, filename, bucket, key, callback, extra_args
    ):
        upload_parts_extra_args = self._extra_upload_part_args(extra_args)
        parts = []
        part_size = self._config.multipart_chunksize
        num_parts = int(
            math.ceil(self._os.get_file_size(filename) / float(part_size))
        )
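        # Illustrative example (not part of the original source): a 100 MB
        # file with the default 8 MB chunksize yields ceil(100 / 8) = 13 parts.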

        max_workers = self._config.max_concurrency
        with self._executor_cls(max_workers=max_workers) as executor:
            upload_partial = functools.partial(
                self._upload_one_part,
                filename,
                bucket,
                key,
                upload_id,
                part_size,
                upload_parts_extra_args,
                callback,
            )
            for part in executor.map(upload_partial, range(1, num_parts + 1)):
                parts.append(part)
        return parts

    def _upload_one_part(
        self,
        filename,
        bucket,
        key,
        upload_id,
        part_size,
        extra_args,
        callback,
        part_number,
    ):
        open_chunk_reader = self._os.open_file_chunk_reader
        with open_chunk_reader(
            filename, part_size * (part_number - 1), part_size, callback
        ) as body:
            response = self._client.upload_part(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=body,
                **extra_args,
            )
            etag = response['ETag']
            return {'ETag': etag, 'PartNumber': part_number}


class ShutdownQueue(queue.Queue):
    """A queue implementation that can be shutdown.

    This class adds a trigger_shutdown method; once it has been
    called, all subsequent calls to put() will fail with a
    ``QueueShutdownError``.

    It purposefully deviates from queue.Queue, and is *not* meant
    to be a drop in replacement for ``queue.Queue``.

    """

    def _init(self, maxsize):
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        # queue.Queue is an old style class so we don't use super().
        return queue.Queue._init(self, maxsize)

    def trigger_shutdown(self):
        with self._shutdown_lock:
            self._shutdown = True
            logger.debug("The IO queue is now shutdown.")

    def put(self, item):
        # Note: this is not sufficient, it's still possible to deadlock!
        # Need to hook into the condition vars used by this class.
        with self._shutdown_lock:
            if self._shutdown:
                raise QueueShutdownError(
                    "Cannot put item to queue when queue has been shutdown."
                )
        return queue.Queue.put(self, item)


class MultipartDownloader:
    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls
        self._ioqueue = ShutdownQueue(self._config.max_io_queue)

    def download_file(
        self, bucket, key, filename, object_size, extra_args, callback=None
    ):
        with self._executor_cls(max_workers=2) as controller:
            # 1 thread for the future that manages downloading the parts
            # 1 thread for the future that manages IO writes.
            download_parts_handler = functools.partial(
                self._download_file_as_future,
                bucket,
                key,
                filename,
                object_size,
                callback,
            )
            parts_future = controller.submit(download_parts_handler)

            io_writes_handler = functools.partial(
                self._perform_io_writes, filename
            )
            io_future = controller.submit(io_writes_handler)
            results = concurrent.futures.wait(
                [parts_future, io_future],
                return_when=concurrent.futures.FIRST_EXCEPTION,
            )
            self._process_future_results(results)

    def _process_future_results(self, futures):
        finished, unfinished = futures
        for future in finished:
            future.result()

    def _download_file_as_future(
        self, bucket, key, filename, object_size, callback
    ):
        part_size = self._config.multipart_chunksize
        num_parts = int(math.ceil(object_size / float(part_size)))
        max_workers = self._config.max_concurrency
        download_partial = functools.partial(
            self._download_range,
            bucket,
            key,
            filename,
            part_size,
            num_parts,
            callback,
        )
        try:
            with self._executor_cls(max_workers=max_workers) as executor:
                list(executor.map(download_partial, range(num_parts)))
        finally:
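            # Always hand the IO thread its shutdown sentinel, even if a
            # download worker raised, so that _perform_io_writes can exit.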

            self._ioqueue.put(SHUTDOWN_SENTINEL)

    def _calculate_range_param(self, part_size, part_index, num_parts):
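        # Illustrative example (not part of the original source): with
        # part_size=8 * MB and num_parts=3, part_index 0 yields
        # 'bytes=0-8388607' and the final part yields 'bytes=16777216-'.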

        start_range = part_index * part_size
        if part_index == num_parts - 1:
            end_range = ''
        else:
            end_range = start_range + part_size - 1
        range_param = f'bytes={start_range}-{end_range}'
        return range_param

    def _download_range(
        self, bucket, key, filename, part_size, num_parts, callback, part_index
    ):
        try:
            range_param = self._calculate_range_param(
                part_size, part_index, num_parts
            )

            max_attempts = self._config.num_download_attempts
            last_exception = None
            for i in range(max_attempts):
                try:
                    logger.debug("Making get_object call.")
                    response = self._client.get_object(
                        Bucket=bucket, Key=key, Range=range_param
                    )
                    streaming_body = StreamReaderProgress(
                        response['Body'], callback
                    )
                    buffer_size = 1024 * 16
                    current_index = part_size * part_index
                    for chunk in iter(
                        lambda: streaming_body.read(buffer_size), b''
                    ):
                        self._ioqueue.put((current_index, chunk))
                        current_index += len(chunk)
                    return
                except (
                    socket.timeout,
                    OSError,
                    ReadTimeoutError,
                    IncompleteReadError,
                ) as e:
                    logger.debug(
                        "Retrying exception caught (%s), "
                        "retrying request, (attempt %s / %s)",
                        e,
                        i,
                        max_attempts,
                        exc_info=True,
                    )
                    last_exception = e
                    continue
            raise RetriesExceededError(last_exception)
        finally:
            logger.debug("EXITING _download_range for part: %s", part_index)

    def _perform_io_writes(self, filename):
        with self._os.open(filename, 'wb') as f:
            while True:
                task = self._ioqueue.get()
                if task is SHUTDOWN_SENTINEL:
                    logger.debug(
                        "Shutdown sentinel received in IO handler, "
                        "shutting down IO handler."
                    )
                    return
                else:
                    try:
                        offset, data = task
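                        # Parts are fetched concurrently, so chunks can arrive
                        # out of order; seek to each chunk's offset before
                        # writing it.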

                        f.seek(offset)
                        f.write(data)
                    except Exception as e:
                        logger.debug(
                            "Caught exception in IO thread: %s",
                            e,
                            exc_info=True,
                        )
                        self._ioqueue.trigger_shutdown()
                        raise


class TransferConfig:
    def __init__(
        self,
        multipart_threshold=8 * MB,
        max_concurrency=10,
        multipart_chunksize=8 * MB,
        num_download_attempts=5,
        max_io_queue=100,
    ):
        self.multipart_threshold = multipart_threshold
        self.max_concurrency = max_concurrency
        self.multipart_chunksize = multipart_chunksize
        self.num_download_attempts = num_download_attempts
        self.max_io_queue = max_io_queue


class S3Transfer:

    ALLOWED_DOWNLOAD_ARGS = [
        'VersionId',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    ALLOWED_UPLOAD_ARGS = [
        'ACL',
        'CacheControl',
        'ContentDisposition',
        'ContentEncoding',
        'ContentLanguage',
        'ContentType',
        'Expires',
        'GrantFullControl',
        'GrantRead',
        'GrantReadACP',
        'GrantWriteACL',
        'Metadata',
        'RequestPayer',
        'ServerSideEncryption',
        'StorageClass',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'SSEKMSKeyId',
        'SSEKMSEncryptionContext',
        'Tagging',
    ]

    def __init__(self, client, config=None, osutil=None):
        self._client = client
        if config is None:
            config = TransferConfig()
        self._config = config
        if osutil is None:
            osutil = OSUtils()
        self._osutil = osutil

    def upload_file(
        self, filename, bucket, key, callback=None, extra_args=None
    ):
        """Upload a file to an S3 object.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.upload_file() directly.
        """
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
        events = self._client.meta.events
        events.register_first(
            'request-created.s3',
            disable_upload_callbacks,
            unique_id='s3upload-callback-disable',
        )
        events.register_last(
            'request-created.s3',
            enable_upload_callbacks,
            unique_id='s3upload-callback-enable',
        )
        if (
            self._osutil.get_file_size(filename)
            >= self._config.multipart_threshold
        ):
            self._multipart_upload(filename, bucket, key, callback, extra_args)
        else:
            self._put_object(filename, bucket, key, callback, extra_args)

    def _put_object(self, filename, bucket, key, callback, extra_args):
        # We're using open_file_chunk_reader so we can take advantage of the
        # progress callback functionality.
        open_chunk_reader = self._osutil.open_file_chunk_reader
        with open_chunk_reader(
            filename,
            0,
            self._osutil.get_file_size(filename),
            callback=callback,
        ) as body:
            self._client.put_object(
                Bucket=bucket, Key=key, Body=body, **extra_args
            )

    def download_file(
        self, bucket, key, filename, extra_args=None, callback=None
    ):
        """Download an S3 object to a file.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.download_file() directly.
        """
        # This method will issue a ``head_object`` request to determine
        # the size of the S3 object. This is used to determine if the
        # object is downloaded in parallel.
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
        object_size = self._object_size(bucket, key, extra_args)
        temp_filename = filename + os.extsep + random_file_extension()
        try:
            self._download_file(
                bucket, key, temp_filename, object_size, extra_args, callback
            )
        except Exception:
            logger.debug(
                "Exception caught in download_file, removing partial "
                "file: %s",
                temp_filename,
                exc_info=True,
            )
            self._osutil.remove_file(temp_filename)
            raise
        else:
            self._osutil.rename_file(temp_filename, filename)

    def _download_file(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        if object_size >= self._config.multipart_threshold:
            self._ranged_download(
                bucket, key, filename, object_size, extra_args, callback
            )
        else:
            self._get_object(bucket, key, filename, extra_args, callback)

    def _validate_all_known_args(self, actual, allowed):
        for kwarg in actual:
            if kwarg not in allowed:
                raise ValueError(
                    "Invalid extra_args key '%s', "
                    "must be one of: %s" % (kwarg, ', '.join(allowed))
                )

    def _ranged_download(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        downloader = MultipartDownloader(
            self._client, self._config, self._osutil
        )
        downloader.download_file(
            bucket, key, filename, object_size, extra_args, callback
        )

    def _get_object(self, bucket, key, filename, extra_args, callback):
        # precondition: num_download_attempts > 0
        max_attempts = self._config.num_download_attempts
        last_exception = None
        for i in range(max_attempts):
            try:
                return self._do_get_object(
                    bucket, key, filename, extra_args, callback
                )
            except (
                socket.timeout,
                OSError,
                ReadTimeoutError,
                IncompleteReadError,
            ) as e:
                # TODO: we need a way to reset the callback if the
                # download failed.
                logger.debug(
                    "Retrying exception caught (%s), "
                    "retrying request, (attempt %s / %s)",
                    e,
                    i,
                    max_attempts,
                    exc_info=True,
                )
                last_exception = e
                continue
        raise RetriesExceededError(last_exception)

    def _do_get_object(self, bucket, key, filename, extra_args, callback):
        response = self._client.get_object(
            Bucket=bucket, Key=key, **extra_args
        )
        streaming_body = StreamReaderProgress(response['Body'], callback)
        with self._osutil.open(filename, 'wb') as f:
            for chunk in iter(lambda: streaming_body.read(8192), b''):
                f.write(chunk)

    def _object_size(self, bucket, key, extra_args):
        return self._client.head_object(Bucket=bucket, Key=key, **extra_args)[
            'ContentLength'
        ]

    def _multipart_upload(self, filename, bucket, key, callback, extra_args):
        uploader = MultipartUploader(self._client, self._config, self._osutil)
        uploader.upload_file(filename, bucket, key, callback, extra_args)