Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/s3transfer/download.py: 31%


257 statements  

# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import heapq
import logging
import threading

from botocore.exceptions import ClientError

from s3transfer.compat import seekable
from s3transfer.exceptions import (
    RetriesExceededError,
    S3DownloadFailedError,
    S3ValidationError,
)
from s3transfer.futures import IN_MEMORY_DOWNLOAD_TAG
from s3transfer.tasks import SubmissionTask, Task
from s3transfer.utils import (
    S3_RETRYABLE_DOWNLOAD_ERRORS,
    CountCallbackInvoker,
    DeferredOpenFile,
    FunctionContainer,
    StreamReaderProgress,
    calculate_num_parts,
    calculate_range_parameter,
    get_callbacks,
    invoke_progress_callbacks,
)

logger = logging.getLogger(__name__)


class DownloadOutputManager:
    """Base manager class for handling various types of files for downloads

    This class is typically used by the DownloadSubmissionTask class to help
    determine the following:

    * The fileobj to write downloaded data to
    * The task to complete once everything downloaded has been written

    The answers/implementations differ for the various types of file outputs
    that may be accepted. All implementations must subclass and override
    public methods from this class.
    """

    def __init__(self, osutil, transfer_coordinator, io_executor):
        self._osutil = osutil
        self._transfer_coordinator = transfer_coordinator
        self._io_executor = io_executor

    @classmethod
    def is_compatible(cls, download_target, osutil):
        """Determines if the download target is compatible with this manager

        :param download_target: The target that the download will write
            data to.

        :param osutil: The os utility to be used for the transfer

        :returns: True if the manager can handle the type of target
            specified, otherwise False.
        """
        raise NotImplementedError('must implement is_compatible()')

    def get_download_task_tag(self):
        """Get the tag (if any) to associate with all GetObjectTasks

        :rtype: s3transfer.futures.TaskTag
        :returns: The tag to associate all GetObjectTasks with
        """
        return None

    def get_fileobj_for_io_writes(self, transfer_future):
        """Get file-like object to use for io writes in the io executor

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with the download
            request

        :returns: A file-like object to write to
        """
        raise NotImplementedError('must implement get_fileobj_for_io_writes()')

    def queue_file_io_task(self, fileobj, data, offset):
        """Queue IO write for submission to the IO executor.

        This method accepts information about the downloaded data and
        handles submitting it to the IO executor configured for this
        manager.

        This method may defer submission to the IO executor if necessary.

        """
        self._transfer_coordinator.submit(
            self._io_executor, self.get_io_write_task(fileobj, data, offset)
        )

    def get_io_write_task(self, fileobj, data, offset):
        """Get an IO write task for the requested set of data

        This task can be run immediately or be submitted to the IO executor
        for it to run.

        :type fileobj: file-like object
        :param fileobj: The file-like object to write to

        :type data: bytes
        :param data: The data to write out

        :type offset: integer
        :param offset: The offset to write the data to in the file-like object

        :returns: An IO task to be used to write data to a file-like object
        """
        return IOWriteTask(
            self._transfer_coordinator,
            main_kwargs={
                'fileobj': fileobj,
                'data': data,
                'offset': offset,
            },
        )

    def get_final_io_task(self):
        """Get the final io task to complete the download

        This is needed because, based on the architecture of the
        TransferManager, the final tasks will be sent to the IO executor,
        but the executor needs a final task for it to signal that the
        transfer is done and all done callbacks can be run.

        :rtype: s3transfer.tasks.Task
        :returns: A final task to be completed in the io executor
        """
        raise NotImplementedError('must implement get_final_io_task()')

    def _get_fileobj_from_filename(self, filename):
        f = DeferredOpenFile(
            filename, mode='wb', open_function=self._osutil.open
        )
        # Make sure the file gets closed and we remove the temporary file
        # if anything goes wrong during the process.
        self._transfer_coordinator.add_failure_cleanup(f.close)
        return f
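

# A hypothetical sketch, not part of s3transfer: the smallest useful
# subclass of DownloadOutputManager answers three questions -- which
# targets it claims, where downloaded bytes go, and what signals
# completion. The sentinel target and class name here are invented for
# illustration only.
class _ExampleNullOutputManager(DownloadOutputManager):
    @classmethod
    def is_compatible(cls, download_target, osutil):
        # Claim only the sentinel target this sketch invents.
        return download_target is None

    def get_fileobj_for_io_writes(self, transfer_future):
        import io

        # Throwaway in-memory sink for downloaded bytes.
        return io.BytesIO()

    def get_final_io_task(self):
        # Nothing to rename or close; a no-op task marks the transfer done.
        return CompleteDownloadNOOPTask(
            transfer_coordinator=self._transfer_coordinator
        )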



class DownloadFilenameOutputManager(DownloadOutputManager):
    def __init__(self, osutil, transfer_coordinator, io_executor):
        super().__init__(osutil, transfer_coordinator, io_executor)
        self._final_filename = None
        self._temp_filename = None
        self._temp_fileobj = None

    @classmethod
    def is_compatible(cls, download_target, osutil):
        return isinstance(download_target, str)

    def get_fileobj_for_io_writes(self, transfer_future):
        fileobj = transfer_future.meta.call_args.fileobj
        self._final_filename = fileobj
        self._temp_filename = self._osutil.get_temp_filename(fileobj)
        self._temp_fileobj = self._get_temp_fileobj()
        return self._temp_fileobj

    def get_final_io_task(self):
        # A task to rename the file from the temporary file to its final
        # location is needed. This should be the last task needed to complete
        # the download.
        return IORenameFileTask(
            transfer_coordinator=self._transfer_coordinator,
            main_kwargs={
                'fileobj': self._temp_fileobj,
                'final_filename': self._final_filename,
                'osutil': self._osutil,
            },
            is_final=True,
        )

    def _get_temp_fileobj(self):
        f = self._get_fileobj_from_filename(self._temp_filename)
        self._transfer_coordinator.add_failure_cleanup(
            self._osutil.remove_file, self._temp_filename
        )
        return f


class DownloadSeekableOutputManager(DownloadOutputManager):
    @classmethod
    def is_compatible(cls, download_target, osutil):
        return seekable(download_target)

    def get_fileobj_for_io_writes(self, transfer_future):
        # Return the fileobj provided to the future.
        return transfer_future.meta.call_args.fileobj

    def get_final_io_task(self):
        # This task will serve the purpose of signaling when all of the io
        # writes have finished so done callbacks can be called.
        return CompleteDownloadNOOPTask(
            transfer_coordinator=self._transfer_coordinator
        )


class DownloadNonSeekableOutputManager(DownloadOutputManager):
    def __init__(
        self, osutil, transfer_coordinator, io_executor, defer_queue=None
    ):
        super().__init__(osutil, transfer_coordinator, io_executor)
        if defer_queue is None:
            defer_queue = DeferQueue()
        self._defer_queue = defer_queue
        self._io_submit_lock = threading.Lock()

    @classmethod
    def is_compatible(cls, download_target, osutil):
        return hasattr(download_target, 'write')

    def get_download_task_tag(self):
        return IN_MEMORY_DOWNLOAD_TAG

    def get_fileobj_for_io_writes(self, transfer_future):
        return transfer_future.meta.call_args.fileobj

    def get_final_io_task(self):
        return CompleteDownloadNOOPTask(
            transfer_coordinator=self._transfer_coordinator
        )

    def queue_file_io_task(self, fileobj, data, offset):
        with self._io_submit_lock:
            writes = self._defer_queue.request_writes(offset, data)
            for write in writes:
                data = write['data']
                logger.debug(
                    "Queueing IO offset %s for fileobj: %s",
                    write['offset'],
                    fileobj,
                )
                super().queue_file_io_task(fileobj, data, offset)

    def get_io_write_task(self, fileobj, data, offset):
        return IOStreamingWriteTask(
            self._transfer_coordinator,
            main_kwargs={
                'fileobj': fileobj,
                'data': data,
            },
        )


class DownloadSpecialFilenameOutputManager(DownloadNonSeekableOutputManager):
    def __init__(
        self, osutil, transfer_coordinator, io_executor, defer_queue=None
    ):
        super().__init__(
            osutil, transfer_coordinator, io_executor, defer_queue
        )
        self._fileobj = None

    @classmethod
    def is_compatible(cls, download_target, osutil):
        return isinstance(download_target, str) and osutil.is_special_file(
            download_target
        )

    def get_fileobj_for_io_writes(self, transfer_future):
        filename = transfer_future.meta.call_args.fileobj
        self._fileobj = self._get_fileobj_from_filename(filename)
        return self._fileobj

    def get_final_io_task(self):
        # Make sure the file gets closed once the transfer is done.
        return IOCloseTask(
            transfer_coordinator=self._transfer_coordinator,
            is_final=True,
            main_kwargs={'fileobj': self._fileobj},
        )


class DownloadSubmissionTask(SubmissionTask):
    """Task for submitting tasks to execute a download"""

    def _get_download_output_manager_cls(self, transfer_future, osutil):
        """Retrieves a class for managing output for a download

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future for the request

        :type osutil: s3transfer.utils.OSUtils
        :param osutil: The os utility associated with the transfer

        :rtype: class of DownloadOutputManager
        :returns: The appropriate class to use for managing a specific type
            of output for downloads.
        """
        download_manager_resolver_chain = [
            DownloadSpecialFilenameOutputManager,
            DownloadFilenameOutputManager,
            DownloadSeekableOutputManager,
            DownloadNonSeekableOutputManager,
        ]

        fileobj = transfer_future.meta.call_args.fileobj
        for download_manager_cls in download_manager_resolver_chain:
            if download_manager_cls.is_compatible(fileobj, osutil):
                return download_manager_cls
        raise RuntimeError(
            f'Output {fileobj} of type: {type(fileobj)} is not supported.'
        )
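
    # A hypothetical sketch, not part of s3transfer: order in the resolver
    # chain above matters. A plain filename string satisfies both the
    # special-file and filename managers' checks, so the more specific
    # manager is listed first and wins; anything that is merely writeable
    # falls through to the non-seekable manager last. This demo method and
    # its name are invented for illustration.
    def _demo_resolver_order(self, osutil):
        import io

        chain = [
            DownloadSpecialFilenameOutputManager,
            DownloadFilenameOutputManager,
            DownloadSeekableOutputManager,
            DownloadNonSeekableOutputManager,
        ]
        # io.BytesIO() is seekable, so the seekable manager claims it
        # before the non-seekable fallback gets a chance.
        target = io.BytesIO()
        picked = next(
            cls for cls in chain if cls.is_compatible(target, osutil)
        )
        assert picked is DownloadSeekableOutputManager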


    def _submit(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        transfer_future,
        bandwidth_limiter=None,
    ):
        """
        :param client: The client associated with the transfer manager

        :type config: s3transfer.manager.TransferConfig
        :param config: The transfer config associated with the transfer
            manager

        :type osutil: s3transfer.utils.OSUtils
        :param osutil: The os utility associated with the transfer manager

        :type request_executor: s3transfer.futures.BoundedExecutor
        :param request_executor: The request executor associated with the
            transfer manager

        :type io_executor: s3transfer.futures.BoundedExecutor
        :param io_executor: The io executor associated with the
            transfer manager

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for

        :type bandwidth_limiter: s3transfer.bandwidth.BandwidthLimiter
        :param bandwidth_limiter: The bandwidth limiter to use when
            downloading streams
        """
        if (
            transfer_future.meta.size is None
            or transfer_future.meta.etag is None
        ):
            response = client.head_object(
                Bucket=transfer_future.meta.call_args.bucket,
                Key=transfer_future.meta.call_args.key,
                **transfer_future.meta.call_args.extra_args,
            )
            # If a size was not provided, figure out the size for the
            # user.
            transfer_future.meta.provide_transfer_size(
                response['ContentLength']
            )
            # Provide an etag to ensure a stored object is not modified
            # during a multipart download.
            transfer_future.meta.provide_object_etag(response.get('ETag'))

        download_output_manager = self._get_download_output_manager_cls(
            transfer_future, osutil
        )(osutil, self._transfer_coordinator, io_executor)

        # If the transfer size meets the threshold, do a ranged download;
        # otherwise do a regular GetObject download.
        if transfer_future.meta.size < config.multipart_threshold:
            self._submit_download_request(
                client,
                config,
                osutil,
                request_executor,
                io_executor,
                download_output_manager,
                transfer_future,
                bandwidth_limiter,
            )
        else:
            self._submit_ranged_download_request(
                client,
                config,
                osutil,
                request_executor,
                io_executor,
                download_output_manager,
                transfer_future,
                bandwidth_limiter,
            )

    def _submit_download_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        download_output_manager,
        transfer_future,
        bandwidth_limiter,
    ):
        call_args = transfer_future.meta.call_args

        # Get a handle to the file that will be used for writing downloaded
        # contents
        fileobj = download_output_manager.get_fileobj_for_io_writes(
            transfer_future
        )

        # Get the needed callbacks for the task
        progress_callbacks = get_callbacks(transfer_future, 'progress')

        # Get any associated tags for the get object task.
        get_object_tag = download_output_manager.get_download_task_tag()

        # Get the final io task to run once the download is complete.
        final_task = download_output_manager.get_final_io_task()

        # Submit the task to download the object.
        self._transfer_coordinator.submit(
            request_executor,
            ImmediatelyWriteIOGetObjectTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'fileobj': fileobj,
                    'extra_args': call_args.extra_args,
                    'callbacks': progress_callbacks,
                    'max_attempts': config.num_download_attempts,
                    'download_output_manager': download_output_manager,
                    'io_chunksize': config.io_chunksize,
                    'bandwidth_limiter': bandwidth_limiter,
                },
                done_callbacks=[final_task],
            ),
            tag=get_object_tag,
        )

    def _submit_ranged_download_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        download_output_manager,
        transfer_future,
        bandwidth_limiter,
    ):
        call_args = transfer_future.meta.call_args

        # Get the needed progress callbacks for the task
        progress_callbacks = get_callbacks(transfer_future, 'progress')

        # Get a handle to the file that will be used for writing downloaded
        # contents
        fileobj = download_output_manager.get_fileobj_for_io_writes(
            transfer_future
        )

        # Determine the number of parts
        part_size = config.multipart_chunksize
        num_parts = calculate_num_parts(transfer_future.meta.size, part_size)

        # Get any associated tags for the get object task.
        get_object_tag = download_output_manager.get_download_task_tag()

        # Callback invoker to submit the final io task once all downloads
        # are complete.
        finalize_download_invoker = CountCallbackInvoker(
            self._get_final_io_task_submission_callback(
                download_output_manager, io_executor
            )
        )
        for i in range(num_parts):
            # Calculate the range parameter
            range_parameter = calculate_range_parameter(
                part_size, i, num_parts
            )

            # Inject extra parameters to be passed in as extra args
            extra_args = {
                'Range': range_parameter,
            }
            if transfer_future.meta.etag is not None:
                extra_args['IfMatch'] = transfer_future.meta.etag
            extra_args.update(call_args.extra_args)
            finalize_download_invoker.increment()
            # Submit the ranged downloads
            self._transfer_coordinator.submit(
                request_executor,
                GetObjectTask(
                    transfer_coordinator=self._transfer_coordinator,
                    main_kwargs={
                        'client': client,
                        'bucket': call_args.bucket,
                        'key': call_args.key,
                        'fileobj': fileobj,
                        'extra_args': extra_args,
                        'callbacks': progress_callbacks,
                        'max_attempts': config.num_download_attempts,
                        'start_index': i * part_size,
                        'download_output_manager': download_output_manager,
                        'io_chunksize': config.io_chunksize,
                        'bandwidth_limiter': bandwidth_limiter,
                    },
                    done_callbacks=[finalize_download_invoker.decrement],
                ),
                tag=get_object_tag,
            )
        finalize_download_invoker.finalize()

    def _get_final_io_task_submission_callback(
        self, download_manager, io_executor
    ):
        final_task = download_manager.get_final_io_task()
        return FunctionContainer(
            self._transfer_coordinator.submit, io_executor, final_task
        )

    def _calculate_range_param(self, part_size, part_index, num_parts):
        # Used to calculate the Range parameter
        start_range = part_index * part_size
        if part_index == num_parts - 1:
            end_range = ''
        else:
            end_range = start_range + part_size - 1
        range_param = f'bytes={start_range}-{end_range}'
        return range_param
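
    # A hypothetical sketch, not part of s3transfer: the Range strings
    # produced for a 10 MiB object split into 8 MiB parts. The final part
    # is open-ended, so S3 returns whatever remains. This demo method is
    # invented for illustration.
    def _demo_range_params(self):
        part_size, num_parts = 8 * 1024 * 1024, 2
        ranges = [
            self._calculate_range_param(part_size, i, num_parts)
            for i in range(num_parts)
        ]
        assert ranges == ['bytes=0-8388607', 'bytes=8388608-']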



class GetObjectTask(Task):
    def _main(
        self,
        client,
        bucket,
        key,
        fileobj,
        extra_args,
        callbacks,
        max_attempts,
        download_output_manager,
        io_chunksize,
        start_index=0,
        bandwidth_limiter=None,
    ):
        """Downloads an object and places content into the io queue

        :param client: The client to use when calling GetObject
        :param bucket: The bucket to download from
        :param key: The key to download from
        :param fileobj: The file handle to write content to
        :param extra_args: Any extra arguments to include in GetObject request
        :param callbacks: List of progress callbacks to invoke on download
        :param max_attempts: The number of retries to do when downloading
        :param download_output_manager: The download output manager associated
            with the current download.
        :param io_chunksize: The size of each io chunk to read from the
            download stream and queue in the io queue.
        :param start_index: The location in the file to start writing the
            content of the key to.
        :param bandwidth_limiter: The bandwidth limiter to use when throttling
            the downloading of data in streams.
        """
        last_exception = None
        for i in range(max_attempts):
            try:
                current_index = start_index
                response = client.get_object(
                    Bucket=bucket, Key=key, **extra_args
                )
                self._validate_content_range(
                    extra_args.get('Range'),
                    response.get('ContentRange'),
                )
                streaming_body = StreamReaderProgress(
                    response['Body'], callbacks
                )
                if bandwidth_limiter:
                    # Note: 'bandwith' is the spelling used by the method
                    # in s3transfer.bandwidth.
                    streaming_body = (
                        bandwidth_limiter.get_bandwith_limited_stream(
                            streaming_body, self._transfer_coordinator
                        )
                    )

                chunks = DownloadChunkIterator(streaming_body, io_chunksize)
                for chunk in chunks:
                    # If the transfer is done because of a cancellation
                    # or error somewhere else, stop trying to submit more
                    # data to be written and break out of the download.
                    if not self._transfer_coordinator.done():
                        self._handle_io(
                            download_output_manager,
                            fileobj,
                            chunk,
                            current_index,
                        )
                        current_index += len(chunk)
                    else:
                        return
                return
            except ClientError as e:
                error_code = e.response.get('Error', {}).get('Code')
                if error_code == "PreconditionFailed":
                    raise S3DownloadFailedError(
                        f'Contents of stored object "{key}" in bucket '
                        f'"{bucket}" did not match expected ETag.'
                    )
                else:
                    raise
            except S3_RETRYABLE_DOWNLOAD_ERRORS as e:
                logger.debug(
                    "Retrying exception caught (%s), "
                    "retrying request, (attempt %s / %s)",
                    e,
                    i,
                    max_attempts,
                    exc_info=True,
                )
                last_exception = e
                # Also invoke the progress callbacks to indicate that we
                # are trying to download the stream again and all progress
                # for this GetObject has been lost.
                invoke_progress_callbacks(
                    callbacks, start_index - current_index
                )
                continue
        raise RetriesExceededError(last_exception)
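
    # A hypothetical sketch, not part of s3transfer: on a retry, progress
    # already reported for this GetObject is rewound by sending a negative
    # delta (start_index - current_index above), keeping aggregate progress
    # accurate across attempts. This demo method is invented and assumes
    # invoke_progress_callbacks passes bytes_transferred as a keyword.
    def _demo_retry_rewind(self):
        seen = []
        cb = lambda bytes_transferred: seen.append(bytes_transferred)
        invoke_progress_callbacks([cb], 5)   # 5 bytes streamed, then failure
        invoke_progress_callbacks([cb], -5)  # retry rewinds that progress
        assert sum(seen) == 0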


    def _handle_io(self, download_output_manager, fileobj, chunk, index):
        download_output_manager.queue_file_io_task(fileobj, chunk, index)

    def _validate_content_range(self, requested_range, content_range):
        if not requested_range or not content_range:
            return
        # Unparsed `ContentRange` looks like `bytes 0-8388607/39542919`,
        # where `0-8388607` is the fetched range and `39542919` is
        # the total object size.
        response_range, total_size = content_range.split('/')
        # Subtract `1` because the range is 0-indexed.
        final_byte = str(int(total_size) - 1)
        # If it's the last part, the requested range will not include
        # the final byte, e.g. `bytes=33554432-`.
        if requested_range.endswith('-'):
            requested_range += final_byte
        # Request looks like `bytes=0-8388607`.
        # Parsed response looks like `bytes 0-8388607`.
        if requested_range[6:] != response_range[6:]:
            raise S3ValidationError(
                f"Requested range: `{requested_range[6:]}` does not match "
                f"content range in response: `{response_range[6:]}`"
            )
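
    # A hypothetical sketch, not part of s3transfer: both 'bytes=' and
    # 'bytes ' prefixes are six characters long, so slicing [6:] leaves
    # directly comparable spans. This demo method is invented for
    # illustration.
    def _demo_content_range_check(self):
        requested = 'bytes=33554432-'                     # open-ended part
        content_range = 'bytes 33554432-39542918/39542919'
        total = int(content_range.split('/')[1])
        requested += str(total - 1)        # -> 'bytes=33554432-39542918'
        assert requested[6:] == content_range.split('/')[0][6:]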



class ImmediatelyWriteIOGetObjectTask(GetObjectTask):
    """GetObjectTask that immediately writes to the provided file object

    This is useful for downloads where it is known that only one thread is
    downloading the object, so there is no reason to go through the
    overhead of using an IO queue and executor.
    """

    def _handle_io(self, download_output_manager, fileobj, chunk, index):
        task = download_output_manager.get_io_write_task(fileobj, chunk, index)
        task()


class IOWriteTask(Task):
    def _main(self, fileobj, data, offset):
        """Write contents, pulled off the io queue, to a file

        :param fileobj: The file handle to write content to
        :param data: The data to write
        :param offset: The offset to write the data to.
        """
        fileobj.seek(offset)
        fileobj.write(data)
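

# A hypothetical sketch, not part of s3transfer: because each IOWriteTask
# seeks to its own offset before writing, ranged parts may be written in
# any order and the file still assembles correctly.
def _demo_io_write_order():
    import io

    buf = io.BytesIO(b'\x00' * 6)
    for offset, data in [(3, b'def'), (0, b'abc')]:  # parts out of order
        buf.seek(offset)
        buf.write(data)
    assert buf.getvalue() == b'abcdef'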



class IOStreamingWriteTask(Task):
    """Task for writing data to a non-seekable stream."""

    def _main(self, fileobj, data):
        """Write data to a fileobj.

        Data will be written directly to the fileobj without
        any prior seeking.

        :param fileobj: The fileobj to write content to
        :param data: The data to write

        """
        fileobj.write(data)


class IORenameFileTask(Task):
    """A task to rename a temporary file to its final filename

    :param fileobj: The file handle that content was written to.
    :param final_filename: The final name of the file to rename to
        upon completion of writing the contents.
    :param osutil: OS utility
    """

    def _main(self, fileobj, final_filename, osutil):
        fileobj.close()
        osutil.rename_file(fileobj.name, final_filename)


class IOCloseTask(Task):
    """A task to close out a file once the download is complete.

    :param fileobj: The fileobj to close.
    """

    def _main(self, fileobj):
        fileobj.close()


class CompleteDownloadNOOPTask(Task):
    """A NOOP task to serve as an indicator that the download is complete

    Note that the default for is_final is set to True because this should
    always be the last task.
    """

    def __init__(
        self,
        transfer_coordinator,
        main_kwargs=None,
        pending_main_kwargs=None,
        done_callbacks=None,
        is_final=True,
    ):
        super().__init__(
            transfer_coordinator=transfer_coordinator,
            main_kwargs=main_kwargs,
            pending_main_kwargs=pending_main_kwargs,
            done_callbacks=done_callbacks,
            is_final=is_final,
        )

    def _main(self):
        pass


class DownloadChunkIterator:
    def __init__(self, body, chunksize):
        """Iterator to chunk out a downloaded S3 stream

        :param body: A readable file-like object
        :param chunksize: The amount to read each time
        """
        self._body = body
        self._chunksize = chunksize
        self._num_reads = 0

    def __iter__(self):
        return self

    def __next__(self):
        chunk = self._body.read(self._chunksize)
        self._num_reads += 1
        if chunk:
            return chunk
        elif self._num_reads == 1:
            # Even though the response may not have had any content,
            # we still want to account for an empty object's existence,
            # so return the empty chunk for that initial read.
            return chunk
        raise StopIteration()

    next = __next__
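

# A hypothetical sketch, not part of s3transfer: the iterator yields
# exactly one empty chunk for a zero-byte object so the object's existence
# is still recorded, and stops cleanly otherwise.
def _demo_chunk_iterator():
    import io

    assert list(DownloadChunkIterator(io.BytesIO(b'abcde'), 2)) == [
        b'ab',
        b'cd',
        b'e',
    ]
    assert list(DownloadChunkIterator(io.BytesIO(b''), 2)) == [b'']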



class DeferQueue:
    """IO queue that defers write requests until they are queued sequentially.

    This class is used to track IO data for a *single* fileobj.

    You can send data to this queue, and it will defer any IO write requests
    until it has the next contiguous block available (starting at 0).

    """

    def __init__(self):
        self._writes = []
        self._pending_offsets = {}
        self._next_offset = 0

    def request_writes(self, offset, data):
        """Request any available writes given new incoming data.

        You call this method by providing new data along with the
        offset associated with the data. If that new data unlocks
        any contiguous writes that can now be submitted, this
        method will return all applicable writes.

        This is done with a single method call so you don't have to
        make two method calls (put(), get()), each of which would
        acquire a lock.

        """
        if offset + len(data) <= self._next_offset:
            # This is a request for a write that we've already
            # seen. This can happen in the event of a retry
            # where if we retry at offset N/2, we'll requeue
            # offsets 0-N/2 again.
            return []
        writes = []
        if offset < self._next_offset:
            # This is a special case where the write request contains
            # both seen AND unseen data. This can happen in the case
            # that we queue part of a chunk due to an incomplete read,
            # then pop the incomplete data for writing, then we receive
            # the retry for the incomplete read, which contains both the
            # previously-seen partial chunk followed by the rest of the
            # chunk (unseen).
            #
            # In this case, we discard the bytes of the data we've already
            # queued before, and only queue the unseen bytes.
            seen_bytes = self._next_offset - offset
            data = data[seen_bytes:]
            offset = self._next_offset
        if offset in self._pending_offsets:
            queued_data = self._pending_offsets[offset]
            if len(data) <= len(queued_data):
                # We already have a write request queued with the same
                # offset with at least as much data as is present in this
                # request. In this case we should ignore this request
                # and prefer what's already queued.
                return []
            else:
                # We have a write request queued with the same offset,
                # but this request contains more data. This can happen
                # in the case of a retried request due to an incomplete
                # read, followed by a retry containing the full response
                # body. In this case, we should overwrite the queued
                # request with this one since it contains more data.
                self._pending_offsets[offset] = data
        else:
            heapq.heappush(self._writes, offset)
            self._pending_offsets[offset] = data
        while self._writes and self._writes[0] == self._next_offset:
            next_write_offset = heapq.heappop(self._writes)
            next_write = self._pending_offsets[next_write_offset]
            writes.append({'offset': next_write_offset, 'data': next_write})
            del self._pending_offsets[next_write_offset]
            self._next_offset += len(next_write)
        return writes
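

# A hypothetical sketch, not part of s3transfer: DeferQueue in action. A
# write arriving out of order is held until the contiguous prefix
# (starting at offset 0) is complete; an already-seen retry is dropped.
def _demo_defer_queue():
    q = DeferQueue()
    assert q.request_writes(offset=3, data=b'def') == []  # deferred
    writes = q.request_writes(offset=0, data=b'abc')      # unlocks both
    assert [w['offset'] for w in writes] == [0, 3]
    assert q.request_writes(offset=0, data=b'abc') == []  # retry, dropped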