Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/utils.py: 28%
355 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import functools
14import logging
15import math
16import os
17import random
18import socket
19import stat
20import string
21import threading
22from collections import defaultdict
24from botocore.exceptions import IncompleteReadError, ReadTimeoutError
25from botocore.httpchecksum import AwsChunkedWrapper
26from botocore.utils import is_s3express_bucket
28from s3transfer.compat import SOCKET_ERROR, fallocate, rename_file
# Hard cap on the number of parts in an S3 multipart upload.
MAX_PARTS = 10000
# The maximum file size you can upload via S3 per request.
# See: http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
# and: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
MAX_SINGLE_UPLOAD_SIZE = 5 * (1024**3)
# Smallest allowed part size for a multipart upload (5 MiB).
MIN_UPLOAD_CHUNKSIZE = 5 * (1024**2)
logger = logging.getLogger(__name__)
# Transient network/read errors during a download that are safe to retry.
S3_RETRYABLE_DOWNLOAD_ERRORS = (
    socket.timeout,
    SOCKET_ERROR,
    ReadTimeoutError,
    IncompleteReadError,
)
def random_file_extension(num_digits=8):
    """Generate a random file extension of hexadecimal digits.

    :param num_digits: Number of hex-digit characters to generate.
    :returns: A string of ``num_digits`` randomly chosen hex digits.
    """
    digits = [random.choice(string.hexdigits) for _ in range(num_digits)]
    return ''.join(digits)
def signal_not_transferring(request, operation_name, **kwargs):
    """Notify an upload body that data transfer has paused.

    Only applies to ``PutObject``/``UploadPart`` requests whose body
    exposes a ``signal_not_transferring`` hook.
    """
    if operation_name not in ('PutObject', 'UploadPart'):
        return
    body = request.body
    if hasattr(body, 'signal_not_transferring'):
        body.signal_not_transferring()
def signal_transferring(request, operation_name, **kwargs):
    """Notify an upload body that data transfer has (re)started.

    For aws-chunked bodies the hook lives on the wrapped raw stream,
    so unwrap the ``AwsChunkedWrapper`` before looking it up.
    """
    if operation_name not in ('PutObject', 'UploadPart'):
        return
    body = request.body
    if isinstance(body, AwsChunkedWrapper):
        body = getattr(body, '_raw', None)
    if hasattr(body, 'signal_transferring'):
        body.signal_transferring()
def calculate_num_parts(size, part_size):
    """Return how many parts of at most ``part_size`` bytes cover ``size``."""
    exact_parts = size / float(part_size)
    return int(math.ceil(exact_parts))
def calculate_range_parameter(
    part_size, part_index, num_parts, total_size=None
):
    """Calculate the range parameter for multipart downloads/copies

    :type part_size: int
    :param part_size: The size of the part

    :type part_index: int
    :param part_index: The index for which this parts starts. This index starts
        at zero

    :type num_parts: int
    :param num_parts: The total number of parts in the transfer

    :returns: The value to use for Range parameter on downloads or
        the CopySourceRange parameter for copies
    """
    start = part_index * part_size
    is_last_part = part_index == num_parts - 1
    if not is_last_part:
        # Interior parts always span exactly part_size bytes.
        end = start + part_size - 1
    elif total_size is not None:
        end = total_size - 1
    else:
        # Unknown total: open-ended range reads to the end of the object.
        end = ''
    return f'bytes={start}-{end}'
def get_callbacks(transfer_future, callback_type):
    """Retrieves callbacks from a subscriber

    :type transfer_future: s3transfer.futures.TransferFuture
    :param transfer_future: The transfer future the subscriber is associated
        to.

    :type callback_type: str
    :param callback_type: The type of callback to retrieve from the subscriber.
        Valid types include:
            * 'queued'
            * 'progress'
            * 'done'

    :returns: A list of callbacks for the type specified. All callbacks are
        preinjected with the transfer future.
    """
    method_name = 'on_' + callback_type
    bound_callbacks = []
    for subscriber in transfer_future.meta.call_args.subscribers:
        if hasattr(subscriber, method_name):
            # Pre-bind the future so callers only supply progress kwargs.
            handler = getattr(subscriber, method_name)
            bound_callbacks.append(
                functools.partial(handler, future=transfer_future)
            )
    return bound_callbacks
def invoke_progress_callbacks(callbacks, bytes_transferred):
    """Calls all progress callbacks

    :param callbacks: A list of progress callbacks to invoke
    :param bytes_transferred: The number of bytes transferred. This is passed
        to the callbacks. If no bytes were transferred the callbacks will not
        be invoked because no progress was achieved. It is also possible
        to receive a negative amount which comes from retrying a transfer
        request.
    """
    if not bytes_transferred:
        # Zero progress: nothing to report.
        return
    for callback in callbacks:
        callback(bytes_transferred=bytes_transferred)
def get_filtered_dict(original_dict, whitelisted_keys):
    """Gets a dictionary filtered by whitelisted keys

    :param original_dict: The original dictionary of arguments to source keys
        and values.
    :param whitelisted_keys: A list of keys to include in the filtered
        dictionary.

    :returns: A dictionary containing key/values from the original dictionary
        whose key was included in the whitelist
    """
    # Dict comprehension replaces the manual build-up loop; behavior is
    # unchanged (membership test against whitelisted_keys per key).
    return {
        key: value
        for key, value in original_dict.items()
        if key in whitelisted_keys
    }
class CallArgs:
    def __init__(self, **kwargs):
        """A class that records call arguments

        The call arguments must be passed as keyword arguments. It will set
        each keyword argument as an attribute of the object along with its
        associated value.
        """
        # Bulk-assign every keyword argument as an instance attribute.
        self.__dict__.update(kwargs)
class FunctionContainer:
    """An object that contains a function and any args or kwargs to call it

    When called the provided function will be called with provided args
    and kwargs.
    """

    def __init__(self, func, *args, **kwargs):
        self._func = func
        self._args = args
        self._kwargs = kwargs

    def __repr__(self):
        return (
            f'Function: {self._func} with args {self._args} '
            f'and kwargs {self._kwargs}'
        )

    def __call__(self):
        # Invoke the stored callable with the captured arguments.
        return self._func(*self._args, **self._kwargs)
class CountCallbackInvoker:
    """An abstraction to invoke a callback when a shared count reaches zero

    :param callback: Callback invoke when finalized count reaches zero
    """

    def __init__(self, callback):
        self._lock = threading.Lock()
        self._callback = callback
        self._remaining = 0
        self._finalized = False

    @property
    def current_count(self):
        # Snapshot of the outstanding count, taken under the lock.
        with self._lock:
            return self._remaining

    def increment(self):
        """Increment the count by one"""
        with self._lock:
            if self._finalized:
                raise RuntimeError(
                    'Counter has been finalized it can no longer be '
                    'incremented.'
                )
            self._remaining += 1

    def decrement(self):
        """Decrement the count by one"""
        with self._lock:
            if not self._remaining:
                raise RuntimeError(
                    'Counter is at zero. It cannot dip below zero'
                )
            self._remaining -= 1
            # Fire the callback only once finalize() has been called and
            # every outstanding increment has been matched by a decrement.
            if self._finalized and not self._remaining:
                self._callback()

    def finalize(self):
        """Finalize the counter

        Once finalized, the counter never be incremented and the callback
        can be invoked once the count reaches zero
        """
        with self._lock:
            self._finalized = True
            if not self._remaining:
                self._callback()
class OSUtils:
    """Thin, mockable wrapper around the filesystem operations transfers use."""

    # Longest basename the temp-file helper will emit.
    _MAX_FILENAME_LEN = 255

    def get_file_size(self, filename):
        """Return the size of ``filename`` in bytes."""
        return os.path.getsize(filename)

    def open_file_chunk_reader(self, filename, start_byte, size, callbacks):
        """Build a ``ReadFileChunk`` over a byte range of ``filename``."""
        return ReadFileChunk.from_filename(
            filename, start_byte, size, callbacks, enable_callbacks=False
        )

    def open_file_chunk_reader_from_fileobj(
        self,
        fileobj,
        chunk_size,
        full_file_size,
        callbacks,
        close_callbacks=None,
    ):
        """Build a ``ReadFileChunk`` over an already-open file object."""
        return ReadFileChunk(
            fileobj,
            chunk_size,
            full_file_size,
            callbacks=callbacks,
            enable_callbacks=False,
            close_callbacks=close_callbacks,
        )

    def open(self, filename, mode):
        """Open ``filename`` with the builtin ``open``."""
        return open(filename, mode)

    def remove_file(self, filename):
        """Remove a file, noop if file does not exist."""
        # Unlike os.remove, a missing file is silently ignored here.
        try:
            os.remove(filename)
        except OSError:
            pass

    def rename_file(self, current_filename, new_filename):
        """Rename a file using the platform-appropriate compat helper."""
        rename_file(current_filename, new_filename)

    def is_special_file(cls, filename):
        """Checks to see if a file is a special UNIX file.

        It checks if the file is a character special device, block special
        device, FIFO, or socket.

        NOTE(review): the first parameter is named ``cls`` with no
        ``@classmethod`` decorator; it binds as a regular instance method.
        Kept as-is for backward compatibility.

        :param filename: Name of the file

        :returns: True if the file is a special file. False, if is not.
        """
        # A path that does not exist yet cannot be a special file.
        if not os.path.exists(filename):
            return False
        mode = os.stat(filename).st_mode
        special_checks = (
            stat.S_ISCHR,   # character special device
            stat.S_ISBLK,   # block special device
            stat.S_ISFIFO,  # named pipe / FIFO
            stat.S_ISSOCK,  # socket
        )
        return any(check(mode) for check in special_checks)

    def get_temp_filename(self, filename):
        """Derive a sibling temp filename with a random hex extension."""
        suffix = os.extsep + random_file_extension()
        directory, base = os.path.split(filename)
        # Truncate so the final basename stays within the filesystem limit.
        truncated = base[: self._MAX_FILENAME_LEN - len(suffix)]
        return os.path.join(directory, truncated + suffix)

    def allocate(self, filename, size):
        """Preallocate ``size`` bytes for ``filename``; remove it on failure."""
        try:
            with self.open(filename, 'wb') as f:
                fallocate(f, size)
        except OSError:
            self.remove_file(filename)
            raise
class DeferredOpenFile:
    def __init__(self, filename, start_byte=0, mode='rb', open_function=open):
        """A class that defers the opening of a file till needed

        This is useful for deferring opening of a file till it is needed
        in a separate thread, as there is a limit of how many open files
        there can be in a single thread for most operating systems. The
        file gets opened in the following methods: ``read()``, ``seek()``,
        and ``__enter__()``

        :type filename: str
        :param filename: The name of the file to open

        :type start_byte: int
        :param start_byte: The byte to seek to when the file is opened.

        :type mode: str
        :param mode: The mode to use to open the file

        :type open_function: function
        :param open_function: The function to use to open the file
        """
        self._filename = filename
        self._fileobj = None
        self._start_byte = start_byte
        self._mode = mode
        self._open_function = open_function

    def _open_if_needed(self):
        # Lazily open and position the underlying file exactly once.
        if self._fileobj is not None:
            return
        self._fileobj = self._open_function(self._filename, self._mode)
        if self._start_byte != 0:
            self._fileobj.seek(self._start_byte)

    @property
    def name(self):
        """The filename this object wraps."""
        return self._filename

    def read(self, amount=None):
        self._open_if_needed()
        return self._fileobj.read(amount)

    def write(self, data):
        self._open_if_needed()
        self._fileobj.write(data)

    def seek(self, where, whence=0):
        self._open_if_needed()
        self._fileobj.seek(where, whence)

    def tell(self):
        # Before the file is opened, report where it *will* be positioned.
        if self._fileobj is None:
            return self._start_byte
        return self._fileobj.tell()

    def close(self):
        if self._fileobj:
            self._fileobj.close()

    def __enter__(self):
        self._open_if_needed()
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
class ReadFileChunk:
    def __init__(
        self,
        fileobj,
        chunk_size,
        full_file_size,
        callbacks=None,
        enable_callbacks=True,
        close_callbacks=None,
    ):
        """A file-like object restricted to a bounded chunk of ``fileobj``.

        Given a file object shown below::

            |___________________________________________________|
            0          |                 |                full_file_size
                       |----chunk_size---|
                    f.tell()

        :type fileobj: file
        :param fileobj: File like object

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            pass the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callbacks: A list of function(amount_read)
        :param callbacks: Called whenever data is read from this object in the
            order provided.

        :type enable_callbacks: boolean
        :param enable_callbacks: True if to run callbacks. Otherwise, do not
            run callbacks

        :type close_callbacks: A list of function()
        :param close_callbacks: Called when close is called. The function
            should take no arguments.
        """
        self._fileobj = fileobj
        # The chunk begins wherever the file object currently points.
        self._start_byte = self._fileobj.tell()
        self._size = self._calculate_file_size(
            self._fileobj,
            requested_size=chunk_size,
            start_byte=self._start_byte,
            actual_file_size=full_file_size,
        )
        # _amount_read represents the position in the chunk and may exceed
        # the chunk size, but won't allow reads out of bounds.
        self._amount_read = 0
        self._callbacks = callbacks
        if callbacks is None:
            self._callbacks = []
        self._callbacks_enabled = enable_callbacks
        self._close_callbacks = close_callbacks
        if close_callbacks is None:
            # Fix: previously re-assigned the same None (a no-op);
            # normalize to an empty list, mirroring the ``callbacks``
            # handling above. close() still behaves identically.
            self._close_callbacks = []

    @classmethod
    def from_filename(
        cls,
        filename,
        start_byte,
        chunk_size,
        callbacks=None,
        enable_callbacks=True,
    ):
        """Convenience factory function to create from a filename.

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            pass the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callbacks: function(amount_read)
        :param callbacks: Called whenever data is read from this object.

        :type enable_callbacks: bool
        :param enable_callbacks: Indicate whether to invoke callback
            during read() calls.

        :rtype: ``ReadFileChunk``
        :return: A new instance of ``ReadFileChunk``
        """
        f = open(filename, 'rb')
        f.seek(start_byte)
        file_size = os.fstat(f.fileno()).st_size
        return cls(f, chunk_size, file_size, callbacks, enable_callbacks)

    def _calculate_file_size(
        self, fileobj, requested_size, start_byte, actual_file_size
    ):
        # Never claim more bytes than remain in the underlying file.
        max_chunk_size = actual_file_size - start_byte
        return min(max_chunk_size, requested_size)

    def read(self, amount=None):
        """Read up to ``amount`` bytes, bounded by the end of the chunk."""
        amount_left = max(self._size - self._amount_read, 0)
        if amount is None:
            amount_to_read = amount_left
        else:
            amount_to_read = min(amount_left, amount)
        data = self._fileobj.read(amount_to_read)
        self._amount_read += len(data)
        if self._callbacks is not None and self._callbacks_enabled:
            invoke_progress_callbacks(self._callbacks, len(data))
        return data

    def signal_transferring(self):
        """Re-enable progress callbacks and propagate to the wrapped file."""
        self.enable_callback()
        if hasattr(self._fileobj, 'signal_transferring'):
            self._fileobj.signal_transferring()

    def signal_not_transferring(self):
        """Disable progress callbacks and propagate to the wrapped file."""
        self.disable_callback()
        if hasattr(self._fileobj, 'signal_not_transferring'):
            self._fileobj.signal_not_transferring()

    def enable_callback(self):
        self._callbacks_enabled = True

    def disable_callback(self):
        self._callbacks_enabled = False

    def seek(self, where, whence=0):
        """Seek within the chunk; positions are relative to the chunk start."""
        if whence not in (0, 1, 2):
            # Mimic io's error for invalid whence values
            raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")

        # Recalculate where based on chunk attributes so seek from file
        # start (whence=0) is always used
        where += self._start_byte
        if whence == 1:
            where += self._amount_read
        elif whence == 2:
            where += self._size

        self._fileobj.seek(max(where, self._start_byte))
        if self._callbacks is not None and self._callbacks_enabled:
            # To also rewind the callback() for an accurate progress report
            bounded_where = max(min(where - self._start_byte, self._size), 0)
            bounded_amount_read = min(self._amount_read, self._size)
            amount = bounded_where - bounded_amount_read
            invoke_progress_callbacks(
                self._callbacks, bytes_transferred=amount
            )
        self._amount_read = max(where - self._start_byte, 0)

    def close(self):
        """Run close callbacks (when enabled) and close the wrapped file."""
        if self._close_callbacks is not None and self._callbacks_enabled:
            for callback in self._close_callbacks:
                callback()
        self._fileobj.close()

    def tell(self):
        # Position relative to the start of the chunk, not the file.
        return self._amount_read

    def __len__(self):
        # __len__ is defined because requests will try to determine the length
        # of the stream to set a content length. In the normal case
        # of the file it will just stat the file, but we need to change that
        # behavior. By providing a __len__, requests will use that instead
        # of stat'ing the file.
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def __iter__(self):
        # This is a workaround for http://bugs.python.org/issue17575
        # Basically httplib will try to iterate over the contents, even
        # if its a file like object. This wasn't noticed because we've
        # already exhausted the stream so iterating over the file immediately
        # stops, which is what we're simulating here.
        return iter([])
class StreamReaderProgress:
    """Wrapper for a read only stream that adds progress callbacks."""

    def __init__(self, stream, callbacks=None):
        self._stream = stream
        # Normalize a missing callback list to an empty one.
        self._callbacks = [] if callbacks is None else callbacks

    def read(self, *args, **kwargs):
        # Delegate to the wrapped stream, then report how much was read.
        data = self._stream.read(*args, **kwargs)
        invoke_progress_callbacks(self._callbacks, len(data))
        return data
class NoResourcesAvailable(Exception):
    """Raised when a semaphore cannot be acquired without blocking."""
class TaskSemaphore:
    def __init__(self, count):
        """A semaphore for the purpose of limiting the number of tasks

        :param count: The size of semaphore
        """
        self._semaphore = threading.Semaphore(count)

    def acquire(self, tag, blocking=True):
        """Acquire the semaphore

        :param tag: A tag identifying what is acquiring the semaphore. Note
            that this is not really needed to directly use this class but is
            needed for API compatibility with the SlidingWindowSemaphore
            implementation.
        :param block: If True, block until it can be acquired. If False,
            do not block and raise an exception if cannot be acquired.

        :returns: A token (can be None) to use when releasing the semaphore
        """
        logger.debug("Acquiring %s", tag)
        acquired = self._semaphore.acquire(blocking)
        if not acquired:
            raise NoResourcesAvailable("Cannot acquire tag '%s'" % tag)

    def release(self, tag, acquire_token):
        """Release the semaphore

        :param tag: A tag identifying what is releasing the semaphore
        :param acquire_token: The token returned from when the semaphore was
            acquired. Note that this is not really needed to directly use this
            class but is needed for API compatibility with the
            SlidingWindowSemaphore implementation.
        """
        logger.debug(f"Releasing acquire {tag}/{acquire_token}")
        self._semaphore.release()
class SlidingWindowSemaphore(TaskSemaphore):
    """A semaphore used to coordinate sequential resource access.

    This class is similar to the stdlib BoundedSemaphore:

    * It's initialized with a count.
    * Each call to ``acquire()`` decrements the counter.
    * If the count is at zero, then ``acquire()`` will either block until the
      count increases, or if ``blocking=False``, then it will raise
      a NoResourcesAvailable exception indicating that it failed to acquire the
      semaphore.

    The main difference is that this semaphore is used to limit
    access to a resource that requires sequential access. For example,
    if I want to access resource R that has 20 subresources R_0 - R_19,
    this semaphore can also enforce that you only have a max range of
    10 at any given point in time. You must also specify a tag name
    when you acquire the semaphore. The sliding window semantics apply
    on a per tag basis. The internal count will only be incremented
    when the minimum sequence number for a tag is released.

    """

    def __init__(self, count):
        self._count = count
        # Dict[tag, next_sequence_number].
        self._tag_sequences = defaultdict(int)
        # Dict[tag, lowest sequence number not yet released].
        self._lowest_sequence = {}
        self._lock = threading.Lock()
        self._condition = threading.Condition(self._lock)
        # Dict[tag, List[sequence_number]]
        self._pending_release = {}

    def current_count(self):
        # Snapshot of the remaining capacity, taken under the lock.
        with self._lock:
            return self._count

    def acquire(self, tag, blocking=True):
        """Acquire a slot for ``tag``.

        :returns: The sequence number assigned to this acquisition; it must
            be passed back to ``release()`` as the ``acquire_token``.
        :raises NoResourcesAvailable: If ``blocking`` is False and no
            capacity is currently available.
        """
        logger.debug("Acquiring %s", tag)
        self._condition.acquire()
        try:
            if self._count == 0:
                if not blocking:
                    raise NoResourcesAvailable("Cannot acquire tag '%s'" % tag)
                else:
                    while self._count == 0:
                        self._condition.wait()
            # self._count is no longer zero.
            # First, check if this is the first time we're seeing this tag.
            sequence_number = self._tag_sequences[tag]
            if sequence_number == 0:
                # First time seeing the tag, so record we're at 0.
                self._lowest_sequence[tag] = sequence_number
            self._tag_sequences[tag] += 1
            self._count -= 1
            return sequence_number
        finally:
            self._condition.release()

    def release(self, tag, acquire_token):
        """Release the slot identified by ``acquire_token`` for ``tag``.

        Capacity is only returned (and waiting acquirers notified) when the
        lowest outstanding sequence number for the tag is released; higher
        sequence numbers are queued until the window can slide forward past
        them.

        :raises ValueError: If the tag is unknown, or the sequence number
            is outside the tag's outstanding window.
        """
        sequence_number = acquire_token
        logger.debug("Releasing acquire %s/%s", tag, sequence_number)
        self._condition.acquire()
        try:
            if tag not in self._tag_sequences:
                raise ValueError("Attempted to release unknown tag: %s" % tag)
            max_sequence = self._tag_sequences[tag]
            if self._lowest_sequence[tag] == sequence_number:
                # We can immediately process this request and free up
                # resources.
                self._lowest_sequence[tag] += 1
                self._count += 1
                self._condition.notify()
                # Drain any previously-queued releases that are now
                # contiguous with the new lowest sequence number. The queue
                # is kept sorted in reverse so the smallest is at the end.
                queued = self._pending_release.get(tag, [])
                while queued:
                    if self._lowest_sequence[tag] == queued[-1]:
                        queued.pop()
                        self._lowest_sequence[tag] += 1
                        self._count += 1
                    else:
                        break
            elif self._lowest_sequence[tag] < sequence_number < max_sequence:
                # We can't do anything right now because we're still waiting
                # for the min sequence for the tag to be released. We have
                # to queue this for pending release.
                self._pending_release.setdefault(tag, []).append(
                    sequence_number
                )
                self._pending_release[tag].sort(reverse=True)
            else:
                raise ValueError(
                    "Attempted to release unknown sequence number "
                    "%s for tag: %s" % (sequence_number, tag)
                )
        finally:
            self._condition.release()
class ChunksizeAdjuster:
    """Adjusts a configured chunksize so it satisfies S3 multipart limits."""

    def __init__(
        self,
        max_size=MAX_SINGLE_UPLOAD_SIZE,
        min_size=MIN_UPLOAD_CHUNKSIZE,
        max_parts=MAX_PARTS,
    ):
        self.max_size = max_size
        self.min_size = min_size
        self.max_parts = max_parts

    def adjust_chunksize(self, current_chunksize, file_size=None):
        """Get a chunksize close to current that fits within all S3 limits.

        :type current_chunksize: int
        :param current_chunksize: The currently configured chunksize.

        :type file_size: int or None
        :param file_size: The size of the file to upload. This might be None
            if the object being transferred has an unknown size.

        :returns: A valid chunksize that fits within configured limits.
        """
        chunksize = current_chunksize
        if file_size is not None:
            # Grow the chunksize first so the part count stays legal.
            chunksize = self._adjust_for_max_parts(chunksize, file_size)
        return self._adjust_for_chunksize_limits(chunksize)

    def _adjust_for_chunksize_limits(self, current_chunksize):
        # Clamp into [min_size, max_size], logging any adjustment made.
        if current_chunksize > self.max_size:
            logger.debug(
                "Chunksize greater than maximum chunksize. "
                "Setting to %s from %s." % (self.max_size, current_chunksize)
            )
            return self.max_size
        if current_chunksize < self.min_size:
            logger.debug(
                "Chunksize less than minimum chunksize. "
                "Setting to %s from %s." % (self.min_size, current_chunksize)
            )
            return self.min_size
        return current_chunksize

    def _adjust_for_max_parts(self, current_chunksize, file_size):
        # Double the chunksize until the resulting part count is allowed.
        chunksize = current_chunksize
        while True:
            num_parts = int(math.ceil(file_size / float(chunksize)))
            if num_parts <= self.max_parts:
                break
            chunksize *= 2
        if chunksize != current_chunksize:
            logger.debug(
                "Chunksize would result in the number of parts exceeding the "
                "maximum. Setting to %s from %s."
                % (chunksize, current_chunksize)
            )
        return chunksize
def add_s3express_defaults(bucket, extra_args):
    """Apply default transfer arguments for S3 Express (directory) buckets.

    Mutates ``extra_args`` in place: when ``bucket`` is an S3 Express
    bucket and no checksum algorithm was requested, default to CRC32.
    """
    if not is_s3express_bucket(bucket):
        return
    if "ChecksumAlgorithm" not in extra_args:
        # Default Transfer Operations to S3Express to use CRC32
        extra_args["ChecksumAlgorithm"] = "crc32"