Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/futures.py: 38%
265 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import copy
14import logging
15import sys
16import threading
17from collections import namedtuple
18from concurrent import futures
20from s3transfer.compat import MAXINT
21from s3transfer.exceptions import CancelledError, TransferNotDoneError
22from s3transfer.utils import FunctionContainer, TaskSemaphore
# Module-level logger named after this module; the classes below use it for
# their debug-level messages.
logger = logging.getLogger(__name__)
class BaseTransferFuture:
    """Abstract interface for the future handed back for a transfer request.

    Concrete subclasses must override every member below; the base
    implementations only raise ``NotImplementedError``.
    """

    @property
    def meta(self):
        """The metadata associated to the TransferFuture"""
        raise NotImplementedError("meta")

    def done(self):
        """Report whether the TransferFuture has finished.

        :returns: True once the transfer has completed, False otherwise.
        """
        raise NotImplementedError("done()")

    def result(self):
        """Block until the TransferFuture is done, then return its result.

        A successful transfer returns its result; a failed transfer raises
        the exception that caused the failure.
        """
        raise NotImplementedError("result()")

    def cancel(self):
        """Request cancellation of the transfer behind this TransferFuture"""
        raise NotImplementedError("cancel()")
class BaseTransferMeta:
    """Abstract interface for the metadata attached to a transfer future.

    Every property raises ``NotImplementedError`` until a subclass
    overrides it.
    """

    @property
    def call_args(self):
        """The arguments the transfer request was made with"""
        raise NotImplementedError("call_args")

    @property
    def transfer_id(self):
        """The unique identifier of the transfer"""
        raise NotImplementedError("transfer_id")

    @property
    def user_context(self):
        """A dictionary the requester may use to store arbitrary data"""
        raise NotImplementedError("user_context")
class TransferFuture(BaseTransferFuture):
    """Requester-facing future for a single submitted transfer.

    Most operations are delegated to the underlying TransferCoordinator.
    """

    def __init__(self, meta=None, coordinator=None):
        """The future associated to a submitted transfer request

        :type meta: TransferMeta
        :param meta: The metadata associated to the request. This object
            is visible to the requester. A new TransferMeta is created when
            None is given.

        :type coordinator: TransferCoordinator
        :param coordinator: The coordinator associated to the request. This
            object is not visible to the requester. A new
            TransferCoordinator is created when None is given.
        """
        self._meta = meta if meta is not None else TransferMeta()
        self._coordinator = (
            coordinator if coordinator is not None else TransferCoordinator()
        )

    @property
    def meta(self):
        """The metadata associated to the TransferFuture"""
        return self._meta

    def done(self):
        """Return True if the coordinator reports the transfer as done."""
        return self._coordinator.done()

    def result(self):
        """Wait for the transfer to finish and return its result.

        :raises KeyboardInterrupt: re-raised after cancelling the transfer
            if the wait is interrupted.
        """
        try:
            # Usually the result() method blocks until the transfer is done,
            # however if a KeyboardInterrupt is raised we want to exit
            # out of this and propagate the exception.
            return self._coordinator.result()
        except KeyboardInterrupt:
            self.cancel()
            # Bare raise re-raises the active KeyboardInterrupt unchanged.
            raise

    def cancel(self):
        """Cancel the transfer via its coordinator."""
        self._coordinator.cancel()

    def set_exception(self, exception):
        """Sets the exception on the future.

        :param exception: The exception to record; it replaces any existing
            result or exception.
        :raises TransferNotDoneError: if the transfer is not done yet.
        """
        if not self.done():
            raise TransferNotDoneError(
                'set_exception can only be called once the transfer is '
                'complete.'
            )
        self._coordinator.set_exception(exception, override=True)
class TransferMeta(BaseTransferMeta):
    """Concrete metadata record attached to a TransferFuture"""

    def __init__(self, call_args=None, transfer_id=None):
        # Size stays unknown (None) until provide_transfer_size() is called.
        self._size = None
        self._call_args = call_args
        self._transfer_id = transfer_id
        # A fresh dict per instance, so requesters never share context.
        self._user_context = {}

    @property
    def call_args(self):
        """The arguments the transfer request was made with"""
        return self._call_args

    @property
    def transfer_id(self):
        """The unique identifier of the transfer"""
        return self._transfer_id

    @property
    def size(self):
        """The size of the transfer request, or None when not yet known"""
        return self._size

    @property
    def user_context(self):
        """A dictionary the requester may use to store arbitrary data"""
        return self._user_context

    def provide_transfer_size(self, size):
        """Record the size of the transfer request up front.

        By providing this value, the TransferManager will not try to
        call HeadObject or use the OS to determine the size of the
        transfer.
        """
        self._size = size
class TransferCoordinator:
    """A helper class for managing TransferFuture"""

    def __init__(self, transfer_id=None):
        self.transfer_id = transfer_id
        self._status = 'not-started'
        self._result = None
        self._exception = None
        self._associated_futures = set()
        self._failure_cleanups = []
        self._done_callbacks = []
        # Set by announce_done(); result() waits on it.
        self._done_event = threading.Event()
        # Guards transitions of _status / _result / _exception.
        self._lock = threading.Lock()
        # Each collection below has its own lock, independent of _lock.
        self._associated_futures_lock = threading.Lock()
        self._done_callbacks_lock = threading.Lock()
        self._failure_cleanups_lock = threading.Lock()

    def __repr__(self):
        return '{}(transfer_id={})'.format(
            self.__class__.__name__, self.transfer_id
        )

    @property
    def exception(self):
        """The exception recorded for the transfer, or None"""
        return self._exception

    @property
    def associated_futures(self):
        """The set of futures associated to the in-progress TransferFuture

        Once the transfer finishes this set becomes empty as the transfer
        is considered done and there should be no running futures left.
        """
        with self._associated_futures_lock:
            # Return a copy so callers can process it while another thread
            # is adding more futures to the actual set.
            return copy.copy(self._associated_futures)

    @property
    def failure_cleanups(self):
        """The list of callbacks to call when the TransferFuture fails"""
        return self._failure_cleanups

    @property
    def status(self):
        """The status of the TransferFuture

        The currently supported states are:
            * not-started - Has yet to start. If in this state, a transfer
              can be cancelled immediately and nothing will happen.
            * queued - SubmissionTask is about to submit tasks
            * running - Is in progress. In-progress as of now means that
              the SubmissionTask that runs the transfer is being executed. So
              there is no guarantee any transfer requests had been made to
              S3 if this state is reached.
            * cancelled - Was cancelled
            * failed - An exception other than CancelledError was thrown
            * success - No exceptions were thrown and is done.
        """
        return self._status

    def set_result(self, result):
        """Set a result for the TransferFuture

        Implies that the TransferFuture succeeded. This always sets the
        result (clearing any recorded exception) because it is invoked on
        the final task, of which there is only ever one, and it is run at
        the very end of a transfer process. So if a result is being set for
        this final task, the transfer succeeded even if something came along
        and cancelled the transfer on the final task.
        """
        with self._lock:
            self._exception = None
            self._result = result
            self._status = 'success'

    def set_exception(self, exception, override=False):
        """Set an exception for the TransferFuture

        Implies the TransferFuture failed. Ignored when the transfer is
        already done unless ``override`` is True.

        :param exception: The exception that caused the transfer to fail.
        :param override: If True, override any existing state.
        """
        with self._lock:
            if not self.done() or override:
                self._exception = exception
                self._status = 'failed'

    def result(self):
        """Waits until TransferFuture is done and returns the result

        If the TransferFuture succeeded, it will return the result. If the
        TransferFuture failed, it will raise the exception associated to the
        failure.
        """
        # Doing a wait() with no timeout cannot be interrupted in python2 but
        # can be interrupted in python3 so we just wait with the largest
        # possible integer value, which is on the scale of billions of
        # years...
        self._done_event.wait(MAXINT)

        # Once done waiting, raise an exception if present or return the
        # final result.
        if self._exception:
            raise self._exception
        return self._result

    def cancel(self, msg='', exc_type=CancelledError):
        """Cancels the TransferFuture

        Has no effect if the transfer is already done. If the transfer has
        not started yet, done is announced immediately.

        :param msg: The message to attach to the cancellation
        :param exc_type: The type of exception to set for the cancellation
        """
        with self._lock:
            if not self.done():
                should_announce_done = self._status == 'not-started'
                logger.debug('%s cancel(%s) called', self, msg)
                self._exception = exc_type(msg)
                self._status = 'cancelled'
                if should_announce_done:
                    # NOTE(review): announce_done() runs callbacks while the
                    # non-reentrant self._lock is still held; a callback
                    # that calls set_result/set_exception/cancel on this
                    # coordinator would deadlock. Confirm no caller does.
                    self.announce_done()

    def set_status_to_queued(self):
        """Sets the TransferFuture's status to queued"""
        self._transition_to_non_done_state('queued')

    def set_status_to_running(self):
        """Sets the TransferFuture's status to running"""
        self._transition_to_non_done_state('running')

    def _transition_to_non_done_state(self, desired_state):
        """Move to a non-terminal state; RuntimeError if already done."""
        with self._lock:
            if self.done():
                raise RuntimeError(
                    'Unable to transition from done state %s to non-done '
                    'state %s.' % (self.status, desired_state)
                )
            self._status = desired_state

    def submit(self, executor, task, tag=None):
        """Submits a task to a provided executor

        :type executor: s3transfer.futures.BoundedExecutor
        :param executor: The executor to submit the callable to

        :type task: s3transfer.tasks.Task
        :param task: The task to submit to the executor

        :type tag: s3transfer.futures.TaskTag
        :param tag: A tag to associate to the submitted task

        :rtype: concurrent.futures.Future
        :returns: A future representing the submitted task
        """
        # Lazy %-args: only formatted when debug logging is enabled.
        logger.debug(
            "Submitting task %s to executor %s for transfer request: %s.",
            task,
            executor,
            self.transfer_id,
        )
        future = executor.submit(task, tag=tag)
        # Add this created future to the list of associated future just
        # in case it is needed during cleanups.
        self.add_associated_future(future)
        future.add_done_callback(
            FunctionContainer(self.remove_associated_future, future)
        )
        return future

    def done(self):
        """Determines if a TransferFuture has completed

        :returns: True if status is equal to 'failed', 'cancelled', or
            'success'. False, otherwise.
        """
        return self.status in ('failed', 'cancelled', 'success')

    def add_associated_future(self, future):
        """Adds a future to be associated with the TransferFuture"""
        with self._associated_futures_lock:
            self._associated_futures.add(future)

    def remove_associated_future(self, future):
        """Removes a future's association to the TransferFuture

        :raises KeyError: if the future is not currently associated.
        """
        with self._associated_futures_lock:
            self._associated_futures.remove(future)

    def add_done_callback(self, function, *args, **kwargs):
        """Add a done callback to be invoked when transfer is done"""
        with self._done_callbacks_lock:
            self._done_callbacks.append(
                FunctionContainer(function, *args, **kwargs)
            )

    def add_failure_cleanup(self, function, *args, **kwargs):
        """Adds a callback to call upon failure"""
        with self._failure_cleanups_lock:
            self._failure_cleanups.append(
                FunctionContainer(function, *args, **kwargs)
            )

    def announce_done(self):
        """Announce that future is done running and run associated callbacks

        If the transfer did not succeed, this runs any failure cleanups that
        have not been run yet. It then unblocks result() and runs any done
        callbacks associated to the TransferFuture that have not already
        been run.
        """
        if self.status != 'success':
            self._run_failure_cleanups()
        self._done_event.set()
        self._run_done_callbacks()

    def _run_done_callbacks(self):
        # Run the callbacks and remove the callbacks from the internal
        # list so they do not get run again if done is announced more than
        # once.
        with self._done_callbacks_lock:
            self._run_callbacks(self._done_callbacks)
            self._done_callbacks = []

    def _run_failure_cleanups(self):
        # Run the cleanup callbacks and remove the callbacks from the internal
        # list so they do not get run again if done is announced more than
        # once.
        with self._failure_cleanups_lock:
            self._run_callbacks(self.failure_cleanups)
            self._failure_cleanups = []

    def _run_callbacks(self, callbacks):
        for callback in callbacks:
            self._run_callback(callback)

    def _run_callback(self, callback):
        try:
            callback()
        # We do not want a callback interrupting the process, especially
        # in the failure cleanups. So log and swallow the exception.
        except Exception:
            # Lazy %-args: str(callback) is only computed when debug logging
            # is enabled, and a failure while formatting it is reported by
            # the logging machinery instead of escaping this handler.
            logger.debug("Exception raised in %s.", callback, exc_info=True)
class BoundedExecutor:
    """Executor wrapper that caps the number of in-flight tasks."""

    EXECUTOR_CLS = futures.ThreadPoolExecutor

    def __init__(
        self, max_size, max_num_threads, tag_semaphores=None, executor_cls=None
    ):
        """An executor implementation that has a maximum of queued up tasks

        The executor will block if the number of tasks that have been
        submitted and are currently being worked on is past its maximum.

        :param max_size: The maximum number of inflight futures. An inflight
            future means that the task is either queued up or is currently
            being executed. A size of None or 0 means that the executor will
            have no bound in terms of the number of inflight futures.

        :param max_num_threads: The maximum number of threads the executor
            uses.

        :type tag_semaphores: dict
        :param tag_semaphores: A dictionary where the key is a TaskTag and
            the value is the semaphore to use when limiting the number of
            tasks with that tag the executor is processing at a time.

        :type executor_cls: BaseExecutor
        :param executor_cls: The executor class that gets bounded by this
            executor. If None is provided, the
            concurrent.futures.ThreadPoolExecutor class is used.
        """
        self._max_num_threads = max_num_threads
        if executor_cls is None:
            executor_cls = self.EXECUTOR_CLS
        self._executor = executor_cls(max_workers=self._max_num_threads)
        self._semaphore = TaskSemaphore(max_size)
        self._tag_semaphores = tag_semaphores

    def submit(self, task, tag=None, block=True):
        """Submit a task to complete

        :type task: s3transfer.tasks.Task
        :param task: The task to run __call__ on

        :type tag: s3transfer.futures.TaskTag
        :param tag: An optional tag to associate to the task. This
            is used to override which semaphore to use.

        :type block: boolean
        :param block: True if to wait till it is possible to submit a task.
            False, if not to wait and raise an error if not able to submit
            a task.

        :returns: The future associated to the submitted task

        :raises: Whatever the underlying executor's submit() raises (for
            example RuntimeError after shutdown); the acquired semaphore
            slot is released before the exception propagates.
        """
        semaphore = self._semaphore
        # If a tag was provided, use the semaphore associated to that
        # tag.
        if tag:
            semaphore = self._tag_semaphores[tag]

        acquire_token = semaphore.acquire(task.transfer_id, block)
        try:
            underlying_future = self._executor.submit(task)
        except BaseException:
            # The task never reached the executor, so no done callback will
            # ever release the slot acquired above; release it here so it
            # does not stay acquired forever.
            semaphore.release(task.transfer_id, acquire_token)
            raise
        future = ExecutorFuture(underlying_future)
        # Release the semaphore slot once the future completes.
        future.add_done_callback(
            FunctionContainer(
                semaphore.release, task.transfer_id, acquire_token
            )
        )
        return future

    def shutdown(self, wait=True):
        """Shut down the underlying executor."""
        self._executor.shutdown(wait)
class ExecutorFuture:
    def __init__(self, future):
        """A future returned from the executor

        Currently, it is just a wrapper around a concurrent.futures.Future.
        However, this can eventually grow to implement the needed functionality
        of concurrent.futures.Future if we move off of the library and not
        affect the rest of the codebase.

        :type future: concurrent.futures.Future
        :param future: The underlying future
        """
        self._future = future

    def result(self, timeout=None):
        """Return the result of the underlying future.

        :param timeout: Maximum number of seconds to wait. None (the
            default) waits indefinitely, exactly as before this parameter
            existed.
        :raises concurrent.futures.TimeoutError: if ``timeout`` elapses
            before the underlying future completes.
        """
        # Only forward a timeout when one was given, so wrapped futures whose
        # result() takes no arguments keep working.
        if timeout is None:
            return self._future.result()
        return self._future.result(timeout)

    def add_done_callback(self, fn):
        """Adds a callback to be completed once future is done

        :param fn: A callable that takes no arguments. Note that is different
            than concurrent.futures.Future.add_done_callback that requires
            a single argument for the future.
        """

        # The done callback for concurrent.futures.Future will always pass
        # the future in as the only argument. So we need to create the
        # proper signature wrapper that will invoke the callback provided.
        def done_callback(future_passed_to_callback):
            return fn()

        self._future.add_done_callback(done_callback)

    def done(self):
        """Return True if the underlying future has completed."""
        return self._future.done()
class BaseExecutor:
    """Base Executor class implementation needed to work with s3transfer

    Subclasses must provide submit() and shutdown(); the versions here only
    raise NotImplementedError.
    """

    def __init__(self, max_workers=None):
        """Accept and ignore ``max_workers``.

        BoundedExecutor instantiates its executor class with
        ``max_workers=...``, so subclasses need to accept that keyword.
        """

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)``; must be overridden."""
        raise NotImplementedError("submit()")

    def shutdown(self, wait=True):
        """Release executor resources; must be overridden."""
        raise NotImplementedError("shutdown()")
class NonThreadedExecutor(BaseExecutor):
    """A drop-in replacement non-threaded version of ThreadPoolExecutor

    submit() runs the callable synchronously in the calling thread and
    hands back a NonThreadedExecutorFuture that already holds its outcome.
    """

    def submit(self, fn, *args, **kwargs):
        pending = NonThreadedExecutorFuture()
        try:
            # set_result stays inside the try so a failure while marking the
            # future done is recorded as the future's exception as well.
            pending.set_result(fn(*args, **kwargs))
        except Exception as error:
            trace = error.__traceback__
            logger.debug(
                'Setting exception for %s to %s with traceback %s',
                pending,
                error,
                trace,
            )
            pending.set_exception_info(error, trace)
        return pending

    def shutdown(self, wait=True):
        """No-op: this executor never starts any worker threads."""
class NonThreadedExecutorFuture:
    """The Future returned from NonThreadedExecutor

    This future is **not** thread-safe; it is meant to be used only
    from a non-threaded context.
    """

    def __init__(self):
        self._done = False
        self._result = None
        self._exception = None
        self._traceback = None
        self._done_callbacks = []

    def set_result(self, result):
        """Store ``result`` and mark the future as done."""
        self._result = result
        self._set_done()

    def set_exception_info(self, exception, traceback):
        """Store the exception with its traceback and mark the future done."""
        self._exception, self._traceback = exception, traceback
        self._set_done()

    def result(self, timeout=None):
        """Return the stored result, or raise the stored exception.

        ``timeout`` is ignored; presumably it is accepted only to mirror
        concurrent.futures.Future.result().
        """
        if not self._exception:
            return self._result
        raise self._exception.with_traceback(self._traceback)

    def _set_done(self):
        # Flip the flag first so callbacks see a done future, then drain the
        # pending callbacks exactly once.
        self._done = True
        for callback in self._done_callbacks:
            self._invoke_done_callback(callback)
        self._done_callbacks = []

    def _invoke_done_callback(self, done_callback):
        # Same calling convention as concurrent.futures: pass the future.
        return done_callback(self)

    def done(self):
        """Return True once a result or an exception has been set."""
        return self._done

    def add_done_callback(self, fn):
        """Queue ``fn(future)``, or call it right away if already done."""
        if not self._done:
            self._done_callbacks.append(fn)
            return
        self._invoke_done_callback(fn)
# Lightweight tag; BoundedExecutor.submit() uses a task's tag to pick the
# semaphore that limits it.
TaskTag = namedtuple('TaskTag', 'name')

# Predefined tags (presumably for tasks handling in-memory upload/download
# data, judging by their names).
IN_MEMORY_UPLOAD_TAG = TaskTag(name='in_memory_upload')
IN_MEMORY_DOWNLOAD_TAG = TaskTag(name='in_memory_download')