Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/s3transfer/manager.py: 26%


211 statements  

# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import copy
import logging
import re
import threading

from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket
from s3transfer.constants import (
    ALLOWED_DOWNLOAD_ARGS,
    FULL_OBJECT_CHECKSUM_ARGS,
    KB,
    MB,
)
from s3transfer.copies import CopySubmissionTask
from s3transfer.delete import DeleteSubmissionTask
from s3transfer.download import DownloadSubmissionTask
from s3transfer.exceptions import CancelledError, FatalError
from s3transfer.futures import (
    IN_MEMORY_DOWNLOAD_TAG,
    IN_MEMORY_UPLOAD_TAG,
    BoundedExecutor,
    TransferCoordinator,
    TransferFuture,
    TransferMeta,
)
from s3transfer.upload import UploadSubmissionTask
from s3transfer.utils import (
    CallArgs,
    OSUtils,
    SlidingWindowSemaphore,
    TaskSemaphore,
    get_callbacks,
    set_default_checksum_algorithm,
    signal_not_transferring,
    signal_transferring,
)

logger = logging.getLogger(__name__)


class TransferConfig:
    UNSET_DEFAULT = object()

    def __init__(
        self,
        multipart_threshold=8 * MB,
        multipart_chunksize=8 * MB,
        max_request_concurrency=10,
        max_submission_concurrency=5,
        max_request_queue_size=1000,
        max_submission_queue_size=1000,
        max_io_queue_size=1000,
        io_chunksize=256 * KB,
        num_download_attempts=5,
        max_in_memory_upload_chunks=10,
        max_in_memory_download_chunks=10,
        max_bandwidth=None,
    ):
        """Configuration for the transfer manager

        :param multipart_threshold: The size threshold at which a transfer
            becomes a multipart transfer.

        :param max_request_concurrency: The maximum number of S3 API
            transfer-related requests that can happen at a time.

        :param max_submission_concurrency: The maximum number of threads
            processing a call to a TransferManager method. Processing a
            call usually entails determining which S3 API requests need
            to be enqueued, but does **not** entail making any of the
            S3 API data-transferring requests needed to perform the
            transfer. The threads controlled by
            ``max_request_concurrency`` are responsible for that.

        :param multipart_chunksize: The size of each part if a request
            becomes a multipart transfer.

        :param max_request_queue_size: The maximum number of S3 API
            requests that can be queued at a time.

        :param max_submission_queue_size: The maximum number of
            TransferManager method calls that can be queued at a time.

        :param max_io_queue_size: The maximum number of read parts that
            can be queued to be written to disk per download. The default
            size of each element in this queue is 8 KB.

        :param io_chunksize: The maximum size of each chunk in the io
            queue. Currently, this is also the size used when reading from
            the downloaded stream.

        :param num_download_attempts: The number of download attempts that
            will be tried upon errors with downloading an object in S3.
            Note that these retries account for errors that occur when
            streaming down the data from S3 (i.e. socket errors and read
            timeouts that occur after receiving an OK response from S3).
            Other retryable exceptions such as throttling errors and 5xx
            errors are already retried by botocore (this default is 5).
            ``num_download_attempts`` does not take into account the
            number of exceptions retried by botocore.

        :param max_in_memory_upload_chunks: The number of chunks that can
            be stored in memory at a time for all ongoing upload requests.
            This pertains to chunks of data that need to be stored in
            memory during an upload if the data is sourced from a
            file-like object. The total maximum memory footprint due to
            in-memory upload chunks is roughly equal to:

                max_in_memory_upload_chunks * multipart_chunksize
                + max_submission_concurrency * multipart_chunksize

            ``max_submission_concurrency`` has an effect on this value
            because each thread pulling data off of a file-like object may
            be waiting with a single read chunk to be submitted for upload
            once the ``max_in_memory_upload_chunks`` value has been
            reached by the threads making the upload requests.

        :param max_in_memory_download_chunks: The number of chunks that
            can be buffered in memory and **not** in the io queue at a
            time for all ongoing download requests. This pertains
            specifically to file-like objects that are not seekable. The
            total maximum memory footprint due to in-memory download
            chunks is roughly equal to:

                max_in_memory_download_chunks * multipart_chunksize

        :param max_bandwidth: The maximum bandwidth that will be consumed
            in uploading and downloading file content. The value is in
            bytes per second.
        """
        self.multipart_threshold = multipart_threshold
        self.multipart_chunksize = multipart_chunksize
        self.max_request_concurrency = max_request_concurrency
        self.max_submission_concurrency = max_submission_concurrency
        self.max_request_queue_size = max_request_queue_size
        self.max_submission_queue_size = max_submission_queue_size
        self.max_io_queue_size = max_io_queue_size
        self.io_chunksize = io_chunksize
        self.num_download_attempts = num_download_attempts
        self.max_in_memory_upload_chunks = max_in_memory_upload_chunks
        self.max_in_memory_download_chunks = max_in_memory_download_chunks
        self.max_bandwidth = max_bandwidth
        self._validate_attrs_are_nonzero()

    def _validate_attrs_are_nonzero(self):
        for attr, attr_val in self.__dict__.items():
            if (
                attr_val is not None
                and attr_val is not self.UNSET_DEFAULT
                and attr_val <= 0
            ):
                raise ValueError(
                    f'Provided parameter {attr} of value {attr_val} must '
                    'be greater than 0.'
                )

    def get_deep_attr(self, item):
        return object.__getattribute__(self, item)
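

# Worked example (illustrative sketch, not part of the original module):
# with the defaults above, the rough upload memory bound from the
# docstring formula is 10 * 8 MB + 5 * 8 MB = 120 MB, and constructing a
# config with a non-positive value raises ValueError.
#
#     config = TransferConfig()
#     upload_bound = (
#         config.max_in_memory_upload_chunks * config.multipart_chunksize
#         + config.max_submission_concurrency * config.multipart_chunksize
#     )  # 125829120 bytes, i.e. 120 MB
#     TransferConfig(max_request_concurrency=0)  # raises ValueError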


class TransferManager:
    ALLOWED_DOWNLOAD_ARGS = ALLOWED_DOWNLOAD_ARGS

    _ALLOWED_SHARED_ARGS = [
        'ACL',
        'CacheControl',
        'ChecksumAlgorithm',
        'ContentDisposition',
        'ContentEncoding',
        'ContentLanguage',
        'ContentType',
        'ExpectedBucketOwner',
        'Expires',
        'GrantFullControl',
        'GrantRead',
        'GrantReadACP',
        'GrantWriteACP',
        'Metadata',
        'ObjectLockLegalHoldStatus',
        'ObjectLockMode',
        'ObjectLockRetainUntilDate',
        'RequestPayer',
        'ServerSideEncryption',
        'StorageClass',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'SSEKMSKeyId',
        'SSEKMSEncryptionContext',
        'Tagging',
        'WebsiteRedirectLocation',
    ]

    ALLOWED_UPLOAD_ARGS = (
        _ALLOWED_SHARED_ARGS
        + [
            'ChecksumType',
            'MpuObjectSize',
        ]
        + FULL_OBJECT_CHECKSUM_ARGS
    )

    ALLOWED_COPY_ARGS = _ALLOWED_SHARED_ARGS + [
        'CopySourceIfMatch',
        'CopySourceIfModifiedSince',
        'CopySourceIfNoneMatch',
        'CopySourceIfUnmodifiedSince',
        'CopySourceSSECustomerAlgorithm',
        'CopySourceSSECustomerKey',
        'CopySourceSSECustomerKeyMD5',
        'MetadataDirective',
        'TaggingDirective',
    ]

    ALLOWED_DELETE_ARGS = [
        'MFA',
        'VersionId',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    VALIDATE_SUPPORTED_BUCKET_VALUES = True

    _UNSUPPORTED_BUCKET_PATTERNS = {
        'S3 Object Lambda': re.compile(
            r'^arn:(aws).*:s3-object-lambda:[a-z\-0-9]+:[0-9]{12}:'
            r'accesspoint[/:][a-zA-Z0-9\-]{1,63}'
        ),
    }
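
    # Example (illustrative; the account id and access point name below are
    # hypothetical): an S3 Object Lambda access point ARN matches the
    # pattern above, so _validate_if_bucket_supported() rejects it and a
    # direct client call must be used instead.
    #
    #     pattern = TransferManager._UNSUPPORTED_BUCKET_PATTERNS[
    #         'S3 Object Lambda'
    #     ]
    #     arn = ('arn:aws:s3-object-lambda:us-west-2:'
    #            '123456789012:accesspoint/my-ap')
    #     assert pattern.match(arn) is not None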

    def __init__(self, client, config=None, osutil=None, executor_cls=None):
        """A transfer manager interface for Amazon S3

        :param client: Client to be used by the manager
        :param config: TransferConfig to associate specific configurations
        :param osutil: OSUtils object to use for os-related behavior when
            using the transfer manager.

        :type executor_cls: s3transfer.futures.BaseExecutor
        :param executor_cls: The class of executor to use with the transfer
            manager. By default, concurrent.futures.ThreadPoolExecutor is
            used.
        """
        self._client = client
        self._config = config
        if config is None:
            self._config = TransferConfig()
        self._osutil = osutil
        if osutil is None:
            self._osutil = OSUtils()
        self._coordinator_controller = TransferCoordinatorController()
        # A counter to create unique ids for each transfer submitted.
        self._id_counter = 0

        # The executor responsible for making S3 API transfer requests
        self._request_executor = BoundedExecutor(
            max_size=self._config.max_request_queue_size,
            max_num_threads=self._config.max_request_concurrency,
            tag_semaphores={
                IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
                    self._config.max_in_memory_upload_chunks
                ),
                IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
                    self._config.max_in_memory_download_chunks
                ),
            },
            executor_cls=executor_cls,
        )

        # The executor responsible for submitting the necessary tasks to
        # perform the desired transfer
        self._submission_executor = BoundedExecutor(
            max_size=self._config.max_submission_queue_size,
            max_num_threads=self._config.max_submission_concurrency,
            executor_cls=executor_cls,
        )

        # There is one thread available for writing to disk. It will handle
        # downloads for all files.
        self._io_executor = BoundedExecutor(
            max_size=self._config.max_io_queue_size,
            max_num_threads=1,
            executor_cls=executor_cls,
        )

        # The component responsible for limiting bandwidth usage if it
        # is configured.
        self._bandwidth_limiter = None
        if self._config.max_bandwidth is not None:
            logger.debug(
                'Setting max_bandwidth to %s', self._config.max_bandwidth
            )
            leaky_bucket = LeakyBucket(self._config.max_bandwidth)
            self._bandwidth_limiter = BandwidthLimiter(leaky_bucket)

        self._register_handlers()
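
    # Construction sketch (illustrative; assumes boto3 is importable and
    # that NonThreadedExecutor is available from s3transfer.futures for
    # serial, single-threaded execution):
    #
    #     import boto3
    #     from s3transfer.futures import NonThreadedExecutor
    #
    #     manager = TransferManager(
    #         boto3.client('s3'),
    #         config=TransferConfig(max_request_concurrency=2),
    #         executor_cls=NonThreadedExecutor,
    #     )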

    @property
    def client(self):
        return self._client

    @property
    def config(self):
        return self._config

    def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None):
        """Uploads a file to S3

        :type fileobj: str or seekable file-like object
        :param fileobj: The name of a file to upload or a seekable file-like
            object to upload. It is recommended to use a filename because
            file-like objects may result in higher memory usage.

        :type bucket: str
        :param bucket: The name of the bucket to upload to

        :type key: str
        :param key: The name of the key to upload to

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the events emitted during the process
            of the transfer request.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the upload
        """
        extra_args = extra_args.copy() if extra_args else {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
        self._validate_if_bucket_supported(bucket)
        self._add_operation_defaults(extra_args)
        call_args = CallArgs(
            fileobj=fileobj,
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        extra_main_kwargs = {}
        if self._bandwidth_limiter:
            extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
        return self._submit_transfer(
            call_args, UploadSubmissionTask, extra_main_kwargs
        )
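
    # Usage sketch (illustrative only; assumes a boto3 S3 client, an
    # existing bucket, and a local file):
    #
    #     with TransferManager(boto3.client('s3')) as manager:
    #         future = manager.upload('/tmp/data.bin', 'my-bucket', 'data.bin')
    #         future.result()  # blocks until the upload finishes or raises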

    def download(
        self, bucket, key, fileobj, extra_args=None, subscribers=None
    ):
        """Downloads a file from S3

        :type bucket: str
        :param bucket: The name of the bucket to download from

        :type key: str
        :param key: The name of the key to download from

        :type fileobj: str or seekable file-like object
        :param fileobj: The name of a file to download to or a seekable
            file-like object to download into. It is recommended to use a
            filename because file-like objects may result in higher memory
            usage.

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the events emitted during the process
            of the transfer request.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the download
        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            bucket=bucket,
            key=key,
            fileobj=fileobj,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        extra_main_kwargs = {'io_executor': self._io_executor}
        if self._bandwidth_limiter:
            extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
        return self._submit_transfer(
            call_args, DownloadSubmissionTask, extra_main_kwargs
        )
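
    # Usage sketch (illustrative; mirrors upload() with source and
    # destination swapped; the version id below is hypothetical):
    #
    #     future = manager.download(
    #         'my-bucket', 'data.bin', '/tmp/data.bin',
    #         extra_args={'VersionId': 'abc123'},
    #     )
    #     future.result()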

    def copy(
        self,
        copy_source,
        bucket,
        key,
        extra_args=None,
        subscribers=None,
        source_client=None,
    ):
        """Copies a file in S3

        :type copy_source: dict
        :param copy_source: The name of the source bucket, key name of the
            source object, and optional version ID of the source object. The
            dictionary format is:
            ``{'Bucket': 'bucket', 'Key': 'key', 'VersionId': 'id'}``. Note
            that the ``VersionId`` key is optional and may be omitted.

        :type bucket: str
        :param bucket: The name of the bucket to copy to

        :type key: str
        :param key: The name of the key to copy to

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the events emitted during the process
            of the transfer request.

        :type source_client: botocore or boto3 Client
        :param source_client: The client to be used for operations that
            may happen at the source object. For example, this client is
            used for the head_object call that determines the size of the
            copy. If no client is provided, the transfer manager's client
            is used as the client for the source object.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the copy
        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        if source_client is None:
            source_client = self._client
        self._validate_all_known_args(extra_args, self.ALLOWED_COPY_ARGS)
        if isinstance(copy_source, dict):
            self._validate_if_bucket_supported(copy_source.get('Bucket'))
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            copy_source=copy_source,
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
            source_client=source_client,
        )
        return self._submit_transfer(call_args, CopySubmissionTask)
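
    # Usage sketch (illustrative): copy_source names the source object,
    # and ``VersionId`` may be omitted.
    #
    #     future = manager.copy(
    #         {'Bucket': 'src-bucket', 'Key': 'src-key'},
    #         'dest-bucket', 'dest-key',
    #     )
    #     future.result()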

    def delete(self, bucket, key, extra_args=None, subscribers=None):
        """Delete an S3 object.

        :type bucket: str
        :param bucket: The name of the bucket.

        :type key: str
        :param key: The name of the S3 object to delete.

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            DeleteObject call.

        :type subscribers: list
        :param subscribers: A list of subscribers to be invoked during the
            process of the transfer request. Note that the ``on_progress``
            callback is not invoked during object deletion.

        :rtype: s3transfer.futures.TransferFuture
        :return: Transfer future representing the deletion.
        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_DELETE_ARGS)
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        return self._submit_transfer(call_args, DeleteSubmissionTask)
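
    # Usage sketch (illustrative):
    #
    #     manager.delete('my-bucket', 'stale-key').result()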

    def _validate_if_bucket_supported(self, bucket):
        # S3 high-level operations don't support some resources
        # (e.g. S3 Object Lambda); only direct API calls are available
        # for such resources.
        if self.VALIDATE_SUPPORTED_BUCKET_VALUES:
            for resource, pattern in self._UNSUPPORTED_BUCKET_PATTERNS.items():
                match = pattern.match(bucket)
                if match:
                    raise ValueError(
                        f'TransferManager methods do not support {resource} '
                        'resource. Use direct client calls instead.'
                    )

    def _validate_all_known_args(self, actual, allowed):
        for kwarg in actual:
            if kwarg not in allowed:
                raise ValueError(
                    "Invalid extra_args key '{}', must be one of: {}".format(
                        kwarg, ', '.join(allowed)
                    )
                )

    def _add_operation_defaults(self, extra_args):
        if (
            self.client.meta.config.request_checksum_calculation
            == "when_supported"
        ):
            set_default_checksum_algorithm(extra_args)

    def _submit_transfer(
        self, call_args, submission_task_cls, extra_main_kwargs=None
    ):
        if not extra_main_kwargs:
            extra_main_kwargs = {}

        # Create a TransferFuture to return back to the user
        transfer_future, components = self._get_future_with_components(
            call_args
        )

        # Add any provided done callbacks to the created transfer future
        # to be invoked on the transfer future being complete.
        for callback in get_callbacks(transfer_future, 'done'):
            components['coordinator'].add_done_callback(callback)

        # Get the main kwargs needed to instantiate the submission task
        main_kwargs = self._get_submission_task_main_kwargs(
            transfer_future, extra_main_kwargs
        )

        # Submit a SubmissionTask that will submit all of the necessary
        # tasks needed to complete the S3 transfer.
        self._submission_executor.submit(
            submission_task_cls(
                transfer_coordinator=components['coordinator'],
                main_kwargs=main_kwargs,
            )
        )

        # Increment the unique id counter for future transfer requests
        self._id_counter += 1

        return transfer_future

    def _get_future_with_components(self, call_args):
        transfer_id = self._id_counter
        # Creates a new transfer future along with its components
        transfer_coordinator = TransferCoordinator(transfer_id=transfer_id)
        # Track the transfer coordinator for transfers to manage.
        self._coordinator_controller.add_transfer_coordinator(
            transfer_coordinator
        )
        # Also make sure that the transfer coordinator is removed once
        # the transfer completes so it does not stick around in memory.
        transfer_coordinator.add_done_callback(
            self._coordinator_controller.remove_transfer_coordinator,
            transfer_coordinator,
        )
        components = {
            'meta': TransferMeta(call_args, transfer_id=transfer_id),
            'coordinator': transfer_coordinator,
        }
        transfer_future = TransferFuture(**components)
        return transfer_future, components

    def _get_submission_task_main_kwargs(
        self, transfer_future, extra_main_kwargs
    ):
        main_kwargs = {
            'client': self._client,
            'config': self._config,
            'osutil': self._osutil,
            'request_executor': self._request_executor,
            'transfer_future': transfer_future,
        }
        main_kwargs.update(extra_main_kwargs)
        return main_kwargs

    def _register_handlers(self):
        # Register handlers to enable/disable callbacks on uploads.
        event_name = 'request-created.s3'
        self._client.meta.events.register_first(
            event_name,
            signal_not_transferring,
            unique_id='s3upload-not-transferring',
        )
        self._client.meta.events.register_last(
            event_name, signal_transferring, unique_id='s3upload-transferring'
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, *args):
        cancel = False
        cancel_msg = ''
        cancel_exc_type = FatalError
        # If an exception was raised in the context handler, signal to
        # cancel all of the in-progress futures in the shutdown.
        if exc_type:
            cancel = True
            cancel_msg = str(exc_value)
            if not cancel_msg:
                cancel_msg = repr(exc_value)
            # If it was a KeyboardInterrupt, the cancellation was initiated
            # by the user.
            if isinstance(exc_value, KeyboardInterrupt):
                cancel_exc_type = CancelledError
        self._shutdown(cancel, cancel_msg, cancel_exc_type)
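
    # Context-manager sketch (illustrative): an exception raised inside the
    # ``with`` block cancels in-progress transfers with FatalError, while a
    # KeyboardInterrupt cancels them with CancelledError.
    #
    #     with TransferManager(boto3.client('s3')) as manager:
    #         manager.upload('/tmp/a.bin', 'my-bucket', 'a')
    #         raise RuntimeError('boom')  # the upload is cancelled on exit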

    def shutdown(self, cancel=False, cancel_msg=''):
        """Shutdown the TransferManager

        It will wait until all transfers complete before it completely
        shuts down.

        :type cancel: boolean
        :param cancel: If True, calls TransferFuture.cancel() for
            all in-progress transfers. This is useful if you want the
            shutdown to happen faster.

        :type cancel_msg: str
        :param cancel_msg: The message to specify if canceling all
            in-progress transfers.
        """
        self._shutdown(cancel, cancel_msg)
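
    # Shutdown sketch (illustrative): cancel everything still in flight
    # instead of waiting for completion.
    #
    #     manager.shutdown(cancel=True, cancel_msg='shutting down early')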

    def _shutdown(self, cancel, cancel_msg, exc_type=CancelledError):
        if cancel:
            # Cancel all in-flight transfers if requested, before waiting
            # for them to complete.
            self._coordinator_controller.cancel(cancel_msg, exc_type)
        try:
            # Wait until there are no more in-progress transfers. This is
            # wrapped in a try statement because this can be interrupted
            # with a KeyboardInterrupt that needs to be caught.
            self._coordinator_controller.wait()
        except KeyboardInterrupt:
            # If no errors were raised in the try block, the cancel should
            # have no coordinators it needs to run cancel on. If there was
            # an error raised in the try statement we want to cancel all of
            # the in-flight transfers before shutting down to speed that
            # process up.
            self._coordinator_controller.cancel('KeyboardInterrupt()')
            raise
        finally:
            # Shutdown all of the executors.
            self._submission_executor.shutdown()
            self._request_executor.shutdown()
            self._io_executor.shutdown()


class TransferCoordinatorController:
    def __init__(self):
        """Abstraction to control all transfer coordinators

        This abstraction allows the manager to wait for in-progress
        transfers to complete and to cancel all in-progress transfers.
        """
        self._lock = threading.Lock()
        self._tracked_transfer_coordinators = set()

    @property
    def tracked_transfer_coordinators(self):
        """The set of transfer coordinators being tracked"""
        with self._lock:
            # We return a copy because the set is mutable, and if you were
            # to iterate over the set, it may be changing in length due to
            # additions and removals of transfer coordinators.
            return copy.copy(self._tracked_transfer_coordinators)

    def add_transfer_coordinator(self, transfer_coordinator):
        """Adds a transfer coordinator of a transfer to be canceled if needed

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The transfer coordinator for the
            particular transfer
        """
        with self._lock:
            self._tracked_transfer_coordinators.add(transfer_coordinator)

    def remove_transfer_coordinator(self, transfer_coordinator):
        """Remove a transfer coordinator from cancellation consideration

        Typically, this method is invoked by the transfer coordinator itself
        to remove itself when it completes its transfer.

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The transfer coordinator for the
            particular transfer
        """
        with self._lock:
            self._tracked_transfer_coordinators.remove(transfer_coordinator)

    def cancel(self, msg='', exc_type=CancelledError):
        """Cancels all in-progress transfers

        This cancels the in-progress transfers by calling cancel() on all
        tracked transfer coordinators.

        :param msg: The message to pass on to each transfer coordinator that
            gets cancelled.

        :param exc_type: The type of exception to set for the cancellation
        """
        for transfer_coordinator in self.tracked_transfer_coordinators:
            transfer_coordinator.cancel(msg, exc_type)

    def wait(self):
        """Wait until there are no more in-progress transfers

        This will not stop when failures are encountered; it does not
        propagate errors from failed transfers, but it can be interrupted
        with a KeyboardInterrupt.
        """
        try:
            transfer_coordinator = None
            for transfer_coordinator in self.tracked_transfer_coordinators:
                transfer_coordinator.result()
        except KeyboardInterrupt:
            logger.debug('Received KeyboardInterrupt in wait()')
            # If a KeyboardInterrupt is raised while waiting for
            # the result, then exit out of the wait and raise the
            # exception.
            if transfer_coordinator:
                logger.debug(
                    'On KeyboardInterrupt was waiting for %s',
                    transfer_coordinator,
                )
            raise
        except Exception:
            # A general exception could have been thrown because
            # of result(). We just want to ignore this and continue
            # because we at least know that the transfer coordinator
            # has completed.
            pass
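

# Internal-usage sketch (illustrative): the controller tracks coordinators
# so the manager can cancel or wait on every in-progress transfer.
#
#     controller = TransferCoordinatorController()
#     coordinator = TransferCoordinator(transfer_id=0)
#     controller.add_transfer_coordinator(coordinator)
#     controller.cancel('shutting down')  # marks the transfer cancelled
#     controller.wait()  # swallows the CancelledError from result()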