1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import heapq
14import logging
15import threading
16
17from botocore.exceptions import ClientError
18
19from s3transfer.compat import seekable
20from s3transfer.exceptions import (
21 RetriesExceededError,
22 S3DownloadFailedError,
23 S3ValidationError,
24)
25from s3transfer.futures import IN_MEMORY_DOWNLOAD_TAG
26from s3transfer.tasks import SubmissionTask, Task
27from s3transfer.utils import (
28 S3_RETRYABLE_DOWNLOAD_ERRORS,
29 CountCallbackInvoker,
30 DeferredOpenFile,
31 FunctionContainer,
32 StreamReaderProgress,
33 calculate_num_parts,
34 calculate_range_parameter,
35 get_callbacks,
36 invoke_progress_callbacks,
37)
38
# Module-level logger; used below for debug messages about queued IO writes
# and about retried GetObject requests.
logger = logging.getLogger(__name__)
40
41
class DownloadOutputManager:
    """Base manager class for handling various types of files for downloads

    This class is typically used by the DownloadSubmissionTask class to help
    determine the following:

    * The file-like object that downloaded data is written to
    * The task to complete once everything downloaded has been written

    The answers/implementations differ for the various types of file outputs
    that may be accepted. All implementations must subclass this class and
    override its public methods.
    """

    def __init__(self, osutil, transfer_coordinator, io_executor):
        self._osutil = osutil
        self._transfer_coordinator = transfer_coordinator
        self._io_executor = io_executor

    @classmethod
    def is_compatible(cls, download_target, osutil):
        """Determine whether this manager can handle the download target

        :param download_target: The target that the download will write
            data to.

        :param osutil: The os utility to be used for the transfer

        :returns: True if the manager can handle the type of target specified,
            otherwise False.
        """
        raise NotImplementedError('must implement is_compatible()')

    def get_download_task_tag(self):
        """Get the tag (if any) to associate with all GetObjectTasks

        :rtype: s3transfer.futures.TaskTag
        :returns: The tag to associate with all GetObjectTasks. The base
            implementation returns None (no tag).
        """
        return None

    def get_fileobj_for_io_writes(self, transfer_future):
        """Get the file-like object that IO writes should target

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with the download
            request

        :returns: A file-like object to write to
        """
        raise NotImplementedError('must implement get_fileobj_for_io_writes()')

    def queue_file_io_task(self, fileobj, data, offset):
        """Queue an IO write for submission to the IO executor.

        The base implementation builds a write task with
        ``get_io_write_task`` and immediately submits it, through the
        transfer coordinator, to the IO executor given at construction.
        Subclasses may override this to defer submission.

        :param fileobj: The file-like object to write to
        :param data: The downloaded data to write
        :param offset: The offset in ``fileobj`` the data belongs at
        """
        write_task = self.get_io_write_task(fileobj, data, offset)
        self._transfer_coordinator.submit(self._io_executor, write_task)

    def get_io_write_task(self, fileobj, data, offset):
        """Build an IO write task for the requested set of data

        The returned task can either be run immediately or be submitted to
        the IO executor.

        :type fileobj: file-like object
        :param fileobj: The file-like object to write to

        :type data: bytes
        :param data: The data to write out

        :type offset: integer
        :param offset: The offset to write the data to in the file-like object

        :returns: An IOWriteTask that seeks to ``offset`` and writes ``data``
        """
        write_kwargs = {'fileobj': fileobj, 'data': data, 'offset': offset}
        return IOWriteTask(self._transfer_coordinator, main_kwargs=write_kwargs)

    def get_final_io_task(self):
        """Get the final IO task that completes the download

        Based on the architecture of the TransferManager, the final tasks are
        sent to the IO executor, and that executor needs one last task to
        signal that the transfer is done and all done callbacks can be run.

        :rtype: s3transfer.tasks.Task
        :returns: The final task to be completed in the IO executor
        """
        raise NotImplementedError('must implement get_final_io_task()')

    def _get_fileobj_from_filename(self, filename):
        """Wrap ``filename`` in a DeferredOpenFile opened in ``'wb'`` mode.

        Registers the file's ``close`` as a failure cleanup on the transfer
        coordinator, so the file is closed if the transfer fails. Removal of
        any temporary file is left to the caller.
        """
        deferred_file = DeferredOpenFile(
            filename, mode='wb', open_function=self._osutil.open
        )
        self._transfer_coordinator.add_failure_cleanup(deferred_file.close)
        return deferred_file
153
154
class DownloadFilenameOutputManager(DownloadOutputManager):
    """Output manager for downloads whose target is a filename (``str``).

    Data is written to a temporary file whose name comes from
    ``osutil.get_temp_filename``. The final IO task renames that temporary
    file to the requested filename.
    """

    def __init__(self, osutil, transfer_coordinator, io_executor):
        super().__init__(osutil, transfer_coordinator, io_executor)
        # All three are populated by get_fileobj_for_io_writes().
        self._final_filename = None
        self._temp_filename = None
        self._temp_fileobj = None

    @classmethod
    def is_compatible(cls, download_target, osutil):
        return isinstance(download_target, str)

    def get_fileobj_for_io_writes(self, transfer_future):
        requested_filename = transfer_future.meta.call_args.fileobj
        self._final_filename = requested_filename
        self._temp_filename = self._osutil.get_temp_filename(
            requested_filename
        )
        self._temp_fileobj = self._get_temp_fileobj()
        return self._temp_fileobj

    def get_final_io_task(self):
        # The last task needed to complete the download: move the temporary
        # file to its final location.
        rename_kwargs = {
            'fileobj': self._temp_fileobj,
            'final_filename': self._final_filename,
            'osutil': self._osutil,
        }
        return IORenameFileTask(
            transfer_coordinator=self._transfer_coordinator,
            main_kwargs=rename_kwargs,
            is_final=True,
        )

    def _get_temp_fileobj(self):
        temp_fileobj = self._get_fileobj_from_filename(self._temp_filename)
        # Besides closing it, also delete the temporary file if the
        # transfer fails.
        self._transfer_coordinator.add_failure_cleanup(
            self._osutil.remove_file, self._temp_filename
        )
        return temp_fileobj
193
194
class DownloadSeekableOutputManager(DownloadOutputManager):
    """Output manager for seekable file-like download targets.

    Writes go straight to the caller-provided object. The final task is a
    no-op that only marks completion.
    """

    @classmethod
    def is_compatible(cls, download_target, osutil):
        return seekable(download_target)

    def get_fileobj_for_io_writes(self, transfer_future):
        # The caller-provided object is written to directly.
        call_args = transfer_future.meta.call_args
        return call_args.fileobj

    def get_final_io_task(self):
        # Signals that every IO write has finished so done callbacks can run.
        noop_task = CompleteDownloadNOOPTask(
            transfer_coordinator=self._transfer_coordinator
        )
        return noop_task
210
211
class DownloadNonSeekableOutputManager(DownloadOutputManager):
    """Output manager for writable targets that cannot seek.

    Incoming chunks are fed to a DeferQueue. The queue only releases a chunk
    once every earlier byte has been released, so writes are submitted in
    increasing offset order. Writes use IOStreamingWriteTask, which writes
    without seeking.
    """

    def __init__(
        self, osutil, transfer_coordinator, io_executor, defer_queue=None
    ):
        super().__init__(osutil, transfer_coordinator, io_executor)
        if defer_queue is None:
            defer_queue = DeferQueue()
        self._defer_queue = defer_queue
        # Held while updating the defer queue and submitting the released
        # writes, so concurrent callers cannot interleave their submissions.
        self._io_submit_lock = threading.Lock()

    @classmethod
    def is_compatible(cls, download_target, osutil):
        return hasattr(download_target, 'write')

    def get_download_task_tag(self):
        return IN_MEMORY_DOWNLOAD_TAG

    def get_fileobj_for_io_writes(self, transfer_future):
        return transfer_future.meta.call_args.fileobj

    def get_final_io_task(self):
        return CompleteDownloadNOOPTask(
            transfer_coordinator=self._transfer_coordinator
        )

    def queue_file_io_task(self, fileobj, data, offset):
        """Defer ``data`` until it is contiguous, then submit ready writes.

        :param fileobj: The non-seekable file-like object to write to
        :param data: The downloaded chunk
        :param offset: The offset of ``data`` within the object
        """
        with self._io_submit_lock:
            writes = self._defer_queue.request_writes(offset, data)
            for write in writes:
                # Each released write carries its own offset. Use it rather
                # than the offset of the chunk that triggered the release.
                write_offset = write['offset']
                logger.debug(
                    "Queueing IO offset %s for fileobj: %s",
                    write_offset,
                    fileobj,
                )
                super().queue_file_io_task(
                    fileobj, write['data'], write_offset
                )

    def get_io_write_task(self, fileobj, data, offset):
        # ``offset`` is accepted for interface compatibility. Streaming
        # writes append in submission order and never seek.
        return IOStreamingWriteTask(
            self._transfer_coordinator,
            main_kwargs={
                'fileobj': fileobj,
                'data': data,
            },
        )
257
258
class DownloadSpecialFilenameOutputManager(DownloadNonSeekableOutputManager):
    """Output manager for filenames that ``osutil`` reports as special files.

    The named file is opened directly, with no temporary file. It is written
    to like a non-seekable stream and closed by the final IO task.
    """

    def __init__(
        self, osutil, transfer_coordinator, io_executor, defer_queue=None
    ):
        super().__init__(
            osutil, transfer_coordinator, io_executor, defer_queue
        )
        # Set by get_fileobj_for_io_writes().
        self._fileobj = None

    @classmethod
    def is_compatible(cls, download_target, osutil):
        if not isinstance(download_target, str):
            return False
        return osutil.is_special_file(download_target)

    def get_fileobj_for_io_writes(self, transfer_future):
        target_name = transfer_future.meta.call_args.fileobj
        opened = self._get_fileobj_from_filename(target_name)
        self._fileobj = opened
        return opened

    def get_final_io_task(self):
        # Make sure the file gets closed once the transfer is done.
        close_kwargs = {'fileobj': self._fileobj}
        return IOCloseTask(
            transfer_coordinator=self._transfer_coordinator,
            main_kwargs=close_kwargs,
            is_final=True,
        )
286
287
class DownloadSubmissionTask(SubmissionTask):
    """Task for submitting tasks to execute a download"""

    def _get_download_output_manager_cls(self, transfer_future, osutil):
        """Pick the output manager class for the download target

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future for the request

        :type osutil: s3transfer.utils.OSUtils
        :param osutil: The os utility associated to the transfer

        :rtype: class of DownloadOutputManager
        :returns: The first manager class, in priority order, whose
            ``is_compatible`` accepts the target.

        :raises RuntimeError: If no manager supports the target.
        """
        target = transfer_future.meta.call_args.fileobj
        # Priority order: special filenames are also ``str`` so they must be
        # checked before plain filenames. Seekable objects are checked before
        # merely writable ones.
        candidates = (
            DownloadSpecialFilenameOutputManager,
            DownloadFilenameOutputManager,
            DownloadSeekableOutputManager,
            DownloadNonSeekableOutputManager,
        )
        chosen = next(
            (
                manager_cls
                for manager_cls in candidates
                if manager_cls.is_compatible(target, osutil)
            ),
            None,
        )
        if chosen is None:
            raise RuntimeError(
                f'Output {target} of type: {type(target)} is not supported.'
            )
        return chosen

    def _submit(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        transfer_future,
        bandwidth_limiter=None,
    ):
        """
        :param client: The client associated with the transfer manager

        :type config: s3transfer.manager.TransferConfig
        :param config: The transfer config associated with the transfer
            manager

        :type osutil: s3transfer.utils.OSUtil
        :param osutil: The os utility associated to the transfer manager

        :type request_executor: s3transfer.futures.BoundedExecutor
        :param request_executor: The request executor associated with the
            transfer manager

        :type io_executor: s3transfer.futures.BoundedExecutor
        :param io_executor: The io executor associated with the
            transfer manager

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for

        :type bandwidth_limiter: s3transfer.bandwidth.BandwidthLimiter
        :param bandwidth_limiter: The bandwidth limiter to use when
            downloading streams
        """
        meta = transfer_future.meta
        if meta.size is None or meta.etag is None:
            call_args = meta.call_args
            head_response = client.head_object(
                Bucket=call_args.bucket,
                Key=call_args.key,
                **call_args.extra_args,
            )
            # Record the object's size, and its ETag so that a multipart
            # download can detect the stored object being modified mid-way.
            meta.provide_transfer_size(head_response['ContentLength'])
            meta.provide_object_etag(head_response.get('ETag'))

        manager_cls = self._get_download_output_manager_cls(
            transfer_future, osutil
        )
        download_output_manager = manager_cls(
            osutil, self._transfer_coordinator, io_executor
        )

        # Objects below the multipart threshold use one GetObject request.
        # Larger objects are fetched with ranged requests.
        if meta.size < config.multipart_threshold:
            submit_request = self._submit_download_request
        else:
            submit_request = self._submit_ranged_download_request
        submit_request(
            client,
            config,
            osutil,
            request_executor,
            io_executor,
            download_output_manager,
            transfer_future,
            bandwidth_limiter,
        )

    def _submit_download_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        download_output_manager,
        transfer_future,
        bandwidth_limiter,
    ):
        call_args = transfer_future.meta.call_args

        # Resolve the output handle first. Some managers only know what the
        # final task must operate on after this call.
        fileobj = download_output_manager.get_fileobj_for_io_writes(
            transfer_future
        )
        progress_callbacks = get_callbacks(transfer_future, 'progress')
        get_object_tag = download_output_manager.get_download_task_tag()
        final_task = download_output_manager.get_final_io_task()

        task_kwargs = {
            'client': client,
            'bucket': call_args.bucket,
            'key': call_args.key,
            'fileobj': fileobj,
            'extra_args': call_args.extra_args,
            'callbacks': progress_callbacks,
            'max_attempts': config.num_download_attempts,
            'download_output_manager': download_output_manager,
            'io_chunksize': config.io_chunksize,
            'bandwidth_limiter': bandwidth_limiter,
        }
        # A single request writes its chunks itself, and the final IO task
        # runs as its done callback.
        download_task = ImmediatelyWriteIOGetObjectTask(
            transfer_coordinator=self._transfer_coordinator,
            main_kwargs=task_kwargs,
            done_callbacks=[final_task],
        )
        self._transfer_coordinator.submit(
            request_executor, download_task, tag=get_object_tag
        )

    def _submit_ranged_download_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        download_output_manager,
        transfer_future,
        bandwidth_limiter,
    ):
        call_args = transfer_future.meta.call_args
        progress_callbacks = get_callbacks(transfer_future, 'progress')
        fileobj = download_output_manager.get_fileobj_for_io_writes(
            transfer_future
        )

        part_size = config.multipart_chunksize
        num_parts = calculate_num_parts(transfer_future.meta.size, part_size)
        get_object_tag = download_output_manager.get_download_task_tag()

        # Counts outstanding part downloads. Once finalized and back to zero,
        # it submits the final IO task.
        finalize_download_invoker = CountCallbackInvoker(
            self._get_final_io_task_submission_callback(
                download_output_manager, io_executor
            )
        )
        for part_index in range(num_parts):
            part_extra_args = {
                'Range': calculate_range_parameter(
                    part_size, part_index, num_parts
                ),
            }
            if transfer_future.meta.etag is not None:
                part_extra_args['IfMatch'] = transfer_future.meta.etag
            part_extra_args.update(call_args.extra_args)

            finalize_download_invoker.increment()
            part_task = GetObjectTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'fileobj': fileobj,
                    'extra_args': part_extra_args,
                    'callbacks': progress_callbacks,
                    'max_attempts': config.num_download_attempts,
                    'start_index': part_index * part_size,
                    'download_output_manager': download_output_manager,
                    'io_chunksize': config.io_chunksize,
                    'bandwidth_limiter': bandwidth_limiter,
                },
                done_callbacks=[finalize_download_invoker.decrement],
            )
            self._transfer_coordinator.submit(
                request_executor, part_task, tag=get_object_tag
            )
        finalize_download_invoker.finalize()

    def _get_final_io_task_submission_callback(
        self, download_manager, io_executor
    ):
        """Return a callable that submits the manager's final IO task."""
        submit = self._transfer_coordinator.submit
        return FunctionContainer(
            submit, io_executor, download_manager.get_final_io_task()
        )

    def _calculate_range_param(self, part_size, part_index, num_parts):
        """Build the ``Range`` header value for one part.

        The last part is open-ended (``bytes=N-``).
        """
        start_range = part_index * part_size
        is_last_part = part_index == num_parts - 1
        end_range = '' if is_last_part else start_range + part_size - 1
        return f'bytes={start_range}-{end_range}'
543
544
class GetObjectTask(Task):
    """Task that runs GetObject and hands the response body off for IO.

    The body is read in ``io_chunksize`` pieces. Each piece is passed to
    ``_handle_io`` along with the output offset it belongs at. On a retryable
    download error the whole request is retried from ``start_index``.
    """

    def _main(
        self,
        client,
        bucket,
        key,
        fileobj,
        extra_args,
        callbacks,
        max_attempts,
        download_output_manager,
        io_chunksize,
        start_index=0,
        bandwidth_limiter=None,
    ):
        """Downloads an object and places content into io queue

        :param client: The client to use when calling GetObject
        :param bucket: The bucket to download from
        :param key: The key to download from
        :param fileobj: The file handle to write content to
        :param extra_args: Any extra arguments to include in GetObject request
        :param callbacks: List of progress callbacks to invoke on download
        :param max_attempts: The maximum number of GetObject attempts (the
            initial request plus retries) to make before giving up.
        :param download_output_manager: The download output manager associated
            with the current download.
        :param io_chunksize: The size of each io chunk to read from the
            download stream and queue in the io queue.
        :param start_index: The location in the file to start writing the
            content of the key to.
        :param bandwidth_limiter: The bandwidth limiter to use when throttling
            the downloading of data in streams.

        :raises S3DownloadFailedError: If GetObject fails with
            ``PreconditionFailed``, i.e. the stored object no longer matches
            the expected ETag.
        :raises RetriesExceededError: If every attempt failed with one of
            ``S3_RETRYABLE_DOWNLOAD_ERRORS``.
        """
        last_exception = None
        for i in range(max_attempts):
            try:
                # Reset on every attempt: a retry restarts the request, so
                # writes begin again at start_index.
                current_index = start_index
                response = client.get_object(
                    Bucket=bucket, Key=key, **extra_args
                )
                self._validate_content_range(
                    extra_args.get('Range'),
                    response.get('ContentRange'),
                )
                streaming_body = StreamReaderProgress(
                    response['Body'], callbacks
                )
                if bandwidth_limiter:
                    streaming_body = (
                        bandwidth_limiter.get_bandwith_limited_stream(
                            streaming_body, self._transfer_coordinator
                        )
                    )

                chunks = DownloadChunkIterator(streaming_body, io_chunksize)
                for chunk in chunks:
                    # If the transfer is done because of a cancellation or
                    # an error somewhere else, stop submitting data to be
                    # written and abandon the download.
                    if self._transfer_coordinator.done():
                        return
                    self._handle_io(
                        download_output_manager,
                        fileobj,
                        chunk,
                        current_index,
                    )
                    current_index += len(chunk)
                return
            except ClientError as e:
                error_code = e.response.get('Error', {}).get('Code')
                if error_code == "PreconditionFailed":
                    raise S3DownloadFailedError(
                        f'Contents of stored object "{key}" in bucket '
                        f'"{bucket}" did not match expected ETag.'
                    ) from e
                raise
            except S3_RETRYABLE_DOWNLOAD_ERRORS as e:
                # Report the 1-based number of the attempt that just failed.
                logger.debug(
                    "Retrying exception caught (%s), "
                    "retrying request, (attempt %s / %s)",
                    e,
                    i + 1,
                    max_attempts,
                    exc_info=True,
                )
                last_exception = e
                # Report negative progress equal to what this attempt already
                # reported: the stream is being downloaded again, so that
                # progress has been lost.
                invoke_progress_callbacks(
                    callbacks, start_index - current_index
                )
                continue
        raise RetriesExceededError(last_exception)

    def _handle_io(self, download_output_manager, fileobj, chunk, index):
        """Hand a chunk to the output manager to queue for writing."""
        download_output_manager.queue_file_io_task(fileobj, chunk, index)

    def _validate_content_range(self, requested_range, content_range):
        """Ensure the response covers exactly the requested byte range.

        Does nothing if either value is missing.

        :param requested_range: The ``Range`` sent in the request, e.g.
            ``bytes=0-8388607`` or the open-ended ``bytes=33554432-``.
        :param content_range: The ``ContentRange`` of the response, e.g.
            ``bytes 0-8388607/39542919``.
        :raises S3ValidationError: If the two ranges differ.
        """
        if not requested_range or not content_range:
            return
        # Unparsed `ContentRange` looks like `bytes 0-8388607/39542919`,
        # where `0-8388607` is the fetched range and `39542919` is
        # the total object size.
        response_range, total_size = content_range.split('/')
        # Subtract `1` because range is 0-indexed.
        final_byte = str(int(total_size) - 1)
        # If it's the last part, the requested range will not include
        # the final byte, eg `bytes=33554432-`.
        if requested_range.endswith('-'):
            requested_range += final_byte
        # Request looks like `bytes=0-8388607`.
        # Parsed response looks like `bytes 0-8388607`.
        # Both prefixes are 6 characters long, so compare what follows.
        if requested_range[6:] != response_range[6:]:
            raise S3ValidationError(
                f"Requested range: `{requested_range[6:]}` does not match "
                f"content range in response: `{response_range[6:]}`"
            )
666
667
class ImmediatelyWriteIOGetObjectTask(GetObjectTask):
    """GetObjectTask that writes each chunk to the file object right away

    Used when only one thread is known to be downloading the object, so
    routing writes through the IO queue and executor would be pure
    overhead.
    """

    def _handle_io(self, download_output_manager, fileobj, chunk, index):
        # Build the write task and run it inline in this thread.
        download_output_manager.get_io_write_task(fileobj, chunk, index)()
679
680
class IOWriteTask(Task):
    """Write a chunk of data at a specific offset of a seekable file."""

    def _main(self, fileobj, data, offset):
        """Seek to ``offset`` in ``fileobj`` and write ``data`` there.

        :param fileobj: The file handle to write content to
        :param data: The data to write
        :param offset: The offset to write the data to.
        """
        fileobj.seek(offset)
        fileobj.write(data)
691
692
class IOStreamingWriteTask(Task):
    """Task for writing data to a non-seekable stream."""

    def _main(self, fileobj, data):
        """Append ``data`` to ``fileobj`` at its current position.

        No seek is performed. Callers must submit writes in order.

        :param fileobj: The fileobj to write content to
        :param data: The data to write
        """
        write = fileobj.write
        write(data)
707
708
class IORenameFileTask(Task):
    """Close the temporary download file and rename it to its final name

    :param fileobj: The file handle that content was written to.
    :param final_filename: The final name of the file to rename to
        upon completion of writing the contents.
    :param osutil: OS utility
    """

    def _main(self, fileobj, final_filename, osutil):
        # Close the file before moving it into place.
        fileobj.close()
        temp_filename = fileobj.name
        osutil.rename_file(temp_filename, final_filename)
721
722
class IOCloseTask(Task):
    """Close a file object once the download has completed.

    :param fileobj: The fileobj to close.
    """

    def _main(self, fileobj):
        close = fileobj.close
        close()
731
732
class CompleteDownloadNOOPTask(Task):
    """A no-op task whose completion indicates the download is complete

    ``is_final`` defaults to True here because this task is always meant to
    be the last one of a transfer.
    """

    def __init__(
        self,
        transfer_coordinator,
        main_kwargs=None,
        pending_main_kwargs=None,
        done_callbacks=None,
        is_final=True,
    ):
        super().__init__(
            transfer_coordinator=transfer_coordinator,
            is_final=is_final,
            main_kwargs=main_kwargs,
            done_callbacks=done_callbacks,
            pending_main_kwargs=pending_main_kwargs,
        )

    def _main(self):
        """Do nothing; running the task is the completion signal."""
        return None
758
759
class DownloadChunkIterator:
    def __init__(self, body, chunksize):
        """Iterate over a downloaded S3 stream in fixed-size reads

        :param body: A readable file-like object
        :param chunksize: The amount to read each time
        """
        self._body = body
        self._chunksize = chunksize
        self._num_reads = 0

    def __iter__(self):
        return self

    def __next__(self):
        data = self._body.read(self._chunksize)
        self._num_reads += 1
        # Non-empty data is always returned. An empty result on the very
        # first read is also returned once, so an empty object still
        # produces one (empty) chunk.
        if data or self._num_reads == 1:
            return data
        raise StopIteration()

    next = __next__
788
789
class DeferQueue:
    """IO queue that defers write requests until they can be issued in order.

    This class is used to track IO data for a *single* fileobj.

    Data sent to this queue is held back until the next contiguous block
    (starting at offset 0) is available. Only then are the writes released.
    """

    def __init__(self):
        # Min-heap of offsets that have pending data.
        self._writes = []
        # Maps each pending offset to its data.
        self._pending_offsets = {}
        # Offset of the first byte not yet released for writing.
        self._next_offset = 0

    def request_writes(self, offset, data):
        """Add incoming data and return any writes that are now contiguous.

        Queuing new data and collecting released writes happen in one call,
        so callers do not need separate put()/get() calls, each needing a
        lock.

        :param offset: The offset ``data`` belongs at
        :param data: The data for that offset
        :returns: A list of ``{'offset': ..., 'data': ...}`` dicts, in
            increasing offset order. The list is empty if nothing can be
            released yet.
        """
        next_offset = self._next_offset
        if offset + len(data) <= next_offset:
            # Everything in this request was already released. This happens
            # on retries that resend earlier offsets.
            return []

        if offset < next_offset:
            # The request mixes already-released bytes with new ones, e.g. a
            # retry after an incomplete read whose partial chunk was already
            # released. Keep only the unseen tail.
            data = data[next_offset - offset:]
            offset = next_offset

        if offset not in self._pending_offsets:
            heapq.heappush(self._writes, offset)
        elif len(data) <= len(self._pending_offsets[offset]):
            # Data already queued at this offset is at least as long; prefer
            # it and ignore this request.
            return []
        # Either a new offset, or longer data (e.g. a full retry after an
        # incomplete read) replacing what was queued.
        self._pending_offsets[offset] = data

        released = []
        heap = self._writes
        while heap and heap[0] == self._next_offset:
            ready_offset = heapq.heappop(heap)
            ready_data = self._pending_offsets.pop(ready_offset)
            released.append({'offset': ready_offset, 'data': ready_data})
            self._next_offset += len(ready_data)
        return released