Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/s3transfer/futures.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

277 statements  

1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import copy 

14import logging 

15import sys 

16import threading 

17from collections import namedtuple 

18from concurrent import futures 

19 

20from s3transfer.compat import MAXINT 

21from s3transfer.exceptions import CancelledError, TransferNotDoneError 

22from s3transfer.utils import FunctionContainer, TaskSemaphore 

23 

try:
    # When available, botocore exposes the caller's active context; it is
    # handed to worker threads (see BoundedExecutor.submit) so they carry
    # the submitting thread's context.
    from botocore.context import get_context
except ImportError:
    # botocore.context could not be imported: fall back to a stand-in so
    # callers may always invoke get_context() without checking first.
    def get_context():
        """Return None, signalling that no context is available."""
        return None


# Module-level logger shared by every class in this module.
logger = logging.getLogger(__name__)

33 

34 

class BaseTransferFuture:
    """Abstract interface of the future returned for a transfer request.

    Every member here raises NotImplementedError; subclasses such as
    TransferFuture provide the real behavior.
    """

    @property
    def meta(self):
        """Metadata object describing this TransferFuture."""
        raise NotImplementedError('meta')

    def done(self):
        """Report whether the TransferFuture has finished.

        :returns: True when finished, False otherwise.
        """
        raise NotImplementedError('done()')

    def result(self):
        """Block until the TransferFuture finishes, then return its result.

        A successful transfer yields its result; a failed transfer raises
        the exception that caused the failure.
        """
        raise NotImplementedError('result()')

    def cancel(self):
        """Request cancellation of the underlying transfer."""
        raise NotImplementedError('cancel()')

60 

61 

class BaseTransferMeta:
    """Abstract interface for the metadata attached to a transfer future.

    Each property raises NotImplementedError; subclasses such as
    TransferMeta supply the values.
    """

    @property
    def call_args(self):
        """Arguments the transfer request was made with."""
        raise NotImplementedError('call_args')

    @property
    def transfer_id(self):
        """Identifier unique to this transfer."""
        raise NotImplementedError('transfer_id')

    @property
    def user_context(self):
        """Dictionary in which requesters may keep their own data."""
        raise NotImplementedError('user_context')

77 

78 

class TransferFuture(BaseTransferFuture):
    """Requester-facing future that delegates to a TransferCoordinator."""

    def __init__(self, meta=None, coordinator=None):
        """The future associated to a submitted transfer request

        :type meta: TransferMeta
        :param meta: The metadata associated to the request. This object
            is visible to the requester. A new TransferMeta is created when
            None is given.

        :type coordinator: TransferCoordinator
        :param coordinator: The coordinator associated to the request. This
            object is not visible to the requester. A new
            TransferCoordinator is created when None is given.
        """
        self._meta = meta
        if meta is None:
            self._meta = TransferMeta()

        self._coordinator = coordinator
        if coordinator is None:
            self._coordinator = TransferCoordinator()

    @property
    def meta(self):
        """The metadata object supplied (or created) at construction."""
        return self._meta

    def done(self):
        """Return the coordinator's view of whether the transfer is done."""
        return self._coordinator.done()

    def result(self):
        """Block until the transfer is done and return its result.

        Delegates to the coordinator, which raises the transfer's exception
        if it failed. If the wait is interrupted by KeyboardInterrupt, the
        transfer is cancelled before the interrupt propagates.
        """
        try:
            # Usually the result() method blocks until the transfer is done,
            # however if a KeyboardInterrupt is raised we want to exit
            # out of this and propagate the exception.
            return self._coordinator.result()
        except KeyboardInterrupt:
            self.cancel()
            # Bare ``raise`` re-raises the active exception with its
            # original traceback untouched.
            raise

    def cancel(self):
        """Cancel the transfer via its coordinator."""
        self._coordinator.cancel()

    def set_exception(self, exception):
        """Sets the exception on the future.

        :param exception: The exception to record; it overrides any state
            already stored on the coordinator.
        :raises TransferNotDoneError: If the transfer is not done yet.
        """
        if not self.done():
            raise TransferNotDoneError(
                'set_exception can only be called once the transfer is '
                'complete.'
            )
        self._coordinator.set_exception(exception, override=True)

127 

128 

class TransferMeta(BaseTransferMeta):
    """Concrete metadata record for a TransferFuture.

    Stores the request's call args and id, plus an optional size and ETag
    that may be supplied after construction, and a per-instance dictionary
    for requester data.
    """

    def __init__(self, call_args=None, transfer_id=None):
        self._call_args = call_args
        self._transfer_id = transfer_id
        # Size and ETag begin unknown; the provide_* methods fill them in.
        self._size = None
        self._etag = None
        # A fresh dict per instance, so requesters never share context.
        self._user_context = {}

    @property
    def call_args(self):
        """Arguments the transfer request was made with."""
        return self._call_args

    @property
    def transfer_id(self):
        """Identifier unique to this transfer."""
        return self._transfer_id

    @property
    def size(self):
        """Size of the transfer when known, otherwise None."""
        return self._size

    @property
    def user_context(self):
        """Dictionary in which requesters may keep their own data."""
        return self._user_context

    @property
    def etag(self):
        """ETag of the stored object, used to validate multipart downloads."""
        return self._etag

    def provide_transfer_size(self, size):
        """Record the size of the transfer.

        Supplying the size up front means the TransferManager does not need
        to call HeadObject or ask the OS for it.
        """
        self._size = size

    def provide_object_etag(self, etag):
        """Record the ETag of the object being transferred.

        With an ETag available, the TransferManager validates multipart
        downloads by sending it as the IfMatch parameter of GetObject
        requests.
        """
        self._etag = etag

181 

182 

class TransferCoordinator:
    """A helper class for managing TransferFuture

    Tracks the transfer's status, result and exception, the executor
    futures spawned for it, and the callbacks to run on completion or
    failure.
    """

    def __init__(self, transfer_id=None):
        self.transfer_id = transfer_id
        self._status = 'not-started'
        self._result = None
        self._exception = None
        self._associated_futures = set()
        self._failure_cleanups = []
        self._done_callbacks = []
        # Set by announce_done(); result() blocks on it.
        self._done_event = threading.Event()
        # Guards status/result/exception transitions.
        self._lock = threading.Lock()
        # Dedicated locks for the futures set and each callback list.
        self._associated_futures_lock = threading.Lock()
        self._done_callbacks_lock = threading.Lock()
        self._failure_cleanups_lock = threading.Lock()

    def __repr__(self):
        return f'{self.__class__.__name__}(transfer_id={self.transfer_id})'

    @property
    def exception(self):
        """The exception recorded for the transfer, or None."""
        return self._exception

    @property
    def associated_futures(self):
        """The set of futures associated to the inprogress TransferFuture

        Each future submitted via submit() removes itself from this set
        when it completes, so once the transfer finishes the set is expected
        to be empty.
        """
        with self._associated_futures_lock:
            # We return a copy of the set because we do not want to be
            # processing the returned set while another thread is adding
            # more futures to the actual set.
            return copy.copy(self._associated_futures)

    @property
    def failure_cleanups(self):
        """The list of callbacks to call when the TransferFuture fails"""
        return self._failure_cleanups

    @property
    def status(self):
        """The status of the TransferFuture

        The currently supported states are:
            * not-started - Has yet to start. If in this state, a transfer
              can be canceled immediately and nothing will happen.
            * queued - SubmissionTask is about to submit tasks
            * running - Is inprogress. In-progress as of now means that
              the SubmissionTask that runs the transfer is being executed. So
              there is no guarantee any transfer requests had been made to
              S3 if this state is reached.
            * cancelled - Was cancelled
            * failed - An exception other than CancelledError was thrown
            * success - No exceptions were thrown and is done.
        """
        return self._status

    def set_result(self, result):
        """Set a result for the TransferFuture

        Implies that the TransferFuture succeeded. This will always set a
        result because it is invoked on the final task where there is only
        ever one final task and it is ran at the very end of a transfer
        process. So if a result is being set for this final task, the transfer
        succeeded even if something came along and canceled the transfer
        on the final task.
        """
        with self._lock:
            self._exception = None
            self._result = result
            self._status = 'success'

    def set_exception(self, exception, override=False):
        """Set an exception for the TransferFuture

        Implies the TransferFuture failed. This does not announce done.

        :param exception: The exception that caused the transfer to fail.
        :param override: If True, override any existing state; otherwise
            the exception is only recorded while the transfer is not done.
        """
        with self._lock:
            if not self.done() or override:
                self._exception = exception
                self._status = 'failed'

    def result(self):
        """Waits until TransferFuture is done and returns the result

        If the TransferFuture succeeded, it will return the result. If the
        TransferFuture failed, it will raise the exception associated to the
        failure.
        """
        # Doing a wait() with no timeout cannot be interrupted in python2 but
        # can be interrupted in python3 so we just wait with the largest
        # possible integer value, which is on the scale of billions of
        # years...
        self._done_event.wait(MAXINT)

        # Once done waiting, raise an exception if present or return the
        # final result. Compare against None explicitly so an exception
        # object that happens to be falsy is still raised.
        if self._exception is not None:
            raise self._exception
        return self._result

    def cancel(self, msg='', exc_type=CancelledError):
        """Cancels the TransferFuture

        Has no effect if the transfer is already done. Otherwise records
        ``exc_type(msg)`` as the exception and moves to 'cancelled'. If the
        transfer had not started yet, done is announced immediately
        (presumably because no task exists yet that would announce it).

        :param msg: The message to attach to the cancellation
        :param exc_type: The type of exception to set for the cancellation
        """
        with self._lock:
            if not self.done():
                should_announce_done = False
                logger.debug('%s cancel(%s) called', self, msg)
                self._exception = exc_type(msg)
                if self._status == 'not-started':
                    should_announce_done = True
                self._status = 'cancelled'
                if should_announce_done:
                    self.announce_done()

    def set_status_to_queued(self):
        """Sets the TransferFuture's status to queued

        :raises RuntimeError: If the transfer is already done.
        """
        self._transition_to_non_done_state('queued')

    def set_status_to_running(self):
        """Sets the TransferFuture's status to running

        :raises RuntimeError: If the transfer is already done.
        """
        self._transition_to_non_done_state('running')

    def _transition_to_non_done_state(self, desired_state):
        # A finished transfer must never be moved back to a non-done state.
        with self._lock:
            if self.done():
                raise RuntimeError(
                    f'Unable to transition from done state {self.status} to non-done '
                    f'state {desired_state}.'
                )
            self._status = desired_state

    def submit(self, executor, task, tag=None):
        """Submits a task to a provided executor

        :type executor: s3transfer.futures.BoundedExecutor
        :param executor: The executor to submit the callable to

        :type task: s3transfer.tasks.Task
        :param task: The task to submit to the executor

        :type tag: s3transfer.futures.TaskTag
        :param tag: A tag to associate to the submitted task

        :rtype: concurrent.futures.Future
        :returns: A future representing the submitted task
        """
        # Lazy %-style arguments: the message is only formatted when DEBUG
        # logging is actually enabled.
        logger.debug(
            'Submitting task %s to executor %s for transfer request: %s.',
            task,
            executor,
            self.transfer_id,
        )
        future = executor.submit(task, tag=tag)
        # Add this created future to the set of associated futures just
        # in case it is needed during cleanups.
        self.add_associated_future(future)
        future.add_done_callback(
            FunctionContainer(self.remove_associated_future, future)
        )
        return future

    def done(self):
        """Determines if a TransferFuture has completed

        :returns: True if status is equal to 'failed', 'cancelled', or
            'success'. False, otherwise
        """
        return self.status in ('failed', 'cancelled', 'success')

    def add_associated_future(self, future):
        """Adds a future to be associated with the TransferFuture"""
        with self._associated_futures_lock:
            self._associated_futures.add(future)

    def remove_associated_future(self, future):
        """Removes a future's association to the TransferFuture"""
        with self._associated_futures_lock:
            self._associated_futures.remove(future)

    def add_done_callback(self, function, *args, **kwargs):
        """Add a done callback to be invoked when transfer is done"""
        with self._done_callbacks_lock:
            self._done_callbacks.append(
                FunctionContainer(function, *args, **kwargs)
            )

    def add_failure_cleanup(self, function, *args, **kwargs):
        """Adds a callback to call upon failure"""
        with self._failure_cleanups_lock:
            self._failure_cleanups.append(
                FunctionContainer(function, *args, **kwargs)
            )

    def announce_done(self):
        """Announce that future is done running and run associated callbacks

        Runs any pending failure cleanups when the status is not 'success',
        unblocks result(), then runs any pending done callbacks. Each
        callback list is cleared after it runs, so repeated announcements
        do not re-run callbacks.
        """
        if self.status != 'success':
            self._run_failure_cleanups()
        self._done_event.set()
        self._run_done_callbacks()

    def _run_done_callbacks(self):
        # Run the callbacks and remove the callbacks from the internal
        # list so they do not get ran again if done is announced more than
        # once.
        with self._done_callbacks_lock:
            self._run_callbacks(self._done_callbacks)
            self._done_callbacks = []

    def _run_failure_cleanups(self):
        # Run the cleanup callbacks and remove the callbacks from the internal
        # list so they do not get ran again if done is announced more than
        # once.
        with self._failure_cleanups_lock:
            self._run_callbacks(self.failure_cleanups)
            self._failure_cleanups = []

    def _run_callbacks(self, callbacks):
        for callback in callbacks:
            self._run_callback(callback)

    def _run_callback(self, callback):
        try:
            callback()
        # We do not want a callback interrupting the process, especially
        # in the failure cleanups. So log and catch the exception.
        except Exception:
            logger.debug('Exception raised in %s.', callback, exc_info=True)

423 

424 

class BoundedExecutor:
    EXECUTOR_CLS = futures.ThreadPoolExecutor

    def __init__(
        self, max_size, max_num_threads, tag_semaphores=None, executor_cls=None
    ):
        """An executor implementation that has a maximum queued up tasks

        The executor will block if the number of tasks that have been
        submitted and is currently working on is past its maximum.

        :params max_size: The maximum number of inflight futures. An inflight
            future means that the task is either queued up or is currently
            being executed. A size of None or 0 means that the executor will
            have no bound in terms of the number of inflight futures.

        :params max_num_threads: The maximum number of threads the executor
            uses.

        :type tag_semaphores: dict
        :params tag_semaphores: A dictionary where the key is the name of the
            tag and the value is the semaphore to use when limiting the
            number of tasks the executor is processing at a time.

        :type executor_cls: BaseExecutor
        :param executor_cls: The executor class that
            get bounded by this executor. If None is provided, the
            concurrent.futures.ThreadPoolExecutor class is used.
        """
        self._max_num_threads = max_num_threads
        if executor_cls is None:
            executor_cls = self.EXECUTOR_CLS
        self._executor = executor_cls(max_workers=self._max_num_threads)
        self._semaphore = TaskSemaphore(max_size)
        self._tag_semaphores = tag_semaphores

    def submit(self, task, tag=None, block=True):
        """Submit a task to complete

        :type task: s3transfer.tasks.Task
        :param task: The task to run __call__ on


        :type tag: s3transfer.futures.TaskTag
        :param tag: An optional tag to associate to the task. This
            is used to override which semaphore to use.

        :type block: boolean
        :param block: True if to wait till it is possible to submit a task.
            False, if not to wait and raise an error if not able to submit
            a task.

        :returns: The future associated to the submitted task
        """
        semaphore = self._semaphore
        # If a tag was provided, use the semaphore associated to that
        # tag.
        if tag:
            semaphore = self._tag_semaphores[tag]

        # Call acquire on the semaphore.
        acquire_token = semaphore.acquire(task.transfer_id, block)
        # Create a callback to invoke when task is done in order to call
        # release on the semaphore.
        release_callback = FunctionContainer(
            semaphore.release, task.transfer_id, acquire_token
        )
        try:
            # Submit the task to the underlying executor.
            # Pass the current context to ensure child threads persist the
            # parent thread's context.
            future = ExecutorFuture(self._executor.submit(task, get_context()))
        except BaseException:
            # The task never got scheduled (e.g. ThreadPoolExecutor raises
            # RuntimeError after shutdown), so no done callback will ever
            # release the slot acquired above. Release it here to avoid
            # permanently shrinking the semaphore, then re-raise.
            semaphore.release(task.transfer_id, acquire_token)
            raise
        # Add the Semaphore.release() callback to the future such that
        # it is invoked once the future completes.
        future.add_done_callback(release_callback)
        return future

    def shutdown(self, wait=True):
        """Shut down the underlying executor, forwarding ``wait``."""
        self._executor.shutdown(wait)

503 

504 

class ExecutorFuture:
    def __init__(self, future):
        """Wrap a future produced by the underlying executor.

        For now this simply forwards to a concurrent.futures.Future. Keeping
        the wrapper means the rest of the codebase is insulated if the
        library is ever replaced by an in-house implementation.

        :type future: concurrent.futures.Future
        :param future: The future being wrapped
        """
        self._future = future

    def result(self):
        """Return (or raise) the wrapped future's outcome."""
        return self._future.result()

    def add_done_callback(self, fn):
        """Register a zero-argument callable to run once the future is done

        :param fn: A callable taking no arguments. This differs from
            concurrent.futures.Future.add_done_callback, whose callbacks
            receive the future as their single argument.
        """
        # Adapt the no-argument callable to the one-argument signature that
        # concurrent.futures passes (the future itself), discarding it.
        self._future.add_done_callback(lambda _completed: fn())

    def done(self):
        """Report whether the wrapped future has finished."""
        return self._future.done()

540 

541 

class BaseExecutor:
    """Minimal executor interface that s3transfer relies on.

    Subclasses must implement submit() and shutdown(); both raise
    NotImplementedError here.
    """

    def __init__(self, max_workers=None):
        """Accept ``max_workers`` for interface parity; it is not stored."""

    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError('submit()')

    def shutdown(self, wait=True):
        raise NotImplementedError('shutdown()')

553 

554 

class NonThreadedExecutor(BaseExecutor):
    """Synchronous stand-in for ThreadPoolExecutor.

    submit() runs the callable immediately in the calling thread and hands
    back an already-resolved NonThreadedExecutorFuture.
    """

    def submit(self, fn, *args, **kwargs):
        pending = NonThreadedExecutorFuture()
        try:
            # Storing the result stays inside the try so that anything it
            # raises is captured on the future like an error from fn itself.
            pending.set_result(fn(*args, **kwargs))
        except Exception as exc:
            trace = exc.__traceback__
            logger.debug(
                'Setting exception for %s to %s with traceback %s',
                pending,
                exc,
                trace,
            )
            pending.set_exception_info(exc, trace)
        return pending

    def shutdown(self, wait=True):
        """Nothing to shut down: no threads are ever created."""

576 

577 

class NonThreadedExecutorFuture:
    """The Future returned from NonThreadedExecutor

    Note that this future is **not** thread-safe as it is being used
    from the context of a non-threaded environment.
    """

    def __init__(self):
        self._result = None
        self._exception = None
        self._traceback = None
        self._done = False
        # Callbacks registered before completion; each receives this future.
        self._done_callbacks = []

    def set_result(self, result):
        """Store ``result``, mark done, and run pending callbacks."""
        self._result = result
        self._set_done()

    def set_exception_info(self, exception, traceback):
        """Store the exception and traceback, mark done, run callbacks."""
        self._exception = exception
        self._traceback = traceback
        self._set_done()

    def result(self, timeout=None):
        """Return the stored result or raise the stored exception.

        :param timeout: Accepted but unused; this future never blocks.
        :raises: The stored exception, re-attached to its saved traceback.
        """
        # Compare against None explicitly so an exception object that
        # happens to be falsy is still raised rather than ignored.
        if self._exception is not None:
            raise self._exception.with_traceback(self._traceback)
        return self._result

    def _set_done(self):
        self._done = True
        for done_callback in self._done_callbacks:
            self._invoke_done_callback(done_callback)
        self._done_callbacks = []

    def _invoke_done_callback(self, done_callback):
        # Mirror concurrent.futures: callbacks receive the future itself.
        return done_callback(self)

    def done(self):
        """Report whether a result or exception has been set."""
        return self._done

    def add_done_callback(self, fn):
        """Run ``fn(self)`` now if done, otherwise once the future is done."""
        if self._done:
            self._invoke_done_callback(fn)
        else:
            self._done_callbacks.append(fn)

623 

624 

# A TaskTag labels a category of task; BoundedExecutor.submit uses a tag as
# the key into its tag_semaphores mapping to pick a dedicated semaphore.
TaskTag = namedtuple('TaskTag', 'name')

IN_MEMORY_UPLOAD_TAG = TaskTag(name='in_memory_upload')
IN_MEMORY_DOWNLOAD_TAG = TaskTag(name='in_memory_download')