Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/s3transfer/__init__.py: 28%


# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

13"""Abstractions over S3's upload/download operations. 

14 

15This module provides high level abstractions for efficient 

16uploads/downloads. It handles several things for the user: 

17 

18* Automatically switching to multipart transfers when 

19 a file is over a specific size threshold 

20* Uploading/downloading a file in parallel 

21* Throttling based on max bandwidth 

22* Progress callbacks to monitor transfers 

23* Retries. While botocore handles retries for streaming uploads, 

24 it is not possible for it to handle retries for streaming 

25 downloads. This module handles retries for both cases so 

26 you don't need to implement any retry logic yourself. 

27 

28This module has a reasonable set of defaults. It also allows you 

29to configure many aspects of the transfer process including: 

30 

31* Multipart threshold size 

32* Max parallel downloads 

33* Max bandwidth 

34* Socket timeouts 

35* Retry amounts 

36 

37There is no support for s3->s3 multipart copies at this 

38time. 

39 

40 

41.. _ref_s3transfer_usage: 

42 

43Usage 

44===== 

45 

46The simplest way to use this module is: 

47 

48.. code-block:: python 

49 

50 client = boto3.client('s3', 'us-west-2') 

51 transfer = S3Transfer(client) 

52 # Upload /tmp/myfile to s3://bucket/key 

53 transfer.upload_file('/tmp/myfile', 'bucket', 'key') 

54 

55 # Download s3://bucket/key to /tmp/myfile 

56 transfer.download_file('bucket', 'key', '/tmp/myfile') 

57 

58The ``upload_file`` and ``download_file`` methods also accept 

59``**kwargs``, which will be forwarded through to the corresponding 

60client operation. Here are a few examples using ``upload_file``:: 

61 

62 # Making the object public 

63 transfer.upload_file('/tmp/myfile', 'bucket', 'key', 

64 extra_args={'ACL': 'public-read'}) 

65 

66 # Setting metadata 

67 transfer.upload_file('/tmp/myfile', 'bucket', 'key', 

68 extra_args={'Metadata': {'a': 'b', 'c': 'd'}}) 

69 

70 # Setting content type 

71 transfer.upload_file('/tmp/myfile.json', 'bucket', 'key', 

72 extra_args={'ContentType': "application/json"}) 

73 

74 
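
``download_file`` accepts ``extra_args`` the same way, although only a small
set of arguments is allowed for downloads (see
``S3Transfer.ALLOWED_DOWNLOAD_ARGS``). A sketch using ``VersionId``, with a
placeholder version id::

    # Download a specific version of the object
    transfer.download_file('bucket', 'key', '/tmp/myfile',
                           extra_args={'VersionId': 'my-version-id'})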

The ``S3Transfer`` class also supports progress callbacks so you can
provide transfer progress to users. Both the ``upload_file`` and
``download_file`` methods take an optional ``callback`` parameter.
Here's an example of how to print a simple progress percentage
to the user:

.. code-block:: python

    class ProgressPercentage(object):
        def __init__(self, filename):
            self._filename = filename
            self._size = float(os.path.getsize(filename))
            self._seen_so_far = 0
            self._lock = threading.Lock()

        def __call__(self, bytes_amount):
            # To simplify we'll assume this is hooked up
            # to a single filename.
            with self._lock:
                self._seen_so_far += bytes_amount
                percentage = (self._seen_so_far / self._size) * 100
                sys.stdout.write(
                    "\r%s %s / %s (%.2f%%)" % (
                        self._filename, self._seen_so_far,
                        self._size, percentage))
                sys.stdout.flush()


    transfer = S3Transfer(boto3.client('s3', 'us-west-2'))
    # Upload /tmp/myfile to s3://bucket/key and print upload progress.
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         callback=ProgressPercentage('/tmp/myfile'))
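
``download_file`` takes the same ``callback`` parameter; the callback is
invoked with the number of bytes read since the previous call. A minimal
sketch (the ``DownloadProgress`` helper below is illustrative only, not part
of this module):

.. code-block:: python

    class DownloadProgress(object):
        def __init__(self):
            self._seen_so_far = 0
            self._lock = threading.Lock()

        def __call__(self, bytes_amount):
            # Sum the incremental byte counts passed to the callback.
            with self._lock:
                self._seen_so_far += bytes_amount
                sys.stdout.write("\r%s bytes transferred" % self._seen_so_far)
                sys.stdout.flush()


    transfer.download_file('bucket', 'key', '/tmp/myfile',
                           callback=DownloadProgress())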

You can also provide a ``TransferConfig`` object to the ``S3Transfer``
object that gives you more fine-grained control over the
transfer. For example:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    config = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,
        max_concurrency=10,
        num_download_attempts=10,
    )
    transfer = S3Transfer(client, config)
    transfer.upload_file('/tmp/foo', 'bucket', 'key')

"""

import concurrent.futures
import functools
import logging
import math
import os
import queue
import random
import socket
import string
import threading

from botocore.compat import six  # noqa: F401
from botocore.exceptions import IncompleteReadError, ResponseStreamingError
from botocore.vendored.requests.packages.urllib3.exceptions import (
    ReadTimeoutError,
)

import s3transfer.compat
from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError

__author__ = 'Amazon Web Services'
__version__ = '0.13.0'


class NullHandler(logging.Handler):
    def emit(self, record):
        pass


logger = logging.getLogger(__name__)
logger.addHandler(NullHandler())

MB = 1024 * 1024
SHUTDOWN_SENTINEL = object()


def random_file_extension(num_digits=8):
    return ''.join(random.choice(string.hexdigits) for _ in range(num_digits))

def disable_upload_callbacks(request, operation_name, **kwargs):
    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
        request.body, 'disable_callback'
    ):
        request.body.disable_callback()


def enable_upload_callbacks(request, operation_name, **kwargs):
    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
        request.body, 'enable_callback'
    ):
        request.body.enable_callback()


class QueueShutdownError(Exception):
    pass

class ReadFileChunk:
    def __init__(
        self,
        fileobj,
        start_byte,
        chunk_size,
        full_file_size,
        callback=None,
        enable_callback=True,
    ):
        """

        Given a file object shown below:

            |___________________________________________________|
            0          |                 |             full_file_size
                       |----chunk_size---|
                  start_byte

        :type fileobj: file
        :param fileobj: File like object

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        """
        self._fileobj = fileobj
        self._start_byte = start_byte
        self._size = self._calculate_file_size(
            self._fileobj,
            requested_size=chunk_size,
            start_byte=start_byte,
            actual_file_size=full_file_size,
        )
        self._fileobj.seek(self._start_byte)
        self._amount_read = 0
        self._callback = callback
        self._callback_enabled = enable_callback

    @classmethod
    def from_filename(
        cls,
        filename,
        start_byte,
        chunk_size,
        callback=None,
        enable_callback=True,
    ):
        """Convenience factory function to create from a filename.

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        :type enable_callback: bool
        :param enable_callback: Indicate whether to invoke callback
            during read() calls.

        :rtype: ``ReadFileChunk``
        :return: A new instance of ``ReadFileChunk``

        """
        f = open(filename, 'rb')
        file_size = os.fstat(f.fileno()).st_size
        return cls(
            f, start_byte, chunk_size, file_size, callback, enable_callback
        )

    def _calculate_file_size(
        self, fileobj, requested_size, start_byte, actual_file_size
    ):
        max_chunk_size = actual_file_size - start_byte
        return min(max_chunk_size, requested_size)

    def read(self, amount=None):
        if amount is None:
            amount_to_read = self._size - self._amount_read
        else:
            amount_to_read = min(self._size - self._amount_read, amount)
        data = self._fileobj.read(amount_to_read)
        self._amount_read += len(data)
        if self._callback is not None and self._callback_enabled:
            self._callback(len(data))
        return data

    def enable_callback(self):
        self._callback_enabled = True

    def disable_callback(self):
        self._callback_enabled = False

    def seek(self, where):
        self._fileobj.seek(self._start_byte + where)
        if self._callback is not None and self._callback_enabled:
            # To also rewind the callback() for an accurate progress report
            self._callback(where - self._amount_read)
        self._amount_read = where

    def close(self):
        self._fileobj.close()

    def tell(self):
        return self._amount_read

    def __len__(self):
        # __len__ is defined because requests will try to determine the length
        # of the stream to set a content length. In the normal case
        # of the file it will just stat the file, but we need to change that
        # behavior. By providing a __len__, requests will use that instead
        # of stat'ing the file.
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def __iter__(self):
        # This is a workaround for http://bugs.python.org/issue17575
        # Basically httplib will try to iterate over the contents, even
        # if it's a file like object. This wasn't noticed because we've
        # already exhausted the stream so iterating over the file immediately
        # stops, which is what we're simulating here.
        return iter([])

class StreamReaderProgress:
    """Wrapper for a read only stream that adds progress callbacks."""

    def __init__(self, stream, callback=None):
        self._stream = stream
        self._callback = callback

    def read(self, *args, **kwargs):
        value = self._stream.read(*args, **kwargs)
        if self._callback is not None:
            self._callback(len(value))
        return value

class OSUtils:
    def get_file_size(self, filename):
        return os.path.getsize(filename)

    def open_file_chunk_reader(self, filename, start_byte, size, callback):
        return ReadFileChunk.from_filename(
            filename, start_byte, size, callback, enable_callback=False
        )

    def open(self, filename, mode):
        return open(filename, mode)

    def remove_file(self, filename):
        """Remove a file, noop if file does not exist."""
        # Unlike os.remove, if the file does not exist,
        # then this method does nothing.
        try:
            os.remove(filename)
        except OSError:
            pass

    def rename_file(self, current_filename, new_filename):
        s3transfer.compat.rename_file(current_filename, new_filename)

class MultipartUploader:
    # These are the extra_args that need to be forwarded onto
    # subsequent upload_parts.
    UPLOAD_PART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls

    def _extra_upload_part_args(self, extra_args):
        # Only the args in UPLOAD_PART_ARGS actually need to be passed
        # onto the upload_part calls.
        upload_parts_args = {}
        for key, value in extra_args.items():
            if key in self.UPLOAD_PART_ARGS:
                upload_parts_args[key] = value
        return upload_parts_args

    def upload_file(self, filename, bucket, key, callback, extra_args):
        response = self._client.create_multipart_upload(
            Bucket=bucket, Key=key, **extra_args
        )
        upload_id = response['UploadId']
        try:
            parts = self._upload_parts(
                upload_id, filename, bucket, key, callback, extra_args
            )
        except Exception as e:
            logger.debug(
                "Exception raised while uploading parts, "
                "aborting multipart upload.",
                exc_info=True,
            )
            self._client.abort_multipart_upload(
                Bucket=bucket, Key=key, UploadId=upload_id
            )
            raise S3UploadFailedError(
                "Failed to upload {} to {}: {}".format(
                    filename, '/'.join([bucket, key]), e
                )
            )
        self._client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts},
        )

    def _upload_parts(
        self, upload_id, filename, bucket, key, callback, extra_args
    ):
        upload_parts_extra_args = self._extra_upload_part_args(extra_args)
        parts = []
        part_size = self._config.multipart_chunksize
        num_parts = int(
            math.ceil(self._os.get_file_size(filename) / float(part_size))
        )
        max_workers = self._config.max_concurrency
        with self._executor_cls(max_workers=max_workers) as executor:
            upload_partial = functools.partial(
                self._upload_one_part,
                filename,
                bucket,
                key,
                upload_id,
                part_size,
                upload_parts_extra_args,
                callback,
            )
            for part in executor.map(upload_partial, range(1, num_parts + 1)):
                parts.append(part)
        return parts

    def _upload_one_part(
        self,
        filename,
        bucket,
        key,
        upload_id,
        part_size,
        extra_args,
        callback,
        part_number,
    ):
        open_chunk_reader = self._os.open_file_chunk_reader
        with open_chunk_reader(
            filename, part_size * (part_number - 1), part_size, callback
        ) as body:
            response = self._client.upload_part(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=body,
                **extra_args,
            )
            etag = response['ETag']
            return {'ETag': etag, 'PartNumber': part_number}

class ShutdownQueue(queue.Queue):
    """A queue implementation that can be shutdown.

    Shutting down a queue means that this class adds a
    trigger_shutdown method that will trigger all subsequent
    calls to put() to fail with a ``QueueShutdownError``.

    It purposefully deviates from queue.Queue, and is *not* meant
    to be a drop in replacement for ``queue.Queue``.

    """

    def _init(self, maxsize):
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        # queue.Queue is an old style class so we don't use super().
        return queue.Queue._init(self, maxsize)

    def trigger_shutdown(self):
        with self._shutdown_lock:
            self._shutdown = True
            logger.debug("The IO queue is now shutdown.")

    def put(self, item):
        # Note: this is not sufficient, it's still possible to deadlock!
        # Need to hook into the condition vars used by this class.
        with self._shutdown_lock:
            if self._shutdown:
                raise QueueShutdownError(
                    "Cannot put item to queue when queue has been shutdown."
                )
        return queue.Queue.put(self, item)

class MultipartDownloader:
    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls
        self._ioqueue = ShutdownQueue(self._config.max_io_queue)

    def download_file(
        self, bucket, key, filename, object_size, extra_args, callback=None
    ):
        with self._executor_cls(max_workers=2) as controller:
            # 1 thread for the future that manages downloading the parts
            # 1 thread for the future that manages IO writes.
            download_parts_handler = functools.partial(
                self._download_file_as_future,
                bucket,
                key,
                filename,
                object_size,
                callback,
            )
            parts_future = controller.submit(download_parts_handler)

            io_writes_handler = functools.partial(
                self._perform_io_writes, filename
            )
            io_future = controller.submit(io_writes_handler)
            results = concurrent.futures.wait(
                [parts_future, io_future],
                return_when=concurrent.futures.FIRST_EXCEPTION,
            )
            self._process_future_results(results)

    def _process_future_results(self, futures):
        finished, unfinished = futures
        for future in finished:
            future.result()

    def _download_file_as_future(
        self, bucket, key, filename, object_size, callback
    ):
        part_size = self._config.multipart_chunksize
        num_parts = int(math.ceil(object_size / float(part_size)))
        max_workers = self._config.max_concurrency
        download_partial = functools.partial(
            self._download_range,
            bucket,
            key,
            filename,
            part_size,
            num_parts,
            callback,
        )
        try:
            with self._executor_cls(max_workers=max_workers) as executor:
                list(executor.map(download_partial, range(num_parts)))
        finally:
            self._ioqueue.put(SHUTDOWN_SENTINEL)

    def _calculate_range_param(self, part_size, part_index, num_parts):
        start_range = part_index * part_size
        if part_index == num_parts - 1:
            end_range = ''
        else:
            end_range = start_range + part_size - 1
        range_param = f'bytes={start_range}-{end_range}'
        return range_param

    def _download_range(
        self, bucket, key, filename, part_size, num_parts, callback, part_index
    ):
        try:
            range_param = self._calculate_range_param(
                part_size, part_index, num_parts
            )

            max_attempts = self._config.num_download_attempts
            last_exception = None
            for i in range(max_attempts):
                try:
                    logger.debug("Making get_object call.")
                    response = self._client.get_object(
                        Bucket=bucket, Key=key, Range=range_param
                    )
                    streaming_body = StreamReaderProgress(
                        response['Body'], callback
                    )
                    buffer_size = 1024 * 16
                    current_index = part_size * part_index
                    for chunk in iter(
                        lambda: streaming_body.read(buffer_size), b''
                    ):
                        self._ioqueue.put((current_index, chunk))
                        current_index += len(chunk)
                    return
                except (
                    socket.timeout,
                    OSError,
                    ReadTimeoutError,
                    IncompleteReadError,
                    ResponseStreamingError,
                ) as e:
                    logger.debug(
                        "Retrying exception caught (%s), "
                        "retrying request, (attempt %s / %s)",
                        e,
                        i,
                        max_attempts,
                        exc_info=True,
                    )
                    last_exception = e
                    continue
            raise RetriesExceededError(last_exception)
        finally:
            logger.debug("EXITING _download_range for part: %s", part_index)

    def _perform_io_writes(self, filename):
        with self._os.open(filename, 'wb') as f:
            while True:
                task = self._ioqueue.get()
                if task is SHUTDOWN_SENTINEL:
                    logger.debug(
                        "Shutdown sentinel received in IO handler, "
                        "shutting down IO handler."
                    )
                    return
                else:
                    try:
                        offset, data = task
                        f.seek(offset)
                        f.write(data)
                    except Exception as e:
                        logger.debug(
                            "Caught exception in IO thread: %s",
                            e,
                            exc_info=True,
                        )
                        self._ioqueue.trigger_shutdown()
                        raise

class TransferConfig:
    """Runtime configuration for ``S3Transfer``.

    ``multipart_threshold`` is the size (in bytes) at which multipart
    transfers are used, ``multipart_chunksize`` is the size of each part,
    ``max_concurrency`` caps the number of worker threads,
    ``num_download_attempts`` bounds download retries, and ``max_io_queue``
    bounds the queue of pending disk writes during ranged downloads.
    """

    def __init__(
        self,
        multipart_threshold=8 * MB,
        max_concurrency=10,
        multipart_chunksize=8 * MB,
        num_download_attempts=5,
        max_io_queue=100,
    ):
        self.multipart_threshold = multipart_threshold
        self.max_concurrency = max_concurrency
        self.multipart_chunksize = multipart_chunksize
        self.num_download_attempts = num_download_attempts
        self.max_io_queue = max_io_queue

class S3Transfer:
    ALLOWED_DOWNLOAD_ARGS = [
        'VersionId',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    ALLOWED_UPLOAD_ARGS = [
        'ACL',
        'CacheControl',
        'ContentDisposition',
        'ContentEncoding',
        'ContentLanguage',
        'ContentType',
        'Expires',
        'GrantFullControl',
        'GrantRead',
        'GrantReadACP',
        'GrantWriteACP',
        'Metadata',
        'RequestPayer',
        'ServerSideEncryption',
        'StorageClass',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'SSEKMSKeyId',
        'SSEKMSEncryptionContext',
        'Tagging',
    ]

    def __init__(self, client, config=None, osutil=None):
        self._client = client
        self._client.meta.events.register(
            'before-call.s3.*', self._update_checksum_context
        )
        if config is None:
            config = TransferConfig()
        self._config = config
        if osutil is None:
            osutil = OSUtils()
        self._osutil = osutil

    def _update_checksum_context(self, params, **kwargs):
        request_context = params.get("context", {})
        checksum_context = request_context.get("checksum", {})
        if "request_algorithm" in checksum_context:
            # Force request checksum algorithm in the header if specified.
            checksum_context["request_algorithm"]["in"] = "header"

    def upload_file(
        self, filename, bucket, key, callback=None, extra_args=None
    ):
        """Upload a file to an S3 object.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.upload_file() directly.
        """
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
        events = self._client.meta.events
        events.register_first(
            'request-created.s3',
            disable_upload_callbacks,
            unique_id='s3upload-callback-disable',
        )
        events.register_last(
            'request-created.s3',
            enable_upload_callbacks,
            unique_id='s3upload-callback-enable',
        )
        if (
            self._osutil.get_file_size(filename)
            >= self._config.multipart_threshold
        ):
            self._multipart_upload(filename, bucket, key, callback, extra_args)
        else:
            self._put_object(filename, bucket, key, callback, extra_args)

    def _put_object(self, filename, bucket, key, callback, extra_args):
        # We're using open_file_chunk_reader so we can take advantage of the
        # progress callback functionality.
        open_chunk_reader = self._osutil.open_file_chunk_reader
        with open_chunk_reader(
            filename,
            0,
            self._osutil.get_file_size(filename),
            callback=callback,
        ) as body:
            self._client.put_object(
                Bucket=bucket, Key=key, Body=body, **extra_args
            )

    def download_file(
        self, bucket, key, filename, extra_args=None, callback=None
    ):
        """Download an S3 object to a file.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.download_file() directly.
        """
        # This method will issue a ``head_object`` request to determine
        # the size of the S3 object. This is used to determine if the
        # object is downloaded in parallel.
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
        object_size = self._object_size(bucket, key, extra_args)
        temp_filename = filename + os.extsep + random_file_extension()
        try:
            self._download_file(
                bucket, key, temp_filename, object_size, extra_args, callback
            )
        except Exception:
            logger.debug(
                "Exception caught in download_file, removing partial "
                "file: %s",
                temp_filename,
                exc_info=True,
            )
            self._osutil.remove_file(temp_filename)
            raise
        else:
            self._osutil.rename_file(temp_filename, filename)

    def _download_file(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        if object_size >= self._config.multipart_threshold:
            self._ranged_download(
                bucket, key, filename, object_size, extra_args, callback
            )
        else:
            self._get_object(bucket, key, filename, extra_args, callback)

    def _validate_all_known_args(self, actual, allowed):
        for kwarg in actual:
            if kwarg not in allowed:
                raise ValueError(
                    f"Invalid extra_args key '{kwarg}', "
                    f"must be one of: {', '.join(allowed)}"
                )

    def _ranged_download(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        downloader = MultipartDownloader(
            self._client, self._config, self._osutil
        )
        downloader.download_file(
            bucket, key, filename, object_size, extra_args, callback
        )

    def _get_object(self, bucket, key, filename, extra_args, callback):
        # precondition: num_download_attempts > 0
        max_attempts = self._config.num_download_attempts
        last_exception = None
        for i in range(max_attempts):
            try:
                return self._do_get_object(
                    bucket, key, filename, extra_args, callback
                )
            except (
                socket.timeout,
                OSError,
                ReadTimeoutError,
                IncompleteReadError,
                ResponseStreamingError,
            ) as e:
                # TODO: we need a way to reset the callback if the
                # download failed.
                logger.debug(
                    "Retrying exception caught (%s), "
                    "retrying request, (attempt %s / %s)",
                    e,
                    i,
                    max_attempts,
                    exc_info=True,
                )
                last_exception = e
                continue
        raise RetriesExceededError(last_exception)

    def _do_get_object(self, bucket, key, filename, extra_args, callback):
        response = self._client.get_object(
            Bucket=bucket, Key=key, **extra_args
        )
        streaming_body = StreamReaderProgress(response['Body'], callback)
        with self._osutil.open(filename, 'wb') as f:
            for chunk in iter(lambda: streaming_body.read(8192), b''):
                f.write(chunk)

    def _object_size(self, bucket, key, extra_args):
        return self._client.head_object(Bucket=bucket, Key=key, **extra_args)[
            'ContentLength'
        ]

    def _multipart_upload(self, filename, bucket, key, callback, extra_args):
        uploader = MultipartUploader(self._client, self._config, self._osutil)
        uploader.upload_file(filename, bucket, key, callback, extra_args)