# s3transfer/manager.py
# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import copy
import logging
import re
import threading

from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket
from s3transfer.constants import (
    ALLOWED_DOWNLOAD_ARGS,
    FULL_OBJECT_CHECKSUM_ARGS,
    KB,
    MB,
)
from s3transfer.copies import CopySubmissionTask
from s3transfer.delete import DeleteSubmissionTask
from s3transfer.download import DownloadSubmissionTask
from s3transfer.exceptions import CancelledError, FatalError
from s3transfer.futures import (
    IN_MEMORY_DOWNLOAD_TAG,
    IN_MEMORY_UPLOAD_TAG,
    BoundedExecutor,
    TransferCoordinator,
    TransferFuture,
    TransferMeta,
)
from s3transfer.upload import UploadSubmissionTask
from s3transfer.utils import (
    CallArgs,
    OSUtils,
    SlidingWindowSemaphore,
    TaskSemaphore,
    get_callbacks,
    set_default_checksum_algorithm,
    signal_not_transferring,
    signal_transferring,
)

logger = logging.getLogger(__name__)


class TransferConfig:
    UNSET_DEFAULT = object()

    def __init__(
        self,
        multipart_threshold=8 * MB,
        multipart_chunksize=8 * MB,
        max_request_concurrency=10,
        max_submission_concurrency=5,
        max_request_queue_size=1000,
        max_submission_queue_size=1000,
        max_io_queue_size=1000,
        io_chunksize=256 * KB,
        num_download_attempts=5,
        max_in_memory_upload_chunks=10,
        max_in_memory_download_chunks=10,
        max_bandwidth=None,
    ):
70 """Configurations for the transfer manager
72 :param multipart_threshold: The threshold for which multipart
73 transfers occur.
75 :param max_request_concurrency: The maximum number of S3 API
76 transfer-related requests that can happen at a time.
78 :param max_submission_concurrency: The maximum number of threads
79 processing a call to a TransferManager method. Processing a
80 call usually entails determining which S3 API requests that need
81 to be enqueued, but does **not** entail making any of the
82 S3 API data transferring requests needed to perform the transfer.
83 The threads controlled by ``max_request_concurrency`` is
84 responsible for that.
86 :param multipart_chunksize: The size of each transfer if a request
87 becomes a multipart transfer.
89 :param max_request_queue_size: The maximum amount of S3 API requests
90 that can be queued at a time.
92 :param max_submission_queue_size: The maximum amount of
93 TransferManager method calls that can be queued at a time.
95 :param max_io_queue_size: The maximum amount of read parts that
96 can be queued to be written to disk per download. The default
97 size for each elementin this queue is 8 KB.
99 :param io_chunksize: The max size of each chunk in the io queue.
100 Currently, this is size used when reading from the downloaded
101 stream as well.
103 :param num_download_attempts: The number of download attempts that
104 will be tried upon errors with downloading an object in S3. Note
105 that these retries account for errors that occur when streaming
106 down the data from s3 (i.e. socket errors and read timeouts that
107 occur after receiving an OK response from s3).
108 Other retryable exceptions such as throttling errors and 5xx errors
109 are already retried by botocore (this default is 5). The
110 ``num_download_attempts`` does not take into account the
111 number of exceptions retried by botocore.
113 :param max_in_memory_upload_chunks: The number of chunks that can
114 be stored in memory at a time for all ongoing upload requests.
115 This pertains to chunks of data that need to be stored in memory
116 during an upload if the data is sourced from a file-like object.
117 The total maximum memory footprint due to a in-memory upload
118 chunks is roughly equal to:
120 max_in_memory_upload_chunks * multipart_chunksize
121 + max_submission_concurrency * multipart_chunksize
123 ``max_submission_concurrency`` has an affect on this value because
124 for each thread pulling data off of a file-like object, they may
125 be waiting with a single read chunk to be submitted for upload
126 because the ``max_in_memory_upload_chunks`` value has been reached
127 by the threads making the upload request.
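
            With the defaults above, for example, that is roughly
            10 * 8 MB + 5 * 8 MB = 120 MB.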

        :param max_in_memory_download_chunks: The number of chunks that can
            be buffered in memory and **not** in the io queue at a time for
            all ongoing download requests. This pertains specifically to
            file-like objects that cannot be seeked. The total maximum memory
            footprint due to in-memory download chunks is roughly equal to:

                max_in_memory_download_chunks * multipart_chunksize
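
            With the defaults above, for example, that is roughly
            10 * 8 MB = 80 MB.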

        :param max_bandwidth: The maximum bandwidth that will be consumed
            in uploading and downloading file content. The value is in terms
            of bytes per second.
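
        Example (an illustrative sketch; the values shown are hypothetical,
        not recommendations)::

            from s3transfer.constants import MB
            from s3transfer.manager import TransferConfig

            config = TransferConfig(
                multipart_threshold=16 * MB,
                multipart_chunksize=16 * MB,
                max_request_concurrency=4,
                max_bandwidth=50 * MB,  # Cap throughput at ~50 MB/s.
            )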
140 """
        self.multipart_threshold = multipart_threshold
        self.multipart_chunksize = multipart_chunksize
        self.max_request_concurrency = max_request_concurrency
        self.max_submission_concurrency = max_submission_concurrency
        self.max_request_queue_size = max_request_queue_size
        self.max_submission_queue_size = max_submission_queue_size
        self.max_io_queue_size = max_io_queue_size
        self.io_chunksize = io_chunksize
        self.num_download_attempts = num_download_attempts
        self.max_in_memory_upload_chunks = max_in_memory_upload_chunks
        self.max_in_memory_download_chunks = max_in_memory_download_chunks
        self.max_bandwidth = max_bandwidth
        self._validate_attrs_are_nonzero()

    def _validate_attrs_are_nonzero(self):
        for attr, attr_val in self.__dict__.items():
            if (
                attr_val is not None
                and attr_val is not self.UNSET_DEFAULT
                and attr_val <= 0
            ):
                raise ValueError(
                    f'Provided parameter {attr} of value {attr_val} must '
                    'be greater than 0.'
                )

    def get_deep_attr(self, item):
        return object.__getattribute__(self, item)


class TransferManager:
    ALLOWED_DOWNLOAD_ARGS = ALLOWED_DOWNLOAD_ARGS

    _ALLOWED_SHARED_ARGS = [
        'ACL',
        'CacheControl',
        'ChecksumAlgorithm',
        'ContentDisposition',
        'ContentEncoding',
        'ContentLanguage',
        'ContentType',
        'ExpectedBucketOwner',
        'Expires',
        'GrantFullControl',
        'GrantRead',
        'GrantReadACP',
        'GrantWriteACP',
        'Metadata',
        'ObjectLockLegalHoldStatus',
        'ObjectLockMode',
        'ObjectLockRetainUntilDate',
        'RequestPayer',
        'ServerSideEncryption',
        'StorageClass',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'SSEKMSKeyId',
        'SSEKMSEncryptionContext',
        'Tagging',
        'WebsiteRedirectLocation',
    ]

    ALLOWED_UPLOAD_ARGS = (
        _ALLOWED_SHARED_ARGS
        + [
            'ChecksumType',
            'MpuObjectSize',
        ]
        + FULL_OBJECT_CHECKSUM_ARGS
    )

    ALLOWED_COPY_ARGS = _ALLOWED_SHARED_ARGS + [
        'CopySourceIfMatch',
        'CopySourceIfModifiedSince',
        'CopySourceIfNoneMatch',
        'CopySourceIfUnmodifiedSince',
        'CopySourceSSECustomerAlgorithm',
        'CopySourceSSECustomerKey',
        'CopySourceSSECustomerKeyMD5',
        'MetadataDirective',
        'TaggingDirective',
    ]

    ALLOWED_DELETE_ARGS = [
        'MFA',
        'VersionId',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    VALIDATE_SUPPORTED_BUCKET_VALUES = True

    _UNSUPPORTED_BUCKET_PATTERNS = {
        'S3 Object Lambda': re.compile(
            r'^arn:(aws).*:s3-object-lambda:[a-z\-0-9]+:[0-9]{12}:'
            r'accesspoint[/:][a-zA-Z0-9\-]{1,63}'
        ),
    }
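
    # For example, an S3 Object Lambda access point ARN such as
    # 'arn:aws:s3-object-lambda:us-west-2:123456789012:accesspoint/my-olap'
    # (a hypothetical name) matches the pattern above, so TransferManager
    # methods reject such buckets.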

    def __init__(self, client, config=None, osutil=None, executor_cls=None):
        """A transfer manager interface for Amazon S3

        :param client: Client to be used by the manager
        :param config: TransferConfig to associate specific configurations
        :param osutil: OSUtils object to use for os-related behavior when
            using the transfer manager.

        :type executor_cls: s3transfer.futures.BaseExecutor
        :param executor_cls: The class of executor to use with the transfer
            manager. By default, concurrent.futures.ThreadPoolExecutor is
            used.
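
        Example (a minimal sketch; assumes ``client`` is a botocore or boto3
        S3 client, and uses ``NonThreadedExecutor`` from
        ``s3transfer.futures``)::

            from s3transfer.futures import NonThreadedExecutor
            from s3transfer.manager import TransferManager

            # Run all transfer work serially in the calling thread.
            manager = TransferManager(client, executor_cls=NonThreadedExecutor)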
252 """
        self._client = client
        self._config = config
        if config is None:
            self._config = TransferConfig()
        self._osutil = osutil
        if osutil is None:
            self._osutil = OSUtils()
        self._coordinator_controller = TransferCoordinatorController()
        # A counter to create unique ids for each transfer submitted.
        self._id_counter = 0

        # The executor responsible for making S3 API transfer requests
        self._request_executor = BoundedExecutor(
            max_size=self._config.max_request_queue_size,
            max_num_threads=self._config.max_request_concurrency,
            tag_semaphores={
                IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
                    self._config.max_in_memory_upload_chunks
                ),
                IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
                    self._config.max_in_memory_download_chunks
                ),
            },
            executor_cls=executor_cls,
        )

        # The executor responsible for submitting the necessary tasks to
        # perform the desired transfer
        self._submission_executor = BoundedExecutor(
            max_size=self._config.max_submission_queue_size,
            max_num_threads=self._config.max_submission_concurrency,
            executor_cls=executor_cls,
        )

        # There is one thread available for writing to disk. It will handle
        # downloads for all files.
        self._io_executor = BoundedExecutor(
            max_size=self._config.max_io_queue_size,
            max_num_threads=1,
            executor_cls=executor_cls,
        )

        # The component responsible for limiting bandwidth usage if it
        # is configured.
        self._bandwidth_limiter = None
        if self._config.max_bandwidth is not None:
            logger.debug(
                'Setting max_bandwidth to %s', self._config.max_bandwidth
            )
            leaky_bucket = LeakyBucket(self._config.max_bandwidth)
            self._bandwidth_limiter = BandwidthLimiter(leaky_bucket)

        self._register_handlers()

    @property
    def client(self):
        return self._client

    @property
    def config(self):
        return self._config

    def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None):
        """Uploads a file to S3

        :type fileobj: str or seekable file-like object
        :param fileobj: The name of a file to upload or a seekable file-like
            object to upload. It is recommended to use a filename because
            file-like objects may result in higher memory usage.

        :type bucket: str
        :param bucket: The name of the bucket to upload to

        :type key: str
        :param key: The name of the key to upload to

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the events emitted during the process
            of the transfer request.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the upload
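
        Example (an illustrative sketch; the bucket, key, and file names are
        hypothetical, and ``client`` is assumed to be an S3 client)::

            manager = TransferManager(client)
            future = manager.upload('/tmp/data.bin', 'mybucket', 'mykey')
            future.result()  # Block until the upload finishes.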
340 """
        extra_args = extra_args.copy() if extra_args else {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
        self._validate_if_bucket_supported(bucket)
        self._add_operation_defaults(extra_args)
        call_args = CallArgs(
            fileobj=fileobj,
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        extra_main_kwargs = {}
        if self._bandwidth_limiter:
            extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
        return self._submit_transfer(
            call_args, UploadSubmissionTask, extra_main_kwargs
        )

    def download(
        self, bucket, key, fileobj, extra_args=None, subscribers=None
    ):
        """Downloads a file from S3

        :type bucket: str
        :param bucket: The name of the bucket to download from

        :type key: str
        :param key: The name of the key to download from

        :type fileobj: str or seekable file-like object
        :param fileobj: The name of a file to download or a seekable file-like
            object to download. It is recommended to use a filename because
            file-like objects may result in higher memory usage.

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the events emitted during the process
            of the transfer request.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the download
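
        Example (an illustrative sketch; names are hypothetical and
        ``client`` is assumed to be an S3 client)::

            manager = TransferManager(client)
            future = manager.download('mybucket', 'mykey', '/tmp/data.bin')
            future.result()  # Block until the download finishes.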
389 """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            bucket=bucket,
            key=key,
            fileobj=fileobj,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        extra_main_kwargs = {'io_executor': self._io_executor}
        if self._bandwidth_limiter:
            extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
        return self._submit_transfer(
            call_args, DownloadSubmissionTask, extra_main_kwargs
        )

    def copy(
        self,
        copy_source,
        bucket,
        key,
        extra_args=None,
        subscribers=None,
        source_client=None,
    ):
        """Copies a file in S3

        :type copy_source: dict
        :param copy_source: The name of the source bucket, key name of the
            source object, and optional version ID of the source object. The
            dictionary format is:
            ``{'Bucket': 'bucket', 'Key': 'key', 'VersionId': 'id'}``. Note
            that the ``VersionId`` key is optional and may be omitted.

        :type bucket: str
        :param bucket: The name of the bucket to copy to

        :type key: str
        :param key: The name of the key to copy to

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the events emitted during the process
            of the transfer request.

        :type source_client: botocore or boto3 Client
        :param source_client: The client to be used for operations that
            may happen at the source object. For example, this client is
            used for the head_object that determines the size of the copy.
            If no client is provided, the transfer manager's client is used
            as the client for the source object.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the copy
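
        Example (an illustrative sketch; bucket and key names are
        hypothetical and ``client`` is assumed to be an S3 client)::

            manager = TransferManager(client)
            copy_source = {'Bucket': 'srcbucket', 'Key': 'srckey'}
            future = manager.copy(copy_source, 'destbucket', 'destkey')
            future.result()  # Block until the copy finishes.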
452 """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        if source_client is None:
            source_client = self._client
        self._validate_all_known_args(extra_args, self.ALLOWED_COPY_ARGS)
        if isinstance(copy_source, dict):
            self._validate_if_bucket_supported(copy_source.get('Bucket'))
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            copy_source=copy_source,
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
            source_client=source_client,
        )
        return self._submit_transfer(call_args, CopySubmissionTask)

    def delete(self, bucket, key, extra_args=None, subscribers=None):
        """Delete an S3 object.

        :type bucket: str
        :param bucket: The name of the bucket.

        :type key: str
        :param key: The name of the S3 object to delete.

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            DeleteObject call.

        :type subscribers: list
        :param subscribers: A list of subscribers to be invoked during the
            process of the transfer request. Note that the ``on_progress``
            callback is not invoked during object deletion.

        :rtype: s3transfer.futures.TransferFuture
        :return: Transfer future representing the deletion.
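
        Example (an illustrative sketch; names are hypothetical and
        ``client`` is assumed to be an S3 client)::

            manager = TransferManager(client)
            future = manager.delete('mybucket', 'mykey')
            future.result()  # Block until the deletion finishes.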
494 """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_DELETE_ARGS)
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        return self._submit_transfer(call_args, DeleteSubmissionTask)

    def _validate_if_bucket_supported(self, bucket):
        # S3 high-level operations don't support some resources
        # (e.g. S3 Object Lambda); only direct API calls are available
        # for such resources.
        if self.VALIDATE_SUPPORTED_BUCKET_VALUES:
            for resource, pattern in self._UNSUPPORTED_BUCKET_PATTERNS.items():
                match = pattern.match(bucket)
                if match:
                    raise ValueError(
                        f'TransferManager methods do not support {resource} '
                        'resource. Use direct client calls instead.'
                    )

    def _validate_all_known_args(self, actual, allowed):
        for kwarg in actual:
            if kwarg not in allowed:
                raise ValueError(
                    "Invalid extra_args key '{}', must be one of: {}".format(
                        kwarg, ', '.join(allowed)
                    )
                )

    def _add_operation_defaults(self, extra_args):
        if (
            self.client.meta.config.request_checksum_calculation
            == "when_supported"
        ):
            set_default_checksum_algorithm(extra_args)

    def _submit_transfer(
        self, call_args, submission_task_cls, extra_main_kwargs=None
    ):
        if not extra_main_kwargs:
            extra_main_kwargs = {}

        # Create a TransferFuture to return back to the user
        transfer_future, components = self._get_future_with_components(
            call_args
        )

        # Add any provided done callbacks to the created transfer future
        # to be invoked on the transfer future being complete.
        for callback in get_callbacks(transfer_future, 'done'):
            components['coordinator'].add_done_callback(callback)

        # Get the main kwargs needed to instantiate the submission task
        main_kwargs = self._get_submission_task_main_kwargs(
            transfer_future, extra_main_kwargs
        )

        # Submit a SubmissionTask that will submit all of the necessary
        # tasks needed to complete the S3 transfer.
        self._submission_executor.submit(
            submission_task_cls(
                transfer_coordinator=components['coordinator'],
                main_kwargs=main_kwargs,
            )
        )

        # Increment the unique id counter for future transfer requests
        self._id_counter += 1

        return transfer_future

    def _get_future_with_components(self, call_args):
        transfer_id = self._id_counter
        # Creates a new transfer future along with its components
        transfer_coordinator = TransferCoordinator(transfer_id=transfer_id)
        # Track the transfer coordinator for transfers to manage.
        self._coordinator_controller.add_transfer_coordinator(
            transfer_coordinator
        )
        # Also make sure that the transfer coordinator is removed once
        # the transfer completes so it does not stick around in memory.
        transfer_coordinator.add_done_callback(
            self._coordinator_controller.remove_transfer_coordinator,
            transfer_coordinator,
        )
        components = {
            'meta': TransferMeta(call_args, transfer_id=transfer_id),
            'coordinator': transfer_coordinator,
        }
        transfer_future = TransferFuture(**components)
        return transfer_future, components

    def _get_submission_task_main_kwargs(
        self, transfer_future, extra_main_kwargs
    ):
        main_kwargs = {
            'client': self._client,
            'config': self._config,
            'osutil': self._osutil,
            'request_executor': self._request_executor,
            'transfer_future': transfer_future,
        }
        main_kwargs.update(extra_main_kwargs)
        return main_kwargs

    def _register_handlers(self):
        # Register handlers to enable/disable callbacks on uploads.
        event_name = 'request-created.s3'
        self._client.meta.events.register_first(
            event_name,
            signal_not_transferring,
            unique_id='s3upload-not-transferring',
        )
        self._client.meta.events.register_last(
            event_name, signal_transferring, unique_id='s3upload-transferring'
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, *args):
        cancel = False
        cancel_msg = ''
        cancel_exc_type = FatalError
        # If an exception was raised in the context handler, signal to cancel
        # all of the in-progress futures in the shutdown.
        if exc_type:
            cancel = True
            cancel_msg = str(exc_value)
            if not cancel_msg:
                cancel_msg = repr(exc_value)
            # If it was a KeyboardInterrupt, the cancellation was initiated
            # by the user.
            if isinstance(exc_value, KeyboardInterrupt):
                cancel_exc_type = CancelledError
        self._shutdown(cancel, cancel_msg, cancel_exc_type)

    def shutdown(self, cancel=False, cancel_msg=''):
        """Shutdown the TransferManager

        It will wait until all transfers complete before it completely shuts
        down.

        :type cancel: boolean
        :param cancel: If True, calls TransferFuture.cancel() for
            all in-progress transfers. This is useful if you want the
            shutdown to happen quicker.

        :type cancel_msg: str
        :param cancel_msg: The message to specify if canceling all in-progress
            transfers.
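
        Example (an illustrative sketch; names are hypothetical and
        ``client`` is assumed to be an S3 client; using the manager as a
        context manager runs the same shutdown logic on exit)::

            manager = TransferManager(client)
            try:
                manager.upload('/tmp/data.bin', 'mybucket', 'mykey')
            finally:
                manager.shutdown()  # Wait for the upload to finish.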
653 """
        self._shutdown(cancel, cancel_msg)

    def _shutdown(self, cancel, cancel_msg, exc_type=CancelledError):
        if cancel:
            # Cancel all in-flight transfers if requested, before waiting
            # for them to complete.
            self._coordinator_controller.cancel(cancel_msg, exc_type)
        try:
            # Wait until there are no more in-progress transfers. This is
            # wrapped in a try statement because this can be interrupted
            # with a KeyboardInterrupt that needs to be caught.
            self._coordinator_controller.wait()
        except KeyboardInterrupt:
            # If no errors were raised in the try block, the cancel should
            # have no coordinators it needs to run cancel on. If there was
            # an error raised in the try statement, we want to cancel all of
            # the in-flight transfers before shutting down to speed that
            # process up.
            self._coordinator_controller.cancel('KeyboardInterrupt()')
            raise
        finally:
            # Shutdown all of the executors.
            self._submission_executor.shutdown()
            self._request_executor.shutdown()
            self._io_executor.shutdown()


class TransferCoordinatorController:
    def __init__(self):
        """Abstraction to control all transfer coordinators

        This abstraction allows the manager to wait for in-progress transfers
        to complete and cancel all in-progress transfers.
        """
        self._lock = threading.Lock()
        self._tracked_transfer_coordinators = set()

    @property
    def tracked_transfer_coordinators(self):
        """The set of transfer coordinators being tracked"""
        with self._lock:
            # We return a copy because the set is mutable and if you were to
            # iterate over the set, it may be changing in length due to
            # additions and removals of transfer coordinators.
            return copy.copy(self._tracked_transfer_coordinators)

    def add_transfer_coordinator(self, transfer_coordinator):
        """Adds a transfer coordinator of a transfer to be canceled if needed

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The transfer coordinator for the
            particular transfer
        """
        with self._lock:
            self._tracked_transfer_coordinators.add(transfer_coordinator)

    def remove_transfer_coordinator(self, transfer_coordinator):
        """Remove a transfer coordinator from cancellation consideration

        Typically, this method is invoked by the transfer coordinator itself
        to remove itself when it completes its transfer.

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The transfer coordinator for the
            particular transfer
        """
        with self._lock:
            self._tracked_transfer_coordinators.remove(transfer_coordinator)

    def cancel(self, msg='', exc_type=CancelledError):
        """Cancels all in-progress transfers

        This cancels the in-progress transfers by calling cancel() on all
        tracked transfer coordinators.

        :param msg: The message to pass on to each transfer coordinator that
            gets cancelled.

        :param exc_type: The type of exception to set for the cancellation
        """
        for transfer_coordinator in self.tracked_transfer_coordinators:
            transfer_coordinator.cancel(msg, exc_type)

    def wait(self):
        """Wait until there are no more in-progress transfers

        This will not stop when failures are encountered and will not
        propagate any of these errors from failed transfers, but it can be
        interrupted with a KeyboardInterrupt.
        """
        try:
            transfer_coordinator = None
            for transfer_coordinator in self.tracked_transfer_coordinators:
                transfer_coordinator.result()
        except KeyboardInterrupt:
            logger.debug('Received KeyboardInterrupt in wait()')
            # If a KeyboardInterrupt is raised while waiting for
            # the result, then exit out of the wait and raise the
            # exception.
            if transfer_coordinator:
                logger.debug(
                    'On KeyboardInterrupt was waiting for %s',
                    transfer_coordinator,
                )
            raise
        except Exception:
            # A general exception could have been thrown because
            # of result(). We just want to ignore this and continue
            # because we at least know that the transfer coordinator
            # has completed.
            pass