Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/download.py: 34%
229 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import heapq
14import logging
15import threading
17from s3transfer.compat import seekable
18from s3transfer.exceptions import RetriesExceededError
19from s3transfer.futures import IN_MEMORY_DOWNLOAD_TAG
20from s3transfer.tasks import SubmissionTask, Task
21from s3transfer.utils import (
22 S3_RETRYABLE_DOWNLOAD_ERRORS,
23 CountCallbackInvoker,
24 DeferredOpenFile,
25 FunctionContainer,
26 StreamReaderProgress,
27 calculate_num_parts,
28 calculate_range_parameter,
29 get_callbacks,
30 invoke_progress_callbacks,
31)
33logger = logging.getLogger(__name__)
class DownloadOutputManager:
    """Abstract base for handling the output side of a download.

    The DownloadSubmissionTask relies on subclasses of this manager to
    answer two questions for a given download target:

        * which file-like object downloaded bytes should be written to
        * which task finalizes the download once every write has landed

    The answers differ per kind of output target, so concrete subclasses
    override the public methods below.
    """

    def __init__(self, osutil, transfer_coordinator, io_executor):
        self._osutil = osutil
        self._transfer_coordinator = transfer_coordinator
        self._io_executor = io_executor

    @classmethod
    def is_compatible(cls, download_target, osutil):
        """Whether this manager can handle the given download target.

        :param download_target: The target for which the upload will write
            data to.

        :param osutil: The os utility to be used for the transfer

        :returns: True if the manager can handle the type of target specified
            otherwise returns False.
        """
        raise NotImplementedError('must implement is_compatible()')

    def get_download_task_tag(self):
        """Tag (if any) to associate with every GetObjectTask.

        :rtype: s3transfer.futures.TaskTag
        :returns: The tag to associate all GetObjectTasks with
        """
        return None

    def get_fileobj_for_io_writes(self, transfer_future):
        """File-like object that io writes should target in the io executor.

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with upload request

        returns: A file-like object to write to
        """
        raise NotImplementedError('must implement get_fileobj_for_io_writes()')

    def queue_file_io_task(self, fileobj, data, offset):
        """Queue an IO write for submission to the IO executor.

        Builds the write task for the downloaded data and hands it to the
        IO executor. Subclasses may defer submission when writes must be
        ordered (e.g. non-seekable streams).
        """
        write_task = self.get_io_write_task(fileobj, data, offset)
        self._transfer_coordinator.submit(self._io_executor, write_task)

    def get_io_write_task(self, fileobj, data, offset):
        """Build an IO write task for the requested chunk of data.

        The returned task can either be run immediately or be submitted
        to the IO executor.

        :type fileobj: file-like object
        :param fileobj: The file-like object to write to

        :type data: bytes
        :param data: The data to write out

        :type offset: integer
        :param offset: The offset to write the data to in the file-like object

        :returns: An IO task to be used to write data to a file-like object
        """
        write_kwargs = {
            'fileobj': fileobj,
            'data': data,
            'offset': offset,
        }
        return IOWriteTask(self._transfer_coordinator, main_kwargs=write_kwargs)

    def get_final_io_task(self):
        """Final io task that completes the download.

        The TransferManager architecture routes all writes through the IO
        executor, so the executor needs one last task whose completion
        signals that the transfer is done and the done callbacks may run.

        :rtype: s3transfer.tasks.Task
        :returns: A final task to completed in the io executor
        """
        raise NotImplementedError('must implement get_final_io_task()')

    def _get_fileobj_from_filename(self, filename):
        fileobj = DeferredOpenFile(
            filename, mode='wb', open_function=self._osutil.open
        )
        # Ensure the handle is closed if the transfer fails partway through.
        self._transfer_coordinator.add_failure_cleanup(fileobj.close)
        return fileobj
class DownloadFilenameOutputManager(DownloadOutputManager):
    """Output manager for downloads whose target is a plain filename.

    Writes go to a temporary file which is renamed into place as the
    final step of the download.
    """

    def __init__(self, osutil, transfer_coordinator, io_executor):
        super().__init__(osutil, transfer_coordinator, io_executor)
        self._final_filename = None
        self._temp_filename = None
        self._temp_fileobj = None

    @classmethod
    def is_compatible(cls, download_target, osutil):
        return isinstance(download_target, str)

    def get_fileobj_for_io_writes(self, transfer_future):
        final_filename = transfer_future.meta.call_args.fileobj
        self._final_filename = final_filename
        self._temp_filename = self._osutil.get_temp_filename(final_filename)
        self._temp_fileobj = self._get_temp_fileobj()
        return self._temp_fileobj

    def get_final_io_task(self):
        # Renaming the temporary file to its final location must be the
        # very last step of the download, so it serves as the final io task.
        return IORenameFileTask(
            transfer_coordinator=self._transfer_coordinator,
            main_kwargs={
                'fileobj': self._temp_fileobj,
                'final_filename': self._final_filename,
                'osutil': self._osutil,
            },
            is_final=True,
        )

    def _get_temp_fileobj(self):
        temp_fileobj = self._get_fileobj_from_filename(self._temp_filename)
        # Remove the temporary file if anything goes wrong mid-transfer.
        self._transfer_coordinator.add_failure_cleanup(
            self._osutil.remove_file, self._temp_filename
        )
        return temp_fileobj
class DownloadSeekableOutputManager(DownloadOutputManager):
    """Output manager for downloads writing to a seekable file-like object."""

    @classmethod
    def is_compatible(cls, download_target, osutil):
        return seekable(download_target)

    def get_fileobj_for_io_writes(self, transfer_future):
        # Write straight into the fileobj supplied by the caller.
        return transfer_future.meta.call_args.fileobj

    def get_final_io_task(self):
        # Nothing needs finalizing for a user-provided fileobj; a NOOP
        # task merely signals that all io writes have finished so the
        # done callbacks can be called.
        return CompleteDownloadNOOPTask(
            transfer_coordinator=self._transfer_coordinator
        )
class DownloadNonSeekableOutputManager(DownloadOutputManager):
    """Output manager for downloads writing to a non-seekable stream.

    Because the stream cannot seek, writes must be emitted strictly in
    offset order; out-of-order chunks are parked in a DeferQueue until
    the next contiguous block is available.
    """

    def __init__(
        self, osutil, transfer_coordinator, io_executor, defer_queue=None
    ):
        super().__init__(osutil, transfer_coordinator, io_executor)
        self._defer_queue = DeferQueue() if defer_queue is None else defer_queue
        self._io_submit_lock = threading.Lock()

    @classmethod
    def is_compatible(cls, download_target, osutil):
        return hasattr(download_target, 'write')

    def get_download_task_tag(self):
        return IN_MEMORY_DOWNLOAD_TAG

    def get_fileobj_for_io_writes(self, transfer_future):
        return transfer_future.meta.call_args.fileobj

    def get_final_io_task(self):
        return CompleteDownloadNOOPTask(
            transfer_coordinator=self._transfer_coordinator
        )

    def queue_file_io_task(self, fileobj, data, offset):
        # Serialize access to the defer queue so writes are released in
        # strictly increasing offset order.
        with self._io_submit_lock:
            for write in self._defer_queue.request_writes(offset, data):
                logger.debug(
                    "Queueing IO offset %s for fileobj: %s",
                    write['offset'],
                    fileobj,
                )
                super().queue_file_io_task(fileobj, write['data'], offset)

    def get_io_write_task(self, fileobj, data, offset):
        # The stream is written sequentially, so the offset is unused here.
        return IOStreamingWriteTask(
            self._transfer_coordinator,
            main_kwargs={
                'fileobj': fileobj,
                'data': data,
            },
        )
class DownloadSpecialFilenameOutputManager(DownloadNonSeekableOutputManager):
    """Output manager for special files (e.g. named pipes, devices).

    Behaves like the non-seekable manager but opens the target by name
    and closes the handle itself when the download completes.
    """

    def __init__(
        self, osutil, transfer_coordinator, io_executor, defer_queue=None
    ):
        super().__init__(
            osutil, transfer_coordinator, io_executor, defer_queue
        )
        self._fileobj = None

    @classmethod
    def is_compatible(cls, download_target, osutil):
        is_filename = isinstance(download_target, str)
        return is_filename and osutil.is_special_file(download_target)

    def get_fileobj_for_io_writes(self, transfer_future):
        self._fileobj = self._get_fileobj_from_filename(
            transfer_future.meta.call_args.fileobj
        )
        return self._fileobj

    def get_final_io_task(self):
        # Special files are never renamed; just close the handle once the
        # transfer is done.
        return IOCloseTask(
            transfer_coordinator=self._transfer_coordinator,
            is_final=True,
            main_kwargs={'fileobj': self._fileobj},
        )
class DownloadSubmissionTask(SubmissionTask):
    """Task for submitting tasks to execute a download"""

    def _get_download_output_manager_cls(self, transfer_future, osutil):
        """Retrieves a class for managing output for a download

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future for the request

        :type osutil: s3transfer.utils.OSUtils
        :param osutil: The os utility associated to the transfer

        :rtype: class of DownloadOutputManager
        :returns: The appropriate class to use for managing a specific type of
            input for downloads.
        """
        # Ordered most-specific first: a special file is also a str
        # filename, and a filename would otherwise never be reached if the
        # generic non-seekable fallback came earlier, so order matters.
        download_manager_resolver_chain = [
            DownloadSpecialFilenameOutputManager,
            DownloadFilenameOutputManager,
            DownloadSeekableOutputManager,
            DownloadNonSeekableOutputManager,
        ]

        fileobj = transfer_future.meta.call_args.fileobj
        for download_manager_cls in download_manager_resolver_chain:
            if download_manager_cls.is_compatible(fileobj, osutil):
                return download_manager_cls
        raise RuntimeError(
            'Output {} of type: {} is not supported.'.format(
                fileobj, type(fileobj)
            )
        )

    def _submit(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        transfer_future,
        bandwidth_limiter=None,
    ):
        """Submit the request task(s) needed to perform the download.

        :param client: The client associated with the transfer manager

        :type config: s3transfer.manager.TransferConfig
        :param config: The transfer config associated with the transfer
            manager

        :type osutil: s3transfer.utils.OSUtil
        :param osutil: The os utility associated to the transfer manager

        :type request_executor: s3transfer.futures.BoundedExecutor
        :param request_executor: The request executor associated with the
            transfer manager

        :type io_executor: s3transfer.futures.BoundedExecutor
        :param io_executor: The io executor associated with the
            transfer manager

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for

        :type bandwidth_limiter: s3transfer.bandwidth.BandwidthLimiter
        :param bandwidth_limiter: The bandwidth limiter to use when
            downloading streams
        """
        if transfer_future.meta.size is None:
            # If a size was not provided figure out the size for the
            # user.
            # NOTE(review): all of the caller's extra_args are forwarded to
            # HeadObject here; confirm every one is a valid HeadObject
            # parameter.
            response = client.head_object(
                Bucket=transfer_future.meta.call_args.bucket,
                Key=transfer_future.meta.call_args.key,
                **transfer_future.meta.call_args.extra_args,
            )
            transfer_future.meta.provide_transfer_size(
                response['ContentLength']
            )

        # Resolve the output manager class for this target and instantiate it.
        download_output_manager = self._get_download_output_manager_cls(
            transfer_future, osutil
        )(osutil, self._transfer_coordinator, io_executor)

        # If it is greater than threshold do a ranged download, otherwise
        # do a regular GetObject download.
        if transfer_future.meta.size < config.multipart_threshold:
            self._submit_download_request(
                client,
                config,
                osutil,
                request_executor,
                io_executor,
                download_output_manager,
                transfer_future,
                bandwidth_limiter,
            )
        else:
            self._submit_ranged_download_request(
                client,
                config,
                osutil,
                request_executor,
                io_executor,
                download_output_manager,
                transfer_future,
                bandwidth_limiter,
            )

    def _submit_download_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        download_output_manager,
        transfer_future,
        bandwidth_limiter,
    ):
        """Submit a single (non-ranged) GetObject task for the download."""
        call_args = transfer_future.meta.call_args

        # Get a handle to the file that will be used for writing downloaded
        # contents
        fileobj = download_output_manager.get_fileobj_for_io_writes(
            transfer_future
        )

        # Get the needed callbacks for the task
        progress_callbacks = get_callbacks(transfer_future, 'progress')

        # Get any associated tags for the get object task.
        get_object_tag = download_output_manager.get_download_task_tag()

        # Get the final io task to run once the download is complete.
        final_task = download_output_manager.get_final_io_task()

        # Submit the task to download the object. Only one GetObject task
        # exists, so the final io task can simply run as its done callback.
        self._transfer_coordinator.submit(
            request_executor,
            ImmediatelyWriteIOGetObjectTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'fileobj': fileobj,
                    'extra_args': call_args.extra_args,
                    'callbacks': progress_callbacks,
                    'max_attempts': config.num_download_attempts,
                    'download_output_manager': download_output_manager,
                    'io_chunksize': config.io_chunksize,
                    'bandwidth_limiter': bandwidth_limiter,
                },
                done_callbacks=[final_task],
            ),
            tag=get_object_tag,
        )

    def _submit_ranged_download_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        io_executor,
        download_output_manager,
        transfer_future,
        bandwidth_limiter,
    ):
        """Submit one GetObject task per part for a multipart ranged download."""
        call_args = transfer_future.meta.call_args

        # Get the needed progress callbacks for the task
        progress_callbacks = get_callbacks(transfer_future, 'progress')

        # Get a handle to the file that will be used for writing downloaded
        # contents
        fileobj = download_output_manager.get_fileobj_for_io_writes(
            transfer_future
        )

        # Determine the number of parts
        part_size = config.multipart_chunksize
        num_parts = calculate_num_parts(transfer_future.meta.size, part_size)

        # Get any associated tags for the get object task.
        get_object_tag = download_output_manager.get_download_task_tag()

        # Callback invoker to submit the final io task once all downloads
        # are complete. Each part increments the count; each completed part
        # decrements it; the final task fires when the count reaches zero.
        finalize_download_invoker = CountCallbackInvoker(
            self._get_final_io_task_submission_callback(
                download_output_manager, io_executor
            )
        )
        for i in range(num_parts):
            # Calculate the range parameter
            range_parameter = calculate_range_parameter(
                part_size, i, num_parts
            )

            # Inject the Range parameter to the parameters to be passed in
            # as extra args
            extra_args = {'Range': range_parameter}
            extra_args.update(call_args.extra_args)
            finalize_download_invoker.increment()
            # Submit the ranged downloads
            self._transfer_coordinator.submit(
                request_executor,
                GetObjectTask(
                    transfer_coordinator=self._transfer_coordinator,
                    main_kwargs={
                        'client': client,
                        'bucket': call_args.bucket,
                        'key': call_args.key,
                        'fileobj': fileobj,
                        'extra_args': extra_args,
                        'callbacks': progress_callbacks,
                        'max_attempts': config.num_download_attempts,
                        'start_index': i * part_size,
                        'download_output_manager': download_output_manager,
                        'io_chunksize': config.io_chunksize,
                        'bandwidth_limiter': bandwidth_limiter,
                    },
                    done_callbacks=[finalize_download_invoker.decrement],
                ),
                tag=get_object_tag,
            )
        # All parts have been submitted; allow the invoker to fire once the
        # outstanding count drains to zero.
        finalize_download_invoker.finalize()

    def _get_final_io_task_submission_callback(
        self, download_manager, io_executor
    ):
        # Bind the final io task now and wrap its submission in a callable
        # that the CountCallbackInvoker can trigger later.
        final_task = download_manager.get_final_io_task()
        return FunctionContainer(
            self._transfer_coordinator.submit, io_executor, final_task
        )

    def _calculate_range_param(self, part_size, part_index, num_parts):
        # Used to calculate the Range parameter
        # NOTE(review): appears unused within this module and duplicates
        # s3transfer.utils.calculate_range_parameter (which
        # _submit_ranged_download_request uses) -- candidate for removal;
        # verify no external callers first.
        start_range = part_index * part_size
        if part_index == num_parts - 1:
            # The last part is open-ended so any trailing bytes are included.
            end_range = ''
        else:
            end_range = start_range + part_size - 1
        range_param = f'bytes={start_range}-{end_range}'
        return range_param
class GetObjectTask(Task):
    def _main(
        self,
        client,
        bucket,
        key,
        fileobj,
        extra_args,
        callbacks,
        max_attempts,
        download_output_manager,
        io_chunksize,
        start_index=0,
        bandwidth_limiter=None,
    ):
        """Downloads an object and places content into io queue

        :param client: The client to use when calling GetObject
        :param bucket: The bucket to download from
        :param key: The key to download from
        :param fileobj: The file handle to write content to
        :param extra_args: Any extra arguments to include in GetObject request
        :param callbacks: List of progress callbacks to invoke on download
        :param max_attempts: The number of retries to do when downloading
        :param download_output_manager: The download output manager associated
            with the current download.
        :param io_chunksize: The size of each io chunk to read from the
            download stream and queue in the io queue.
        :param start_index: The location in the file to start writing the
            content of the key to.
        :param bandwidth_limiter: The bandwidth limiter to use when throttling
            the downloading of data in streams.
        """
        last_exception = None
        for i in range(max_attempts):
            try:
                # Reset to the start of this part on every attempt: a retry
                # restarts the whole (possibly ranged) GET from its beginning.
                current_index = start_index
                response = client.get_object(
                    Bucket=bucket, Key=key, **extra_args
                )
                # Wrap the body so progress callbacks fire as bytes are read.
                streaming_body = StreamReaderProgress(
                    response['Body'], callbacks
                )
                if bandwidth_limiter:
                    streaming_body = (
                        bandwidth_limiter.get_bandwith_limited_stream(
                            streaming_body, self._transfer_coordinator
                        )
                    )

                chunks = DownloadChunkIterator(streaming_body, io_chunksize)
                for chunk in chunks:
                    # If the transfer is done because of a cancellation
                    # or error somewhere else, stop trying to submit more
                    # data to be written and break out of the download.
                    if not self._transfer_coordinator.done():
                        self._handle_io(
                            download_output_manager,
                            fileobj,
                            chunk,
                            current_index,
                        )
                        current_index += len(chunk)
                    else:
                        return
                return
            except S3_RETRYABLE_DOWNLOAD_ERRORS as e:
                # Note: the logged attempt number `i` is zero-based.
                logger.debug(
                    "Retrying exception caught (%s), "
                    "retrying request, (attempt %s / %s)",
                    e,
                    i,
                    max_attempts,
                    exc_info=True,
                )
                last_exception = e
                # Also invoke the progress callbacks to indicate that we
                # are trying to download the stream again and all progress
                # for this GetObject has been lost. The amount is negative
                # (start_index - current_index) to roll back reported bytes.
                invoke_progress_callbacks(
                    callbacks, start_index - current_index
                )
                continue
        # All attempts exhausted; surface the last retryable error.
        raise RetriesExceededError(last_exception)

    def _handle_io(self, download_output_manager, fileobj, chunk, index):
        # Default behavior: hand the chunk off to the io executor via the
        # output manager; subclasses may write inline instead.
        download_output_manager.queue_file_io_task(fileobj, chunk, index)
class ImmediatelyWriteIOGetObjectTask(GetObjectTask):
    """GetObjectTask that writes to the fileobj in the downloading thread

    Useful when it is known that only one thread is downloading the
    object; routing writes through an IO queue and executor would add
    overhead for no benefit.
    """

    def _handle_io(self, download_output_manager, fileobj, chunk, index):
        # Build the write task and run it inline instead of queueing it.
        download_output_manager.get_io_write_task(fileobj, chunk, index)()
class IOWriteTask(Task):
    """Task that writes a downloaded chunk at an absolute file position."""

    def _main(self, fileobj, data, offset):
        """Write `data` into `fileobj` starting at `offset`.

        :param fileobj: The file handle to write content to
        :param data: The data to write
        :param offset: The offset to write the data to.
        """
        # Seek first so the chunk lands at its absolute position regardless
        # of the order in which write tasks execute.
        fileobj.seek(offset)
        fileobj.write(data)
class IOStreamingWriteTask(Task):
    """Task for writing data to a non-seekable stream."""

    def _main(self, fileobj, data):
        """Append `data` to `fileobj`.

        The bytes are written at the stream's current position; no seek
        is performed (the target may not support seeking at all).

        :param fileobj: The fileobj to write content to
        :param data: The data to write
        """
        fileobj.write(data)
class IORenameFileTask(Task):
    """A task to rename a temporary file to its final filename

    :param fileobj: The file handle that content was written to.
    :param final_filename: The final name of the file to rename to
        upon completion of writing the contents.
    :param osutil: OS utility
    """

    def _main(self, fileobj, final_filename, osutil):
        # Close first so all buffered bytes are flushed before the rename
        # publishes the file under its final name.
        fileobj.close()
        # fileobj.name is read after close -- presumably still valid because
        # the handle is a DeferredOpenFile exposing the filename; verify
        # against s3transfer.utils.DeferredOpenFile.
        osutil.rename_file(fileobj.name, final_filename)
class IOCloseTask(Task):
    """A task to close out a file once the download is complete.

    :param fileobj: The fileobj to close.
    """

    def _main(self, fileobj):
        # Closing flushes buffered content and releases the handle.
        fileobj.close()
class CompleteDownloadNOOPTask(Task):
    """A NOOP task to serve as an indicator that the download is complete

    Note that the default for is_final is set to True because this should
    always be the last task.
    """

    def __init__(
        self,
        transfer_coordinator,
        main_kwargs=None,
        pending_main_kwargs=None,
        done_callbacks=None,
        is_final=True,
    ):
        # Pure pass-through to Task.__init__; this subclass exists only to
        # flip the is_final default to True.
        super().__init__(
            transfer_coordinator=transfer_coordinator,
            main_kwargs=main_kwargs,
            pending_main_kwargs=pending_main_kwargs,
            done_callbacks=done_callbacks,
            is_final=is_final,
        )

    def _main(self):
        # Intentionally does nothing; this task completing is itself the
        # signal that all io writes for the download have finished.
        pass
class DownloadChunkIterator:
    def __init__(self, body, chunksize):
        """Iterator that yields fixed-size chunks from a downloaded S3 stream

        :param body: A readable file-like object
        :param chunksize: The amount to read each time
        """
        self._body = body
        self._chunksize = chunksize
        self._num_reads = 0

    def __iter__(self):
        return self

    def __next__(self):
        chunk = self._body.read(self._chunksize)
        self._num_reads += 1
        if not chunk and self._num_reads > 1:
            # Stream exhausted after at least one prior read.
            raise StopIteration()
        # An empty chunk on the very first read is still yielded so that
        # a zero-byte object produces exactly one (empty) write, keeping
        # the empty object's existence accounted for.
        return chunk

    next = __next__
class DeferQueue:
    """IO queue that defers write requests until they are queued sequentially.

    This class is used to track IO data for a *single* fileobj.

    You can send data to this queue, and it will defer any IO write requests
    until it has the next contiguous block available (starting at 0).
    """

    def __init__(self):
        # Min-heap of offsets that have data waiting to be written.
        self._writes = []
        # Maps each pending offset to the data queued for it.
        self._pending_offsets = {}
        # Offset of the next byte eligible to be written to the fileobj.
        self._next_offset = 0

    def request_writes(self, offset, data):
        """Request any available writes given new incoming data.

        You call this method by providing new data along with the
        offset associated with the data. If that new data unlocks
        any contiguous writes that can now be submitted, this
        method will return all applicable writes.

        This is done with 1 method call so you don't have to
        make two method calls (put(), get()) which acquires a lock
        each method call.

        :param offset: Absolute offset of ``data`` within the stream.
        :param data: The bytes to (eventually) write at ``offset``.
        :returns: A list of ``{'offset': ..., 'data': ...}`` dicts, in
            strictly increasing contiguous offset order, that are now
            ready to be written.
        """
        if offset + len(data) <= self._next_offset:
            # This request is entirely for bytes we've already seen.
            # This can happen in the event of a retry where if we retry
            # at offset N/2, we'll requeue offsets 0-N/2 again.
            return []
        writes = []
        if offset < self._next_offset:
            # The request contains both seen AND unseen bytes (e.g. a retry
            # that restarts before the current write boundary but extends
            # past it). Previously such a request was dropped entirely,
            # silently losing the unseen tail; instead, trim the
            # already-written prefix and queue only the new bytes.
            seen_bytes = self._next_offset - offset
            data = data[seen_bytes:]
            offset = self._next_offset
        if offset in self._pending_offsets:
            queued_data = self._pending_offsets[offset]
            if len(data) <= len(queued_data):
                # Duplicate request with no new bytes; prefer what's
                # already queued.
                return []
            # Same offset but more data (e.g. a retry after an incomplete
            # read): keep the longer chunk. The offset is already in the
            # heap, so only the data mapping needs updating.
            self._pending_offsets[offset] = data
        else:
            heapq.heappush(self._writes, offset)
            self._pending_offsets[offset] = data
        # Release every write that is now contiguous with the boundary.
        while self._writes and self._writes[0] == self._next_offset:
            next_offset = heapq.heappop(self._writes)
            next_data = self._pending_offsets.pop(next_offset)
            writes.append({'offset': next_offset, 'data': next_data})
            self._next_offset += len(next_data)
        return writes