Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/manager.py: 25%
206 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import copy
14import logging
15import re
16import threading
18from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket
19from s3transfer.constants import ALLOWED_DOWNLOAD_ARGS, KB, MB
20from s3transfer.copies import CopySubmissionTask
21from s3transfer.delete import DeleteSubmissionTask
22from s3transfer.download import DownloadSubmissionTask
23from s3transfer.exceptions import CancelledError, FatalError
24from s3transfer.futures import (
25 IN_MEMORY_DOWNLOAD_TAG,
26 IN_MEMORY_UPLOAD_TAG,
27 BoundedExecutor,
28 TransferCoordinator,
29 TransferFuture,
30 TransferMeta,
31)
32from s3transfer.upload import UploadSubmissionTask
33from s3transfer.utils import (
34 CallArgs,
35 OSUtils,
36 SlidingWindowSemaphore,
37 TaskSemaphore,
38 add_s3express_defaults,
39 get_callbacks,
40 signal_not_transferring,
41 signal_transferring,
42)
44logger = logging.getLogger(__name__)
47class TransferConfig:
48 def __init__(
49 self,
50 multipart_threshold=8 * MB,
51 multipart_chunksize=8 * MB,
52 max_request_concurrency=10,
53 max_submission_concurrency=5,
54 max_request_queue_size=1000,
55 max_submission_queue_size=1000,
56 max_io_queue_size=1000,
57 io_chunksize=256 * KB,
58 num_download_attempts=5,
59 max_in_memory_upload_chunks=10,
60 max_in_memory_download_chunks=10,
61 max_bandwidth=None,
62 ):
63 """Configurations for the transfer manager
65 :param multipart_threshold: The threshold for which multipart
66 transfers occur.
68 :param max_request_concurrency: The maximum number of S3 API
69 transfer-related requests that can happen at a time.
71 :param max_submission_concurrency: The maximum number of threads
72 processing a call to a TransferManager method. Processing a
73 call usually entails determining which S3 API requests that need
74 to be enqueued, but does **not** entail making any of the
75 S3 API data transferring requests needed to perform the transfer.
76 The threads controlled by ``max_request_concurrency`` is
77 responsible for that.
79 :param multipart_chunksize: The size of each transfer if a request
80 becomes a multipart transfer.
82 :param max_request_queue_size: The maximum amount of S3 API requests
83 that can be queued at a time.
85 :param max_submission_queue_size: The maximum amount of
86 TransferManager method calls that can be queued at a time.
88 :param max_io_queue_size: The maximum amount of read parts that
89 can be queued to be written to disk per download. The default
90 size for each elementin this queue is 8 KB.
92 :param io_chunksize: The max size of each chunk in the io queue.
93 Currently, this is size used when reading from the downloaded
94 stream as well.
96 :param num_download_attempts: The number of download attempts that
97 will be tried upon errors with downloading an object in S3. Note
98 that these retries account for errors that occur when streaming
99 down the data from s3 (i.e. socket errors and read timeouts that
100 occur after receiving an OK response from s3).
101 Other retryable exceptions such as throttling errors and 5xx errors
102 are already retried by botocore (this default is 5). The
103 ``num_download_attempts`` does not take into account the
104 number of exceptions retried by botocore.
106 :param max_in_memory_upload_chunks: The number of chunks that can
107 be stored in memory at a time for all ongoing upload requests.
108 This pertains to chunks of data that need to be stored in memory
109 during an upload if the data is sourced from a file-like object.
110 The total maximum memory footprint due to a in-memory upload
111 chunks is roughly equal to:
113 max_in_memory_upload_chunks * multipart_chunksize
114 + max_submission_concurrency * multipart_chunksize
116 ``max_submission_concurrency`` has an affect on this value because
117 for each thread pulling data off of a file-like object, they may
118 be waiting with a single read chunk to be submitted for upload
119 because the ``max_in_memory_upload_chunks`` value has been reached
120 by the threads making the upload request.
122 :param max_in_memory_download_chunks: The number of chunks that can
123 be buffered in memory and **not** in the io queue at a time for all
124 ongoing download requests. This pertains specifically to file-like
125 objects that cannot be seeked. The total maximum memory footprint
126 due to a in-memory download chunks is roughly equal to:
128 max_in_memory_download_chunks * multipart_chunksize
130 :param max_bandwidth: The maximum bandwidth that will be consumed
131 in uploading and downloading file content. The value is in terms of
132 bytes per second.
133 """
134 self.multipart_threshold = multipart_threshold
135 self.multipart_chunksize = multipart_chunksize
136 self.max_request_concurrency = max_request_concurrency
137 self.max_submission_concurrency = max_submission_concurrency
138 self.max_request_queue_size = max_request_queue_size
139 self.max_submission_queue_size = max_submission_queue_size
140 self.max_io_queue_size = max_io_queue_size
141 self.io_chunksize = io_chunksize
142 self.num_download_attempts = num_download_attempts
143 self.max_in_memory_upload_chunks = max_in_memory_upload_chunks
144 self.max_in_memory_download_chunks = max_in_memory_download_chunks
145 self.max_bandwidth = max_bandwidth
146 self._validate_attrs_are_nonzero()
148 def _validate_attrs_are_nonzero(self):
149 for attr, attr_val in self.__dict__.items():
150 if attr_val is not None and attr_val <= 0:
151 raise ValueError(
152 'Provided parameter %s of value %s must be greater than '
153 '0.' % (attr, attr_val)
154 )
157class TransferManager:
158 ALLOWED_DOWNLOAD_ARGS = ALLOWED_DOWNLOAD_ARGS
160 ALLOWED_UPLOAD_ARGS = [
161 'ACL',
162 'CacheControl',
163 'ChecksumAlgorithm',
164 'ContentDisposition',
165 'ContentEncoding',
166 'ContentLanguage',
167 'ContentType',
168 'ExpectedBucketOwner',
169 'Expires',
170 'GrantFullControl',
171 'GrantRead',
172 'GrantReadACP',
173 'GrantWriteACP',
174 'Metadata',
175 'ObjectLockLegalHoldStatus',
176 'ObjectLockMode',
177 'ObjectLockRetainUntilDate',
178 'RequestPayer',
179 'ServerSideEncryption',
180 'StorageClass',
181 'SSECustomerAlgorithm',
182 'SSECustomerKey',
183 'SSECustomerKeyMD5',
184 'SSEKMSKeyId',
185 'SSEKMSEncryptionContext',
186 'Tagging',
187 'WebsiteRedirectLocation',
188 ]
190 ALLOWED_COPY_ARGS = ALLOWED_UPLOAD_ARGS + [
191 'CopySourceIfMatch',
192 'CopySourceIfModifiedSince',
193 'CopySourceIfNoneMatch',
194 'CopySourceIfUnmodifiedSince',
195 'CopySourceSSECustomerAlgorithm',
196 'CopySourceSSECustomerKey',
197 'CopySourceSSECustomerKeyMD5',
198 'MetadataDirective',
199 'TaggingDirective',
200 ]
202 ALLOWED_DELETE_ARGS = [
203 'MFA',
204 'VersionId',
205 'RequestPayer',
206 'ExpectedBucketOwner',
207 ]
209 VALIDATE_SUPPORTED_BUCKET_VALUES = True
211 _UNSUPPORTED_BUCKET_PATTERNS = {
212 'S3 Object Lambda': re.compile(
213 r'^arn:(aws).*:s3-object-lambda:[a-z\-0-9]+:[0-9]{12}:'
214 r'accesspoint[/:][a-zA-Z0-9\-]{1,63}'
215 ),
216 }
218 def __init__(self, client, config=None, osutil=None, executor_cls=None):
219 """A transfer manager interface for Amazon S3
221 :param client: Client to be used by the manager
222 :param config: TransferConfig to associate specific configurations
223 :param osutil: OSUtils object to use for os-related behavior when
224 using with transfer manager.
226 :type executor_cls: s3transfer.futures.BaseExecutor
227 :param executor_cls: The class of executor to use with the transfer
228 manager. By default, concurrent.futures.ThreadPoolExecutor is used.
229 """
230 self._client = client
231 self._config = config
232 if config is None:
233 self._config = TransferConfig()
234 self._osutil = osutil
235 if osutil is None:
236 self._osutil = OSUtils()
237 self._coordinator_controller = TransferCoordinatorController()
238 # A counter to create unique id's for each transfer submitted.
239 self._id_counter = 0
241 # The executor responsible for making S3 API transfer requests
242 self._request_executor = BoundedExecutor(
243 max_size=self._config.max_request_queue_size,
244 max_num_threads=self._config.max_request_concurrency,
245 tag_semaphores={
246 IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
247 self._config.max_in_memory_upload_chunks
248 ),
249 IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
250 self._config.max_in_memory_download_chunks
251 ),
252 },
253 executor_cls=executor_cls,
254 )
256 # The executor responsible for submitting the necessary tasks to
257 # perform the desired transfer
258 self._submission_executor = BoundedExecutor(
259 max_size=self._config.max_submission_queue_size,
260 max_num_threads=self._config.max_submission_concurrency,
261 executor_cls=executor_cls,
262 )
264 # There is one thread available for writing to disk. It will handle
265 # downloads for all files.
266 self._io_executor = BoundedExecutor(
267 max_size=self._config.max_io_queue_size,
268 max_num_threads=1,
269 executor_cls=executor_cls,
270 )
272 # The component responsible for limiting bandwidth usage if it
273 # is configured.
274 self._bandwidth_limiter = None
275 if self._config.max_bandwidth is not None:
276 logger.debug(
277 'Setting max_bandwidth to %s', self._config.max_bandwidth
278 )
279 leaky_bucket = LeakyBucket(self._config.max_bandwidth)
280 self._bandwidth_limiter = BandwidthLimiter(leaky_bucket)
282 self._register_handlers()
284 @property
285 def client(self):
286 return self._client
288 @property
289 def config(self):
290 return self._config
292 def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None):
293 """Uploads a file to S3
295 :type fileobj: str or seekable file-like object
296 :param fileobj: The name of a file to upload or a seekable file-like
297 object to upload. It is recommended to use a filename because
298 file-like objects may result in higher memory usage.
300 :type bucket: str
301 :param bucket: The name of the bucket to upload to
303 :type key: str
304 :param key: The name of the key to upload to
306 :type extra_args: dict
307 :param extra_args: Extra arguments that may be passed to the
308 client operation
310 :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
311 :param subscribers: The list of subscribers to be invoked in the
312 order provided based on the event emit during the process of
313 the transfer request.
315 :rtype: s3transfer.futures.TransferFuture
316 :returns: Transfer future representing the upload
317 """
318 if extra_args is None:
319 extra_args = {}
320 if subscribers is None:
321 subscribers = []
322 self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
323 self._validate_if_bucket_supported(bucket)
324 self._add_operation_defaults(bucket, extra_args)
325 call_args = CallArgs(
326 fileobj=fileobj,
327 bucket=bucket,
328 key=key,
329 extra_args=extra_args,
330 subscribers=subscribers,
331 )
332 extra_main_kwargs = {}
333 if self._bandwidth_limiter:
334 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
335 return self._submit_transfer(
336 call_args, UploadSubmissionTask, extra_main_kwargs
337 )
339 def download(
340 self, bucket, key, fileobj, extra_args=None, subscribers=None
341 ):
342 """Downloads a file from S3
344 :type bucket: str
345 :param bucket: The name of the bucket to download from
347 :type key: str
348 :param key: The name of the key to download from
350 :type fileobj: str or seekable file-like object
351 :param fileobj: The name of a file to download or a seekable file-like
352 object to download. It is recommended to use a filename because
353 file-like objects may result in higher memory usage.
355 :type extra_args: dict
356 :param extra_args: Extra arguments that may be passed to the
357 client operation
359 :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
360 :param subscribers: The list of subscribers to be invoked in the
361 order provided based on the event emit during the process of
362 the transfer request.
364 :rtype: s3transfer.futures.TransferFuture
365 :returns: Transfer future representing the download
366 """
367 if extra_args is None:
368 extra_args = {}
369 if subscribers is None:
370 subscribers = []
371 self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
372 self._validate_if_bucket_supported(bucket)
373 call_args = CallArgs(
374 bucket=bucket,
375 key=key,
376 fileobj=fileobj,
377 extra_args=extra_args,
378 subscribers=subscribers,
379 )
380 extra_main_kwargs = {'io_executor': self._io_executor}
381 if self._bandwidth_limiter:
382 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
383 return self._submit_transfer(
384 call_args, DownloadSubmissionTask, extra_main_kwargs
385 )
387 def copy(
388 self,
389 copy_source,
390 bucket,
391 key,
392 extra_args=None,
393 subscribers=None,
394 source_client=None,
395 ):
396 """Copies a file in S3
398 :type copy_source: dict
399 :param copy_source: The name of the source bucket, key name of the
400 source object, and optional version ID of the source object. The
401 dictionary format is:
402 ``{'Bucket': 'bucket', 'Key': 'key', 'VersionId': 'id'}``. Note
403 that the ``VersionId`` key is optional and may be omitted.
405 :type bucket: str
406 :param bucket: The name of the bucket to copy to
408 :type key: str
409 :param key: The name of the key to copy to
411 :type extra_args: dict
412 :param extra_args: Extra arguments that may be passed to the
413 client operation
415 :type subscribers: a list of subscribers
416 :param subscribers: The list of subscribers to be invoked in the
417 order provided based on the event emit during the process of
418 the transfer request.
420 :type source_client: botocore or boto3 Client
421 :param source_client: The client to be used for operation that
422 may happen at the source object. For example, this client is
423 used for the head_object that determines the size of the copy.
424 If no client is provided, the transfer manager's client is used
425 as the client for the source object.
427 :rtype: s3transfer.futures.TransferFuture
428 :returns: Transfer future representing the copy
429 """
430 if extra_args is None:
431 extra_args = {}
432 if subscribers is None:
433 subscribers = []
434 if source_client is None:
435 source_client = self._client
436 self._validate_all_known_args(extra_args, self.ALLOWED_COPY_ARGS)
437 if isinstance(copy_source, dict):
438 self._validate_if_bucket_supported(copy_source.get('Bucket'))
439 self._validate_if_bucket_supported(bucket)
440 call_args = CallArgs(
441 copy_source=copy_source,
442 bucket=bucket,
443 key=key,
444 extra_args=extra_args,
445 subscribers=subscribers,
446 source_client=source_client,
447 )
448 return self._submit_transfer(call_args, CopySubmissionTask)
450 def delete(self, bucket, key, extra_args=None, subscribers=None):
451 """Delete an S3 object.
453 :type bucket: str
454 :param bucket: The name of the bucket.
456 :type key: str
457 :param key: The name of the S3 object to delete.
459 :type extra_args: dict
460 :param extra_args: Extra arguments that may be passed to the
461 DeleteObject call.
463 :type subscribers: list
464 :param subscribers: A list of subscribers to be invoked during the
465 process of the transfer request. Note that the ``on_progress``
466 callback is not invoked during object deletion.
468 :rtype: s3transfer.futures.TransferFuture
469 :return: Transfer future representing the deletion.
471 """
472 if extra_args is None:
473 extra_args = {}
474 if subscribers is None:
475 subscribers = []
476 self._validate_all_known_args(extra_args, self.ALLOWED_DELETE_ARGS)
477 self._validate_if_bucket_supported(bucket)
478 call_args = CallArgs(
479 bucket=bucket,
480 key=key,
481 extra_args=extra_args,
482 subscribers=subscribers,
483 )
484 return self._submit_transfer(call_args, DeleteSubmissionTask)
486 def _validate_if_bucket_supported(self, bucket):
487 # s3 high level operations don't support some resources
488 # (eg. S3 Object Lambda) only direct API calls are available
489 # for such resources
490 if self.VALIDATE_SUPPORTED_BUCKET_VALUES:
491 for resource, pattern in self._UNSUPPORTED_BUCKET_PATTERNS.items():
492 match = pattern.match(bucket)
493 if match:
494 raise ValueError(
495 'TransferManager methods do not support %s '
496 'resource. Use direct client calls instead.' % resource
497 )
499 def _validate_all_known_args(self, actual, allowed):
500 for kwarg in actual:
501 if kwarg not in allowed:
502 raise ValueError(
503 "Invalid extra_args key '%s', "
504 "must be one of: %s" % (kwarg, ', '.join(allowed))
505 )
507 def _add_operation_defaults(self, bucket, extra_args):
508 add_s3express_defaults(bucket, extra_args)
510 def _submit_transfer(
511 self, call_args, submission_task_cls, extra_main_kwargs=None
512 ):
513 if not extra_main_kwargs:
514 extra_main_kwargs = {}
516 # Create a TransferFuture to return back to the user
517 transfer_future, components = self._get_future_with_components(
518 call_args
519 )
521 # Add any provided done callbacks to the created transfer future
522 # to be invoked on the transfer future being complete.
523 for callback in get_callbacks(transfer_future, 'done'):
524 components['coordinator'].add_done_callback(callback)
526 # Get the main kwargs needed to instantiate the submission task
527 main_kwargs = self._get_submission_task_main_kwargs(
528 transfer_future, extra_main_kwargs
529 )
531 # Submit a SubmissionTask that will submit all of the necessary
532 # tasks needed to complete the S3 transfer.
533 self._submission_executor.submit(
534 submission_task_cls(
535 transfer_coordinator=components['coordinator'],
536 main_kwargs=main_kwargs,
537 )
538 )
540 # Increment the unique id counter for future transfer requests
541 self._id_counter += 1
543 return transfer_future
545 def _get_future_with_components(self, call_args):
546 transfer_id = self._id_counter
547 # Creates a new transfer future along with its components
548 transfer_coordinator = TransferCoordinator(transfer_id=transfer_id)
549 # Track the transfer coordinator for transfers to manage.
550 self._coordinator_controller.add_transfer_coordinator(
551 transfer_coordinator
552 )
553 # Also make sure that the transfer coordinator is removed once
554 # the transfer completes so it does not stick around in memory.
555 transfer_coordinator.add_done_callback(
556 self._coordinator_controller.remove_transfer_coordinator,
557 transfer_coordinator,
558 )
559 components = {
560 'meta': TransferMeta(call_args, transfer_id=transfer_id),
561 'coordinator': transfer_coordinator,
562 }
563 transfer_future = TransferFuture(**components)
564 return transfer_future, components
566 def _get_submission_task_main_kwargs(
567 self, transfer_future, extra_main_kwargs
568 ):
569 main_kwargs = {
570 'client': self._client,
571 'config': self._config,
572 'osutil': self._osutil,
573 'request_executor': self._request_executor,
574 'transfer_future': transfer_future,
575 }
576 main_kwargs.update(extra_main_kwargs)
577 return main_kwargs
579 def _register_handlers(self):
580 # Register handlers to enable/disable callbacks on uploads.
581 event_name = 'request-created.s3'
582 self._client.meta.events.register_first(
583 event_name,
584 signal_not_transferring,
585 unique_id='s3upload-not-transferring',
586 )
587 self._client.meta.events.register_last(
588 event_name, signal_transferring, unique_id='s3upload-transferring'
589 )
591 def __enter__(self):
592 return self
594 def __exit__(self, exc_type, exc_value, *args):
595 cancel = False
596 cancel_msg = ''
597 cancel_exc_type = FatalError
598 # If a exception was raised in the context handler, signal to cancel
599 # all of the inprogress futures in the shutdown.
600 if exc_type:
601 cancel = True
602 cancel_msg = str(exc_value)
603 if not cancel_msg:
604 cancel_msg = repr(exc_value)
605 # If it was a KeyboardInterrupt, the cancellation was initiated
606 # by the user.
607 if isinstance(exc_value, KeyboardInterrupt):
608 cancel_exc_type = CancelledError
609 self._shutdown(cancel, cancel_msg, cancel_exc_type)
611 def shutdown(self, cancel=False, cancel_msg=''):
612 """Shutdown the TransferManager
614 It will wait till all transfers complete before it completely shuts
615 down.
617 :type cancel: boolean
618 :param cancel: If True, calls TransferFuture.cancel() for
619 all in-progress in transfers. This is useful if you want the
620 shutdown to happen quicker.
622 :type cancel_msg: str
623 :param cancel_msg: The message to specify if canceling all in-progress
624 transfers.
625 """
626 self._shutdown(cancel, cancel, cancel_msg)
628 def _shutdown(self, cancel, cancel_msg, exc_type=CancelledError):
629 if cancel:
630 # Cancel all in-flight transfers if requested, before waiting
631 # for them to complete.
632 self._coordinator_controller.cancel(cancel_msg, exc_type)
633 try:
634 # Wait until there are no more in-progress transfers. This is
635 # wrapped in a try statement because this can be interrupted
636 # with a KeyboardInterrupt that needs to be caught.
637 self._coordinator_controller.wait()
638 except KeyboardInterrupt:
639 # If not errors were raised in the try block, the cancel should
640 # have no coordinators it needs to run cancel on. If there was
641 # an error raised in the try statement we want to cancel all of
642 # the inflight transfers before shutting down to speed that
643 # process up.
644 self._coordinator_controller.cancel('KeyboardInterrupt()')
645 raise
646 finally:
647 # Shutdown all of the executors.
648 self._submission_executor.shutdown()
649 self._request_executor.shutdown()
650 self._io_executor.shutdown()
653class TransferCoordinatorController:
654 def __init__(self):
655 """Abstraction to control all transfer coordinators
657 This abstraction allows the manager to wait for inprogress transfers
658 to complete and cancel all inprogress transfers.
659 """
660 self._lock = threading.Lock()
661 self._tracked_transfer_coordinators = set()
663 @property
664 def tracked_transfer_coordinators(self):
665 """The set of transfer coordinators being tracked"""
666 with self._lock:
667 # We return a copy because the set is mutable and if you were to
668 # iterate over the set, it may be changing in length due to
669 # additions and removals of transfer coordinators.
670 return copy.copy(self._tracked_transfer_coordinators)
672 def add_transfer_coordinator(self, transfer_coordinator):
673 """Adds a transfer coordinator of a transfer to be canceled if needed
675 :type transfer_coordinator: s3transfer.futures.TransferCoordinator
676 :param transfer_coordinator: The transfer coordinator for the
677 particular transfer
678 """
679 with self._lock:
680 self._tracked_transfer_coordinators.add(transfer_coordinator)
682 def remove_transfer_coordinator(self, transfer_coordinator):
683 """Remove a transfer coordinator from cancellation consideration
685 Typically, this method is invoked by the transfer coordinator itself
686 to remove its self when it completes its transfer.
688 :type transfer_coordinator: s3transfer.futures.TransferCoordinator
689 :param transfer_coordinator: The transfer coordinator for the
690 particular transfer
691 """
692 with self._lock:
693 self._tracked_transfer_coordinators.remove(transfer_coordinator)
695 def cancel(self, msg='', exc_type=CancelledError):
696 """Cancels all inprogress transfers
698 This cancels the inprogress transfers by calling cancel() on all
699 tracked transfer coordinators.
701 :param msg: The message to pass on to each transfer coordinator that
702 gets cancelled.
704 :param exc_type: The type of exception to set for the cancellation
705 """
706 for transfer_coordinator in self.tracked_transfer_coordinators:
707 transfer_coordinator.cancel(msg, exc_type)
709 def wait(self):
710 """Wait until there are no more inprogress transfers
712 This will not stop when failures are encountered and not propagate any
713 of these errors from failed transfers, but it can be interrupted with
714 a KeyboardInterrupt.
715 """
716 try:
717 transfer_coordinator = None
718 for transfer_coordinator in self.tracked_transfer_coordinators:
719 transfer_coordinator.result()
720 except KeyboardInterrupt:
721 logger.debug('Received KeyboardInterrupt in wait()')
722 # If Keyboard interrupt is raised while waiting for
723 # the result, then exit out of the wait and raise the
724 # exception
725 if transfer_coordinator:
726 logger.debug(
727 'On KeyboardInterrupt was waiting for %s',
728 transfer_coordinator,
729 )
730 raise
731 except Exception:
732 # A general exception could have been thrown because
733 # of result(). We just want to ignore this and continue
734 # because we at least know that the transfer coordinator
735 # has completed.
736 pass