1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import copy
14import logging
15import re
16import threading
17
18from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket
19from s3transfer.constants import (
20 ALLOWED_DOWNLOAD_ARGS,
21 FULL_OBJECT_CHECKSUM_ARGS,
22 KB,
23 MB,
24)
25from s3transfer.copies import CopySubmissionTask
26from s3transfer.delete import DeleteSubmissionTask
27from s3transfer.download import DownloadSubmissionTask
28from s3transfer.exceptions import CancelledError, FatalError
29from s3transfer.futures import (
30 IN_MEMORY_DOWNLOAD_TAG,
31 IN_MEMORY_UPLOAD_TAG,
32 BoundedExecutor,
33 TransferCoordinator,
34 TransferFuture,
35 TransferMeta,
36)
37from s3transfer.upload import UploadSubmissionTask
38from s3transfer.utils import (
39 CallArgs,
40 OSUtils,
41 SlidingWindowSemaphore,
42 TaskSemaphore,
43 get_callbacks,
44 set_default_checksum_algorithm,
45 signal_not_transferring,
46 signal_transferring,
47)
48
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
50
51
class TransferConfig:
    """Container for the tunable settings read by the transfer manager."""

    def __init__(
        self,
        multipart_threshold=8 * MB,
        multipart_chunksize=8 * MB,
        max_request_concurrency=10,
        max_submission_concurrency=5,
        max_request_queue_size=1000,
        max_submission_queue_size=1000,
        max_io_queue_size=1000,
        io_chunksize=256 * KB,
        num_download_attempts=5,
        max_in_memory_upload_chunks=10,
        max_in_memory_download_chunks=10,
        max_bandwidth=None,
    ):
        """Configurations for the transfer manager

        Every value that is not ``None`` must be strictly greater than zero;
        otherwise a ``ValueError`` is raised.

        :param multipart_threshold: The size threshold at which a transfer
            becomes a multipart transfer.

        :param multipart_chunksize: The size of each part when a request
            becomes a multipart transfer.

        :param max_request_concurrency: The maximum number of
            transfer-related S3 API requests that may be in flight at once.

        :param max_submission_concurrency: The maximum number of threads
            processing calls to TransferManager methods. Processing a call
            usually means working out which S3 API requests need to be
            enqueued; it does **not** include making the S3 API requests
            that move the data. The threads governed by
            ``max_request_concurrency`` are responsible for those.

        :param max_request_queue_size: The maximum number of S3 API requests
            that can be queued at a time.

        :param max_submission_queue_size: The maximum number of
            TransferManager method calls that can be queued at a time.

        :param max_io_queue_size: The maximum number of read parts that can
            be queued to be written to disk per download. The size of each
            queued element is bounded by ``io_chunksize``.

        :param io_chunksize: The maximum size of each chunk in the io queue.
            Currently this is also the size used when reading from the
            downloaded stream.

        :param num_download_attempts: The number of attempts made when
            errors occur while downloading an object from S3. These retries
            cover errors raised while streaming the data down (i.e. socket
            errors and read timeouts that occur after an OK response has
            been received from S3). Other retryable failures, such as
            throttling errors and 5xx errors, are already retried by
            botocore (this default is 5). ``num_download_attempts`` does not
            take into account the number of exceptions retried by botocore.

        :param max_in_memory_upload_chunks: The number of chunks that may be
            held in memory at a time across all ongoing upload requests.
            This applies to chunks that must be buffered in memory because
            the data is sourced from a file-like object. The total maximum
            memory footprint due to in-memory upload chunks is roughly:

                max_in_memory_upload_chunks * multipart_chunksize
                + max_submission_concurrency * multipart_chunksize

            ``max_submission_concurrency`` has an effect on this value
            because each thread pulling data off of a file-like object may
            be holding a single read chunk while it waits to submit it,
            the ``max_in_memory_upload_chunks`` limit having been reached by
            the threads making the upload requests.

        :param max_in_memory_download_chunks: The number of chunks that may
            be buffered in memory, and **not** in the io queue, at a time
            across all ongoing download requests. This applies specifically
            to file-like objects that cannot be seeked. The total maximum
            memory footprint due to in-memory download chunks is roughly:

                max_in_memory_download_chunks * multipart_chunksize

        :param max_bandwidth: The maximum bandwidth, in bytes per second,
            that will be consumed in uploading and downloading file content.

        :raises ValueError: If any provided value is less than or equal
            to zero.
        """
        # NOTE: the assignment order is significant. Validation walks the
        # instance attributes in insertion order and reports the first
        # offending one.
        self.multipart_threshold = multipart_threshold
        self.multipart_chunksize = multipart_chunksize
        self.max_request_concurrency = max_request_concurrency
        self.max_submission_concurrency = max_submission_concurrency
        self.max_request_queue_size = max_request_queue_size
        self.max_submission_queue_size = max_submission_queue_size
        self.max_io_queue_size = max_io_queue_size
        self.io_chunksize = io_chunksize
        self.num_download_attempts = num_download_attempts
        self.max_in_memory_upload_chunks = max_in_memory_upload_chunks
        self.max_in_memory_download_chunks = max_in_memory_download_chunks
        self.max_bandwidth = max_bandwidth
        self._validate_attrs_are_nonzero()

    def _validate_attrs_are_nonzero(self):
        """Reject any instance attribute that is set but not positive.

        Attributes whose value is ``None`` are treated as unset and skipped.

        :raises ValueError: For the first attribute whose value is ``<= 0``.
        """
        for name, value in vars(self).items():
            if value is None:
                continue
            if value <= 0:
                raise ValueError(
                    f'Provided parameter {name} of value {value} must '
                    'be greater than 0.'
                )
160
161
class TransferManager:
    """Submits uploads, downloads, copies, and deletes against Amazon S3.

    Each public transfer method validates its arguments, bundles them into
    a ``CallArgs`` object, submits a submission task to an executor, and
    returns a ``TransferFuture`` representing the transfer. The manager can
    be used as a context manager, in which case it is shut down on exit.
    """

    # Keys accepted in ``extra_args`` for download().
    ALLOWED_DOWNLOAD_ARGS = ALLOWED_DOWNLOAD_ARGS

    # Keys accepted in ``extra_args`` by both upload() and copy().
    _ALLOWED_SHARED_ARGS = [
        'ACL',
        'CacheControl',
        'ChecksumAlgorithm',
        'ContentDisposition',
        'ContentEncoding',
        'ContentLanguage',
        'ContentType',
        'ExpectedBucketOwner',
        'Expires',
        'GrantFullControl',
        'GrantRead',
        'GrantReadACP',
        'GrantWriteACP',
        'Metadata',
        'ObjectLockLegalHoldStatus',
        'ObjectLockMode',
        'ObjectLockRetainUntilDate',
        'RequestPayer',
        'ServerSideEncryption',
        'StorageClass',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'SSEKMSKeyId',
        'SSEKMSEncryptionContext',
        'Tagging',
        'WebsiteRedirectLocation',
    ]

    # Keys accepted in ``extra_args`` for upload().
    ALLOWED_UPLOAD_ARGS = (
        _ALLOWED_SHARED_ARGS
        + [
            'ChecksumType',
            'MpuObjectSize',
        ]
        + FULL_OBJECT_CHECKSUM_ARGS
    )

    # Keys accepted in ``extra_args`` for copy().
    ALLOWED_COPY_ARGS = _ALLOWED_SHARED_ARGS + [
        'CopySourceIfMatch',
        'CopySourceIfModifiedSince',
        'CopySourceIfNoneMatch',
        'CopySourceIfUnmodifiedSince',
        'CopySourceSSECustomerAlgorithm',
        'CopySourceSSECustomerKey',
        'CopySourceSSECustomerKeyMD5',
        'MetadataDirective',
        'TaggingDirective',
    ]

    # Keys accepted in ``extra_args`` for delete().
    ALLOWED_DELETE_ARGS = [
        'MFA',
        'VersionId',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    # When False, bucket values are not checked against
    # ``_UNSUPPORTED_BUCKET_PATTERNS``.
    VALIDATE_SUPPORTED_BUCKET_VALUES = True

    # Maps a human-readable resource name to a compiled pattern; a bucket
    # value matching any pattern is rejected by the transfer methods.
    _UNSUPPORTED_BUCKET_PATTERNS = {
        'S3 Object Lambda': re.compile(
            r'^arn:(aws).*:s3-object-lambda:[a-z\-0-9]+:[0-9]{12}:'
            r'accesspoint[/:][a-zA-Z0-9\-]{1,63}'
        ),
    }

    def __init__(self, client, config=None, osutil=None, executor_cls=None):
        """A transfer manager interface for Amazon S3

        :param client: Client to be used by the manager
        :param config: TransferConfig to associate specific configurations.
            A default ``TransferConfig()`` is used when omitted.
        :param osutil: OSUtils object to use for os-related behavior when
            using with transfer manager. A default ``OSUtils()`` is used
            when omitted.

        :type executor_cls: s3transfer.futures.BaseExecutor
        :param executor_cls: The class of executor to use with the transfer
            manager. By default, concurrent.futures.ThreadPoolExecutor is used.
        """
        self._client = client
        self._config = config
        if config is None:
            self._config = TransferConfig()
        self._osutil = osutil
        if osutil is None:
            self._osutil = OSUtils()
        self._coordinator_controller = TransferCoordinatorController()
        # A counter to create unique id's for each transfer submitted.
        self._id_counter = 0

        # The executor responsible for making S3 API transfer requests
        self._request_executor = BoundedExecutor(
            max_size=self._config.max_request_queue_size,
            max_num_threads=self._config.max_request_concurrency,
            tag_semaphores={
                IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
                    self._config.max_in_memory_upload_chunks
                ),
                IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
                    self._config.max_in_memory_download_chunks
                ),
            },
            executor_cls=executor_cls,
        )

        # The executor responsible for submitting the necessary tasks to
        # perform the desired transfer
        self._submission_executor = BoundedExecutor(
            max_size=self._config.max_submission_queue_size,
            max_num_threads=self._config.max_submission_concurrency,
            executor_cls=executor_cls,
        )

        # There is one thread available for writing to disk. It will handle
        # downloads for all files.
        self._io_executor = BoundedExecutor(
            max_size=self._config.max_io_queue_size,
            max_num_threads=1,
            executor_cls=executor_cls,
        )

        # The component responsible for limiting bandwidth usage if it
        # is configured.
        self._bandwidth_limiter = None
        if self._config.max_bandwidth is not None:
            logger.debug(
                'Setting max_bandwidth to %s', self._config.max_bandwidth
            )
            leaky_bucket = LeakyBucket(self._config.max_bandwidth)
            self._bandwidth_limiter = BandwidthLimiter(leaky_bucket)

        self._register_handlers()

    @property
    def client(self):
        """The client this manager was constructed with."""
        return self._client

    @property
    def config(self):
        """The TransferConfig in use by this manager."""
        return self._config

    def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None):
        """Uploads a file to S3

        :type fileobj: str or seekable file-like object
        :param fileobj: The name of a file to upload or a seekable file-like
            object to upload. It is recommended to use a filename because
            file-like objects may result in higher memory usage.

        :type bucket: str
        :param bucket: The name of the bucket to upload to

        :type key: str
        :param key: The name of the key to upload to

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the event emit during the process of
            the transfer request.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the upload

        :raises ValueError: If ``extra_args`` contains a key that is not in
            ``ALLOWED_UPLOAD_ARGS`` or ``bucket`` is an unsupported resource.
        """
        # Work on a shallow copy: the dict is handed to
        # _add_operation_defaults() below, and the caller's dict should not
        # be affected by anything added there.
        extra_args = extra_args.copy() if extra_args else {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
        self._validate_if_bucket_supported(bucket)
        self._add_operation_defaults(extra_args)
        call_args = CallArgs(
            fileobj=fileobj,
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        extra_main_kwargs = {}
        if self._bandwidth_limiter:
            extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
        return self._submit_transfer(
            call_args, UploadSubmissionTask, extra_main_kwargs
        )

    def download(
        self, bucket, key, fileobj, extra_args=None, subscribers=None
    ):
        """Downloads a file from S3

        :type bucket: str
        :param bucket: The name of the bucket to download from

        :type key: str
        :param key: The name of the key to download from

        :type fileobj: str or seekable file-like object
        :param fileobj: The name of a file to download or a seekable file-like
            object to download. It is recommended to use a filename because
            file-like objects may result in higher memory usage.

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the event emit during the process of
            the transfer request.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the download

        :raises ValueError: If ``extra_args`` contains a key that is not in
            ``ALLOWED_DOWNLOAD_ARGS`` or ``bucket`` is an unsupported
            resource.
        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            bucket=bucket,
            key=key,
            fileobj=fileobj,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        # Downloads additionally receive the dedicated io executor.
        extra_main_kwargs = {'io_executor': self._io_executor}
        if self._bandwidth_limiter:
            extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
        return self._submit_transfer(
            call_args, DownloadSubmissionTask, extra_main_kwargs
        )

    def copy(
        self,
        copy_source,
        bucket,
        key,
        extra_args=None,
        subscribers=None,
        source_client=None,
    ):
        """Copies a file in S3

        :type copy_source: dict
        :param copy_source: The name of the source bucket, key name of the
            source object, and optional version ID of the source object. The
            dictionary format is:
            ``{'Bucket': 'bucket', 'Key': 'key', 'VersionId': 'id'}``. Note
            that the ``VersionId`` key is optional and may be omitted.

        :type bucket: str
        :param bucket: The name of the bucket to copy to

        :type key: str
        :param key: The name of the key to copy to

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: a list of subscribers
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the event emit during the process of
            the transfer request.

        :type source_client: botocore or boto3 Client
        :param source_client: The client to be used for operation that
            may happen at the source object. For example, this client is
            used for the head_object that determines the size of the copy.
            If no client is provided, the transfer manager's client is used
            as the client for the source object.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the copy

        :raises ValueError: If ``extra_args`` contains a key that is not in
            ``ALLOWED_COPY_ARGS`` or either bucket is an unsupported
            resource.
        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        if source_client is None:
            source_client = self._client
        self._validate_all_known_args(extra_args, self.ALLOWED_COPY_ARGS)
        # The source bucket is only validated when copy_source is a dict.
        if isinstance(copy_source, dict):
            self._validate_if_bucket_supported(copy_source.get('Bucket'))
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            copy_source=copy_source,
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
            source_client=source_client,
        )
        return self._submit_transfer(call_args, CopySubmissionTask)

    def delete(self, bucket, key, extra_args=None, subscribers=None):
        """Delete an S3 object.

        :type bucket: str
        :param bucket: The name of the bucket.

        :type key: str
        :param key: The name of the S3 object to delete.

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            DeleteObject call.

        :type subscribers: list
        :param subscribers: A list of subscribers to be invoked during the
            process of the transfer request.  Note that the ``on_progress``
            callback is not invoked during object deletion.

        :rtype: s3transfer.futures.TransferFuture
        :return: Transfer future representing the deletion.

        :raises ValueError: If ``extra_args`` contains a key that is not in
            ``ALLOWED_DELETE_ARGS`` or ``bucket`` is an unsupported resource.
        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_DELETE_ARGS)
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        return self._submit_transfer(call_args, DeleteSubmissionTask)

    def _validate_if_bucket_supported(self, bucket):
        """Raise if ``bucket`` matches an unsupported resource pattern.

        The check is skipped entirely when
        ``VALIDATE_SUPPORTED_BUCKET_VALUES`` is falsy.

        :raises ValueError: If ``bucket`` matches any pattern in
            ``_UNSUPPORTED_BUCKET_PATTERNS``.
        """
        # s3 high level operations don't support some resources
        # (eg. S3 Object Lambda) only direct API calls are available
        # for such resources
        if self.VALIDATE_SUPPORTED_BUCKET_VALUES:
            for resource, pattern in self._UNSUPPORTED_BUCKET_PATTERNS.items():
                match = pattern.match(bucket)
                if match:
                    raise ValueError(
                        f'TransferManager methods do not support {resource} '
                        'resource. Use direct client calls instead.'
                    )

    def _validate_all_known_args(self, actual, allowed):
        """Raise if ``actual`` contains a key that is not in ``allowed``.

        :param actual: The ``extra_args`` mapping supplied by the caller.
        :param allowed: The collection of permitted key names.
        :raises ValueError: For the first key not found in ``allowed``.
        """
        for kwarg in actual:
            if kwarg not in allowed:
                raise ValueError(
                    "Invalid extra_args key '{}', must be one of: {}".format(
                        kwarg, ', '.join(allowed)
                    )
                )

    def _add_operation_defaults(self, extra_args):
        """Apply the default checksum algorithm to ``extra_args`` if enabled.

        ``set_default_checksum_algorithm`` is only invoked when the client's
        ``request_checksum_calculation`` setting is ``"when_supported"``.
        """
        if (
            self.client.meta.config.request_checksum_calculation
            == "when_supported"
        ):
            set_default_checksum_algorithm(extra_args)

    def _submit_transfer(
        self, call_args, submission_task_cls, extra_main_kwargs=None
    ):
        """Create a future for a transfer and submit its submission task.

        :param call_args: The ``CallArgs`` describing the transfer.
        :param submission_task_cls: The submission task class to instantiate.
        :param extra_main_kwargs: Optional additional main kwargs for the
            submission task, merged over the common ones.

        :returns: The TransferFuture created for this transfer.
        """
        if not extra_main_kwargs:
            extra_main_kwargs = {}

        # Create a TransferFuture to return back to the user
        transfer_future, components = self._get_future_with_components(
            call_args
        )

        # Add any provided done callbacks to the created transfer future
        # to be invoked on the transfer future being complete.
        for callback in get_callbacks(transfer_future, 'done'):
            components['coordinator'].add_done_callback(callback)

        # Get the main kwargs needed to instantiate the submission task
        main_kwargs = self._get_submission_task_main_kwargs(
            transfer_future, extra_main_kwargs
        )

        # Submit a SubmissionTask that will submit all of the necessary
        # tasks needed to complete the S3 transfer.
        self._submission_executor.submit(
            submission_task_cls(
                transfer_coordinator=components['coordinator'],
                main_kwargs=main_kwargs,
            )
        )

        # Increment the unique id counter for future transfer requests
        self._id_counter += 1

        return transfer_future

    def _get_future_with_components(self, call_args):
        """Build a TransferFuture together with its meta and coordinator.

        The coordinator is registered with the coordinator controller and is
        set up to be unregistered through a done callback.

        :returns: A ``(transfer_future, components)`` tuple where
            ``components`` is a dict with ``'meta'`` and ``'coordinator'``.
        """
        transfer_id = self._id_counter
        # Creates a new transfer future along with its components
        transfer_coordinator = TransferCoordinator(transfer_id=transfer_id)
        # Track the transfer coordinator for transfers to manage.
        self._coordinator_controller.add_transfer_coordinator(
            transfer_coordinator
        )
        # Also make sure that the transfer coordinator is removed once
        # the transfer completes so it does not stick around in memory.
        transfer_coordinator.add_done_callback(
            self._coordinator_controller.remove_transfer_coordinator,
            transfer_coordinator,
        )
        components = {
            'meta': TransferMeta(call_args, transfer_id=transfer_id),
            'coordinator': transfer_coordinator,
        }
        transfer_future = TransferFuture(**components)
        return transfer_future, components

    def _get_submission_task_main_kwargs(
        self, transfer_future, extra_main_kwargs
    ):
        """Return the main kwargs passed to a submission task.

        Entries in ``extra_main_kwargs`` override the common entries on
        key collision.
        """
        main_kwargs = {
            'client': self._client,
            'config': self._config,
            'osutil': self._osutil,
            'request_executor': self._request_executor,
            'transfer_future': transfer_future,
        }
        main_kwargs.update(extra_main_kwargs)
        return main_kwargs

    def _register_handlers(self):
        """Register the ``request-created.s3`` handlers on the client."""
        # Register handlers to enable/disable callbacks on uploads.
        event_name = 'request-created.s3'
        self._client.meta.events.register_first(
            event_name,
            signal_not_transferring,
            unique_id='s3upload-not-transferring',
        )
        self._client.meta.events.register_last(
            event_name, signal_transferring, unique_id='s3upload-transferring'
        )

    def __enter__(self):
        """Return the manager itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, *args):
        """Shut the manager down, cancelling transfers if the body raised.

        When an exception escaped the ``with`` body, in-progress transfers
        are cancelled with a message derived from that exception. The
        cancellation exception type is ``CancelledError`` for a
        ``KeyboardInterrupt`` and ``FatalError`` otherwise.
        """
        cancel = False
        cancel_msg = ''
        cancel_exc_type = FatalError
        # If a exception was raised in the context handler, signal to cancel
        # all of the inprogress futures in the shutdown.
        if exc_type:
            cancel = True
            cancel_msg = str(exc_value)
            # Fall back to repr() so the message is never empty.
            if not cancel_msg:
                cancel_msg = repr(exc_value)
            # If it was a KeyboardInterrupt, the cancellation was initiated
            # by the user.
            if isinstance(exc_value, KeyboardInterrupt):
                cancel_exc_type = CancelledError
        self._shutdown(cancel, cancel_msg, cancel_exc_type)

    def shutdown(self, cancel=False, cancel_msg=''):
        """Shutdown the TransferManager

        It will wait till all transfers complete before it completely shuts
        down.

        :type cancel: boolean
        :param cancel: If True, calls TransferFuture.cancel() for
            all in-progress in transfers. This is useful if you want the
            shutdown to happen quicker.

        :type cancel_msg: str
        :param cancel_msg: The message to specify if canceling all in-progress
            transfers.
        """
        # Pass exactly (cancel, cancel_msg) so they line up with
        # _shutdown's parameters; exc_type keeps its CancelledError default.
        self._shutdown(cancel, cancel_msg)

    def _shutdown(self, cancel, cancel_msg, exc_type=CancelledError):
        """Optionally cancel, then wait for transfers and stop executors.

        :param cancel: If truthy, cancel all tracked transfers first.
        :param cancel_msg: The message used when cancelling.
        :param exc_type: The exception type used when cancelling.

        :raises KeyboardInterrupt: Re-raised if received while waiting,
            after the tracked transfers have been cancelled.
        """
        if cancel:
            # Cancel all in-flight transfers if requested, before waiting
            # for them to complete.
            self._coordinator_controller.cancel(cancel_msg, exc_type)
        try:
            # Wait until there are no more in-progress transfers. This is
            # wrapped in a try statement because this can be interrupted
            # with a KeyboardInterrupt that needs to be caught.
            self._coordinator_controller.wait()
        except KeyboardInterrupt:
            # If not errors were raised in the try block, the cancel should
            # have no coordinators it needs to run cancel on. If there was
            # an error raised in the try statement we want to cancel all of
            # the inflight transfers before shutting down to speed that
            # process up.
            self._coordinator_controller.cancel('KeyboardInterrupt()')
            raise
        finally:
            # Shutdown all of the executors.
            self._submission_executor.shutdown()
            self._request_executor.shutdown()
            self._io_executor.shutdown()
670
671
class TransferCoordinatorController:
    """Tracks transfer coordinators so they can be waited on or cancelled."""

    def __init__(self):
        """Abstraction to control all transfer coordinators

        This abstraction allows the manager to wait for inprogress transfers
        to complete and cancel all inprogress transfers.
        """
        # Guards every access to the tracked set.
        self._lock = threading.Lock()
        self._tracked_transfer_coordinators = set()

    @property
    def tracked_transfer_coordinators(self):
        """The set of transfer coordinators being tracked"""
        with self._lock:
            # We return a copy because the set is mutable and if you were to
            # iterate over the set, it may be changing in length due to
            # additions and removals of transfer coordinators.
            return copy.copy(self._tracked_transfer_coordinators)

    def add_transfer_coordinator(self, transfer_coordinator):
        """Adds a transfer coordinator of a transfer to be canceled if needed

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The transfer coordinator for the
            particular transfer
        """
        with self._lock:
            self._tracked_transfer_coordinators.add(transfer_coordinator)

    def remove_transfer_coordinator(self, transfer_coordinator):
        """Remove a transfer coordinator from cancellation consideration

        Typically, this method is invoked by the transfer coordinator itself
        to remove its self when it completes its transfer.

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The transfer coordinator for the
            particular transfer

        :raises KeyError: If the coordinator is not currently tracked.
        """
        with self._lock:
            self._tracked_transfer_coordinators.remove(transfer_coordinator)

    def cancel(self, msg='', exc_type=CancelledError):
        """Cancels all inprogress transfers

        This cancels the inprogress transfers by calling cancel() on all
        tracked transfer coordinators.

        :param msg: The message to pass on to each transfer coordinator that
            gets cancelled.

        :param exc_type: The type of exception to set for the cancellation
        """
        for transfer_coordinator in self.tracked_transfer_coordinators:
            transfer_coordinator.cancel(msg, exc_type)

    def wait(self):
        """Wait until there are no more inprogress transfers

        This will not stop when failures are encountered and not propagate any
        of these errors from failed transfers, but it can be interrupted with
        a KeyboardInterrupt.

        :raises KeyboardInterrupt: Re-raised if received while waiting.
        """
        transfer_coordinator = None
        try:
            for transfer_coordinator in self.tracked_transfer_coordinators:
                try:
                    transfer_coordinator.result()
                except Exception:
                    # A general exception could have been thrown because
                    # of result(). We just want to ignore this and continue
                    # because we at least know that the transfer coordinator
                    # has completed. Handling it per coordinator, inside
                    # the loop, is what lets the remaining coordinators
                    # still be waited on.
                    pass
        except KeyboardInterrupt:
            logger.debug('Received KeyboardInterrupt in wait()')
            # If Keyboard interrupt is raised while waiting for
            # the result, then exit out of the wait and raise the
            # exception
            if transfer_coordinator:
                logger.debug(
                    'On KeyboardInterrupt was waiting for %s',
                    transfer_coordinator,
                )
            raise