1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import heapq
14import logging
15import threading
16
17from botocore.exceptions import ClientError
18
19from s3transfer.compat import seekable
20from s3transfer.exceptions import RetriesExceededError, S3DownloadFailedError
21from s3transfer.futures import IN_MEMORY_DOWNLOAD_TAG
22from s3transfer.tasks import SubmissionTask, Task
23from s3transfer.utils import (
24 S3_RETRYABLE_DOWNLOAD_ERRORS,
25 CountCallbackInvoker,
26 DeferredOpenFile,
27 FunctionContainer,
28 StreamReaderProgress,
29 calculate_num_parts,
30 calculate_range_parameter,
31 get_callbacks,
32 invoke_progress_callbacks,
33)
34
# Module-level logger; used for debug messages about queued IO writes and
# retried GetObject requests.
logger = logging.getLogger(__name__)
36
37
class DownloadOutputManager:
    """Base manager class for handling various types of files for downloads

    This class is typically used for the DownloadSubmissionTask class to help
    determine the following:

        * Provides the fileobj that downloaded data is written to
        * Get a task to complete once everything downloaded has been written

    The answers/implementations differ for the various types of file outputs
    that may be accepted. All implementations must subclass and override
    public methods from this class.
    """

    def __init__(self, osutil, transfer_coordinator, io_executor):
        self._osutil = osutil
        self._transfer_coordinator = transfer_coordinator
        self._io_executor = io_executor

    @classmethod
    def is_compatible(cls, download_target, osutil):
        """Determines if the target for the download is compatible with manager

        :param download_target: The target to which the download will write
            data.

        :param osutil: The os utility to be used for the transfer

        :returns: True if the manager can handle the type of target specified
            otherwise returns False.
        """
        raise NotImplementedError('must implement is_compatible()')

    def get_download_task_tag(self):
        """Get the tag (if any) to associate all GetObjectTasks

        :rtype: s3transfer.futures.TaskTag
        :returns: The tag to associate all GetObjectTasks with. The base
            implementation returns None, meaning no tag.
        """
        return None

    def get_fileobj_for_io_writes(self, transfer_future):
        """Get file-like object to use for io writes in the io executor

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with the download
            request

        :returns: A file-like object to write to
        """
        raise NotImplementedError('must implement get_fileobj_for_io_writes()')

    def queue_file_io_task(self, fileobj, data, offset):
        """Queue IO write for submission to the IO executor.

        The base implementation builds the write task with
        ``get_io_write_task`` and immediately submits it, through the
        transfer coordinator, to the IO executor this manager was created
        with. Subclasses may override this to defer submission if necessary.

        :param fileobj: The file-like object to write to
        :param data: The downloaded bytes to write
        :param offset: The offset in ``fileobj`` at which ``data`` belongs
        """
        self._transfer_coordinator.submit(
            self._io_executor, self.get_io_write_task(fileobj, data, offset)
        )

    def get_io_write_task(self, fileobj, data, offset):
        """Get an IO write task for the requested set of data

        This task can be run immediately or be submitted to the IO executor
        for it to run.

        :type fileobj: file-like object
        :param fileobj: The file-like object to write to

        :type data: bytes
        :param data: The data to write out

        :type offset: integer
        :param offset: The offset to write the data to in the file-like object

        :returns: An IO task to be used to write data to a file-like object
        """
        return IOWriteTask(
            self._transfer_coordinator,
            main_kwargs={
                'fileobj': fileobj,
                'data': data,
                'offset': offset,
            },
        )

    def get_final_io_task(self):
        """Get the final io task to complete the download

        This is needed because based on the architecture of the TransferManager
        the final tasks will be sent to the IO executor, but the executor
        needs a final task for it to signal that the transfer is done and
        all done callbacks can be run.

        :rtype: s3transfer.tasks.Task
        :returns: A final task to be completed in the io executor
        """
        raise NotImplementedError('must implement get_final_io_task()')

    def _get_fileobj_from_filename(self, filename):
        # The file is wrapped in DeferredOpenFile and opened through the
        # osutil's open function in binary write mode.
        f = DeferredOpenFile(
            filename, mode='wb', open_function=self._osutil.open
        )
        # Make sure the file gets closed if anything goes wrong during the
        # process. Removing any temporary file is left to subclasses.
        self._transfer_coordinator.add_failure_cleanup(f.close)
        return f
149
150
class DownloadFilenameOutputManager(DownloadOutputManager):
    """Output manager for downloads whose target is a filename string.

    Data is written into a temporary file obtained from the osutil. The
    final IO task renames that temporary file to the requested filename.
    """

    def __init__(self, osutil, transfer_coordinator, io_executor):
        super().__init__(osutil, transfer_coordinator, io_executor)
        # All three are populated by get_fileobj_for_io_writes().
        self._final_filename = None
        self._temp_filename = None
        self._temp_fileobj = None

    @classmethod
    def is_compatible(cls, download_target, osutil):
        target_is_filename = isinstance(download_target, str)
        return target_is_filename

    def get_fileobj_for_io_writes(self, transfer_future):
        requested_filename = transfer_future.meta.call_args.fileobj
        self._final_filename = requested_filename
        self._temp_filename = self._osutil.get_temp_filename(
            requested_filename
        )
        self._temp_fileobj = self._get_temp_fileobj()
        return self._temp_fileobj

    def get_final_io_task(self):
        # Renaming the temporary file into place is the last step of the
        # download, so this task is flagged as final.
        rename_kwargs = {
            'fileobj': self._temp_fileobj,
            'final_filename': self._final_filename,
            'osutil': self._osutil,
        }
        return IORenameFileTask(
            transfer_coordinator=self._transfer_coordinator,
            main_kwargs=rename_kwargs,
            is_final=True,
        )

    def _get_temp_fileobj(self):
        # Open (lazily) the temporary file; on failure it is closed first
        # (registered by the helper) and then removed (registered here).
        temp_fileobj = self._get_fileobj_from_filename(self._temp_filename)
        self._transfer_coordinator.add_failure_cleanup(
            self._osutil.remove_file, self._temp_filename
        )
        return temp_fileobj
189
190
class DownloadSeekableOutputManager(DownloadOutputManager):
    """Output manager for caller-provided file-like objects that can seek."""

    @classmethod
    def is_compatible(cls, download_target, osutil):
        can_seek = seekable(download_target)
        return can_seek

    def get_fileobj_for_io_writes(self, transfer_future):
        # Writes go straight into the object the caller handed us.
        call_args = transfer_future.meta.call_args
        return call_args.fileobj

    def get_final_io_task(self):
        # Nothing needs finalizing; the no-op task only marks the point at
        # which every IO write has run so done callbacks can fire.
        noop_task = CompleteDownloadNOOPTask(
            transfer_coordinator=self._transfer_coordinator
        )
        return noop_task
206
207
class DownloadNonSeekableOutputManager(DownloadOutputManager):
    """Output manager for file-like targets that can be written but not seeked.

    Chunks may arrive out of order (ranged GetObjectTasks are submitted
    independently). Writes are therefore held in a DeferQueue and only
    released once they are contiguous with what has already been released.
    """

    def __init__(
        self, osutil, transfer_coordinator, io_executor, defer_queue=None
    ):
        super().__init__(osutil, transfer_coordinator, io_executor)
        if defer_queue is None:
            defer_queue = DeferQueue()
        self._defer_queue = defer_queue
        # Serializes DeferQueue bookkeeping and the resulting submissions.
        self._io_submit_lock = threading.Lock()

    @classmethod
    def is_compatible(cls, download_target, osutil):
        return hasattr(download_target, 'write')

    def get_download_task_tag(self):
        return IN_MEMORY_DOWNLOAD_TAG

    def get_fileobj_for_io_writes(self, transfer_future):
        return transfer_future.meta.call_args.fileobj

    def get_final_io_task(self):
        return CompleteDownloadNOOPTask(
            transfer_coordinator=self._transfer_coordinator
        )

    def queue_file_io_task(self, fileobj, data, offset):
        """Queue ``data`` and submit every write that is now contiguous.

        :param fileobj: The non-seekable file-like object to write to
        :param data: The downloaded bytes
        :param offset: The offset in the object that ``data`` starts at
        """
        # GetObjectTasks for different ranges may call this concurrently, so
        # the DeferQueue update and the submissions it unlocks happen under
        # one lock.
        with self._io_submit_lock:
            writes = self._defer_queue.request_writes(offset, data)
            for write in writes:
                write_offset = write['offset']
                logger.debug(
                    "Queueing IO offset %s for fileobj: %s",
                    write_offset,
                    fileobj,
                )
                # Forward the released write's own offset rather than the
                # offset of the chunk that triggered the release.
                super().queue_file_io_task(
                    fileobj, write['data'], write_offset
                )

    def get_io_write_task(self, fileobj, data, offset):
        # ``offset`` is accepted for interface compatibility only; the
        # streaming task writes sequentially without seeking.
        return IOStreamingWriteTask(
            self._transfer_coordinator,
            main_kwargs={
                'fileobj': fileobj,
                'data': data,
            },
        )
253
254
class DownloadSpecialFilenameOutputManager(DownloadNonSeekableOutputManager):
    """Output manager for filenames the osutil reports as special files.

    Such targets are opened by name but written sequentially like a
    non-seekable stream, and closed by the final IO task.
    """

    def __init__(
        self, osutil, transfer_coordinator, io_executor, defer_queue=None
    ):
        super().__init__(
            osutil, transfer_coordinator, io_executor, defer_queue
        )
        # Set by get_fileobj_for_io_writes().
        self._fileobj = None

    @classmethod
    def is_compatible(cls, download_target, osutil):
        if not isinstance(download_target, str):
            return False
        return osutil.is_special_file(download_target)

    def get_fileobj_for_io_writes(self, transfer_future):
        self._fileobj = self._get_fileobj_from_filename(
            transfer_future.meta.call_args.fileobj
        )
        return self._fileobj

    def get_final_io_task(self):
        # The file this manager opened must be closed once the transfer ends.
        return IOCloseTask(
            transfer_coordinator=self._transfer_coordinator,
            main_kwargs={'fileobj': self._fileobj},
            is_final=True,
        )
282
283
class DownloadSubmissionTask(SubmissionTask):
    """Task for submitting tasks to execute a download"""

    def _get_download_output_manager_cls(self, transfer_future, osutil):
        """Retrieves a class for managing output for a download

        Candidate managers are tried in order and the first whose
        ``is_compatible`` accepts the target is used. Order matters: special
        files are also ``str`` targets, so they are checked before plain
        filenames, and filenames before the generic file-like checks.

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future for the request

        :type osutil: s3transfer.utils.OSUtils
        :param osutil: The os utility associated to the transfer

        :rtype: class of DownloadOutputManager
        :returns: The appropriate class to use for managing a specific type of
            output for downloads.

        :raises RuntimeError: If no manager supports the download target.
        """
        download_manager_resolver_chain = [
            DownloadSpecialFilenameOutputManager,
            DownloadFilenameOutputManager,
            DownloadSeekableOutputManager,
            DownloadNonSeekableOutputManager,
        ]

        fileobj = transfer_future.meta.call_args.fileobj
        for download_manager_cls in download_manager_resolver_chain:
            if download_manager_cls.is_compatible(fileobj, osutil):
                return download_manager_cls
        raise RuntimeError(
            f'Output {fileobj} of type: {type(fileobj)} is not supported.'
        )

    def _submit(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        transfer_future,
        bandwidth_limiter=None,
    ):
        """
        :param client: The client associated with the transfer manager

        :type config: s3transfer.manager.TransferConfig
        :param config: The transfer config associated with the transfer
            manager

        :type osutil: s3transfer.utils.OSUtil
        :param osutil: The os utility associated to the transfer manager

        :type request_executor: s3transfer.futures.BoundedExecutor
        :param request_executor: The request executor associated with the
            transfer manager

        :type io_executor: s3transfer.futures.BoundedExecutor
        :param io_executor: The io executor associated with the
            transfer manager

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for

        :type bandwidth_limiter: s3transfer.bandwidth.BandwidthLimiter
        :param bandwidth_limiter: The bandwidth limiter to use when
            downloading streams
        """
        if (
            transfer_future.meta.size is None
            or transfer_future.meta.etag is None
        ):
            response = client.head_object(
                Bucket=transfer_future.meta.call_args.bucket,
                Key=transfer_future.meta.call_args.key,
                **transfer_future.meta.call_args.extra_args,
            )
            # HeadObject is only issued when the size or the ETag is
            # unknown. When it is issued, the size it reports is used even if
            # the caller had already supplied one.
            transfer_future.meta.provide_transfer_size(
                response['ContentLength']
            )
            # Provide an etag to ensure a stored object is not modified
            # during a multipart download.
            transfer_future.meta.provide_object_etag(response.get('ETag'))

        download_output_manager = self._get_download_output_manager_cls(
            transfer_future, osutil
        )(osutil, self._transfer_coordinator, io_executor)

        # Objects below the multipart threshold are fetched with a single
        # GetObject; larger ones are split into ranged GetObject requests.
        if transfer_future.meta.size < config.multipart_threshold:
            submit_method = self._submit_download_request
        else:
            submit_method = self._submit_ranged_download_request
        submit_method(
            client,
            config,
            osutil,
            request_executor,
            io_executor,
            download_output_manager,
            transfer_future,
            bandwidth_limiter,
        )

    def _submit_download_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        download_output_manager,
        transfer_future,
        bandwidth_limiter,
    ):
        """Submit a single GetObject task that writes its data immediately.

        The output manager's final IO task is attached as a done callback of
        that task.
        """
        call_args = transfer_future.meta.call_args

        # Get a handle to the file that will be used for writing downloaded
        # contents
        fileobj = download_output_manager.get_fileobj_for_io_writes(
            transfer_future
        )

        # Get the needed callbacks for the task
        progress_callbacks = get_callbacks(transfer_future, 'progress')

        # Get any associated tags for the get object task.
        get_object_tag = download_output_manager.get_download_task_tag()

        # Get the final io task to run once the download is complete. This
        # must come after get_fileobj_for_io_writes(), which may set up state
        # the final task refers to.
        final_task = download_output_manager.get_final_io_task()

        # Submit the task to download the object.
        self._transfer_coordinator.submit(
            request_executor,
            ImmediatelyWriteIOGetObjectTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'fileobj': fileobj,
                    'extra_args': call_args.extra_args,
                    'callbacks': progress_callbacks,
                    'max_attempts': config.num_download_attempts,
                    'download_output_manager': download_output_manager,
                    'io_chunksize': config.io_chunksize,
                    'bandwidth_limiter': bandwidth_limiter,
                },
                done_callbacks=[final_task],
            ),
            tag=get_object_tag,
        )

    def _submit_ranged_download_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        download_output_manager,
        transfer_future,
        bandwidth_limiter,
    ):
        """Submit one ranged GetObject task per part of the object.

        Each part request carries ``IfMatch`` (when an ETag is known) so that
        a concurrent modification of the object fails the download. The final
        IO task is submitted only after every part task has completed.
        """
        call_args = transfer_future.meta.call_args

        # Get the needed progress callbacks for the task
        progress_callbacks = get_callbacks(transfer_future, 'progress')

        # Get a handle to the file that will be used for writing downloaded
        # contents
        fileobj = download_output_manager.get_fileobj_for_io_writes(
            transfer_future
        )

        # Determine the number of parts
        part_size = config.multipart_chunksize
        num_parts = calculate_num_parts(transfer_future.meta.size, part_size)

        # Get any associated tags for the get object task.
        get_object_tag = download_output_manager.get_download_task_tag()

        # Callback invoker to submit the final io task once all downloads
        # are complete.
        finalize_download_invoker = CountCallbackInvoker(
            self._get_final_io_task_submission_callback(
                download_output_manager, io_executor
            )
        )
        for i in range(num_parts):
            # Calculate the range parameter
            range_parameter = calculate_range_parameter(
                part_size, i, num_parts
            )

            # Inject extra parameters to be passed in as extra args. The
            # caller's extra args are applied last and take precedence.
            extra_args = {
                'Range': range_parameter,
            }
            if transfer_future.meta.etag is not None:
                extra_args['IfMatch'] = transfer_future.meta.etag
            extra_args.update(call_args.extra_args)
            finalize_download_invoker.increment()
            # Submit the ranged downloads
            self._transfer_coordinator.submit(
                request_executor,
                GetObjectTask(
                    transfer_coordinator=self._transfer_coordinator,
                    main_kwargs={
                        'client': client,
                        'bucket': call_args.bucket,
                        'key': call_args.key,
                        'fileobj': fileobj,
                        'extra_args': extra_args,
                        'callbacks': progress_callbacks,
                        'max_attempts': config.num_download_attempts,
                        'start_index': i * part_size,
                        'download_output_manager': download_output_manager,
                        'io_chunksize': config.io_chunksize,
                        'bandwidth_limiter': bandwidth_limiter,
                    },
                    done_callbacks=[finalize_download_invoker.decrement],
                ),
                tag=get_object_tag,
            )
        finalize_download_invoker.finalize()

    def _get_final_io_task_submission_callback(
        self, download_manager, io_executor
    ):
        """Return a callable that submits the manager's final IO task."""
        final_task = download_manager.get_final_io_task()
        return FunctionContainer(
            self._transfer_coordinator.submit, io_executor, final_task
        )

    def _calculate_range_param(self, part_size, part_index, num_parts):
        """Return the ``Range`` header value for one part of the object.

        Kept for backwards compatibility. It delegates to the same helper
        used by ``_submit_ranged_download_request`` so the two cannot drift
        apart.
        """
        return calculate_range_parameter(part_size, part_index, num_parts)
539
540
class GetObjectTask(Task):
    def _main(
        self,
        client,
        bucket,
        key,
        fileobj,
        extra_args,
        callbacks,
        max_attempts,
        download_output_manager,
        io_chunksize,
        start_index=0,
        bandwidth_limiter=None,
    ):
        """Downloads an object and places content into io queue

        :param client: The client to use when calling GetObject
        :param bucket: The bucket to download from
        :param key: The key to download from
        :param fileobj: The file handle to write content to
        :param extra_args: Any extra arguments to include in GetObject request
        :param callbacks: List of progress callbacks to invoke on download
        :param max_attempts: The maximum number of attempts (including the
            first one) to make when downloading
        :param download_output_manager: The download output manager associated
            with the current download.
        :param io_chunksize: The size of each io chunk to read from the
            download stream and queue in the io queue.
        :param start_index: The location in the file to start writing the
            content of the key to.
        :param bandwidth_limiter: The bandwidth limiter to use when throttling
            the downloading of data in streams.

        :raises S3DownloadFailedError: If S3 answers with
            ``PreconditionFailed``, i.e. the stored object no longer matches
            the expected ETag.
        :raises RetriesExceededError: If every attempt failed with one of the
            retryable download errors.
        """
        last_exception = None
        for i in range(max_attempts):
            try:
                # Each attempt re-requests the object from the beginning of
                # this task's range.
                current_index = start_index
                response = client.get_object(
                    Bucket=bucket, Key=key, **extra_args
                )
                streaming_body = StreamReaderProgress(
                    response['Body'], callbacks
                )
                if bandwidth_limiter:
                    streaming_body = (
                        bandwidth_limiter.get_bandwith_limited_stream(
                            streaming_body, self._transfer_coordinator
                        )
                    )

                chunks = DownloadChunkIterator(streaming_body, io_chunksize)
                for chunk in chunks:
                    # If the transfer is done because of a cancellation
                    # or error somewhere else, stop trying to submit more
                    # data to be written and break out of the download.
                    if self._transfer_coordinator.done():
                        return
                    self._handle_io(
                        download_output_manager,
                        fileobj,
                        chunk,
                        current_index,
                    )
                    current_index += len(chunk)
                return
            except ClientError as e:
                error_code = e.response.get('Error', {}).get('Code')
                if error_code == "PreconditionFailed":
                    raise S3DownloadFailedError(
                        f'Contents of stored object "{key}" in bucket '
                        f'"{bucket}" did not match expected ETag.'
                    ) from e
                raise
            except S3_RETRYABLE_DOWNLOAD_ERRORS as e:
                # Report the attempt number 1-based so the message reads
                # "attempt 1 / N" ... "attempt N / N".
                logger.debug(
                    "Retrying exception caught (%s), "
                    "retrying request, (attempt %s / %s)",
                    e,
                    i + 1,
                    max_attempts,
                    exc_info=True,
                )
                last_exception = e
                # Also invoke the progress callbacks to indicate that we
                # are trying to download the stream again and all progress
                # for this GetObject has been lost.
                invoke_progress_callbacks(
                    callbacks, start_index - current_index
                )
                continue
        raise RetriesExceededError(last_exception)

    def _handle_io(self, download_output_manager, fileobj, chunk, index):
        # Hand the chunk to the output manager, which decides how (and when)
        # it is submitted to the IO executor.
        download_output_manager.queue_file_io_task(fileobj, chunk, index)
637
638
class ImmediatelyWriteIOGetObjectTask(GetObjectTask):
    """GetObjectTask variant that performs each write inline.

    When a single thread is known to be the only one downloading the object,
    routing chunks through the IO queue and executor adds overhead for no
    benefit. This task therefore runs the write task itself, right away.
    """

    def _handle_io(self, download_output_manager, fileobj, chunk, index):
        # Build the write task and execute it on the current thread.
        download_output_manager.get_io_write_task(fileobj, chunk, index)()
650
651
class IOWriteTask(Task):
    """Task that writes a chunk of data at a given position in a file."""

    def _main(self, fileobj, data, offset):
        """Seek to ``offset`` in ``fileobj`` and write ``data`` there.

        :param fileobj: The seekable file handle receiving the data
        :param data: The bytes to write
        :param offset: The absolute position at which ``data`` starts
        """
        # Position first, then write.
        fileobj.seek(offset)
        fileobj.write(data)
662
663
class IOStreamingWriteTask(Task):
    """Task that appends data to a stream that cannot seek."""

    def _main(self, fileobj, data):
        """Append ``data`` to ``fileobj`` at its current position.

        No seek is performed; callers are responsible for delivering the
        data in order.

        :param fileobj: The stream receiving the data
        :param data: The bytes to write
        """
        fileobj.write(data)
678
679
class IORenameFileTask(Task):
    """Task that moves the downloaded temporary file to its final name.

    :param fileobj: The file handle the downloaded content was written to.
    :param final_filename: The filename the content should end up under
        once writing is finished.
    :param osutil: The OS utility used to perform the rename.
    """

    def _main(self, fileobj, final_filename, osutil):
        # Close the handle before moving the file it refers to.
        fileobj.close()
        source_filename = fileobj.name
        osutil.rename_file(source_filename, final_filename)
692
693
class IOCloseTask(Task):
    """Task that closes a file object after the download has completed.

    :param fileobj: The file object to be closed.
    """

    def _main(self, fileobj):
        closer = fileobj.close
        closer()
702
703
class CompleteDownloadNOOPTask(Task):
    """Task that does no work and only marks the download as complete.

    ``is_final`` defaults to True because this task is always meant to be
    the last one run for a download.
    """

    def __init__(
        self,
        transfer_coordinator,
        main_kwargs=None,
        pending_main_kwargs=None,
        done_callbacks=None,
        is_final=True,
    ):
        # Forward everything unchanged to the base Task.
        super().__init__(
            is_final=is_final,
            done_callbacks=done_callbacks,
            pending_main_kwargs=pending_main_kwargs,
            main_kwargs=main_kwargs,
            transfer_coordinator=transfer_coordinator,
        )

    def _main(self):
        # Intentionally empty.
        return None
729
730
class DownloadChunkIterator:
    def __init__(self, body, chunksize):
        """Iterate over a readable stream in fixed-size reads.

        :param body: Any object exposing ``read(size)``
        :param chunksize: The number of bytes requested per read
        """
        self._body = body
        self._chunksize = chunksize
        # Counts read() calls so an empty first read can be told apart
        # from the end of a non-empty stream.
        self._num_reads = 0

    def __iter__(self):
        return self

    def __next__(self):
        piece = self._body.read(self._chunksize)
        self._num_reads += 1
        # An empty read normally ends iteration. The exception is the very
        # first read: an empty object still yields one (empty) chunk so its
        # existence is accounted for.
        if not piece and self._num_reads != 1:
            raise StopIteration()
        return piece

    next = __next__
759
760
class DeferQueue:
    """IO queue that defers write requests until they are queued sequentially.

    This class is used to track IO data for a *single* fileobj.

    You can send data to this queue, and it will defer any IO write requests
    until it has the next contiguous block available (starting at 0).

    Overlapping requests (for example from retried or partially read ranges)
    are de-duplicated, so every byte offset is released at most once and in
    order. The overlapping bytes are assumed to be identical.
    """

    def __init__(self):
        # Min-heap of offsets that currently have pending data.
        self._writes = []
        # Maps each pending offset to the bytes queued for it.
        self._pending_offsets = {}
        # First byte offset that has not yet been released to a caller.
        self._next_offset = 0

    def request_writes(self, offset, data):
        """Request any available writes given new incoming data.

        You call this method by providing new data along with the
        offset associated with the data. If that new data unlocks
        any contiguous writes that can now be submitted, this
        method will return all applicable writes.

        This is done with 1 method call so you don't have to
        make two method calls (put(), get()) which acquires a lock
        each method call.

        :param offset: The offset the data starts at
        :param data: The bytes for that offset
        :returns: A list of ``{'offset': int, 'data': bytes}`` dicts, ordered
            by offset and contiguous starting at the previous release point.
            The list is empty when nothing new can be released.
        """
        if offset + len(data) <= self._next_offset:
            # This is a request for a write that we've already
            # seen. This can happen in the event of a retry
            # where if we retry at offset N/2, we'll requeue
            # offsets 0-N/2 again.
            return []
        if offset < self._next_offset:
            # This is a special case where the write request contains
            # both seen AND unseen data. This can happen in the case
            # that we queue part of a chunk due to an incomplete read,
            # then pop the incomplete data for writing, then we receive the retry
            # for the incomplete read which contains both the previously-seen
            # partial chunk followed by the rest of the chunk (unseen).
            #
            # In this case, we discard the bytes of the data we've already
            # queued before, and only queue the unseen bytes.
            seen_bytes = self._next_offset - offset
            data = data[seen_bytes:]
            offset = self._next_offset
        if offset in self._pending_offsets:
            queued_data = self._pending_offsets[offset]
            if len(data) <= len(queued_data):
                # We already have a write request queued with the same offset
                # with at least as much data that is present in this
                # request. In this case we should ignore this request
                # and prefer what's already queued.
                return []
            # We have a write request queued with the same offset,
            # but this request contains more data. This can happen
            # in the case of a retried request due to an incomplete
            # read, followed by a retry containing the full response
            # body. In this case, we should overwrite the queued
            # request with this one since it contains more data.
            self._pending_offsets[offset] = data
        else:
            heapq.heappush(self._writes, offset)
            self._pending_offsets[offset] = data
        return self._pop_contiguous_writes()

    def _pop_contiguous_writes(self):
        # Release every pending write that starts at or before the release
        # point. An entry that starts *before* it overlaps bytes already
        # released (e.g. a short chunk followed by a retried full chunk).
        # Trim the overlap instead of waiting for an exact offset match,
        # which would never come and would stall the queue.
        writes = []
        while self._writes and self._writes[0] <= self._next_offset:
            write_offset = heapq.heappop(self._writes)
            write_data = self._pending_offsets.pop(write_offset)
            if write_offset < self._next_offset:
                write_data = write_data[self._next_offset - write_offset:]
                if not write_data:
                    # Entirely already released; drop it.
                    continue
                write_offset = self._next_offset
            writes.append({'offset': write_offset, 'data': write_data})
            self._next_offset += len(write_data)
        return writes