Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/s3transfer/utils.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

361 statements  

1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import functools 

14import logging 

15import math 

16import os 

17import random 

18import socket 

19import stat 

20import string 

21import threading 

22from collections import defaultdict 

23 

24from botocore.exceptions import ( 

25 IncompleteReadError, 

26 ReadTimeoutError, 

27 ResponseStreamingError, 

28) 

29from botocore.httpchecksum import DEFAULT_CHECKSUM_ALGORITHM, AwsChunkedWrapper 

30from botocore.utils import is_s3express_bucket 

31 

32from s3transfer.compat import SOCKET_ERROR, fallocate, rename_file 

33from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS 

34 

# S3 rejects multipart uploads with more than 10,000 parts.
MAX_PARTS = 10000
# The maximum file size you can upload via S3 per request.
# See: http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
# and: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
MAX_SINGLE_UPLOAD_SIZE = 5 * (1024**3)  # 5 GiB
MIN_UPLOAD_CHUNKSIZE = 5 * (1024**2)  # 5 MiB
logger = logging.getLogger(__name__)


# Errors raised mid-download that this package treats as retryable.
S3_RETRYABLE_DOWNLOAD_ERRORS = (
    socket.timeout,
    SOCKET_ERROR,
    ReadTimeoutError,
    IncompleteReadError,
    ResponseStreamingError,
)

51 

52 

def random_file_extension(num_digits=8):
    """Return a random string of ``num_digits`` hexadecimal characters."""
    chosen = (random.choice(string.hexdigits) for _ in range(num_digits))
    return ''.join(chosen)

55 

56 

def signal_not_transferring(request, operation_name, **kwargs):
    """Pause progress signaling on an upload request body, if supported.

    Only applies to upload operations whose body may be a callback-aware
    stream (``PutObject``/``UploadPart``).
    """
    if operation_name not in ('PutObject', 'UploadPart'):
        return
    body = request.body
    if hasattr(body, 'signal_not_transferring'):
        body.signal_not_transferring()

62 

63 

def signal_transferring(request, operation_name, **kwargs):
    """Resume progress signaling on an upload request body, if supported.

    Mirrors ``signal_not_transferring``. When the body has been wrapped in
    an ``AwsChunkedWrapper`` (checksummed uploads), the signal is forwarded
    to the underlying raw stream.
    """
    if operation_name not in ('PutObject', 'UploadPart'):
        return
    body = request.body
    if isinstance(body, AwsChunkedWrapper):
        # Unwrap to reach the original callback-aware stream.
        body = getattr(body, '_raw', None)
    if hasattr(body, 'signal_transferring'):
        body.signal_transferring()

71 

72 

def calculate_num_parts(size, part_size):
    """Return the number of parts needed to transfer ``size`` bytes.

    Uses integer ceiling division instead of ``math.ceil`` on a float
    quotient, so the result is exact even for sizes beyond float precision
    (> 2**53 bytes).

    :param size: Total transfer size in bytes.
    :param part_size: The size of each part in bytes.
    :returns: The part count (0 when ``size`` is 0).
    """
    # -(-a // b) is ceiling division without going through floating point.
    return int(-(-size // part_size))

75 

76 

def calculate_range_parameter(
    part_size, part_index, num_parts, total_size=None
):
    """Calculate the range parameter for multipart downloads/copies

    :type part_size: int
    :param part_size: The size of the part

    :type part_index: int
    :param part_index: The index for which this parts starts. This index starts
        at zero

    :type num_parts: int
    :param num_parts: The total number of parts in the transfer

    :type total_size: int or None
    :param total_size: The total size of the object being transferred. When
        provided, the final part's range is closed at ``total_size - 1``
        instead of being left open-ended.

    :returns: The value to use for Range parameter on downloads or
        the CopySourceRange parameter for copies
    """
    start = part_index * part_size
    if part_index != num_parts - 1:
        # Interior parts always span exactly part_size bytes.
        end = start + part_size - 1
    else:
        # Last part: open-ended unless the total size is known.
        end = '' if total_size is None else str(total_size - 1)
    return f'bytes={start}-{end}'

105 

106 

def get_callbacks(transfer_future, callback_type):
    """Retrieves callbacks from a subscriber

    :type transfer_future: s3transfer.futures.TransferFuture
    :param transfer_future: The transfer future the subscriber is associated
        to.

    :type callback_type: str
    :param callback_type: The type of callback to retrieve from the subscriber.
        Valid types include:
            * 'queued'
            * 'progress'
            * 'done'

    :returns: A list of callbacks for the type specified. All callbacks are
        preinjected with the transfer future.
    """
    method_name = 'on_' + callback_type
    # Bind the future into each matching subscriber hook so callers can
    # invoke the callback without knowing which future it belongs to.
    return [
        functools.partial(
            getattr(subscriber, method_name), future=transfer_future
        )
        for subscriber in transfer_future.meta.call_args.subscribers
        if hasattr(subscriber, method_name)
    ]

134 

135 

def invoke_progress_callbacks(callbacks, bytes_transferred):
    """Calls all progress callbacks

    :param callbacks: A list of progress callbacks to invoke
    :param bytes_transferred: The number of bytes transferred. This is passed
        to the callbacks. If no bytes were transferred the callbacks will not
        be invoked because no progress was achieved. It is also possible
        to receive a negative amount which comes from retrying a transfer
        request.
    """
    # Zero means no progress; skip. Negative amounts (retry rewinds) still
    # have to be reported.
    if not bytes_transferred:
        return
    for callback in callbacks:
        callback(bytes_transferred=bytes_transferred)

150 

151 

def get_filtered_dict(
    original_dict, whitelisted_keys=None, blocklisted_keys=None
):
    """Gets a dictionary filtered by whitelisted and blocklisted keys.

    :param original_dict: The original dictionary of arguments to source keys
        and values.
    :param whitelisted_keys: A list of keys to include in the filtered
        dictionary.
    :param blocklisted_keys: A list of keys to exclude in the filtered
        dictionary.

    :returns: A dictionary containing key/values from the original dictionary
        whose key was included in the whitelist and/or not included in the
        blocklist.
    """
    # A key is kept if it matches the whitelist, or if a blocklist was
    # supplied and the key is not on it. With neither list the result is
    # empty.
    return {
        key: value
        for key, value in original_dict.items()
        if (whitelisted_keys and key in whitelisted_keys)
        or (blocklisted_keys and key not in blocklisted_keys)
    }

175 

176 

class CallArgs:
    """A simple record of keyword call arguments."""

    def __init__(self, **kwargs):
        """A class that records call arguments

        The call arguments must be passed as keyword arguments. It will set
        each keyword argument as an attribute of the object along with its
        associated value.
        """
        # Plain attribute storage: every kwarg becomes an instance attribute.
        self.__dict__.update(kwargs)

187 

188 

class FunctionContainer:
    """An object that contains a function and any args or kwargs to call it

    When called the provided function will be called with provided args
    and kwargs.
    """

    def __init__(self, func, *args, **kwargs):
        # Capture everything needed to invoke the function later.
        self._func = func
        self._args = args
        self._kwargs = kwargs

    def __repr__(self):
        return (
            f'Function: {self._func} with args {self._args} and '
            f'kwargs {self._kwargs}'
        )

    def __call__(self):
        # Invoke the stored function with the captured arguments.
        return self._func(*self._args, **self._kwargs)

206 

207 

class CountCallbackInvoker:
    """An abstraction to invoke a callback when a shared count reaches zero

    :param callback: Callback invoke when finalized count reaches zero
    """

    def __init__(self, callback):
        self._callback = callback
        self._count = 0
        self._is_finalized = False
        # Guards all count/finalized state across threads.
        self._lock = threading.Lock()

    @property
    def current_count(self):
        """The current value of the shared count."""
        with self._lock:
            return self._count

    def increment(self):
        """Increment the count by one"""
        with self._lock:
            if self._is_finalized:
                raise RuntimeError(
                    'Counter has been finalized it can no longer be '
                    'incremented.'
                )
            self._count += 1

    def decrement(self):
        """Decrement the count by one"""
        with self._lock:
            if not self._count:
                raise RuntimeError(
                    'Counter is at zero. It cannot dip below zero'
                )
            self._count -= 1
            # Fire the callback only once the counter has been finalized
            # and fully drained.
            if self._count == 0 and self._is_finalized:
                self._callback()

    def finalize(self):
        """Finalize the counter

        Once finalized, the counter never be incremented and the callback
        can be invoked once the count reaches zero
        """
        with self._lock:
            self._is_finalized = True
            # Already drained: invoke immediately instead of waiting for a
            # decrement that will never come.
            if not self._count:
                self._callback()

256 

257 

class OSUtils:
    """Thin wrapper around filesystem operations used by transfer tasks.

    Centralizing these behind an object makes them easy to stub in tests.
    """

    # Most filesystems cap a filename component at 255 bytes; temp filenames
    # generated by get_temp_filename are truncated to honor that.
    _MAX_FILENAME_LEN = 255

    def get_file_size(self, filename):
        """Return the size of ``filename`` in bytes."""
        return os.path.getsize(filename)

    def open_file_chunk_reader(self, filename, start_byte, size, callbacks):
        """Open ``filename`` and return a ``ReadFileChunk`` over
        ``[start_byte, start_byte + size)`` with callbacks initially disabled.
        """
        return ReadFileChunk.from_filename(
            filename, start_byte, size, callbacks, enable_callbacks=False
        )

    def open_file_chunk_reader_from_fileobj(
        self,
        fileobj,
        chunk_size,
        full_file_size,
        callbacks,
        close_callbacks=None,
    ):
        """Wrap an already-open file object in a ``ReadFileChunk``.

        Callbacks start disabled; ``close_callbacks`` run when the chunk is
        closed.
        """
        return ReadFileChunk(
            fileobj,
            chunk_size,
            full_file_size,
            callbacks=callbacks,
            enable_callbacks=False,
            close_callbacks=close_callbacks,
        )

    def open(self, filename, mode):
        """Open ``filename`` in ``mode``; indirection point for tests."""
        return open(filename, mode)

    def remove_file(self, filename):
        """Remove a file, noop if file does not exist."""
        # Unlike os.remove, if the file does not exist,
        # then this method does nothing.
        try:
            os.remove(filename)
        except OSError:
            pass

    def rename_file(self, current_filename, new_filename):
        """Rename ``current_filename`` to ``new_filename`` via the
        platform-appropriate helper from s3transfer.compat.
        """
        rename_file(current_filename, new_filename)

    def is_special_file(self, filename):
        """Checks to see if a file is a special UNIX file.

        It checks if the file is a character special device, block special
        device, FIFO, or socket.

        :param filename: Name of the file

        :returns: True if the file is a special file. False, if is not.
        """
        # Fix: first parameter was misleadingly named ``cls`` even though
        # this is an ordinary instance method (no @classmethod decorator).
        # If it does not exist, it must be a new file so it cannot be
        # a special file.
        if not os.path.exists(filename):
            return False
        mode = os.stat(filename).st_mode
        # Character special device.
        if stat.S_ISCHR(mode):
            return True
        # Block special device
        if stat.S_ISBLK(mode):
            return True
        # Named pipe / FIFO
        if stat.S_ISFIFO(mode):
            return True
        # Socket.
        if stat.S_ISSOCK(mode):
            return True
        return False

    def get_temp_filename(self, filename):
        """Return a sibling temp filename for ``filename`` with a random
        extension, truncated to stay within filesystem filename limits.
        """
        suffix = os.extsep + random_file_extension()
        path = os.path.dirname(filename)
        name = os.path.basename(filename)
        temp_filename = name[: self._MAX_FILENAME_LEN - len(suffix)] + suffix
        return os.path.join(path, temp_filename)

    def allocate(self, filename, size):
        """Preallocate ``size`` bytes for ``filename``; on failure the
        partially-created file is removed and the error re-raised.
        """
        try:
            with self.open(filename, 'wb') as f:
                fallocate(f, size)
        except OSError:
            self.remove_file(filename)
            raise

344 

345 

class DeferredOpenFile:
    def __init__(self, filename, start_byte=0, mode='rb', open_function=open):
        """A class that defers the opening of a file till needed

        This is useful for deferring opening of a file till it is needed
        in a separate thread, as there is a limit of how many open files
        there can be in a single thread for most operating systems. The
        file gets opened in the following methods: ``read()``, ``seek()``,
        and ``__enter__()``

        :type filename: str
        :param filename: The name of the file to open

        :type start_byte: int
        :param start_byte: The byte to seek to when the file is opened.

        :type mode: str
        :param mode: The mode to use to open the file

        :type open_function: function
        :param open_function: The function to use to open the file
        """
        self._filename = filename
        self._start_byte = start_byte
        self._mode = mode
        self._open_function = open_function
        # Stays None until the first operation that needs the file.
        self._fileobj = None

    def _open_if_needed(self):
        # Lazily open on first use; subsequent calls are no-ops.
        if self._fileobj is not None:
            return
        self._fileobj = self._open_function(self._filename, self._mode)
        if self._start_byte:
            self._fileobj.seek(self._start_byte)

    @property
    def name(self):
        """The filename backing this object."""
        return self._filename

    def read(self, amount=None):
        self._open_if_needed()
        return self._fileobj.read(amount)

    def write(self, data):
        self._open_if_needed()
        self._fileobj.write(data)

    def seek(self, where, whence=0):
        self._open_if_needed()
        self._fileobj.seek(where, whence)

    def tell(self):
        # Before the file is opened, report the position we will seek to.
        if self._fileobj is not None:
            return self._fileobj.tell()
        return self._start_byte

    def close(self):
        if self._fileobj is not None:
            self._fileobj.close()

    def __enter__(self):
        self._open_if_needed()
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

411 

412 

class ReadFileChunk:
    """A bounded, callback-aware file-like view over a window of a file."""

    def __init__(
        self,
        fileobj,
        chunk_size,
        full_file_size,
        callbacks=None,
        enable_callbacks=True,
        close_callbacks=None,
    ):
        """

        Given a file object shown below::

            |___________________________________________________|
            0          |                 |               full_file_size
                       |----chunk_size---|
                    f.tell()

        :type fileobj: file
        :param fileobj: File like object

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            pass the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callbacks: A list of function(amount_read)
        :param callbacks: Called whenever data is read from this object in the
            order provided.

        :type enable_callbacks: boolean
        :param enable_callbacks: True if to run callbacks. Otherwise, do not
            run callbacks

        :type close_callbacks: A list of function()
        :param close_callbacks: Called when close is called. The function
            should take no arguments.
        """
        self._fileobj = fileobj
        # The chunk window starts wherever the file position currently is.
        self._start_byte = self._fileobj.tell()
        self._size = self._calculate_file_size(
            self._fileobj,
            requested_size=chunk_size,
            start_byte=self._start_byte,
            actual_file_size=full_file_size,
        )
        # _amount_read represents the position in the chunk and may exceed
        # the chunk size, but won't allow reads out of bounds.
        self._amount_read = 0
        # Normalize callbacks to a list so read()/seek() can iterate it.
        self._callbacks = callbacks
        if callbacks is None:
            self._callbacks = []
        self._callbacks_enabled = enable_callbacks
        # Bug fix: the original code followed this assignment with a dead
        # branch (`if close_callbacks is None: self._close_callbacks =
        # close_callbacks`) that reassigned the same value. close() treats
        # None as "no close callbacks", so one assignment suffices.
        self._close_callbacks = close_callbacks

    @classmethod
    def from_filename(
        cls,
        filename,
        start_byte,
        chunk_size,
        callbacks=None,
        enable_callbacks=True,
    ):
        """Convenience factory function to create from a filename.

        :type filename: str
        :param filename: The name of the file to open and read from.

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            pass the end of the chunk size will behave like you've
            reached the end of the file.

        :type callbacks: function(amount_read)
        :param callbacks: Called whenever data is read from this object.

        :type enable_callbacks: bool
        :param enable_callbacks: Indicate whether to invoke callback
            during read() calls.

        :rtype: ``ReadFileChunk``
        :return: A new instance of ``ReadFileChunk``

        """
        f = open(filename, 'rb')
        f.seek(start_byte)
        # The full size is taken from the open descriptor so the chunk is
        # clamped correctly at end of file.
        file_size = os.fstat(f.fileno()).st_size
        return cls(f, chunk_size, file_size, callbacks, enable_callbacks)

    def _calculate_file_size(
        self, fileobj, requested_size, start_byte, actual_file_size
    ):
        # Never allow the chunk to extend past the end of the actual file.
        max_chunk_size = actual_file_size - start_byte
        return min(max_chunk_size, requested_size)

    def read(self, amount=None):
        """Read up to ``amount`` bytes, bounded by the end of the chunk.

        :param amount: Max number of bytes to read; None reads the rest of
            the chunk.
        :returns: The bytes read (empty once the chunk is exhausted).
        """
        amount_left = max(self._size - self._amount_read, 0)
        if amount is None:
            amount_to_read = amount_left
        else:
            amount_to_read = min(amount_left, amount)
        data = self._fileobj.read(amount_to_read)
        self._amount_read += len(data)
        if self._callbacks is not None and self._callbacks_enabled:
            invoke_progress_callbacks(self._callbacks, len(data))
        return data

    def signal_transferring(self):
        """Re-enable progress callbacks and forward the signal downstream."""
        self.enable_callback()
        if hasattr(self._fileobj, 'signal_transferring'):
            self._fileobj.signal_transferring()

    def signal_not_transferring(self):
        """Disable progress callbacks and forward the signal downstream."""
        self.disable_callback()
        if hasattr(self._fileobj, 'signal_not_transferring'):
            self._fileobj.signal_not_transferring()

    def enable_callback(self):
        self._callbacks_enabled = True

    def disable_callback(self):
        self._callbacks_enabled = False

    def seek(self, where, whence=0):
        """Seek within the chunk; positions are relative to the chunk start.

        :param where: Offset to seek to.
        :param whence: 0 (chunk start), 1 (current position), 2 (chunk end).
        :raises ValueError: If ``whence`` is not 0, 1, or 2.
        """
        if whence not in (0, 1, 2):
            # Mimic io's error for invalid whence values
            raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")

        # Recalculate where based on chunk attributes so seek from file
        # start (whence=0) is always used
        where += self._start_byte
        if whence == 1:
            where += self._amount_read
        elif whence == 2:
            where += self._size

        self._fileobj.seek(max(where, self._start_byte))
        if self._callbacks is not None and self._callbacks_enabled:
            # To also rewind the callback() for an accurate progress report
            bounded_where = max(min(where - self._start_byte, self._size), 0)
            bounded_amount_read = min(self._amount_read, self._size)
            amount = bounded_where - bounded_amount_read
            invoke_progress_callbacks(
                self._callbacks, bytes_transferred=amount
            )
        self._amount_read = max(where - self._start_byte, 0)

    def close(self):
        """Run any close callbacks (when enabled) and close the file."""
        if self._close_callbacks is not None and self._callbacks_enabled:
            for callback in self._close_callbacks:
                callback()
        self._fileobj.close()

    def tell(self):
        """Return the current position relative to the chunk start."""
        return self._amount_read

    def __len__(self):
        # __len__ is defined because requests will try to determine the length
        # of the stream to set a content length. In the normal case
        # of the file it will just stat the file, but we need to change that
        # behavior. By providing a __len__, requests will use that instead
        # of stat'ing the file.
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def __iter__(self):
        # This is a workaround for http://bugs.python.org/issue17575
        # Basically httplib will try to iterate over the contents, even
        # if its a file like object. This wasn't noticed because we've
        # already exhausted the stream so iterating over the file immediately
        # stops, which is what we're simulating here.
        return iter([])

602 

603 

class StreamReaderProgress:
    """Wrapper for a read only stream that adds progress callbacks."""

    def __init__(self, stream, callbacks=None):
        self._stream = stream
        # Normalize to a list so read() never has to None-check.
        self._callbacks = [] if callbacks is None else callbacks

    def read(self, *args, **kwargs):
        # Forward the read, then report how many bytes actually came back.
        data = self._stream.read(*args, **kwargs)
        invoke_progress_callbacks(self._callbacks, len(data))
        return data

617 

618 

class NoResourcesAvailable(Exception):
    """Raised when a semaphore slot cannot be acquired without blocking."""

    pass

621 

622 

class TaskSemaphore:
    """Caps the number of concurrently running tasks with a semaphore."""

    def __init__(self, count):
        """A semaphore for the purpose of limiting the number of tasks

        :param count: The size of semaphore
        """
        self._semaphore = threading.Semaphore(count)

    def acquire(self, tag, blocking=True):
        """Acquire the semaphore

        :param tag: A tag identifying what is acquiring the semaphore. Note
            that this is not really needed to directly use this class but is
            needed for API compatibility with the SlidingWindowSemaphore
            implementation.
        :param blocking: If True, block until it can be acquired. If False,
            do not block and raise an exception if cannot be acquired.

        :returns: A token (can be None) to use when releasing the semaphore
        """
        logger.debug("Acquiring %s", tag)
        if not self._semaphore.acquire(blocking):
            raise NoResourcesAvailable(f"Cannot acquire tag '{tag}'")

    def release(self, tag, acquire_token):
        """Release the semaphore

        :param tag: A tag identifying what is releasing the semaphore
        :param acquire_token: The token returned from when the semaphore was
            acquired. Note that this is not really needed to directly use this
            class but is needed for API compatibility with the
            SlidingWindowSemaphore implementation.
        """
        # Fix: use lazy %-style log arguments (matching acquire() and
        # SlidingWindowSemaphore.release) instead of an eagerly formatted
        # f-string, so formatting is skipped when debug logging is off.
        logger.debug("Releasing acquire %s/%s", tag, acquire_token)
        self._semaphore.release()

658 

659 

class SlidingWindowSemaphore(TaskSemaphore):
    """A semaphore used to coordinate sequential resource access.

    This class is similar to the stdlib BoundedSemaphore:

    * It's initialized with a count.
    * Each call to ``acquire()`` decrements the counter.
    * If the count is at zero, then ``acquire()`` will either block until the
      count increases, or if ``blocking=False``, then it will raise
      a NoResourcesAvailable exception indicating that it failed to acquire the
      semaphore.

    The main difference is that this semaphore is used to limit
    access to a resource that requires sequential access. For example,
    if I want to access resource R that has 20 subresources R_0 - R_19,
    this semaphore can also enforce that you only have a max range of
    10 at any given point in time. You must also specify a tag name
    when you acquire the semaphore. The sliding window semantics apply
    on a per tag basis. The internal count will only be incremented
    when the minimum sequence number for a tag is released.

    """

    def __init__(self, count):
        # Number of free slots in the window.
        self._count = count
        # Dict[tag, next_sequence_number].
        self._tag_sequences = defaultdict(int)
        # Dict[tag, lowest sequence number not yet released] — the trailing
        # edge of each tag's window.
        self._lowest_sequence = {}
        self._lock = threading.Lock()
        # Waiters block on this condition until a slot frees up.
        self._condition = threading.Condition(self._lock)
        # Dict[tag, List[sequence_number]]
        self._pending_release = {}

    def current_count(self):
        """Return the number of currently available slots."""
        with self._lock:
            return self._count

    def acquire(self, tag, blocking=True):
        """Acquire a slot for ``tag``.

        :param tag: Identifies the sequential resource being accessed.
        :param blocking: If False, raise NoResourcesAvailable instead of
            waiting when no slot is free.
        :returns: The sequence number token to pass back to ``release()``.
        """
        logger.debug("Acquiring %s", tag)
        self._condition.acquire()
        try:
            if self._count == 0:
                if not blocking:
                    raise NoResourcesAvailable(f"Cannot acquire tag '{tag}'")
                else:
                    while self._count == 0:
                        self._condition.wait()
            # self._count is no longer zero.
            # First, check if this is the first time we're seeing this tag.
            sequence_number = self._tag_sequences[tag]
            if sequence_number == 0:
                # First time seeing the tag, so record we're at 0.
                self._lowest_sequence[tag] = sequence_number
            self._tag_sequences[tag] += 1
            self._count -= 1
            return sequence_number
        finally:
            self._condition.release()

    def release(self, tag, acquire_token):
        """Release the slot identified by ``acquire_token`` for ``tag``.

        Only releasing the tag's lowest outstanding sequence number frees a
        slot; out-of-order releases are queued until the window's trailing
        edge catches up.

        :raises ValueError: If the tag or sequence number is unknown.
        """
        sequence_number = acquire_token
        logger.debug("Releasing acquire %s/%s", tag, sequence_number)
        self._condition.acquire()
        try:
            if tag not in self._tag_sequences:
                raise ValueError(f"Attempted to release unknown tag: {tag}")
            max_sequence = self._tag_sequences[tag]
            if self._lowest_sequence[tag] == sequence_number:
                # We can immediately process this request and free up
                # resources.
                self._lowest_sequence[tag] += 1
                self._count += 1
                self._condition.notify()
                # Drain any consecutive pending releases now unblocked by
                # advancing the trailing edge. The pending list is kept
                # sorted in reverse so the smallest sequence is at the end.
                queued = self._pending_release.get(tag, [])
                while queued:
                    if self._lowest_sequence[tag] == queued[-1]:
                        queued.pop()
                        self._lowest_sequence[tag] += 1
                        self._count += 1
                    else:
                        break
            elif self._lowest_sequence[tag] < sequence_number < max_sequence:
                # We can't do anything right now because we're still waiting
                # for the min sequence for the tag to be released. We have
                # to queue this for pending release.
                self._pending_release.setdefault(tag, []).append(
                    sequence_number
                )
                self._pending_release[tag].sort(reverse=True)
            else:
                raise ValueError(
                    "Attempted to release unknown sequence number "
                    f"{sequence_number} for tag: {tag}"
                )
        finally:
            self._condition.release()

756 

757 

class ChunksizeAdjuster:
    """Adjusts a configured chunksize so it satisfies S3 multipart limits."""

    def __init__(
        self,
        max_size=MAX_SINGLE_UPLOAD_SIZE,
        min_size=MIN_UPLOAD_CHUNKSIZE,
        max_parts=MAX_PARTS,
    ):
        self.max_size = max_size
        self.min_size = min_size
        self.max_parts = max_parts

    def adjust_chunksize(self, current_chunksize, file_size=None):
        """Get a chunksize close to current that fits within all S3 limits.

        :type current_chunksize: int
        :param current_chunksize: The currently configured chunksize.

        :type file_size: int or None
        :param file_size: The size of the file to upload. This might be None
            if the object being transferred has an unknown size.

        :returns: A valid chunksize that fits within configured limits.
        """
        chunksize = current_chunksize
        if file_size is not None:
            # Grow the chunksize first so the part count stays legal, then
            # clamp into the allowed size range.
            chunksize = self._adjust_for_max_parts(chunksize, file_size)
        return self._adjust_for_chunksize_limits(chunksize)

    def _adjust_for_chunksize_limits(self, current_chunksize):
        # Clamp the chunksize into [min_size, max_size], logging overrides.
        if current_chunksize > self.max_size:
            logger.debug(
                "Chunksize greater than maximum chunksize. "
                f"Setting to {self.max_size} from {current_chunksize}."
            )
            return self.max_size
        if current_chunksize < self.min_size:
            logger.debug(
                "Chunksize less than minimum chunksize. "
                f"Setting to {self.min_size} from {current_chunksize}."
            )
            return self.min_size
        return current_chunksize

    def _adjust_for_max_parts(self, current_chunksize, file_size):
        # Keep doubling the chunksize until the resulting number of parts
        # fits under the max_parts limit.
        chunksize = current_chunksize
        while int(math.ceil(file_size / float(chunksize))) > self.max_parts:
            chunksize *= 2
        if chunksize != current_chunksize:
            logger.debug(
                "Chunksize would result in the number of parts exceeding the "
                f"maximum. Setting to {chunksize} from {current_chunksize}."
            )
        return chunksize

817 

818 

def add_s3express_defaults(bucket, extra_args):
    """
    This function has been deprecated, but is kept for backwards compatibility.
    This function is subject to removal in a future release.
    """
    needs_default = (
        is_s3express_bucket(bucket)
        and "ChecksumAlgorithm" not in extra_args
    )
    if needs_default:
        # Default Transfer Operations to S3Express to use CRC32
        extra_args["ChecksumAlgorithm"] = "crc32"

827 

828 

def set_default_checksum_algorithm(extra_args):
    """Set the default algorithm to CRC32 if not specified by the user."""
    # A user-supplied full-object checksum takes precedence; leave it alone.
    for checksum_arg in FULL_OBJECT_CHECKSUM_ARGS:
        if checksum_arg in extra_args:
            return
    extra_args.setdefault("ChecksumAlgorithm", DEFAULT_CHECKSUM_ALGORITHM)