Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/s3transfer/download.py: 32%

247 statements  

# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import heapq
import logging
import threading

from botocore.exceptions import ClientError

from s3transfer.compat import seekable
from s3transfer.exceptions import RetriesExceededError, S3DownloadFailedError
from s3transfer.futures import IN_MEMORY_DOWNLOAD_TAG
from s3transfer.tasks import SubmissionTask, Task
from s3transfer.utils import (
    S3_RETRYABLE_DOWNLOAD_ERRORS,
    CountCallbackInvoker,
    DeferredOpenFile,
    FunctionContainer,
    StreamReaderProgress,
    calculate_num_parts,
    calculate_range_parameter,
    get_callbacks,
    invoke_progress_callbacks,
)

logger = logging.getLogger(__name__)


class DownloadOutputManager:
    """Base manager class for handling various types of files for downloads

    This class is typically used by the DownloadSubmissionTask class to help
    determine the following:

    * Provide the fileobj to write downloaded data to
    * Get a task to complete once all downloaded data has been written

    The answers/implementations differ for the various types of file outputs
    that may be accepted. All implementations must subclass and override
    public methods from this class.
    """

    def __init__(self, osutil, transfer_coordinator, io_executor):
        self._osutil = osutil
        self._transfer_coordinator = transfer_coordinator
        self._io_executor = io_executor

    @classmethod
    def is_compatible(cls, download_target, osutil):
        """Determines if the download target is compatible with this manager

        :param download_target: The target to which the download will write
            data.

        :param osutil: The os utility to be used for the transfer

        :returns: True if the manager can handle the type of target
            specified, otherwise False.
        """
        raise NotImplementedError('must implement is_compatible()')

    def get_download_task_tag(self):
        """Get the tag (if any) to associate all GetObjectTasks

        :rtype: s3transfer.futures.TaskTag
        :returns: The tag to associate all GetObjectTasks with
        """
        return None

    def get_fileobj_for_io_writes(self, transfer_future):
        """Get file-like object to use for io writes in the io executor

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with the download
            request

        :returns: A file-like object to write to
        """
        raise NotImplementedError('must implement get_fileobj_for_io_writes()')

    def queue_file_io_task(self, fileobj, data, offset):
        """Queue IO write for submission to the IO executor.

        This method accepts information about the downloaded data and
        handles submitting it to the IO executor.

        This method may defer submission to the IO executor if necessary.

        """
        self._transfer_coordinator.submit(
            self._io_executor, self.get_io_write_task(fileobj, data, offset)
        )

    def get_io_write_task(self, fileobj, data, offset):
        """Get an IO write task for the requested set of data

        This task can be run immediately or submitted to the IO executor
        for it to run.

        :type fileobj: file-like object
        :param fileobj: The file-like object to write to

        :type data: bytes
        :param data: The data to write out

        :type offset: integer
        :param offset: The offset to write the data to in the file-like object

        :returns: An IO task to be used to write data to a file-like object
        """
        return IOWriteTask(
            self._transfer_coordinator,
            main_kwargs={
                'fileobj': fileobj,
                'data': data,
                'offset': offset,
            },
        )

    def get_final_io_task(self):
        """Get the final io task to complete the download

        This is needed because, based on the architecture of the
        TransferManager, the final tasks will be sent to the IO executor,
        but the executor needs a final task to signal that the transfer is
        done and all done callbacks can be run.

        :rtype: s3transfer.tasks.Task
        :returns: A final task to be completed in the io executor
        """
        raise NotImplementedError('must implement get_final_io_task()')

    def _get_fileobj_from_filename(self, filename):
        f = DeferredOpenFile(
            filename, mode='wb', open_function=self._osutil.open
        )
        # Make sure the file gets closed and we remove the temporary file
        # if anything goes wrong during the process.
        self._transfer_coordinator.add_failure_cleanup(f.close)
        return f

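
# Illustrative sketch (editorial, not part of s3transfer): the minimal
# contract a custom DownloadOutputManager subclass must satisfy. The class
# and its behavior are hypothetical; only the overrides shown here are
# required by the base class above.
class _NullOutputManager(DownloadOutputManager):
    """Hypothetical manager that discards downloaded bytes."""

    @classmethod
    def is_compatible(cls, download_target, osutil):
        # Accept a (hypothetical) sentinel string as the download target.
        return download_target == 'null'

    def get_fileobj_for_io_writes(self, transfer_future):
        # A seekable, writable sink whose contents are simply dropped.
        import io

        return io.BytesIO()

    def get_final_io_task(self):
        # Reuse the NOOP completion task defined later in this module.
        return CompleteDownloadNOOPTask(
            transfer_coordinator=self._transfer_coordinator
        )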

class DownloadFilenameOutputManager(DownloadOutputManager):
    def __init__(self, osutil, transfer_coordinator, io_executor):
        super().__init__(osutil, transfer_coordinator, io_executor)
        self._final_filename = None
        self._temp_filename = None
        self._temp_fileobj = None

    @classmethod
    def is_compatible(cls, download_target, osutil):
        return isinstance(download_target, str)

    def get_fileobj_for_io_writes(self, transfer_future):
        fileobj = transfer_future.meta.call_args.fileobj
        self._final_filename = fileobj
        self._temp_filename = self._osutil.get_temp_filename(fileobj)
        self._temp_fileobj = self._get_temp_fileobj()
        return self._temp_fileobj

    def get_final_io_task(self):
        # A task to rename the file from the temporary file to its final
        # location is needed. This should be the last task needed to complete
        # the download.
        return IORenameFileTask(
            transfer_coordinator=self._transfer_coordinator,
            main_kwargs={
                'fileobj': self._temp_fileobj,
                'final_filename': self._final_filename,
                'osutil': self._osutil,
            },
            is_final=True,
        )

    def _get_temp_fileobj(self):
        f = self._get_fileobj_from_filename(self._temp_filename)
        self._transfer_coordinator.add_failure_cleanup(
            self._osutil.remove_file, self._temp_filename
        )
        return f


class DownloadSeekableOutputManager(DownloadOutputManager):
    @classmethod
    def is_compatible(cls, download_target, osutil):
        return seekable(download_target)

    def get_fileobj_for_io_writes(self, transfer_future):
        # Return the fileobj provided to the future.
        return transfer_future.meta.call_args.fileobj

    def get_final_io_task(self):
        # This task will serve the purpose of signaling when all of the io
        # writes have finished so done callbacks can be called.
        return CompleteDownloadNOOPTask(
            transfer_coordinator=self._transfer_coordinator
        )


class DownloadNonSeekableOutputManager(DownloadOutputManager):
    def __init__(
        self, osutil, transfer_coordinator, io_executor, defer_queue=None
    ):
        super().__init__(osutil, transfer_coordinator, io_executor)
        if defer_queue is None:
            defer_queue = DeferQueue()
        self._defer_queue = defer_queue
        self._io_submit_lock = threading.Lock()

    @classmethod
    def is_compatible(cls, download_target, osutil):
        return hasattr(download_target, 'write')

    def get_download_task_tag(self):
        return IN_MEMORY_DOWNLOAD_TAG

    def get_fileobj_for_io_writes(self, transfer_future):
        return transfer_future.meta.call_args.fileobj

    def get_final_io_task(self):
        return CompleteDownloadNOOPTask(
            transfer_coordinator=self._transfer_coordinator
        )

    def queue_file_io_task(self, fileobj, data, offset):
        with self._io_submit_lock:
            writes = self._defer_queue.request_writes(offset, data)
            for write in writes:
                data = write['data']
                logger.debug(
                    "Queueing IO offset %s for fileobj: %s",
                    write['offset'],
                    fileobj,
                )
                super().queue_file_io_task(fileobj, data, offset)

    def get_io_write_task(self, fileobj, data, offset):
        return IOStreamingWriteTask(
            self._transfer_coordinator,
            main_kwargs={
                'fileobj': fileobj,
                'data': data,
            },
        )


class DownloadSpecialFilenameOutputManager(DownloadNonSeekableOutputManager):
    def __init__(
        self, osutil, transfer_coordinator, io_executor, defer_queue=None
    ):
        super().__init__(
            osutil, transfer_coordinator, io_executor, defer_queue
        )
        self._fileobj = None

    @classmethod
    def is_compatible(cls, download_target, osutil):
        return isinstance(download_target, str) and osutil.is_special_file(
            download_target
        )

    def get_fileobj_for_io_writes(self, transfer_future):
        filename = transfer_future.meta.call_args.fileobj
        self._fileobj = self._get_fileobj_from_filename(filename)
        return self._fileobj

    def get_final_io_task(self):
        # Make sure the file gets closed once the transfer is done.
        return IOCloseTask(
            transfer_coordinator=self._transfer_coordinator,
            is_final=True,
            main_kwargs={'fileobj': self._fileobj},
        )


class DownloadSubmissionTask(SubmissionTask):
    """Task for submitting tasks to execute a download"""

    def _get_download_output_manager_cls(self, transfer_future, osutil):
        """Retrieves a class for managing output for a download

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future for the request

        :type osutil: s3transfer.utils.OSUtils
        :param osutil: The os utility associated to the transfer

        :rtype: class of DownloadOutputManager
        :returns: The appropriate class to use for managing a specific type
            of download target.
        """
        download_manager_resolver_chain = [
            DownloadSpecialFilenameOutputManager,
            DownloadFilenameOutputManager,
            DownloadSeekableOutputManager,
            DownloadNonSeekableOutputManager,
        ]

        fileobj = transfer_future.meta.call_args.fileobj
        for download_manager_cls in download_manager_resolver_chain:
            if download_manager_cls.is_compatible(fileobj, osutil):
                return download_manager_cls
        raise RuntimeError(
            f'Output {fileobj} of type: {type(fileobj)} is not supported.'
        )

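
    # Illustrative resolution of the chain above (an editorial sketch,
    # assuming a default OSUtils where the path is not a special file):
    #   'out.bin'                     -> DownloadFilenameOutputManager
    #   open('out.bin', 'wb')         -> DownloadSeekableOutputManager
    #   a pipe's write end (no seek)  -> DownloadNonSeekableOutputManager
    #   '/dev/null' (special file)    -> DownloadSpecialFilenameOutputManager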

    def _submit(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        transfer_future,
        bandwidth_limiter=None,
    ):
        """
        :param client: The client associated with the transfer manager

        :type config: s3transfer.manager.TransferConfig
        :param config: The transfer config associated with the transfer
            manager

        :type osutil: s3transfer.utils.OSUtils
        :param osutil: The os utility associated to the transfer manager

        :type request_executor: s3transfer.futures.BoundedExecutor
        :param request_executor: The request executor associated with the
            transfer manager

        :type io_executor: s3transfer.futures.BoundedExecutor
        :param io_executor: The io executor associated with the
            transfer manager

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for

        :type bandwidth_limiter: s3transfer.bandwidth.BandwidthLimiter
        :param bandwidth_limiter: The bandwidth limiter to use when
            downloading streams
        """
        if (
            transfer_future.meta.size is None
            or transfer_future.meta.etag is None
        ):
            response = client.head_object(
                Bucket=transfer_future.meta.call_args.bucket,
                Key=transfer_future.meta.call_args.key,
                **transfer_future.meta.call_args.extra_args,
            )
            # If a size was not provided, figure out the size for the
            # user.
            transfer_future.meta.provide_transfer_size(
                response['ContentLength']
            )
            # Provide an etag to ensure a stored object is not modified
            # during a multipart download.
            transfer_future.meta.provide_object_etag(response.get('ETag'))

        download_output_manager = self._get_download_output_manager_cls(
            transfer_future, osutil
        )(osutil, self._transfer_coordinator, io_executor)

        # If the size is below the multipart threshold, do a regular
        # GetObject download; otherwise do a ranged download.
        if transfer_future.meta.size < config.multipart_threshold:
            self._submit_download_request(
                client,
                config,
                osutil,
                request_executor,
                io_executor,
                download_output_manager,
                transfer_future,
                bandwidth_limiter,
            )
        else:
            self._submit_ranged_download_request(
                client,
                config,
                osutil,
                request_executor,
                io_executor,
                download_output_manager,
                transfer_future,
                bandwidth_limiter,
            )

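
    # Illustrative example (editorial) of the threshold decision in _submit,
    # assuming the library-default TransferConfig where multipart_threshold
    # and multipart_chunksize are both 8 MiB:
    #   a 5 MiB object  -> one ImmediatelyWriteIOGetObjectTask
    #   a 20 MiB object -> three ranged GetObjectTasks (8 + 8 + 4 MiB)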

    def _submit_download_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        download_output_manager,
        transfer_future,
        bandwidth_limiter,
    ):
        call_args = transfer_future.meta.call_args

        # Get a handle to the file that will be used for writing downloaded
        # contents
        fileobj = download_output_manager.get_fileobj_for_io_writes(
            transfer_future
        )

        # Get the needed callbacks for the task
        progress_callbacks = get_callbacks(transfer_future, 'progress')

        # Get any associated tags for the get object task.
        get_object_tag = download_output_manager.get_download_task_tag()

        # Get the final io task to run once the download is complete.
        final_task = download_output_manager.get_final_io_task()

        # Submit the task to download the object.
        self._transfer_coordinator.submit(
            request_executor,
            ImmediatelyWriteIOGetObjectTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'fileobj': fileobj,
                    'extra_args': call_args.extra_args,
                    'callbacks': progress_callbacks,
                    'max_attempts': config.num_download_attempts,
                    'download_output_manager': download_output_manager,
                    'io_chunksize': config.io_chunksize,
                    'bandwidth_limiter': bandwidth_limiter,
                },
                done_callbacks=[final_task],
            ),
            tag=get_object_tag,
        )


    def _submit_ranged_download_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        download_output_manager,
        transfer_future,
        bandwidth_limiter,
    ):
        call_args = transfer_future.meta.call_args

        # Get the needed progress callbacks for the task
        progress_callbacks = get_callbacks(transfer_future, 'progress')

        # Get a handle to the file that will be used for writing downloaded
        # contents
        fileobj = download_output_manager.get_fileobj_for_io_writes(
            transfer_future
        )

        # Determine the number of parts
        part_size = config.multipart_chunksize
        num_parts = calculate_num_parts(transfer_future.meta.size, part_size)

        # Get any associated tags for the get object task.
        get_object_tag = download_output_manager.get_download_task_tag()

        # Callback invoker to submit the final io task once all downloads
        # are complete.
        finalize_download_invoker = CountCallbackInvoker(
            self._get_final_io_task_submission_callback(
                download_output_manager, io_executor
            )
        )
        for i in range(num_parts):
            # Calculate the range parameter
            range_parameter = calculate_range_parameter(
                part_size, i, num_parts
            )

            # Inject extra parameters to be passed in as extra args
            extra_args = {
                'Range': range_parameter,
            }
            if transfer_future.meta.etag is not None:
                extra_args['IfMatch'] = transfer_future.meta.etag
            extra_args.update(call_args.extra_args)
            finalize_download_invoker.increment()
            # Submit the ranged downloads
            self._transfer_coordinator.submit(
                request_executor,
                GetObjectTask(
                    transfer_coordinator=self._transfer_coordinator,
                    main_kwargs={
                        'client': client,
                        'bucket': call_args.bucket,
                        'key': call_args.key,
                        'fileobj': fileobj,
                        'extra_args': extra_args,
                        'callbacks': progress_callbacks,
                        'max_attempts': config.num_download_attempts,
                        'start_index': i * part_size,
                        'download_output_manager': download_output_manager,
                        'io_chunksize': config.io_chunksize,
                        'bandwidth_limiter': bandwidth_limiter,
                    },
                    done_callbacks=[finalize_download_invoker.decrement],
                ),
                tag=get_object_tag,
            )
        finalize_download_invoker.finalize()

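
    # Illustrative sketch (editorial) of the CountCallbackInvoker pattern
    # above: the final io task is submitted only after finalize() has been
    # called AND every increment() has been matched by a decrement(), i.e.
    # after every ranged GetObjectTask completes ('submit_final_io_task' is
    # a hypothetical callback):
    #
    #     invoker = CountCallbackInvoker(submit_final_io_task)
    #     invoker.increment(); invoker.increment()  # two parts in flight
    #     invoker.finalize()                        # still waiting on both
    #     invoker.decrement(); invoker.decrement()  # callback fires here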

    def _get_final_io_task_submission_callback(
        self, download_manager, io_executor
    ):
        final_task = download_manager.get_final_io_task()
        return FunctionContainer(
            self._transfer_coordinator.submit, io_executor, final_task
        )

    def _calculate_range_param(self, part_size, part_index, num_parts):
        # Used to calculate the Range parameter
        start_range = part_index * part_size
        if part_index == num_parts - 1:
            end_range = ''
        else:
            end_range = start_range + part_size - 1
        range_param = f'bytes={start_range}-{end_range}'
        return range_param

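
# Illustrative check (editorial) of the Range math in
# DownloadSubmissionTask._calculate_range_param: a 10-byte object split
# into 4-byte parts gives 3 parts, and the last part is open-ended:
#
#     _calculate_range_param(4, 0, 3) == 'bytes=0-3'
#     _calculate_range_param(4, 1, 3) == 'bytes=4-7'
#     _calculate_range_param(4, 2, 3) == 'bytes=8-'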

class GetObjectTask(Task):
    def _main(
        self,
        client,
        bucket,
        key,
        fileobj,
        extra_args,
        callbacks,
        max_attempts,
        download_output_manager,
        io_chunksize,
        start_index=0,
        bandwidth_limiter=None,
    ):
        """Downloads an object and places content into io queue

        :param client: The client to use when calling GetObject
        :param bucket: The bucket to download from
        :param key: The key to download from
        :param fileobj: The file handle to write content to
        :param extra_args: Any extra arguments to include in GetObject request
        :param callbacks: List of progress callbacks to invoke on download
        :param max_attempts: The number of retries to do when downloading
        :param download_output_manager: The download output manager associated
            with the current download.
        :param io_chunksize: The size of each io chunk to read from the
            download stream and queue in the io queue.
        :param start_index: The location in the file to start writing the
            content of the key to.
        :param bandwidth_limiter: The bandwidth limiter to use when throttling
            the downloading of data in streams.
        """
        last_exception = None
        for i in range(max_attempts):
            try:
                current_index = start_index
                response = client.get_object(
                    Bucket=bucket, Key=key, **extra_args
                )
                streaming_body = StreamReaderProgress(
                    response['Body'], callbacks
                )
                if bandwidth_limiter:
                    streaming_body = (
                        bandwidth_limiter.get_bandwith_limited_stream(
                            streaming_body, self._transfer_coordinator
                        )
                    )

                chunks = DownloadChunkIterator(streaming_body, io_chunksize)
                for chunk in chunks:
                    # If the transfer is done because of a cancellation
                    # or error somewhere else, stop trying to submit more
                    # data to be written and break out of the download.
                    if not self._transfer_coordinator.done():
                        self._handle_io(
                            download_output_manager,
                            fileobj,
                            chunk,
                            current_index,
                        )
                        current_index += len(chunk)
                    else:
                        return
                return
            except ClientError as e:
                error_code = e.response.get('Error', {}).get('Code')
                if error_code == "PreconditionFailed":
                    raise S3DownloadFailedError(
                        f'Contents of stored object "{key}" in bucket '
                        f'"{bucket}" did not match expected ETag.'
                    )
                else:
                    raise
            except S3_RETRYABLE_DOWNLOAD_ERRORS as e:
                logger.debug(
                    "Retrying exception caught (%s), "
                    "retrying request, (attempt %s / %s)",
                    e,
                    i,
                    max_attempts,
                    exc_info=True,
                )
                last_exception = e
                # Also invoke the progress callbacks to indicate that we
                # are trying to download the stream again and all progress
                # for this GetObject has been lost.
                invoke_progress_callbacks(
                    callbacks, start_index - current_index
                )
                continue
        raise RetriesExceededError(last_exception)

    def _handle_io(self, download_output_manager, fileobj, chunk, index):
        download_output_manager.queue_file_io_task(fileobj, chunk, index)

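
# Illustrative note (editorial) on the retry rewind above: if a ranged GET
# beginning at start_index=8_388_608 streamed 1_048_576 bytes before a
# retryable error, then start_index - current_index == -1_048_576, so the
# progress callbacks are rewound by exactly the bytes that will be
# re-downloaded on the next attempt.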

class ImmediatelyWriteIOGetObjectTask(GetObjectTask):
    """GetObjectTask that immediately writes to the provided file object

    This is useful for downloads where it is known only one thread is
    downloading the object so there is no reason to go through the
    overhead of using an IO queue and executor.
    """

    def _handle_io(self, download_output_manager, fileobj, chunk, index):
        task = download_output_manager.get_io_write_task(fileobj, chunk, index)
        task()


class IOWriteTask(Task):
    def _main(self, fileobj, data, offset):
        """Pulls off an io queue to write contents to a file

        :param fileobj: The file handle to write content to
        :param data: The data to write
        :param offset: The offset to write the data to.
        """
        fileobj.seek(offset)
        fileobj.write(data)

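
# Illustrative sketch (editorial, not part of the module): because
# IOWriteTask seeks before each write, ranged parts may land in any order
# and still produce the same bytes. Demonstrated with an in-memory file:
#
#     import io
#
#     buf = io.BytesIO()
#     buf.seek(4)
#     buf.write(b'efgh')  # a later part lands first; bytes 0-3 zero-fill
#     buf.seek(0)
#     buf.write(b'abcd')  # the first part fills the gap
#     assert buf.getvalue() == b'abcdefgh'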

class IOStreamingWriteTask(Task):
    """Task for writing data to a non-seekable stream."""

    def _main(self, fileobj, data):
        """Write data to a fileobj.

        Data will be written directly to the fileobj without
        any prior seeking.

        :param fileobj: The fileobj to write content to
        :param data: The data to write
        """
        fileobj.write(data)


class IORenameFileTask(Task):
    """A task to rename a temporary file to its final filename

    :param fileobj: The file handle that content was written to.
    :param final_filename: The final name of the file to rename to
        upon completion of writing the contents.
    :param osutil: OS utility
    """

    def _main(self, fileobj, final_filename, osutil):
        fileobj.close()
        osutil.rename_file(fileobj.name, final_filename)

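
# Illustrative sketch (editorial) of the temp-write-then-rename pattern that
# IORenameFileTask completes, shown with plain stdlib calls and a
# hypothetical target 'demo.bin':
#
#     import os
#     import tempfile
#
#     fd, tmp_path = tempfile.mkstemp(dir='.')
#     with os.fdopen(fd, 'wb') as tmp:
#         tmp.write(b'downloaded contents')
#     os.replace(tmp_path, 'demo.bin')  # atomic on POSIX; a partial file
#                                       # never appears under the final name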

class IOCloseTask(Task):
    """A task to close out a file once the download is complete.

    :param fileobj: The fileobj to close.
    """

    def _main(self, fileobj):
        fileobj.close()


class CompleteDownloadNOOPTask(Task):
    """A NOOP task to serve as an indicator that the download is complete

    Note that the default for is_final is set to True because this should
    always be the last task.
    """

    def __init__(
        self,
        transfer_coordinator,
        main_kwargs=None,
        pending_main_kwargs=None,
        done_callbacks=None,
        is_final=True,
    ):
        super().__init__(
            transfer_coordinator=transfer_coordinator,
            main_kwargs=main_kwargs,
            pending_main_kwargs=pending_main_kwargs,
            done_callbacks=done_callbacks,
            is_final=is_final,
        )

    def _main(self):
        pass


class DownloadChunkIterator:
    def __init__(self, body, chunksize):
        """Iterator to chunk out a downloaded S3 stream

        :param body: A readable file-like object
        :param chunksize: The amount to read each time
        """
        self._body = body
        self._chunksize = chunksize
        self._num_reads = 0

    def __iter__(self):
        return self

    def __next__(self):
        chunk = self._body.read(self._chunksize)
        self._num_reads += 1
        if chunk:
            return chunk
        elif self._num_reads == 1:
            # Even though the response may not have had any
            # content, we still want to account for an empty object's
            # existence so return the empty chunk for that initial
            # read.
            return chunk
        raise StopIteration()

    next = __next__

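
# Illustrative sketch (editorial): chunking an in-memory stream with
# DownloadChunkIterator, including the empty-object case that still yields
# a single b'' chunk:
#
#     import io
#
#     assert list(DownloadChunkIterator(io.BytesIO(b'abcdefghij'), 4)) == [
#         b'abcd', b'efgh', b'ij'
#     ]
#     assert list(DownloadChunkIterator(io.BytesIO(b''), 4)) == [b'']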

class DeferQueue:
    """IO queue that defers write requests until they are queued sequentially.

    This class is used to track IO data for a *single* fileobj.

    You can send data to this queue, and it will defer any IO write requests
    until it has the next contiguous block available (starting at 0).
    """

    def __init__(self):
        self._writes = []
        self._pending_offsets = {}
        self._next_offset = 0

    def request_writes(self, offset, data):
        """Request any available writes given new incoming data.

        You call this method by providing new data along with the
        offset associated with the data. If that new data unlocks
        any contiguous writes that can now be submitted, this
        method will return all applicable writes.

        This is done with 1 method call so you don't have to
        make two method calls (put(), get()) which acquires a lock
        each method call.

        """
        if offset + len(data) <= self._next_offset:
            # This is a request for a write that we've already
            # seen. This can happen in the event of a retry
            # where if we retry at offset N/2, we'll requeue
            # offsets 0-N/2 again.
            return []
        writes = []
        if offset < self._next_offset:
            # This is a special case where the write request contains
            # both seen AND unseen data. This can happen when we queue
            # part of a chunk due to an incomplete read, pop the
            # incomplete data for writing, and then receive the retry
            # for the incomplete read, which contains both the
            # previously-seen partial chunk and the rest of the chunk
            # (unseen).
            #
            # In this case, we discard the bytes of the data we've already
            # queued before, and only queue the unseen bytes.
            seen_bytes = self._next_offset - offset
            data = data[seen_bytes:]
            offset = self._next_offset
        if offset in self._pending_offsets:
            queued_data = self._pending_offsets[offset]
            if len(data) <= len(queued_data):
                # We already have a write request queued with the same
                # offset with at least as much data as is present in this
                # request. In this case we should ignore this request
                # and prefer what's already queued.
                return []
            else:
                # We have a write request queued with the same offset,
                # but this request contains more data. This can happen
                # in the case of a retried request due to an incomplete
                # read, followed by a retry containing the full response
                # body. In this case, we should overwrite the queued
                # request with this one since it contains more data.
                self._pending_offsets[offset] = data
        else:
            heapq.heappush(self._writes, offset)
            self._pending_offsets[offset] = data
        while self._writes and self._writes[0] == self._next_offset:
            next_write_offset = heapq.heappop(self._writes)
            next_write = self._pending_offsets[next_write_offset]
            writes.append({'offset': next_write_offset, 'data': next_write})
            del self._pending_offsets[next_write_offset]
            self._next_offset += len(next_write)
        return writes
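
# Illustrative sketch (editorial) of DeferQueue behavior: out-of-order
# writes are held back until the block at the next expected offset arrives,
# and already-seen data is deduplicated on retries:
#
#     q = DeferQueue()
#     assert q.request_writes(offset=4, data=b'efgh') == []  # gap at 0
#     assert q.request_writes(offset=0, data=b'abcd') == [
#         {'offset': 0, 'data': b'abcd'},
#         {'offset': 4, 'data': b'efgh'},
#     ]
#     assert q.request_writes(offset=0, data=b'abcd') == []  # retry dedup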