Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/utils.py: 28%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import functools
14import logging
15import math
16import os
17import random
18import socket
19import stat
20import string
21import threading
22from collections import defaultdict
24from botocore.exceptions import (
25 IncompleteReadError,
26 ReadTimeoutError,
27 ResponseStreamingError,
28)
29from botocore.httpchecksum import AwsChunkedWrapper
30from botocore.utils import is_s3express_bucket
32from s3transfer.compat import SOCKET_ERROR, fallocate, rename_file
# Maximum number of parts permitted in an S3 multipart upload.
MAX_PARTS = 10000
# The maximum file size you can upload via S3 per request.
# See: http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
# and: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
MAX_SINGLE_UPLOAD_SIZE = 5 * (1024**3)
MIN_UPLOAD_CHUNKSIZE = 5 * (1024**2)
logger = logging.getLogger(__name__)

# Exceptions that indicate a transient failure while streaming a download
# body; requests that fail with one of these are safe to retry.
S3_RETRYABLE_DOWNLOAD_ERRORS = (
    socket.timeout,
    SOCKET_ERROR,
    ReadTimeoutError,
    IncompleteReadError,
    ResponseStreamingError,
)
def random_file_extension(num_digits=8):
    """Return a random extension of ``num_digits`` hexadecimal characters."""
    digits = (random.choice(string.hexdigits) for _ in range(num_digits))
    return ''.join(digits)
def signal_not_transferring(request, operation_name, **kwargs):
    """Tell an upload body to pause progress reporting, if it supports it."""
    if operation_name not in ['PutObject', 'UploadPart']:
        return
    body = request.body
    if hasattr(body, 'signal_not_transferring'):
        body.signal_not_transferring()
def signal_transferring(request, operation_name, **kwargs):
    """Tell an upload body to resume progress reporting, if it supports it."""
    if operation_name not in ['PutObject', 'UploadPart']:
        return
    body = request.body
    # A checksummed body wraps the original stream; signal the raw stream.
    if isinstance(body, AwsChunkedWrapper):
        body = getattr(body, '_raw', None)
    if hasattr(body, 'signal_transferring'):
        body.signal_transferring()
def calculate_num_parts(size, part_size):
    """Return how many parts of ``part_size`` bytes are needed for ``size``."""
    return math.ceil(size / part_size)
def calculate_range_parameter(
    part_size, part_index, num_parts, total_size=None
):
    """Calculate the range parameter for multipart downloads/copies

    :type part_size: int
    :param part_size: The size of each part

    :type part_index: int
    :param part_index: The zero-based index of the part whose range is
        being computed

    :type num_parts: int
    :param num_parts: The total number of parts in the transfer

    :type total_size: int
    :param total_size: The total size of the transfer, if known. Used to
        pin down the end of the final part's range.

    :returns: The value to use for the Range parameter on downloads or
        the CopySourceRange parameter for copies
    """
    start_byte = part_index * part_size
    if part_index == num_parts - 1:
        # The final part is open-ended unless the total size is known.
        end_byte = '' if total_size is None else str(total_size - 1)
    else:
        end_byte = start_byte + part_size - 1
    return f'bytes={start_byte}-{end_byte}'
def get_callbacks(transfer_future, callback_type):
    """Collect callbacks of a given type from a future's subscribers

    :type transfer_future: s3transfer.futures.TransferFuture
    :param transfer_future: The transfer future whose subscribers are
        inspected.

    :type callback_type: str
    :param callback_type: The type of callback to retrieve from the
        subscriber. Valid types include:

            * 'queued'
            * 'progress'
            * 'done'

    :returns: A list of callbacks for the type specified. All callbacks are
        preinjected with the transfer future.
    """
    callback_name = 'on_' + callback_type
    bound_callbacks = []
    for subscriber in transfer_future.meta.call_args.subscribers:
        if not hasattr(subscriber, callback_name):
            continue
        handler = getattr(subscriber, callback_name)
        # Bind the future now so callers can invoke the callback with only
        # the event-specific keyword arguments.
        bound_callbacks.append(
            functools.partial(handler, future=transfer_future)
        )
    return bound_callbacks
def invoke_progress_callbacks(callbacks, bytes_transferred):
    """Calls all progress callbacks

    :param callbacks: A list of progress callbacks to invoke
    :param bytes_transferred: The number of bytes transferred. This is passed
        to the callbacks. If no bytes were transferred the callbacks will not
        be invoked because no progress was achieved. It is also possible
        to receive a negative amount which comes from retrying a transfer
        request.
    """
    if not bytes_transferred:
        # Zero progress: nothing worth reporting.
        return
    for callback in callbacks:
        callback(bytes_transferred=bytes_transferred)
def get_filtered_dict(original_dict, whitelisted_keys):
    """Gets a dictionary filtered by whitelisted keys

    :param original_dict: The original dictionary of arguments to source keys
        and values.
    :param whitelisted_keys: A list of keys to include in the filtered
        dictionary.

    :returns: A dictionary containing key/values from the original dictionary
        whose key was included in the whitelist
    """
    # Dict comprehension replaces the manual build loop; the original
    # docstring also documented a nonexistent ``whitelisted_key`` parameter.
    return {
        key: value
        for key, value in original_dict.items()
        if key in whitelisted_keys
    }
class CallArgs:
    def __init__(self, **kwargs):
        """A class that records call arguments

        The call arguments must be passed as keyword arguments. Each keyword
        argument becomes an attribute of the object with its associated
        value.
        """
        # No custom __setattr__ exists, so updating __dict__ directly is
        # equivalent to setattr() per keyword.
        self.__dict__.update(kwargs)
class FunctionContainer:
    """An object that contains a function and any args or kwargs to call it

    Invoking the instance calls the stored function with the stored
    positional and keyword arguments.
    """

    def __init__(self, func, *args, **kwargs):
        self._function = func
        self._pos_args = args
        self._kw_args = kwargs

    def __repr__(self):
        template = 'Function: {} with args {} and kwargs {}'
        return template.format(self._function, self._pos_args, self._kw_args)

    def __call__(self):
        return self._function(*self._pos_args, **self._kw_args)
class CountCallbackInvoker:
    """An abstraction to invoke a callback when a shared count reaches zero

    :param callback: Callback invoke when finalized count reaches zero
    """

    def __init__(self, callback):
        self._mutex = threading.Lock()
        self._callback = callback
        self._remaining = 0
        self._finalized = False

    @property
    def current_count(self):
        # Snapshot of the count, read under the lock.
        with self._mutex:
            return self._remaining

    def increment(self):
        """Increment the count by one"""
        with self._mutex:
            if self._finalized:
                raise RuntimeError(
                    'Counter has been finalized it can no longer be '
                    'incremented.'
                )
            self._remaining += 1

    def decrement(self):
        """Decrement the count by one"""
        with self._mutex:
            if self._remaining == 0:
                raise RuntimeError(
                    'Counter is at zero. It cannot dip below zero'
                )
            self._remaining -= 1
            # Only fire once finalize() has promised no more increments.
            if self._finalized and self._remaining == 0:
                self._callback()

    def finalize(self):
        """Finalize the counter

        Once finalized, the counter never be incremented and the callback
        can be invoked once the count reaches zero
        """
        with self._mutex:
            self._finalized = True
            if self._remaining == 0:
                self._callback()
class OSUtils:
    """Thin wrapper around filesystem operations used by s3transfer.

    Centralizing these operations makes them easy to stub out in tests and
    to replace with alternate implementations.
    """

    # Conservative maximum filename length for common filesystems.
    _MAX_FILENAME_LEN = 255

    def get_file_size(self, filename):
        """Return the size of ``filename`` in bytes."""
        return os.path.getsize(filename)

    def open_file_chunk_reader(self, filename, start_byte, size, callbacks):
        """Create a ReadFileChunk over a slice of the named file."""
        return ReadFileChunk.from_filename(
            filename, start_byte, size, callbacks, enable_callbacks=False
        )

    def open_file_chunk_reader_from_fileobj(
        self,
        fileobj,
        chunk_size,
        full_file_size,
        callbacks,
        close_callbacks=None,
    ):
        """Create a ReadFileChunk over an already-open file object."""
        return ReadFileChunk(
            fileobj,
            chunk_size,
            full_file_size,
            callbacks=callbacks,
            enable_callbacks=False,
            close_callbacks=close_callbacks,
        )

    def open(self, filename, mode):
        """Open ``filename``; exists as an indirection point for tests."""
        return open(filename, mode)

    def remove_file(self, filename):
        """Remove a file, noop if file does not exist."""
        # Unlike os.remove, if the file does not exist,
        # then this method does nothing.
        try:
            os.remove(filename)
        except OSError:
            pass

    def rename_file(self, current_filename, new_filename):
        """Rename ``current_filename`` to ``new_filename``."""
        rename_file(current_filename, new_filename)

    # Fixed: the first parameter was previously named ``cls`` even though
    # this is a plain instance method (no @classmethod decorator); it is
    # renamed to ``self`` to avoid implying classmethod semantics.
    def is_special_file(self, filename):
        """Checks to see if a file is a special UNIX file.

        It checks if the file is a character special device, block special
        device, FIFO, or socket.

        :param filename: Name of the file

        :returns: True if the file is a special file. False, if is not.
        """
        # If it does not exist, it must be a new file so it cannot be
        # a special file.
        if not os.path.exists(filename):
            return False
        mode = os.stat(filename).st_mode
        # Character special device.
        if stat.S_ISCHR(mode):
            return True
        # Block special device
        if stat.S_ISBLK(mode):
            return True
        # Named pipe / FIFO
        if stat.S_ISFIFO(mode):
            return True
        # Socket.
        if stat.S_ISSOCK(mode):
            return True
        return False

    def get_temp_filename(self, filename):
        """Return a sibling temporary filename for ``filename``.

        The base name is truncated so that name + random suffix stays
        within the filesystem filename-length limit.
        """
        suffix = os.extsep + random_file_extension()
        path = os.path.dirname(filename)
        name = os.path.basename(filename)
        temp_filename = name[: self._MAX_FILENAME_LEN - len(suffix)] + suffix
        return os.path.join(path, temp_filename)

    def allocate(self, filename, size):
        """Preallocate ``size`` bytes for ``filename``.

        On failure the partially-created file is removed before the
        OSError is re-raised.
        """
        try:
            with self.open(filename, 'wb') as f:
                fallocate(f, size)
        except OSError:
            self.remove_file(filename)
            raise
class DeferredOpenFile:
    def __init__(self, filename, start_byte=0, mode='rb', open_function=open):
        """A class that defers the opening of a file till needed

        This is useful for deferring opening of a file till it is needed
        in a separate thread, as there is a limit of how many open files
        there can be in a single thread for most operating systems. The
        file gets opened in the following methods: ``read()``, ``seek()``,
        and ``__enter__()``

        :type filename: str
        :param filename: The name of the file to open

        :type start_byte: int
        :param start_byte: The byte to seek to when the file is opened.

        :type mode: str
        :param mode: The mode to use to open the file

        :type open_function: function
        :param open_function: The function to use to open the file
        """
        self._filename = filename
        self._file = None
        self._initial_offset = start_byte
        self._mode = mode
        self._opener = open_function

    def _open_if_needed(self):
        # Lazily open on first use, seeking to the configured offset.
        if self._file is not None:
            return
        self._file = self._opener(self._filename, self._mode)
        if self._initial_offset != 0:
            self._file.seek(self._initial_offset)

    @property
    def name(self):
        return self._filename

    def read(self, amount=None):
        self._open_if_needed()
        return self._file.read(amount)

    def write(self, data):
        self._open_if_needed()
        self._file.write(data)

    def seek(self, where, whence=0):
        self._open_if_needed()
        self._file.seek(where, whence)

    def tell(self):
        # Before the file is opened, report the position it will open at.
        if self._file is None:
            return self._initial_offset
        return self._file.tell()

    def close(self):
        if self._file:
            self._file.close()

    def __enter__(self):
        self._open_if_needed()
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
class ReadFileChunk:
    def __init__(
        self,
        fileobj,
        chunk_size,
        full_file_size,
        callbacks=None,
        enable_callbacks=True,
        close_callbacks=None,
    ):
        """
        Given a file object shown below::

            |___________________________________________________|
            0          |                 |                full_file_size
                       |----chunk_size---|
                    f.tell()

        :type fileobj: file
        :param fileobj: File like object

        :type chunk_size: int
        :param chunk_size: The max chunk size to read.  Trying to read
            pass the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callbacks: A list of function(amount_read)
        :param callbacks: Called whenever data is read from this object in the
            order provided.

        :type enable_callbacks: boolean
        :param enable_callbacks: True if to run callbacks. Otherwise, do not
            run callbacks

        :type close_callbacks: A list of function()
        :param close_callbacks: Called when close is called. The function
            should take no arguments.
        """
        self._fileobj = fileobj
        # The chunk starts wherever the file object currently is.
        self._start_byte = self._fileobj.tell()
        self._size = self._calculate_file_size(
            self._fileobj,
            requested_size=chunk_size,
            start_byte=self._start_byte,
            actual_file_size=full_file_size,
        )
        # _amount_read represents the position in the chunk and may exceed
        # the chunk size, but won't allow reads out of bounds.
        self._amount_read = 0
        self._callbacks = callbacks
        if callbacks is None:
            self._callbacks = []
        self._callbacks_enabled = enable_callbacks
        self._close_callbacks = close_callbacks
        if close_callbacks is None:
            # Fixed: this previously re-assigned ``close_callbacks`` (still
            # None), a no-op. Default to an empty list, mirroring the
            # handling of ``callbacks`` above; close() behaves identically
            # since iterating an empty list invokes nothing.
            self._close_callbacks = []

    @classmethod
    def from_filename(
        cls,
        filename,
        start_byte,
        chunk_size,
        callbacks=None,
        enable_callbacks=True,
    ):
        """Convenience factory function to create from a filename.

        :type filename: str
        :param filename: The name of the file to read from.

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read.  Trying to read
            pass the end of the chunk size will behave like you've
            reached the end of the file.

        :type callbacks: function(amount_read)
        :param callbacks: Called whenever data is read from this object.

        :type enable_callbacks: bool
        :param enable_callbacks: Indicate whether to invoke callback
            during read() calls.

        :rtype: ``ReadFileChunk``
        :return: A new instance of ``ReadFileChunk``
        """
        f = open(filename, 'rb')
        f.seek(start_byte)
        file_size = os.fstat(f.fileno()).st_size
        return cls(f, chunk_size, file_size, callbacks, enable_callbacks)

    def _calculate_file_size(
        self, fileobj, requested_size, start_byte, actual_file_size
    ):
        # The chunk can never extend past the end of the underlying file.
        max_chunk_size = actual_file_size - start_byte
        return min(max_chunk_size, requested_size)

    def read(self, amount=None):
        """Read up to ``amount`` bytes, bounded by the chunk's end."""
        amount_left = max(self._size - self._amount_read, 0)
        if amount is None:
            amount_to_read = amount_left
        else:
            amount_to_read = min(amount_left, amount)
        data = self._fileobj.read(amount_to_read)
        self._amount_read += len(data)
        if self._callbacks is not None and self._callbacks_enabled:
            invoke_progress_callbacks(self._callbacks, len(data))
        return data

    def signal_transferring(self):
        """Re-enable progress callbacks, propagating to the wrapped file."""
        self.enable_callback()
        if hasattr(self._fileobj, 'signal_transferring'):
            self._fileobj.signal_transferring()

    def signal_not_transferring(self):
        """Disable progress callbacks, propagating to the wrapped file."""
        self.disable_callback()
        if hasattr(self._fileobj, 'signal_not_transferring'):
            self._fileobj.signal_not_transferring()

    def enable_callback(self):
        self._callbacks_enabled = True

    def disable_callback(self):
        self._callbacks_enabled = False

    def seek(self, where, whence=0):
        """Seek within the chunk; positions are relative to the chunk start."""
        if whence not in (0, 1, 2):
            # Mimic io's error for invalid whence values
            raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")

        # Recalculate where based on chunk attributes so seek from file
        # start (whence=0) is always used
        where += self._start_byte
        if whence == 1:
            where += self._amount_read
        elif whence == 2:
            where += self._size

        self._fileobj.seek(max(where, self._start_byte))
        if self._callbacks is not None and self._callbacks_enabled:
            # To also rewind the callback() for an accurate progress report
            bounded_where = max(min(where - self._start_byte, self._size), 0)
            bounded_amount_read = min(self._amount_read, self._size)
            amount = bounded_where - bounded_amount_read
            invoke_progress_callbacks(
                self._callbacks, bytes_transferred=amount
            )
        self._amount_read = max(where - self._start_byte, 0)

    def close(self):
        """Run close callbacks (if enabled) and close the wrapped file."""
        if self._close_callbacks is not None and self._callbacks_enabled:
            for callback in self._close_callbacks:
                callback()
        self._fileobj.close()

    def tell(self):
        # Position relative to the start of the chunk, not the file.
        return self._amount_read

    def __len__(self):
        # __len__ is defined because requests will try to determine the length
        # of the stream to set a content length. In the normal case
        # of the file it will just stat the file, but we need to change that
        # behavior. By providing a __len__, requests will use that instead
        # of stat'ing the file.
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def __iter__(self):
        # This is a workaround for http://bugs.python.org/issue17575
        # Basically httplib will try to iterate over the contents, even
        # if its a file like object. This wasn't noticed because we've
        # already exhausted the stream so iterating over the file immediately
        # stops, which is what we're simulating here.
        return iter([])
class StreamReaderProgress:
    """Wrapper for a read only stream that adds progress callbacks."""

    def __init__(self, stream, callbacks=None):
        self._stream = stream
        # Default to no callbacks rather than carrying None around.
        self._callbacks = [] if callbacks is None else callbacks

    def read(self, *args, **kwargs):
        value = self._stream.read(*args, **kwargs)
        # Report however many bytes the underlying read produced.
        invoke_progress_callbacks(self._callbacks, len(value))
        return value
class NoResourcesAvailable(Exception):
    """Raised when a semaphore cannot be acquired without blocking."""
class TaskSemaphore:
    def __init__(self, count):
        """A semaphore for the purpose of limiting the number of tasks

        :param count: The size of semaphore
        """
        self._semaphore = threading.Semaphore(count)

    def acquire(self, tag, blocking=True):
        """Acquire the semaphore

        :param tag: A tag identifying what is acquiring the semaphore. Note
            that this is not really needed to directly use this class but is
            needed for API compatibility with the SlidingWindowSemaphore
            implementation.
        :param blocking: If True, block until it can be acquired. If False,
            do not block and raise an exception if cannot be acquired.

        :returns: A token (can be None) to use when releasing the semaphore
        """
        logger.debug("Acquiring %s", tag)
        acquired = self._semaphore.acquire(blocking)
        if not acquired:
            raise NoResourcesAvailable("Cannot acquire tag '%s'" % tag)

    def release(self, tag, acquire_token):
        """Release the semaphore

        :param tag: A tag identifying what is releasing the semaphore
        :param acquire_token: The token returned from when the semaphore was
            acquired. Note that this is not really needed to directly use this
            class but is needed for API compatibility with the
            SlidingWindowSemaphore implementation.
        """
        logger.debug(f"Releasing acquire {tag}/{acquire_token}")
        self._semaphore.release()
class SlidingWindowSemaphore(TaskSemaphore):
    """A semaphore used to coordinate sequential resource access.

    This class is similar to the stdlib BoundedSemaphore:

    * It's initialized with a count.
    * Each call to ``acquire()`` decrements the counter.
    * If the count is at zero, then ``acquire()`` will either block until the
      count increases, or if ``blocking=False``, then it will raise
      a NoResourcesAvailable exception indicating that it failed to acquire the
      semaphore.

    The main difference is that this semaphore is used to limit
    access to a resource that requires sequential access. For example,
    if I want to access resource R that has 20 subresources R_0 - R_19,
    this semaphore can also enforce that you only have a max range of
    10 at any given point in time. You must also specify a tag name
    when you acquire the semaphore. The sliding window semantics apply
    on a per tag basis. The internal count will only be incremented
    when the minimum sequence number for a tag is released.

    """

    def __init__(self, count):
        # Number of currently available acquisition slots.
        self._count = count
        # Dict[tag, next_sequence_number].
        self._tag_sequences = defaultdict(int)
        # Dict[tag, lowest not-yet-released sequence number] -- the left
        # edge of each tag's sliding window.
        self._lowest_sequence = {}
        self._lock = threading.Lock()
        self._condition = threading.Condition(self._lock)
        # Dict[tag, List[sequence_number]]
        # Out-of-order releases wait here until the window's left edge
        # catches up; each list is kept sorted in reverse so the next
        # candidate is at the end (popped in O(1)).
        self._pending_release = {}

    def current_count(self):
        """Return the number of currently available slots."""
        with self._lock:
            return self._count

    def acquire(self, tag, blocking=True):
        """Acquire a slot for ``tag``; the returned token is the tag's
        next sequence number."""
        logger.debug("Acquiring %s", tag)
        self._condition.acquire()
        try:
            if self._count == 0:
                if not blocking:
                    raise NoResourcesAvailable("Cannot acquire tag '%s'" % tag)
                else:
                    while self._count == 0:
                        self._condition.wait()
            # self._count is no longer zero.
            # First, check if this is the first time we're seeing this tag.
            sequence_number = self._tag_sequences[tag]
            if sequence_number == 0:
                # First time seeing the tag, so record we're at 0.
                self._lowest_sequence[tag] = sequence_number
            self._tag_sequences[tag] += 1
            self._count -= 1
            return sequence_number
        finally:
            self._condition.release()

    def release(self, tag, acquire_token):
        """Release the slot identified by sequence number ``acquire_token``.

        Capacity is only freed when the lowest outstanding sequence number
        for the tag is released; higher sequence numbers are queued until
        the window's left edge advances past them.
        """
        sequence_number = acquire_token
        logger.debug("Releasing acquire %s/%s", tag, sequence_number)
        self._condition.acquire()
        try:
            if tag not in self._tag_sequences:
                raise ValueError("Attempted to release unknown tag: %s" % tag)
            max_sequence = self._tag_sequences[tag]
            if self._lowest_sequence[tag] == sequence_number:
                # We can immediately process this request and free up
                # resources.
                self._lowest_sequence[tag] += 1
                self._count += 1
                self._condition.notify()
                # Drain any queued releases that are now contiguous with
                # the advanced left edge of the window.
                queued = self._pending_release.get(tag, [])
                while queued:
                    if self._lowest_sequence[tag] == queued[-1]:
                        queued.pop()
                        self._lowest_sequence[tag] += 1
                        self._count += 1
                    else:
                        break
            elif self._lowest_sequence[tag] < sequence_number < max_sequence:
                # We can't do anything right now because we're still waiting
                # for the min sequence for the tag to be released. We have
                # to queue this for pending release.
                self._pending_release.setdefault(tag, []).append(
                    sequence_number
                )
                self._pending_release[tag].sort(reverse=True)
            else:
                raise ValueError(
                    "Attempted to release unknown sequence number "
                    "%s for tag: %s" % (sequence_number, tag)
                )
        finally:
            self._condition.release()
class ChunksizeAdjuster:
    """Adjusts a configured chunksize to fit within S3 multipart limits.

    :param max_size: Maximum allowed size of a single part.
    :param min_size: Minimum allowed size of a single part.
    :param max_parts: Maximum number of parts in a multipart upload.
    """

    def __init__(
        self,
        max_size=MAX_SINGLE_UPLOAD_SIZE,
        min_size=MIN_UPLOAD_CHUNKSIZE,
        max_parts=MAX_PARTS,
    ):
        self.max_size = max_size
        self.min_size = min_size
        self.max_parts = max_parts

    def adjust_chunksize(self, current_chunksize, file_size=None):
        """Get a chunksize close to current that fits within all S3 limits.

        :type current_chunksize: int
        :param current_chunksize: The currently configured chunksize.

        :type file_size: int or None
        :param file_size: The size of the file to upload. This might be None
            if the object being transferred has an unknown size.

        :returns: A valid chunksize that fits within configured limits.
        """
        chunksize = current_chunksize
        if file_size is not None:
            chunksize = self._adjust_for_max_parts(chunksize, file_size)
        return self._adjust_for_chunksize_limits(chunksize)

    def _adjust_for_chunksize_limits(self, current_chunksize):
        # Clamp the chunksize into [min_size, max_size].
        # Fixed: use lazy %-style logger arguments instead of eager string
        # formatting so the messages are only built when debug is enabled.
        if current_chunksize > self.max_size:
            logger.debug(
                "Chunksize greater than maximum chunksize. "
                "Setting to %s from %s.",
                self.max_size,
                current_chunksize,
            )
            return self.max_size
        elif current_chunksize < self.min_size:
            logger.debug(
                "Chunksize less than minimum chunksize. "
                "Setting to %s from %s.",
                self.min_size,
                current_chunksize,
            )
            return self.min_size
        else:
            return current_chunksize

    def _adjust_for_max_parts(self, current_chunksize, file_size):
        # Double the chunksize until the resulting part count fits within
        # the maximum number of parts S3 allows.
        chunksize = current_chunksize
        num_parts = int(math.ceil(file_size / float(chunksize)))

        while num_parts > self.max_parts:
            chunksize *= 2
            num_parts = int(math.ceil(file_size / float(chunksize)))

        if chunksize != current_chunksize:
            logger.debug(
                "Chunksize would result in the number of parts exceeding the "
                "maximum. Setting to %s from %s.",
                chunksize,
                current_chunksize,
            )

        return chunksize
def add_s3express_defaults(bucket, extra_args):
    """Apply default arguments for transfers to S3 Express buckets.

    Mutates ``extra_args`` in place; a caller-provided ChecksumAlgorithm
    is never overridden.
    """
    if not is_s3express_bucket(bucket):
        return
    # Default Transfer Operations to S3Express to use CRC32
    extra_args.setdefault("ChecksumAlgorithm", "crc32")