1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import copy
14import logging
15import sys
16import threading
17from collections import namedtuple
18from concurrent import futures
19
20from s3transfer.compat import MAXINT
21from s3transfer.exceptions import CancelledError, TransferNotDoneError
22from s3transfer.utils import FunctionContainer, TaskSemaphore
23
try:
    from botocore.context import get_context
except ImportError:
    # botocore releases without ``botocore.context`` get a stand-in so that
    # callers can invoke get_context() unconditionally.
    def get_context():
        """Return the current context; this fallback always returns None."""
        return None


logger = logging.getLogger(__name__)
33
34
class BaseTransferFuture:
    """Interface for the future handed back for a transfer request.

    Every member here raises NotImplementedError; concrete futures are
    expected to override all of them.
    """

    @property
    def meta(self):
        """The metadata associated to the TransferFuture"""
        raise NotImplementedError('meta')

    def done(self):
        """Report whether the TransferFuture has finished.

        :returns: True if completed, False otherwise.
        """
        raise NotImplementedError('done()')

    def result(self):
        """Block until the TransferFuture is done, then return its result.

        A successful transfer yields its result; a failed transfer raises
        the exception that caused the failure.
        """
        raise NotImplementedError('result()')

    def cancel(self):
        """Cancel the request associated with the TransferFuture"""
        raise NotImplementedError('cancel()')
60
61
class BaseTransferMeta:
    """Interface for the metadata attached to a transfer future.

    Each property raises NotImplementedError and must be overridden.
    """

    @property
    def call_args(self):
        """The call args used in the transfer request"""
        raise NotImplementedError('call_args')

    @property
    def transfer_id(self):
        """The unique id of the transfer"""
        raise NotImplementedError('transfer_id')

    @property
    def user_context(self):
        """A dictionary that requesters can store data in"""
        raise NotImplementedError('user_context')
77
78
class TransferFuture(BaseTransferFuture):
    """Requester-facing future for a submitted transfer.

    Delegates completion state to a TransferCoordinator and exposes a
    TransferMeta describing the request.
    """

    def __init__(self, meta=None, coordinator=None):
        """The future associated to a submitted transfer request

        :type meta: TransferMeta
        :param meta: The metadata associated to the request. This object
            is visible to the requester. A fresh TransferMeta is created
            when None is given.

        :type coordinator: TransferCoordinator
        :param coordinator: The coordinator associated to the request. This
            object is not visible to the requester. A fresh
            TransferCoordinator is created when None is given.
        """
        self._meta = TransferMeta() if meta is None else meta
        self._coordinator = (
            TransferCoordinator() if coordinator is None else coordinator
        )

    @property
    def meta(self):
        return self._meta

    def done(self):
        return self._coordinator.done()

    def result(self):
        # result() normally blocks until the transfer finishes. If the wait
        # is interrupted by a KeyboardInterrupt, cancel the transfer first
        # and then let the interrupt propagate to the caller.
        try:
            return self._coordinator.result()
        except KeyboardInterrupt:
            self.cancel()
            raise

    def cancel(self):
        self._coordinator.cancel()

    def set_exception(self, exception):
        """Sets the exception on the future.

        :raises TransferNotDoneError: If the transfer is not yet done.
        """
        if self.done():
            self._coordinator.set_exception(exception, override=True)
            return
        raise TransferNotDoneError(
            'set_exception can only be called once the transfer is '
            'complete.'
        )
127
128
class TransferMeta(BaseTransferMeta):
    """Holds metadata about the TransferFuture"""

    def __init__(self, call_args=None, transfer_id=None):
        self._call_args = call_args
        self._transfer_id = transfer_id
        self._user_context = {}
        # Unknown until supplied via provide_transfer_size() /
        # provide_object_etag().
        self._size = None
        self._etag = None

    @property
    def call_args(self):
        """The call args used in the transfer request"""
        return self._call_args

    @property
    def transfer_id(self):
        """The unique id of the transfer"""
        return self._transfer_id

    @property
    def size(self):
        """The size of the transfer request if known, otherwise None"""
        return self._size

    @property
    def user_context(self):
        """A dictionary that requesters can store data in"""
        return self._user_context

    @property
    def etag(self):
        """The etag of the stored object for validating multipart downloads"""
        return self._etag

    def provide_transfer_size(self, size):
        """A method to provide the size of a transfer request

        By providing this value, the TransferManager will not try to
        call HeadObject or use the OS to determine the size of the
        transfer.
        """
        self._size = size

    def provide_object_etag(self, etag):
        """A method to provide the etag of a transfer request

        By providing this value, the TransferManager will validate
        multipart downloads by supplying an IfMatch parameter with
        the etag as the value to GetObject requests.
        """
        self._etag = etag
181
182
class TransferCoordinator:
    """A helper class for managing TransferFuture

    Tracks the transfer's status, its final result or exception, the
    executor futures submitted on its behalf, and the callbacks to run on
    failure and on completion.
    """

    def __init__(self, transfer_id=None):
        self.transfer_id = transfer_id
        self._status = 'not-started'
        self._result = None
        self._exception = None
        self._associated_futures = set()
        self._failure_cleanups = []
        self._done_callbacks = []
        # Set by announce_done(); result() blocks until it is set.
        self._done_event = threading.Event()
        # Guards transitions of _status, _result and _exception.
        self._lock = threading.Lock()
        # One lock per mutable collection, independent of the status lock.
        self._associated_futures_lock = threading.Lock()
        self._done_callbacks_lock = threading.Lock()
        self._failure_cleanups_lock = threading.Lock()

    def __repr__(self):
        return f'{self.__class__.__name__}(transfer_id={self.transfer_id})'

    @property
    def exception(self):
        """The exception recorded for the transfer, or None if there is none"""
        return self._exception

    @property
    def associated_futures(self):
        """The set of futures associated to the in-progress TransferFuture

        Futures submitted through submit() are removed from this set as they
        complete, so once the transfer finishes it should be empty as there
        should be no running futures left.

        :returns: A shallow copy of the set of associated futures.
        """
        with self._associated_futures_lock:
            # Return a copy so the caller can process it while other threads
            # keep adding futures to (or removing them from) the real set.
            return copy.copy(self._associated_futures)

    @property
    def failure_cleanups(self):
        """The list of callbacks to call when the TransferFuture fails"""
        return self._failure_cleanups

    @property
    def status(self):
        """The status of the TransferFuture

        The currently supported states are:
            * not-started - Has yet to start. If in this state, a transfer
              can be canceled immediately and nothing will happen.
            * queued - SubmissionTask is about to submit tasks
            * running - Is in progress. In-progress as of now means that
              the SubmissionTask that runs the transfer is being executed. So
              there is no guarantee any transfer requests had been made to
              S3 if this state is reached.
            * cancelled - Was cancelled
            * failed - An exception other than CancelledError was thrown
            * success - No exceptions were thrown and is done.
        """
        return self._status

    def set_result(self, result):
        """Set a result for the TransferFuture

        Implies that the TransferFuture succeeded. This will always set a
        result because it is invoked on the final task where there is only
        ever one final task and it is run at the very end of a transfer
        process. So if a result is being set for this final task, the transfer
        succeeded even if something came along and canceled the transfer
        on the final task.

        This does not call announce_done(); callers blocked in result() are
        only released once announce_done() runs.
        """
        with self._lock:
            self._exception = None
            self._result = result
            self._status = 'success'

    def set_exception(self, exception, override=False):
        """Set an exception for the TransferFuture

        Implies the TransferFuture failed.

        :param exception: The exception that caused the transfer to fail.
        :param override: If True, override any existing state. Otherwise the
            exception is only recorded if the transfer is not already done.
        """
        with self._lock:
            if not self.done() or override:
                self._exception = exception
                self._status = 'failed'

    def result(self):
        """Waits until TransferFuture is done and returns the result

        If the TransferFuture succeeded, it will return the result. If the
        TransferFuture failed, it will raise the exception associated to the
        failure.
        """
        # Doing a wait() with no timeout cannot be interrupted in python2 but
        # can be interrupted in python3 so we just wait with the largest
        # possible integer value, which is on the scale of billions of
        # years...
        self._done_event.wait(MAXINT)

        # Once done waiting, raise an exception if present or return the
        # final result. Compare against None instead of relying on
        # truthiness so an exception defining __bool__/__len__ is still
        # raised rather than silently returning the (None) result.
        if self._exception is not None:
            raise self._exception
        return self._result

    def cancel(self, msg='', exc_type=CancelledError):
        """Cancels the TransferFuture

        Has no effect if the transfer is already done.

        :param msg: The message to attach to the cancellation
        :param exc_type: The type of exception to set for the cancellation
        """
        with self._lock:
            if not self.done():
                should_announce_done = False
                logger.debug('%s cancel(%s) called', self, msg)
                self._exception = exc_type(msg)
                if self._status == 'not-started':
                    # Presumably nothing has started that would later
                    # announce completion, so announce it here.
                    should_announce_done = True
                self._status = 'cancelled'
                if should_announce_done:
                    self.announce_done()

    def set_status_to_queued(self):
        """Sets the TransferFuture's status to queued"""
        self._transition_to_non_done_state('queued')

    def set_status_to_running(self):
        """Sets the TransferFuture's status to running"""
        self._transition_to_non_done_state('running')

    def _transition_to_non_done_state(self, desired_state):
        """Move to ``desired_state``, refusing to leave a done state.

        :raises RuntimeError: If the transfer is already done.
        """
        with self._lock:
            if self.done():
                raise RuntimeError(
                    f'Unable to transition from done state {self.status} to non-done '
                    f'state {desired_state}.'
                )
            self._status = desired_state

    def submit(self, executor, task, tag=None):
        """Submits a task to a provided executor

        :type executor: s3transfer.futures.BoundedExecutor
        :param executor: The executor to submit the callable to

        :type task: s3transfer.tasks.Task
        :param task: The task to submit to the executor

        :type tag: s3transfer.futures.TaskTag
        :param tag: A tag to associate to the submitted task

        :rtype: concurrent.futures.Future
        :returns: A future representing the submitted task
        """
        # Lazy %-style args: the message is only formatted if DEBUG is on.
        logger.debug(
            'Submitting task %s to executor %s for transfer request: %s.',
            task,
            executor,
            self.transfer_id,
        )
        future = executor.submit(task, tag=tag)
        # Add this created future to the set of associated futures just
        # in case it is needed during cleanups, and drop it once it is done.
        self.add_associated_future(future)
        future.add_done_callback(
            FunctionContainer(self.remove_associated_future, future)
        )
        return future

    def done(self):
        """Determines if a TransferFuture has completed

        :returns: True if status is equal to 'failed', 'cancelled', or
            'success'. False, otherwise.
        """
        return self.status in ['failed', 'cancelled', 'success']

    def add_associated_future(self, future):
        """Adds a future to be associated with the TransferFuture"""
        with self._associated_futures_lock:
            self._associated_futures.add(future)

    def remove_associated_future(self, future):
        """Removes a future's association to the TransferFuture

        :raises KeyError: If the future is not currently associated.
        """
        with self._associated_futures_lock:
            self._associated_futures.remove(future)

    def add_done_callback(self, function, *args, **kwargs):
        """Add a done callback to be invoked when transfer is done"""
        with self._done_callbacks_lock:
            self._done_callbacks.append(
                FunctionContainer(function, *args, **kwargs)
            )

    def add_failure_cleanup(self, function, *args, **kwargs):
        """Adds a callback to call upon failure"""
        with self._failure_cleanups_lock:
            self._failure_cleanups.append(
                FunctionContainer(function, *args, **kwargs)
            )

    def announce_done(self):
        """Announce that future is done running and run associated callbacks

        If the transfer did not succeed, this runs any failure cleanups that
        have not been run yet. It then unblocks result() and runs any done
        callbacks associated to the TransferFuture that have not already
        been run.
        """
        if self.status != 'success':
            self._run_failure_cleanups()
        self._done_event.set()
        self._run_done_callbacks()

    def _run_done_callbacks(self):
        # Run the callbacks and remove the callbacks from the internal
        # list so they do not get run again if done is announced more than
        # once.
        with self._done_callbacks_lock:
            self._run_callbacks(self._done_callbacks)
            self._done_callbacks = []

    def _run_failure_cleanups(self):
        # Run the cleanup callbacks and remove the callbacks from the internal
        # list so they do not get run again if done is announced more than
        # once.
        with self._failure_cleanups_lock:
            self._run_callbacks(self.failure_cleanups)
            self._failure_cleanups = []

    def _run_callbacks(self, callbacks):
        for callback in callbacks:
            self._run_callback(callback)

    def _run_callback(self, callback):
        try:
            callback()
        # We do not want a callback interrupting the process, especially
        # in the failure cleanups. So catch and log the exception.
        except Exception:
            logger.debug('Exception raised in %s.', callback, exc_info=True)
423
424
class BoundedExecutor:
    EXECUTOR_CLS = futures.ThreadPoolExecutor

    def __init__(
        self, max_size, max_num_threads, tag_semaphores=None, executor_cls=None
    ):
        """An executor implementation that has a maximum queued up tasks

        The executor will block if the number of tasks that have been
        submitted and is currently working on is past its maximum.

        :param max_size: The maximum number of inflight futures. An inflight
            future means that the task is either queued up or is currently
            being executed. A size of None or 0 means that the executor will
            have no bound in terms of the number of inflight futures.

        :param max_num_threads: The maximum number of threads the executor
            uses.

        :type tag_semaphores: dict
        :param tag_semaphores: A dictionary where the key is the name of the
            tag and the value is the semaphore to use when limiting the
            number of tasks the executor is processing at a time.

        :type executor_cls: BaseExecutor
        :param executor_cls: The executor class that gets bounded by this
            executor. If None is provided, the
            concurrent.futures.ThreadPoolExecutor class is used.
        """
        self._max_num_threads = max_num_threads
        if executor_cls is None:
            executor_cls = self.EXECUTOR_CLS
        self._executor = executor_cls(max_workers=self._max_num_threads)
        self._semaphore = TaskSemaphore(max_size)
        self._tag_semaphores = tag_semaphores

    def submit(self, task, tag=None, block=True):
        """Submit a task to complete

        :type task: s3transfer.tasks.Task
        :param task: The task to run __call__ on

        :type tag: s3transfer.futures.TaskTag
        :param tag: An optional tag to associate to the task. This
            is used to override which semaphore to use.

        :type block: boolean
        :param block: True if to wait till it is possible to submit a task.
            False, if not to wait and raise an error if not able to submit
            a task.

        :returns: The future associated to the submitted task
        """
        semaphore = self._semaphore
        # If a tag was provided, use the semaphore associated to that
        # tag.
        if tag:
            semaphore = self._tag_semaphores[tag]

        # Call acquire on the semaphore.
        acquire_token = semaphore.acquire(task.transfer_id, block)
        # Create a callback to invoke when task is done in order to call
        # release on the semaphore.
        release_callback = FunctionContainer(
            semaphore.release, task.transfer_id, acquire_token
        )
        # Submit the task to the underlying executor.
        # Pass the current context to ensure child threads persist the
        # parent thread's context.
        try:
            future = ExecutorFuture(self._executor.submit(task, get_context()))
        except BaseException:
            # The slot was acquired but no future exists whose completion
            # would release it. Give it back now so a failed submission
            # (e.g. submitting after shutdown) does not permanently shrink
            # the executor's capacity.
            release_callback()
            raise
        # Add the Semaphore.release() callback to the future such that
        # it is invoked once the future completes.
        future.add_done_callback(release_callback)
        return future

    def shutdown(self, wait=True):
        self._executor.shutdown(wait)
503
504
class ExecutorFuture:
    def __init__(self, future):
        """Wrap a future produced by the underlying executor.

        For now this is a thin wrapper over concurrent.futures.Future. Keeping
        the wrapper means the needed Future functionality could later be
        reimplemented (e.g. if the library is dropped) without touching the
        rest of the codebase.

        :type future: concurrent.futures.Future
        :param future: The underlying future
        """
        self._future = future

    def done(self):
        return self._future.done()

    def result(self):
        return self._future.result()

    def add_done_callback(self, fn):
        """Register ``fn`` to run once the future completes.

        :param fn: A zero-argument callable. Unlike
            concurrent.futures.Future.add_done_callback, the future is not
            passed to it.
        """
        # concurrent.futures hands the finished future to every done
        # callback; discard it and call the zero-argument function instead.
        self._future.add_done_callback(lambda _finished: fn())
540
541
class BaseExecutor:
    """Base Executor class implementation needed to work with s3transfer

    Subclasses must provide submit() and shutdown().
    """

    def __init__(self, max_workers=None):
        # max_workers is accepted (and ignored) so subclasses can be
        # constructed with the same keyword as ThreadPoolExecutor.
        return None

    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError('submit()')

    def shutdown(self, wait=True):
        raise NotImplementedError('shutdown()')
553
554
class NonThreadedExecutor(BaseExecutor):
    """A drop-in replacement non-threaded version of ThreadPoolExecutor"""

    def submit(self, fn, *args, **kwargs):
        """Run ``fn(*args, **kwargs)`` immediately in the calling thread.

        :returns: An already-completed NonThreadedExecutorFuture holding
            either the return value or the raised exception and traceback.
        """
        outcome = NonThreadedExecutorFuture()
        try:
            outcome.set_result(fn(*args, **kwargs))
        except Exception as exc:
            tb = exc.__traceback__
            logger.debug(
                'Setting exception for %s to %s with traceback %s',
                outcome,
                exc,
                tb,
            )
            outcome.set_exception_info(exc, tb)
        return outcome

    def shutdown(self, wait=True):
        # Nothing runs in the background, so there is nothing to stop.
        pass
576
577
class NonThreadedExecutorFuture:
    """The Future returned from NonThreadedExecutor

    Note that this future is **not** thread-safe as it is being used
    from the context of a non-threaded environment.
    """

    def __init__(self):
        self._result = None
        self._exception = None
        self._traceback = None
        self._done = False
        self._done_callbacks = []

    def set_result(self, result):
        """Record ``result`` and mark the future done."""
        self._result = result
        self._set_done()

    def set_exception_info(self, exception, traceback):
        """Record the failure and its traceback, then mark the future done."""
        self._exception = exception
        self._traceback = traceback
        self._set_done()

    def result(self, timeout=None):
        """Return the result, or raise the recorded exception.

        :param timeout: Ignored; accepted for signature compatibility with
            concurrent.futures.Future.result.
        """
        # Test identity against None rather than truthiness: an exception
        # class defining __bool__/__len__ must still be raised instead of
        # silently returning None.
        if self._exception is not None:
            raise self._exception.with_traceback(self._traceback)
        return self._result

    def _set_done(self):
        """Mark done and invoke (then discard) any pending callbacks."""
        self._done = True
        for done_callback in self._done_callbacks:
            self._invoke_done_callback(done_callback)
        self._done_callbacks = []

    def _invoke_done_callback(self, done_callback):
        # Mirror concurrent.futures: callbacks receive the future itself.
        return done_callback(self)

    def done(self):
        return self._done

    def add_done_callback(self, fn):
        """Register ``fn(future)``; it runs immediately if already done."""
        if self._done:
            self._invoke_done_callback(fn)
        else:
            self._done_callbacks.append(fn)
623
624
# A tag passed to BoundedExecutor.submit selects which semaphore limits the
# task, in place of the executor's default semaphore.
TaskTag = namedtuple('TaskTag', 'name')

IN_MEMORY_UPLOAD_TAG = TaskTag(name='in_memory_upload')
IN_MEMORY_DOWNLOAD_TAG = TaskTag(name='in_memory_download')