Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/s3transfer/tasks.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

126 statements  

1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import copy 

14import logging 

15 

16from s3transfer.utils import get_callbacks 

17 

18try: 

19 from botocore.context import start_as_current_context 

20except ImportError: 

21 from contextlib import nullcontext as start_as_current_context 

22 

23 

# Module-level logger, named after this module's dotted import path.
logger = logging.getLogger(name=__name__)

25 

26 

class Task:
    """A task associated to a TransferFuture request.

    This is a base class for other classes to subclass from. All subclasses
    must implement the ``_main()`` method. Instances are callables meant to
    be submitted to an executor: ``__call__`` resolves dependent futures,
    runs ``_main()``, records failures on the transfer coordinator and runs
    done callbacks.
    """

    def __init__(
        self,
        transfer_coordinator,
        main_kwargs=None,
        pending_main_kwargs=None,
        done_callbacks=None,
        is_final=False,
    ):
        """
        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The context associated to the
            TransferFuture for which this Task is associated with.

        :type main_kwargs: dict
        :param main_kwargs: The keyword args that can be immediately supplied
            to the _main() method of the task. Defaults to an empty dict.

        :type pending_main_kwargs: dict
        :param pending_main_kwargs: The keyword args that are depended upon
            by the result from a dependent future(s). The result returned by
            the future(s) will be used as the value for the keyword argument
            when _main() is called. The values for each key can be:
                * a single future - Once completed, its value will be the
                  result of that single future
                * a list of futures - Once all of the futures complete, the
                  value used will be a list of each completed future result
                  value in order of when they were originally supplied.

        :type done_callbacks: list of callbacks
        :param done_callbacks: A list of callbacks to call once the task is
            done completing. Each callback will be called with no arguments
            and will be called no matter if the task succeeds or an exception
            is raised. If a callback itself raises, that exception propagates
            out of the task call, but a final task still announces done.

        :type is_final: boolean
        :param is_final: True, to indicate that this task is the final task
            for the TransferFuture request. By setting this value to True, it
            will set the result of the entire TransferFuture to the result
            returned by this task's _main() method.
        """
        self._transfer_coordinator = transfer_coordinator
        # Supplied containers are stored as-is (not copied); only a missing
        # argument is replaced by a fresh empty container.
        self._main_kwargs = {} if main_kwargs is None else main_kwargs
        self._pending_main_kwargs = (
            {} if pending_main_kwargs is None else pending_main_kwargs
        )
        self._done_callbacks = [] if done_callbacks is None else done_callbacks
        self._is_final = is_final

    def __repr__(self):
        # Only these general main_kwargs are shown, to keep the repr short.
        params_to_display = [
            'bucket',
            'key',
            'part_number',
            'final_filename',
            'transfer_future',
            'offset',
            'extra_args',
        ]
        main_kwargs_to_display = self._get_kwargs_with_params_to_include(
            self._main_kwargs, params_to_display
        )
        return (
            f'{self.__class__.__name__}('
            f'transfer_id={self._transfer_coordinator.transfer_id}, '
            f'{main_kwargs_to_display})'
        )

    @property
    def transfer_id(self):
        """The id for the transfer request that the task belongs to"""
        return self._transfer_coordinator.transfer_id

    def _get_kwargs_with_params_to_include(self, kwargs, include):
        """Return the entries of ``kwargs`` whose keys appear in ``include``.

        The result is ordered by ``include``; missing keys are skipped.
        """
        return {param: kwargs[param] for param in include if param in kwargs}

    def _get_kwargs_with_params_to_exclude(self, kwargs, exclude):
        """Return a copy of ``kwargs`` without the keys listed in ``exclude``.

        The result keeps the iteration order of ``kwargs``.
        """
        return {
            param: value
            for param, value in kwargs.items()
            if param not in exclude
        }

    def __call__(self, ctx=None):
        """The callable to use when submitting a Task to an executor.

        :param ctx: Passed to ``start_as_current_context`` (botocore's
            context helper when importable, otherwise
            ``contextlib.nullcontext``).
        :returns: The value returned by ``_main()``, or None if the
            coordinator was already done or an Exception was raised (the
            exception is recorded on the coordinator instead of raised).
        """
        with start_as_current_context(ctx):
            try:
                # Wait for all of the futures this task depends on.
                self._wait_on_dependent_futures()
                # Combine the immediately available main_kwargs with the
                # results of the dependent futures.
                kwargs = self._get_all_main_kwargs()
                # The coordinator is only done early when some other task
                # of the same TransferFuture failed; skip _main() then.
                if not self._transfer_coordinator.done():
                    return self._execute_main(kwargs)
            except Exception as e:
                self._log_and_set_exception(e)
            finally:
                # The nested try/finally ensures a raising done callback
                # cannot prevent the final task from announcing completion;
                # otherwise anything waiting on the transfer would hang.
                try:
                    for done_callback in self._done_callbacks:
                        done_callback()
                finally:
                    if self._is_final:
                        self._transfer_coordinator.announce_done()

    def _execute_main(self, kwargs):
        """Run ``_main(**kwargs)``; a final task also sets it as the result."""
        # 'data' is excluded from the log line because it would make the
        # logs hard to follow.
        kwargs_to_display = self._get_kwargs_with_params_to_exclude(
            kwargs, ['data']
        )
        # Lazy %-style arguments: repr(self) and the kwargs dict are only
        # formatted if a DEBUG record is actually emitted.
        logger.debug(
            'Executing task %s with kwargs %s', self, kwargs_to_display
        )

        return_value = self._main(**kwargs)
        if self._is_final:
            self._transfer_coordinator.set_result(return_value)
        return return_value

    def _log_and_set_exception(self, exception):
        """Log ``exception`` and set it on the whole TransferFuture."""
        logger.debug("Exception raised.", exc_info=True)
        self._transfer_coordinator.set_exception(exception)

    def _main(self, **kwargs):
        """The method that will be ran in the executor

        This method must be implemented by subclasses from Task. _main() can
        be implemented with any arguments decided upon by the subclass.
        """
        raise NotImplementedError('_main() must be implemented')

    def _wait_on_dependent_futures(self):
        """Block until every future in pending_main_kwargs has completed."""
        futures_to_wait_on = []
        for future in self._pending_main_kwargs.values():
            # A pending value is either a list of futures or one future.
            if isinstance(future, list):
                futures_to_wait_on.extend(future)
            else:
                futures_to_wait_on.append(future)
        self._wait_until_all_complete(futures_to_wait_on)

    def _wait_until_all_complete(self, futures):
        """Wait for each future in turn, ignoring any exception it raises.

        This is a basic implementation of concurrent.futures.wait().
        concurrent.futures.wait() is deliberately not used because of
        https://bugs.python.org/issue20319, which would occasionally cause
        multipart uploads to hang. Calling result() on each future avoids
        concurrent.futures' waiter machinery entirely.
        """
        logger.debug(
            '%s about to wait for the following futures %s', self, futures
        )
        for future in futures:
            try:
                logger.debug('%s about to wait for %s', self, future)
                future.result()
            except Exception:
                # Deliberately ignored here: failures are surfaced later,
                # when the results are gathered or by other error handling.
                pass
        logger.debug('%s done waiting for dependent futures', self)

    def _get_all_main_kwargs(self):
        """Return main_kwargs merged with the results of pending futures.

        ``self._main_kwargs`` is shallow-copied, so it is never mutated.
        """
        kwargs = copy.copy(self._main_kwargs)
        for key, pending_value in self._pending_main_kwargs.items():
            if isinstance(pending_value, list):
                # Keep results in the order the futures were supplied.
                kwargs[key] = [future.result() for future in pending_value]
            else:
                kwargs[key] = pending_value.result()
        return kwargs

241 

242 

class SubmissionTask(Task):
    """Base class for submission tasks.

    A submission task is the top-level task of a transfer: it is responsible
    for submitting the series of tasks that carry out that transfer.
    """

    def _main(self, transfer_future, **kwargs):
        """
        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for

        :param kwargs: Any additional kwargs that you may want to pass
            to the _submit() method
        """
        coordinator = self._transfer_coordinator
        try:
            coordinator.set_status_to_queued()

            # Every on_queued callback runs before any task is submitted.
            for queued_callback in get_callbacks(transfer_future, 'queued'):
                queued_callback()

            # With the queued callbacks done, the transfer is now running.
            coordinator.set_status_to_running()

            # Hand off to the subclass to submit the transfer's tasks.
            self._submit(transfer_future=transfer_future, **kwargs)
        except BaseException as e:
            # A failure during submission may mean the final task (the one
            # that signals completion and runs cleanups) was never
            # submitted, so this task has to do that work itself.
            #
            # BaseException rather than Exception is caught on purpose:
            # with some executors, notably the serial one, this task is
            # directly exposed to KeyboardInterrupt and must still clean up
            # and signal done in that case.
            self._log_and_set_exception(e)

            # Let every future spawned by this submission finish before the
            # transfer is announced as done.
            self._wait_for_all_submitted_futures_to_complete()

            # Announcing done runs any cleanups and done callbacks.
            coordinator.announce_done()

    def _submit(self, transfer_future, **kwargs):
        """The submission method to be implemented

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for

        :param kwargs: Any additional keyword arguments you want to be passed
            in
        """
        raise NotImplementedError('_submit() must be implemented')

    def _wait_for_all_submitted_futures_to_complete(self):
        """Wait until no new associated futures appear while waiting.

        Cleanup and done callbacks must not run too early. Any submitted
        task may itself submit more tasks while running, so a single pass
        over the associated futures is not enough. Keep re-reading them
        until a full wait finishes with the set unchanged (or empty).
        """
        pending = self._transfer_coordinator.associated_futures
        while pending:
            self._wait_until_all_complete(pending)
            latest = self._transfer_coordinator.associated_futures
            # Unchanged after a complete wait: nothing new was spawned, so
            # everything descending from the submission has completed.
            if latest == pending:
                return
            pending = latest

335 

336 

class CreateMultipartUploadTask(Task):
    """Task that initiates a multipart upload and returns its upload id."""

    def _main(self, client, bucket, key, extra_args):
        """
        :param client: The client to use when calling CreateMultipartUpload
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param extra_args: A dictionary of any extra arguments that may be
            used in the initialization.

        :returns: The upload id of the multipart upload
        """
        upload_id = client.create_multipart_upload(
            Bucket=bucket, Key=key, **extra_args
        )['UploadId']

        # Register an abort of this upload as a failure cleanup, so a
        # transfer that fails at any later point does not leave the
        # multipart upload behind.
        self._transfer_coordinator.add_failure_cleanup(
            client.abort_multipart_upload,
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
        )
        return upload_id

364 

365 

class CompleteMultipartUploadTask(Task):
    """Task to complete a multipart upload"""

    def _main(self, client, bucket, key, upload_id, parts, extra_args):
        """Call CompleteMultipartUpload for the given upload.

        :param client: The client to use when calling CompleteMultipartUpload
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param upload_id: The id of the upload
        :param parts: A list of parts to use to complete the multipart upload::

            [{'ETag': etag_value, 'PartNumber': part_number}, ...]

            Each element in the list consists of a return value from
            ``UploadPartTask._main()``. The list is sent unchanged as the
            ``Parts`` entry of the ``MultipartUpload`` request parameter.
        :param extra_args: A dictionary of any extra arguments that may be
            used in completing the multipart transfer.

        :returns: None. The client response is not used.
        """
        client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts},
            **extra_args,
        )