Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/s3transfer/__init__.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

314 statements  

1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

"""Abstractions over S3's upload/download operations.

This module provides high level abstractions for efficient
uploads/downloads. It handles several things for the user:

* Automatically switching to multipart transfers when
  a file is over a specific size threshold
* Uploading/downloading a file in parallel
* Throttling based on max bandwidth
* Progress callbacks to monitor transfers
* Retries. While botocore handles retries for streaming uploads,
  it is not possible for it to handle retries for streaming
  downloads. This module handles retries for both cases so
  you don't need to implement any retry logic yourself.

This module has a reasonable set of defaults. It also allows you
to configure many aspects of the transfer process including:

* Multipart threshold size
* Max parallel downloads
* Max bandwidth
* Socket timeouts
* Retry amounts

There is no support for s3->s3 multipart copies at this
time.


.. _ref_s3transfer_usage:

Usage
=====

The simplest way to use this module is:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    transfer = S3Transfer(client)
    # Upload /tmp/myfile to s3://bucket/key
    transfer.upload_file('/tmp/myfile', 'bucket', 'key')

    # Download s3://bucket/key to /tmp/myfile
    transfer.download_file('bucket', 'key', '/tmp/myfile')

The ``upload_file`` and ``download_file`` methods also accept
``**kwargs``, which will be forwarded through to the corresponding
client operation. Here are a few examples using ``upload_file``::

    # Making the object public
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'ACL': 'public-read'})

    # Setting metadata
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'Metadata': {'a': 'b', 'c': 'd'}})

    # Setting content type
    transfer.upload_file('/tmp/myfile.json', 'bucket', 'key',
                         extra_args={'ContentType': "application/json"})


The ``S3Transfer`` class also supports progress callbacks so you can
provide transfer progress to users. Both the ``upload_file`` and
``download_file`` methods take an optional ``callback`` parameter.
Here's an example of how to print a simple progress percentage
to the user:

.. code-block:: python

    class ProgressPercentage(object):
        def __init__(self, filename):
            self._filename = filename
            self._size = float(os.path.getsize(filename))
            self._seen_so_far = 0
            self._lock = threading.Lock()

        def __call__(self, bytes_amount):
            # To simplify we'll assume this is hooked up
            # to a single filename.
            with self._lock:
                self._seen_so_far += bytes_amount
                percentage = (self._seen_so_far / self._size) * 100
                sys.stdout.write(
                    "\r%s  %s / %s  (%.2f%%)" % (
                        self._filename, self._seen_so_far,
                        self._size, percentage))
                sys.stdout.flush()


    transfer = S3Transfer(boto3.client('s3', 'us-west-2'))
    # Upload /tmp/myfile to s3://bucket/key and print upload progress.
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         callback=ProgressPercentage('/tmp/myfile'))


You can also provide a TransferConfig object to the S3Transfer
object that gives you more fine grained control over the
transfer. For example:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    config = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,
        max_concurrency=10,
        num_download_attempts=10,
    )
    transfer = S3Transfer(client, config)
    transfer.upload_file('/tmp/foo', 'bucket', 'key')

"""

126 

127import concurrent.futures 

128import functools 

129import logging 

130import math 

131import os 

132import queue 

133import random 

134import socket 

135import string 

136import threading 

137from logging import NullHandler 

138 

139from botocore.compat import six # noqa: F401 

140from botocore.exceptions import IncompleteReadError, ResponseStreamingError 

141from botocore.vendored.requests.packages.urllib3.exceptions import ( 

142 ReadTimeoutError, 

143) 

144 

145import s3transfer.compat 

146from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError 

147 

# Package metadata.
__author__ = 'Amazon Web Services'
__version__ = '0.13.1'


# Module-level logger. A NullHandler is attached so the library produces no
# log output unless the consuming application configures logging itself.
logger = logging.getLogger(__name__)
logger.addHandler(NullHandler())

# One mebibyte in bytes; used for the default multipart threshold/chunk size.
MB = 1024 * 1024
# Unique marker object placed on the download IO queue to tell the writer
# thread to stop (compared by identity with ``is``).
SHUTDOWN_SENTINEL = object()

157 

158 

def random_file_extension(num_digits=8):
    """Return a random extension of ``num_digits`` hexadecimal characters."""
    chars = [random.choice(string.hexdigits) for _ in range(num_digits)]
    return ''.join(chars)

161 

162 

def disable_upload_callbacks(request, operation_name, **kwargs):
    """Turn off progress callbacks on the request body, when supported."""
    if operation_name not in ['PutObject', 'UploadPart']:
        return
    body = request.body
    if hasattr(body, 'disable_callback'):
        body.disable_callback()

168 

169 

def enable_upload_callbacks(request, operation_name, **kwargs):
    """Turn progress callbacks back on for the request body, when supported."""
    if operation_name not in ['PutObject', 'UploadPart']:
        return
    body = request.body
    if hasattr(body, 'enable_callback'):
        body.enable_callback()

175 

176 

class QueueShutdownError(Exception):
    """Raised when put() is called on a queue that has been shut down."""

    pass

179 

180 

class ReadFileChunk:
    """File-like view over a byte range of an underlying file object.

    Reads are clamped so they never run past the end of the chunk, and an
    optional callback is invoked with the number of bytes read so callers
    can report progress.
    """

    def __init__(
        self,
        fileobj,
        start_byte,
        chunk_size,
        full_file_size,
        callback=None,
        enable_callback=True,
    ):
        """
        Given a file object shown below:

          |___________________________________________________|
          0          |                 |                 full_file_size
                     |----chunk_size---|
                  start_byte

        :type fileobj: file
        :param fileobj: File like object

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        :type enable_callback: bool
        :param enable_callback: Indicate whether to invoke callback
            during read() calls.
        """
        self._fileobj = fileobj
        self._start_byte = start_byte
        self._size = self._calculate_file_size(
            self._fileobj,
            requested_size=chunk_size,
            start_byte=start_byte,
            actual_file_size=full_file_size,
        )
        # Position the underlying file at the start of this chunk.
        self._fileobj.seek(self._start_byte)
        self._amount_read = 0
        self._callback = callback
        self._callback_enabled = enable_callback

    @classmethod
    def from_filename(
        cls,
        filename,
        start_byte,
        chunk_size,
        callback=None,
        enable_callback=True,
    ):
        """Convenience factory to create a ``ReadFileChunk`` from a path.

        :type filename: str
        :param filename: Path of the file to open (opened in binary mode).

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        :type enable_callback: bool
        :param enable_callback: Indicate whether to invoke callback
            during read() calls.

        :rtype: ``ReadFileChunk``
        :return: A new instance of ``ReadFileChunk``
        """
        fileobj = open(filename, 'rb')
        full_size = os.fstat(fileobj.fileno()).st_size
        return cls(
            fileobj,
            start_byte,
            chunk_size,
            full_size,
            callback,
            enable_callback,
        )

    def _calculate_file_size(
        self, fileobj, requested_size, start_byte, actual_file_size
    ):
        # The chunk is truncated when it would extend past end-of-file.
        return min(requested_size, actual_file_size - start_byte)

    def read(self, amount=None):
        """Read up to ``amount`` bytes, never past the end of the chunk."""
        remaining = self._size - self._amount_read
        amount_to_read = remaining if amount is None else min(remaining, amount)
        data = self._fileobj.read(amount_to_read)
        self._amount_read += len(data)
        if self._callback_enabled and self._callback is not None:
            self._callback(len(data))
        return data

    def enable_callback(self):
        """Start invoking the callback on subsequent read()/seek() calls."""
        self._callback_enabled = True

    def disable_callback(self):
        """Stop invoking the callback on subsequent read()/seek() calls."""
        self._callback_enabled = False

    def seek(self, where):
        """Seek to ``where``, expressed relative to the chunk's start."""
        self._fileobj.seek(self._start_byte + where)
        if self._callback_enabled and self._callback is not None:
            # Also rewind (or advance) the callback so progress reporting
            # stays accurate after a seek.
            self._callback(where - self._amount_read)
        self._amount_read = where

    def close(self):
        """Close the underlying file object."""
        self._fileobj.close()

    def tell(self):
        """Return the current position relative to the chunk's start."""
        return self._amount_read

    def __len__(self):
        # __len__ is defined because requests will try to determine the length
        # of the stream to set a content length. In the normal case
        # of the file it will just stat the file, but we need to change that
        # behavior. By providing a __len__, requests will use that instead
        # of stat'ing the file.
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def __iter__(self):
        # This is a workaround for http://bugs.python.org/issue17575
        # Basically httplib will try to iterate over the contents, even
        # if its a file like object. This wasn't noticed because we've
        # already exhausted the stream so iterating over the file immediately
        # stops, which is what we're simulating here.
        return iter([])

329 

330 

class StreamReaderProgress:
    """Wrapper for a read only stream that adds progress callbacks."""

    def __init__(self, stream, callback=None):
        self._stream = stream
        self._callback = callback

    def read(self, *args, **kwargs):
        """Read from the wrapped stream, reporting bytes read to the callback."""
        data = self._stream.read(*args, **kwargs)
        if self._callback is not None:
            self._callback(len(data))
        return data

343 

344 

class OSUtils:
    """Thin wrapper over filesystem operations, injectable for testing."""

    def get_file_size(self, filename):
        """Return the size of ``filename`` in bytes."""
        return os.path.getsize(filename)

    def open_file_chunk_reader(self, filename, start_byte, size, callback):
        """Return a ``ReadFileChunk`` over a byte range of ``filename``.

        The callback starts out disabled; it is switched on later via
        ``enable_callback()``.
        """
        return ReadFileChunk.from_filename(
            filename, start_byte, size, callback, enable_callback=False
        )

    def open(self, filename, mode):
        """Open ``filename`` with the builtin ``open``."""
        return open(filename, mode)

    def remove_file(self, filename):
        """Remove a file, noop if file does not exist."""
        try:
            os.remove(filename)
        except OSError:
            # Unlike os.remove, a missing file is not an error here.
            pass

    def rename_file(self, current_filename, new_filename):
        """Rename ``current_filename`` to ``new_filename`` via s3transfer.compat."""
        s3transfer.compat.rename_file(current_filename, new_filename)

368 

369 

class MultipartUploader:
    """Uploads a single file to S3 via the multipart upload API."""

    # These are the extra_args that need to be forwarded onto
    # subsequent upload_parts.
    UPLOAD_PART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls

    def _extra_upload_part_args(self, extra_args):
        # upload_part accepts only a subset of create_multipart_upload's
        # args, so filter out everything not in UPLOAD_PART_ARGS.
        return {
            name: value
            for name, value in extra_args.items()
            if name in self.UPLOAD_PART_ARGS
        }

    def upload_file(self, filename, bucket, key, callback, extra_args):
        """Upload ``filename`` as a multipart upload, aborting on failure."""
        response = self._client.create_multipart_upload(
            Bucket=bucket, Key=key, **extra_args
        )
        upload_id = response['UploadId']
        try:
            parts = self._upload_parts(
                upload_id, filename, bucket, key, callback, extra_args
            )
        except Exception as e:
            logger.debug(
                "Exception raised while uploading parts, "
                "aborting multipart upload.",
                exc_info=True,
            )
            self._client.abort_multipart_upload(
                Bucket=bucket, Key=key, UploadId=upload_id
            )
            raise S3UploadFailedError(
                "Failed to upload {} to {}: {}".format(
                    filename, '/'.join([bucket, key]), e
                )
            )
        self._client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts},
        )

    def _upload_parts(
        self, upload_id, filename, bucket, key, callback, extra_args
    ):
        # Upload every part concurrently and collect the ETag metadata
        # needed to complete the multipart upload, in part order.
        part_args = self._extra_upload_part_args(extra_args)
        chunk_size = self._config.multipart_chunksize
        total_size = self._os.get_file_size(filename)
        num_parts = int(math.ceil(total_size / float(chunk_size)))
        with self._executor_cls(
            max_workers=self._config.max_concurrency
        ) as executor:
            upload_one = functools.partial(
                self._upload_one_part,
                filename,
                bucket,
                key,
                upload_id,
                chunk_size,
                part_args,
                callback,
            )
            return list(executor.map(upload_one, range(1, num_parts + 1)))

    def _upload_one_part(
        self,
        filename,
        bucket,
        key,
        upload_id,
        part_size,
        extra_args,
        callback,
        part_number,
    ):
        # Part numbers are 1-based; the byte offset of a part is therefore
        # (part_number - 1) * part_size.
        open_chunk_reader = self._os.open_file_chunk_reader
        offset = part_size * (part_number - 1)
        with open_chunk_reader(filename, offset, part_size, callback) as body:
            response = self._client.upload_part(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=body,
                **extra_args,
            )
            etag = response['ETag']
            return {'ETag': etag, 'PartNumber': part_number}

481 

482 

class ShutdownQueue(queue.Queue):
    """A queue implementation that can be shutdown.

    Shutting down a queue means that this class adds a
    trigger_shutdown method that will trigger all subsequent
    calls to put() to fail with a ``QueueShutdownError``.

    It purposefully deviates from queue.Queue, and is *not* meant
    to be a drop in replacement for ``queue.Queue``.

    """

    def _init(self, maxsize):
        # Shutdown state, guarded by its own lock (distinct from the
        # queue's internal mutex).
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        # Fix: the old comment claimed queue.Queue was an old-style class
        # and avoided super(); in Python 3 every class is new-style, so
        # the idiomatic super() call works fine.
        return super()._init(maxsize)

    def trigger_shutdown(self):
        """Mark the queue shut down; subsequent put() calls will raise."""
        with self._shutdown_lock:
            self._shutdown = True
            logger.debug("The IO queue is now shutdown.")

    def put(self, item):
        """Put ``item`` on the queue, raising if the queue was shut down.

        :raises QueueShutdownError: if trigger_shutdown() has been called.
        """
        # Note: this is not sufficient, it's still possible to deadlock!
        # Need to hook into the condition vars used by this class.
        with self._shutdown_lock:
            if self._shutdown:
                raise QueueShutdownError(
                    "Cannot put item to queue when queue has been shutdown."
                )
        return super().put(item)

515 

516 

class MultipartDownloader:
    """Downloads an S3 object in parallel using ranged GET requests.

    One pool of worker threads fetches byte ranges and pushes
    ``(offset, chunk)`` tuples onto an IO queue; a single writer thread
    drains the queue and writes the chunks to the destination file.
    """

    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls
        # Bounded queue between the download threads (producers) and the
        # single IO writer thread (consumer).
        self._ioqueue = ShutdownQueue(self._config.max_io_queue)

    def download_file(
        self, bucket, key, filename, object_size, extra_args, callback=None
    ):
        """Download ``s3://bucket/key`` to ``filename`` with ranged GETs.

        Blocks until both the download futures and the IO writer future
        have finished (or one of them raises).
        """
        with self._executor_cls(max_workers=2) as controller:
            # 1 thread for the future that manages downloading all parts,
            # 1 thread for the future that manages IO writes.
            download_parts_handler = functools.partial(
                self._download_file_as_future,
                bucket,
                key,
                filename,
                object_size,
                callback,
            )
            parts_future = controller.submit(download_parts_handler)

            io_writes_handler = functools.partial(
                self._perform_io_writes, filename
            )
            io_future = controller.submit(io_writes_handler)
            # Stop waiting as soon as either future fails so the error
            # surfaces promptly via _process_future_results.
            results = concurrent.futures.wait(
                [parts_future, io_future],
                return_when=concurrent.futures.FIRST_EXCEPTION,
            )
            self._process_future_results(results)

    def _process_future_results(self, futures):
        # Re-raise the first exception (if any) captured by a finished
        # future; result() propagates it.
        finished, unfinished = futures
        for future in finished:
            future.result()

    def _download_file_as_future(
        self, bucket, key, filename, object_size, callback
    ):
        # Fan the ranged downloads out across a worker pool; always push
        # the shutdown sentinel afterwards so the IO writer terminates
        # even when a download fails.
        part_size = self._config.multipart_chunksize
        num_parts = int(math.ceil(object_size / float(part_size)))
        max_workers = self._config.max_concurrency
        download_partial = functools.partial(
            self._download_range,
            bucket,
            key,
            filename,
            part_size,
            num_parts,
            callback,
        )
        try:
            with self._executor_cls(max_workers=max_workers) as executor:
                list(executor.map(download_partial, range(num_parts)))
        finally:
            self._ioqueue.put(SHUTDOWN_SENTINEL)

    def _calculate_range_param(self, part_size, part_index, num_parts):
        """Return the HTTP ``Range`` header value for one part.

        The last part uses an open-ended range (``bytes=N-``) so it picks
        up any remainder bytes.
        """
        start_range = part_index * part_size
        if part_index == num_parts - 1:
            end_range = ''
        else:
            end_range = start_range + part_size - 1
        range_param = f'bytes={start_range}-{end_range}'
        return range_param

    def _download_range(
        self, bucket, key, filename, part_size, num_parts, callback, part_index
    ):
        """Fetch one byte range and enqueue its chunks for the IO writer.

        Retries the GET up to ``num_download_attempts`` times on streaming
        read failures before raising ``RetriesExceededError``.
        """
        try:
            range_param = self._calculate_range_param(
                part_size, part_index, num_parts
            )

            max_attempts = self._config.num_download_attempts
            last_exception = None
            for i in range(max_attempts):
                try:
                    logger.debug("Making get_object call.")
                    response = self._client.get_object(
                        Bucket=bucket, Key=key, Range=range_param
                    )
                    streaming_body = StreamReaderProgress(
                        response['Body'], callback
                    )
                    buffer_size = 1024 * 16
                    current_index = part_size * part_index
                    for chunk in iter(
                        lambda: streaming_body.read(buffer_size), b''
                    ):
                        # The writer thread needs the absolute offset since
                        # parts complete out of order.
                        self._ioqueue.put((current_index, chunk))
                        current_index += len(chunk)
                    return
                except (
                    socket.timeout,
                    OSError,
                    ReadTimeoutError,
                    IncompleteReadError,
                    ResponseStreamingError,
                ) as e:
                    logger.debug(
                        "Retrying exception caught (%s), "
                        "retrying request, (attempt %s / %s)",
                        e,
                        i,
                        max_attempts,
                        exc_info=True,
                    )
                    last_exception = e
                    continue
            raise RetriesExceededError(last_exception)
        finally:
            logger.debug("EXITING _download_range for part: %s", part_index)

    def _perform_io_writes(self, filename):
        """Drain the IO queue, writing each chunk at its offset.

        Runs until the shutdown sentinel is received. On a write error the
        queue is shut down so producer threads fail fast instead of
        blocking on a full queue.
        """
        with self._os.open(filename, 'wb') as f:
            while True:
                task = self._ioqueue.get()
                if task is SHUTDOWN_SENTINEL:
                    logger.debug(
                        "Shutdown sentinel received in IO handler, "
                        "shutting down IO handler."
                    )
                    return
                else:
                    try:
                        offset, data = task
                        f.seek(offset)
                        f.write(data)
                    except Exception as e:
                        logger.debug(
                            "Caught exception in IO thread: %s",
                            e,
                            exc_info=True,
                        )
                        self._ioqueue.trigger_shutdown()
                        raise

663 

664 

class TransferConfig:
    """Tunable settings for ``S3Transfer`` uploads and downloads."""

    def __init__(
        self,
        multipart_threshold=8 * MB,
        max_concurrency=10,
        multipart_chunksize=8 * MB,
        num_download_attempts=5,
        max_io_queue=100,
    ):
        """
        :param multipart_threshold: Size in bytes at or above which a
            transfer switches to the multipart (parallel) code path.
        :param max_concurrency: Maximum number of worker threads used for
            a single transfer.
        :param multipart_chunksize: Size in bytes of each part of a
            multipart transfer.
        :param num_download_attempts: Number of attempts made per GET
            before giving up with ``RetriesExceededError``.
        :param max_io_queue: Maximum number of pending write chunks in the
            download IO queue; producers block once it is full.
        """
        self.multipart_threshold = multipart_threshold
        self.max_concurrency = max_concurrency
        self.multipart_chunksize = multipart_chunksize
        self.num_download_attempts = num_download_attempts
        self.max_io_queue = max_io_queue

679 

680 

class S3Transfer:
    """High level interface for uploading/downloading files to/from S3.

    Automatically switches to multipart transfers when the file or object
    size reaches ``TransferConfig.multipart_threshold``.
    """

    # extra_args keys accepted by download_file(); forwarded to the
    # underlying HeadObject/GetObject calls.
    ALLOWED_DOWNLOAD_ARGS = [
        'VersionId',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    # extra_args keys accepted by upload_file(); forwarded to
    # PutObject/CreateMultipartUpload (and a subset to UploadPart).
    ALLOWED_UPLOAD_ARGS = [
        'ACL',
        'CacheControl',
        'ContentDisposition',
        'ContentEncoding',
        'ContentLanguage',
        'ContentType',
        'Expires',
        'GrantFullControl',
        'GrantRead',
        'GrantReadACP',
        # Bug fix: the S3 parameter that grants permission to write the
        # object's ACL is 'GrantWriteACP' (header x-amz-grant-write-acp);
        # the previous 'GrantWriteACL' entry allowed a key that the S3
        # client would always reject.
        'GrantWriteACP',
        'Metadata',
        'RequestPayer',
        'ServerSideEncryption',
        'StorageClass',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'SSEKMSKeyId',
        'SSEKMSEncryptionContext',
        'Tagging',
    ]

    def __init__(self, client, config=None, osutil=None):
        """
        :param client: A botocore/boto3 S3 client.
        :param config: Optional ``TransferConfig``; a default-configured
            one is created when omitted.
        :param osutil: Optional ``OSUtils`` instance (injectable for
            testing); a default one is created when omitted.
        """
        self._client = client
        # Ensure any requested checksum algorithm is sent as a header on
        # every S3 call made through this client.
        self._client.meta.events.register(
            'before-call.s3.*', self._update_checksum_context
        )
        if config is None:
            config = TransferConfig()
        self._config = config
        if osutil is None:
            osutil = OSUtils()
        self._osutil = osutil

    def _update_checksum_context(self, params, **kwargs):
        # Event handler for 'before-call.s3.*'; mutates the request's
        # checksum context in place when one is present.
        request_context = params.get("context", {})
        checksum_context = request_context.get("checksum", {})
        if "request_algorithm" in checksum_context:
            # Force request checksum algorithm in the header if specified.
            checksum_context["request_algorithm"]["in"] = "header"

    def upload_file(
        self, filename, bucket, key, callback=None, extra_args=None
    ):
        """Upload a file to an S3 object.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.upload_file() directly.

        :param filename: Path of the local file to upload.
        :param bucket: Destination bucket name.
        :param key: Destination object key.
        :param callback: Optional callable invoked with the number of
            bytes read as the upload progresses.
        :param extra_args: Optional dict of additional client args; keys
            must be in ``ALLOWED_UPLOAD_ARGS``.
        :raises ValueError: If ``extra_args`` contains an unknown key.
        """
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
        # Hook request creation so that body callbacks are disabled first
        # and re-enabled last for every request this client creates.
        events = self._client.meta.events
        events.register_first(
            'request-created.s3',
            disable_upload_callbacks,
            unique_id='s3upload-callback-disable',
        )
        events.register_last(
            'request-created.s3',
            enable_upload_callbacks,
            unique_id='s3upload-callback-enable',
        )
        if (
            self._osutil.get_file_size(filename)
            >= self._config.multipart_threshold
        ):
            self._multipart_upload(filename, bucket, key, callback, extra_args)
        else:
            self._put_object(filename, bucket, key, callback, extra_args)

    def _put_object(self, filename, bucket, key, callback, extra_args):
        # We're using open_file_chunk_reader so we can take advantage of the
        # progress callback functionality.
        open_chunk_reader = self._osutil.open_file_chunk_reader
        with open_chunk_reader(
            filename,
            0,
            self._osutil.get_file_size(filename),
            callback=callback,
        ) as body:
            self._client.put_object(
                Bucket=bucket, Key=key, Body=body, **extra_args
            )

    def download_file(
        self, bucket, key, filename, extra_args=None, callback=None
    ):
        """Download an S3 object to a file.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.download_file() directly.

        :param bucket: Source bucket name.
        :param key: Source object key.
        :param filename: Destination path for the downloaded file.
        :param extra_args: Optional dict of additional client args; keys
            must be in ``ALLOWED_DOWNLOAD_ARGS``.
        :param callback: Optional callable invoked with the number of
            bytes read as the download progresses.
        :raises ValueError: If ``extra_args`` contains an unknown key.
        """
        # This method will issue a ``head_object`` request to determine
        # the size of the S3 object. This is used to determine if the
        # object is downloaded in parallel.
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
        object_size = self._object_size(bucket, key, extra_args)
        # Download into a randomly-suffixed temp file first so a failed or
        # partial download never replaces an existing file at ``filename``.
        temp_filename = filename + os.extsep + random_file_extension()
        try:
            self._download_file(
                bucket, key, temp_filename, object_size, extra_args, callback
            )
        except Exception:
            logger.debug(
                "Exception caught in download_file, removing partial file: %s",
                temp_filename,
                exc_info=True,
            )
            self._osutil.remove_file(temp_filename)
            raise
        else:
            self._osutil.rename_file(temp_filename, filename)

    def _download_file(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        # Parallel ranged download for large objects; single GET otherwise.
        if object_size >= self._config.multipart_threshold:
            self._ranged_download(
                bucket, key, filename, object_size, extra_args, callback
            )
        else:
            self._get_object(bucket, key, filename, extra_args, callback)

    def _validate_all_known_args(self, actual, allowed):
        # Fail fast with a clear message on unknown extra_args rather than
        # letting the request fail later inside the client.
        for kwarg in actual:
            if kwarg not in allowed:
                raise ValueError(
                    f"Invalid extra_args key '{kwarg}', "
                    f"must be one of: {', '.join(allowed)}"
                )

    def _ranged_download(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        downloader = MultipartDownloader(
            self._client, self._config, self._osutil
        )
        downloader.download_file(
            bucket, key, filename, object_size, extra_args, callback
        )

    def _get_object(self, bucket, key, filename, extra_args, callback):
        # precondition: num_download_attempts > 0
        max_attempts = self._config.num_download_attempts
        last_exception = None
        for i in range(max_attempts):
            try:
                return self._do_get_object(
                    bucket, key, filename, extra_args, callback
                )
            except (
                socket.timeout,
                OSError,
                ReadTimeoutError,
                IncompleteReadError,
                ResponseStreamingError,
            ) as e:
                # TODO: we need a way to reset the callback if the
                # download failed.
                logger.debug(
                    "Retrying exception caught (%s), "
                    "retrying request, (attempt %s / %s)",
                    e,
                    i,
                    max_attempts,
                    exc_info=True,
                )
                last_exception = e
                continue
        raise RetriesExceededError(last_exception)

    def _do_get_object(self, bucket, key, filename, extra_args, callback):
        # Stream the body to disk in fixed-size chunks, reporting progress
        # through StreamReaderProgress.
        response = self._client.get_object(
            Bucket=bucket, Key=key, **extra_args
        )
        streaming_body = StreamReaderProgress(response['Body'], callback)
        with self._osutil.open(filename, 'wb') as f:
            for chunk in iter(lambda: streaming_body.read(8192), b''):
                f.write(chunk)

    def _object_size(self, bucket, key, extra_args):
        # HEAD the object to learn its ContentLength.
        return self._client.head_object(Bucket=bucket, Key=key, **extra_args)[
            'ContentLength'
        ]

    def _multipart_upload(self, filename, bucket, key, callback, extra_args):
        uploader = MultipartUploader(self._client, self._config, self._osutil)
        uploader.upload_file(filename, bucket, key, callback, extra_args)