Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/download.py: 34%

229 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import heapq 

14import logging 

15import threading 

16 

17from s3transfer.compat import seekable 

18from s3transfer.exceptions import RetriesExceededError 

19from s3transfer.futures import IN_MEMORY_DOWNLOAD_TAG 

20from s3transfer.tasks import SubmissionTask, Task 

21from s3transfer.utils import ( 

22 S3_RETRYABLE_DOWNLOAD_ERRORS, 

23 CountCallbackInvoker, 

24 DeferredOpenFile, 

25 FunctionContainer, 

26 StreamReaderProgress, 

27 calculate_num_parts, 

28 calculate_range_parameter, 

29 get_callbacks, 

30 invoke_progress_callbacks, 

31) 

32 

# Module-level logger named after this module's import path, per the
# stdlib logging convention.
logger = logging.getLogger(__name__)

34 

35 

class DownloadOutputManager:
    """Abstract helper that adapts a download destination for IO writes.

    Concrete subclasses answer two questions for the
    ``DownloadSubmissionTask``:

    * which file-like object downloaded bytes should be written to, and
    * which task must run last so the transfer can be finalized.

    The answers differ per destination type, so subclasses override the
    public methods below as appropriate.
    """

    def __init__(self, osutil, transfer_coordinator, io_executor):
        self._osutil = osutil
        self._transfer_coordinator = transfer_coordinator
        self._io_executor = io_executor

    @classmethod
    def is_compatible(cls, download_target, osutil):
        """Whether this manager can handle the given download target.

        :param download_target: The target to which the downloaded data
            will be written.

        :param osutil: The os utility to be used for the transfer

        :returns: True if this manager supports the target's type,
            False otherwise.
        """
        raise NotImplementedError('must implement is_compatible()')

    def get_download_task_tag(self):
        """Tag (if any) to associate with every GetObjectTask.

        :rtype: s3transfer.futures.TaskTag
        :returns: The tag for all GetObjectTasks, or None for no tag.
        """
        return None

    def get_fileobj_for_io_writes(self, transfer_future):
        """File-like object that io writes should be directed to.

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with the request

        returns: A writable file-like object
        """
        raise NotImplementedError('must implement get_fileobj_for_io_writes()')

    def queue_file_io_task(self, fileobj, data, offset):
        """Submit an IO write to the io executor.

        Wraps the downloaded data in a write task and hands it to the
        transfer coordinator for execution on the io executor.

        Subclasses may defer submission when write ordering requires it.
        """
        write_task = self.get_io_write_task(fileobj, data, offset)
        self._transfer_coordinator.submit(self._io_executor, write_task)

    def get_io_write_task(self, fileobj, data, offset):
        """Build the task performing a single IO write.

        The returned task may either be run inline or submitted to the
        io executor.

        :type fileobj: file-like object
        :param fileobj: The file-like object to write to

        :type data: bytes
        :param data: The bytes to write out

        :type offset: integer
        :param offset: Position in ``fileobj`` at which to write the data

        :returns: An IO task that writes the data to the file-like object
        """
        write_kwargs = {
            'fileobj': fileobj,
            'data': data,
            'offset': offset,
        }
        return IOWriteTask(self._transfer_coordinator, main_kwargs=write_kwargs)

    def get_final_io_task(self):
        """Task that must run last on the io executor.

        The TransferManager routes all final work through the io
        executor, so this task is what signals that the transfer is
        complete and that done callbacks may be invoked.

        :rtype: s3transfer.tasks.Task
        :returns: The finalization task for the io executor
        """
        raise NotImplementedError('must implement get_final_io_task()')

    def _get_fileobj_from_filename(self, filename):
        deferred_fileobj = DeferredOpenFile(
            filename, mode='wb', open_function=self._osutil.open
        )
        # Guarantee the handle is closed (and any temp file cleanup can
        # run) should the transfer fail partway through.
        self._transfer_coordinator.add_failure_cleanup(deferred_fileobj.close)
        return deferred_fileobj

147 

148 

class DownloadFilenameOutputManager(DownloadOutputManager):
    """Downloads to a temporary file that is renamed into place at the end."""

    def __init__(self, osutil, transfer_coordinator, io_executor):
        super().__init__(osutil, transfer_coordinator, io_executor)
        self._final_filename = None
        self._temp_filename = None
        self._temp_fileobj = None

    @classmethod
    def is_compatible(cls, download_target, osutil):
        # Any plain string is treated as a destination filename.
        return isinstance(download_target, str)

    def get_fileobj_for_io_writes(self, transfer_future):
        requested_filename = transfer_future.meta.call_args.fileobj
        self._final_filename = requested_filename
        self._temp_filename = self._osutil.get_temp_filename(
            requested_filename
        )
        self._temp_fileobj = self._get_temp_fileobj()
        return self._temp_fileobj

    def get_final_io_task(self):
        # The download finishes by renaming the temporary file to the
        # requested destination, so the rename is the final task.
        return IORenameFileTask(
            transfer_coordinator=self._transfer_coordinator,
            main_kwargs={
                'fileobj': self._temp_fileobj,
                'final_filename': self._final_filename,
                'osutil': self._osutil,
            },
            is_final=True,
        )

    def _get_temp_fileobj(self):
        temp_fileobj = self._get_fileobj_from_filename(self._temp_filename)
        # If anything fails, the temporary file must not be left behind.
        self._transfer_coordinator.add_failure_cleanup(
            self._osutil.remove_file, self._temp_filename
        )
        return temp_fileobj

187 

188 

class DownloadSeekableOutputManager(DownloadOutputManager):
    """Writes directly into a caller-provided seekable file-like object."""

    @classmethod
    def is_compatible(cls, download_target, osutil):
        return seekable(download_target)

    def get_fileobj_for_io_writes(self, transfer_future):
        # Write straight into the object supplied by the caller.
        return transfer_future.meta.call_args.fileobj

    def get_final_io_task(self):
        # There is nothing to rename or close here; a no-op task simply
        # marks the point at which all queued writes have completed so
        # done callbacks may fire.
        return CompleteDownloadNOOPTask(
            transfer_coordinator=self._transfer_coordinator
        )

204 

205 

class DownloadNonSeekableOutputManager(DownloadOutputManager):
    """Handles downloads to a non-seekable stream.

    Because the target cannot seek, writes must be issued strictly in
    offset order; chunks arriving out of order are buffered in a
    ``DeferQueue`` until the next contiguous offset becomes available.
    """

    def __init__(
        self, osutil, transfer_coordinator, io_executor, defer_queue=None
    ):
        super().__init__(osutil, transfer_coordinator, io_executor)
        if defer_queue is None:
            defer_queue = DeferQueue()
        self._defer_queue = defer_queue
        # Serializes access to the defer queue so concurrent download
        # threads release writes in one consistent order.
        self._io_submit_lock = threading.Lock()

    @classmethod
    def is_compatible(cls, download_target, osutil):
        return hasattr(download_target, 'write')

    def get_download_task_tag(self):
        # Downloads feeding a stream hold their data in memory while
        # deferred, so tag them to bound concurrent in-memory downloads.
        return IN_MEMORY_DOWNLOAD_TAG

    def get_fileobj_for_io_writes(self, transfer_future):
        return transfer_future.meta.call_args.fileobj

    def get_final_io_task(self):
        return CompleteDownloadNOOPTask(
            transfer_coordinator=self._transfer_coordinator
        )

    def queue_file_io_task(self, fileobj, data, offset):
        """Queue the write, releasing any writes now contiguous.

        The incoming chunk is handed to the defer queue; every write the
        queue releases (in offset order) is then submitted via the base
        class.
        """
        with self._io_submit_lock:
            writes = self._defer_queue.request_writes(offset, data)
            for write in writes:
                data = write['data']
                logger.debug(
                    "Queueing IO offset %s for fileobj: %s",
                    write['offset'],
                    fileobj,
                )
                # Bug fix: submit each released write with its own
                # offset rather than the (stale) offset of the chunk
                # that triggered the release. IOStreamingWriteTask
                # ignores the offset today, so behavior is unchanged
                # here, but the old code would silently corrupt output
                # for any subclass whose write task honors the offset.
                super().queue_file_io_task(fileobj, data, write['offset'])

    def get_io_write_task(self, fileobj, data, offset):
        # Streaming writes carry no offset; ordering is already
        # guaranteed by the defer queue above.
        return IOStreamingWriteTask(
            self._transfer_coordinator,
            main_kwargs={
                'fileobj': fileobj,
                'data': data,
            },
        )

251 

252 

class DownloadSpecialFilenameOutputManager(DownloadNonSeekableOutputManager):
    """Handles downloads addressed to special files (e.g. named pipes).

    Such targets are given by filename but cannot use the temp-file +
    rename strategy, so they are streamed like a non-seekable target and
    the handle is simply closed when the download completes.
    """

    def __init__(
        self, osutil, transfer_coordinator, io_executor, defer_queue=None
    ):
        super().__init__(
            osutil, transfer_coordinator, io_executor, defer_queue
        )
        self._fileobj = None

    @classmethod
    def is_compatible(cls, download_target, osutil):
        if not isinstance(download_target, str):
            return False
        return osutil.is_special_file(download_target)

    def get_fileobj_for_io_writes(self, transfer_future):
        target_filename = transfer_future.meta.call_args.fileobj
        self._fileobj = self._get_fileobj_from_filename(target_filename)
        return self._fileobj

    def get_final_io_task(self):
        # The only finalization needed is closing the handle once the
        # transfer is done.
        return IOCloseTask(
            transfer_coordinator=self._transfer_coordinator,
            is_final=True,
            main_kwargs={'fileobj': self._fileobj},
        )

280 

281 

class DownloadSubmissionTask(SubmissionTask):
    """Task for submitting tasks to execute a download"""

    def _get_download_output_manager_cls(self, transfer_future, osutil):
        """Retrieves a class for managing output for a download

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future for the request

        :type osutil: s3transfer.utils.OSUtils
        :param osutil: The os utility associated to the transfer

        :rtype: class of DownloadOutputManager
        :returns: The appropriate class to use for managing a specific type of
            input for downloads.
        """
        # Order matters: special filenames must be checked before plain
        # filenames (both are strings), and seekable file-like objects
        # before the generic writeable (non-seekable) fallback.
        download_manager_resolver_chain = [
            DownloadSpecialFilenameOutputManager,
            DownloadFilenameOutputManager,
            DownloadSeekableOutputManager,
            DownloadNonSeekableOutputManager,
        ]

        fileobj = transfer_future.meta.call_args.fileobj
        for download_manager_cls in download_manager_resolver_chain:
            if download_manager_cls.is_compatible(fileobj, osutil):
                return download_manager_cls
        raise RuntimeError(
            'Output {} of type: {} is not supported.'.format(
                fileobj, type(fileobj)
            )
        )

    def _submit(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        transfer_future,
        bandwidth_limiter=None,
    ):
        """
        :param client: The client associated with the transfer manager

        :type config: s3transfer.manager.TransferConfig
        :param config: The transfer config associated with the transfer
            manager

        :type osutil: s3transfer.utils.OSUtil
        :param osutil: The os utility associated to the transfer manager

        :type request_executor: s3transfer.futures.BoundedExecutor
        :param request_executor: The request executor associated with the
            transfer manager

        :type io_executor: s3transfer.futures.BoundedExecutor
        :param io_executor: The io executor associated with the
            transfer manager

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for

        :type bandwidth_limiter: s3transfer.bandwidth.BandwidthLimiter
        :param bandwidth_limiter: The bandwidth limiter to use when
            downloading streams
        """
        if transfer_future.meta.size is None:
            # If a size was not provided figure out the size for the
            # user.
            response = client.head_object(
                Bucket=transfer_future.meta.call_args.bucket,
                Key=transfer_future.meta.call_args.key,
                **transfer_future.meta.call_args.extra_args,
            )
            transfer_future.meta.provide_transfer_size(
                response['ContentLength']
            )

        # Resolve the manager class for this destination type, then
        # instantiate it.
        download_output_manager = self._get_download_output_manager_cls(
            transfer_future, osutil
        )(osutil, self._transfer_coordinator, io_executor)

        # If it is greater than threshold do a ranged download, otherwise
        # do a regular GetObject download.
        if transfer_future.meta.size < config.multipart_threshold:
            self._submit_download_request(
                client,
                config,
                osutil,
                request_executor,
                io_executor,
                download_output_manager,
                transfer_future,
                bandwidth_limiter,
            )
        else:
            self._submit_ranged_download_request(
                client,
                config,
                osutil,
                request_executor,
                io_executor,
                download_output_manager,
                transfer_future,
                bandwidth_limiter,
            )

    def _submit_download_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        download_output_manager,
        transfer_future,
        bandwidth_limiter,
    ):
        """Submit a single (non-ranged) GetObject download.

        Used when the object size is below the multipart threshold: the
        whole object is fetched by one task, which writes chunks inline
        and runs the final io task as a done callback.
        """
        call_args = transfer_future.meta.call_args

        # Get a handle to the file that will be used for writing downloaded
        # contents
        fileobj = download_output_manager.get_fileobj_for_io_writes(
            transfer_future
        )

        # Get the needed callbacks for the task
        progress_callbacks = get_callbacks(transfer_future, 'progress')

        # Get any associated tags for the get object task.
        get_object_tag = download_output_manager.get_download_task_tag()

        # Get the final io task to run once the download is complete.
        final_task = download_output_manager.get_final_io_task()

        # Submit the task to download the object.
        self._transfer_coordinator.submit(
            request_executor,
            ImmediatelyWriteIOGetObjectTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'fileobj': fileobj,
                    'extra_args': call_args.extra_args,
                    'callbacks': progress_callbacks,
                    'max_attempts': config.num_download_attempts,
                    'download_output_manager': download_output_manager,
                    'io_chunksize': config.io_chunksize,
                    'bandwidth_limiter': bandwidth_limiter,
                },
                done_callbacks=[final_task],
            ),
            tag=get_object_tag,
        )

    def _submit_ranged_download_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        download_output_manager,
        transfer_future,
        bandwidth_limiter,
    ):
        """Submit one GetObject task per part using Range requests.

        A ``CountCallbackInvoker`` tracks outstanding parts; when the
        last part's done callback decrements the count to zero, the
        final io task is submitted to the io executor.
        """
        call_args = transfer_future.meta.call_args

        # Get the needed progress callbacks for the task
        progress_callbacks = get_callbacks(transfer_future, 'progress')

        # Get a handle to the file that will be used for writing downloaded
        # contents
        fileobj = download_output_manager.get_fileobj_for_io_writes(
            transfer_future
        )

        # Determine the number of parts
        part_size = config.multipart_chunksize
        num_parts = calculate_num_parts(transfer_future.meta.size, part_size)

        # Get any associated tags for the get object task.
        get_object_tag = download_output_manager.get_download_task_tag()

        # Callback invoker to submit the final io task once all downloads
        # are complete.
        finalize_download_invoker = CountCallbackInvoker(
            self._get_final_io_task_submission_callback(
                download_output_manager, io_executor
            )
        )
        for i in range(num_parts):
            # Calculate the range parameter
            range_parameter = calculate_range_parameter(
                part_size, i, num_parts
            )

            # Inject the Range parameter to the parameters to be passed in
            # as extra args
            extra_args = {'Range': range_parameter}
            extra_args.update(call_args.extra_args)
            # Increment before submitting so the invoker cannot hit zero
            # (and finalize) while parts are still being submitted.
            finalize_download_invoker.increment()
            # Submit the ranged downloads
            self._transfer_coordinator.submit(
                request_executor,
                GetObjectTask(
                    transfer_coordinator=self._transfer_coordinator,
                    main_kwargs={
                        'client': client,
                        'bucket': call_args.bucket,
                        'key': call_args.key,
                        'fileobj': fileobj,
                        'extra_args': extra_args,
                        'callbacks': progress_callbacks,
                        'max_attempts': config.num_download_attempts,
                        'start_index': i * part_size,
                        'download_output_manager': download_output_manager,
                        'io_chunksize': config.io_chunksize,
                        'bandwidth_limiter': bandwidth_limiter,
                    },
                    done_callbacks=[finalize_download_invoker.decrement],
                ),
                tag=get_object_tag,
            )
        # Allow the invoker to fire once all submitted parts complete.
        finalize_download_invoker.finalize()

    def _get_final_io_task_submission_callback(
        self, download_manager, io_executor
    ):
        """Return a zero-arg callable that submits the final io task."""
        final_task = download_manager.get_final_io_task()
        return FunctionContainer(
            self._transfer_coordinator.submit, io_executor, final_task
        )

    def _calculate_range_param(self, part_size, part_index, num_parts):
        """Build an HTTP ``Range`` header value for the given part.

        The last part uses an open-ended range (``bytes=N-``) so any
        trailing bytes are included.

        NOTE(review): appears unused within this file and duplicates
        ``calculate_range_parameter`` from s3transfer.utils — presumably
        kept for backwards compatibility; verify before removing.
        """
        # Used to calculate the Range parameter
        start_range = part_index * part_size
        if part_index == num_parts - 1:
            end_range = ''
        else:
            end_range = start_range + part_size - 1
        range_param = f'bytes={start_range}-{end_range}'
        return range_param

530 

531 

class GetObjectTask(Task):
    """Task that performs a (possibly ranged) GetObject with retries."""

    def _main(
        self,
        client,
        bucket,
        key,
        fileobj,
        extra_args,
        callbacks,
        max_attempts,
        download_output_manager,
        io_chunksize,
        start_index=0,
        bandwidth_limiter=None,
    ):
        """Downloads an object and places content into io queue

        :param client: The client to use when calling GetObject
        :param bucket: The bucket to download from
        :param key: The key to download from
        :param fileobj: The file handle to write content to
        :param extra_args: Any extra arguments to include in GetObject request
        :param callbacks: List of progress callbacks to invoke on download
        :param max_attempts: The number of retries to do when downloading
        :param download_output_manager: The download output manager associated
            with the current download.
        :param io_chunksize: The size of each io chunk to read from the
            download stream and queue in the io queue.
        :param start_index: The location in the file to start writing the
            content of the key to.
        :param bandwidth_limiter: The bandwidth limiter to use when throttling
            the downloading of data in streams.
        :raises RetriesExceededError: When every attempt fails with a
            retryable download error.
        """
        last_exception = None
        for i in range(max_attempts):
            try:
                # Where the next chunk will be written; reset each
                # attempt so a retried part is re-downloaded from its
                # original position.
                current_index = start_index
                response = client.get_object(
                    Bucket=bucket, Key=key, **extra_args
                )
                # Wrap the body so progress callbacks fire as bytes are
                # read off the stream.
                streaming_body = StreamReaderProgress(
                    response['Body'], callbacks
                )
                if bandwidth_limiter:
                    streaming_body = (
                        bandwidth_limiter.get_bandwith_limited_stream(
                            streaming_body, self._transfer_coordinator
                        )
                    )

                chunks = DownloadChunkIterator(streaming_body, io_chunksize)
                for chunk in chunks:
                    # If the transfer is done because of a cancellation
                    # or error somewhere else, stop trying to submit more
                    # data to be written and break out of the download.
                    if not self._transfer_coordinator.done():
                        self._handle_io(
                            download_output_manager,
                            fileobj,
                            chunk,
                            current_index,
                        )
                        current_index += len(chunk)
                    else:
                        return
                return
            except S3_RETRYABLE_DOWNLOAD_ERRORS as e:
                logger.debug(
                    "Retrying exception caught (%s), "
                    "retrying request, (attempt %s / %s)",
                    e,
                    i,
                    max_attempts,
                    exc_info=True,
                )
                last_exception = e
                # Also invoke the progress callbacks to indicate that we
                # are trying to download the stream again and all progress
                # for this GetObject has been lost.
                # (start_index - current_index is <= 0: a negative delta
                # that rewinds previously reported progress.)
                invoke_progress_callbacks(
                    callbacks, start_index - current_index
                )
                continue
        raise RetriesExceededError(last_exception)

    def _handle_io(self, download_output_manager, fileobj, chunk, index):
        # Default behavior queues the chunk on the io executor via the
        # output manager; subclasses may instead write inline.
        download_output_manager.queue_file_io_task(fileobj, chunk, index)

619 

620 

class ImmediatelyWriteIOGetObjectTask(GetObjectTask):
    """GetObjectTask that writes each chunk synchronously as it arrives.

    When it is known that only one thread downloads the object, the IO
    queue and executor add overhead for no benefit, so the write task is
    simply executed inline.
    """

    def _handle_io(self, download_output_manager, fileobj, chunk, index):
        # Build the write task and run it right here instead of
        # submitting it to the io executor.
        write_task = download_output_manager.get_io_write_task(
            fileobj, chunk, index
        )
        write_task()

632 

633 

class IOWriteTask(Task):
    """Task that writes a chunk of downloaded data at a file offset."""

    def _main(self, fileobj, data, offset):
        """Seek to ``offset`` in ``fileobj`` and write ``data`` there.

        :param fileobj: The file handle to write content to
        :param data: The bytes to write
        :param offset: The position in the file to write the data at
        """
        fileobj.seek(offset)
        fileobj.write(data)

644 

645 

class IOStreamingWriteTask(Task):
    """Task for writing data to a non-seekable stream."""

    def _main(self, fileobj, data):
        """Append ``data`` to ``fileobj``.

        The data is written at the stream's current position — no
        seeking is performed (or possible) beforehand.

        :param fileobj: The fileobj to write content to
        :param data: The bytes to write
        """
        fileobj.write(data)

660 

661 

class IORenameFileTask(Task):
    """Rename a fully-written temporary file to its final filename.

    :param fileobj: The file handle that content was written to.
    :param final_filename: The destination name the file is renamed to
        once writing has completed.
    :param osutil: OS utility
    """

    def _main(self, fileobj, final_filename, osutil):
        # The handle must be closed before the file can be renamed.
        fileobj.close()
        osutil.rename_file(fileobj.name, final_filename)

674 

675 

class IOCloseTask(Task):
    """Close a file handle once its download has completed.

    :param fileobj: The fileobj to close.
    """

    def _main(self, fileobj):
        fileobj.close()

684 

685 

class CompleteDownloadNOOPTask(Task):
    """No-op task that marks a download as complete.

    ``is_final`` defaults to True because this task is always intended
    to be the last one executed for a transfer.
    """

    def __init__(
        self,
        transfer_coordinator,
        main_kwargs=None,
        pending_main_kwargs=None,
        done_callbacks=None,
        is_final=True,
    ):
        super().__init__(
            transfer_coordinator=transfer_coordinator,
            main_kwargs=main_kwargs,
            pending_main_kwargs=pending_main_kwargs,
            done_callbacks=done_callbacks,
            is_final=is_final,
        )

    def _main(self):
        # Intentionally does nothing; completing this task is the signal.
        pass

711 

712 

class DownloadChunkIterator:
    """Iterate over a downloaded S3 body in fixed-size chunks.

    :param body: A readable file-like object
    :param chunksize: The number of bytes to request per read
    """

    def __init__(self, body, chunksize):
        self._body = body
        self._chunksize = chunksize
        self._num_reads = 0

    def __iter__(self):
        return self

    def __next__(self):
        data = self._body.read(self._chunksize)
        self._num_reads += 1
        if not data and self._num_reads > 1:
            raise StopIteration()
        # The very first read is yielded even when empty so that a
        # zero-byte object still accounts for one (empty) chunk.
        return data

    next = __next__

741 

742 

class DeferQueue:
    """Buffers out-of-order write requests for a *single* fileobj.

    Writes may arrive at arbitrary offsets, but are only released once
    they form a contiguous run starting at the next expected offset
    (which begins at 0).
    """

    def __init__(self):
        self._writes = []
        self._pending_offsets = set()
        self._next_offset = 0

    def request_writes(self, offset, data):
        """Record new data and return every write now safe to submit.

        The (offset, data) pair is buffered; if it — possibly together
        with previously buffered pairs — completes a contiguous run
        starting at the next expected offset, all writes in that run are
        returned in order. Combining the put and the get into a single
        call means callers pay for only one lock acquisition per chunk.
        """
        if offset < self._next_offset:
            # Below the high-water mark: this region was already
            # released. Retries can legitimately re-send such offsets
            # (a retry at offset N/2 requeues 0..N/2), so drop them.
            return []
        if offset in self._pending_offsets:
            # Duplicate of a buffered-but-unreleased write; keep the
            # first copy and ignore this request.
            return []
        heapq.heappush(self._writes, (offset, data))
        self._pending_offsets.add(offset)
        ready = []
        while self._writes and self._writes[0][0] == self._next_offset:
            queued_offset, queued_data = heapq.heappop(self._writes)
            self._pending_offsets.remove(queued_offset)
            ready.append({'offset': queued_offset, 'data': queued_data})
            self._next_offset += len(queued_data)
        return ready