Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/manager.py: 25%

206 statements  

coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import copy
import logging
import re
import threading

from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket
from s3transfer.constants import ALLOWED_DOWNLOAD_ARGS, KB, MB
from s3transfer.copies import CopySubmissionTask
from s3transfer.delete import DeleteSubmissionTask
from s3transfer.download import DownloadSubmissionTask
from s3transfer.exceptions import CancelledError, FatalError
from s3transfer.futures import (
    IN_MEMORY_DOWNLOAD_TAG,
    IN_MEMORY_UPLOAD_TAG,
    BoundedExecutor,
    TransferCoordinator,
    TransferFuture,
    TransferMeta,
)
from s3transfer.upload import UploadSubmissionTask
from s3transfer.utils import (
    CallArgs,
    OSUtils,
    SlidingWindowSemaphore,
    TaskSemaphore,
    add_s3express_defaults,
    get_callbacks,
    signal_not_transferring,
    signal_transferring,
)

logger = logging.getLogger(__name__)


class TransferConfig:
    def __init__(
        self,
        multipart_threshold=8 * MB,
        multipart_chunksize=8 * MB,
        max_request_concurrency=10,
        max_submission_concurrency=5,
        max_request_queue_size=1000,
        max_submission_queue_size=1000,
        max_io_queue_size=1000,
        io_chunksize=256 * KB,
        num_download_attempts=5,
        max_in_memory_upload_chunks=10,
        max_in_memory_download_chunks=10,
        max_bandwidth=None,
    ):
        """Configurations for the transfer manager

        :param multipart_threshold: The threshold for which multipart
            transfers occur.

        :param max_request_concurrency: The maximum number of S3 API
            transfer-related requests that can happen at a time.

        :param max_submission_concurrency: The maximum number of threads
            processing a call to a TransferManager method. Processing a
            call usually entails determining which S3 API requests need
            to be enqueued, but does **not** entail making any of the
            S3 API data transferring requests needed to perform the transfer.
            The threads controlled by ``max_request_concurrency`` are
            responsible for that.

        :param multipart_chunksize: The size of each transfer if a request
            becomes a multipart transfer.

        :param max_request_queue_size: The maximum number of S3 API requests
            that can be queued at a time.

        :param max_submission_queue_size: The maximum number of
            TransferManager method calls that can be queued at a time.

        :param max_io_queue_size: The maximum number of read parts that
            can be queued to be written to disk per download. The default
            size for each element in this queue is 8 KB.

        :param io_chunksize: The max size of each chunk in the io queue.
            Currently, this is the size used when reading from the downloaded
            stream as well.

        :param num_download_attempts: The number of download attempts that
            will be tried upon errors with downloading an object in S3. Note
            that these retries account for errors that occur when streaming
            down the data from S3 (i.e. socket errors and read timeouts that
            occur after receiving an OK response from S3).
            Other retryable exceptions such as throttling errors and 5xx
            errors are already retried by botocore (this default is 5). The
            ``num_download_attempts`` does not take into account the
            number of exceptions retried by botocore.

        :param max_in_memory_upload_chunks: The number of chunks that can
            be stored in memory at a time for all ongoing upload requests.
            This pertains to chunks of data that need to be stored in memory
            during an upload if the data is sourced from a file-like object.
            The total maximum memory footprint due to in-memory upload
            chunks is roughly equal to:

                max_in_memory_upload_chunks * multipart_chunksize
                + max_submission_concurrency * multipart_chunksize

            ``max_submission_concurrency`` has an effect on this value because
            for each thread pulling data off of a file-like object, they may
            be waiting with a single read chunk to be submitted for upload
            because the ``max_in_memory_upload_chunks`` value has been reached
            by the threads making the upload request.

        :param max_in_memory_download_chunks: The number of chunks that can
            be buffered in memory and **not** in the io queue at a time for
            all ongoing download requests. This pertains specifically to
            file-like objects that cannot be seeked. The total maximum memory
            footprint due to in-memory download chunks is roughly equal to:

                max_in_memory_download_chunks * multipart_chunksize

        :param max_bandwidth: The maximum bandwidth that will be consumed
            in uploading and downloading file content. The value is in terms
            of bytes per second.
        """
        self.multipart_threshold = multipart_threshold
        self.multipart_chunksize = multipart_chunksize
        self.max_request_concurrency = max_request_concurrency
        self.max_submission_concurrency = max_submission_concurrency
        self.max_request_queue_size = max_request_queue_size
        self.max_submission_queue_size = max_submission_queue_size
        self.max_io_queue_size = max_io_queue_size
        self.io_chunksize = io_chunksize
        self.num_download_attempts = num_download_attempts
        self.max_in_memory_upload_chunks = max_in_memory_upload_chunks
        self.max_in_memory_download_chunks = max_in_memory_download_chunks
        self.max_bandwidth = max_bandwidth
        self._validate_attrs_are_nonzero()

    def _validate_attrs_are_nonzero(self):
        for attr, attr_val in self.__dict__.items():
            if attr_val is not None and attr_val <= 0:
                raise ValueError(
                    'Provided parameter %s of value %s must be greater than '
                    '0.' % (attr, attr_val)
                )
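

# --- Illustrative sketch (not part of the original module) ------------------
# Constructing a TransferConfig and evaluating the rough upload-memory bound
# quoted in the docstring above:
#     max_in_memory_upload_chunks * multipart_chunksize
#     + max_submission_concurrency * multipart_chunksize
# The helper name and the example values are hypothetical; any value <= 0
# would be rejected by _validate_attrs_are_nonzero().
def _example_upload_memory_estimate():
    config = TransferConfig(
        multipart_threshold=16 * MB,
        multipart_chunksize=16 * MB,
        max_request_concurrency=20,
        max_bandwidth=50 * MB,  # bytes per second
    )
    return (
        config.max_in_memory_upload_chunks * config.multipart_chunksize
        + config.max_submission_concurrency * config.multipart_chunksize
    )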



class TransferManager:
    ALLOWED_DOWNLOAD_ARGS = ALLOWED_DOWNLOAD_ARGS

    ALLOWED_UPLOAD_ARGS = [
        'ACL',
        'CacheControl',
        'ChecksumAlgorithm',
        'ContentDisposition',
        'ContentEncoding',
        'ContentLanguage',
        'ContentType',
        'ExpectedBucketOwner',
        'Expires',
        'GrantFullControl',
        'GrantRead',
        'GrantReadACP',
        'GrantWriteACP',
        'Metadata',
        'ObjectLockLegalHoldStatus',
        'ObjectLockMode',
        'ObjectLockRetainUntilDate',
        'RequestPayer',
        'ServerSideEncryption',
        'StorageClass',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'SSEKMSKeyId',
        'SSEKMSEncryptionContext',
        'Tagging',
        'WebsiteRedirectLocation',
    ]

    ALLOWED_COPY_ARGS = ALLOWED_UPLOAD_ARGS + [
        'CopySourceIfMatch',
        'CopySourceIfModifiedSince',
        'CopySourceIfNoneMatch',
        'CopySourceIfUnmodifiedSince',
        'CopySourceSSECustomerAlgorithm',
        'CopySourceSSECustomerKey',
        'CopySourceSSECustomerKeyMD5',
        'MetadataDirective',
        'TaggingDirective',
    ]

    ALLOWED_DELETE_ARGS = [
        'MFA',
        'VersionId',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    VALIDATE_SUPPORTED_BUCKET_VALUES = True

    _UNSUPPORTED_BUCKET_PATTERNS = {
        'S3 Object Lambda': re.compile(
            r'^arn:(aws).*:s3-object-lambda:[a-z\-0-9]+:[0-9]{12}:'
            r'accesspoint[/:][a-zA-Z0-9\-]{1,63}'
        ),
    }


    def __init__(self, client, config=None, osutil=None, executor_cls=None):
        """A transfer manager interface for Amazon S3

        :param client: Client to be used by the manager
        :param config: TransferConfig to associate specific configurations
        :param osutil: OSUtils object to use for OS-related behavior when
            using the transfer manager.

        :type executor_cls: s3transfer.futures.BaseExecutor
        :param executor_cls: The class of executor to use with the transfer
            manager. By default, concurrent.futures.ThreadPoolExecutor is
            used.
        """
        self._client = client
        self._config = config
        if config is None:
            self._config = TransferConfig()
        self._osutil = osutil
        if osutil is None:
            self._osutil = OSUtils()
        self._coordinator_controller = TransferCoordinatorController()
        # A counter to create unique ids for each transfer submitted.
        self._id_counter = 0

        # The executor responsible for making S3 API transfer requests
        self._request_executor = BoundedExecutor(
            max_size=self._config.max_request_queue_size,
            max_num_threads=self._config.max_request_concurrency,
            tag_semaphores={
                IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
                    self._config.max_in_memory_upload_chunks
                ),
                IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
                    self._config.max_in_memory_download_chunks
                ),
            },
            executor_cls=executor_cls,
        )

        # The executor responsible for submitting the necessary tasks to
        # perform the desired transfer
        self._submission_executor = BoundedExecutor(
            max_size=self._config.max_submission_queue_size,
            max_num_threads=self._config.max_submission_concurrency,
            executor_cls=executor_cls,
        )

        # There is one thread available for writing to disk. It will handle
        # downloads for all files.
        self._io_executor = BoundedExecutor(
            max_size=self._config.max_io_queue_size,
            max_num_threads=1,
            executor_cls=executor_cls,
        )

        # The component responsible for limiting bandwidth usage if it
        # is configured.
        self._bandwidth_limiter = None
        if self._config.max_bandwidth is not None:
            logger.debug(
                'Setting max_bandwidth to %s', self._config.max_bandwidth
            )
            leaky_bucket = LeakyBucket(self._config.max_bandwidth)
            self._bandwidth_limiter = BandwidthLimiter(leaky_bucket)

        self._register_handlers()


    @property
    def client(self):
        return self._client

    @property
    def config(self):
        return self._config


    def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None):
        """Uploads a file to S3

        :type fileobj: str or seekable file-like object
        :param fileobj: The name of a file to upload or a seekable file-like
            object to upload. It is recommended to use a filename because
            file-like objects may result in higher memory usage.

        :type bucket: str
        :param bucket: The name of the bucket to upload to

        :type key: str
        :param key: The name of the key to upload to

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the events emitted during the process
            of the transfer request.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the upload
        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
        self._validate_if_bucket_supported(bucket)
        self._add_operation_defaults(bucket, extra_args)
        call_args = CallArgs(
            fileobj=fileobj,
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        extra_main_kwargs = {}
        if self._bandwidth_limiter:
            extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
        return self._submit_transfer(
            call_args, UploadSubmissionTask, extra_main_kwargs
        )


    def download(
        self, bucket, key, fileobj, extra_args=None, subscribers=None
    ):
        """Downloads a file from S3

        :type bucket: str
        :param bucket: The name of the bucket to download from

        :type key: str
        :param key: The name of the key to download from

        :type fileobj: str or seekable file-like object
        :param fileobj: The name of a file to download or a seekable file-like
            object to download. It is recommended to use a filename because
            file-like objects may result in higher memory usage.

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the events emitted during the process
            of the transfer request.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the download
        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            bucket=bucket,
            key=key,
            fileobj=fileobj,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        extra_main_kwargs = {'io_executor': self._io_executor}
        if self._bandwidth_limiter:
            extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
        return self._submit_transfer(
            call_args, DownloadSubmissionTask, extra_main_kwargs
        )


    def copy(
        self,
        copy_source,
        bucket,
        key,
        extra_args=None,
        subscribers=None,
        source_client=None,
    ):
        """Copies a file in S3

        :type copy_source: dict
        :param copy_source: The name of the source bucket, key name of the
            source object, and optional version ID of the source object. The
            dictionary format is:
            ``{'Bucket': 'bucket', 'Key': 'key', 'VersionId': 'id'}``. Note
            that the ``VersionId`` key is optional and may be omitted.

        :type bucket: str
        :param bucket: The name of the bucket to copy to

        :type key: str
        :param key: The name of the key to copy to

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: a list of subscribers
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the events emitted during the process
            of the transfer request.

        :type source_client: botocore or boto3 Client
        :param source_client: The client to be used for operations that
            may happen at the source object. For example, this client is
            used for the head_object that determines the size of the copy.
            If no client is provided, the transfer manager's client is used
            as the client for the source object.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the copy
        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        if source_client is None:
            source_client = self._client
        self._validate_all_known_args(extra_args, self.ALLOWED_COPY_ARGS)
        if isinstance(copy_source, dict):
            self._validate_if_bucket_supported(copy_source.get('Bucket'))
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            copy_source=copy_source,
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
            source_client=source_client,
        )
        return self._submit_transfer(call_args, CopySubmissionTask)


    def delete(self, bucket, key, extra_args=None, subscribers=None):
        """Delete an S3 object.

        :type bucket: str
        :param bucket: The name of the bucket.

        :type key: str
        :param key: The name of the S3 object to delete.

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            DeleteObject call.

        :type subscribers: list
        :param subscribers: A list of subscribers to be invoked during the
            process of the transfer request. Note that the ``on_progress``
            callback is not invoked during object deletion.

        :rtype: s3transfer.futures.TransferFuture
        :return: Transfer future representing the deletion.
        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_DELETE_ARGS)
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        return self._submit_transfer(call_args, DeleteSubmissionTask)


    def _validate_if_bucket_supported(self, bucket):
        # S3 high-level operations don't support some resources
        # (e.g. S3 Object Lambda); only direct API calls are available
        # for such resources.
        if self.VALIDATE_SUPPORTED_BUCKET_VALUES:
            for resource, pattern in self._UNSUPPORTED_BUCKET_PATTERNS.items():
                match = pattern.match(bucket)
                if match:
                    raise ValueError(
                        'TransferManager methods do not support %s '
                        'resource. Use direct client calls instead.' % resource
                    )


    def _validate_all_known_args(self, actual, allowed):
        for kwarg in actual:
            if kwarg not in allowed:
                raise ValueError(
                    "Invalid extra_args key '%s', "
                    "must be one of: %s" % (kwarg, ', '.join(allowed))
                )


    def _add_operation_defaults(self, bucket, extra_args):
        add_s3express_defaults(bucket, extra_args)

    def _submit_transfer(
        self, call_args, submission_task_cls, extra_main_kwargs=None
    ):
        if not extra_main_kwargs:
            extra_main_kwargs = {}

        # Create a TransferFuture to return back to the user
        transfer_future, components = self._get_future_with_components(
            call_args
        )

        # Add any provided done callbacks to the created transfer future
        # to be invoked on the transfer future being complete.
        for callback in get_callbacks(transfer_future, 'done'):
            components['coordinator'].add_done_callback(callback)

        # Get the main kwargs needed to instantiate the submission task
        main_kwargs = self._get_submission_task_main_kwargs(
            transfer_future, extra_main_kwargs
        )

        # Submit a SubmissionTask that will submit all of the necessary
        # tasks needed to complete the S3 transfer.
        self._submission_executor.submit(
            submission_task_cls(
                transfer_coordinator=components['coordinator'],
                main_kwargs=main_kwargs,
            )
        )

        # Increment the unique id counter for future transfer requests
        self._id_counter += 1

        return transfer_future

545 def _get_future_with_components(self, call_args): 

546 transfer_id = self._id_counter 

547 # Creates a new transfer future along with its components 

548 transfer_coordinator = TransferCoordinator(transfer_id=transfer_id) 

549 # Track the transfer coordinator for transfers to manage. 

550 self._coordinator_controller.add_transfer_coordinator( 

551 transfer_coordinator 

552 ) 

553 # Also make sure that the transfer coordinator is removed once 

554 # the transfer completes so it does not stick around in memory. 

555 transfer_coordinator.add_done_callback( 

556 self._coordinator_controller.remove_transfer_coordinator, 

557 transfer_coordinator, 

558 ) 

559 components = { 

560 'meta': TransferMeta(call_args, transfer_id=transfer_id), 

561 'coordinator': transfer_coordinator, 

562 } 

563 transfer_future = TransferFuture(**components) 

564 return transfer_future, components 

565 

566 def _get_submission_task_main_kwargs( 

567 self, transfer_future, extra_main_kwargs 

568 ): 

569 main_kwargs = { 

570 'client': self._client, 

571 'config': self._config, 

572 'osutil': self._osutil, 

573 'request_executor': self._request_executor, 

574 'transfer_future': transfer_future, 

575 } 

576 main_kwargs.update(extra_main_kwargs) 

577 return main_kwargs 

578 

579 def _register_handlers(self): 

580 # Register handlers to enable/disable callbacks on uploads. 

581 event_name = 'request-created.s3' 

582 self._client.meta.events.register_first( 

583 event_name, 

584 signal_not_transferring, 

585 unique_id='s3upload-not-transferring', 

586 ) 

587 self._client.meta.events.register_last( 

588 event_name, signal_transferring, unique_id='s3upload-transferring' 

589 ) 

590 

591 def __enter__(self): 

592 return self 

593 

594 def __exit__(self, exc_type, exc_value, *args): 

595 cancel = False 

596 cancel_msg = '' 

597 cancel_exc_type = FatalError 

598 # If a exception was raised in the context handler, signal to cancel 

599 # all of the inprogress futures in the shutdown. 

600 if exc_type: 

601 cancel = True 

602 cancel_msg = str(exc_value) 

603 if not cancel_msg: 

604 cancel_msg = repr(exc_value) 

605 # If it was a KeyboardInterrupt, the cancellation was initiated 

606 # by the user. 

607 if isinstance(exc_value, KeyboardInterrupt): 

608 cancel_exc_type = CancelledError 

609 self._shutdown(cancel, cancel_msg, cancel_exc_type) 

610 

611 def shutdown(self, cancel=False, cancel_msg=''): 

612 """Shutdown the TransferManager 

613 

614 It will wait till all transfers complete before it completely shuts 

615 down. 

616 

617 :type cancel: boolean 

618 :param cancel: If True, calls TransferFuture.cancel() for 

619 all in-progress in transfers. This is useful if you want the 

620 shutdown to happen quicker. 

621 

622 :type cancel_msg: str 

623 :param cancel_msg: The message to specify if canceling all in-progress 

624 transfers. 

625 """ 

        self._shutdown(cancel, cancel_msg)


    def _shutdown(self, cancel, cancel_msg, exc_type=CancelledError):
        if cancel:
            # Cancel all in-flight transfers if requested, before waiting
            # for them to complete.
            self._coordinator_controller.cancel(cancel_msg, exc_type)
        try:
            # Wait until there are no more in-progress transfers. This is
            # wrapped in a try statement because this can be interrupted
            # with a KeyboardInterrupt that needs to be caught.
            self._coordinator_controller.wait()
        except KeyboardInterrupt:
            # If no errors were raised in the try block, the cancel should
            # have no coordinators it needs to run cancel on. If there was
            # an error raised in the try statement we want to cancel all of
            # the in-flight transfers before shutting down to speed that
            # process up.
            self._coordinator_controller.cancel('KeyboardInterrupt()')
            raise
        finally:
            # Shutdown all of the executors.
            self._submission_executor.shutdown()
            self._request_executor.shutdown()
            self._io_executor.shutdown()
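

# --- Illustrative usage sketch (not part of the original module) ------------
# How TransferManager.upload() is typically driven, assuming a boto3 S3
# client. The bucket, key, and file names are placeholders. upload() only
# queues the work and returns a TransferFuture; result() blocks until the
# transfer finishes (or re-raises its error).
def _example_upload(client, filename='/tmp/report.csv'):
    manager = TransferManager(client, config=TransferConfig())
    future = manager.upload(
        fileobj=filename,
        bucket='example-bucket',
        key='reports/report.csv',
        # Keys must come from ALLOWED_UPLOAD_ARGS.
        extra_args={'ContentType': 'text/csv'},
    )
    future.result()  # block until the upload completes
    manager.shutdown()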
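

# --- Illustrative usage sketch (not part of the original module) ------------
# Downloading with a progress subscriber. BaseSubscriber comes from
# s3transfer.subscribers and its on_progress hook is invoked as chunks are
# written. Bucket, key, and path values are placeholders.
def _example_download(client):
    from s3transfer.subscribers import BaseSubscriber

    class _PrintProgress(BaseSubscriber):
        def on_progress(self, future, bytes_transferred, **kwargs):
            print(f'{future.meta.call_args.key}: +{bytes_transferred} bytes')

    manager = TransferManager(client)
    future = manager.download(
        bucket='example-bucket',
        key='reports/report.csv',
        fileobj='/tmp/report.csv',
        subscribers=[_PrintProgress()],
    )
    future.result()
    manager.shutdown()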
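

# --- Illustrative usage sketch (not part of the original module) ------------
# An in-S3 copy. copy_source is the dict described in the copy() docstring;
# a separate source_client could be passed when the source object must be
# read with different credentials or from another region. Names are
# placeholders.
def _example_copy(client):
    manager = TransferManager(client)
    manager.copy(
        copy_source={'Bucket': 'source-bucket', 'Key': 'data/input.bin'},
        bucket='destination-bucket',
        key='data/input-copy.bin',
        extra_args={'MetadataDirective': 'COPY'},
    ).result()
    manager.shutdown()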
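

# --- Illustrative usage sketch (not part of the original module) ------------
# Deleting a specific object version; 'VersionId' is one of the
# ALLOWED_DELETE_ARGS. The version id is a made-up placeholder.
def _example_delete(client):
    manager = TransferManager(client)
    manager.delete(
        bucket='example-bucket',
        key='reports/report.csv',
        extra_args={'VersionId': 'example-version-id'},
    ).result()
    manager.shutdown()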
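

# --- Illustrative usage sketch (not part of the original module) ------------
# Using the manager as a context manager: __exit__ waits for queued
# transfers and, if the block raised (including KeyboardInterrupt), cancels
# whatever is still in flight before shutting the executors down. Names are
# placeholders.
def _example_context_manager(client, filenames):
    with TransferManager(client) as manager:
        futures = [
            manager.upload(name, 'example-bucket', 'backup/%s' % name)
            for name in filenames
        ]
        for future in futures:
            future.result()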



class TransferCoordinatorController:
    def __init__(self):
        """Abstraction to control all transfer coordinators

        This abstraction allows the manager to wait for in-progress transfers
        to complete and cancel all in-progress transfers.
        """
        self._lock = threading.Lock()
        self._tracked_transfer_coordinators = set()

    @property
    def tracked_transfer_coordinators(self):
        """The set of transfer coordinators being tracked"""
        with self._lock:
            # We return a copy because the set is mutable and if you were to
            # iterate over the set, it may be changing in length due to
            # additions and removals of transfer coordinators.
            return copy.copy(self._tracked_transfer_coordinators)

    def add_transfer_coordinator(self, transfer_coordinator):
        """Adds a transfer coordinator of a transfer to be canceled if needed

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The transfer coordinator for the
            particular transfer
        """
        with self._lock:
            self._tracked_transfer_coordinators.add(transfer_coordinator)

    def remove_transfer_coordinator(self, transfer_coordinator):
        """Remove a transfer coordinator from cancellation consideration

        Typically, this method is invoked by the transfer coordinator itself
        to remove itself when it completes its transfer.

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The transfer coordinator for the
            particular transfer
        """
        with self._lock:
            self._tracked_transfer_coordinators.remove(transfer_coordinator)

    def cancel(self, msg='', exc_type=CancelledError):
        """Cancels all in-progress transfers

        This cancels the in-progress transfers by calling cancel() on all
        tracked transfer coordinators.

        :param msg: The message to pass on to each transfer coordinator that
            gets cancelled.

        :param exc_type: The type of exception to set for the cancellation
        """
        for transfer_coordinator in self.tracked_transfer_coordinators:
            transfer_coordinator.cancel(msg, exc_type)

    def wait(self):
        """Wait until there are no more in-progress transfers

        This will not stop when failures are encountered and will not
        propagate any of these errors from failed transfers, but it can be
        interrupted with a KeyboardInterrupt.
        """
        try:
            transfer_coordinator = None
            for transfer_coordinator in self.tracked_transfer_coordinators:
                transfer_coordinator.result()
        except KeyboardInterrupt:
            logger.debug('Received KeyboardInterrupt in wait()')
            # If a KeyboardInterrupt is raised while waiting for
            # the result, then exit out of the wait and raise the
            # exception.
            if transfer_coordinator:
                logger.debug(
                    'On KeyboardInterrupt was waiting for %s',
                    transfer_coordinator,
                )
            raise
        except Exception:
            # A general exception could have been thrown because
            # of result(). We just want to ignore this and continue
            # because we at least know that the transfer coordinator
            # has completed.
            pass