# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import copy
import logging
import re
import threading

from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket
from s3transfer.constants import (
    ALLOWED_DOWNLOAD_ARGS,
    FULL_OBJECT_CHECKSUM_ARGS,
    KB,
    MB,
)
from s3transfer.copies import CopySubmissionTask
from s3transfer.delete import DeleteSubmissionTask
from s3transfer.download import DownloadSubmissionTask
from s3transfer.exceptions import CancelledError, FatalError
from s3transfer.futures import (
    IN_MEMORY_DOWNLOAD_TAG,
    IN_MEMORY_UPLOAD_TAG,
    BoundedExecutor,
    TransferCoordinator,
    TransferFuture,
    TransferMeta,
)
from s3transfer.upload import UploadSubmissionTask
from s3transfer.utils import (
    CallArgs,
    OSUtils,
    SlidingWindowSemaphore,
    TaskSemaphore,
    get_callbacks,
    set_default_checksum_algorithm,
    signal_not_transferring,
    signal_transferring,
)

logger = logging.getLogger(__name__)


class TransferConfig:
    def __init__(
        self,
        multipart_threshold=8 * MB,
        multipart_chunksize=8 * MB,
        max_request_concurrency=10,
        max_submission_concurrency=5,
        max_request_queue_size=1000,
        max_submission_queue_size=1000,
        max_io_queue_size=1000,
        io_chunksize=256 * KB,
        num_download_attempts=5,
        max_in_memory_upload_chunks=10,
        max_in_memory_download_chunks=10,
        max_bandwidth=None,
    ):
68 """Configurations for the transfer manager
69
70 :param multipart_threshold: The threshold for which multipart
71 transfers occur.
72
73 :param max_request_concurrency: The maximum number of S3 API
74 transfer-related requests that can happen at a time.
75
76 :param max_submission_concurrency: The maximum number of threads
77 processing a call to a TransferManager method. Processing a
78 call usually entails determining which S3 API requests that need
79 to be enqueued, but does **not** entail making any of the
80 S3 API data transferring requests needed to perform the transfer.
81 The threads controlled by ``max_request_concurrency`` is
82 responsible for that.
83
84 :param multipart_chunksize: The size of each transfer if a request
85 becomes a multipart transfer.
86
87 :param max_request_queue_size: The maximum amount of S3 API requests
88 that can be queued at a time.
89
90 :param max_submission_queue_size: The maximum amount of
91 TransferManager method calls that can be queued at a time.
92
93 :param max_io_queue_size: The maximum amount of read parts that
94 can be queued to be written to disk per download. The default
95 size for each elementin this queue is 8 KB.
96
97 :param io_chunksize: The max size of each chunk in the io queue.
98 Currently, this is size used when reading from the downloaded
99 stream as well.
100
101 :param num_download_attempts: The number of download attempts that
102 will be tried upon errors with downloading an object in S3. Note
103 that these retries account for errors that occur when streaming
104 down the data from s3 (i.e. socket errors and read timeouts that
105 occur after receiving an OK response from s3).
106 Other retryable exceptions such as throttling errors and 5xx errors
107 are already retried by botocore (this default is 5). The
108 ``num_download_attempts`` does not take into account the
109 number of exceptions retried by botocore.
110
111 :param max_in_memory_upload_chunks: The number of chunks that can
112 be stored in memory at a time for all ongoing upload requests.
113 This pertains to chunks of data that need to be stored in memory
114 during an upload if the data is sourced from a file-like object.
115 The total maximum memory footprint due to a in-memory upload
116 chunks is roughly equal to:
117
118 max_in_memory_upload_chunks * multipart_chunksize
119 + max_submission_concurrency * multipart_chunksize
120
121 ``max_submission_concurrency`` has an affect on this value because
122 for each thread pulling data off of a file-like object, they may
123 be waiting with a single read chunk to be submitted for upload
124 because the ``max_in_memory_upload_chunks`` value has been reached
125 by the threads making the upload request.
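
            With the default values (10 chunks, 5 submission threads, and
            an 8 MB ``multipart_chunksize``), that works out to roughly
            10 * 8 MB + 5 * 8 MB = 120 MB of buffered upload data.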

        :param max_in_memory_download_chunks: The number of chunks that can
            be buffered in memory and **not** in the io queue at a time for
            all ongoing download requests. This pertains specifically to
            file-like objects that cannot seek. The total maximum memory
            footprint due to in-memory download chunks is roughly equal to:

                max_in_memory_download_chunks * multipart_chunksize
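
            With the default values (10 chunks of 8 MB), that is roughly
            80 MB of buffered data across ongoing non-seekable downloads.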

        :param max_bandwidth: The maximum bandwidth that will be consumed
            in uploading and downloading file content. The value is in
            bytes per second.
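
        A minimal construction sketch (the values are illustrative, not
        recommendations; ``MB`` is the constant imported from
        ``s3transfer.constants``)::

            config = TransferConfig(
                multipart_threshold=16 * MB,
                max_request_concurrency=20,
                max_bandwidth=10 * MB,  # cap transfers at ~10 MB/s
            )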
138 """
139 self.multipart_threshold = multipart_threshold
140 self.multipart_chunksize = multipart_chunksize
141 self.max_request_concurrency = max_request_concurrency
142 self.max_submission_concurrency = max_submission_concurrency
143 self.max_request_queue_size = max_request_queue_size
144 self.max_submission_queue_size = max_submission_queue_size
145 self.max_io_queue_size = max_io_queue_size
146 self.io_chunksize = io_chunksize
147 self.num_download_attempts = num_download_attempts
148 self.max_in_memory_upload_chunks = max_in_memory_upload_chunks
149 self.max_in_memory_download_chunks = max_in_memory_download_chunks
150 self.max_bandwidth = max_bandwidth
151 self._validate_attrs_are_nonzero()
152
153 def _validate_attrs_are_nonzero(self):
154 for attr, attr_val in self.__dict__.items():
155 if attr_val is not None and attr_val <= 0:
156 raise ValueError(
157 f'Provided parameter {attr} of value {attr_val} must '
158 'be greater than 0.'
159 )


class TransferManager:
    ALLOWED_DOWNLOAD_ARGS = ALLOWED_DOWNLOAD_ARGS

    _ALLOWED_SHARED_ARGS = [
        'ACL',
        'CacheControl',
        'ChecksumAlgorithm',
        'ContentDisposition',
        'ContentEncoding',
        'ContentLanguage',
        'ContentType',
        'ExpectedBucketOwner',
        'Expires',
        'GrantFullControl',
        'GrantRead',
        'GrantReadACP',
        'GrantWriteACP',
        'Metadata',
        'ObjectLockLegalHoldStatus',
        'ObjectLockMode',
        'ObjectLockRetainUntilDate',
        'RequestPayer',
        'ServerSideEncryption',
        'StorageClass',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'SSEKMSKeyId',
        'SSEKMSEncryptionContext',
        'Tagging',
        'WebsiteRedirectLocation',
    ]

    ALLOWED_UPLOAD_ARGS = (
        _ALLOWED_SHARED_ARGS
        + [
            'ChecksumType',
            'MpuObjectSize',
        ]
        + FULL_OBJECT_CHECKSUM_ARGS
    )

    ALLOWED_COPY_ARGS = _ALLOWED_SHARED_ARGS + [
        'CopySourceIfMatch',
        'CopySourceIfModifiedSince',
        'CopySourceIfNoneMatch',
        'CopySourceIfUnmodifiedSince',
        'CopySourceSSECustomerAlgorithm',
        'CopySourceSSECustomerKey',
        'CopySourceSSECustomerKeyMD5',
        'MetadataDirective',
        'TaggingDirective',
    ]

    ALLOWED_DELETE_ARGS = [
        'MFA',
        'VersionId',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    VALIDATE_SUPPORTED_BUCKET_VALUES = True

    _UNSUPPORTED_BUCKET_PATTERNS = {
        'S3 Object Lambda': re.compile(
            r'^arn:(aws).*:s3-object-lambda:[a-z\-0-9]+:[0-9]{12}:'
            r'accesspoint[/:][a-zA-Z0-9\-]{1,63}'
        ),
    }
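    # For example, an S3 Object Lambda access point ARN such as
    # 'arn:aws:s3-object-lambda:us-west-2:123456789012:accesspoint/my-ap'
    # (the account ID and names are illustrative) matches the pattern
    # above and is rejected by _validate_if_bucket_supported() below.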

    def __init__(self, client, config=None, osutil=None, executor_cls=None):
        """A transfer manager interface for Amazon S3

        :param client: Client to be used by the manager
        :param config: TransferConfig to associate specific configurations
        :param osutil: OSUtils object to use for os-related behavior when
            using the transfer manager.

        :type executor_cls: s3transfer.futures.BaseExecutor
        :param executor_cls: The class of executor to use with the transfer
            manager. By default, concurrent.futures.ThreadPoolExecutor is used.
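
        A minimal usage sketch (assumes ``client`` is an S3 client created
        with botocore or boto3; the bucket, key, and path are
        illustrative)::

            with TransferManager(client) as manager:
                future = manager.upload('/tmp/data.bin', 'mybucket', 'mykey')
            # Exiting the block waits for in-progress transfers to finish;
            # call future.result() to re-raise any transfer error.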
243 """
244 self._client = client
245 self._config = config
246 if config is None:
247 self._config = TransferConfig()
248 self._osutil = osutil
249 if osutil is None:
250 self._osutil = OSUtils()
251 self._coordinator_controller = TransferCoordinatorController()
252 # A counter to create unique id's for each transfer submitted.
253 self._id_counter = 0
254
255 # The executor responsible for making S3 API transfer requests
256 self._request_executor = BoundedExecutor(
257 max_size=self._config.max_request_queue_size,
258 max_num_threads=self._config.max_request_concurrency,
259 tag_semaphores={
260 IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
261 self._config.max_in_memory_upload_chunks
262 ),
263 IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
264 self._config.max_in_memory_download_chunks
265 ),
266 },
267 executor_cls=executor_cls,
268 )
269
270 # The executor responsible for submitting the necessary tasks to
271 # perform the desired transfer
272 self._submission_executor = BoundedExecutor(
273 max_size=self._config.max_submission_queue_size,
274 max_num_threads=self._config.max_submission_concurrency,
275 executor_cls=executor_cls,
276 )
277
278 # There is one thread available for writing to disk. It will handle
279 # downloads for all files.
280 self._io_executor = BoundedExecutor(
281 max_size=self._config.max_io_queue_size,
282 max_num_threads=1,
283 executor_cls=executor_cls,
284 )
285
286 # The component responsible for limiting bandwidth usage if it
287 # is configured.
288 self._bandwidth_limiter = None
289 if self._config.max_bandwidth is not None:
290 logger.debug(
291 'Setting max_bandwidth to %s', self._config.max_bandwidth
292 )
293 leaky_bucket = LeakyBucket(self._config.max_bandwidth)
294 self._bandwidth_limiter = BandwidthLimiter(leaky_bucket)
295
296 self._register_handlers()
297
298 @property
299 def client(self):
300 return self._client
301
302 @property
303 def config(self):
304 return self._config
305

    def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None):
        """Uploads a file to S3

        :type fileobj: str or seekable file-like object
        :param fileobj: The name of a file to upload or a seekable file-like
            object to upload. It is recommended to use a filename because
            file-like objects may result in higher memory usage.

        :type bucket: str
        :param bucket: The name of the bucket to upload to

        :type key: str
        :param key: The name of the key to upload to

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the events emitted during the process
            of the transfer request.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the upload
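
        A usage sketch (the bucket, key, path, and extra_args values are
        illustrative)::

            future = manager.upload(
                '/tmp/myfile',
                'mybucket',
                'mykey',
                extra_args={'ContentType': 'text/plain'},
            )
            future.result()  # wait for the upload; raises on failure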
331 """
332
333 extra_args = extra_args.copy() if extra_args else {}
334 if subscribers is None:
335 subscribers = []
336 self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
337 self._validate_if_bucket_supported(bucket)
338 self._add_operation_defaults(extra_args)
339 call_args = CallArgs(
340 fileobj=fileobj,
341 bucket=bucket,
342 key=key,
343 extra_args=extra_args,
344 subscribers=subscribers,
345 )
346 extra_main_kwargs = {}
347 if self._bandwidth_limiter:
348 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
349 return self._submit_transfer(
350 call_args, UploadSubmissionTask, extra_main_kwargs
351 )

    def download(
        self, bucket, key, fileobj, extra_args=None, subscribers=None
    ):
        """Downloads a file from S3

        :type bucket: str
        :param bucket: The name of the bucket to download from

        :type key: str
        :param key: The name of the key to download from

        :type fileobj: str or seekable file-like object
        :param fileobj: The name of a file to download or a seekable file-like
            object to download. It is recommended to use a filename because
            file-like objects may result in higher memory usage.

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the events emitted during the process
            of the transfer request.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the download
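
        A usage sketch (the bucket, key, and path are illustrative)::

            future = manager.download('mybucket', 'mykey', '/tmp/myfile')
            future.result()  # wait for the download; raises on failure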
380 """
381 if extra_args is None:
382 extra_args = {}
383 if subscribers is None:
384 subscribers = []
385 self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
386 self._validate_if_bucket_supported(bucket)
387 call_args = CallArgs(
388 bucket=bucket,
389 key=key,
390 fileobj=fileobj,
391 extra_args=extra_args,
392 subscribers=subscribers,
393 )
394 extra_main_kwargs = {'io_executor': self._io_executor}
395 if self._bandwidth_limiter:
396 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
397 return self._submit_transfer(
398 call_args, DownloadSubmissionTask, extra_main_kwargs
399 )

    def copy(
        self,
        copy_source,
        bucket,
        key,
        extra_args=None,
        subscribers=None,
        source_client=None,
    ):
        """Copies a file in S3

        :type copy_source: dict
        :param copy_source: The name of the source bucket, key name of the
            source object, and optional version ID of the source object. The
            dictionary format is:
            ``{'Bucket': 'bucket', 'Key': 'key', 'VersionId': 'id'}``. Note
            that the ``VersionId`` key is optional and may be omitted.

        :type bucket: str
        :param bucket: The name of the bucket to copy to

        :type key: str
        :param key: The name of the key to copy to

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the events emitted during the process
            of the transfer request.

        :type source_client: botocore or boto3 Client
        :param source_client: The client to be used for operations that
            may happen at the source object. For example, this client is
            used for the head_object that determines the size of the copy.
            If no client is provided, the transfer manager's client is used
            as the client for the source object.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the copy
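
        A usage sketch (the bucket and key names are illustrative)::

            future = manager.copy(
                {'Bucket': 'sourcebucket', 'Key': 'sourcekey'},
                'destbucket',
                'destkey',
            )
            future.result()  # wait for the copy to complete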
443 """
444 if extra_args is None:
445 extra_args = {}
446 if subscribers is None:
447 subscribers = []
448 if source_client is None:
449 source_client = self._client
450 self._validate_all_known_args(extra_args, self.ALLOWED_COPY_ARGS)
451 if isinstance(copy_source, dict):
452 self._validate_if_bucket_supported(copy_source.get('Bucket'))
453 self._validate_if_bucket_supported(bucket)
454 call_args = CallArgs(
455 copy_source=copy_source,
456 bucket=bucket,
457 key=key,
458 extra_args=extra_args,
459 subscribers=subscribers,
460 source_client=source_client,
461 )
462 return self._submit_transfer(call_args, CopySubmissionTask)

    def delete(self, bucket, key, extra_args=None, subscribers=None):
        """Delete an S3 object.

        :type bucket: str
        :param bucket: The name of the bucket.

        :type key: str
        :param key: The name of the S3 object to delete.

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            DeleteObject call.

        :type subscribers: list
        :param subscribers: A list of subscribers to be invoked during the
            process of the transfer request. Note that the ``on_progress``
            callback is not invoked during object deletion.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the deletion.
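
        A usage sketch (the bucket and key names are illustrative)::

            future = manager.delete('mybucket', 'mykey')
            future.result()  # wait for the deletion to complete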
        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_DELETE_ARGS)
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        return self._submit_transfer(call_args, DeleteSubmissionTask)

    def _validate_if_bucket_supported(self, bucket):
        # S3 high-level operations don't support some resources
        # (e.g. S3 Object Lambda); only direct API calls are available
        # for such resources.
        if self.VALIDATE_SUPPORTED_BUCKET_VALUES:
            for resource, pattern in self._UNSUPPORTED_BUCKET_PATTERNS.items():
                match = pattern.match(bucket)
                if match:
                    raise ValueError(
                        f'TransferManager methods do not support {resource} '
                        'resource. Use direct client calls instead.'
                    )

    def _validate_all_known_args(self, actual, allowed):
        for kwarg in actual:
            if kwarg not in allowed:
                raise ValueError(
                    "Invalid extra_args key '{}', "
                    "must be one of: {}".format(kwarg, ', '.join(allowed))
                )

    def _add_operation_defaults(self, extra_args):
        if (
            self.client.meta.config.request_checksum_calculation
            == "when_supported"
        ):
            set_default_checksum_algorithm(extra_args)

    def _submit_transfer(
        self, call_args, submission_task_cls, extra_main_kwargs=None
    ):
        if not extra_main_kwargs:
            extra_main_kwargs = {}

        # Create a TransferFuture to return back to the user
        transfer_future, components = self._get_future_with_components(
            call_args
        )

        # Add any provided done callbacks to the created transfer future
        # to be invoked on the transfer future being complete.
        for callback in get_callbacks(transfer_future, 'done'):
            components['coordinator'].add_done_callback(callback)

        # Get the main kwargs needed to instantiate the submission task
        main_kwargs = self._get_submission_task_main_kwargs(
            transfer_future, extra_main_kwargs
        )

        # Submit a SubmissionTask that will submit all of the necessary
        # tasks needed to complete the S3 transfer.
        self._submission_executor.submit(
            submission_task_cls(
                transfer_coordinator=components['coordinator'],
                main_kwargs=main_kwargs,
            )
        )

        # Increment the unique id counter for future transfer requests
        self._id_counter += 1

        return transfer_future

    def _get_future_with_components(self, call_args):
        transfer_id = self._id_counter
        # Creates a new transfer future along with its components
        transfer_coordinator = TransferCoordinator(transfer_id=transfer_id)
        # Track the transfer coordinator for transfers to manage.
        self._coordinator_controller.add_transfer_coordinator(
            transfer_coordinator
        )
        # Also make sure that the transfer coordinator is removed once
        # the transfer completes so it does not stick around in memory.
        transfer_coordinator.add_done_callback(
            self._coordinator_controller.remove_transfer_coordinator,
            transfer_coordinator,
        )
        components = {
            'meta': TransferMeta(call_args, transfer_id=transfer_id),
            'coordinator': transfer_coordinator,
        }
        transfer_future = TransferFuture(**components)
        return transfer_future, components

    def _get_submission_task_main_kwargs(
        self, transfer_future, extra_main_kwargs
    ):
        main_kwargs = {
            'client': self._client,
            'config': self._config,
            'osutil': self._osutil,
            'request_executor': self._request_executor,
            'transfer_future': transfer_future,
        }
        main_kwargs.update(extra_main_kwargs)
        return main_kwargs

    def _register_handlers(self):
        # Register handlers to enable/disable callbacks on uploads.
        event_name = 'request-created.s3'
        self._client.meta.events.register_first(
            event_name,
            signal_not_transferring,
            unique_id='s3upload-not-transferring',
        )
        self._client.meta.events.register_last(
            event_name, signal_transferring, unique_id='s3upload-transferring'
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, *args):
        cancel = False
        cancel_msg = ''
        cancel_exc_type = FatalError
        # If an exception was raised in the context handler, signal to cancel
        # all of the in-progress futures in the shutdown.
        if exc_type:
            cancel = True
            cancel_msg = str(exc_value)
            if not cancel_msg:
                cancel_msg = repr(exc_value)
            # If it was a KeyboardInterrupt, the cancellation was initiated
            # by the user.
            if isinstance(exc_value, KeyboardInterrupt):
                cancel_exc_type = CancelledError
        self._shutdown(cancel, cancel_msg, cancel_exc_type)

    def shutdown(self, cancel=False, cancel_msg=''):
        """Shutdown the TransferManager

        It will wait until all transfers complete before it completely shuts
        down.

        :type cancel: boolean
        :param cancel: If True, calls TransferFuture.cancel() for
            all in-progress transfers. This is useful if you want the
            shutdown to happen more quickly.

        :type cancel_msg: str
        :param cancel_msg: The message to specify if canceling all in-progress
            transfers.
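
        A usage sketch: cancel whatever is still in progress and shut
        down (the message is illustrative)::

            manager.shutdown(cancel=True, cancel_msg='shutting down early')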
643 """
644 self._shutdown(cancel, cancel, cancel_msg)

    def _shutdown(self, cancel, cancel_msg, exc_type=CancelledError):
        if cancel:
            # Cancel all in-flight transfers if requested, before waiting
            # for them to complete.
            self._coordinator_controller.cancel(cancel_msg, exc_type)
        try:
            # Wait until there are no more in-progress transfers. This is
            # wrapped in a try statement because this can be interrupted
            # with a KeyboardInterrupt that needs to be caught.
            self._coordinator_controller.wait()
        except KeyboardInterrupt:
            # If no errors were raised in the try block, the cancel should
            # have no coordinators it needs to run cancel on. If there was
            # an error raised in the try statement we want to cancel all of
            # the in-flight transfers before shutting down to speed that
            # process up.
            self._coordinator_controller.cancel('KeyboardInterrupt()')
            raise
        finally:
            # Shutdown all of the executors.
            self._submission_executor.shutdown()
            self._request_executor.shutdown()
            self._io_executor.shutdown()


class TransferCoordinatorController:
    def __init__(self):
        """Abstraction to control all transfer coordinators

        This abstraction allows the manager to wait for in-progress transfers
        to complete and cancel all in-progress transfers.
        """
        self._lock = threading.Lock()
        self._tracked_transfer_coordinators = set()

    @property
    def tracked_transfer_coordinators(self):
        """The set of transfer coordinators being tracked"""
        with self._lock:
            # We return a copy because the set is mutable and if you were to
            # iterate over the set, it may be changing in length due to
            # additions and removals of transfer coordinators.
            return copy.copy(self._tracked_transfer_coordinators)

    def add_transfer_coordinator(self, transfer_coordinator):
        """Adds a transfer coordinator of a transfer to be canceled if needed

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The transfer coordinator for the
            particular transfer
        """
        with self._lock:
            self._tracked_transfer_coordinators.add(transfer_coordinator)

    def remove_transfer_coordinator(self, transfer_coordinator):
        """Remove a transfer coordinator from cancellation consideration

        Typically, this method is invoked by the transfer coordinator itself
        to remove itself when it completes its transfer.

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The transfer coordinator for the
            particular transfer
        """
        with self._lock:
            self._tracked_transfer_coordinators.remove(transfer_coordinator)

    def cancel(self, msg='', exc_type=CancelledError):
        """Cancels all in-progress transfers

        This cancels the in-progress transfers by calling cancel() on all
        tracked transfer coordinators.

        :param msg: The message to pass on to each transfer coordinator that
            gets cancelled.

        :param exc_type: The type of exception to set for the cancellation
        """
        for transfer_coordinator in self.tracked_transfer_coordinators:
            transfer_coordinator.cancel(msg, exc_type)

    def wait(self):
        """Wait until there are no more in-progress transfers

        This will not stop when failures are encountered, nor will it
        propagate any of these errors from failed transfers, but it can be
        interrupted with a KeyboardInterrupt.
        """
        try:
            transfer_coordinator = None
            for transfer_coordinator in self.tracked_transfer_coordinators:
                transfer_coordinator.result()
        except KeyboardInterrupt:
            logger.debug('Received KeyboardInterrupt in wait()')
            # If a KeyboardInterrupt is raised while waiting for
            # a result, exit out of the wait and raise the
            # exception.
            if transfer_coordinator:
                logger.debug(
                    'On KeyboardInterrupt was waiting for %s',
                    transfer_coordinator,
                )
            raise
        except Exception:
            # A general exception could have been thrown because
            # of result(). We just want to ignore this and continue
            # because we at least know that the transfer coordinator
            # has completed.
            pass