Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/s3transfer/utils.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import functools
14import logging
15import math
16import os
17import random
18import socket
19import stat
20import string
21import threading
22from collections import defaultdict
24from botocore.exceptions import (
25 IncompleteReadError,
26 ReadTimeoutError,
27 ResponseStreamingError,
28)
29from botocore.httpchecksum import DEFAULT_CHECKSUM_ALGORITHM, AwsChunkedWrapper
30from botocore.utils import is_s3express_bucket
32from s3transfer.compat import SOCKET_ERROR, fallocate, rename_file
33from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS
# S3 caps a multipart upload at 10,000 parts.
MAX_PARTS = 10000
# The maximum file size you can upload via S3 per request.
# See: http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
# and: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
MAX_SINGLE_UPLOAD_SIZE = 5 * (1024**3)  # 5 GiB
# 5 MiB -- the minimum multipart part size per the S3 limits linked above.
MIN_UPLOAD_CHUNKSIZE = 5 * (1024**2)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)

# Transient network/stream errors during a download that are safe to retry.
S3_RETRYABLE_DOWNLOAD_ERRORS = (
    socket.timeout,
    SOCKET_ERROR,
    ReadTimeoutError,
    IncompleteReadError,
    ResponseStreamingError,
)
def random_file_extension(num_digits=8):
    """Return a random string of ``num_digits`` hexadecimal characters."""
    digits = (random.choice(string.hexdigits) for _ in range(num_digits))
    return ''.join(digits)
def signal_not_transferring(request, operation_name, **kwargs):
    """Pause progress signaling on an upload request's body, if supported."""
    # Only upload operations carry a body that supports this signal.
    if operation_name not in ('PutObject', 'UploadPart'):
        return
    body = request.body
    if hasattr(body, 'signal_not_transferring'):
        body.signal_not_transferring()
def signal_transferring(request, operation_name, **kwargs):
    """Resume progress signaling on an upload request's body, if supported."""
    if operation_name not in ('PutObject', 'UploadPart'):
        return
    body = request.body
    # A chunked-encoding wrapper hides the underlying stream; unwrap it.
    if isinstance(body, AwsChunkedWrapper):
        body = getattr(body, '_raw', None)
    if hasattr(body, 'signal_transferring'):
        body.signal_transferring()
def calculate_num_parts(size, part_size):
    """Return how many parts of ``part_size`` bytes cover ``size`` bytes."""
    ratio = size / float(part_size)
    return int(math.ceil(ratio))
def calculate_range_parameter(
    part_size, part_index, num_parts, total_size=None
):
    """Calculate the range parameter for multipart downloads/copies.

    :type part_size: int
    :param part_size: The size of each part.

    :type part_index: int
    :param part_index: Zero-based index of the part this range starts at.

    :type num_parts: int
    :param num_parts: The total number of parts in the transfer.

    :type total_size: int or None
    :param total_size: The total size of the object, if known. Used to
        close the range of the final part.

    :returns: The value to use for the ``Range`` parameter on downloads or
        the ``CopySourceRange`` parameter for copies.
    """
    first_byte = part_index * part_size
    is_last_part = part_index == num_parts - 1
    if is_last_part:
        # Leave the range open-ended unless the exact size is known.
        last_byte = '' if total_size is None else str(total_size - 1)
    else:
        last_byte = first_byte + part_size - 1
    return f'bytes={first_byte}-{last_byte}'
def get_callbacks(transfer_future, callback_type):
    """Retrieves callbacks from a subscriber.

    :type transfer_future: s3transfer.futures.TransferFuture
    :param transfer_future: The transfer future the subscriber is associated
        to.

    :type callback_type: str
    :param callback_type: The type of callback to retrieve from the
        subscriber. Valid types include:

        * 'queued'
        * 'progress'
        * 'done'

    :returns: A list of callbacks for the type specified. All callbacks are
        preinjected with the transfer future.
    """
    method_name = 'on_' + callback_type
    subscribers = transfer_future.meta.call_args.subscribers
    # Bind the future up front so callers only supply per-event kwargs.
    return [
        functools.partial(getattr(sub, method_name), future=transfer_future)
        for sub in subscribers
        if hasattr(sub, method_name)
    ]
def invoke_progress_callbacks(callbacks, bytes_transferred):
    """Calls all progress callbacks.

    :param callbacks: A list of progress callbacks to invoke.
    :param bytes_transferred: The number of bytes transferred, passed to each
        callback. When zero, the callbacks are skipped entirely since no
        progress was made. The value may be negative, which occurs when a
        transfer request is retried and progress must be rewound.
    """
    if not bytes_transferred:
        # No progress (zero bytes) -- nothing to report.
        return
    for cb in callbacks:
        cb(bytes_transferred=bytes_transferred)
def get_filtered_dict(
    original_dict, whitelisted_keys=None, blocklisted_keys=None
):
    """Gets a dictionary filtered by whitelisted and blocklisted keys.

    :param original_dict: The original dictionary of arguments to source keys
        and values.
    :param whitelisted_keys: A list of keys to include in the filtered
        dictionary.
    :param blocklisted_keys: A list of keys to exclude from the filtered
        dictionary.

    :returns: A dictionary containing key/values from the original dictionary
        whose key was included in the whitelist and/or not included in the
        blocklist.
    """

    def _should_keep(key):
        # A key survives if it is explicitly whitelisted, or if a blocklist
        # exists and the key is not on it.
        if whitelisted_keys and key in whitelisted_keys:
            return True
        return bool(blocklisted_keys) and key not in blocklisted_keys

    return {k: v for k, v in original_dict.items() if _should_keep(k)}
class CallArgs:
    def __init__(self, **kwargs):
        """A class that records call arguments.

        The call arguments must be passed as keyword arguments. Each keyword
        argument becomes an attribute of this object bound to its value.
        """
        # Bulk-assign all keyword arguments as instance attributes.
        self.__dict__.update(kwargs)
class FunctionContainer:
    """Bundles a callable together with positional and keyword arguments.

    Calling the instance invokes the stored function with the stored
    args and kwargs.
    """

    def __init__(self, func, *args, **kwargs):
        self._func = func
        self._args = args
        self._kwargs = kwargs

    def __repr__(self):
        return (
            f'Function: {self._func} with args {self._args} and '
            f'kwargs {self._kwargs}'
        )

    def __call__(self):
        return self._func(*self._args, **self._kwargs)
class CountCallbackInvoker:
    """Invokes a callback once a shared count returns to zero after finalize.

    :param callback: Invoked when the invoker has been finalized and the
        count reaches zero.
    """

    def __init__(self, callback):
        self._lock = threading.Lock()
        self._callback = callback
        self._count = 0
        self._is_finalized = False

    @property
    def current_count(self):
        # Read under the lock for a consistent view across threads.
        with self._lock:
            return self._count

    def increment(self):
        """Increment the count by one"""
        with self._lock:
            if self._is_finalized:
                raise RuntimeError(
                    'Counter has been finalized it can no longer be '
                    'incremented.'
                )
            self._count += 1

    def decrement(self):
        """Decrement the count by one"""
        with self._lock:
            if not self._count:
                raise RuntimeError(
                    'Counter is at zero. It cannot dip below zero'
                )
            self._count -= 1
            # Only fire once finalize() has run and all outstanding
            # increments have been matched by decrements.
            if self._is_finalized and not self._count:
                self._callback()

    def finalize(self):
        """Finalize the counter.

        Once finalized, the counter can never be incremented again and the
        callback fires as soon as the count reaches zero (immediately, if it
        is already zero).
        """
        with self._lock:
            self._is_finalized = True
            if not self._count:
                self._callback()
class OSUtils:
    """Abstraction over filesystem operations used by the transfer code.

    Funneling these calls through one object makes them easy to stub out
    in tests.
    """

    # Common filesystem limit for a single path component's length.
    _MAX_FILENAME_LEN = 255

    def get_file_size(self, filename):
        """Return the size of ``filename`` in bytes."""
        return os.path.getsize(filename)

    def open_file_chunk_reader(self, filename, start_byte, size, callbacks):
        """Return a ``ReadFileChunk`` over a slice of ``filename``.

        Callbacks start disabled; the transfer machinery enables them once
        the transfer actually begins.
        """
        return ReadFileChunk.from_filename(
            filename, start_byte, size, callbacks, enable_callbacks=False
        )

    def open_file_chunk_reader_from_fileobj(
        self,
        fileobj,
        chunk_size,
        full_file_size,
        callbacks,
        close_callbacks=None,
    ):
        """Return a ``ReadFileChunk`` over an already-open file object."""
        return ReadFileChunk(
            fileobj,
            chunk_size,
            full_file_size,
            callbacks=callbacks,
            enable_callbacks=False,
            close_callbacks=close_callbacks,
        )

    def open(self, filename, mode):
        """Open ``filename`` with ``mode`` (thin wrapper for stubbing)."""
        return open(filename, mode)

    def remove_file(self, filename):
        """Remove a file, noop if file does not exist."""
        # Unlike os.remove, if the file does not exist,
        # then this method does nothing.
        try:
            os.remove(filename)
        except OSError:
            pass

    def rename_file(self, current_filename, new_filename):
        """Rename ``current_filename`` to ``new_filename``.

        Delegates to the compat helper, which handles platform differences.
        """
        rename_file(current_filename, new_filename)

    # BUGFIX: the first parameter was previously named ``cls`` even though
    # this is a plain instance method (there is no @classmethod decorator).
    # Renamed to ``self``; callers invoke it positionally on an instance, so
    # behavior is unchanged.
    def is_special_file(self, filename):
        """Checks to see if a file is a special UNIX file.

        It checks if the file is a character special device, block special
        device, FIFO, or socket.

        :param filename: Name of the file

        :returns: True if the file is a special file. False, if is not.
        """
        # If it does not exist, it must be a new file so it cannot be
        # a special file.
        if not os.path.exists(filename):
            return False
        mode = os.stat(filename).st_mode
        # Character special device.
        if stat.S_ISCHR(mode):
            return True
        # Block special device
        if stat.S_ISBLK(mode):
            return True
        # Named pipe / FIFO
        if stat.S_ISFIFO(mode):
            return True
        # Socket.
        if stat.S_ISSOCK(mode):
            return True
        return False

    def get_temp_filename(self, filename):
        """Return a temporary filename alongside ``filename``.

        A random extension is appended and the base name is truncated so the
        result stays within ``_MAX_FILENAME_LEN`` characters.
        """
        suffix = os.extsep + random_file_extension()
        path = os.path.dirname(filename)
        name = os.path.basename(filename)
        temp_filename = name[: self._MAX_FILENAME_LEN - len(suffix)] + suffix
        return os.path.join(path, temp_filename)

    def allocate(self, filename, size):
        """Preallocate ``size`` bytes for ``filename``.

        On failure the partially-created file is removed before the error
        is re-raised.
        """
        try:
            with self.open(filename, 'wb') as f:
                fallocate(f, size)
        except OSError:
            self.remove_file(filename)
            raise
class DeferredOpenFile:
    def __init__(self, filename, start_byte=0, mode='rb', open_function=open):
        """A file wrapper that postpones opening until first use.

        Deferring the open is useful when many instances are created up
        front (possibly handed to other threads), since operating systems
        limit how many files a process may hold open. The underlying file
        is opened by ``read()``, ``write()``, ``seek()``, or
        ``__enter__()``.

        :type filename: str
        :param filename: The name of the file to open.

        :type start_byte: int
        :param start_byte: The byte offset to seek to once the file is
            opened.

        :type mode: str
        :param mode: The mode used when opening the file.

        :type open_function: function
        :param open_function: The callable used to perform the actual open.
        """
        self._filename = filename
        self._fileobj = None
        self._start_byte = start_byte
        self._mode = mode
        self._open_function = open_function

    def _open_if_needed(self):
        # Open lazily, exactly once, honoring the requested start offset.
        if self._fileobj is not None:
            return
        self._fileobj = self._open_function(self._filename, self._mode)
        if self._start_byte != 0:
            self._fileobj.seek(self._start_byte)

    @property
    def name(self):
        return self._filename

    def read(self, amount=None):
        self._open_if_needed()
        return self._fileobj.read(amount)

    def write(self, data):
        self._open_if_needed()
        self._fileobj.write(data)

    def seek(self, where, whence=0):
        self._open_if_needed()
        self._fileobj.seek(where, whence)

    def tell(self):
        # Before the file is opened, report the offset we will open at.
        if self._fileobj is None:
            return self._start_byte
        return self._fileobj.tell()

    def close(self):
        if self._fileobj:
            self._fileobj.close()

    def __enter__(self):
        self._open_if_needed()
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
class ReadFileChunk:
    def __init__(
        self,
        fileobj,
        chunk_size,
        full_file_size,
        callbacks=None,
        enable_callbacks=True,
        close_callbacks=None,
    ):
        """A file-like object exposing a fixed-size window of ``fileobj``.

        Given a file object shown below::

            |___________________________________________________|
            0          |                 |          full_file_size
                       |----chunk_size---|
                    f.tell()

        :type fileobj: file
        :param fileobj: File like object

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callbacks: A list of function(amount_read)
        :param callbacks: Called whenever data is read from this object in the
            order provided.

        :type enable_callbacks: boolean
        :param enable_callbacks: True if to run callbacks. Otherwise, do not
            run callbacks

        :type close_callbacks: A list of function()
        :param close_callbacks: Called when close is called. The function
            should take no arguments.
        """
        self._fileobj = fileobj
        # The chunk is anchored at the file's current position.
        self._start_byte = self._fileobj.tell()
        self._size = self._calculate_file_size(
            self._fileobj,
            requested_size=chunk_size,
            start_byte=self._start_byte,
            actual_file_size=full_file_size,
        )
        # _amount_read represents the position in the chunk and may exceed
        # the chunk size, but won't allow reads out of bounds.
        self._amount_read = 0
        self._callbacks = callbacks
        if callbacks is None:
            self._callbacks = []
        self._callbacks_enabled = enable_callbacks
        self._close_callbacks = close_callbacks
        if close_callbacks is None:
            # BUGFIX: this branch previously re-assigned ``close_callbacks``
            # to itself (a no-op, leaving None). Default to an empty list to
            # mirror the ``callbacks`` handling above; ``close()`` behaves
            # identically either way since iterating an empty list is a
            # no-op.
            self._close_callbacks = []

    @classmethod
    def from_filename(
        cls,
        filename,
        start_byte,
        chunk_size,
        callbacks=None,
        enable_callbacks=True,
    ):
        """Convenience factory function to create from a filename.

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type callbacks: function(amount_read)
        :param callbacks: Called whenever data is read from this object.

        :type enable_callbacks: bool
        :param enable_callbacks: Indicate whether to invoke callback
            during read() calls.

        :rtype: ``ReadFileChunk``
        :return: A new instance of ``ReadFileChunk``
        """
        f = open(filename, 'rb')
        f.seek(start_byte)
        file_size = os.fstat(f.fileno()).st_size
        return cls(f, chunk_size, file_size, callbacks, enable_callbacks)

    def _calculate_file_size(
        self, fileobj, requested_size, start_byte, actual_file_size
    ):
        # Never let the chunk extend past the end of the real file.
        max_chunk_size = actual_file_size - start_byte
        return min(max_chunk_size, requested_size)

    def read(self, amount=None):
        """Read up to ``amount`` bytes, bounded by the chunk's remainder."""
        amount_left = max(self._size - self._amount_read, 0)
        if amount is None:
            amount_to_read = amount_left
        else:
            amount_to_read = min(amount_left, amount)
        data = self._fileobj.read(amount_to_read)
        self._amount_read += len(data)
        if self._callbacks is not None and self._callbacks_enabled:
            invoke_progress_callbacks(self._callbacks, len(data))
        return data

    def signal_transferring(self):
        """Re-enable callbacks and propagate the signal to the file object."""
        self.enable_callback()
        if hasattr(self._fileobj, 'signal_transferring'):
            self._fileobj.signal_transferring()

    def signal_not_transferring(self):
        """Disable callbacks and propagate the signal to the file object."""
        self.disable_callback()
        if hasattr(self._fileobj, 'signal_not_transferring'):
            self._fileobj.signal_not_transferring()

    def enable_callback(self):
        self._callbacks_enabled = True

    def disable_callback(self):
        self._callbacks_enabled = False

    def seek(self, where, whence=0):
        """Seek within the chunk, rewinding progress callbacks as needed."""
        if whence not in (0, 1, 2):
            # Mimic io's error for invalid whence values
            raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")

        # Recalculate where based on chunk attributes so seek from file
        # start (whence=0) is always used
        where += self._start_byte
        if whence == 1:
            where += self._amount_read
        elif whence == 2:
            where += self._size

        # Never seek before the chunk's start byte.
        self._fileobj.seek(max(where, self._start_byte))
        if self._callbacks is not None and self._callbacks_enabled:
            # To also rewind the callback() for an accurate progress report
            bounded_where = max(min(where - self._start_byte, self._size), 0)
            bounded_amount_read = min(self._amount_read, self._size)
            amount = bounded_where - bounded_amount_read
            invoke_progress_callbacks(
                self._callbacks, bytes_transferred=amount
            )
        self._amount_read = max(where - self._start_byte, 0)

    def close(self):
        """Run close callbacks (when enabled) and close the file object."""
        if self._close_callbacks is not None and self._callbacks_enabled:
            for callback in self._close_callbacks:
                callback()
        self._fileobj.close()

    def tell(self):
        # Position is relative to the chunk, not the underlying file.
        return self._amount_read

    def __len__(self):
        # __len__ is defined because requests will try to determine the length
        # of the stream to set a content length. In the normal case
        # of the file it will just stat the file, but we need to change that
        # behavior. By providing a __len__, requests will use that instead
        # of stat'ing the file.
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def __iter__(self):
        # This is a workaround for http://bugs.python.org/issue17575
        # Basically httplib will try to iterate over the contents, even
        # if its a file like object. This wasn't noticed because we've
        # already exhausted the stream so iterating over the file immediately
        # stops, which is what we're simulating here.
        return iter([])
class StreamReaderProgress:
    """Wrapper for a read only stream that adds progress callbacks."""

    def __init__(self, stream, callbacks=None):
        self._stream = stream
        # Default to an empty callback list when none are supplied.
        self._callbacks = [] if callbacks is None else callbacks

    def read(self, *args, **kwargs):
        chunk = self._stream.read(*args, **kwargs)
        # Report however many bytes the underlying stream produced.
        invoke_progress_callbacks(self._callbacks, len(chunk))
        return chunk
class NoResourcesAvailable(Exception):
    """Raised when a semaphore slot cannot be acquired without blocking."""
class TaskSemaphore:
    def __init__(self, count):
        """A semaphore for the purpose of limiting the number of tasks

        :param count: The size of semaphore
        """
        self._semaphore = threading.Semaphore(count)

    def acquire(self, tag, blocking=True):
        """Acquire the semaphore.

        :param tag: A tag identifying what is acquiring the semaphore. It is
            not used by this class directly but keeps the API compatible
            with the SlidingWindowSemaphore implementation.
        :param blocking: If True, block until the semaphore can be acquired.
            If False, raise NoResourcesAvailable when it cannot be acquired
            immediately.

        :returns: A token (can be None) to use when releasing the semaphore
        """
        logger.debug("Acquiring %s", tag)
        acquired = self._semaphore.acquire(blocking)
        if not acquired:
            raise NoResourcesAvailable(f"Cannot acquire tag '{tag}'")

    def release(self, tag, acquire_token):
        """Release the semaphore.

        :param tag: A tag identifying what is releasing the semaphore.
        :param acquire_token: The token returned by ``acquire()``. Unused
            here; present for API compatibility with the
            SlidingWindowSemaphore implementation.
        """
        logger.debug(f"Releasing acquire {tag}/{acquire_token}")
        self._semaphore.release()
class SlidingWindowSemaphore(TaskSemaphore):
    """A semaphore used to coordinate sequential resource access.

    This class is similar to the stdlib BoundedSemaphore:

    * It's initialized with a count.
    * Each call to ``acquire()`` decrements the counter.
    * If the count is at zero, then ``acquire()`` will either block until the
      count increases, or if ``blocking=False``, then it will raise
      a NoResourcesAvailable exception indicating that it failed to acquire the
      semaphore.

    The main difference is that this semaphore is used to limit
    access to a resource that requires sequential access. For example,
    if I want to access resource R that has 20 subresources R_0 - R_19,
    this semaphore can also enforce that you only have a max range of
    10 at any given point in time. You must also specify a tag name
    when you acquire the semaphore. The sliding window semantics apply
    on a per tag basis. The internal count will only be incremented
    when the minimum sequence number for a tag is released.

    """

    def __init__(self, count):
        self._count = count
        # Dict[tag, next_sequence_number].
        self._tag_sequences = defaultdict(int)
        # Dict[tag, lowest not-yet-released sequence number for that tag].
        self._lowest_sequence = {}
        self._lock = threading.Lock()
        self._condition = threading.Condition(self._lock)
        # Dict[tag, List[sequence_number]]
        self._pending_release = {}

    def current_count(self):
        # Snapshot of the available count, read under the lock.
        with self._lock:
            return self._count

    def acquire(self, tag, blocking=True):
        """Acquire a slot and return the tag's next sequence number.

        :param tag: Identifies the sequential resource being accessed.
        :param blocking: If True, wait until a slot is free; otherwise raise
            NoResourcesAvailable when the count is zero.
        :returns: The sequence number to pass back to ``release()``.
        """
        logger.debug("Acquiring %s", tag)
        self._condition.acquire()
        try:
            if self._count == 0:
                if not blocking:
                    raise NoResourcesAvailable(f"Cannot acquire tag '{tag}'")
                else:
                    while self._count == 0:
                        self._condition.wait()
            # self._count is no longer zero.
            # First, check if this is the first time we're seeing this tag.
            sequence_number = self._tag_sequences[tag]
            if sequence_number == 0:
                # First time seeing the tag, so record we're at 0.
                self._lowest_sequence[tag] = sequence_number
            self._tag_sequences[tag] += 1
            self._count -= 1
            return sequence_number
        finally:
            self._condition.release()

    def release(self, tag, acquire_token):
        """Release a previously acquired sequence number for ``tag``.

        The shared count only grows when the tag's lowest outstanding
        sequence number is released; out-of-order releases are queued
        until the gap closes.
        """
        sequence_number = acquire_token
        logger.debug("Releasing acquire %s/%s", tag, sequence_number)
        self._condition.acquire()
        try:
            if tag not in self._tag_sequences:
                raise ValueError(f"Attempted to release unknown tag: {tag}")
            max_sequence = self._tag_sequences[tag]
            if self._lowest_sequence[tag] == sequence_number:
                # We can immediately process this request and free up
                # resources.
                self._lowest_sequence[tag] += 1
                self._count += 1
                self._condition.notify()
                # Releasing the lowest sequence may also unblock earlier
                # out-of-order releases queued right behind it. The queue is
                # sorted in reverse, so the smallest pending number is last.
                queued = self._pending_release.get(tag, [])
                while queued:
                    if self._lowest_sequence[tag] == queued[-1]:
                        queued.pop()
                        self._lowest_sequence[tag] += 1
                        self._count += 1
                    else:
                        break
            elif self._lowest_sequence[tag] < sequence_number < max_sequence:
                # We can't do anything right now because we're still waiting
                # for the min sequence for the tag to be released. We have
                # to queue this for pending release.
                self._pending_release.setdefault(tag, []).append(
                    sequence_number
                )
                self._pending_release[tag].sort(reverse=True)
            else:
                raise ValueError(
                    "Attempted to release unknown sequence number "
                    f"{sequence_number} for tag: {tag}"
                )
        finally:
            self._condition.release()
class ChunksizeAdjuster:
    """Adjusts a chunksize so multipart transfers satisfy S3's limits."""

    def __init__(
        self,
        max_size=MAX_SINGLE_UPLOAD_SIZE,
        min_size=MIN_UPLOAD_CHUNKSIZE,
        max_parts=MAX_PARTS,
    ):
        self.max_size = max_size
        self.min_size = min_size
        self.max_parts = max_parts

    def adjust_chunksize(self, current_chunksize, file_size=None):
        """Get a chunksize close to current that fits within all S3 limits.

        :type current_chunksize: int
        :param current_chunksize: The currently configured chunksize.

        :type file_size: int or None
        :param file_size: The size of the file to upload. This might be None
            if the object being transferred has an unknown size.

        :returns: A valid chunksize that fits within configured limits.
        """
        chunksize = current_chunksize
        # Only the part-count constraint needs the total size.
        if file_size is not None:
            chunksize = self._adjust_for_max_parts(chunksize, file_size)
        return self._adjust_for_chunksize_limits(chunksize)

    def _adjust_for_chunksize_limits(self, current_chunksize):
        # Clamp into [min_size, max_size], logging any adjustment.
        if current_chunksize > self.max_size:
            logger.debug(
                "Chunksize greater than maximum chunksize. "
                f"Setting to {self.max_size} from {current_chunksize}."
            )
            return self.max_size
        if current_chunksize < self.min_size:
            logger.debug(
                "Chunksize less than minimum chunksize. "
                f"Setting to {self.min_size} from {current_chunksize}."
            )
            return self.min_size
        return current_chunksize

    def _adjust_for_max_parts(self, current_chunksize, file_size):
        chunksize = current_chunksize
        num_parts = int(math.ceil(file_size / float(chunksize)))

        # Double the chunksize until the part count fits under the cap.
        while num_parts > self.max_parts:
            chunksize *= 2
            num_parts = int(math.ceil(file_size / float(chunksize)))

        if chunksize != current_chunksize:
            logger.debug(
                "Chunksize would result in the number of parts exceeding the "
                f"maximum. Setting to {chunksize} from {current_chunksize}."
            )

        return chunksize
def add_s3express_defaults(bucket, extra_args):
    """
    This function has been deprecated, but is kept for backwards compatibility.
    This function is subject to removal in a future release.
    """
    needs_default = (
        is_s3express_bucket(bucket)
        and "ChecksumAlgorithm" not in extra_args
    )
    if needs_default:
        # Default Transfer Operations to S3Express to use CRC32
        extra_args["ChecksumAlgorithm"] = "crc32"
def set_default_checksum_algorithm(extra_args):
    """Set the default algorithm to CRC32 if not specified by the user."""
    # A user-supplied full-object checksum takes precedence; leave as-is.
    has_full_object_checksum = any(
        arg in extra_args for arg in FULL_OBJECT_CHECKSUM_ARGS
    )
    if not has_full_object_checksum:
        extra_args.setdefault("ChecksumAlgorithm", DEFAULT_CHECKSUM_ALGORITHM)
836# NOTE: The following interfaces are considered private and are subject
837# to abrupt breaking changes. Please do not use them directly.
# Prefer botocore's private create_nested_client when available; fall back
# to the session's public factory on older botocore versions.
try:
    from botocore.utils import create_nested_client as create_client
except ImportError:

    def create_client(session, *args, **kwargs):
        # Fallback shim matching the signature used by callers in this file.
        return session.create_client(*args, **kwargs)
def create_nested_client(session, service_name, **kwargs):
    """Create a client for ``service_name`` via the best available factory.

    NOTE: This is a private interface and is subject to abrupt breaking
    changes; see the module comment above.
    """
    return create_client(session, service_name, **kwargs)