Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/s3transfer/manager.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

208 statements  

1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import copy 

14import logging 

15import re 

16import threading 

17 

18from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket 

19from s3transfer.constants import ( 

20 ALLOWED_DOWNLOAD_ARGS, 

21 FULL_OBJECT_CHECKSUM_ARGS, 

22 KB, 

23 MB, 

24) 

25from s3transfer.copies import CopySubmissionTask 

26from s3transfer.delete import DeleteSubmissionTask 

27from s3transfer.download import DownloadSubmissionTask 

28from s3transfer.exceptions import CancelledError, FatalError 

29from s3transfer.futures import ( 

30 IN_MEMORY_DOWNLOAD_TAG, 

31 IN_MEMORY_UPLOAD_TAG, 

32 BoundedExecutor, 

33 TransferCoordinator, 

34 TransferFuture, 

35 TransferMeta, 

36) 

37from s3transfer.upload import UploadSubmissionTask 

38from s3transfer.utils import ( 

39 CallArgs, 

40 OSUtils, 

41 SlidingWindowSemaphore, 

42 TaskSemaphore, 

43 get_callbacks, 

44 set_default_checksum_algorithm, 

45 signal_not_transferring, 

46 signal_transferring, 

47) 

48 

# Module-level logger; handler/level configuration is left to the
# consuming application, per standard library convention.
logger = logging.getLogger(__name__)

50 

51 

class TransferConfig:
    def __init__(
        self,
        multipart_threshold=8 * MB,
        multipart_chunksize=8 * MB,
        max_request_concurrency=10,
        max_submission_concurrency=5,
        max_request_queue_size=1000,
        max_submission_queue_size=1000,
        max_io_queue_size=1000,
        io_chunksize=256 * KB,
        num_download_attempts=5,
        max_in_memory_upload_chunks=10,
        max_in_memory_download_chunks=10,
        max_bandwidth=None,
    ):
        """Configurations for the transfer manager

        :param multipart_threshold: The threshold at which a transfer
            switches to a multipart transfer.

        :param multipart_chunksize: The size of each part when a request
            becomes a multipart transfer.

        :param max_request_concurrency: The maximum number of S3 API
            transfer-related requests that may be in flight at once.

        :param max_submission_concurrency: The maximum number of threads
            processing calls to a TransferManager method. Processing a
            call usually entails determining which S3 API requests need
            to be enqueued, but does **not** entail making any of the
            S3 API data-transferring requests needed to perform the
            transfer; the threads controlled by
            ``max_request_concurrency`` are responsible for that.

        :param max_request_queue_size: The maximum number of S3 API
            requests that can be queued at a time.

        :param max_submission_queue_size: The maximum number of
            TransferManager method calls that can be queued at a time.

        :param max_io_queue_size: The maximum number of read parts that
            can be queued to be written to disk per download. The default
            size for each element in this queue is 8 KB.

        :param io_chunksize: The max size of each chunk in the io queue.
            Currently, this is the size used when reading from the
            downloaded stream as well.

        :param num_download_attempts: The number of download attempts
            that will be tried upon errors with downloading an object in
            S3. Note that these retries account for errors that occur
            when streaming down the data from S3 (i.e. socket errors and
            read timeouts that occur after receiving an OK response from
            S3). Other retryable exceptions such as throttling errors and
            5xx errors are already retried by botocore (this default is
            5). ``num_download_attempts`` does not take into account the
            number of exceptions retried by botocore.

        :param max_in_memory_upload_chunks: The number of chunks that can
            be stored in memory at a time for all ongoing upload
            requests. This pertains to chunks of data that need to be
            stored in memory during an upload if the data is sourced from
            a file-like object. The total maximum memory footprint due to
            in-memory upload chunks is roughly equal to:

                max_in_memory_upload_chunks * multipart_chunksize
                + max_submission_concurrency * multipart_chunksize

            ``max_submission_concurrency`` affects this value because
            each thread pulling data off of a file-like object may be
            waiting with a single read chunk to be submitted for upload
            once the ``max_in_memory_upload_chunks`` value has been
            reached by the threads making the upload request.

        :param max_in_memory_download_chunks: The number of chunks that
            can be buffered in memory and **not** in the io queue at a
            time for all ongoing download requests. This pertains
            specifically to file-like objects that cannot be seeked. The
            total maximum memory footprint due to in-memory download
            chunks is roughly equal to:

                max_in_memory_download_chunks * multipart_chunksize

        :param max_bandwidth: The maximum bandwidth that will be consumed
            in uploading and downloading file content, in bytes per
            second.
        """
        self.multipart_threshold = multipart_threshold
        self.multipart_chunksize = multipart_chunksize
        self.max_request_concurrency = max_request_concurrency
        self.max_submission_concurrency = max_submission_concurrency
        self.max_request_queue_size = max_request_queue_size
        self.max_submission_queue_size = max_submission_queue_size
        self.max_io_queue_size = max_io_queue_size
        self.io_chunksize = io_chunksize
        self.num_download_attempts = num_download_attempts
        self.max_in_memory_upload_chunks = max_in_memory_upload_chunks
        self.max_in_memory_download_chunks = max_in_memory_download_chunks
        self.max_bandwidth = max_bandwidth
        self._validate_attrs_are_nonzero()

    def _validate_attrs_are_nonzero(self):
        # Every configured value must be positive; None means "unset"
        # and is allowed (e.g. max_bandwidth defaults to None).
        for name, value in vars(self).items():
            if value is None or value > 0:
                continue
            raise ValueError(
                f'Provided parameter {name} of value {value} must '
                'be greater than 0.'
            )

160 

161 

class TransferManager:
    """High-level transfer manager for Amazon S3 uploads, downloads,
    copies, and deletes, built on top of a botocore client."""

    ALLOWED_DOWNLOAD_ARGS = ALLOWED_DOWNLOAD_ARGS

    _ALLOWED_SHARED_ARGS = [
        'ACL',
        'CacheControl',
        'ChecksumAlgorithm',
        'ContentDisposition',
        'ContentEncoding',
        'ContentLanguage',
        'ContentType',
        'ExpectedBucketOwner',
        'Expires',
        'GrantFullControl',
        'GrantRead',
        'GrantReadACP',
        'GrantWriteACP',
        'Metadata',
        'ObjectLockLegalHoldStatus',
        'ObjectLockMode',
        'ObjectLockRetainUntilDate',
        'RequestPayer',
        'ServerSideEncryption',
        'StorageClass',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'SSEKMSKeyId',
        'SSEKMSEncryptionContext',
        'Tagging',
        'WebsiteRedirectLocation',
    ]

    ALLOWED_UPLOAD_ARGS = (
        _ALLOWED_SHARED_ARGS
        + [
            'ChecksumType',
            'MpuObjectSize',
        ]
        + FULL_OBJECT_CHECKSUM_ARGS
    )

    ALLOWED_COPY_ARGS = _ALLOWED_SHARED_ARGS + [
        'CopySourceIfMatch',
        'CopySourceIfModifiedSince',
        'CopySourceIfNoneMatch',
        'CopySourceIfUnmodifiedSince',
        'CopySourceSSECustomerAlgorithm',
        'CopySourceSSECustomerKey',
        'CopySourceSSECustomerKeyMD5',
        'MetadataDirective',
        'TaggingDirective',
    ]

    ALLOWED_DELETE_ARGS = [
        'MFA',
        'VersionId',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    VALIDATE_SUPPORTED_BUCKET_VALUES = True

    # Resources for which the high-level transfer operations are not
    # supported; only direct client API calls work for these.
    _UNSUPPORTED_BUCKET_PATTERNS = {
        'S3 Object Lambda': re.compile(
            r'^arn:(aws).*:s3-object-lambda:[a-z\-0-9]+:[0-9]{12}:'
            r'accesspoint[/:][a-zA-Z0-9\-]{1,63}'
        ),
    }

    def __init__(self, client, config=None, osutil=None, executor_cls=None):
        """A transfer manager interface for Amazon S3

        :param client: Client to be used by the manager
        :param config: TransferConfig to associate specific configurations
        :param osutil: OSUtils object to use for os-related behavior when
            using with transfer manager.

        :type executor_cls: s3transfer.futures.BaseExecutor
        :param executor_cls: The class of executor to use with the transfer
            manager. By default, concurrent.futures.ThreadPoolExecutor is used.
        """
        self._client = client
        self._config = config
        if config is None:
            self._config = TransferConfig()
        self._osutil = osutil
        if osutil is None:
            self._osutil = OSUtils()
        self._coordinator_controller = TransferCoordinatorController()
        # A counter to create unique id's for each transfer submitted.
        self._id_counter = 0

        # The executor responsible for making S3 API transfer requests
        self._request_executor = BoundedExecutor(
            max_size=self._config.max_request_queue_size,
            max_num_threads=self._config.max_request_concurrency,
            tag_semaphores={
                IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
                    self._config.max_in_memory_upload_chunks
                ),
                IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
                    self._config.max_in_memory_download_chunks
                ),
            },
            executor_cls=executor_cls,
        )

        # The executor responsible for submitting the necessary tasks to
        # perform the desired transfer
        self._submission_executor = BoundedExecutor(
            max_size=self._config.max_submission_queue_size,
            max_num_threads=self._config.max_submission_concurrency,
            executor_cls=executor_cls,
        )

        # There is one thread available for writing to disk. It will handle
        # downloads for all files.
        self._io_executor = BoundedExecutor(
            max_size=self._config.max_io_queue_size,
            max_num_threads=1,
            executor_cls=executor_cls,
        )

        # The component responsible for limiting bandwidth usage if it
        # is configured.
        self._bandwidth_limiter = None
        if self._config.max_bandwidth is not None:
            logger.debug(
                'Setting max_bandwidth to %s', self._config.max_bandwidth
            )
            leaky_bucket = LeakyBucket(self._config.max_bandwidth)
            self._bandwidth_limiter = BandwidthLimiter(leaky_bucket)

        self._register_handlers()

    @property
    def client(self):
        return self._client

    @property
    def config(self):
        return self._config

    def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None):
        """Uploads a file to S3

        :type fileobj: str or seekable file-like object
        :param fileobj: The name of a file to upload or a seekable file-like
            object to upload. It is recommended to use a filename because
            file-like objects may result in higher memory usage.

        :type bucket: str
        :param bucket: The name of the bucket to upload to

        :type key: str
        :param key: The name of the key to upload to

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the event emit during the process of
            the transfer request.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the upload
        """
        # Copy so that defaulting logic below never mutates the caller's
        # dictionary.
        extra_args = extra_args.copy() if extra_args else {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
        self._validate_if_bucket_supported(bucket)
        self._add_operation_defaults(extra_args)
        call_args = CallArgs(
            fileobj=fileobj,
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        extra_main_kwargs = {}
        if self._bandwidth_limiter:
            extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
        return self._submit_transfer(
            call_args, UploadSubmissionTask, extra_main_kwargs
        )

    def download(
        self, bucket, key, fileobj, extra_args=None, subscribers=None
    ):
        """Downloads a file from S3

        :type bucket: str
        :param bucket: The name of the bucket to download from

        :type key: str
        :param key: The name of the key to download from

        :type fileobj: str or seekable file-like object
        :param fileobj: The name of a file to download or a seekable file-like
            object to download. It is recommended to use a filename because
            file-like objects may result in higher memory usage.

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the event emit during the process of
            the transfer request.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the download
        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            bucket=bucket,
            key=key,
            fileobj=fileobj,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        extra_main_kwargs = {'io_executor': self._io_executor}
        if self._bandwidth_limiter:
            extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
        return self._submit_transfer(
            call_args, DownloadSubmissionTask, extra_main_kwargs
        )

    def copy(
        self,
        copy_source,
        bucket,
        key,
        extra_args=None,
        subscribers=None,
        source_client=None,
    ):
        """Copies a file in S3

        :type copy_source: dict
        :param copy_source: The name of the source bucket, key name of the
            source object, and optional version ID of the source object. The
            dictionary format is:
            ``{'Bucket': 'bucket', 'Key': 'key', 'VersionId': 'id'}``. Note
            that the ``VersionId`` key is optional and may be omitted.

        :type bucket: str
        :param bucket: The name of the bucket to copy to

        :type key: str
        :param key: The name of the key to copy to

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: a list of subscribers
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the event emit during the process of
            the transfer request.

        :type source_client: botocore or boto3 Client
        :param source_client: The client to be used for operation that
            may happen at the source object. For example, this client is
            used for the head_object that determines the size of the copy.
            If no client is provided, the transfer manager's client is used
            as the client for the source object.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the copy
        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        if source_client is None:
            source_client = self._client
        self._validate_all_known_args(extra_args, self.ALLOWED_COPY_ARGS)
        if isinstance(copy_source, dict):
            self._validate_if_bucket_supported(copy_source.get('Bucket'))
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            copy_source=copy_source,
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
            source_client=source_client,
        )
        return self._submit_transfer(call_args, CopySubmissionTask)

    def delete(self, bucket, key, extra_args=None, subscribers=None):
        """Delete an S3 object.

        :type bucket: str
        :param bucket: The name of the bucket.

        :type key: str
        :param key: The name of the S3 object to delete.

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            DeleteObject call.

        :type subscribers: list
        :param subscribers: A list of subscribers to be invoked during the
            process of the transfer request. Note that the ``on_progress``
            callback is not invoked during object deletion.

        :rtype: s3transfer.futures.TransferFuture
        :return: Transfer future representing the deletion.

        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_DELETE_ARGS)
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        return self._submit_transfer(call_args, DeleteSubmissionTask)

    def _validate_if_bucket_supported(self, bucket):
        # s3 high level operations don't support some resources
        # (eg. S3 Object Lambda) only direct API calls are available
        # for such resources
        if self.VALIDATE_SUPPORTED_BUCKET_VALUES:
            for resource, pattern in self._UNSUPPORTED_BUCKET_PATTERNS.items():
                match = pattern.match(bucket)
                if match:
                    raise ValueError(
                        f'TransferManager methods do not support {resource} '
                        'resource. Use direct client calls instead.'
                    )

    def _validate_all_known_args(self, actual, allowed):
        # Reject any extra_args key not in the allow-list for the
        # requested operation.
        for kwarg in actual:
            if kwarg not in allowed:
                raise ValueError(
                    "Invalid extra_args key '{}', must be one of: {}".format(
                        kwarg, ', '.join(allowed)
                    )
                )

    def _add_operation_defaults(self, extra_args):
        # Inject a default checksum algorithm only when the client is
        # configured to calculate checksums whenever supported.
        if (
            self.client.meta.config.request_checksum_calculation
            == "when_supported"
        ):
            set_default_checksum_algorithm(extra_args)

    def _submit_transfer(
        self, call_args, submission_task_cls, extra_main_kwargs=None
    ):
        if not extra_main_kwargs:
            extra_main_kwargs = {}

        # Create a TransferFuture to return back to the user
        transfer_future, components = self._get_future_with_components(
            call_args
        )

        # Add any provided done callbacks to the created transfer future
        # to be invoked on the transfer future being complete.
        for callback in get_callbacks(transfer_future, 'done'):
            components['coordinator'].add_done_callback(callback)

        # Get the main kwargs needed to instantiate the submission task
        main_kwargs = self._get_submission_task_main_kwargs(
            transfer_future, extra_main_kwargs
        )

        # Submit a SubmissionTask that will submit all of the necessary
        # tasks needed to complete the S3 transfer.
        self._submission_executor.submit(
            submission_task_cls(
                transfer_coordinator=components['coordinator'],
                main_kwargs=main_kwargs,
            )
        )

        # Increment the unique id counter for future transfer requests
        self._id_counter += 1

        return transfer_future

    def _get_future_with_components(self, call_args):
        transfer_id = self._id_counter
        # Creates a new transfer future along with its components
        transfer_coordinator = TransferCoordinator(transfer_id=transfer_id)
        # Track the transfer coordinator for transfers to manage.
        self._coordinator_controller.add_transfer_coordinator(
            transfer_coordinator
        )
        # Also make sure that the transfer coordinator is removed once
        # the transfer completes so it does not stick around in memory.
        transfer_coordinator.add_done_callback(
            self._coordinator_controller.remove_transfer_coordinator,
            transfer_coordinator,
        )
        components = {
            'meta': TransferMeta(call_args, transfer_id=transfer_id),
            'coordinator': transfer_coordinator,
        }
        transfer_future = TransferFuture(**components)
        return transfer_future, components

    def _get_submission_task_main_kwargs(
        self, transfer_future, extra_main_kwargs
    ):
        main_kwargs = {
            'client': self._client,
            'config': self._config,
            'osutil': self._osutil,
            'request_executor': self._request_executor,
            'transfer_future': transfer_future,
        }
        main_kwargs.update(extra_main_kwargs)
        return main_kwargs

    def _register_handlers(self):
        # Register handlers to enable/disable callbacks on uploads.
        event_name = 'request-created.s3'
        self._client.meta.events.register_first(
            event_name,
            signal_not_transferring,
            unique_id='s3upload-not-transferring',
        )
        self._client.meta.events.register_last(
            event_name, signal_transferring, unique_id='s3upload-transferring'
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, *args):
        cancel = False
        cancel_msg = ''
        cancel_exc_type = FatalError
        # If a exception was raised in the context handler, signal to cancel
        # all of the inprogress futures in the shutdown.
        if exc_type:
            cancel = True
            cancel_msg = str(exc_value)
            if not cancel_msg:
                cancel_msg = repr(exc_value)
            # If it was a KeyboardInterrupt, the cancellation was initiated
            # by the user.
            if isinstance(exc_value, KeyboardInterrupt):
                cancel_exc_type = CancelledError
        self._shutdown(cancel, cancel_msg, cancel_exc_type)

    def shutdown(self, cancel=False, cancel_msg=''):
        """Shutdown the TransferManager

        It will wait till all transfers complete before it completely shuts
        down.

        :type cancel: boolean
        :param cancel: If True, calls TransferFuture.cancel() for
            all in-progress in transfers. This is useful if you want the
            shutdown to happen quicker.

        :type cancel_msg: str
        :param cancel_msg: The message to specify if canceling all in-progress
            transfers.
        """
        # Fix: previously `cancel` was passed twice
        # (`self._shutdown(cancel, cancel, cancel_msg)`), so `cancel_msg`
        # received the boolean and `exc_type` received the message string.
        # Passing two arguments keeps `exc_type`'s CancelledError default.
        self._shutdown(cancel, cancel_msg)

    def _shutdown(self, cancel, cancel_msg, exc_type=CancelledError):
        if cancel:
            # Cancel all in-flight transfers if requested, before waiting
            # for them to complete.
            self._coordinator_controller.cancel(cancel_msg, exc_type)
        try:
            # Wait until there are no more in-progress transfers. This is
            # wrapped in a try statement because this can be interrupted
            # with a KeyboardInterrupt that needs to be caught.
            self._coordinator_controller.wait()
        except KeyboardInterrupt:
            # If not errors were raised in the try block, the cancel should
            # have no coordinators it needs to run cancel on. If there was
            # an error raised in the try statement we want to cancel all of
            # the inflight transfers before shutting down to speed that
            # process up.
            self._coordinator_controller.cancel('KeyboardInterrupt()')
            raise
        finally:
            # Shutdown all of the executors.
            self._submission_executor.shutdown()
            self._request_executor.shutdown()
            self._io_executor.shutdown()

670 

671 

class TransferCoordinatorController:
    def __init__(self):
        """Abstraction to control all transfer coordinators

        This abstraction allows the manager to wait for in-progress
        transfers to complete and to cancel all in-progress transfers.
        """
        # Lock guarding every mutation of the tracked-coordinator set.
        self._lock = threading.Lock()
        self._tracked_transfer_coordinators = set()

    @property
    def tracked_transfer_coordinators(self):
        """The set of transfer coordinators being tracked"""
        with self._lock:
            # Hand back a snapshot: the underlying set is mutated by
            # other threads as transfers start and finish, so iterating
            # it directly could observe concurrent size changes.
            return set(self._tracked_transfer_coordinators)

    def add_transfer_coordinator(self, transfer_coordinator):
        """Adds a transfer coordinator of a transfer to be canceled if needed

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The transfer coordinator for the
            particular transfer
        """
        with self._lock:
            self._tracked_transfer_coordinators.add(transfer_coordinator)

    def remove_transfer_coordinator(self, transfer_coordinator):
        """Remove a transfer coordinator from cancellation consideration

        Typically, this method is invoked by the transfer coordinator
        itself to remove itself when it completes its transfer.

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The transfer coordinator for the
            particular transfer
        """
        with self._lock:
            self._tracked_transfer_coordinators.remove(transfer_coordinator)

    def cancel(self, msg='', exc_type=CancelledError):
        """Cancels all inprogress transfers

        This cancels the inprogress transfers by calling cancel() on all
        tracked transfer coordinators.

        :param msg: The message to pass on to each transfer coordinator that
            gets cancelled.

        :param exc_type: The type of exception to set for the cancellation
        """
        for coordinator in self.tracked_transfer_coordinators:
            coordinator.cancel(msg, exc_type)

    def wait(self):
        """Wait until there are no more inprogress transfers

        This will not stop when failures are encountered and not propagate
        any of these errors from failed transfers, but it can be interrupted
        with a KeyboardInterrupt.
        """
        current = None
        try:
            for current in self.tracked_transfer_coordinators:
                current.result()
        except KeyboardInterrupt:
            logger.debug('Received KeyboardInterrupt in wait()')
            # A KeyboardInterrupt raised while waiting for a result means
            # the user wants out; surface which transfer was in flight,
            # then re-raise.
            if current:
                logger.debug(
                    'On KeyboardInterrupt was waiting for %s',
                    current,
                )
            raise
        except Exception:
            # result() re-raises a failed transfer's exception. Ignore it
            # here and keep waiting; all we need to know is that the
            # coordinator has completed.
            pass