1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import copy
14import logging
15import sys
16import threading
17from collections import namedtuple
18from concurrent import futures
19
20from s3transfer.compat import MAXINT
21from s3transfer.exceptions import CancelledError, TransferNotDoneError
22from s3transfer.utils import FunctionContainer, TaskSemaphore
23
24logger = logging.getLogger(__name__)
25
26
class BaseTransferFuture:
    """Interface every transfer future implementation must provide.

    Each member raises NotImplementedError until a subclass overrides it.
    """

    @property
    def meta(self):
        """Metadata describing the transfer behind this future."""
        raise NotImplementedError('meta')

    def done(self):
        """Report whether the transfer has finished.

        :returns: True once the transfer is complete, False while it is not.
        """
        raise NotImplementedError('done()')

    def result(self):
        """Block until the transfer finishes, then return its outcome.

        A successful transfer yields its result; a failed transfer re-raises
        the exception that caused the failure.
        """
        raise NotImplementedError('result()')

    def cancel(self):
        """Request cancellation of the transfer behind this future."""
        raise NotImplementedError('cancel()')
52
53
class BaseTransferMeta:
    """Interface for the metadata attached to a transfer future.

    Each property raises NotImplementedError until a subclass overrides it.
    """

    @property
    def call_args(self):
        """Arguments the transfer request was made with."""
        raise NotImplementedError('call_args')

    @property
    def transfer_id(self):
        """Identifier that uniquely names this transfer."""
        raise NotImplementedError('transfer_id')

    @property
    def user_context(self):
        """Dictionary where requesters may stash their own data."""
        raise NotImplementedError('user_context')
69
70
class TransferFuture(BaseTransferFuture):
    def __init__(self, meta=None, coordinator=None):
        """The future handed back for a submitted transfer request

        :type meta: TransferMeta
        :param meta: Metadata for the request, visible to the requester.
            A fresh TransferMeta is created when omitted.

        :type coordinator: TransferCoordinator
        :param coordinator: Coordinator driving the request, hidden from the
            requester. A fresh TransferCoordinator is created when omitted.
        """
        self._meta = TransferMeta() if meta is None else meta
        self._coordinator = (
            TransferCoordinator() if coordinator is None else coordinator
        )

    @property
    def meta(self):
        return self._meta

    def done(self):
        return self._coordinator.done()

    def result(self):
        # The coordinator normally blocks until the transfer is finished.
        # A KeyboardInterrupt must still escape, but the transfer is
        # cancelled first so it does not keep going in the background.
        try:
            return self._coordinator.result()
        except KeyboardInterrupt as interrupt:
            self.cancel()
            raise interrupt

    def cancel(self):
        self._coordinator.cancel()

    def set_exception(self, exception):
        """Replace the outcome of a finished transfer with ``exception``.

        :raises TransferNotDoneError: if the transfer has not finished yet.
        """
        if self.done():
            self._coordinator.set_exception(exception, override=True)
            return
        raise TransferNotDoneError(
            'set_exception can only be called once the transfer is '
            'complete.'
        )
119
120
class TransferMeta(BaseTransferMeta):
    """Holds metadata about the TransferFuture"""

    def __init__(self, call_args=None, transfer_id=None):
        self._call_args = call_args
        self._transfer_id = transfer_id
        # Unknown until provide_transfer_size() is called.
        self._size = None
        # Each instance gets its own dict for requester-owned data.
        self._user_context = {}

    @property
    def call_args(self):
        """Arguments the transfer request was made with."""
        return self._call_args

    @property
    def transfer_id(self):
        """Identifier that uniquely names this transfer."""
        return self._transfer_id

    @property
    def size(self):
        """Size of the transfer, or None when it has not been provided."""
        return self._size

    @property
    def user_context(self):
        """Dictionary where requesters may stash their own data."""
        return self._user_context

    def provide_transfer_size(self, size):
        """Record the size of the transfer request up front

        When the size is supplied this way, the TransferManager does not need
        to call HeadObject or ask the OS to work out the transfer size.
        """
        self._size = size
158
159
class TransferCoordinator:
    """A helper class for managing TransferFuture"""

    def __init__(self, transfer_id=None):
        self.transfer_id = transfer_id
        self._status = 'not-started'
        self._result = None
        self._exception = None
        self._associated_futures = set()
        self._failure_cleanups = []
        self._done_callbacks = []
        # Set by announce_done(); result() blocks on it.
        self._done_event = threading.Event()
        # Guards status/result/exception transitions.
        self._lock = threading.Lock()
        self._associated_futures_lock = threading.Lock()
        self._done_callbacks_lock = threading.Lock()
        self._failure_cleanups_lock = threading.Lock()

    def __repr__(self):
        return '{}(transfer_id={})'.format(
            self.__class__.__name__, self.transfer_id
        )

    @property
    def exception(self):
        """The exception recorded for the transfer, or None"""
        return self._exception

    @property
    def associated_futures(self):
        """A snapshot set of the futures tied to the in-progress transfer

        Each submitted future removes itself from the underlying set when it
        completes, so once the transfer finishes there should be no running
        futures left and the set becomes empty.
        """
        with self._associated_futures_lock:
            # Return a copy so the caller can process it while another
            # thread keeps adding futures to the real set.
            return copy.copy(self._associated_futures)

    @property
    def failure_cleanups(self):
        """The list of callbacks to call when the TransferFuture fails"""
        return self._failure_cleanups

    @property
    def status(self):
        """The status of the TransferFuture

        The currently supported states are:
            * not-started - Has yet to start. If in this state, a transfer
              can be canceled immediately and nothing will happen.
            * queued - SubmissionTask is about to submit tasks
            * running - Is inprogress. In-progress as of now means that
              the SubmissionTask that runs the transfer is being executed. So
              there is no guarantee any transfer requests had been made to
              S3 if this state is reached.
            * cancelled - Was cancelled
            * failed - An exception other than CancelledError was thrown
            * success - No exceptions were thrown and is done.
        """
        return self._status

    def set_result(self, result):
        """Set a result for the TransferFuture

        Implies that the TransferFuture succeeded. A result is always set
        because this is invoked by the single final task, which runs at the
        very end of a transfer. If that final task produced a result, the
        transfer succeeded even if something came along and cancelled the
        transfer while the final task was running. Any previously recorded
        exception is cleared.
        """
        with self._lock:
            self._exception = None
            self._result = result
            self._status = 'success'

    def set_exception(self, exception, override=False):
        """Set an exception for the TransferFuture

        Implies the TransferFuture failed.

        :param exception: The exception that caused the transfer to fail.
        :param override: If True, override any existing state. Otherwise the
            exception is only recorded when the transfer is not yet done.
        """
        with self._lock:
            if not self.done() or override:
                self._exception = exception
                self._status = 'failed'

    def result(self):
        """Waits until TransferFuture is done and returns the result

        If the TransferFuture succeeded, it will return the result. If the
        TransferFuture failed, it will raise the exception associated to the
        failure.
        """
        # A wait() with no timeout cannot be interrupted in python2 but can
        # be in python3, so wait with the largest possible integer value,
        # which is on the scale of billions of years.
        self._done_event.wait(MAXINT)

        # Compare against None rather than relying on truthiness: an
        # exception class may define __bool__/__len__ and evaluate falsy,
        # which would otherwise make a failed transfer look successful.
        if self._exception is not None:
            raise self._exception
        return self._result

    def cancel(self, msg='', exc_type=CancelledError):
        """Cancels the TransferFuture

        :param msg: The message to attach to the cancellation
        :param exc_type: The type of exception to set for the cancellation
        """
        with self._lock:
            if not self.done():
                should_announce_done = False
                logger.debug('%s cancel(%s) called', self, msg)
                self._exception = exc_type(msg)
                # A transfer that never started has no task that would
                # announce completion, so announce it here.
                if self._status == 'not-started':
                    should_announce_done = True
                self._status = 'cancelled'
                if should_announce_done:
                    self.announce_done()

    def set_status_to_queued(self):
        """Sets the TransferFuture's status to queued"""
        self._transition_to_non_done_state('queued')

    def set_status_to_running(self):
        """Sets the TransferFuture's status to running"""
        self._transition_to_non_done_state('running')

    def _transition_to_non_done_state(self, desired_state):
        # Moving out of a terminal state is a programming error.
        with self._lock:
            if self.done():
                raise RuntimeError(
                    'Unable to transition from done state %s to non-done '
                    'state %s.' % (self.status, desired_state)
                )
            self._status = desired_state

    def submit(self, executor, task, tag=None):
        """Submits a task to a provided executor

        :type executor: s3transfer.futures.BoundedExecutor
        :param executor: The executor to submit the callable to

        :type task: s3transfer.tasks.Task
        :param task: The task to submit to the executor

        :type tag: s3transfer.futures.TaskTag
        :param tag: A tag to associate to the submitted task

        :rtype: concurrent.futures.Future
        :returns: A future representing the submitted task
        """
        # Lazy %-style args: the message is only rendered if DEBUG is on.
        logger.debug(
            "Submitting task %s to executor %s for transfer request: %s.",
            task,
            executor,
            self.transfer_id,
        )
        future = executor.submit(task, tag=tag)
        # Track the future in case it is needed during cleanups; it drops
        # itself from the associated set once it completes.
        self.add_associated_future(future)
        future.add_done_callback(
            FunctionContainer(self.remove_associated_future, future)
        )
        return future

    def done(self):
        """Determines if a TransferFuture has completed

        :returns: True if status is equal to 'failed', 'cancelled', or
            'success'. False, otherwise
        """
        return self.status in ['failed', 'cancelled', 'success']

    def add_associated_future(self, future):
        """Adds a future to be associated with the TransferFuture"""
        with self._associated_futures_lock:
            self._associated_futures.add(future)

    def remove_associated_future(self, future):
        """Removes a future's association to the TransferFuture"""
        with self._associated_futures_lock:
            self._associated_futures.remove(future)

    def add_done_callback(self, function, *args, **kwargs):
        """Add a done callback to be invoked when transfer is done"""
        with self._done_callbacks_lock:
            self._done_callbacks.append(
                FunctionContainer(function, *args, **kwargs)
            )

    def add_failure_cleanup(self, function, *args, **kwargs):
        """Adds a callback to call upon failure"""
        with self._failure_cleanups_lock:
            self._failure_cleanups.append(
                FunctionContainer(function, *args, **kwargs)
            )

    def announce_done(self):
        """Announce that future is done running and run associated callbacks

        If the transfer did not succeed, this first runs any failure cleanups
        that have not already been run. It then unblocks result(), and finally
        runs any done callbacks that have not already been run.
        """
        if self.status != 'success':
            self._run_failure_cleanups()
        self._done_event.set()
        self._run_done_callbacks()

    def _run_done_callbacks(self):
        # Run the callbacks, then clear the internal list so they are not
        # run again if done is announced more than once.
        with self._done_callbacks_lock:
            self._run_callbacks(self._done_callbacks)
            self._done_callbacks = []

    def _run_failure_cleanups(self):
        # Run the cleanup callbacks, then clear the internal list so they
        # are not run again if done is announced more than once.
        with self._failure_cleanups_lock:
            self._run_callbacks(self.failure_cleanups)
            self._failure_cleanups = []

    def _run_callbacks(self, callbacks):
        for callback in callbacks:
            self._run_callback(callback)

    def _run_callback(self, callback):
        try:
            callback()
        # A callback must not interrupt the process, especially during the
        # failure cleanups, so log the exception and carry on.
        except Exception:
            logger.debug("Exception raised in %s.", callback, exc_info=True)
404
405
class BoundedExecutor:
    EXECUTOR_CLS = futures.ThreadPoolExecutor

    def __init__(
        self, max_size, max_num_threads, tag_semaphores=None, executor_cls=None
    ):
        """An executor implementation that has a maximum queued up tasks

        The executor will block if the number of tasks that have been
        submitted and is currently working on is past its maximum.

        :param max_size: The maximum number of inflight futures. An inflight
            future means that the task is either queued up or is currently
            being executed. A size of None or 0 means that the executor will
            have no bound in terms of the number of inflight futures.

        :param max_num_threads: The maximum number of threads the executor
            uses.

        :type tag_semaphores: dict
        :param tag_semaphores: A dictionary where the key is the name of the
            tag and the value is the semaphore to use when limiting the
            number of tasks the executor is processing at a time.

        :type executor_cls: BaseExecutor
        :param executor_cls: The executor class that gets bounded by this
            executor. If None is provided, the
            concurrent.futures.ThreadPoolExecutor class is used.
        """
        self._max_num_threads = max_num_threads
        if executor_cls is None:
            executor_cls = self.EXECUTOR_CLS
        self._executor = executor_cls(max_workers=self._max_num_threads)
        self._semaphore = TaskSemaphore(max_size)
        self._tag_semaphores = tag_semaphores

    def submit(self, task, tag=None, block=True):
        """Submit a task to complete

        :type task: s3transfer.tasks.Task
        :param task: The task to run __call__ on

        :type tag: s3transfer.futures.TaskTag
        :param tag: An optional tag to associate to the task. This
            is used to override which semaphore to use.

        :type block: boolean
        :param block: True if to wait till it is possible to submit a task.
            False, if not to wait and raise an error if not able to submit
            a task.

        :returns: The future associated to the submitted task
        """
        semaphore = self._semaphore
        # If a tag was provided, use the semaphore associated to that tag.
        if tag:
            semaphore = self._tag_semaphores[tag]

        # Reserve a slot on the semaphore.
        acquire_token = semaphore.acquire(task.transfer_id, block)
        # Callback that gives the slot back once the task is done.
        release_callback = FunctionContainer(
            semaphore.release, task.transfer_id, acquire_token
        )
        try:
            # Submit the task to the underlying executor.
            future = ExecutorFuture(self._executor.submit(task))
        except BaseException:
            # The task never reached the executor (e.g. it was already shut
            # down), so no done callback will ever fire. Release the slot
            # here or it is leaked and later blocking acquires may hang.
            release_callback()
            raise
        # Release the semaphore once the future completes.
        future.add_done_callback(release_callback)
        return future

    def shutdown(self, wait=True):
        self._executor.shutdown(wait)
482
483
class ExecutorFuture:
    def __init__(self, future):
        """Future handed back by the executor

        For now this simply delegates to a concurrent.futures.Future. Keeping
        the wrapper lets the rest of the codebase stay unchanged should the
        needed concurrent.futures.Future behavior ever be reimplemented here.

        :type future: concurrent.futures.Future
        :param future: The future being wrapped
        """
        self._future = future

    def result(self):
        return self._future.result()

    def add_done_callback(self, fn):
        """Register ``fn`` to be called once the future completes

        :param fn: A zero-argument callable. Unlike
            concurrent.futures.Future.add_done_callback, ``fn`` is not handed
            the future.
        """
        # concurrent.futures always passes the finished future as the sole
        # argument, so adapt it away before calling ``fn``.
        self._future.add_done_callback(lambda _finished: fn())

    def done(self):
        return self._future.done()
519
520
class BaseExecutor:
    """Base Executor class implementation needed to work with s3transfer"""

    def __init__(self, max_workers=None):
        # ``max_workers`` is accepted for signature compatibility with
        # concurrent.futures executors; the base class ignores it.
        pass

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)``; subclasses must override."""
        raise NotImplementedError('submit()')

    def shutdown(self, wait=True):
        """Release executor resources; subclasses must override."""
        raise NotImplementedError('shutdown()')
532
533
class NonThreadedExecutor(BaseExecutor):
    """A drop-in replacement non-threaded version of ThreadPoolExecutor"""

    def submit(self, fn, *args, **kwargs):
        """Run ``fn`` right away in the calling thread

        The returned future is already complete. It holds either the return
        value of ``fn`` or the exception it raised, along with its traceback.
        """
        future = NonThreadedExecutorFuture()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as error:
            trace = error.__traceback__
            logger.debug(
                'Setting exception for %s to %s with traceback %s',
                future,
                error,
                trace,
            )
            future.set_exception_info(error, trace)
        return future

    def shutdown(self, wait=True):
        # Nothing to tear down: no threads are ever started.
        pass
555
556
class NonThreadedExecutorFuture:
    """The Future returned from NonThreadedExecutor

    Note that this future is **not** thread-safe as it is being used
    from the context of a non-threaded environment.
    """

    def __init__(self):
        self._result = None
        self._exception = None
        self._traceback = None
        self._done = False
        self._done_callbacks = []

    def set_result(self, result):
        """Record a successful result and fire pending done callbacks."""
        self._result = result
        self._set_done()

    def set_exception_info(self, exception, traceback):
        """Record a failure (exception plus traceback) and fire callbacks."""
        self._exception = exception
        self._traceback = traceback
        self._set_done()

    def result(self, timeout=None):
        """Return the result, or re-raise the recorded exception.

        :param timeout: Accepted for interface compatibility with
            concurrent.futures.Future; unused because the future is always
            complete by the time NonThreadedExecutor hands it out.
        """
        # Use an explicit None check: an exception whose __bool__/__len__
        # is falsy must still be raised rather than reported as success.
        if self._exception is not None:
            raise self._exception.with_traceback(self._traceback)
        return self._result

    def _set_done(self):
        self._done = True
        for done_callback in self._done_callbacks:
            self._invoke_done_callback(done_callback)
        # Clear so callbacks are not invoked a second time.
        self._done_callbacks = []

    def _invoke_done_callback(self, done_callback):
        # Mirror concurrent.futures: callbacks receive the future itself.
        return done_callback(self)

    def done(self):
        return self._done

    def add_done_callback(self, fn):
        """Call ``fn(self)`` now if done, otherwise once the future is done."""
        if self._done:
            self._invoke_done_callback(fn)
        else:
            self._done_callbacks.append(fn)
602
603
# A named marker for a category of tasks. BoundedExecutor.submit uses a tag
# as a key into its ``tag_semaphores`` mapping to pick the semaphore that
# limits that category.
TaskTag = namedtuple('TaskTag', 'name')

IN_MEMORY_UPLOAD_TAG = TaskTag(name='in_memory_upload')
IN_MEMORY_DOWNLOAD_TAG = TaskTag(name='in_memory_download')