Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/tasks.py: 22%
120 statements
coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import copy
import logging

from s3transfer.utils import get_callbacks

logger = logging.getLogger(__name__)


class Task:
    """A task associated with a TransferFuture request

    This is a base class for other classes to subclass from. All subclassed
    classes must implement the main() method.
    """

    def __init__(
        self,
        transfer_coordinator,
        main_kwargs=None,
        pending_main_kwargs=None,
        done_callbacks=None,
        is_final=False,
    ):
        """
        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The context associated with the
            TransferFuture that this Task belongs to.

        :type main_kwargs: dict
        :param main_kwargs: The keyword args that can be immediately supplied
            to the _main() method of the task.

        :type pending_main_kwargs: dict
        :param pending_main_kwargs: The keyword args whose values depend on
            the results of dependent futures. The result returned by the
            future(s) will be used as the value for the keyword argument
            when _main() is called (an illustrative sketch follows the Task
            class definition). The value for each key can be:

            * a single future - Once completed, its result will be the value
              for that keyword argument.
            * a list of futures - Once all of the futures complete, the value
              used will be a list of each completed future's result, in the
              order they were originally supplied.

        :type done_callbacks: list of callbacks
        :param done_callbacks: A list of callbacks to call once the task is
            done. Each callback will be called with no arguments, whether the
            task succeeds or raises an exception.

        :type is_final: boolean
        :param is_final: True to indicate that this task is the final task
            for the TransferFuture request. Setting this value to True will
            set the result of the entire TransferFuture to the result
            returned by this task's main() method.
        """
        self._transfer_coordinator = transfer_coordinator

        self._main_kwargs = main_kwargs
        if self._main_kwargs is None:
            self._main_kwargs = {}

        self._pending_main_kwargs = pending_main_kwargs
        if pending_main_kwargs is None:
            self._pending_main_kwargs = {}

        self._done_callbacks = done_callbacks
        if self._done_callbacks is None:
            self._done_callbacks = []

        self._is_final = is_final

    def __repr__(self):
        # These are the general main_kwarg parameters that we want to
        # display in the repr.
        params_to_display = [
            'bucket',
            'key',
            'part_number',
            'final_filename',
            'transfer_future',
            'offset',
            'extra_args',
        ]
        main_kwargs_to_display = self._get_kwargs_with_params_to_include(
            self._main_kwargs, params_to_display
        )
        return '{}(transfer_id={}, {})'.format(
            self.__class__.__name__,
            self._transfer_coordinator.transfer_id,
            main_kwargs_to_display,
        )

    @property
    def transfer_id(self):
        """The id for the transfer request that the task belongs to"""
        return self._transfer_coordinator.transfer_id

    def _get_kwargs_with_params_to_include(self, kwargs, include):
        filtered_kwargs = {}
        for param in include:
            if param in kwargs:
                filtered_kwargs[param] = kwargs[param]
        return filtered_kwargs

    def _get_kwargs_with_params_to_exclude(self, kwargs, exclude):
        filtered_kwargs = {}
        for param, value in kwargs.items():
            if param in exclude:
                continue
            filtered_kwargs[param] = value
        return filtered_kwargs

    def __call__(self):
        """The callable to use when submitting a Task to an executor"""
        try:
            # Wait for all of the futures this task depends on.
            self._wait_on_dependent_futures()
            # Gather up all of the main keyword arguments for main().
            # This includes the immediately provided main_kwargs and
            # the values for pending_main_kwargs that source from the return
            # values of the task's dependent futures.
            kwargs = self._get_all_main_kwargs()
            # If the task is not done (it will only be done at this point if
            # some other task related to the TransferFuture failed), execute
            # the task's main() method.
            if not self._transfer_coordinator.done():
                return self._execute_main(kwargs)
        except Exception as e:
            self._log_and_set_exception(e)
        finally:
            # Run any done callbacks associated with the task no matter what.
            for done_callback in self._done_callbacks:
                done_callback()

            if self._is_final:
                # If this is the final task, announce that it is done in case
                # anything is waiting on its completion.
                self._transfer_coordinator.announce_done()

    def _execute_main(self, kwargs):
        # Do not display keyword args that should not be printed, especially
        # if they are going to make the logs hard to follow.
        params_to_exclude = ['data']
        kwargs_to_display = self._get_kwargs_with_params_to_exclude(
            kwargs, params_to_exclude
        )
        # Log what is about to be executed.
        logger.debug(f"Executing task {self} with kwargs {kwargs_to_display}")

        return_value = self._main(**kwargs)
        # If the task is the final task, then set the TransferFuture's
        # value to the return value from main().
        if self._is_final:
            self._transfer_coordinator.set_result(return_value)
        return return_value

    def _log_and_set_exception(self, exception):
        # If an exception is ever thrown, set the exception for the
        # entire TransferFuture.
        logger.debug("Exception raised.", exc_info=True)
        self._transfer_coordinator.set_exception(exception)

    def _main(self, **kwargs):
        """The method that will be run in the executor

        This method must be implemented by subclasses of Task. main() can
        be implemented with any arguments decided upon by the subclass.
        """
        raise NotImplementedError('_main() must be implemented')

    def _wait_on_dependent_futures(self):
        # Gather all of the futures that main() depends on.
        futures_to_wait_on = []
        for _, future in self._pending_main_kwargs.items():
            # If the pending main keyword arg is a list, extend the list.
            if isinstance(future, list):
                futures_to_wait_on.extend(future)
            # If the pending main keyword arg is a future, append it to
            # the list.
            else:
                futures_to_wait_on.append(future)
        # Now wait for all of the futures to complete.
        self._wait_until_all_complete(futures_to_wait_on)

    def _wait_until_all_complete(self, futures):
        # This is a basic implementation of concurrent.futures.wait().
        #
        # concurrent.futures.wait() is not used because of this reported
        # issue: https://bugs.python.org/issue20319.
        # The issue would occasionally cause multipart uploads to hang
        # when wait() was called. This approach avoids the concurrency bug
        # by removing any dependence on the concurrent.futures
        # implementation of waiters.
        logger.debug(
            '%s about to wait for the following futures %s', self, futures
        )
        for future in futures:
            try:
                logger.debug('%s about to wait for %s', self, future)
                future.result()
            except Exception:
                # result() can also raise exceptions. We ignore them here
                # and defer error handling until later in the process.
                pass
        logger.debug('%s done waiting for dependent futures', self)

    def _get_all_main_kwargs(self):
        # Copy over all of the kwargs that we know are available.
        kwargs = copy.copy(self._main_kwargs)

        # Iterate through the kwargs whose values are pending on the result
        # of a future.
        for key, pending_value in self._pending_main_kwargs.items():
            # If the value is a list of futures, iterate through the list,
            # appending the result from each future.
            if isinstance(pending_value, list):
                result = []
                for future in pending_value:
                    result.append(future.result())
            # Otherwise, if the pending_value is a future, just wait for it.
            else:
                result = pending_value.result()
            # Add the retrieved value to the kwargs to be sent to the
            # main() call.
            kwargs[key] = result
        return kwargs
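

# Illustrative sketch (not part of s3transfer): a minimal Task subclass and
# wiring showing how main_kwargs and pending_main_kwargs feed _main(). The
# names below (_ExampleConcatTask, _example_wiring, body_future) are
# hypothetical; 'coordinator' is assumed to be an
# s3transfer.futures.TransferCoordinator and 'executor' any executor with a
# submit() method.
class _ExampleConcatTask(Task):
    def _main(self, prefix, body):
        # 'prefix' arrives immediately via main_kwargs; 'body' is the
        # result() of the dependent future supplied in pending_main_kwargs.
        return prefix + body


def _example_wiring(coordinator, executor, body_future):
    task = _ExampleConcatTask(
        transfer_coordinator=coordinator,
        main_kwargs={'prefix': 'part-'},            # available immediately
        pending_main_kwargs={'body': body_future},  # waited on, then resolved
        is_final=True,  # return value becomes the TransferFuture's result
    )
    # __call__() waits on body_future, gathers kwargs, then runs _main().
    return executor.submit(task)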


class SubmissionTask(Task):
    """A base class for any submission task

    Submission tasks are the top-level tasks used to submit a series of tasks
    to execute a particular transfer.
    """

    def _main(self, transfer_future, **kwargs):
        """
        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request for which tasks are being submitted.

        :param kwargs: Any additional kwargs that you may want to pass
            to the _submit() method
        """
        try:
            self._transfer_coordinator.set_status_to_queued()

            # Before submitting any tasks, run all of the on_queued callbacks.
            on_queued_callbacks = get_callbacks(transfer_future, 'queued')
            for on_queued_callback in on_queued_callbacks:
                on_queued_callback()

            # Once the callbacks have been run, set the status to running.
            self._transfer_coordinator.set_status_to_running()

            # Call the submit method to start submitting tasks to execute the
            # transfer.
            self._submit(transfer_future=transfer_future, **kwargs)
        except BaseException as e:
            # If an exception was raised while submitting tasks, there is a
            # chance that the final task, which signals that a transfer is
            # done and runs the cleanup, was never submitted in the first
            # place, so we need to account for that here.
            #
            # Note that BaseException is caught, instead of Exception, because
            # for some implementations of executors, specifically the serial
            # implementation, the SubmissionTask is directly exposed to
            # KeyboardInterrupts and so needs to clean up and signal done
            # for those as well.

            # Set the exception that caused the process to fail.
            self._log_and_set_exception(e)

            # Wait for all futures that may have spawned from this submission
            # task to finish before we announce the transfer as done.
            self._wait_for_all_submitted_futures_to_complete()

            # Announce the transfer as done, which will run any cleanups
            # and done callbacks as well.
            self._transfer_coordinator.announce_done()

    def _submit(self, transfer_future, **kwargs):
        """The submission method to be implemented

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request for which tasks are being submitted.

        :param kwargs: Any additional keyword arguments you want to be passed
            in
        """
        raise NotImplementedError('_submit() must be implemented')

    def _wait_for_all_submitted_futures_to_complete(self):
        # We want to wait for all futures that were submitted to complete,
        # because we do not want the cleanup callbacks or done callbacks
        # to be called too early. The main problem is that any task that was
        # submitted may have submitted even more tasks while it ran, so we
        # need to account for that.

        # First, get all of the futures that were submitted up to this point.
        submitted_futures = self._transfer_coordinator.associated_futures
        while submitted_futures:
            # Wait for those futures to complete.
            self._wait_until_all_complete(submitted_futures)
            # However, more futures may have been submitted while we waited,
            # so we need to check again for any more associated futures.
            possibly_more_submitted_futures = (
                self._transfer_coordinator.associated_futures
            )
            # If the current list of submitted futures is equal to the list
            # of associated futures after the wait completes, then no more
            # futures were submitted while we were waiting, meaning all
            # futures that may have spawned from the original submission task
            # have completed.
            if submitted_futures == possibly_more_submitted_futures:
                break
            submitted_futures = possibly_more_submitted_futures
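

# Illustrative sketch (not part of s3transfer): a minimal SubmissionTask
# subclass. The class name is hypothetical; real submission tasks (e.g. in
# s3transfer.upload) decide how to split a transfer into Task objects and
# hand them to an executor.
class _ExampleSubmissionTask(SubmissionTask):
    def _submit(self, transfer_future, **kwargs):
        # A real implementation would inspect transfer_future's call args,
        # build one or more Task objects (marking the last with is_final=True
        # so its return value becomes the TransferFuture's result), and submit
        # them for execution. The surrounding flow is handled by
        # SubmissionTask._main(): queued callbacks, status changes, and
        # failure cleanup if anything raises.
        pass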


class CreateMultipartUploadTask(Task):
    """Task to initiate a multipart upload"""

    def _main(self, client, bucket, key, extra_args):
        """
        :param client: The client to use when calling CreateMultipartUpload
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param extra_args: A dictionary of any extra arguments that may be
            used in the initialization.

        :returns: The upload id of the multipart upload
        """
        # Create the multipart upload.
        response = client.create_multipart_upload(
            Bucket=bucket, Key=key, **extra_args
        )
        upload_id = response['UploadId']

        # Add a cleanup if the multipart upload fails at any point.
        self._transfer_coordinator.add_failure_cleanup(
            client.abort_multipart_upload,
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
        )
        return upload_id
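

# Illustrative sketch (not part of s3transfer): how CreateMultipartUploadTask
# is typically wired. The function name and literal values are hypothetical;
# 'client' is assumed to be a botocore/boto3 S3 client and 'coordinator' an
# s3transfer.futures.TransferCoordinator.
def _example_create_mpu(coordinator, client):
    task = CreateMultipartUploadTask(
        transfer_coordinator=coordinator,
        main_kwargs={
            'client': client,
            'bucket': 'my-bucket',
            'key': 'my-key',
            # Passed straight through to CreateMultipartUpload.
            'extra_args': {'ACL': 'private'},
        },
    )
    # Calling the task runs _main(), returns response['UploadId'], and
    # registers abort_multipart_upload as a failure cleanup.
    return task()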


class CompleteMultipartUploadTask(Task):
    """Task to complete a multipart upload"""

    def _main(self, client, bucket, key, upload_id, parts, extra_args):
        """
        :param client: The client to use when calling CompleteMultipartUpload
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param upload_id: The id of the upload
        :param parts: A list of parts to use to complete the multipart
            upload::

                [{'ETag': etag_value, 'PartNumber': part_number}, ...]

            Each element in the list consists of a return value from
            ``UploadPartTask.main()``.
        :param extra_args: A dictionary of any extra arguments that may be
            used in completing the multipart transfer.
        """
        client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts},
            **extra_args,
        )
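

# Illustrative sketch (not part of s3transfer): wiring
# CompleteMultipartUploadTask so that 'parts' is resolved from the futures of
# the individual part uploads. The function name and 'part_futures' are
# hypothetical; each future is assumed to resolve to a dict of the form
# {'ETag': etag_value, 'PartNumber': part_number}.
def _example_complete_mpu(coordinator, client, upload_id, part_futures):
    return CompleteMultipartUploadTask(
        transfer_coordinator=coordinator,
        main_kwargs={
            'client': client,
            'bucket': 'my-bucket',
            'key': 'my-key',
            'upload_id': upload_id,
            'extra_args': {},
        },
        # A list of futures: their results are gathered, in order, into the
        # 'parts' keyword argument before _main() runs.
        pending_main_kwargs={'parts': part_futures},
        is_final=True,
    )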