Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/tasks.py: 22%

120 statements  

coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import copy
import logging

from s3transfer.utils import get_callbacks

logger = logging.getLogger(__name__)
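
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of s3transfer): the task machinery below logs
# everything at DEBUG level, so enabling that level for this module is the
# quickest way to see task execution and future-waiting activity. The helper
# name and the basicConfig settings are assumptions made for illustration.
# ---------------------------------------------------------------------------
def _example_enable_task_debug_logging():
    # Send DEBUG records from s3transfer.tasks to stderr.
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger(__name__).setLevel(logging.DEBUG)
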

class Task:
    """A task associated with a TransferFuture request

    This is a base class for other classes to subclass from. All subclassed
    classes must implement the _main() method.
    """

    def __init__(
        self,
        transfer_coordinator,
        main_kwargs=None,
        pending_main_kwargs=None,
        done_callbacks=None,
        is_final=False,
    ):
        """
        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The context associated with the
            TransferFuture that this Task belongs to.

        :type main_kwargs: dict
        :param main_kwargs: The keyword args that can be immediately supplied
            to the _main() method of the task.

        :type pending_main_kwargs: dict
        :param pending_main_kwargs: The keyword args whose values depend on
            the results of dependent futures. The result returned by the
            future(s) will be used as the value for the keyword argument
            when _main() is called. The value for each key can be:

            * a single future - Once completed, its result will be the
              value for that keyword argument.
            * a list of futures - Once all of the futures complete, the
              value used will be a list of each completed future's result,
              in the order the futures were originally supplied.

        :type done_callbacks: list of callbacks
        :param done_callbacks: A list of callbacks to call once the task is
            done. Each callback will be called with no arguments and will be
            called whether the task succeeds or raises an exception.

        :type is_final: boolean
        :param is_final: True to indicate that this task is the final task
            for the TransferFuture request. Setting this value to True makes
            the result of the entire TransferFuture the value returned by
            this task's _main() method.
        """
        self._transfer_coordinator = transfer_coordinator

        self._main_kwargs = main_kwargs
        if self._main_kwargs is None:
            self._main_kwargs = {}

        self._pending_main_kwargs = pending_main_kwargs
        if pending_main_kwargs is None:
            self._pending_main_kwargs = {}

        self._done_callbacks = done_callbacks
        if self._done_callbacks is None:
            self._done_callbacks = []

        self._is_final = is_final

    def __repr__(self):
        # These are the general main_kwarg parameters that we want to
        # display in the repr.
        params_to_display = [
            'bucket',
            'key',
            'part_number',
            'final_filename',
            'transfer_future',
            'offset',
            'extra_args',
        ]
        main_kwargs_to_display = self._get_kwargs_with_params_to_include(
            self._main_kwargs, params_to_display
        )
        return '{}(transfer_id={}, {})'.format(
            self.__class__.__name__,
            self._transfer_coordinator.transfer_id,
            main_kwargs_to_display,
        )

    @property
    def transfer_id(self):
        """The id for the transfer request that the task belongs to"""
        return self._transfer_coordinator.transfer_id

    def _get_kwargs_with_params_to_include(self, kwargs, include):
        filtered_kwargs = {}
        for param in include:
            if param in kwargs:
                filtered_kwargs[param] = kwargs[param]
        return filtered_kwargs

    def _get_kwargs_with_params_to_exclude(self, kwargs, exclude):
        filtered_kwargs = {}
        for param, value in kwargs.items():
            if param in exclude:
                continue
            filtered_kwargs[param] = value
        return filtered_kwargs

    def __call__(self):
        """The callable to use when submitting a Task to an executor"""
        try:
            # Wait for all of the futures this task depends on.
            self._wait_on_dependent_futures()
            # Gather up all of the main keyword arguments for main().
            # This includes the immediately provided main_kwargs and
            # the values for pending_main_kwargs that come from the return
            # values of the task's dependent futures.
            kwargs = self._get_all_main_kwargs()
            # If the transfer is not already done (which really only happens
            # when some other task related to the TransferFuture has failed),
            # execute the task's main() method.
            if not self._transfer_coordinator.done():
                return self._execute_main(kwargs)
        except Exception as e:
            self._log_and_set_exception(e)
        finally:
            # Run any done callbacks associated with the task no matter what.
            for done_callback in self._done_callbacks:
                done_callback()

            if self._is_final:
                # If this is the final task, announce that it is done in case
                # results are waiting on its completion.
                self._transfer_coordinator.announce_done()

    def _execute_main(self, kwargs):
        # Do not display keyword args that should not be printed, especially
        # if they are going to make the logs hard to follow.
        params_to_exclude = ['data']
        kwargs_to_display = self._get_kwargs_with_params_to_exclude(
            kwargs, params_to_exclude
        )
        # Log what is about to be executed.
        logger.debug(f"Executing task {self} with kwargs {kwargs_to_display}")

        return_value = self._main(**kwargs)
        # If the task is the final task, then set the TransferFuture's
        # value to the return value from main().
        if self._is_final:
            self._transfer_coordinator.set_result(return_value)
        return return_value

    def _log_and_set_exception(self, exception):
        # If an exception is ever raised, then set the exception for the
        # entire TransferFuture.
        logger.debug("Exception raised.", exc_info=True)
        self._transfer_coordinator.set_exception(exception)

    def _main(self, **kwargs):
        """The method that will be run in the executor

        This method must be implemented by subclasses of Task. _main() can
        be implemented with any arguments decided upon by the subclass.
        """
        raise NotImplementedError('_main() must be implemented')

    def _wait_on_dependent_futures(self):
        # Gather all of the futures that main() depends on.
        futures_to_wait_on = []
        for _, future in self._pending_main_kwargs.items():
            # If the pending main keyword arg is a list, then extend the list.
            if isinstance(future, list):
                futures_to_wait_on.extend(future)
            # If the pending main keyword arg is a future, append it to
            # the list.
            else:
                futures_to_wait_on.append(future)
        # Now wait for all of the futures to complete.
        self._wait_until_all_complete(futures_to_wait_on)

    def _wait_until_all_complete(self, futures):
        # This is a basic implementation of concurrent.futures.wait().
        #
        # concurrent.futures.wait() is not used because of this reported
        # issue: https://bugs.python.org/issue20319.
        # The issue would occasionally cause multipart uploads to hang
        # when wait() was called. This approach avoids that concurrency bug
        # by not relying on concurrent.futures' implementation of waiters.
        logger.debug(
            '%s about to wait for the following futures %s', self, futures
        )
        for future in futures:
            try:
                logger.debug('%s about to wait for %s', self, future)
                future.result()
            except Exception:
                # result() can also raise exceptions. We ignore them here
                # and defer them to error handling down the road.
                pass
        logger.debug('%s done waiting for dependent futures', self)

    def _get_all_main_kwargs(self):
        # Copy over all of the kwargs that we know are available.
        kwargs = copy.copy(self._main_kwargs)

        # Iterate through the kwargs whose values are pending on the result
        # of a future.
        for key, pending_value in self._pending_main_kwargs.items():
            # If the value is a list of futures, iterate through the list,
            # appending the result from each future.
            if isinstance(pending_value, list):
                result = []
                for future in pending_value:
                    result.append(future.result())
            # Otherwise, if the pending_value is a future, just wait for it.
            else:
                result = pending_value.result()
            # Add the retrieved value to the kwargs to be sent to the
            # main() call.
            kwargs[key] = result
        return kwargs
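
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of s3transfer): a minimal, hypothetical Task
# subclass showing how main_kwargs and pending_main_kwargs feed _main(). The
# names _ExampleConcatTask and _example_run_concat_task are assumptions made
# for illustration only; real tasks live in modules such as s3transfer.upload.
# ---------------------------------------------------------------------------
class _ExampleConcatTask(Task):
    def _main(self, prefix, suffix):
        # 'prefix' arrives directly from main_kwargs; 'suffix' is resolved
        # from the dependent future listed in pending_main_kwargs before
        # _main() is invoked by __call__().
        return prefix + suffix


def _example_run_concat_task(transfer_coordinator, suffix_future):
    # Assumes transfer_coordinator is an s3transfer.futures.TransferCoordinator
    # and suffix_future exposes a blocking result() method.
    task = _ExampleConcatTask(
        transfer_coordinator=transfer_coordinator,
        main_kwargs={'prefix': 'part-'},
        pending_main_kwargs={'suffix': suffix_future},
        is_final=True,  # its return value becomes the TransferFuture result
    )
    # __call__() waits on suffix_future, runs _main(), and, because
    # is_final=True, stores the result on the coordinator.
    return task()
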

class SubmissionTask(Task):
    """A base class for any submission task

    Submission tasks are the top-level tasks used to submit a series of tasks
    to execute a particular transfer.
    """

    def _main(self, transfer_future, **kwargs):
        """
        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for

        :param kwargs: Any additional kwargs that you may want to pass
            to the _submit() method
        """
        try:
            self._transfer_coordinator.set_status_to_queued()

            # Before submitting any tasks, run all of the on_queued callbacks.
            on_queued_callbacks = get_callbacks(transfer_future, 'queued')
            for on_queued_callback in on_queued_callbacks:
                on_queued_callback()

            # Once the callbacks have been run, set the status to running.
            self._transfer_coordinator.set_status_to_running()

            # Call the submit method to start submitting tasks to execute the
            # transfer.
            self._submit(transfer_future=transfer_future, **kwargs)
        except BaseException as e:
            # If an exception was raised while submitting tasks, there is a
            # chance that the final task, which signals that the transfer is
            # done and runs the cleanup, was never submitted in the first
            # place, so we need to account for that here.
            #
            # Note that BaseException is caught, instead of Exception, because
            # for some implementations of executors, specifically the serial
            # implementation, the SubmissionTask is directly exposed to
            # KeyboardInterrupts and so needs to clean up and signal done
            # for those as well.

            # Set the exception that caused the process to fail.
            self._log_and_set_exception(e)

            # Wait for all futures that may have spawned from this submission
            # task to finish before we announce the transfer as done.
            self._wait_for_all_submitted_futures_to_complete()

            # Announce the transfer as done, which will run any cleanups
            # and done callbacks as well.
            self._transfer_coordinator.announce_done()

    def _submit(self, transfer_future, **kwargs):
        """The submission method to be implemented

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for

        :param kwargs: Any additional keyword arguments you want to be passed
            in
        """
        raise NotImplementedError('_submit() must be implemented')

    def _wait_for_all_submitted_futures_to_complete(self):
        # We want to wait for all futures that were submitted to complete
        # because we do not want the cleanup callbacks or done callbacks to
        # be called too early. The main complication is that any task that
        # was submitted may have submitted even more tasks while it ran, so
        # we need to account for that.

        # First, get all of the futures that were submitted up to this point.
        submitted_futures = self._transfer_coordinator.associated_futures
        while submitted_futures:
            # Wait for those futures to complete.
            self._wait_until_all_complete(submitted_futures)
            # However, more futures may have been submitted as we waited, so
            # we need to check again for any more associated futures.
            possibly_more_submitted_futures = (
                self._transfer_coordinator.associated_futures
            )
            # If the list of submitted futures equals the list of associated
            # futures after the wait completes, then no new futures were
            # submitted while we were waiting, which ultimately means that
            # all futures that may have spawned from the original submission
            # task have completed.
            if submitted_futures == possibly_more_submitted_futures:
                break
            submitted_futures = possibly_more_submitted_futures
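
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of s3transfer): a hypothetical SubmissionTask
# subclass. Real submission tasks (for example, in s3transfer.upload and
# s3transfer.download) build and submit many Task objects to an executor; this
# sketch only shows the _submit() hook that subclasses must implement. The
# class name is an assumption made for illustration only.
# ---------------------------------------------------------------------------
class _ExampleNoOpSubmissionTask(SubmissionTask):
    def _submit(self, transfer_future, **kwargs):
        # A real implementation would inspect the transfer request carried by
        # transfer_future and submit one or more Task instances here,
        # typically ending with a task created with is_final=True.
        pass
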

class CreateMultipartUploadTask(Task):
    """Task to initiate a multipart upload"""

    def _main(self, client, bucket, key, extra_args):
        """
        :param client: The client to use when calling CreateMultipartUpload
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param extra_args: A dictionary of any extra arguments that may be
            used in the initialization.

        :returns: The upload id of the multipart upload
        """
        # Create the multipart upload.
        response = client.create_multipart_upload(
            Bucket=bucket, Key=key, **extra_args
        )
        upload_id = response['UploadId']

        # Add a cleanup if the multipart upload fails at any point.
        self._transfer_coordinator.add_failure_cleanup(
            client.abort_multipart_upload,
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
        )
        return upload_id
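
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of s3transfer): constructing a
# CreateMultipartUploadTask by hand. In practice the transfer manager builds
# these tasks; the helper name, bucket, and key below are assumptions made for
# illustration only, and `client` is expected to be a boto3/botocore S3 client.
# ---------------------------------------------------------------------------
def _example_create_mpu_task(transfer_coordinator, client):
    return CreateMultipartUploadTask(
        transfer_coordinator=transfer_coordinator,
        main_kwargs={
            'client': client,
            'bucket': 'example-bucket',
            'key': 'example-key',
            'extra_args': {},
        },
    )
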

class CompleteMultipartUploadTask(Task):
    """Task to complete a multipart upload"""

    def _main(self, client, bucket, key, upload_id, parts, extra_args):
        """
        :param client: The client to use when calling CompleteMultipartUpload
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param upload_id: The id of the upload
        :param parts: A list of parts to use to complete the multipart upload::

            [{'Etag': etag_value, 'PartNumber': part_number}, ...]

            Each element in the list consists of a return value from
            ``UploadPartTask.main()``.
        :param extra_args: A dictionary of any extra arguments that may be
            used in completing the multipart transfer.
        """
        client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts},
            **extra_args,
        )
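
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of s3transfer): completing a multipart upload
# once the upload id and the part list are available. In s3transfer these
# values normally arrive through pending_main_kwargs as the results of earlier
# tasks; the helper name, bucket, and key below are assumptions made for
# illustration only.
# ---------------------------------------------------------------------------
def _example_complete_mpu_task(
    transfer_coordinator, client, upload_id_future, part_futures
):
    return CompleteMultipartUploadTask(
        transfer_coordinator=transfer_coordinator,
        main_kwargs={
            'client': client,
            'bucket': 'example-bucket',
            'key': 'example-key',
            'extra_args': {},
        },
        pending_main_kwargs={
            # Resolves to the upload id returned by CreateMultipartUploadTask.
            'upload_id': upload_id_future,
            # A list of futures resolves to a list of their results, in the
            # order the futures were supplied (see Task._get_all_main_kwargs).
            'parts': part_futures,
        },
        is_final=True,
    )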