1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import functools
14import logging
15import math
16import os
17import random
18import socket
19import stat
20import string
21import threading
22from collections import defaultdict
23
24from botocore.exceptions import (
25 IncompleteReadError,
26 ReadTimeoutError,
27 ResponseStreamingError,
28)
29from botocore.httpchecksum import DEFAULT_CHECKSUM_ALGORITHM, AwsChunkedWrapper
30from botocore.utils import is_s3express_bucket
31
32from s3transfer.compat import SOCKET_ERROR, fallocate, rename_file
33from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS
34
35MAX_PARTS = 10000
36# The maximum file size you can upload via S3 per request.
37# See: http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
38# and: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
39MAX_SINGLE_UPLOAD_SIZE = 5 * (1024**3)
40MIN_UPLOAD_CHUNKSIZE = 5 * (1024**2)
41logger = logging.getLogger(__name__)
42
43
44S3_RETRYABLE_DOWNLOAD_ERRORS = (
45 socket.timeout,
46 SOCKET_ERROR,
47 ReadTimeoutError,
48 IncompleteReadError,
49 ResponseStreamingError,
50)
51
52
53def random_file_extension(num_digits=8):
54 return ''.join(random.choice(string.hexdigits) for _ in range(num_digits))
55
56
57def signal_not_transferring(request, operation_name, **kwargs):
58 if operation_name in ['PutObject', 'UploadPart'] and hasattr(
59 request.body, 'signal_not_transferring'
60 ):
61 request.body.signal_not_transferring()
62
63
64def signal_transferring(request, operation_name, **kwargs):
65 if operation_name in ['PutObject', 'UploadPart']:
66 body = request.body
67 if isinstance(body, AwsChunkedWrapper):
68 body = getattr(body, '_raw', None)
69 if hasattr(body, 'signal_transferring'):
70 body.signal_transferring()
71
72
73def calculate_num_parts(size, part_size):
74 return int(math.ceil(size / float(part_size)))
75
76
77def calculate_range_parameter(
78 part_size, part_index, num_parts, total_size=None
79):
80 """Calculate the range parameter for multipart downloads/copies
81
82 :type part_size: int
83 :param part_size: The size of the part
84
85 :type part_index: int
86 :param part_index: The index for which this parts starts. This index starts
87 at zero
88
89 :type num_parts: int
90 :param num_parts: The total number of parts in the transfer
91
92 :returns: The value to use for Range parameter on downloads or
93 the CopySourceRange parameter for copies
94 """
95 # Used to calculate the Range parameter
96 start_range = part_index * part_size
97 if part_index == num_parts - 1:
98 end_range = ''
99 if total_size is not None:
100 end_range = str(total_size - 1)
101 else:
102 end_range = start_range + part_size - 1
103 range_param = f'bytes={start_range}-{end_range}'
104 return range_param
105
106
107def get_callbacks(transfer_future, callback_type):
108 """Retrieves callbacks from a subscriber
109
110 :type transfer_future: s3transfer.futures.TransferFuture
111 :param transfer_future: The transfer future the subscriber is associated
112 to.
113
114 :type callback_type: str
115 :param callback_type: The type of callback to retrieve from the subscriber.
116 Valid types include:
117 * 'queued'
118 * 'progress'
119 * 'done'
120
121 :returns: A list of callbacks for the type specified. All callbacks are
122 preinjected with the transfer future.
123 """
124 callbacks = []
125 for subscriber in transfer_future.meta.call_args.subscribers:
126 callback_name = 'on_' + callback_type
127 if hasattr(subscriber, callback_name):
128 callbacks.append(
129 functools.partial(
130 getattr(subscriber, callback_name), future=transfer_future
131 )
132 )
133 return callbacks
134
135
136def invoke_progress_callbacks(callbacks, bytes_transferred):
137 """Calls all progress callbacks
138
139 :param callbacks: A list of progress callbacks to invoke
140 :param bytes_transferred: The number of bytes transferred. This is passed
141 to the callbacks. If no bytes were transferred the callbacks will not
142 be invoked because no progress was achieved. It is also possible
143 to receive a negative amount which comes from retrying a transfer
144 request.
145 """
146 # Only invoke the callbacks if bytes were actually transferred.
147 if bytes_transferred:
148 for callback in callbacks:
149 callback(bytes_transferred=bytes_transferred)
150
151
152def get_filtered_dict(
153 original_dict, whitelisted_keys=None, blocklisted_keys=None
154):
155 """Gets a dictionary filtered by whitelisted and blocklisted keys.
156
157 :param original_dict: The original dictionary of arguments to source keys
158 and values.
159 :param whitelisted_key: A list of keys to include in the filtered
160 dictionary.
161 :param blocklisted_key: A list of keys to exclude in the filtered
162 dictionary.
163
164 :returns: A dictionary containing key/values from the original dictionary
165 whose key was included in the whitelist and/or not included in the
166 blocklist.
167 """
168 filtered_dict = {}
169 for key, value in original_dict.items():
170 if (whitelisted_keys and key in whitelisted_keys) or (
171 blocklisted_keys and key not in blocklisted_keys
172 ):
173 filtered_dict[key] = value
174 return filtered_dict
175
176
177class CallArgs:
178 def __init__(self, **kwargs):
179 """A class that records call arguments
180
181 The call arguments must be passed as keyword arguments. It will set
182 each keyword argument as an attribute of the object along with its
183 associated value.
184 """
185 for arg, value in kwargs.items():
186 setattr(self, arg, value)
187
188
189class FunctionContainer:
190 """An object that contains a function and any args or kwargs to call it
191
192 When called the provided function will be called with provided args
193 and kwargs.
194 """
195
196 def __init__(self, func, *args, **kwargs):
197 self._func = func
198 self._args = args
199 self._kwargs = kwargs
200
201 def __repr__(self):
202 return f'Function: {self._func} with args {self._args} and kwargs {self._kwargs}'
203
204 def __call__(self):
205 return self._func(*self._args, **self._kwargs)
206
207
208class CountCallbackInvoker:
209 """An abstraction to invoke a callback when a shared count reaches zero
210
211 :param callback: Callback invoke when finalized count reaches zero
212 """
213
214 def __init__(self, callback):
215 self._lock = threading.Lock()
216 self._callback = callback
217 self._count = 0
218 self._is_finalized = False
219
220 @property
221 def current_count(self):
222 with self._lock:
223 return self._count
224
225 def increment(self):
226 """Increment the count by one"""
227 with self._lock:
228 if self._is_finalized:
229 raise RuntimeError(
230 'Counter has been finalized it can no longer be '
231 'incremented.'
232 )
233 self._count += 1
234
235 def decrement(self):
236 """Decrement the count by one"""
237 with self._lock:
238 if self._count == 0:
239 raise RuntimeError(
240 'Counter is at zero. It cannot dip below zero'
241 )
242 self._count -= 1
243 if self._is_finalized and self._count == 0:
244 self._callback()
245
246 def finalize(self):
247 """Finalize the counter
248
249 Once finalized, the counter never be incremented and the callback
250 can be invoked once the count reaches zero
251 """
252 with self._lock:
253 self._is_finalized = True
254 if self._count == 0:
255 self._callback()
256
257
258class OSUtils:
259 _MAX_FILENAME_LEN = 255
260
261 def get_file_size(self, filename):
262 return os.path.getsize(filename)
263
264 def open_file_chunk_reader(self, filename, start_byte, size, callbacks):
265 return ReadFileChunk.from_filename(
266 filename, start_byte, size, callbacks, enable_callbacks=False
267 )
268
269 def open_file_chunk_reader_from_fileobj(
270 self,
271 fileobj,
272 chunk_size,
273 full_file_size,
274 callbacks,
275 close_callbacks=None,
276 ):
277 return ReadFileChunk(
278 fileobj,
279 chunk_size,
280 full_file_size,
281 callbacks=callbacks,
282 enable_callbacks=False,
283 close_callbacks=close_callbacks,
284 )
285
286 def open(self, filename, mode):
287 return open(filename, mode)
288
289 def remove_file(self, filename):
290 """Remove a file, noop if file does not exist."""
291 # Unlike os.remove, if the file does not exist,
292 # then this method does nothing.
293 try:
294 os.remove(filename)
295 except OSError:
296 pass
297
298 def rename_file(self, current_filename, new_filename):
299 rename_file(current_filename, new_filename)
300
301 def is_special_file(cls, filename):
302 """Checks to see if a file is a special UNIX file.
303
304 It checks if the file is a character special device, block special
305 device, FIFO, or socket.
306
307 :param filename: Name of the file
308
309 :returns: True if the file is a special file. False, if is not.
310 """
311 # If it does not exist, it must be a new file so it cannot be
312 # a special file.
313 if not os.path.exists(filename):
314 return False
315 mode = os.stat(filename).st_mode
316 # Character special device.
317 if stat.S_ISCHR(mode):
318 return True
319 # Block special device
320 if stat.S_ISBLK(mode):
321 return True
322 # Named pipe / FIFO
323 if stat.S_ISFIFO(mode):
324 return True
325 # Socket.
326 if stat.S_ISSOCK(mode):
327 return True
328 return False
329
330 def get_temp_filename(self, filename):
331 suffix = os.extsep + random_file_extension()
332 path = os.path.dirname(filename)
333 name = os.path.basename(filename)
334 temp_filename = name[: self._MAX_FILENAME_LEN - len(suffix)] + suffix
335 return os.path.join(path, temp_filename)
336
337 def allocate(self, filename, size):
338 try:
339 with self.open(filename, 'wb') as f:
340 fallocate(f, size)
341 except OSError:
342 self.remove_file(filename)
343 raise
344
345
346class DeferredOpenFile:
347 def __init__(self, filename, start_byte=0, mode='rb', open_function=open):
348 """A class that defers the opening of a file till needed
349
350 This is useful for deferring opening of a file till it is needed
351 in a separate thread, as there is a limit of how many open files
352 there can be in a single thread for most operating systems. The
353 file gets opened in the following methods: ``read()``, ``seek()``,
354 and ``__enter__()``
355
356 :type filename: str
357 :param filename: The name of the file to open
358
359 :type start_byte: int
360 :param start_byte: The byte to seek to when the file is opened.
361
362 :type mode: str
363 :param mode: The mode to use to open the file
364
365 :type open_function: function
366 :param open_function: The function to use to open the file
367 """
368 self._filename = filename
369 self._fileobj = None
370 self._start_byte = start_byte
371 self._mode = mode
372 self._open_function = open_function
373
374 def _open_if_needed(self):
375 if self._fileobj is None:
376 self._fileobj = self._open_function(self._filename, self._mode)
377 if self._start_byte != 0:
378 self._fileobj.seek(self._start_byte)
379
380 @property
381 def name(self):
382 return self._filename
383
384 def read(self, amount=None):
385 self._open_if_needed()
386 return self._fileobj.read(amount)
387
388 def write(self, data):
389 self._open_if_needed()
390 self._fileobj.write(data)
391
392 def seek(self, where, whence=0):
393 self._open_if_needed()
394 self._fileobj.seek(where, whence)
395
396 def tell(self):
397 if self._fileobj is None:
398 return self._start_byte
399 return self._fileobj.tell()
400
401 def close(self):
402 if self._fileobj:
403 self._fileobj.close()
404
405 def __enter__(self):
406 self._open_if_needed()
407 return self
408
409 def __exit__(self, *args, **kwargs):
410 self.close()
411
412
413class ReadFileChunk:
414 def __init__(
415 self,
416 fileobj,
417 chunk_size,
418 full_file_size,
419 callbacks=None,
420 enable_callbacks=True,
421 close_callbacks=None,
422 ):
423 """
424
425 Given a file object shown below::
426
427 |___________________________________________________|
428 0 | | full_file_size
429 |----chunk_size---|
430 f.tell()
431
432 :type fileobj: file
433 :param fileobj: File like object
434
435 :type chunk_size: int
436 :param chunk_size: The max chunk size to read. Trying to read
437 pass the end of the chunk size will behave like you've
438 reached the end of the file.
439
440 :type full_file_size: int
441 :param full_file_size: The entire content length associated
442 with ``fileobj``.
443
444 :type callbacks: A list of function(amount_read)
445 :param callbacks: Called whenever data is read from this object in the
446 order provided.
447
448 :type enable_callbacks: boolean
449 :param enable_callbacks: True if to run callbacks. Otherwise, do not
450 run callbacks
451
452 :type close_callbacks: A list of function()
453 :param close_callbacks: Called when close is called. The function
454 should take no arguments.
455 """
456 self._fileobj = fileobj
457 self._start_byte = self._fileobj.tell()
458 self._size = self._calculate_file_size(
459 self._fileobj,
460 requested_size=chunk_size,
461 start_byte=self._start_byte,
462 actual_file_size=full_file_size,
463 )
464 # _amount_read represents the position in the chunk and may exceed
465 # the chunk size, but won't allow reads out of bounds.
466 self._amount_read = 0
467 self._callbacks = callbacks
468 if callbacks is None:
469 self._callbacks = []
470 self._callbacks_enabled = enable_callbacks
471 self._close_callbacks = close_callbacks
472 if close_callbacks is None:
473 self._close_callbacks = close_callbacks
474
475 @classmethod
476 def from_filename(
477 cls,
478 filename,
479 start_byte,
480 chunk_size,
481 callbacks=None,
482 enable_callbacks=True,
483 ):
484 """Convenience factory function to create from a filename.
485
486 :type start_byte: int
487 :param start_byte: The first byte from which to start reading.
488
489 :type chunk_size: int
490 :param chunk_size: The max chunk size to read. Trying to read
491 pass the end of the chunk size will behave like you've
492 reached the end of the file.
493
494 :type full_file_size: int
495 :param full_file_size: The entire content length associated
496 with ``fileobj``.
497
498 :type callbacks: function(amount_read)
499 :param callbacks: Called whenever data is read from this object.
500
501 :type enable_callbacks: bool
502 :param enable_callbacks: Indicate whether to invoke callback
503 during read() calls.
504
505 :rtype: ``ReadFileChunk``
506 :return: A new instance of ``ReadFileChunk``
507
508 """
509 f = open(filename, 'rb')
510 f.seek(start_byte)
511 file_size = os.fstat(f.fileno()).st_size
512 return cls(f, chunk_size, file_size, callbacks, enable_callbacks)
513
514 def _calculate_file_size(
515 self, fileobj, requested_size, start_byte, actual_file_size
516 ):
517 max_chunk_size = actual_file_size - start_byte
518 return min(max_chunk_size, requested_size)
519
520 def read(self, amount=None):
521 amount_left = max(self._size - self._amount_read, 0)
522 if amount is None:
523 amount_to_read = amount_left
524 else:
525 amount_to_read = min(amount_left, amount)
526 data = self._fileobj.read(amount_to_read)
527 self._amount_read += len(data)
528 if self._callbacks is not None and self._callbacks_enabled:
529 invoke_progress_callbacks(self._callbacks, len(data))
530 return data
531
532 def signal_transferring(self):
533 self.enable_callback()
534 if hasattr(self._fileobj, 'signal_transferring'):
535 self._fileobj.signal_transferring()
536
537 def signal_not_transferring(self):
538 self.disable_callback()
539 if hasattr(self._fileobj, 'signal_not_transferring'):
540 self._fileobj.signal_not_transferring()
541
542 def enable_callback(self):
543 self._callbacks_enabled = True
544
545 def disable_callback(self):
546 self._callbacks_enabled = False
547
548 def seek(self, where, whence=0):
549 if whence not in (0, 1, 2):
550 # Mimic io's error for invalid whence values
551 raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")
552
553 # Recalculate where based on chunk attributes so seek from file
554 # start (whence=0) is always used
555 where += self._start_byte
556 if whence == 1:
557 where += self._amount_read
558 elif whence == 2:
559 where += self._size
560
561 self._fileobj.seek(max(where, self._start_byte))
562 if self._callbacks is not None and self._callbacks_enabled:
563 # To also rewind the callback() for an accurate progress report
564 bounded_where = max(min(where - self._start_byte, self._size), 0)
565 bounded_amount_read = min(self._amount_read, self._size)
566 amount = bounded_where - bounded_amount_read
567 invoke_progress_callbacks(
568 self._callbacks, bytes_transferred=amount
569 )
570 self._amount_read = max(where - self._start_byte, 0)
571
572 def close(self):
573 if self._close_callbacks is not None and self._callbacks_enabled:
574 for callback in self._close_callbacks:
575 callback()
576 self._fileobj.close()
577
578 def tell(self):
579 return self._amount_read
580
581 def __len__(self):
582 # __len__ is defined because requests will try to determine the length
583 # of the stream to set a content length. In the normal case
584 # of the file it will just stat the file, but we need to change that
585 # behavior. By providing a __len__, requests will use that instead
586 # of stat'ing the file.
587 return self._size
588
589 def __enter__(self):
590 return self
591
592 def __exit__(self, *args, **kwargs):
593 self.close()
594
595 def __iter__(self):
596 # This is a workaround for http://bugs.python.org/issue17575
597 # Basically httplib will try to iterate over the contents, even
598 # if its a file like object. This wasn't noticed because we've
599 # already exhausted the stream so iterating over the file immediately
600 # stops, which is what we're simulating here.
601 return iter([])
602
603
604class StreamReaderProgress:
605 """Wrapper for a read only stream that adds progress callbacks."""
606
607 def __init__(self, stream, callbacks=None):
608 self._stream = stream
609 self._callbacks = callbacks
610 if callbacks is None:
611 self._callbacks = []
612
613 def read(self, *args, **kwargs):
614 value = self._stream.read(*args, **kwargs)
615 invoke_progress_callbacks(self._callbacks, len(value))
616 return value
617
618
619class NoResourcesAvailable(Exception):
620 pass
621
622
623class TaskSemaphore:
624 def __init__(self, count):
625 """A semaphore for the purpose of limiting the number of tasks
626
627 :param count: The size of semaphore
628 """
629 self._semaphore = threading.Semaphore(count)
630
631 def acquire(self, tag, blocking=True):
632 """Acquire the semaphore
633
634 :param tag: A tag identifying what is acquiring the semaphore. Note
635 that this is not really needed to directly use this class but is
636 needed for API compatibility with the SlidingWindowSemaphore
637 implementation.
638 :param block: If True, block until it can be acquired. If False,
639 do not block and raise an exception if cannot be acquired.
640
641 :returns: A token (can be None) to use when releasing the semaphore
642 """
643 logger.debug("Acquiring %s", tag)
644 if not self._semaphore.acquire(blocking):
645 raise NoResourcesAvailable(f"Cannot acquire tag '{tag}'")
646
647 def release(self, tag, acquire_token):
648 """Release the semaphore
649
650 :param tag: A tag identifying what is releasing the semaphore
651 :param acquire_token: The token returned from when the semaphore was
652 acquired. Note that this is not really needed to directly use this
653 class but is needed for API compatibility with the
654 SlidingWindowSemaphore implementation.
655 """
656 logger.debug(f"Releasing acquire {tag}/{acquire_token}")
657 self._semaphore.release()
658
659
660class SlidingWindowSemaphore(TaskSemaphore):
661 """A semaphore used to coordinate sequential resource access.
662
663 This class is similar to the stdlib BoundedSemaphore:
664
665 * It's initialized with a count.
666 * Each call to ``acquire()`` decrements the counter.
667 * If the count is at zero, then ``acquire()`` will either block until the
668 count increases, or if ``blocking=False``, then it will raise
669 a NoResourcesAvailable exception indicating that it failed to acquire the
670 semaphore.
671
672 The main difference is that this semaphore is used to limit
673 access to a resource that requires sequential access. For example,
674 if I want to access resource R that has 20 subresources R_0 - R_19,
675 this semaphore can also enforce that you only have a max range of
676 10 at any given point in time. You must also specify a tag name
677 when you acquire the semaphore. The sliding window semantics apply
678 on a per tag basis. The internal count will only be incremented
679 when the minimum sequence number for a tag is released.
680
681 """
682
683 def __init__(self, count):
684 self._count = count
685 # Dict[tag, next_sequence_number].
686 self._tag_sequences = defaultdict(int)
687 self._lowest_sequence = {}
688 self._lock = threading.Lock()
689 self._condition = threading.Condition(self._lock)
690 # Dict[tag, List[sequence_number]]
691 self._pending_release = {}
692
693 def current_count(self):
694 with self._lock:
695 return self._count
696
697 def acquire(self, tag, blocking=True):
698 logger.debug("Acquiring %s", tag)
699 self._condition.acquire()
700 try:
701 if self._count == 0:
702 if not blocking:
703 raise NoResourcesAvailable(f"Cannot acquire tag '{tag}'")
704 else:
705 while self._count == 0:
706 self._condition.wait()
707 # self._count is no longer zero.
708 # First, check if this is the first time we're seeing this tag.
709 sequence_number = self._tag_sequences[tag]
710 if sequence_number == 0:
711 # First time seeing the tag, so record we're at 0.
712 self._lowest_sequence[tag] = sequence_number
713 self._tag_sequences[tag] += 1
714 self._count -= 1
715 return sequence_number
716 finally:
717 self._condition.release()
718
719 def release(self, tag, acquire_token):
720 sequence_number = acquire_token
721 logger.debug("Releasing acquire %s/%s", tag, sequence_number)
722 self._condition.acquire()
723 try:
724 if tag not in self._tag_sequences:
725 raise ValueError(f"Attempted to release unknown tag: {tag}")
726 max_sequence = self._tag_sequences[tag]
727 if self._lowest_sequence[tag] == sequence_number:
728 # We can immediately process this request and free up
729 # resources.
730 self._lowest_sequence[tag] += 1
731 self._count += 1
732 self._condition.notify()
733 queued = self._pending_release.get(tag, [])
734 while queued:
735 if self._lowest_sequence[tag] == queued[-1]:
736 queued.pop()
737 self._lowest_sequence[tag] += 1
738 self._count += 1
739 else:
740 break
741 elif self._lowest_sequence[tag] < sequence_number < max_sequence:
742 # We can't do anything right now because we're still waiting
743 # for the min sequence for the tag to be released. We have
744 # to queue this for pending release.
745 self._pending_release.setdefault(tag, []).append(
746 sequence_number
747 )
748 self._pending_release[tag].sort(reverse=True)
749 else:
750 raise ValueError(
751 "Attempted to release unknown sequence number "
752 f"{sequence_number} for tag: {tag}"
753 )
754 finally:
755 self._condition.release()
756
757
758class ChunksizeAdjuster:
759 def __init__(
760 self,
761 max_size=MAX_SINGLE_UPLOAD_SIZE,
762 min_size=MIN_UPLOAD_CHUNKSIZE,
763 max_parts=MAX_PARTS,
764 ):
765 self.max_size = max_size
766 self.min_size = min_size
767 self.max_parts = max_parts
768
769 def adjust_chunksize(self, current_chunksize, file_size=None):
770 """Get a chunksize close to current that fits within all S3 limits.
771
772 :type current_chunksize: int
773 :param current_chunksize: The currently configured chunksize.
774
775 :type file_size: int or None
776 :param file_size: The size of the file to upload. This might be None
777 if the object being transferred has an unknown size.
778
779 :returns: A valid chunksize that fits within configured limits.
780 """
781 chunksize = current_chunksize
782 if file_size is not None:
783 chunksize = self._adjust_for_max_parts(chunksize, file_size)
784 return self._adjust_for_chunksize_limits(chunksize)
785
786 def _adjust_for_chunksize_limits(self, current_chunksize):
787 if current_chunksize > self.max_size:
788 logger.debug(
789 "Chunksize greater than maximum chunksize. "
790 f"Setting to {self.max_size} from {current_chunksize}."
791 )
792 return self.max_size
793 elif current_chunksize < self.min_size:
794 logger.debug(
795 "Chunksize less than minimum chunksize. "
796 f"Setting to {self.min_size} from {current_chunksize}."
797 )
798 return self.min_size
799 else:
800 return current_chunksize
801
802 def _adjust_for_max_parts(self, current_chunksize, file_size):
803 chunksize = current_chunksize
804 num_parts = int(math.ceil(file_size / float(chunksize)))
805
806 while num_parts > self.max_parts:
807 chunksize *= 2
808 num_parts = int(math.ceil(file_size / float(chunksize)))
809
810 if chunksize != current_chunksize:
811 logger.debug(
812 "Chunksize would result in the number of parts exceeding the "
813 f"maximum. Setting to {chunksize} from {current_chunksize}."
814 )
815
816 return chunksize
817
818
819def add_s3express_defaults(bucket, extra_args):
820 """
821 This function has been deprecated, but is kept for backwards compatibility.
822 This function is subject to removal in a future release.
823 """
824 if is_s3express_bucket(bucket) and "ChecksumAlgorithm" not in extra_args:
825 # Default Transfer Operations to S3Express to use CRC32
826 extra_args["ChecksumAlgorithm"] = "crc32"
827
828
829def set_default_checksum_algorithm(extra_args):
830 """Set the default algorithm to CRC32 if not specified by the user."""
831 if any(checksum in extra_args for checksum in FULL_OBJECT_CHECKSUM_ARGS):
832 return
833 extra_args.setdefault("ChecksumAlgorithm", DEFAULT_CHECKSUM_ALGORITHM)