1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import copy
14import logging
15
16from s3transfer.utils import get_callbacks
17
18try:
19 from botocore.context import start_as_current_context
20except ImportError:
21 from contextlib import nullcontext as start_as_current_context
22
23
# Module-level logger used by the task classes below; they log through it
# at DEBUG level (task execution, dependency waits, raised exceptions).
logger = logging.getLogger(__name__)
25
26
class Task:
    """A task associated to a TransferFuture request

    This is a base class for other classes to subclass from. All subclassed
    classes must implement the _main() method.
    """

    def __init__(
        self,
        transfer_coordinator,
        main_kwargs=None,
        pending_main_kwargs=None,
        done_callbacks=None,
        is_final=False,
    ):
        """
        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The context associated to the
            TransferFuture for which this Task is associated with.

        :type main_kwargs: dict
        :param main_kwargs: The keyword args that can be immediately supplied
            to the _main() method of the task

        :type pending_main_kwargs: dict
        :param pending_main_kwargs: The keyword args that are depended upon
            by the result from a dependent future(s). The result returned by
            the future(s) will be used as the value for the keyword argument
            when _main() is called. The values for each key can be:
                * a single future - Once completed, its value will be the
                  result of that single future
                * a list of futures - Once all of the futures complete, the
                  value used will be a list of each completed future result
                  value in order of when they were originally supplied.

        :type done_callbacks: list of callbacks
        :param done_callbacks: A list of callbacks to call once the task is
            done completing. Each callback will be called with no arguments
            and will be called no matter if the task succeeds or an exception
            is raised.

        :type is_final: boolean
        :param is_final: True, to indicate that this task is the final task
            for the TransferFuture request. By setting this value to True, it
            will set the result of the entire TransferFuture to the result
            returned by this task's _main() method.
        """
        self._transfer_coordinator = transfer_coordinator
        # Each task gets its own empty container when none is supplied so
        # instances never share a mutable default.
        self._main_kwargs = main_kwargs if main_kwargs is not None else {}
        self._pending_main_kwargs = (
            pending_main_kwargs if pending_main_kwargs is not None else {}
        )
        self._done_callbacks = (
            done_callbacks if done_callbacks is not None else []
        )
        self._is_final = is_final

    def __repr__(self):
        # These are the general main_kwarg parameters that we want to
        # display in the repr; anything else (e.g. raw data) is left out so
        # the repr stays short.
        params_to_display = [
            'bucket',
            'key',
            'part_number',
            'final_filename',
            'transfer_future',
            'offset',
            'extra_args',
        ]
        main_kwargs_to_display = self._get_kwargs_with_params_to_include(
            self._main_kwargs, params_to_display
        )
        return (
            f'{self.__class__.__name__}(transfer_id={self.transfer_id}, '
            f'{main_kwargs_to_display})'
        )

    @property
    def transfer_id(self):
        """The id for the transfer request that the task belongs to"""
        return self._transfer_coordinator.transfer_id

    def _get_kwargs_with_params_to_include(self, kwargs, include):
        """Return the subset of ``kwargs`` whose keys appear in ``include``.

        Keys are ordered as they appear in ``include``.
        """
        return {param: kwargs[param] for param in include if param in kwargs}

    def _get_kwargs_with_params_to_exclude(self, kwargs, exclude):
        """Return a copy of ``kwargs`` without the keys listed in ``exclude``.

        Keys keep the order they have in ``kwargs``.
        """
        return {
            param: value
            for param, value in kwargs.items()
            if param not in exclude
        }

    def __call__(self, ctx=None):
        """The callable to use when submitting a Task to an executor

        :param ctx: Optional context passed to ``start_as_current_context``;
            the task body runs inside that context manager.

        :returns: The value returned by _main(), or None when the transfer
            was already done or an exception was raised (the exception is
            recorded on the transfer coordinator instead of propagating).
        """
        with start_as_current_context(ctx):
            try:
                # Wait for all of futures this task depends on.
                self._wait_on_dependent_futures()
                # Gather up all of the main keyword arguments for _main():
                # the immediately provided main_kwargs plus the values for
                # pending_main_kwargs that come from dependent futures.
                kwargs = self._get_all_main_kwargs()
                # If the transfer is already done (really only if some other
                # related task to the TransferFuture had failed) skip _main().
                if not self._transfer_coordinator.done():
                    return self._execute_main(kwargs)
            except Exception as e:
                self._log_and_set_exception(e)
            finally:
                # Nested try/finally: even if a done callback raises, the
                # final task must still announce the transfer as done,
                # otherwise anything waiting on the transfer would never be
                # released. The callback's exception still propagates.
                try:
                    # Run any done callbacks associated to the task no
                    # matter what.
                    for done_callback in self._done_callbacks:
                        done_callback()
                finally:
                    if self._is_final:
                        # If this is the final task announce that it is done
                        # if results are waiting on its completion.
                        self._transfer_coordinator.announce_done()

    def _execute_main(self, kwargs):
        """Run _main(**kwargs) and, for the final task, record its result.

        :returns: The value returned by _main().
        """
        # Do not display keyword args that should not be printed, especially
        # if they are going to make the logs hard to follow.
        params_to_exclude = ['data']
        kwargs_to_display = self._get_kwargs_with_params_to_exclude(
            kwargs, params_to_exclude
        )
        # Lazy %-style arguments: the task repr and kwargs are only rendered
        # when DEBUG logging is actually enabled.
        logger.debug(
            'Executing task %s with kwargs %s', self, kwargs_to_display
        )

        return_value = self._main(**kwargs)
        # If the task is the final task, then set the TransferFuture's
        # value to the return value from _main().
        if self._is_final:
            self._transfer_coordinator.set_result(return_value)
        return return_value

    def _log_and_set_exception(self, exception):
        # If an exception is ever thrown then set the exception for the
        # entire TransferFuture.
        logger.debug("Exception raised.", exc_info=True)
        self._transfer_coordinator.set_exception(exception)

    def _main(self, **kwargs):
        """The method that will be ran in the executor

        This method must be implemented by subclasses from Task. _main() can
        be implemented with any arguments decided upon by the subclass.
        """
        raise NotImplementedError('_main() must be implemented')

    def _wait_on_dependent_futures(self):
        """Block until every future referenced by pending_main_kwargs is done."""
        futures_to_wait_on = []
        for pending in self._pending_main_kwargs.values():
            # A pending value is either a list of futures or a single future.
            if isinstance(pending, list):
                futures_to_wait_on.extend(pending)
            else:
                futures_to_wait_on.append(pending)
        self._wait_until_all_complete(futures_to_wait_on)

    def _wait_until_all_complete(self, futures):
        # This is a basic implementation of the concurrent.futures.wait()
        #
        # concurrent.futures.wait() is not used instead because of this
        # reported issue: https://bugs.python.org/issue20319.
        # The issue would occasionally cause multipart uploads to hang
        # when wait() was called. With this approach, it avoids the
        # concurrency bug by removing any association with concurrent.futures
        # implementation of waiters.
        logger.debug(
            '%s about to wait for the following futures %s', self, futures
        )
        for future in futures:
            try:
                logger.debug('%s about to wait for %s', self, future)
                future.result()
            except Exception:
                # result() can also produce exceptions. We want to ignore
                # these to be deferred to error handling down the road.
                pass
        logger.debug('%s done waiting for dependent futures', self)

    def _get_all_main_kwargs(self):
        """Merge main_kwargs with the resolved results of pending futures.

        :returns: A shallow copy of main_kwargs with each pending key set to
            its future's result (or a list of results, in submission order,
            for a list of futures).
        """
        kwargs = copy.copy(self._main_kwargs)
        for key, pending_value in self._pending_main_kwargs.items():
            if isinstance(pending_value, list):
                kwargs[key] = [future.result() for future in pending_value]
            else:
                kwargs[key] = pending_value.result()
        return kwargs
241
242
class SubmissionTask(Task):
    """Base class for the top-level task that kicks off a transfer.

    A submission task is responsible for submitting every other task needed
    to carry out one transfer request. Subclasses supply that logic by
    implementing _submit().
    """

    def _main(self, transfer_future, **kwargs):
        """
        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future of the request whose
            tasks are being submitted

        :param kwargs: Extra keyword arguments forwarded unchanged to
            _submit()
        """
        coordinator = self._transfer_coordinator
        try:
            coordinator.set_status_to_queued()

            # Every 'queued' callback fires before any task is submitted.
            for queued_callback in get_callbacks(transfer_future, 'queued'):
                queued_callback()

            # With the callbacks done, the transfer is considered running.
            coordinator.set_status_to_running()

            # Delegate to the subclass to submit the transfer's tasks.
            self._submit(transfer_future=transfer_future, **kwargs)
        except BaseException as error:
            # A failure during submission may mean the final task (the one
            # that announces the transfer done and triggers cleanup) was
            # never submitted, so this task has to do that work itself.
            #
            # BaseException rather than Exception: with some executors
            # (notably the serial one) this task sees KeyboardInterrupts
            # directly, and must still clean up and signal done for them.

            # Record the exception that made the transfer fail.
            self._log_and_set_exception(error)

            # Let any futures spawned from this submission finish before the
            # transfer is announced done.
            self._wait_for_all_submitted_futures_to_complete()

            # Announcing done runs the cleanups and done callbacks.
            coordinator.announce_done()

    def _submit(self, transfer_future, **kwargs):
        """Submit the tasks for a transfer; subclasses must override this.

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future of the request whose
            tasks are being submitted

        :param kwargs: Any additional keyword arguments passed through from
            _main()
        """
        raise NotImplementedError('_submit() must be implemented')

    def _wait_for_all_submitted_futures_to_complete(self):
        # Cleanup and done callbacks must not run too early. Tasks that were
        # already submitted may themselves submit more work while we wait,
        # so keep waiting until a full pass adds no new associated futures.
        coordinator = self._transfer_coordinator
        pending = coordinator.associated_futures
        while pending:
            self._wait_until_all_complete(pending)
            latest = coordinator.associated_futures
            # Unchanged set after waiting means nothing new was spawned, so
            # everything originating from this submission has completed.
            if latest == pending:
                break
            pending = latest
335
336
class CreateMultipartUploadTask(Task):
    """Task to initiate a multipart upload"""

    # CreateMultipartUpload parameters that AbortMultipartUpload also
    # accepts and that can be required for the abort to be authorized
    # against the same bucket (e.g. requester-pays buckets, or requests
    # that pin the expected bucket owner).
    _ABORT_ARGS_TO_FORWARD = ('RequestPayer', 'ExpectedBucketOwner')

    def _main(self, client, bucket, key, extra_args):
        """
        :param client: The client to use when calling CreateMultipartUpload
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param extra_args: A dictionary of any extra arguments that may be
            used in the initialization. ``RequestPayer`` and
            ``ExpectedBucketOwner``, when present, are also passed to the
            abort request registered as a failure cleanup.

        :returns: The upload id of the multipart upload
        """
        # Create the multipart upload.
        response = client.create_multipart_upload(
            Bucket=bucket, Key=key, **extra_args
        )
        upload_id = response['UploadId']

        # Without these, an abort against a bucket that required them on
        # create can be rejected, leaving the uploaded parts orphaned.
        abort_kwargs = {
            name: extra_args[name]
            for name in self._ABORT_ARGS_TO_FORWARD
            if name in extra_args
        }

        # Add a cleanup if the multipart upload fails at any point.
        self._transfer_coordinator.add_failure_cleanup(
            client.abort_multipart_upload,
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            **abort_kwargs,
        )
        return upload_id
364
365
class CompleteMultipartUploadTask(Task):
    """Task that finalizes a multipart upload from its uploaded parts"""

    def _main(self, client, bucket, key, upload_id, parts, extra_args):
        """
        :param client: Client used to call CompleteMultipartUpload
        :param bucket: Name of the destination bucket
        :param key: Name of the destination key
        :param upload_id: Id of the multipart upload being completed
        :param parts: The uploaded parts, one entry per part, e.g.::

                [{'ETag': etag_value, 'PartNumber': part_number}, ...]

            Each entry is a value returned by ``UploadPartTask.main()``.
        :param extra_args: Extra request arguments forwarded as-is to
            CompleteMultipartUpload.
        """
        multipart_upload = {'Parts': parts}
        client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload=multipart_upload,
            **extra_args,
        )