Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/futures.py: 38%

265 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import copy 

14import logging 

15import sys 

16import threading 

17from collections import namedtuple 

18from concurrent import futures 

19 

20from s3transfer.compat import MAXINT 

21from s3transfer.exceptions import CancelledError, TransferNotDoneError 

22from s3transfer.utils import FunctionContainer, TaskSemaphore 

23 

24logger = logging.getLogger(__name__) 

25 

26 

class BaseTransferFuture:
    """Interface that every transfer future is expected to implement.

    Each member here only raises ``NotImplementedError``; subclasses
    supply the real behavior.
    """

    @property
    def meta(self):
        """Metadata object tied to this TransferFuture."""
        raise NotImplementedError('meta')

    def done(self):
        """Report whether the TransferFuture has finished.

        :returns: True when the transfer is complete, False when it is not.
        """
        raise NotImplementedError('done()')

    def result(self):
        """Block until the TransferFuture finishes, then give its outcome.

        A successful transfer yields its result; a failed transfer raises
        the exception that caused the failure.
        """
        raise NotImplementedError('result()')

    def cancel(self):
        """Cancel the request this TransferFuture represents."""
        raise NotImplementedError('cancel()')

52 

53 

class BaseTransferMeta:
    """Interface for the metadata object attached to a transfer future.

    Every property only raises ``NotImplementedError``; subclasses supply
    the real values.
    """

    @property
    def call_args(self):
        """Call arguments that were used for the transfer request."""
        raise NotImplementedError('call_args')

    @property
    def transfer_id(self):
        """Unique identifier of the transfer."""
        raise NotImplementedError('transfer_id')

    @property
    def user_context(self):
        """Dictionary in which requesters may stash their own data."""
        raise NotImplementedError('user_context')

69 

70 

class TransferFuture(BaseTransferFuture):
    def __init__(self, meta=None, coordinator=None):
        """Future handed back for a submitted transfer request.

        :type meta: TransferMeta
        :param meta: Metadata for the request; this object is visible to
            the requester. A fresh TransferMeta is created when omitted.

        :type coordinator: TransferCoordinator
        :param coordinator: Coordinator for the request; this object is
            not visible to the requester. A fresh TransferCoordinator is
            created when omitted.
        """
        self._meta = TransferMeta() if meta is None else meta
        self._coordinator = (
            TransferCoordinator() if coordinator is None else coordinator
        )

    @property
    def meta(self):
        """The metadata object supplied (or created) at construction."""
        return self._meta

    def done(self):
        """Delegate the completion check to the coordinator."""
        return self._coordinator.done()

    def result(self):
        """Return the coordinator's result, cancelling on Ctrl-C.

        Normally this blocks until the transfer is finished. If a
        KeyboardInterrupt arrives while waiting, the transfer is cancelled
        and the interrupt is propagated to the caller.
        """
        try:
            outcome = self._coordinator.result()
        except KeyboardInterrupt as interrupt:
            self.cancel()
            raise interrupt
        return outcome

    def cancel(self):
        """Ask the coordinator to cancel the transfer."""
        self._coordinator.cancel()

    def set_exception(self, exception):
        """Sets the exception on the future.

        :param exception: The exception to record; it overrides whatever
            state the coordinator currently holds.
        :raises TransferNotDoneError: If the transfer is not done yet.
        """
        transfer_finished = self.done()
        if not transfer_finished:
            raise TransferNotDoneError(
                'set_exception can only be called once the transfer is '
                'complete.'
            )
        self._coordinator.set_exception(exception, override=True)

119 

120 

class TransferMeta(BaseTransferMeta):
    """Requester-visible metadata describing a TransferFuture."""

    def __init__(self, call_args=None, transfer_id=None):
        # The size is unknown until provide_transfer_size() is called.
        self._size = None
        self._user_context = {}
        self._transfer_id = transfer_id
        self._call_args = call_args

    @property
    def call_args(self):
        """Call arguments that were used for the transfer request."""
        return self._call_args

    @property
    def transfer_id(self):
        """Unique identifier of the transfer."""
        return self._transfer_id

    @property
    def size(self):
        """Size of the transfer request, or None when not yet provided."""
        return self._size

    @property
    def user_context(self):
        """Dictionary in which requesters may stash their own data."""
        return self._user_context

    def provide_transfer_size(self, size):
        """Record the size of the transfer request.

        Per the original documentation, supplying this value means the
        TransferManager will not call HeadObject or use the OS to
        determine the size of the transfer.

        :param size: The size to store; returned later by ``size``.
        """
        self._size = size

158 

159 

class TransferCoordinator:
    """A helper class for managing TransferFuture

    It tracks the transfer's status, result and exception, the executor
    futures associated with the transfer, and the callbacks to run when
    the transfer fails or finishes.
    """

    def __init__(self, transfer_id=None):
        """
        :param transfer_id: Identifier of the transfer; used in ``repr``
            and in log messages.
        """
        self.transfer_id = transfer_id
        self._status = 'not-started'
        self._result = None
        self._exception = None
        self._associated_futures = set()
        self._failure_cleanups = []
        self._done_callbacks = []
        # Set by announce_done(); result() waits on it.
        self._done_event = threading.Event()
        # Guards _status / _result / _exception transitions.
        self._lock = threading.Lock()
        self._associated_futures_lock = threading.Lock()
        self._done_callbacks_lock = threading.Lock()
        self._failure_cleanups_lock = threading.Lock()

    def __repr__(self):
        return '{}(transfer_id={})'.format(
            self.__class__.__name__, self.transfer_id
        )

    @property
    def exception(self):
        """The exception recorded for the transfer, or None"""
        return self._exception

    @property
    def associated_futures(self):
        """The set of futures associated to the inprogress TransferFuture

        Once the transfer finishes this set becomes empty as the transfer
        is considered done and there should be no running futures left.
        """
        with self._associated_futures_lock:
            # Return a copy so callers never iterate over the live set
            # while another thread is adding or removing futures.
            return copy.copy(self._associated_futures)

    @property
    def failure_cleanups(self):
        """The list of callbacks to call when the TransferFuture fails"""
        return self._failure_cleanups

    @property
    def status(self):
        """The status of the TransferFuture

        The currently supported states are:
            * not-started - Has yet to start. If in this state, a transfer
              can be canceled immediately and nothing will happen.
            * queued - SubmissionTask is about to submit tasks
            * running - Is inprogress. In-progress as of now means that
              the SubmissionTask that runs the transfer is being executed. So
              there is no guarantee any transfer requests had been made to
              S3 if this state is reached.
            * cancelled - Was cancelled
            * failed - An exception other than CancelledError was thrown
            * success - No exceptions were thrown and is done.
        """
        return self._status

    def set_result(self, result):
        """Set a result for the TransferFuture

        Implies that the TransferFuture succeeded. This will always set a
        result because it is invoked on the final task where there is only
        ever one final task and it is ran at the very end of a transfer
        process. So if a result is being set for this final task, the transfer
        succeeded even if something came along and canceled the transfer
        on the final task.

        :param result: The value that ``result()`` will return.
        """
        with self._lock:
            self._exception = None
            self._result = result
            self._status = 'success'

    def set_exception(self, exception, override=False):
        """Set an exception for the TransferFuture

        Implies the TransferFuture failed.

        :param exception: The exception that cause the transfer to fail.
        :param override: If True, override any existing state.
        """
        with self._lock:
            if not self.done() or override:
                self._exception = exception
                self._status = 'failed'

    def result(self):
        """Waits until TransferFuture is done and returns the result

        If the TransferFuture succeeded, it will return the result. If the
        TransferFuture failed, it will raise the exception associated to the
        failure.
        """
        # Doing a wait() with no timeout cannot be interrupted in python2 but
        # can be interrupted in python3 so we just wait with the largest
        # possible value integer value, which is on the scale of billions of
        # years...
        self._done_event.wait(MAXINT)

        # Once done waiting, raise an exception if present or return the
        # final result. Compare against None explicitly so that an
        # exception object that happens to be falsy is still raised.
        if self._exception is not None:
            raise self._exception
        return self._result

    def cancel(self, msg='', exc_type=CancelledError):
        """Cancels the TransferFuture

        Does nothing if the transfer is already done. If the transfer had
        not started yet, done is announced immediately.

        :param msg: The message to attach to the cancellation
        :param exc_type: The type of exception to set for the cancellation
        """
        # NOTE: announce_done() below runs while self._lock is held and
        # threading.Lock is not reentrant, so cleanups/callbacks must not
        # call back into methods of this object that take self._lock.
        with self._lock:
            if not self.done():
                should_announce_done = False
                logger.debug('%s cancel(%s) called', self, msg)
                self._exception = exc_type(msg)
                if self._status == 'not-started':
                    should_announce_done = True
                self._status = 'cancelled'
                if should_announce_done:
                    self.announce_done()

    def set_status_to_queued(self):
        """Sets the TransferFuture's status to queued"""
        self._transition_to_non_done_state('queued')

    def set_status_to_running(self):
        """Sets the TransferFuture's status to running"""
        self._transition_to_non_done_state('running')

    def _transition_to_non_done_state(self, desired_state):
        # Moving out of a done state is refused with a RuntimeError.
        with self._lock:
            if self.done():
                raise RuntimeError(
                    'Unable to transition from done state %s to non-done '
                    'state %s.' % (self.status, desired_state)
                )
            self._status = desired_state

    def submit(self, executor, task, tag=None):
        """Submits a task to a provided executor

        :type executor: s3transfer.futures.BoundedExecutor
        :param executor: The executor to submit the callable to

        :type task: s3transfer.tasks.Task
        :param task: The task to submit to the executor

        :type tag: s3transfer.futures.TaskTag
        :param tag: A tag to associate to the submitted task

        :rtype: concurrent.futures.Future
        :returns: A future representing the submitted task
        """
        # Pass the values as logger arguments so the message is only
        # rendered when DEBUG logging is actually enabled.
        logger.debug(
            "Submitting task %s to executor %s for transfer request: %s.",
            task,
            executor,
            self.transfer_id,
        )
        future = executor.submit(task, tag=tag)
        # Add this created future to the list of associated future just
        # in case it is needed during cleanups.
        self.add_associated_future(future)
        future.add_done_callback(
            FunctionContainer(self.remove_associated_future, future)
        )
        return future

    def done(self):
        """Determines if a TransferFuture has completed

        :returns: True if status is equal to 'failed', 'cancelled', or
            'success'. False, otherwise
        """
        return self.status in ['failed', 'cancelled', 'success']

    def add_associated_future(self, future):
        """Adds a future to be associated with the TransferFuture"""
        with self._associated_futures_lock:
            self._associated_futures.add(future)

    def remove_associated_future(self, future):
        """Removes a future's association to the TransferFuture

        :raises KeyError: If the future is not currently associated.
        """
        with self._associated_futures_lock:
            self._associated_futures.remove(future)

    def add_done_callback(self, function, *args, **kwargs):
        """Add a done callback to be invoked when transfer is done"""
        with self._done_callbacks_lock:
            self._done_callbacks.append(
                FunctionContainer(function, *args, **kwargs)
            )

    def add_failure_cleanup(self, function, *args, **kwargs):
        """Adds a callback to call upon failure"""
        with self._failure_cleanups_lock:
            self._failure_cleanups.append(
                FunctionContainer(function, *args, **kwargs)
            )

    def announce_done(self):
        """Announce that future is done running and run associated callbacks

        This will run any failure cleanups if the transfer did not succeed
        and they have not been run yet, allows result() to be unblocked,
        and will run any done callbacks associated to the TransferFuture if
        they have not already been run.
        """
        if self.status != 'success':
            self._run_failure_cleanups()
        self._done_event.set()
        self._run_done_callbacks()

    def _run_done_callbacks(self):
        # Run the callbacks and remove the callbacks from the internal
        # list so they do not get ran again if done is announced more than
        # once.
        with self._done_callbacks_lock:
            self._run_callbacks(self._done_callbacks)
            self._done_callbacks = []

    def _run_failure_cleanups(self):
        # Run the cleanup callbacks and remove the callbacks from the internal
        # list so they do not get ran again if done is announced more than
        # once.
        with self._failure_cleanups_lock:
            self._run_callbacks(self.failure_cleanups)
            self._failure_cleanups = []

    def _run_callbacks(self, callbacks):
        for callback in callbacks:
            self._run_callback(callback)

    def _run_callback(self, callback):
        try:
            callback()
        # We do not want a callback interrupting the process, especially
        # in the failure cleanups. So log and catch, the exception.
        except Exception:
            logger.debug("Exception raised in %s.", callback, exc_info=True)

404 

405 

class BoundedExecutor:
    """Executor wrapper that limits how many tasks may be in flight"""

    EXECUTOR_CLS = futures.ThreadPoolExecutor

    def __init__(
        self, max_size, max_num_threads, tag_semaphores=None, executor_cls=None
    ):
        """An executor implementation that has a maximum queued up tasks

        The executor will block if the number of tasks that have been
        submitted and is currently working on is past its maximum.

        :params max_size: The maximum number of inflight futures. An inflight
            future means that the task is either queued up or is currently
            being executed. A size of None or 0 means that the executor will
            have no bound in terms of the number of inflight futures.

        :params max_num_threads: The maximum number of threads the executor
            uses.

        :type tag_semaphores: dict
        :params tag_semaphores: A dictionary where the key is the name of the
            tag and the value is the semaphore to use when limiting the
            number of tasks the executor is processing at a time.

        :type executor_cls: BaseExecutor
        :param executor_cls: The executor class that
            get bounded by this executor. If None is provided, the
            concurrent.futures.ThreadPoolExecutor class is used.
        """
        self._max_num_threads = max_num_threads
        if executor_cls is None:
            executor_cls = self.EXECUTOR_CLS
        self._executor = executor_cls(max_workers=self._max_num_threads)
        self._semaphore = TaskSemaphore(max_size)
        self._tag_semaphores = tag_semaphores

    def submit(self, task, tag=None, block=True):
        """Submit a task to complete

        :type task: s3transfer.tasks.Task
        :param task: The task to run __call__ on


        :type tag: s3transfer.futures.TaskTag
        :param tag: An optional tag to associate to the task. This
            is used to override which semaphore to use.

        :type block: boolean
        :param block: True if to wait till it is possible to submit a task.
            False, if not to wait and raise an error if not able to submit
            a task.

        :returns: The future associated to the submitted task
        """
        semaphore = self._semaphore
        # If a tag was provided, use the semaphore associated to that
        # tag.
        if tag:
            semaphore = self._tag_semaphores[tag]

        # Call acquire on the semaphore.
        acquire_token = semaphore.acquire(task.transfer_id, block)
        # Submit the task to the underlying executor. If the executor
        # refuses the task, no done callback will ever fire, so hand the
        # acquired slot back here; otherwise it would be lost for good.
        try:
            underlying_future = self._executor.submit(task)
        except Exception:
            semaphore.release(task.transfer_id, acquire_token)
            raise
        # Create a callback to invoke when task is done in order to call
        # release on the semaphore.
        release_callback = FunctionContainer(
            semaphore.release, task.transfer_id, acquire_token
        )
        future = ExecutorFuture(underlying_future)
        # Add the Semaphore.release() callback to the future such that
        # it is invoked once the future completes.
        future.add_done_callback(release_callback)
        return future

    def shutdown(self, wait=True):
        """Shut down the underlying executor

        :param wait: Passed through to the underlying executor's shutdown.
        """
        self._executor.shutdown(wait)

482 

483 

class ExecutorFuture:
    def __init__(self, future):
        """Future handed back by the executor.

        For now this is only a thin wrapper over a
        concurrent.futures.Future. Keeping the wrapper means the rest of
        the codebase would be unaffected if the project moved off that
        library and implemented the needed functionality here instead.

        :type future: concurrent.futures.Future
        :param future: The future being wrapped
        """
        self._future = future

    def result(self):
        """Return whatever the wrapped future's ``result()`` gives."""
        wrapped = self._future
        return wrapped.result()

    def add_done_callback(self, fn):
        """Register a callback to run once the future is done.

        :param fn: A callable that takes no arguments. This differs from
            concurrent.futures.Future.add_done_callback, whose callbacks
            receive the future as their single argument.
        """
        # concurrent.futures.Future always hands the finished future to
        # its callbacks, so adapt the signature and drop that argument.
        self._future.add_done_callback(lambda _finished_future: fn())

    def done(self):
        """Return whether the wrapped future reports itself done."""
        wrapped = self._future
        return wrapped.done()

518 

519 

class BaseExecutor:
    """Base Executor class implementation needed to work with s3transfer"""

    def __init__(self, max_workers=None):
        """Accept ``max_workers`` without using it in this base class."""

    def submit(self, fn, *args, **kwargs):
        """Not implemented here; subclasses must override."""
        raise NotImplementedError('submit()')

    def shutdown(self, wait=True):
        """Not implemented here; subclasses must override."""
        raise NotImplementedError('shutdown()')

531 

532 

class NonThreadedExecutor(BaseExecutor):
    """A drop-in replacement non-threaded version of ThreadPoolExecutor"""

    def submit(self, fn, *args, **kwargs):
        """Run ``fn`` immediately in the calling thread.

        :param fn: The callable to invoke with ``*args`` and ``**kwargs``.
        :returns: A NonThreadedExecutorFuture that already holds either
            the return value of ``fn`` or the exception it raised
            together with its traceback.
        """
        completed = NonThreadedExecutorFuture()
        try:
            completed.set_result(fn(*args, **kwargs))
        except Exception:
            # Capture the active exception and its traceback so the
            # future can re-raise it later from result().
            _, error, trace = sys.exc_info()
            logger.debug(
                'Setting exception for %s to %s with traceback %s',
                completed,
                error,
                trace,
            )
            completed.set_exception_info(error, trace)
        return completed

    def shutdown(self, wait=True):
        """Do nothing; this executor holds no threads to shut down."""
        return None

554 

555 

class NonThreadedExecutorFuture:
    """The Future returned from NonThreadedExecutor

    Note that this future is **not** thread-safe as it is being used
    from the context of a non-threaded environment.
    """

    def __init__(self):
        self._result = None
        self._exception = None
        self._traceback = None
        self._done = False
        self._done_callbacks = []

    def set_result(self, result):
        """Store a successful result and mark the future as done

        :param result: The value that ``result()`` will return.
        """
        self._result = result
        self._set_done()

    def set_exception_info(self, exception, traceback):
        """Store a failure and mark the future as done

        :param exception: The exception that ``result()`` will raise.
        :param traceback: The traceback to attach when re-raising.
        """
        self._exception = exception
        self._traceback = traceback
        self._set_done()

    def result(self, timeout=None):
        """Return the stored result or raise the stored exception

        :param timeout: Accepted for signature compatibility; it is not
            used because this method never waits.
        """
        # Compare against None rather than relying on truthiness: an
        # exception class may define __bool__/__len__ and evaluate as
        # falsy, which would otherwise silently swallow the failure.
        if self._exception is not None:
            raise self._exception.with_traceback(self._traceback)
        return self._result

    def _set_done(self):
        # Flip the flag first so callbacks observe done() as True, then
        # clear the list so each callback is only ever invoked once.
        self._done = True
        for done_callback in self._done_callbacks:
            self._invoke_done_callback(done_callback)
        self._done_callbacks = []

    def _invoke_done_callback(self, done_callback):
        # Callbacks receive this future as their only argument.
        return done_callback(self)

    def done(self):
        """Return True once a result or an exception has been set"""
        return self._done

    def add_done_callback(self, fn):
        """Register ``fn`` to be called with this future when it is done

        If the future is already done, ``fn`` is invoked immediately.
        """
        if self._done:
            self._invoke_done_callback(fn)
        else:
            self._done_callbacks.append(fn)

601 

602 

# A TaskTag is a one-field record identifying a tag by name.
TaskTag = namedtuple('TaskTag', ['name'])

# Predefined tags for in-memory upload and in-memory download tasks.
IN_MEMORY_UPLOAD_TAG = TaskTag(name='in_memory_upload')
IN_MEMORY_DOWNLOAD_TAG = TaskTag(name='in_memory_download')