Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/utils.py: 28%

355 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import functools 

14import logging 

15import math 

16import os 

17import random 

18import socket 

19import stat 

20import string 

21import threading 

22from collections import defaultdict 

23 

24from botocore.exceptions import IncompleteReadError, ReadTimeoutError 

25from botocore.httpchecksum import AwsChunkedWrapper 

26from botocore.utils import is_s3express_bucket 

27 

28from s3transfer.compat import SOCKET_ERROR, fallocate, rename_file 

29 

# Hard S3 limit: a multipart upload may have at most this many parts.
MAX_PARTS = 10000
# The maximum file size you can upload via S3 per request.
# See: http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
# and: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
MAX_SINGLE_UPLOAD_SIZE = 5 * (1024**3)  # 5 GiB
MIN_UPLOAD_CHUNKSIZE = 5 * (1024**2)  # 5 MiB
logger = logging.getLogger(__name__)


# Exceptions considered transient for downloads; consumers catch this tuple
# to decide whether a failed read should be retried.
S3_RETRYABLE_DOWNLOAD_ERRORS = (
    socket.timeout,
    SOCKET_ERROR,
    ReadTimeoutError,
    IncompleteReadError,
)

45 

46 

def random_file_extension(num_digits=8):
    """Build a random suffix of ``num_digits`` hexadecimal characters."""
    chosen = [random.choice(string.hexdigits) for _ in range(num_digits)]
    return ''.join(chosen)

49 

50 

def signal_not_transferring(request, operation_name, **kwargs):
    """Pause transfer signaling on an upload request body that supports it."""
    if operation_name not in ['PutObject', 'UploadPart']:
        return
    body = request.body
    if hasattr(body, 'signal_not_transferring'):
        body.signal_not_transferring()

56 

57 

def signal_transferring(request, operation_name, **kwargs):
    """Resume transfer signaling on an upload request body that supports it."""
    if operation_name not in ['PutObject', 'UploadPart']:
        return
    body = request.body
    if isinstance(body, AwsChunkedWrapper):
        # Signal the wrapped raw stream, not the checksum wrapper itself.
        body = getattr(body, '_raw', None)
    if hasattr(body, 'signal_transferring'):
        body.signal_transferring()

65 

66 

def calculate_num_parts(size, part_size):
    """Return how many parts of ``part_size`` are needed to cover ``size``."""
    return int(math.ceil(size / part_size))

69 

70 

def calculate_range_parameter(
    part_size, part_index, num_parts, total_size=None
):
    """Build the HTTP range value for one part of a multipart transfer.

    :type part_size: int
    :param part_size: The size of the part

    :type part_index: int
    :param part_index: The index for which this parts starts. This index starts
        at zero

    :type num_parts: int
    :param num_parts: The total number of parts in the transfer

    :returns: The value to use for Range parameter on downloads or
        the CopySourceRange parameter for copies
    """
    start = part_index * part_size
    last_part = part_index == num_parts - 1
    if not last_part:
        end = start + part_size - 1
    elif total_size is not None:
        # Last part with a known total: stop at the final byte.
        end = total_size - 1
    else:
        # Last part with unknown total: open-ended range.
        end = ''
    return f'bytes={start}-{end}'

99 

100 

def get_callbacks(transfer_future, callback_type):
    """Collect callbacks of one type from a future's subscribers.

    :type transfer_future: s3transfer.futures.TransferFuture
    :param transfer_future: The transfer future the subscriber is associated
        to.

    :type callback_type: str
    :param callback_type: The type of callback to retrieve from the subscriber.
        Valid types include:
            * 'queued'
            * 'progress'
            * 'done'

    :returns: A list of callbacks for the type specified. All callbacks are
        preinjected with the transfer future.
    """
    # The method name is the same for every subscriber; build it once.
    callback_name = 'on_' + callback_type
    callbacks = []
    for subscriber in transfer_future.meta.call_args.subscribers:
        if hasattr(subscriber, callback_name):
            handler = getattr(subscriber, callback_name)
            callbacks.append(
                functools.partial(handler, future=transfer_future)
            )
    return callbacks

128 

129 

def invoke_progress_callbacks(callbacks, bytes_transferred):
    """Report ``bytes_transferred`` to every progress callback.

    :param callbacks: A list of progress callbacks to invoke
    :param bytes_transferred: The number of bytes transferred. This is passed
        to the callbacks. If no bytes were transferred the callbacks will not
        be invoked because no progress was achieved. It is also possible
        to receive a negative amount which comes from retrying a transfer
        request.
    """
    # Zero progress means there is nothing to report.
    if not bytes_transferred:
        return
    for callback in callbacks:
        callback(bytes_transferred=bytes_transferred)

144 

145 

def get_filtered_dict(original_dict, whitelisted_keys):
    """Gets a dictionary filtered by whitelisted keys

    :param original_dict: The original dictionary of arguments to source keys
        and values.
    :param whitelisted_keys: A list of keys to include in the filtered
        dictionary.

    :returns: A dictionary containing key/values from the original dictionary
        whose key was included in the whitelist
    """
    # Dict comprehension replaces the manual build loop; docstring param
    # name fixed from the old typo ``whitelisted_key``.
    return {
        key: value
        for key, value in original_dict.items()
        if key in whitelisted_keys
    }

162 

163 

class CallArgs:
    def __init__(self, **kwargs):
        """Record keyword arguments as attributes of this instance.

        Every keyword passed in becomes an attribute of the object whose
        value is the argument's value; positional arguments are not
        accepted.
        """
        for attribute_name in kwargs:
            setattr(self, attribute_name, kwargs[attribute_name])

174 

175 

class FunctionContainer:
    """Bundle a callable together with arguments for deferred invocation.

    Calling the instance invokes the stored function with the stored
    positional and keyword arguments.
    """

    def __init__(self, func, *args, **kwargs):
        self._func = func
        self._args = args
        self._kwargs = kwargs

    def __repr__(self):
        description = 'Function: {} with args {} and kwargs {}'
        return description.format(self._func, self._args, self._kwargs)

    def __call__(self):
        func, args, kwargs = self._func, self._args, self._kwargs
        return func(*args, **kwargs)

195 

196 

class CountCallbackInvoker:
    """Fire a callback once a shared count reaches zero after finalization.

    :param callback: Callback invoke when finalized count reaches zero
    """

    def __init__(self, callback):
        self._lock = threading.Lock()
        self._callback = callback
        self._count = 0
        self._is_finalized = False

    @property
    def current_count(self):
        # Read under the lock for a consistent snapshot.
        with self._lock:
            return self._count

    def increment(self):
        """Increment the count by one"""
        with self._lock:
            if self._is_finalized:
                raise RuntimeError(
                    'Counter has been finalized it can no longer be '
                    'incremented.'
                )
            self._count += 1

    def decrement(self):
        """Decrement the count by one"""
        with self._lock:
            if not self._count:
                raise RuntimeError(
                    'Counter is at zero. It cannot dip below zero'
                )
            self._count -= 1
            # Only a finalized counter that just hit zero fires the callback.
            if self._is_finalized and not self._count:
                self._callback()

    def finalize(self):
        """Finalize the counter

        Once finalized, the counter never be incremented and the callback
        can be invoked once the count reaches zero
        """
        with self._lock:
            self._is_finalized = True
            if not self._count:
                self._callback()

245 

246 

class OSUtils:
    """Thin wrapper over filesystem operations used by the transfer logic.

    Centralizing these calls makes them easy to stub out in tests.
    """

    # Conservative cap for a single path component on common filesystems.
    _MAX_FILENAME_LEN = 255

    def get_file_size(self, filename):
        """Return the size of ``filename`` in bytes."""
        return os.path.getsize(filename)

    def open_file_chunk_reader(self, filename, start_byte, size, callbacks):
        """Open ``filename`` and return a ``ReadFileChunk`` over the range."""
        return ReadFileChunk.from_filename(
            filename, start_byte, size, callbacks, enable_callbacks=False
        )

    def open_file_chunk_reader_from_fileobj(
        self,
        fileobj,
        chunk_size,
        full_file_size,
        callbacks,
        close_callbacks=None,
    ):
        """Wrap an already-open file object in a ``ReadFileChunk``."""
        return ReadFileChunk(
            fileobj,
            chunk_size,
            full_file_size,
            callbacks=callbacks,
            enable_callbacks=False,
            close_callbacks=close_callbacks,
        )

    def open(self, filename, mode):
        """Open ``filename`` with ``mode``; overridable for testing."""
        return open(filename, mode)

    def remove_file(self, filename):
        """Remove a file, noop if file does not exist."""
        # Unlike os.remove, if the file does not exist,
        # then this method does nothing.
        try:
            os.remove(filename)
        except OSError:
            pass

    def rename_file(self, current_filename, new_filename):
        """Rename a file via the platform-aware compat helper."""
        rename_file(current_filename, new_filename)

    def is_special_file(self, filename):
        """Checks to see if a file is a special UNIX file.

        It checks if the file is a character special device, block special
        device, FIFO, or socket.

        :param filename: Name of the file

        :returns: True if the file is a special file. False, if is not.
        """
        # NOTE: the first parameter was previously (mis)named ``cls`` even
        # though this is a plain instance method; renamed to ``self``.
        # If it does not exist, it must be a new file so it cannot be
        # a special file.
        if not os.path.exists(filename):
            return False
        mode = os.stat(filename).st_mode
        # Character special device.
        if stat.S_ISCHR(mode):
            return True
        # Block special device
        if stat.S_ISBLK(mode):
            return True
        # Named pipe / FIFO
        if stat.S_ISFIFO(mode):
            return True
        # Socket.
        if stat.S_ISSOCK(mode):
            return True
        return False

    def get_temp_filename(self, filename):
        """Return a sibling temp-file name derived from ``filename``.

        The random suffix is appended and the base name is truncated so the
        result stays within ``_MAX_FILENAME_LEN``.
        """
        suffix = os.extsep + random_file_extension()
        path = os.path.dirname(filename)
        name = os.path.basename(filename)
        temp_filename = name[: self._MAX_FILENAME_LEN - len(suffix)] + suffix
        return os.path.join(path, temp_filename)

    def allocate(self, filename, size):
        """Preallocate ``size`` bytes for ``filename``; remove it on failure."""
        try:
            with self.open(filename, 'wb') as f:
                fallocate(f, size)
        except OSError:
            # Don't leave a partially-allocated file behind.
            self.remove_file(filename)
            raise

333 

334 

class DeferredOpenFile:
    def __init__(self, filename, start_byte=0, mode='rb', open_function=open):
        """A file-like object that postpones opening until first use.

        Deferring the open is useful when the file will be consumed in a
        separate thread, since operating systems limit how many files a
        process may hold open. The underlying file is actually opened by
        ``read()``, ``write()``, ``seek()``, and ``__enter__()``.

        :type filename: str
        :param filename: The name of the file to open

        :type start_byte: int
        :param start_byte: The byte to seek to when the file is opened.

        :type mode: str
        :param mode: The mode to use to open the file

        :type open_function: function
        :param open_function: The function to use to open the file
        """
        self._filename = filename
        self._fileobj = None
        self._start_byte = start_byte
        self._mode = mode
        self._open_function = open_function

    def _open_if_needed(self):
        # Lazily open on first use; no-op on subsequent calls.
        if self._fileobj is not None:
            return
        self._fileobj = self._open_function(self._filename, self._mode)
        if self._start_byte:
            self._fileobj.seek(self._start_byte)

    @property
    def name(self):
        return self._filename

    def read(self, amount=None):
        self._open_if_needed()
        return self._fileobj.read(amount)

    def write(self, data):
        self._open_if_needed()
        self._fileobj.write(data)

    def seek(self, where, whence=0):
        self._open_if_needed()
        self._fileobj.seek(where, whence)

    def tell(self):
        # Before the file is opened, report where we *will* start.
        return (
            self._start_byte
            if self._fileobj is None
            else self._fileobj.tell()
        )

    def close(self):
        fileobj = self._fileobj
        if fileobj:
            fileobj.close()

    def __enter__(self):
        self._open_if_needed()
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

400 

401 

class ReadFileChunk:
    def __init__(
        self,
        fileobj,
        chunk_size,
        full_file_size,
        callbacks=None,
        enable_callbacks=True,
        close_callbacks=None,
    ):
        """

        Given a file object shown below::

            |___________________________________________________|
            0          |                 |                 full_file_size
                       |----chunk_size---|
                    f.tell()

        :type fileobj: file
        :param fileobj: File like object

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            pass the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callbacks: A list of function(amount_read)
        :param callbacks: Called whenever data is read from this object in the
            order provided.

        :type enable_callbacks: boolean
        :param enable_callbacks: True if to run callbacks. Otherwise, do not
            run callbacks

        :type close_callbacks: A list of function()
        :param close_callbacks: Called when close is called. The function
            should take no arguments.
        """
        self._fileobj = fileobj
        # The chunk is anchored at wherever the file object currently points.
        self._start_byte = self._fileobj.tell()
        self._size = self._calculate_file_size(
            self._fileobj,
            requested_size=chunk_size,
            start_byte=self._start_byte,
            actual_file_size=full_file_size,
        )
        # _amount_read represents the position in the chunk and may exceed
        # the chunk size, but won't allow reads out of bounds.
        self._amount_read = 0
        self._callbacks = callbacks
        if callbacks is None:
            self._callbacks = []
        self._callbacks_enabled = enable_callbacks
        self._close_callbacks = close_callbacks
        if close_callbacks is None:
            # Normalize to an empty list, mirroring the ``callbacks``
            # handling above. (The previous code reassigned the same
            # ``None`` here, which was a no-op.)
            self._close_callbacks = []

    @classmethod
    def from_filename(
        cls,
        filename,
        start_byte,
        chunk_size,
        callbacks=None,
        enable_callbacks=True,
    ):
        """Convenience factory function to create from a filename.

        :type filename: str
        :param filename: The name of the file to open and read the chunk
            from. The full file size is determined from the opened file.

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            pass the end of the chunk size will behave like you've
            reached the end of the file.

        :type callbacks: function(amount_read)
        :param callbacks: Called whenever data is read from this object.

        :type enable_callbacks: bool
        :param enable_callbacks: Indicate whether to invoke callback
            during read() calls.

        :rtype: ``ReadFileChunk``
        :return: A new instance of ``ReadFileChunk``

        """
        f = open(filename, 'rb')
        f.seek(start_byte)
        file_size = os.fstat(f.fileno()).st_size
        return cls(f, chunk_size, file_size, callbacks, enable_callbacks)

    def _calculate_file_size(
        self, fileobj, requested_size, start_byte, actual_file_size
    ):
        # Never expose more bytes than remain after the start position.
        max_chunk_size = actual_file_size - start_byte
        return min(max_chunk_size, requested_size)

    def read(self, amount=None):
        """Read up to ``amount`` bytes, bounded by the chunk's end."""
        amount_left = max(self._size - self._amount_read, 0)
        if amount is None:
            amount_to_read = amount_left
        else:
            amount_to_read = min(amount_left, amount)
        data = self._fileobj.read(amount_to_read)
        self._amount_read += len(data)
        if self._callbacks is not None and self._callbacks_enabled:
            invoke_progress_callbacks(self._callbacks, len(data))
        return data

    def signal_transferring(self):
        """Re-enable progress callbacks and propagate to the file object."""
        self.enable_callback()
        if hasattr(self._fileobj, 'signal_transferring'):
            self._fileobj.signal_transferring()

    def signal_not_transferring(self):
        """Disable progress callbacks and propagate to the file object."""
        self.disable_callback()
        if hasattr(self._fileobj, 'signal_not_transferring'):
            self._fileobj.signal_not_transferring()

    def enable_callback(self):
        self._callbacks_enabled = True

    def disable_callback(self):
        self._callbacks_enabled = False

    def seek(self, where, whence=0):
        """Seek within the chunk, rewinding progress callbacks as needed."""
        if whence not in (0, 1, 2):
            # Mimic io's error for invalid whence values
            raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")

        # Recalculate where based on chunk attributes so seek from file
        # start (whence=0) is always used
        where += self._start_byte
        if whence == 1:
            where += self._amount_read
        elif whence == 2:
            where += self._size

        self._fileobj.seek(max(where, self._start_byte))
        if self._callbacks is not None and self._callbacks_enabled:
            # To also rewind the callback() for an accurate progress report
            bounded_where = max(min(where - self._start_byte, self._size), 0)
            bounded_amount_read = min(self._amount_read, self._size)
            amount = bounded_where - bounded_amount_read
            invoke_progress_callbacks(
                self._callbacks, bytes_transferred=amount
            )
        self._amount_read = max(where - self._start_byte, 0)

    def close(self):
        """Run close callbacks (when enabled) and close the file object."""
        if self._close_callbacks is not None and self._callbacks_enabled:
            for callback in self._close_callbacks:
                callback()
        self._fileobj.close()

    def tell(self):
        # Position is relative to the start of the chunk, not the file.
        return self._amount_read

    def __len__(self):
        # __len__ is defined because requests will try to determine the length
        # of the stream to set a content length. In the normal case
        # of the file it will just stat the file, but we need to change that
        # behavior. By providing a __len__, requests will use that instead
        # of stat'ing the file.
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def __iter__(self):
        # This is a workaround for http://bugs.python.org/issue17575
        # Basically httplib will try to iterate over the contents, even
        # if its a file like object. This wasn't noticed because we've
        # already exhausted the stream so iterating over the file immediately
        # stops, which is what we're simulating here.
        return iter([])

591 

592 

class StreamReaderProgress:
    """Wrap a read-only stream so reads report progress via callbacks."""

    def __init__(self, stream, callbacks=None):
        self._stream = stream
        self._callbacks = [] if callbacks is None else callbacks

    def read(self, *args, **kwargs):
        data = self._stream.read(*args, **kwargs)
        # Report however many bytes the underlying stream produced.
        invoke_progress_callbacks(self._callbacks, len(data))
        return data

606 

607 

class NoResourcesAvailable(Exception):
    """Raised when a semaphore cannot be acquired without blocking."""

    pass

610 

611 

class TaskSemaphore:
    def __init__(self, count):
        """A semaphore for the purpose of limiting the number of tasks

        :param count: The size of semaphore
        """
        self._semaphore = threading.Semaphore(count)

    def acquire(self, tag, blocking=True):
        """Acquire the semaphore

        :param tag: A tag identifying what is acquiring the semaphore. Note
            that this is not really needed to directly use this class but is
            needed for API compatibility with the SlidingWindowSemaphore
            implementation.
        :param blocking: If True, block until it can be acquired. If False,
            do not block and raise an exception if cannot be acquired.

        :returns: A token (can be None) to use when releasing the semaphore
        """
        logger.debug("Acquiring %s", tag)
        if not self._semaphore.acquire(blocking):
            raise NoResourcesAvailable("Cannot acquire tag '%s'" % tag)

    def release(self, tag, acquire_token):
        """Release the semaphore

        :param tag: A tag identifying what is releasing the semaphore
        :param acquire_token: The token returned from when the semaphore was
            acquired. Note that this is not really needed to directly use this
            class but is needed for API compatibility with the
            SlidingWindowSemaphore implementation.
        """
        # Lazy %-style args (instead of an eager f-string) defer formatting
        # until the record is actually emitted and match the logging style
        # used elsewhere in this module.
        logger.debug("Releasing acquire %s/%s", tag, acquire_token)
        self._semaphore.release()

647 

648 

class SlidingWindowSemaphore(TaskSemaphore):
    """A semaphore used to coordinate sequential resource access.

    This class is similar to the stdlib BoundedSemaphore:

    * It's initialized with a count.
    * Each call to ``acquire()`` decrements the counter.
    * If the count is at zero, then ``acquire()`` will either block until the
      count increases, or if ``blocking=False``, then it will raise
      a NoResourcesAvailable exception indicating that it failed to acquire the
      semaphore.

    The main difference is that this semaphore is used to limit
    access to a resource that requires sequential access. For example,
    if I want to access resource R that has 20 subresources R_0 - R_19,
    this semaphore can also enforce that you only have a max range of
    10 at any given point in time. You must also specify a tag name
    when you acquire the semaphore. The sliding window semantics apply
    on a per tag basis. The internal count will only be incremented
    when the minimum sequence number for a tag is released.

    """

    def __init__(self, count):
        self._count = count
        # Dict[tag, next_sequence_number].
        self._tag_sequences = defaultdict(int)
        # Dict[tag, lowest_unreleased_sequence_number].
        self._lowest_sequence = {}
        self._lock = threading.Lock()
        # Condition shares the lock so waiters and releasers coordinate.
        self._condition = threading.Condition(self._lock)
        # Dict[tag, List[sequence_number]]
        self._pending_release = {}

    def current_count(self):
        # Snapshot of the available count, read under the lock.
        with self._lock:
            return self._count

    def acquire(self, tag, blocking=True):
        """Acquire one slot for ``tag``; returns the tag's sequence number."""
        logger.debug("Acquiring %s", tag)
        self._condition.acquire()
        try:
            if self._count == 0:
                if not blocking:
                    raise NoResourcesAvailable("Cannot acquire tag '%s'" % tag)
                else:
                    # Wait until a release() increments the count.
                    while self._count == 0:
                        self._condition.wait()
            # self._count is no longer zero.
            # First, check if this is the first time we're seeing this tag.
            sequence_number = self._tag_sequences[tag]
            if sequence_number == 0:
                # First time seeing the tag, so record we're at 0.
                self._lowest_sequence[tag] = sequence_number
            self._tag_sequences[tag] += 1
            self._count -= 1
            return sequence_number
        finally:
            self._condition.release()

    def release(self, tag, acquire_token):
        """Release the slot identified by ``acquire_token`` for ``tag``.

        The count only grows when the lowest outstanding sequence number
        for the tag is released; out-of-order releases are queued.
        """
        sequence_number = acquire_token
        logger.debug("Releasing acquire %s/%s", tag, sequence_number)
        self._condition.acquire()
        try:
            if tag not in self._tag_sequences:
                raise ValueError("Attempted to release unknown tag: %s" % tag)
            max_sequence = self._tag_sequences[tag]
            if self._lowest_sequence[tag] == sequence_number:
                # We can immediately process this request and free up
                # resources.
                self._lowest_sequence[tag] += 1
                self._count += 1
                self._condition.notify()
                # Drain any queued releases that are now contiguous with the
                # new lowest sequence. The queue is sorted descending, so the
                # smallest pending sequence number is at the end.
                queued = self._pending_release.get(tag, [])
                while queued:
                    if self._lowest_sequence[tag] == queued[-1]:
                        queued.pop()
                        self._lowest_sequence[tag] += 1
                        self._count += 1
                    else:
                        break
            elif self._lowest_sequence[tag] < sequence_number < max_sequence:
                # We can't do anything right now because we're still waiting
                # for the min sequence for the tag to be released. We have
                # to queue this for pending release.
                self._pending_release.setdefault(tag, []).append(
                    sequence_number
                )
                self._pending_release[tag].sort(reverse=True)
            else:
                raise ValueError(
                    "Attempted to release unknown sequence number "
                    "%s for tag: %s" % (sequence_number, tag)
                )
        finally:
            self._condition.release()

745 

746 

class ChunksizeAdjuster:
    """Adjust a configured chunksize so it satisfies S3 upload limits."""

    def __init__(
        self,
        max_size=MAX_SINGLE_UPLOAD_SIZE,
        min_size=MIN_UPLOAD_CHUNKSIZE,
        max_parts=MAX_PARTS,
    ):
        self.max_size = max_size
        self.min_size = min_size
        self.max_parts = max_parts

    def adjust_chunksize(self, current_chunksize, file_size=None):
        """Get a chunksize close to current that fits within all S3 limits.

        :type current_chunksize: int
        :param current_chunksize: The currently configured chunksize.

        :type file_size: int or None
        :param file_size: The size of the file to upload. This might be None
            if the object being transferred has an unknown size.

        :returns: A valid chunksize that fits within configured limits.
        """
        chunksize = current_chunksize
        if file_size is not None:
            chunksize = self._adjust_for_max_parts(chunksize, file_size)
        return self._adjust_for_chunksize_limits(chunksize)

    def _adjust_for_chunksize_limits(self, current_chunksize):
        # Clamp the chunksize into [min_size, max_size].
        # NOTE: logger.debug now takes lazy %-style arguments instead of
        # eagerly %-formatting the message, so formatting only happens when
        # debug logging is enabled.
        if current_chunksize > self.max_size:
            logger.debug(
                "Chunksize greater than maximum chunksize. "
                "Setting to %s from %s.",
                self.max_size,
                current_chunksize,
            )
            return self.max_size
        elif current_chunksize < self.min_size:
            logger.debug(
                "Chunksize less than minimum chunksize. "
                "Setting to %s from %s.",
                self.min_size,
                current_chunksize,
            )
            return self.min_size
        else:
            return current_chunksize

    def _adjust_for_max_parts(self, current_chunksize, file_size):
        # Double the chunksize until the resulting number of parts fits
        # within the configured maximum.
        chunksize = current_chunksize
        num_parts = int(math.ceil(file_size / float(chunksize)))

        while num_parts > self.max_parts:
            chunksize *= 2
            num_parts = int(math.ceil(file_size / float(chunksize)))

        if chunksize != current_chunksize:
            logger.debug(
                "Chunksize would result in the number of parts exceeding the "
                "maximum. Setting to %s from %s.",
                chunksize,
                current_chunksize,
            )

        return chunksize

807 

808 

def add_s3express_defaults(bucket, extra_args):
    """Apply default transfer settings for S3 Express buckets.

    Mutates ``extra_args`` in place: when ``bucket`` is an S3 Express
    bucket and the caller did not choose a checksum algorithm, default
    to CRC32.
    """
    if is_s3express_bucket(bucket):
        if "ChecksumAlgorithm" not in extra_args:
            # Default Transfer Operations to S3Express to use CRC32
            extra_args["ChecksumAlgorithm"] = "crc32"