Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/s3transfer/futures.py: 38%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import copy
14import logging
15import sys
16import threading
17from collections import namedtuple
18from concurrent import futures
20from s3transfer.compat import MAXINT
21from s3transfer.exceptions import CancelledError, TransferNotDoneError
22from s3transfer.utils import FunctionContainer, TaskSemaphore
try:
    from botocore.context import get_context
except ImportError:
    # ``botocore.context`` could not be imported, so provide a stub with the
    # same call signature; callers can then always invoke ``get_context()``.
    def get_context():
        """Return the current botocore context.

        This fallback exists only when ``botocore.context`` is unavailable,
        so there is never a context to hand out and ``None`` is returned.
        """
        return None

logger = logging.getLogger(name=__name__)
class BaseTransferFuture:
    """Abstract interface for a future tied to a transfer request.

    Every member below raises ``NotImplementedError`` until a subclass
    supplies a concrete implementation.
    """

    @property
    def meta(self):
        """The metadata associated to the TransferFuture"""
        raise NotImplementedError('meta')

    def done(self):
        """Report whether the TransferFuture has finished.

        :returns: True if completed. False, otherwise.
        """
        raise NotImplementedError('done()')

    def result(self):
        """Block until the TransferFuture finishes, then return its result.

        A successful transfer yields its result; a failed one raises the
        exception that caused the failure.
        """
        raise NotImplementedError('result()')

    def cancel(self):
        """Cancel the request associated with the TransferFuture."""
        raise NotImplementedError('cancel()')
class BaseTransferMeta:
    """Abstract interface for the metadata attached to a transfer.

    Each property raises ``NotImplementedError`` until overridden.
    """

    @property
    def call_args(self):
        """The call args used in the transfer request"""
        raise NotImplementedError('call_args')

    @property
    def transfer_id(self):
        """The unique id of the transfer"""
        raise NotImplementedError('transfer_id')

    @property
    def user_context(self):
        """A dictionary that requesters can store data in"""
        raise NotImplementedError('user_context')
class TransferFuture(BaseTransferFuture):
    def __init__(self, meta=None, coordinator=None):
        """The future associated to a submitted transfer request

        :type meta: TransferMeta
        :param meta: The metadata associated to the request. This object
            is visible to the requester. When None, a new TransferMeta is
            created.

        :type coordinator: TransferCoordinator
        :param coordinator: The coordinator associated to the request. This
            object is not visible to the requester. When None, a new
            TransferCoordinator is created.
        """
        self._meta = TransferMeta() if meta is None else meta
        self._coordinator = (
            TransferCoordinator() if coordinator is None else coordinator
        )

    @property
    def meta(self):
        """The metadata associated to the TransferFuture"""
        return self._meta

    def done(self):
        """Return True once the underlying coordinator reports done."""
        return self._coordinator.done()

    def result(self):
        """Wait for the transfer and return its result (or raise its error).

        :raises KeyboardInterrupt: re-raised after cancelling the transfer
            if the wait is interrupted.
        """
        try:
            # Usually the result() method blocks until the transfer is done,
            # however if a KeyboardInterrupt is raised we want to exit out
            # of this and propagate the exception.
            return self._coordinator.result()
        except KeyboardInterrupt:
            self.cancel()
            # Bare ``raise`` re-raises the original interrupt without
            # re-stamping its traceback with this frame.
            raise

    def cancel(self):
        """Cancel the transfer via its coordinator."""
        self._coordinator.cancel()

    def set_exception(self, exception):
        """Sets the exception on the future.

        Only allowed once the transfer is done; it overrides whatever
        outcome was previously recorded.

        :raises TransferNotDoneError: if the transfer is not yet done.
        """
        if not self.done():
            raise TransferNotDoneError(
                'set_exception can only be called once the transfer is '
                'complete.'
            )
        self._coordinator.set_exception(exception, override=True)
class TransferMeta(BaseTransferMeta):
    """Holds metadata about the TransferFuture"""

    def __init__(self, call_args=None, transfer_id=None):
        self._call_args = call_args
        self._transfer_id = transfer_id
        # Size and etag start unknown; they are supplied later through
        # provide_transfer_size() and provide_object_etag().
        self._size = None
        self._user_context = {}
        self._etag = None

    @property
    def call_args(self):
        """The call args used in the transfer request"""
        return self._call_args

    @property
    def transfer_id(self):
        """The unique id of the transfer"""
        return self._transfer_id

    @property
    def size(self):
        """The size of the transfer request if known, otherwise None"""
        return self._size

    @property
    def user_context(self):
        """A dictionary that requesters can store data in"""
        return self._user_context

    @property
    def etag(self):
        """The etag of the stored object for validating multipart downloads"""
        return self._etag

    def provide_transfer_size(self, size):
        """A method to provide the size of a transfer request

        By providing this value, the TransferManager will not try to
        call HeadObject or use the OS to determine the size of the
        transfer.
        """
        self._size = size

    def provide_object_etag(self, etag):
        """A method to provide the etag of a transfer request

        By providing this value, the TransferManager will validate
        multipart downloads by supplying an IfMatch parameter with
        the etag as the value to GetObject requests.
        """
        self._etag = etag
class TransferCoordinator:
    """A helper class for managing TransferFuture"""

    def __init__(self, transfer_id=None):
        self.transfer_id = transfer_id
        self._status = 'not-started'
        self._result = None
        self._exception = None
        self._associated_futures = set()
        self._failure_cleanups = []
        self._done_callbacks = []
        # Set by announce_done(); result() waits on it.
        self._done_event = threading.Event()
        # Guards transitions of _status / _result / _exception.
        self._lock = threading.Lock()
        # Dedicated locks for each piece of bookkeeping state.
        self._associated_futures_lock = threading.Lock()
        self._done_callbacks_lock = threading.Lock()
        self._failure_cleanups_lock = threading.Lock()

    def __repr__(self):
        return f'{self.__class__.__name__}(transfer_id={self.transfer_id})'

    @property
    def exception(self):
        """The exception recorded for the transfer, or None."""
        return self._exception

    @property
    def associated_futures(self):
        """The set of futures associated to the in-progress TransferFuture

        Once the transfer finishes this set becomes empty as the transfer
        is considered done and there should be no running futures left.
        """
        with self._associated_futures_lock:
            # Return a copy so callers can iterate it while another thread
            # keeps adding futures to the real set.
            return copy.copy(self._associated_futures)

    @property
    def failure_cleanups(self):
        """The list of callbacks to call when the TransferFuture fails"""
        return self._failure_cleanups

    @property
    def status(self):
        """The status of the TransferFuture

        The currently supported states are:
            * not-started - Has yet to start. If in this state, a transfer
              can be canceled immediately and nothing will happen.
            * queued - SubmissionTask is about to submit tasks
            * running - Is inprogress. In-progress as of now means that
              the SubmissionTask that runs the transfer is being executed. So
              there is no guarantee any transfer requests had been made to
              S3 if this state is reached.
            * cancelled - Was cancelled
            * failed - An exception other than CancelledError was thrown
            * success - No exceptions were thrown and is done.
        """
        return self._status

    def set_result(self, result):
        """Set a result for the TransferFuture

        Implies that the TransferFuture succeeded. This always sets a result
        because it is invoked on the final task, and there is only ever one
        final task, run at the very end of a transfer process. So if a result
        is being set for this final task, the transfer succeeded even if
        something came along and cancelled the transfer on the final task.
        """
        with self._lock:
            self._exception = None
            self._result = result
            self._status = 'success'

    def set_exception(self, exception, override=False):
        """Set an exception for the TransferFuture

        Implies the TransferFuture failed.

        :param exception: The exception that caused the transfer to fail.
        :param override: If True, override any existing state. Otherwise the
            exception is only recorded when the transfer is not yet done.
        """
        with self._lock:
            if not self.done() or override:
                self._exception = exception
                self._status = 'failed'

    def result(self):
        """Waits until TransferFuture is done and returns the result

        If the TransferFuture succeeded, it will return the result. If the
        TransferFuture failed, it will raise the exception associated to the
        failure.
        """
        # Doing a wait() with no timeout cannot be interrupted in python2 but
        # can be interrupted in python3 so we just wait with the largest
        # possible integer value, which is on the scale of billions of
        # years...
        self._done_event.wait(MAXINT)

        # Once done waiting, raise an exception if present or return the
        # final result.
        if self._exception is not None:
            raise self._exception
        return self._result

    def cancel(self, msg='', exc_type=CancelledError):
        """Cancels the TransferFuture

        Has no effect if the transfer is already done.

        :param msg: The message to attach to the cancellation
        :param exc_type: The type of exception to set for the cancellation
        """
        with self._lock:
            if self.done():
                return
            logger.debug('%s cancel(%s) called', self, msg)
            self._exception = exc_type(msg)
            # Per the status docs, a not-started transfer can be cancelled
            # immediately, so announce completion here.
            should_announce_done = self._status == 'not-started'
            self._status = 'cancelled'
            if should_announce_done:
                # NOTE(review): this runs callbacks while holding the
                # non-reentrant _lock; a callback that calls back into a
                # method taking _lock would deadlock.
                self.announce_done()

    def set_status_to_queued(self):
        """Sets the TransferFuture's status to queued"""
        self._transition_to_non_done_state('queued')

    def set_status_to_running(self):
        """Sets the TransferFuture's status to running"""
        self._transition_to_non_done_state('running')

    def _transition_to_non_done_state(self, desired_state):
        # Refuse to move a finished transfer back into an active state.
        with self._lock:
            if self.done():
                raise RuntimeError(
                    f'Unable to transition from done state {self.status} to non-done '
                    f'state {desired_state}.'
                )
            self._status = desired_state

    def submit(self, executor, task, tag=None):
        """Submits a task to a provided executor

        :type executor: s3transfer.futures.BoundedExecutor
        :param executor: The executor to submit the callable to

        :type task: s3transfer.tasks.Task
        :param task: The task to submit to the executor

        :type tag: s3transfer.futures.TaskTag
        :param tag: A tag to associate to the submitted task

        :rtype: concurrent.futures.Future
        :returns: A future representing the submitted task
        """
        logger.debug(
            "Submitting task %s to executor %s for transfer request: %s.",
            task,
            executor,
            self.transfer_id,
        )
        future = executor.submit(task, tag=tag)
        # Add this created future to the list of associated future just
        # in case it is needed during cleanups.
        self.add_associated_future(future)
        future.add_done_callback(
            FunctionContainer(self.remove_associated_future, future)
        )
        return future

    def done(self):
        """Determines if a TransferFuture has completed

        :returns: True if status is equal to 'failed', 'cancelled', or
            'success'. False, otherwise.
        """
        return self.status in ('failed', 'cancelled', 'success')

    def add_associated_future(self, future):
        """Adds a future to be associated with the TransferFuture"""
        with self._associated_futures_lock:
            self._associated_futures.add(future)

    def remove_associated_future(self, future):
        """Removes a future's association to the TransferFuture"""
        with self._associated_futures_lock:
            self._associated_futures.remove(future)

    def add_done_callback(self, function, *args, **kwargs):
        """Add a done callback to be invoked when transfer is done"""
        with self._done_callbacks_lock:
            self._done_callbacks.append(
                FunctionContainer(function, *args, **kwargs)
            )

    def add_failure_cleanup(self, function, *args, **kwargs):
        """Adds a callback to call upon failure"""
        with self._failure_cleanups_lock:
            self._failure_cleanups.append(
                FunctionContainer(function, *args, **kwargs)
            )

    def announce_done(self):
        """Announce that future is done running and run associated callbacks

        If the transfer did not succeed, first run any failure cleanups that
        have not been run yet. Then unblock result(), and finally run any
        done callbacks that have not already been run.
        """
        if self.status != 'success':
            self._run_failure_cleanups()
        self._done_event.set()
        self._run_done_callbacks()

    def _run_done_callbacks(self):
        # Run the callbacks and remove the callbacks from the internal
        # list so they do not get run again if done is announced more than
        # once.
        with self._done_callbacks_lock:
            self._run_callbacks(self._done_callbacks)
            self._done_callbacks = []

    def _run_failure_cleanups(self):
        # Run the cleanup callbacks and remove the callbacks from the internal
        # list so they do not get run again if done is announced more than
        # once.
        with self._failure_cleanups_lock:
            self._run_callbacks(self.failure_cleanups)
            self._failure_cleanups = []

    def _run_callbacks(self, callbacks):
        for callback in callbacks:
            self._run_callback(callback)

    def _run_callback(self, callback):
        try:
            callback()
        # We do not want a callback interrupting the process, especially
        # in the failure cleanups. So log and catch the exception.
        except Exception:
            logger.debug("Exception raised in %s.", callback, exc_info=True)
class BoundedExecutor:
    EXECUTOR_CLS = futures.ThreadPoolExecutor

    def __init__(
        self, max_size, max_num_threads, tag_semaphores=None, executor_cls=None
    ):
        """An executor implementation that has a maximum queued up tasks

        The executor will block if the number of tasks that have been
        submitted and are currently being worked on is past its maximum.

        :param max_size: The maximum number of inflight futures. An inflight
            future means that the task is either queued up or is currently
            being executed. A size of None or 0 means that the executor will
            have no bound in terms of the number of inflight futures.

        :param max_num_threads: The maximum number of threads the executor
            uses.

        :type tag_semaphores: dict
        :param tag_semaphores: A dictionary where the key is a TaskTag and
            the value is the semaphore to use when limiting the number of
            tasks with that tag the executor is processing at a time.

        :type executor_cls: BaseExecutor
        :param executor_cls: The executor class that gets bounded by this
            executor. If None is provided, the
            concurrent.futures.ThreadPoolExecutor class is used.
        """
        self._max_num_threads = max_num_threads
        if executor_cls is None:
            executor_cls = self.EXECUTOR_CLS
        self._executor = executor_cls(max_workers=self._max_num_threads)
        self._semaphore = TaskSemaphore(max_size)
        self._tag_semaphores = tag_semaphores

    def submit(self, task, tag=None, block=True):
        """Submit a task to complete

        :type task: s3transfer.tasks.Task
        :param task: The task to run __call__ on

        :type tag: s3transfer.futures.TaskTag
        :param tag: An optional tag to associate to the task. This
            is used to override which semaphore to use.

        :type block: boolean
        :param block: True if to wait till it is possible to submit a task.
            False, if not to wait and raise an error if not able to submit
            a task.

        :returns: The future associated to the submitted task
        """
        semaphore = self._semaphore
        # If a tag was provided, use the semaphore associated to that
        # tag.
        if tag:
            semaphore = self._tag_semaphores[tag]

        # Call acquire on the semaphore.
        acquire_token = semaphore.acquire(task.transfer_id, block)
        # Create a callback to invoke when task is done in order to call
        # release on the semaphore.
        release_callback = FunctionContainer(
            semaphore.release, task.transfer_id, acquire_token
        )
        # Submit the task to the underlying executor.
        # Pass the current context to ensure child threads persist the
        # parent thread's context.
        try:
            future = ExecutorFuture(self._executor.submit(task, get_context()))
        except Exception:
            # The task never reached the executor, so no done callback will
            # ever release the slot; give it back now instead of leaking it.
            release_callback()
            raise
        # Add the Semaphore.release() callback to the future such that
        # it is invoked once the future completes.
        future.add_done_callback(release_callback)
        return future

    def shutdown(self, wait=True):
        """Shut down the underlying executor, optionally waiting for tasks."""
        self._executor.shutdown(wait)
class ExecutorFuture:
    def __init__(self, future):
        """A future returned from the executor

        Currently, it is just a wrapper around a concurrent.futures.Future.
        However, this can eventually grow to implement the needed functionality
        of concurrent.futures.Future if we move off of the library and not
        affect the rest of the codebase.

        :type future: concurrent.futures.Future
        :param future: The underlying future
        """
        self._future = future

    def result(self, timeout=None):
        """Return the result of the underlying future.

        :param timeout: Optional number of seconds to wait. When None (the
            default) the underlying ``result()`` is called with no arguments,
            so wrapped futures whose ``result()`` takes no parameters keep
            working. When given, it is forwarded as ``timeout=`` (for a
            concurrent.futures.Future this raises TimeoutError on expiry).
        """
        if timeout is None:
            return self._future.result()
        return self._future.result(timeout=timeout)

    def add_done_callback(self, fn):
        """Adds a callback to be completed once future is done

        :param fn: A callable that takes no arguments. Note that this is
            different from concurrent.futures.Future.add_done_callback, which
            requires a callable accepting the future as its single argument.
        """

        # The done callback for concurrent.futures.Future will always pass
        # the future in as the only argument. So we need to create the
        # proper signature wrapper that will invoke the callback provided.
        def done_callback(future_passed_to_callback):
            return fn()

        self._future.add_done_callback(done_callback)

    def done(self):
        """Return True if the underlying future has completed."""
        return self._future.done()
class BaseExecutor:
    """Base Executor class implementation needed to work with s3transfer

    Subclasses are expected to override ``submit`` and ``shutdown``.
    """

    def __init__(self, max_workers=None):
        """Accept ``max_workers`` so subclasses can be built as
        ``cls(max_workers=n)``; the base class ignores the value."""

    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError('submit()')

    def shutdown(self, wait=True):
        raise NotImplementedError('shutdown()')
class NonThreadedExecutor(BaseExecutor):
    """A drop-in replacement non-threaded version of ThreadPoolExecutor

    ``submit`` runs the callable synchronously in the calling thread and
    returns a NonThreadedExecutorFuture holding either its return value or
    the exception (with traceback) it raised.
    """

    def submit(self, fn, *args, **kwargs):
        pending = NonThreadedExecutorFuture()
        try:
            pending.set_result(fn(*args, **kwargs))
        except Exception as exc:
            # Keep the traceback so result() can re-raise with it attached.
            exc_tb = exc.__traceback__
            logger.debug(
                'Setting exception for %s to %s with traceback %s',
                pending,
                exc,
                exc_tb,
            )
            pending.set_exception_info(exc, exc_tb)
        return pending

    def shutdown(self, wait=True):
        """Nothing to shut down: no worker threads are ever created."""
class NonThreadedExecutorFuture:
    """The Future returned from NonThreadedExecutor

    Note that this future is **not** thread-safe as it is being used
    from the context of a non-threaded environment.
    """

    def __init__(self):
        self._result = None
        self._exception = None
        self._traceback = None
        self._done = False
        self._done_callbacks = []

    def set_result(self, result):
        """Record a successful result and fire pending done callbacks."""
        self._result = result
        self._set_done()

    def set_exception_info(self, exception, traceback):
        """Record a failure (exception plus traceback) and fire callbacks."""
        self._exception = exception
        self._traceback = traceback
        self._set_done()

    def result(self, timeout=None):
        """Return the stored result, or re-raise the stored exception.

        ``timeout`` mirrors concurrent.futures.Future.result and is ignored;
        this future never blocks.
        """
        exc = self._exception
        if not exc:
            return self._result
        raise exc.with_traceback(self._traceback)

    def _set_done(self):
        # Mark done first so callbacks observe a finished future, then drain
        # the queued callbacks in registration order.
        self._done = True
        for queued in self._done_callbacks:
            self._invoke_done_callback(queued)
        self._done_callbacks = []

    def _invoke_done_callback(self, done_callback):
        # Match concurrent.futures: callbacks receive the future itself.
        return done_callback(self)

    def done(self):
        return self._done

    def add_done_callback(self, fn):
        """Queue ``fn`` for completion, or call it now if already done."""
        if not self._done:
            self._done_callbacks.append(fn)
            return
        self._invoke_done_callback(fn)
# A TaskTag can be passed to BoundedExecutor.submit to select a per-tag
# semaphore in place of the executor's default one.
TaskTag = namedtuple('TaskTag', 'name')

IN_MEMORY_UPLOAD_TAG = TaskTag(name='in_memory_upload')
IN_MEMORY_DOWNLOAD_TAG = TaskTag(name='in_memory_download')