Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/s3transfer/manager.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

208 statements  

1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import copy 

14import logging 

15import re 

16import threading 

17 

18from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket 

19from s3transfer.constants import ( 

20 ALLOWED_DOWNLOAD_ARGS, 

21 FULL_OBJECT_CHECKSUM_ARGS, 

22 KB, 

23 MB, 

24) 

25from s3transfer.copies import CopySubmissionTask 

26from s3transfer.delete import DeleteSubmissionTask 

27from s3transfer.download import DownloadSubmissionTask 

28from s3transfer.exceptions import CancelledError, FatalError 

29from s3transfer.futures import ( 

30 IN_MEMORY_DOWNLOAD_TAG, 

31 IN_MEMORY_UPLOAD_TAG, 

32 BoundedExecutor, 

33 TransferCoordinator, 

34 TransferFuture, 

35 TransferMeta, 

36) 

37from s3transfer.upload import UploadSubmissionTask 

38from s3transfer.utils import ( 

39 CallArgs, 

40 OSUtils, 

41 SlidingWindowSemaphore, 

42 TaskSemaphore, 

43 get_callbacks, 

44 set_default_checksum_algorithm, 

45 signal_not_transferring, 

46 signal_transferring, 

47) 

48 

49logger = logging.getLogger(__name__) 

50 

51 

52class TransferConfig: 

53 def __init__( 

54 self, 

55 multipart_threshold=8 * MB, 

56 multipart_chunksize=8 * MB, 

57 max_request_concurrency=10, 

58 max_submission_concurrency=5, 

59 max_request_queue_size=1000, 

60 max_submission_queue_size=1000, 

61 max_io_queue_size=1000, 

62 io_chunksize=256 * KB, 

63 num_download_attempts=5, 

64 max_in_memory_upload_chunks=10, 

65 max_in_memory_download_chunks=10, 

66 max_bandwidth=None, 

67 ): 

68 """Configurations for the transfer manager 

69 

70 :param multipart_threshold: The threshold for which multipart 

71 transfers occur. 

72 

73 :param max_request_concurrency: The maximum number of S3 API 

74 transfer-related requests that can happen at a time. 

75 

76 :param max_submission_concurrency: The maximum number of threads 

77 processing a call to a TransferManager method. Processing a 

78 call usually entails determining which S3 API requests that need 

79 to be enqueued, but does **not** entail making any of the 

80 S3 API data transferring requests needed to perform the transfer. 

81 The threads controlled by ``max_request_concurrency`` is 

82 responsible for that. 

83 

84 :param multipart_chunksize: The size of each transfer if a request 

85 becomes a multipart transfer. 

86 

87 :param max_request_queue_size: The maximum amount of S3 API requests 

88 that can be queued at a time. 

89 

90 :param max_submission_queue_size: The maximum amount of 

91 TransferManager method calls that can be queued at a time. 

92 

93 :param max_io_queue_size: The maximum amount of read parts that 

94 can be queued to be written to disk per download. The default 

95 size for each elementin this queue is 8 KB. 

96 

97 :param io_chunksize: The max size of each chunk in the io queue. 

98 Currently, this is size used when reading from the downloaded 

99 stream as well. 

100 

101 :param num_download_attempts: The number of download attempts that 

102 will be tried upon errors with downloading an object in S3. Note 

103 that these retries account for errors that occur when streaming 

104 down the data from s3 (i.e. socket errors and read timeouts that 

105 occur after receiving an OK response from s3). 

106 Other retryable exceptions such as throttling errors and 5xx errors 

107 are already retried by botocore (this default is 5). The 

108 ``num_download_attempts`` does not take into account the 

109 number of exceptions retried by botocore. 

110 

111 :param max_in_memory_upload_chunks: The number of chunks that can 

112 be stored in memory at a time for all ongoing upload requests. 

113 This pertains to chunks of data that need to be stored in memory 

114 during an upload if the data is sourced from a file-like object. 

115 The total maximum memory footprint due to a in-memory upload 

116 chunks is roughly equal to: 

117 

118 max_in_memory_upload_chunks * multipart_chunksize 

119 + max_submission_concurrency * multipart_chunksize 

120 

121 ``max_submission_concurrency`` has an affect on this value because 

122 for each thread pulling data off of a file-like object, they may 

123 be waiting with a single read chunk to be submitted for upload 

124 because the ``max_in_memory_upload_chunks`` value has been reached 

125 by the threads making the upload request. 

126 

127 :param max_in_memory_download_chunks: The number of chunks that can 

128 be buffered in memory and **not** in the io queue at a time for all 

129 ongoing download requests. This pertains specifically to file-like 

130 objects that cannot be seeked. The total maximum memory footprint 

131 due to a in-memory download chunks is roughly equal to: 

132 

133 max_in_memory_download_chunks * multipart_chunksize 

134 

135 :param max_bandwidth: The maximum bandwidth that will be consumed 

136 in uploading and downloading file content. The value is in terms of 

137 bytes per second. 

138 """ 

139 self.multipart_threshold = multipart_threshold 

140 self.multipart_chunksize = multipart_chunksize 

141 self.max_request_concurrency = max_request_concurrency 

142 self.max_submission_concurrency = max_submission_concurrency 

143 self.max_request_queue_size = max_request_queue_size 

144 self.max_submission_queue_size = max_submission_queue_size 

145 self.max_io_queue_size = max_io_queue_size 

146 self.io_chunksize = io_chunksize 

147 self.num_download_attempts = num_download_attempts 

148 self.max_in_memory_upload_chunks = max_in_memory_upload_chunks 

149 self.max_in_memory_download_chunks = max_in_memory_download_chunks 

150 self.max_bandwidth = max_bandwidth 

151 self._validate_attrs_are_nonzero() 

152 

153 def _validate_attrs_are_nonzero(self): 

154 for attr, attr_val in self.__dict__.items(): 

155 if attr_val is not None and attr_val <= 0: 

156 raise ValueError( 

157 f'Provided parameter {attr} of value {attr_val} must ' 

158 'be greater than 0.' 

159 ) 

160 

161 

162class TransferManager: 

163 ALLOWED_DOWNLOAD_ARGS = ALLOWED_DOWNLOAD_ARGS 

164 

165 _ALLOWED_SHARED_ARGS = [ 

166 'ACL', 

167 'CacheControl', 

168 'ChecksumAlgorithm', 

169 'ContentDisposition', 

170 'ContentEncoding', 

171 'ContentLanguage', 

172 'ContentType', 

173 'ExpectedBucketOwner', 

174 'Expires', 

175 'GrantFullControl', 

176 'GrantRead', 

177 'GrantReadACP', 

178 'GrantWriteACP', 

179 'Metadata', 

180 'ObjectLockLegalHoldStatus', 

181 'ObjectLockMode', 

182 'ObjectLockRetainUntilDate', 

183 'RequestPayer', 

184 'ServerSideEncryption', 

185 'StorageClass', 

186 'SSECustomerAlgorithm', 

187 'SSECustomerKey', 

188 'SSECustomerKeyMD5', 

189 'SSEKMSKeyId', 

190 'SSEKMSEncryptionContext', 

191 'Tagging', 

192 'WebsiteRedirectLocation', 

193 ] 

194 

195 ALLOWED_UPLOAD_ARGS = ( 

196 _ALLOWED_SHARED_ARGS 

197 + [ 

198 'ChecksumType', 

199 'MpuObjectSize', 

200 ] 

201 + FULL_OBJECT_CHECKSUM_ARGS 

202 ) 

203 

204 ALLOWED_COPY_ARGS = _ALLOWED_SHARED_ARGS + [ 

205 'CopySourceIfMatch', 

206 'CopySourceIfModifiedSince', 

207 'CopySourceIfNoneMatch', 

208 'CopySourceIfUnmodifiedSince', 

209 'CopySourceSSECustomerAlgorithm', 

210 'CopySourceSSECustomerKey', 

211 'CopySourceSSECustomerKeyMD5', 

212 'MetadataDirective', 

213 'TaggingDirective', 

214 ] 

215 

216 ALLOWED_DELETE_ARGS = [ 

217 'MFA', 

218 'VersionId', 

219 'RequestPayer', 

220 'ExpectedBucketOwner', 

221 ] 

222 

223 VALIDATE_SUPPORTED_BUCKET_VALUES = True 

224 

225 _UNSUPPORTED_BUCKET_PATTERNS = { 

226 'S3 Object Lambda': re.compile( 

227 r'^arn:(aws).*:s3-object-lambda:[a-z\-0-9]+:[0-9]{12}:' 

228 r'accesspoint[/:][a-zA-Z0-9\-]{1,63}' 

229 ), 

230 } 

231 

232 def __init__(self, client, config=None, osutil=None, executor_cls=None): 

233 """A transfer manager interface for Amazon S3 

234 

235 :param client: Client to be used by the manager 

236 :param config: TransferConfig to associate specific configurations 

237 :param osutil: OSUtils object to use for os-related behavior when 

238 using with transfer manager. 

239 

240 :type executor_cls: s3transfer.futures.BaseExecutor 

241 :param executor_cls: The class of executor to use with the transfer 

242 manager. By default, concurrent.futures.ThreadPoolExecutor is used. 

243 """ 

244 self._client = client 

245 self._config = config 

246 if config is None: 

247 self._config = TransferConfig() 

248 self._osutil = osutil 

249 if osutil is None: 

250 self._osutil = OSUtils() 

251 self._coordinator_controller = TransferCoordinatorController() 

252 # A counter to create unique id's for each transfer submitted. 

253 self._id_counter = 0 

254 

255 # The executor responsible for making S3 API transfer requests 

256 self._request_executor = BoundedExecutor( 

257 max_size=self._config.max_request_queue_size, 

258 max_num_threads=self._config.max_request_concurrency, 

259 tag_semaphores={ 

260 IN_MEMORY_UPLOAD_TAG: TaskSemaphore( 

261 self._config.max_in_memory_upload_chunks 

262 ), 

263 IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore( 

264 self._config.max_in_memory_download_chunks 

265 ), 

266 }, 

267 executor_cls=executor_cls, 

268 ) 

269 

270 # The executor responsible for submitting the necessary tasks to 

271 # perform the desired transfer 

272 self._submission_executor = BoundedExecutor( 

273 max_size=self._config.max_submission_queue_size, 

274 max_num_threads=self._config.max_submission_concurrency, 

275 executor_cls=executor_cls, 

276 ) 

277 

278 # There is one thread available for writing to disk. It will handle 

279 # downloads for all files. 

280 self._io_executor = BoundedExecutor( 

281 max_size=self._config.max_io_queue_size, 

282 max_num_threads=1, 

283 executor_cls=executor_cls, 

284 ) 

285 

286 # The component responsible for limiting bandwidth usage if it 

287 # is configured. 

288 self._bandwidth_limiter = None 

289 if self._config.max_bandwidth is not None: 

290 logger.debug( 

291 'Setting max_bandwidth to %s', self._config.max_bandwidth 

292 ) 

293 leaky_bucket = LeakyBucket(self._config.max_bandwidth) 

294 self._bandwidth_limiter = BandwidthLimiter(leaky_bucket) 

295 

296 self._register_handlers() 

297 

298 @property 

299 def client(self): 

300 return self._client 

301 

302 @property 

303 def config(self): 

304 return self._config 

305 

306 def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None): 

307 """Uploads a file to S3 

308 

309 :type fileobj: str or seekable file-like object 

310 :param fileobj: The name of a file to upload or a seekable file-like 

311 object to upload. It is recommended to use a filename because 

312 file-like objects may result in higher memory usage. 

313 

314 :type bucket: str 

315 :param bucket: The name of the bucket to upload to 

316 

317 :type key: str 

318 :param key: The name of the key to upload to 

319 

320 :type extra_args: dict 

321 :param extra_args: Extra arguments that may be passed to the 

322 client operation 

323 

324 :type subscribers: list(s3transfer.subscribers.BaseSubscriber) 

325 :param subscribers: The list of subscribers to be invoked in the 

326 order provided based on the event emit during the process of 

327 the transfer request. 

328 

329 :rtype: s3transfer.futures.TransferFuture 

330 :returns: Transfer future representing the upload 

331 """ 

332 

333 extra_args = extra_args.copy() if extra_args else {} 

334 if subscribers is None: 

335 subscribers = [] 

336 self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS) 

337 self._validate_if_bucket_supported(bucket) 

338 self._add_operation_defaults(extra_args) 

339 call_args = CallArgs( 

340 fileobj=fileobj, 

341 bucket=bucket, 

342 key=key, 

343 extra_args=extra_args, 

344 subscribers=subscribers, 

345 ) 

346 extra_main_kwargs = {} 

347 if self._bandwidth_limiter: 

348 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter 

349 return self._submit_transfer( 

350 call_args, UploadSubmissionTask, extra_main_kwargs 

351 ) 

352 

353 def download( 

354 self, bucket, key, fileobj, extra_args=None, subscribers=None 

355 ): 

356 """Downloads a file from S3 

357 

358 :type bucket: str 

359 :param bucket: The name of the bucket to download from 

360 

361 :type key: str 

362 :param key: The name of the key to download from 

363 

364 :type fileobj: str or seekable file-like object 

365 :param fileobj: The name of a file to download or a seekable file-like 

366 object to download. It is recommended to use a filename because 

367 file-like objects may result in higher memory usage. 

368 

369 :type extra_args: dict 

370 :param extra_args: Extra arguments that may be passed to the 

371 client operation 

372 

373 :type subscribers: list(s3transfer.subscribers.BaseSubscriber) 

374 :param subscribers: The list of subscribers to be invoked in the 

375 order provided based on the event emit during the process of 

376 the transfer request. 

377 

378 :rtype: s3transfer.futures.TransferFuture 

379 :returns: Transfer future representing the download 

380 """ 

381 if extra_args is None: 

382 extra_args = {} 

383 if subscribers is None: 

384 subscribers = [] 

385 self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS) 

386 self._validate_if_bucket_supported(bucket) 

387 call_args = CallArgs( 

388 bucket=bucket, 

389 key=key, 

390 fileobj=fileobj, 

391 extra_args=extra_args, 

392 subscribers=subscribers, 

393 ) 

394 extra_main_kwargs = {'io_executor': self._io_executor} 

395 if self._bandwidth_limiter: 

396 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter 

397 return self._submit_transfer( 

398 call_args, DownloadSubmissionTask, extra_main_kwargs 

399 ) 

400 

401 def copy( 

402 self, 

403 copy_source, 

404 bucket, 

405 key, 

406 extra_args=None, 

407 subscribers=None, 

408 source_client=None, 

409 ): 

410 """Copies a file in S3 

411 

412 :type copy_source: dict 

413 :param copy_source: The name of the source bucket, key name of the 

414 source object, and optional version ID of the source object. The 

415 dictionary format is: 

416 ``{'Bucket': 'bucket', 'Key': 'key', 'VersionId': 'id'}``. Note 

417 that the ``VersionId`` key is optional and may be omitted. 

418 

419 :type bucket: str 

420 :param bucket: The name of the bucket to copy to 

421 

422 :type key: str 

423 :param key: The name of the key to copy to 

424 

425 :type extra_args: dict 

426 :param extra_args: Extra arguments that may be passed to the 

427 client operation 

428 

429 :type subscribers: a list of subscribers 

430 :param subscribers: The list of subscribers to be invoked in the 

431 order provided based on the event emit during the process of 

432 the transfer request. 

433 

434 :type source_client: botocore or boto3 Client 

435 :param source_client: The client to be used for operation that 

436 may happen at the source object. For example, this client is 

437 used for the head_object that determines the size of the copy. 

438 If no client is provided, the transfer manager's client is used 

439 as the client for the source object. 

440 

441 :rtype: s3transfer.futures.TransferFuture 

442 :returns: Transfer future representing the copy 

443 """ 

444 if extra_args is None: 

445 extra_args = {} 

446 if subscribers is None: 

447 subscribers = [] 

448 if source_client is None: 

449 source_client = self._client 

450 self._validate_all_known_args(extra_args, self.ALLOWED_COPY_ARGS) 

451 if isinstance(copy_source, dict): 

452 self._validate_if_bucket_supported(copy_source.get('Bucket')) 

453 self._validate_if_bucket_supported(bucket) 

454 call_args = CallArgs( 

455 copy_source=copy_source, 

456 bucket=bucket, 

457 key=key, 

458 extra_args=extra_args, 

459 subscribers=subscribers, 

460 source_client=source_client, 

461 ) 

462 return self._submit_transfer(call_args, CopySubmissionTask) 

463 

464 def delete(self, bucket, key, extra_args=None, subscribers=None): 

465 """Delete an S3 object. 

466 

467 :type bucket: str 

468 :param bucket: The name of the bucket. 

469 

470 :type key: str 

471 :param key: The name of the S3 object to delete. 

472 

473 :type extra_args: dict 

474 :param extra_args: Extra arguments that may be passed to the 

475 DeleteObject call. 

476 

477 :type subscribers: list 

478 :param subscribers: A list of subscribers to be invoked during the 

479 process of the transfer request. Note that the ``on_progress`` 

480 callback is not invoked during object deletion. 

481 

482 :rtype: s3transfer.futures.TransferFuture 

483 :return: Transfer future representing the deletion. 

484 

485 """ 

486 if extra_args is None: 

487 extra_args = {} 

488 if subscribers is None: 

489 subscribers = [] 

490 self._validate_all_known_args(extra_args, self.ALLOWED_DELETE_ARGS) 

491 self._validate_if_bucket_supported(bucket) 

492 call_args = CallArgs( 

493 bucket=bucket, 

494 key=key, 

495 extra_args=extra_args, 

496 subscribers=subscribers, 

497 ) 

498 return self._submit_transfer(call_args, DeleteSubmissionTask) 

499 

500 def _validate_if_bucket_supported(self, bucket): 

501 # s3 high level operations don't support some resources 

502 # (eg. S3 Object Lambda) only direct API calls are available 

503 # for such resources 

504 if self.VALIDATE_SUPPORTED_BUCKET_VALUES: 

505 for resource, pattern in self._UNSUPPORTED_BUCKET_PATTERNS.items(): 

506 match = pattern.match(bucket) 

507 if match: 

508 raise ValueError( 

509 f'TransferManager methods do not support {resource} ' 

510 'resource. Use direct client calls instead.' 

511 ) 

512 

513 def _validate_all_known_args(self, actual, allowed): 

514 for kwarg in actual: 

515 if kwarg not in allowed: 

516 raise ValueError( 

517 "Invalid extra_args key '{}', " 

518 "must be one of: {}".format(kwarg, ', '.join(allowed)) 

519 ) 

520 

521 def _add_operation_defaults(self, extra_args): 

522 if ( 

523 self.client.meta.config.request_checksum_calculation 

524 == "when_supported" 

525 ): 

526 set_default_checksum_algorithm(extra_args) 

527 

528 def _submit_transfer( 

529 self, call_args, submission_task_cls, extra_main_kwargs=None 

530 ): 

531 if not extra_main_kwargs: 

532 extra_main_kwargs = {} 

533 

534 # Create a TransferFuture to return back to the user 

535 transfer_future, components = self._get_future_with_components( 

536 call_args 

537 ) 

538 

539 # Add any provided done callbacks to the created transfer future 

540 # to be invoked on the transfer future being complete. 

541 for callback in get_callbacks(transfer_future, 'done'): 

542 components['coordinator'].add_done_callback(callback) 

543 

544 # Get the main kwargs needed to instantiate the submission task 

545 main_kwargs = self._get_submission_task_main_kwargs( 

546 transfer_future, extra_main_kwargs 

547 ) 

548 

549 # Submit a SubmissionTask that will submit all of the necessary 

550 # tasks needed to complete the S3 transfer. 

551 self._submission_executor.submit( 

552 submission_task_cls( 

553 transfer_coordinator=components['coordinator'], 

554 main_kwargs=main_kwargs, 

555 ) 

556 ) 

557 

558 # Increment the unique id counter for future transfer requests 

559 self._id_counter += 1 

560 

561 return transfer_future 

562 

563 def _get_future_with_components(self, call_args): 

564 transfer_id = self._id_counter 

565 # Creates a new transfer future along with its components 

566 transfer_coordinator = TransferCoordinator(transfer_id=transfer_id) 

567 # Track the transfer coordinator for transfers to manage. 

568 self._coordinator_controller.add_transfer_coordinator( 

569 transfer_coordinator 

570 ) 

571 # Also make sure that the transfer coordinator is removed once 

572 # the transfer completes so it does not stick around in memory. 

573 transfer_coordinator.add_done_callback( 

574 self._coordinator_controller.remove_transfer_coordinator, 

575 transfer_coordinator, 

576 ) 

577 components = { 

578 'meta': TransferMeta(call_args, transfer_id=transfer_id), 

579 'coordinator': transfer_coordinator, 

580 } 

581 transfer_future = TransferFuture(**components) 

582 return transfer_future, components 

583 

584 def _get_submission_task_main_kwargs( 

585 self, transfer_future, extra_main_kwargs 

586 ): 

587 main_kwargs = { 

588 'client': self._client, 

589 'config': self._config, 

590 'osutil': self._osutil, 

591 'request_executor': self._request_executor, 

592 'transfer_future': transfer_future, 

593 } 

594 main_kwargs.update(extra_main_kwargs) 

595 return main_kwargs 

596 

597 def _register_handlers(self): 

598 # Register handlers to enable/disable callbacks on uploads. 

599 event_name = 'request-created.s3' 

600 self._client.meta.events.register_first( 

601 event_name, 

602 signal_not_transferring, 

603 unique_id='s3upload-not-transferring', 

604 ) 

605 self._client.meta.events.register_last( 

606 event_name, signal_transferring, unique_id='s3upload-transferring' 

607 ) 

608 

609 def __enter__(self): 

610 return self 

611 

612 def __exit__(self, exc_type, exc_value, *args): 

613 cancel = False 

614 cancel_msg = '' 

615 cancel_exc_type = FatalError 

616 # If a exception was raised in the context handler, signal to cancel 

617 # all of the inprogress futures in the shutdown. 

618 if exc_type: 

619 cancel = True 

620 cancel_msg = str(exc_value) 

621 if not cancel_msg: 

622 cancel_msg = repr(exc_value) 

623 # If it was a KeyboardInterrupt, the cancellation was initiated 

624 # by the user. 

625 if isinstance(exc_value, KeyboardInterrupt): 

626 cancel_exc_type = CancelledError 

627 self._shutdown(cancel, cancel_msg, cancel_exc_type) 

628 

629 def shutdown(self, cancel=False, cancel_msg=''): 

630 """Shutdown the TransferManager 

631 

632 It will wait till all transfers complete before it completely shuts 

633 down. 

634 

635 :type cancel: boolean 

636 :param cancel: If True, calls TransferFuture.cancel() for 

637 all in-progress in transfers. This is useful if you want the 

638 shutdown to happen quicker. 

639 

640 :type cancel_msg: str 

641 :param cancel_msg: The message to specify if canceling all in-progress 

642 transfers. 

643 """ 

644 self._shutdown(cancel, cancel, cancel_msg) 

645 

646 def _shutdown(self, cancel, cancel_msg, exc_type=CancelledError): 

647 if cancel: 

648 # Cancel all in-flight transfers if requested, before waiting 

649 # for them to complete. 

650 self._coordinator_controller.cancel(cancel_msg, exc_type) 

651 try: 

652 # Wait until there are no more in-progress transfers. This is 

653 # wrapped in a try statement because this can be interrupted 

654 # with a KeyboardInterrupt that needs to be caught. 

655 self._coordinator_controller.wait() 

656 except KeyboardInterrupt: 

657 # If not errors were raised in the try block, the cancel should 

658 # have no coordinators it needs to run cancel on. If there was 

659 # an error raised in the try statement we want to cancel all of 

660 # the inflight transfers before shutting down to speed that 

661 # process up. 

662 self._coordinator_controller.cancel('KeyboardInterrupt()') 

663 raise 

664 finally: 

665 # Shutdown all of the executors. 

666 self._submission_executor.shutdown() 

667 self._request_executor.shutdown() 

668 self._io_executor.shutdown() 

669 

670 

671class TransferCoordinatorController: 

672 def __init__(self): 

673 """Abstraction to control all transfer coordinators 

674 

675 This abstraction allows the manager to wait for inprogress transfers 

676 to complete and cancel all inprogress transfers. 

677 """ 

678 self._lock = threading.Lock() 

679 self._tracked_transfer_coordinators = set() 

680 

681 @property 

682 def tracked_transfer_coordinators(self): 

683 """The set of transfer coordinators being tracked""" 

684 with self._lock: 

685 # We return a copy because the set is mutable and if you were to 

686 # iterate over the set, it may be changing in length due to 

687 # additions and removals of transfer coordinators. 

688 return copy.copy(self._tracked_transfer_coordinators) 

689 

690 def add_transfer_coordinator(self, transfer_coordinator): 

691 """Adds a transfer coordinator of a transfer to be canceled if needed 

692 

693 :type transfer_coordinator: s3transfer.futures.TransferCoordinator 

694 :param transfer_coordinator: The transfer coordinator for the 

695 particular transfer 

696 """ 

697 with self._lock: 

698 self._tracked_transfer_coordinators.add(transfer_coordinator) 

699 

700 def remove_transfer_coordinator(self, transfer_coordinator): 

701 """Remove a transfer coordinator from cancellation consideration 

702 

703 Typically, this method is invoked by the transfer coordinator itself 

704 to remove its self when it completes its transfer. 

705 

706 :type transfer_coordinator: s3transfer.futures.TransferCoordinator 

707 :param transfer_coordinator: The transfer coordinator for the 

708 particular transfer 

709 """ 

710 with self._lock: 

711 self._tracked_transfer_coordinators.remove(transfer_coordinator) 

712 

713 def cancel(self, msg='', exc_type=CancelledError): 

714 """Cancels all inprogress transfers 

715 

716 This cancels the inprogress transfers by calling cancel() on all 

717 tracked transfer coordinators. 

718 

719 :param msg: The message to pass on to each transfer coordinator that 

720 gets cancelled. 

721 

722 :param exc_type: The type of exception to set for the cancellation 

723 """ 

724 for transfer_coordinator in self.tracked_transfer_coordinators: 

725 transfer_coordinator.cancel(msg, exc_type) 

726 

727 def wait(self): 

728 """Wait until there are no more inprogress transfers 

729 

730 This will not stop when failures are encountered and not propagate any 

731 of these errors from failed transfers, but it can be interrupted with 

732 a KeyboardInterrupt. 

733 """ 

734 try: 

735 transfer_coordinator = None 

736 for transfer_coordinator in self.tracked_transfer_coordinators: 

737 transfer_coordinator.result() 

738 except KeyboardInterrupt: 

739 logger.debug('Received KeyboardInterrupt in wait()') 

740 # If Keyboard interrupt is raised while waiting for 

741 # the result, then exit out of the wait and raise the 

742 # exception 

743 if transfer_coordinator: 

744 logger.debug( 

745 'On KeyboardInterrupt was waiting for %s', 

746 transfer_coordinator, 

747 ) 

748 raise 

749 except Exception: 

750 # A general exception could have been thrown because 

751 # of result(). We just want to ignore this and continue 

752 # because we at least know that the transfer coordinator 

753 # has completed. 

754 pass