Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/utils.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

355 statements  

1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import functools 

14import logging 

15import math 

16import os 

17import random 

18import socket 

19import stat 

20import string 

21import threading 

22from collections import defaultdict 

23 

24from botocore.exceptions import ( 

25 IncompleteReadError, 

26 ReadTimeoutError, 

27 ResponseStreamingError, 

28) 

29from botocore.httpchecksum import AwsChunkedWrapper 

30from botocore.utils import is_s3express_bucket 

31 

32from s3transfer.compat import SOCKET_ERROR, fallocate, rename_file 

33 

# S3 allows at most 10,000 parts in a single multipart upload.
MAX_PARTS = 10000
# The maximum file size you can upload via S3 per request.
# See: http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
# and: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
MAX_SINGLE_UPLOAD_SIZE = 5 * (1024**3)  # 5 GiB
MIN_UPLOAD_CHUNKSIZE = 5 * (1024**2)  # 5 MiB
logger = logging.getLogger(__name__)


# Exceptions that indicate a transient streaming failure; downloads that
# hit one of these may safely be retried.
S3_RETRYABLE_DOWNLOAD_ERRORS = (
    socket.timeout,
    SOCKET_ERROR,
    ReadTimeoutError,
    IncompleteReadError,
    ResponseStreamingError,
)

50 

51 

def random_file_extension(num_digits=8):
    """Return a random string of ``num_digits`` hexadecimal characters."""
    digits = [random.choice(string.hexdigits) for _ in range(num_digits)]
    return ''.join(digits)

54 

55 

def signal_not_transferring(request, operation_name, **kwargs):
    """Tell an upload request body to pause progress reporting.

    Only applies to upload operations whose body supports the
    ``signal_not_transferring`` protocol.
    """
    if operation_name not in ['PutObject', 'UploadPart']:
        return
    body = request.body
    if hasattr(body, 'signal_not_transferring'):
        body.signal_not_transferring()

61 

62 

def signal_transferring(request, operation_name, **kwargs):
    """Tell an upload request body to resume progress reporting.

    Only applies to upload operations whose body supports the
    ``signal_transferring`` protocol.
    """
    if operation_name not in ['PutObject', 'UploadPart']:
        return
    body = request.body
    if isinstance(body, AwsChunkedWrapper):
        # The chunked wrapper hides the original stream; signal the
        # underlying raw stream instead.
        body = getattr(body, '_raw', None)
    if hasattr(body, 'signal_transferring'):
        body.signal_transferring()

70 

71 

def calculate_num_parts(size, part_size):
    """Return the number of parts needed to transfer ``size`` bytes.

    :type size: int
    :param size: The total size of the transfer in bytes.

    :type part_size: int
    :param part_size: The size of each individual part in bytes.

    :rtype: int
    :returns: The ceiling of ``size / part_size``.
    """
    # math.ceil already returns an int in Python 3; the historical
    # int(...(.../float(...))) coercions were a Python 2 idiom.
    return math.ceil(size / part_size)

74 

75 

def calculate_range_parameter(
    part_size, part_index, num_parts, total_size=None
):
    """Calculate the range parameter for multipart downloads/copies

    :type part_size: int
    :param part_size: The size of the part

    :type part_index: int
    :param part_index: The index for which this parts starts. This index starts
        at zero

    :type num_parts: int
    :param num_parts: The total number of parts in the transfer

    :type total_size: int
    :param total_size: The total size of the object being transferred. Only
        consulted for the final part: when provided, the range is closed at
        ``total_size - 1``; otherwise the final range is left open-ended.

    :returns: The value to use for Range parameter on downloads or
        the CopySourceRange parameter for copies
    """
    start_range = part_index * part_size
    if part_index == num_parts - 1:
        # Last part: leave the end open unless the exact size is known.
        end_range = '' if total_size is None else str(total_size - 1)
    else:
        end_range = start_range + part_size - 1
    return f'bytes={start_range}-{end_range}'

104 

105 

def get_callbacks(transfer_future, callback_type):
    """Retrieves callbacks from a subscriber

    :type transfer_future: s3transfer.futures.TransferFuture
    :param transfer_future: The transfer future the subscriber is associated
        to.

    :type callback_type: str
    :param callback_type: The type of callback to retrieve from the subscriber.
        Valid types include:
            * 'queued'
            * 'progress'
            * 'done'

    :returns: A list of callbacks for the type specified. All callbacks are
        preinjected with the transfer future.
    """
    # The attribute name is the same for every subscriber, so compute it once.
    callback_name = 'on_' + callback_type
    matched = []
    for subscriber in transfer_future.meta.call_args.subscribers:
        if hasattr(subscriber, callback_name):
            # Bind the future now so callers can invoke the callback with
            # just the progress arguments.
            bound = functools.partial(
                getattr(subscriber, callback_name), future=transfer_future
            )
            matched.append(bound)
    return matched

133 

134 

def invoke_progress_callbacks(callbacks, bytes_transferred):
    """Calls all progress callbacks

    :param callbacks: A list of progress callbacks to invoke
    :param bytes_transferred: The number of bytes transferred. This is passed
        to the callbacks. If no bytes were transferred the callbacks will not
        be invoked because no progress was achieved. It is also possible
        to receive a negative amount which comes from retrying a transfer
        request.
    """
    # Zero bytes means no progress (positive or negative) to report.
    if not bytes_transferred:
        return
    for callback in callbacks:
        callback(bytes_transferred=bytes_transferred)

149 

150 

def get_filtered_dict(original_dict, whitelisted_keys):
    """Gets a dictionary filtered by whitelisted keys

    :param original_dict: The original dictionary of arguments to source keys
        and values.
    :param whitelisted_keys: A list of keys to include in the filtered
        dictionary.

    :returns: A dictionary containing key/values from the original dictionary
        whose key was included in the whitelist
    """
    # A comprehension replaces the manual accumulate-into-dict loop.
    return {
        key: value
        for key, value in original_dict.items()
        if key in whitelisted_keys
    }

167 

168 

class CallArgs:
    def __init__(self, **kwargs):
        """A class that records call arguments

        The call arguments must be passed as keyword arguments. It will set
        each keyword argument as an attribute of the object along with its
        associated value.
        """
        # Bulk-assign every keyword argument as an instance attribute.
        self.__dict__.update(kwargs)

179 

180 

class FunctionContainer:
    """An object that contains a function and any args or kwargs to call it

    When called the provided function will be called with provided args
    and kwargs.
    """

    def __init__(self, func, *args, **kwargs):
        # Stash everything needed to invoke the callable later.
        self._func = func
        self._args = args
        self._kwargs = kwargs

    def __repr__(self):
        return (
            f'Function: {self._func} with args {self._args} and '
            f'kwargs {self._kwargs}'
        )

    def __call__(self):
        return self._func(*self._args, **self._kwargs)

200 

201 

class CountCallbackInvoker:
    """An abstraction to invoke a callback when a shared count reaches zero

    :param callback: Callback invoke when finalized count reaches zero
    """

    def __init__(self, callback):
        self._lock = threading.Lock()
        self._callback = callback
        self._count = 0
        self._is_finalized = False

    def _maybe_invoke_callback(self):
        # Caller must already hold self._lock. The callback fires only once
        # the counter has been finalized AND drained to zero.
        if self._is_finalized and self._count == 0:
            self._callback()

    @property
    def current_count(self):
        """The current value of the shared count."""
        with self._lock:
            return self._count

    def increment(self):
        """Increment the count by one"""
        with self._lock:
            if self._is_finalized:
                raise RuntimeError(
                    'Counter has been finalized it can no longer be '
                    'incremented.'
                )
            self._count += 1

    def decrement(self):
        """Decrement the count by one"""
        with self._lock:
            if not self._count:
                raise RuntimeError(
                    'Counter is at zero. It cannot dip below zero'
                )
            self._count -= 1
            self._maybe_invoke_callback()

    def finalize(self):
        """Finalize the counter

        Once finalized, the counter never be incremented and the callback
        can be invoked once the count reaches zero
        """
        with self._lock:
            self._is_finalized = True
            self._maybe_invoke_callback()

250 

251 

class OSUtils:
    """A thin wrapper over filesystem operations to ease testing/mocking."""

    # Most filesystems limit a single path component to 255 characters.
    _MAX_FILENAME_LEN = 255

    def get_file_size(self, filename):
        """Return the size of ``filename`` in bytes."""
        return os.path.getsize(filename)

    def open_file_chunk_reader(self, filename, start_byte, size, callbacks):
        """Open ``filename`` and return a ``ReadFileChunk`` over one part."""
        return ReadFileChunk.from_filename(
            filename, start_byte, size, callbacks, enable_callbacks=False
        )

    def open_file_chunk_reader_from_fileobj(
        self,
        fileobj,
        chunk_size,
        full_file_size,
        callbacks,
        close_callbacks=None,
    ):
        """Wrap an already-open file object in a ``ReadFileChunk``."""
        return ReadFileChunk(
            fileobj,
            chunk_size,
            full_file_size,
            callbacks=callbacks,
            enable_callbacks=False,
            close_callbacks=close_callbacks,
        )

    def open(self, filename, mode):
        """Open ``filename`` with ``mode`` (overridable for testing)."""
        return open(filename, mode)

    def remove_file(self, filename):
        """Remove a file, noop if file does not exist."""
        # Unlike os.remove, if the file does not exist,
        # then this method does nothing.
        try:
            os.remove(filename)
        except OSError:
            pass

    def rename_file(self, current_filename, new_filename):
        """Rename a file, replacing the destination if it already exists."""
        rename_file(current_filename, new_filename)

    def is_special_file(self, filename):
        """Checks to see if a file is a special UNIX file.

        It checks if the file is a character special device, block special
        device, FIFO, or socket.

        :param filename: Name of the file

        :returns: True if the file is a special file. False, if is not.
        """
        # NOTE: the first parameter was historically (and misleadingly)
        # named ``cls`` even though this is a plain instance method.
        # If it does not exist, it must be a new file so it cannot be
        # a special file.
        if not os.path.exists(filename):
            return False
        mode = os.stat(filename).st_mode
        # Character device, block device, named pipe (FIFO), or socket.
        return any(
            check(mode)
            for check in (
                stat.S_ISCHR,
                stat.S_ISBLK,
                stat.S_ISFIFO,
                stat.S_ISSOCK,
            )
        )

    def get_temp_filename(self, filename):
        """Return a temporary filename located alongside ``filename``.

        The basename is truncated so that, with the random suffix added,
        the result stays within the filesystem's filename length limit.
        """
        suffix = os.extsep + random_file_extension()
        path = os.path.dirname(filename)
        name = os.path.basename(filename)
        temp_filename = name[: self._MAX_FILENAME_LEN - len(suffix)] + suffix
        return os.path.join(path, temp_filename)

    def allocate(self, filename, size):
        """Preallocate ``size`` bytes for ``filename``.

        On failure the partially-created file is removed before the
        original OSError is re-raised.
        """
        try:
            with self.open(filename, 'wb') as f:
                fallocate(f, size)
        except OSError:
            self.remove_file(filename)
            raise

338 

339 

class DeferredOpenFile:
    def __init__(self, filename, start_byte=0, mode='rb', open_function=open):
        """A class that defers the opening of a file till needed

        This is useful for deferring opening of a file till it is needed
        in a separate thread, as there is a limit of how many open files
        there can be in a single thread for most operating systems. The
        file gets opened in the following methods: ``read()``, ``seek()``,
        and ``__enter__()``

        :type filename: str
        :param filename: The name of the file to open

        :type start_byte: int
        :param start_byte: The byte to seek to when the file is opened.

        :type mode: str
        :param mode: The mode to use to open the file

        :type open_function: function
        :param open_function: The function to use to open the file
        """
        self._filename = filename
        self._fileobj = None
        self._start_byte = start_byte
        self._mode = mode
        self._open_function = open_function

    def _open_if_needed(self):
        # Lazily open on first use, honoring the requested start offset.
        if self._fileobj is not None:
            return
        self._fileobj = self._open_function(self._filename, self._mode)
        if self._start_byte:
            self._fileobj.seek(self._start_byte)

    @property
    def name(self):
        """The filename this object will (or did) open."""
        return self._filename

    def read(self, amount=None):
        self._open_if_needed()
        return self._fileobj.read(amount)

    def write(self, data):
        self._open_if_needed()
        self._fileobj.write(data)

    def seek(self, where, whence=0):
        self._open_if_needed()
        self._fileobj.seek(where, whence)

    def tell(self):
        # Before the file is opened, report the position it will start at.
        if self._fileobj is None:
            return self._start_byte
        return self._fileobj.tell()

    def close(self):
        if self._fileobj:
            self._fileobj.close()

    def __enter__(self):
        self._open_if_needed()
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

405 

406 

class ReadFileChunk:
    def __init__(
        self,
        fileobj,
        chunk_size,
        full_file_size,
        callbacks=None,
        enable_callbacks=True,
        close_callbacks=None,
    ):
        """

        Given a file object shown below::

            |___________________________________________________|
            0          |                 |               full_file_size
                       |----chunk_size---|
                    f.tell()

        :type fileobj: file
        :param fileobj: File like object

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            pass the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callbacks: A list of function(amount_read)
        :param callbacks: Called whenever data is read from this object in the
            order provided.

        :type enable_callbacks: boolean
        :param enable_callbacks: True if to run callbacks. Otherwise, do not
            run callbacks

        :type close_callbacks: A list of function()
        :param close_callbacks: Called when close is called. The function
            should take no arguments.
        """
        self._fileobj = fileobj
        # The chunk starts wherever the file object currently points.
        self._start_byte = self._fileobj.tell()
        self._size = self._calculate_file_size(
            self._fileobj,
            requested_size=chunk_size,
            start_byte=self._start_byte,
            actual_file_size=full_file_size,
        )
        # _amount_read represents the position in the chunk and may exceed
        # the chunk size, but won't allow reads out of bounds.
        self._amount_read = 0
        self._callbacks = callbacks
        if callbacks is None:
            self._callbacks = []
        self._callbacks_enabled = enable_callbacks
        self._close_callbacks = close_callbacks
        if close_callbacks is None:
            # Bug fix: this previously re-assigned ``close_callbacks``
            # (i.e. None) to itself; normalize to an empty list to mirror
            # the ``callbacks`` handling above.
            self._close_callbacks = []

    @classmethod
    def from_filename(
        cls,
        filename,
        start_byte,
        chunk_size,
        callbacks=None,
        enable_callbacks=True,
    ):
        """Convenience factory function to create from a filename.

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            pass the end of the chunk size will behave like you've
            reached the end of the file.

        :type callbacks: function(amount_read)
        :param callbacks: Called whenever data is read from this object.

        :type enable_callbacks: bool
        :param enable_callbacks: Indicate whether to invoke callback
            during read() calls.

        :rtype: ``ReadFileChunk``
        :return: A new instance of ``ReadFileChunk``

        """
        f = open(filename, 'rb')
        f.seek(start_byte)
        file_size = os.fstat(f.fileno()).st_size
        return cls(f, chunk_size, file_size, callbacks, enable_callbacks)

    def _calculate_file_size(
        self, fileobj, requested_size, start_byte, actual_file_size
    ):
        # The chunk can never extend past the end of the underlying file.
        max_chunk_size = actual_file_size - start_byte
        return min(max_chunk_size, requested_size)

    def read(self, amount=None):
        """Read up to ``amount`` bytes, bounded by the chunk's end."""
        amount_left = max(self._size - self._amount_read, 0)
        if amount is None:
            amount_to_read = amount_left
        else:
            amount_to_read = min(amount_left, amount)
        data = self._fileobj.read(amount_to_read)
        self._amount_read += len(data)
        if self._callbacks is not None and self._callbacks_enabled:
            invoke_progress_callbacks(self._callbacks, len(data))
        return data

    def signal_transferring(self):
        """Re-enable progress callbacks, propagating to the wrapped file."""
        self.enable_callback()
        if hasattr(self._fileobj, 'signal_transferring'):
            self._fileobj.signal_transferring()

    def signal_not_transferring(self):
        """Disable progress callbacks, propagating to the wrapped file."""
        self.disable_callback()
        if hasattr(self._fileobj, 'signal_not_transferring'):
            self._fileobj.signal_not_transferring()

    def enable_callback(self):
        self._callbacks_enabled = True

    def disable_callback(self):
        self._callbacks_enabled = False

    def seek(self, where, whence=0):
        """Seek within the chunk; positions are relative to the chunk."""
        if whence not in (0, 1, 2):
            # Mimic io's error for invalid whence values
            raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")

        # Recalculate where based on chunk attributes so seek from file
        # start (whence=0) is always used
        where += self._start_byte
        if whence == 1:
            where += self._amount_read
        elif whence == 2:
            where += self._size

        self._fileobj.seek(max(where, self._start_byte))
        if self._callbacks is not None and self._callbacks_enabled:
            # To also rewind the callback() for an accurate progress report
            bounded_where = max(min(where - self._start_byte, self._size), 0)
            bounded_amount_read = min(self._amount_read, self._size)
            amount = bounded_where - bounded_amount_read
            invoke_progress_callbacks(
                self._callbacks, bytes_transferred=amount
            )
        self._amount_read = max(where - self._start_byte, 0)

    def close(self):
        """Close the underlying file, running any close callbacks first."""
        if self._close_callbacks is not None and self._callbacks_enabled:
            for callback in self._close_callbacks:
                callback()
        self._fileobj.close()

    def tell(self):
        """Return the position within the chunk (not the whole file)."""
        return self._amount_read

    def __len__(self):
        # __len__ is defined because requests will try to determine the length
        # of the stream to set a content length. In the normal case
        # of the file it will just stat the file, but we need to change that
        # behavior. By providing a __len__, requests will use that instead
        # of stat'ing the file.
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def __iter__(self):
        # This is a workaround for http://bugs.python.org/issue17575
        # Basically httplib will try to iterate over the contents, even
        # if its a file like object. This wasn't noticed because we've
        # already exhausted the stream so iterating over the file immediately
        # stops, which is what we're simulating here.
        return iter([])

596 

597 

class StreamReaderProgress:
    """Wrapper for a read only stream that adds progress callbacks."""

    def __init__(self, stream, callbacks=None):
        self._stream = stream
        # Default to no callbacks rather than None so read() can always
        # hand the list straight to invoke_progress_callbacks().
        self._callbacks = [] if callbacks is None else callbacks

    def read(self, *args, **kwargs):
        # Delegate the read, then report how many bytes came back.
        chunk = self._stream.read(*args, **kwargs)
        invoke_progress_callbacks(self._callbacks, len(chunk))
        return chunk

611 

612 

class NoResourcesAvailable(Exception):
    # Raised when a semaphore cannot be acquired in non-blocking mode.
    pass

615 

616 

class TaskSemaphore:
    def __init__(self, count):
        """A semaphore for the purpose of limiting the number of tasks

        :param count: The size of semaphore
        """
        self._semaphore = threading.Semaphore(count)

    def acquire(self, tag, blocking=True):
        """Acquire the semaphore

        :param tag: A tag identifying what is acquiring the semaphore. Note
            that this is not really needed to directly use this class but is
            needed for API compatibility with the SlidingWindowSemaphore
            implementation.
        :param blocking: If True, block until it can be acquired. If False,
            do not block and raise an exception if cannot be acquired.

        :returns: A token (can be None) to use when releasing the semaphore
        """
        logger.debug("Acquiring %s", tag)
        acquired = self._semaphore.acquire(blocking)
        if not acquired:
            raise NoResourcesAvailable("Cannot acquire tag '%s'" % tag)

    def release(self, tag, acquire_token):
        """Release the semaphore

        :param tag: A tag identifying what is releasing the semaphore
        :param acquire_token: The token returned from when the semaphore was
            acquired. Note that this is not really needed to directly use this
            class but is needed for API compatibility with the
            SlidingWindowSemaphore implementation.
        """
        logger.debug(f"Releasing acquire {tag}/{acquire_token}")
        self._semaphore.release()

652 

653 

class SlidingWindowSemaphore(TaskSemaphore):
    """A semaphore used to coordinate sequential resource access.

    This class is similar to the stdlib BoundedSemaphore:

    * It's initialized with a count.
    * Each call to ``acquire()`` decrements the counter.
    * If the count is at zero, then ``acquire()`` will either block until the
      count increases, or if ``blocking=False``, then it will raise
      a NoResourcesAvailable exception indicating that it failed to acquire the
      semaphore.

    The main difference is that this semaphore is used to limit
    access to a resource that requires sequential access. For example,
    if I want to access resource R that has 20 subresources R_0 - R_19,
    this semaphore can also enforce that you only have a max range of
    10 at any given point in time. You must also specify a tag name
    when you acquire the semaphore. The sliding window semantics apply
    on a per tag basis. The internal count will only be incremented
    when the minimum sequence number for a tag is released.

    """

    def __init__(self, count):
        self._count = count
        # Dict[tag, next_sequence_number].
        self._tag_sequences = defaultdict(int)
        # Dict[tag, lowest sequence number not yet released].
        self._lowest_sequence = {}
        self._lock = threading.Lock()
        self._condition = threading.Condition(self._lock)
        # Dict[tag, List[sequence_number]]
        # Out-of-order releases that cannot free a slot until the tag's
        # lowest outstanding sequence number is released.
        self._pending_release = {}

    def current_count(self):
        # The current number of available slots.
        with self._lock:
            return self._count

    def acquire(self, tag, blocking=True):
        # Returns the sequence number for ``tag``; pass it back to
        # release() as the acquire token.
        logger.debug("Acquiring %s", tag)
        self._condition.acquire()
        try:
            if self._count == 0:
                if not blocking:
                    raise NoResourcesAvailable("Cannot acquire tag '%s'" % tag)
                else:
                    while self._count == 0:
                        self._condition.wait()
            # self._count is no longer zero.
            # First, check if this is the first time we're seeing this tag.
            sequence_number = self._tag_sequences[tag]
            if sequence_number == 0:
                # First time seeing the tag, so record we're at 0.
                self._lowest_sequence[tag] = sequence_number
            self._tag_sequences[tag] += 1
            self._count -= 1
            return sequence_number
        finally:
            self._condition.release()

    def release(self, tag, acquire_token):
        # ``acquire_token`` is the sequence number returned by acquire().
        # A slot is only freed when the lowest outstanding sequence number
        # for the tag is released; later sequence numbers are queued.
        sequence_number = acquire_token
        logger.debug("Releasing acquire %s/%s", tag, sequence_number)
        self._condition.acquire()
        try:
            if tag not in self._tag_sequences:
                raise ValueError("Attempted to release unknown tag: %s" % tag)
            max_sequence = self._tag_sequences[tag]
            if self._lowest_sequence[tag] == sequence_number:
                # We can immediately process this request and free up
                # resources.
                self._lowest_sequence[tag] += 1
                self._count += 1
                self._condition.notify()
                # Drain any queued releases that are now contiguous with
                # the new lowest sequence number. The queue is kept sorted
                # in reverse so the smallest pending number is at the end.
                queued = self._pending_release.get(tag, [])
                while queued:
                    if self._lowest_sequence[tag] == queued[-1]:
                        queued.pop()
                        self._lowest_sequence[tag] += 1
                        self._count += 1
                    else:
                        break
            elif self._lowest_sequence[tag] < sequence_number < max_sequence:
                # We can't do anything right now because we're still waiting
                # for the min sequence for the tag to be released. We have
                # to queue this for pending release.
                self._pending_release.setdefault(tag, []).append(
                    sequence_number
                )
                self._pending_release[tag].sort(reverse=True)
            else:
                raise ValueError(
                    "Attempted to release unknown sequence number "
                    "%s for tag: %s" % (sequence_number, tag)
                )
        finally:
            self._condition.release()

750 

751 

class ChunksizeAdjuster:
    """Adjusts a configured chunksize so it fits within S3's limits."""

    def __init__(
        self,
        max_size=MAX_SINGLE_UPLOAD_SIZE,
        min_size=MIN_UPLOAD_CHUNKSIZE,
        max_parts=MAX_PARTS,
    ):
        self.max_size = max_size
        self.min_size = min_size
        self.max_parts = max_parts

    def adjust_chunksize(self, current_chunksize, file_size=None):
        """Get a chunksize close to current that fits within all S3 limits.

        :type current_chunksize: int
        :param current_chunksize: The currently configured chunksize.

        :type file_size: int or None
        :param file_size: The size of the file to upload. This might be None
            if the object being transferred has an unknown size.

        :returns: A valid chunksize that fits within configured limits.
        """
        chunksize = current_chunksize
        if file_size is not None:
            chunksize = self._adjust_for_max_parts(chunksize, file_size)
        return self._adjust_for_chunksize_limits(chunksize)

    def _adjust_for_chunksize_limits(self, current_chunksize):
        # Clamp the chunksize into [min_size, max_size].
        # Use lazy logger args so the strings are only formatted when
        # DEBUG logging is actually enabled.
        if current_chunksize > self.max_size:
            logger.debug(
                "Chunksize greater than maximum chunksize. "
                "Setting to %s from %s.",
                self.max_size,
                current_chunksize,
            )
            return self.max_size
        elif current_chunksize < self.min_size:
            logger.debug(
                "Chunksize less than minimum chunksize. "
                "Setting to %s from %s.",
                self.min_size,
                current_chunksize,
            )
            return self.min_size
        else:
            return current_chunksize

    def _adjust_for_max_parts(self, current_chunksize, file_size):
        # Double the chunksize until the resulting part count fits under
        # the service's maximum number of parts.
        chunksize = current_chunksize
        num_parts = math.ceil(file_size / chunksize)

        while num_parts > self.max_parts:
            chunksize *= 2
            num_parts = math.ceil(file_size / chunksize)

        if chunksize != current_chunksize:
            logger.debug(
                "Chunksize would result in the number of parts exceeding the "
                "maximum. Setting to %s from %s.",
                chunksize,
                current_chunksize,
            )

        return chunksize

812 

813 

def add_s3express_defaults(bucket, extra_args):
    """Apply default transfer arguments for S3 Express One Zone buckets.

    Mutates ``extra_args`` in place, defaulting the checksum algorithm to
    CRC32 when targeting an S3 Express bucket and no algorithm was given.
    """
    if is_s3express_bucket(bucket) and "ChecksumAlgorithm" not in extra_args:
        # Default Transfer Operations to S3Express to use CRC32
        extra_args["ChecksumAlgorithm"] = "crc32"