Coverage for /pythoncovmergedfiles/medio/medio/usr/lib/python3.9/concurrent/futures/_base.py: 26%

302 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:05 +0000

1# Copyright 2009 Brian Quinlan. All Rights Reserved. 

2# Licensed to PSF under a Contributor Agreement. 

3 

4__author__ = 'Brian Quinlan (brian@sweetapp.com)' 

5 

6import collections 

7import logging 

8import threading 

9import time 

10import types 

11 

# Return conditions understood by _create_and_install_waiters().
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
# Internal-only condition passed by as_completed().
_AS_COMPLETED = '_AS_COMPLETED'

# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

_FUTURE_STATES = [PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

# Human-readable state names used by Future.__repr__(); both cancelled
# states share one description.
_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished",
}

# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")

44 

class Error(Exception):
    """Root of the exception hierarchy raised for future-related failures."""

48 

class CancelledError(Error):
    """Raised when a result is requested from a Future that was cancelled."""

52 

class TimeoutError(Error):
    """Raised when an operation did not complete before its deadline."""

56 

class InvalidStateError(Error):
    """Raised when an operation is attempted in a state that forbids it."""

60 

61class _Waiter(object): 

62 """Provides the event that wait() and as_completed() block on.""" 

63 def __init__(self): 

64 self.event = threading.Event() 

65 self.finished_futures = [] 

66 

67 def add_result(self, future): 

68 self.finished_futures.append(future) 

69 

70 def add_exception(self, future): 

71 self.finished_futures.append(future) 

72 

73 def add_cancelled(self, future): 

74 self.finished_futures.append(future) 

75 

class _AsCompletedWaiter(_Waiter):
    """Waiter used by as_completed(): wakes the consumer on every completion."""

    def __init__(self):
        super().__init__()
        # Guards finished_futures and the event so that recording a future
        # and signalling it happen as one step.
        self.lock = threading.Lock()

    def add_result(self, future):
        """Record a finished future and signal the event, under the lock."""
        with self.lock:
            super().add_result(future)
            self.event.set()

    def add_exception(self, future):
        """Record a failed future and signal the event, under the lock."""
        with self.lock:
            super().add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        """Record a cancelled future and signal the event, under the lock."""
        with self.lock:
            super().add_cancelled(future)
            self.event.set()

97 

class _FirstCompletedWaiter(_Waiter):
    """Waiter used by wait(return_when=FIRST_COMPLETED).

    Any completion, whatever its kind, sets the event straight away.
    """

    def add_result(self, future):
        """Record the finished future, then wake the waiting caller."""
        super().add_result(future)
        self.event.set()

    def add_exception(self, future):
        """Record the failed future, then wake the waiting caller."""
        super().add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        """Record the cancelled future, then wake the waiting caller."""
        super().add_cancelled(future)
        self.event.set()

112 

class _AllCompletedWaiter(_Waiter):
    """Waiter used by wait() for FIRST_EXCEPTION and ALL_COMPLETED.

    The event is set once ``num_pending_calls`` completions have been counted
    down to zero, or (when ``stop_on_exception`` is true) as soon as a future
    reports an exception.
    """

    def __init__(self, num_pending_calls, stop_on_exception):
        self.stop_on_exception = stop_on_exception
        self.num_pending_calls = num_pending_calls
        # Protects the countdown in _decrement_pending_calls().
        self.lock = threading.Lock()
        super().__init__()

    def _decrement_pending_calls(self):
        """Count one completion; set the event when none remain."""
        with self.lock:
            self.num_pending_calls -= 1
            if not self.num_pending_calls:
                self.event.set()

    def add_result(self, future):
        """Record the finished future and count it against the pending total."""
        super().add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        """Record the failed future; wake immediately if stopping on exceptions."""
        super().add_exception(future)
        if not self.stop_on_exception:
            self._decrement_pending_calls()
            return
        self.event.set()

    def add_cancelled(self, future):
        """Record the cancelled future and count it against the pending total."""
        super().add_cancelled(future)
        self._decrement_pending_calls()

142 

143class _AcquireFutures(object): 

144 """A context manager that does an ordered acquire of Future conditions.""" 

145 

146 def __init__(self, futures): 

147 self.futures = sorted(futures, key=id) 

148 

149 def __enter__(self): 

150 for future in self.futures: 

151 future._condition.acquire() 

152 

153 def __exit__(self, *args): 

154 for future in self.futures: 

155 future._condition.release() 

156 

def _create_and_install_waiters(fs, return_when):
    """Build the waiter matching *return_when* and attach it to every future.

    Args:
        fs: The futures to attach the waiter to.
        return_when: One of _AS_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
            or ALL_COMPLETED.

    Returns:
        The newly created waiter, already appended to each future's
        ``_waiters`` list.

    Raises:
        ValueError: If *return_when* is not one of the values above.  No
            waiter is installed in that case.
    """
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    else:
        # Futures already in a terminal, notified state will never report
        # to the waiter, so they are left out of the countdown.
        terminal_states = [CANCELLED_AND_NOTIFIED, FINISHED]
        pending_count = sum(
            1 for f in fs if f._state not in terminal_states)

        if return_when == FIRST_EXCEPTION:
            stop_early = True
        elif return_when == ALL_COMPLETED:
            stop_early = False
        else:
            raise ValueError("Invalid return condition: %r" % return_when)
        waiter = _AllCompletedWaiter(pending_count,
                                     stop_on_exception=stop_early)

    for future in fs:
        future._waiters.append(waiter)

    return waiter

177 

178 

179def _yield_finished_futures(fs, waiter, ref_collect): 

180 """ 

181 Iterate on the list *fs*, yielding finished futures one by one in 

182 reverse order. 

183 Before yielding a future, *waiter* is removed from its waiters 

184 and the future is removed from each set in the collection of sets 

185 *ref_collect*. 

186 

187 The aim of this function is to avoid keeping stale references after 

188 the future is yielded and before the iterator resumes. 

189 """ 

190 while fs: 

191 f = fs[-1] 

192 for futures_set in ref_collect: 

193 futures_set.remove(f) 

194 with f._condition: 

195 f._waiters.remove(waiter) 

196 del f 

197 # Careful not to keep a reference to the popped value 

198 yield fs.pop() 

199 

200 

def as_completed(fs, timeout=None):
    """Iterate over the given futures, yielding each one as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors)
            to iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished
        or cancelled). Duplicated Futures are yielded only once, because the
        input is collapsed into a set.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        end_time = timeout + time.monotonic()

    fs = set(fs)
    total_futures = len(fs)
    with _AcquireFutures(fs):
        finished = {
            f for f in fs
            if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    finished = list(finished)
    try:
        # Futures that were already complete when we looked are handed out
        # first, without waiting.
        yield from _yield_finished_futures(finished, waiter,
                                           ref_collect=(fs,))

        while pending:
            if timeout is not None:
                wait_timeout = end_time - time.monotonic()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))
            else:
                wait_timeout = None

            waiter.event.wait(wait_timeout)

            # Take the batch of newly finished futures and reset the waiter
            # in one step so no completion is lost in between.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            # reverse to keep finishing order
            finished.reverse()
            yield from _yield_finished_futures(finished, waiter,
                                               ref_collect=(fs, pending))

    finally:
        # Whatever is still in fs was never yielded; detach the waiter from it.
        for leftover in fs:
            with leftover._condition:
                leftover._waiters.remove(waiter)

262 

# Return type of wait(): a pair of sets named 'done' and 'not_done'.
DoneAndNotDoneFutures = collections.namedtuple(
    typename='DoneAndNotDoneFutures',
    field_names='done not_done',
)

def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The iterable of Futures (possibly created by different Executors)
            to wait upon. It is consumed exactly once, so one-shot iterators
            are accepted; duplicated futures are treated as a single future.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.

    Raises:
        ValueError: If waiting is actually required and *return_when* is not
            one of the options above.
    """
    # Collapse the input into a set up front.  The code below iterates it
    # several times and compares its size with the size of the 'done' set;
    # with the raw argument a generator would be exhausted after the first
    # pass and duplicated futures would make the "everything is done" check
    # fail, leaving the caller blocked on an event that is never set.
    fs = set(fs)
    with _AcquireFutures(fs):
        done = {f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
        not_done = fs - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    # Block outside the futures' conditions so workers can make progress.
    waiter.event.wait(timeout)
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)

313 

class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        # Guards every state change; result() and exception() wait on it.
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        # Waiter objects appended by _create_and_install_waiters().
        self._waiters = []
        # Callables registered through add_done_callback().
        self._done_callbacks = []

    def _invoke_callbacks(self):
        """Call each registered callback with this future.

        An Exception raised by a callback is logged and does not prevent the
        remaining callbacks from running.
        """
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                # Compare against None rather than relying on truthiness: an
                # exception instance may define __bool__/__len__ and be falsy.
                if self._exception is not None:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                    self.__class__.__name__,
                    id(self),
                   _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        # Callbacks run after the condition has been released.
        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        """Return the stored result, or raise the stored exception."""
        # 'is not None' so that a falsy exception instance is still raised
        # instead of being mistaken for a successful call.
        if self._exception is not None:
            try:
                raise self._exception
            finally:
                # Break a reference cycle with the exception in self._exception
                self = None
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Already done: call right away, outside the condition, with the same
        # log-and-continue handling as _invoke_callbacks().
        try:
            fn(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        try:
            with self._condition:
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()

                self._condition.wait(timeout)

                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()
                else:
                    raise TimeoutError()
        finally:
            # Break a reference cycle with the exception in self._exception
            self = None

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """
        try:
            with self._condition:
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self._exception

                self._condition.wait(timeout)

                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self._exception
                else:
                    raise TimeoutError()
        finally:
            # Same precaution as result(): break a reference cycle with the
            # exception in self._exception.
            self = None

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (through calls
        to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.

        Raises:
            InvalidStateError: If the future is already cancelled or finished.
        """
        with self._condition:
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError('{}: {!r}'.format(self._state, self))
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        # Callbacks run after the condition has been released.
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.

        Raises:
            InvalidStateError: If the future is already cancelled or finished.
        """
        with self._condition:
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError('{}: {!r}'.format(self._state, self))
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        # Callbacks run after the condition has been released.
        self._invoke_callbacks()

    # Allows subscripting the class itself, e.g. Future[int].
    __class_getitem__ = classmethod(types.GenericAlias)

557 

def _result_or_cancel(fut, timeout=None):
    """Return fut.result(timeout), always calling fut.cancel() afterwards.

    Used by Executor.map() so that a future whose result() raises (for
    example on timeout) is cancelled instead of being left behind.
    """
    try:
        try:
            return fut.result(timeout)
        finally:
            fut.cancel()
    finally:
        # Do not keep the future referenced from this frame's traceback.
        del fut


class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, /, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.

        Raises:
            NotImplementedError: Always, in this base class; subclasses must
                override this method.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            end_time = timeout + time.monotonic()

        # Everything is submitted eagerly, before the iterator is returned.
        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield _result_or_cancel(fs.pop())
                    else:
                        yield _result_or_cancel(
                            fs.pop(), end_time - time.monotonic())
            finally:
                # Reached on exhaustion, error or early close: cancel whatever
                # has not been consumed yet.
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        # Never suppress an exception raised inside the with-block.
        return False

638 

639 

class BrokenExecutor(RuntimeError):
    """Raised when an executor has become non-functional after a severe failure."""