Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/concurrent/futures/_base.py: 25%

305 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# Copyright 2009 Brian Quinlan. All Rights Reserved. 

2# Licensed to PSF under a Contributor Agreement. 

3 

4__author__ = 'Brian Quinlan (brian@sweetapp.com)' 

5 

6import collections 

7import logging 

8import threading 

9import time 

10 

11FIRST_COMPLETED = 'FIRST_COMPLETED' 

12FIRST_EXCEPTION = 'FIRST_EXCEPTION' 

13ALL_COMPLETED = 'ALL_COMPLETED' 

14_AS_COMPLETED = '_AS_COMPLETED' 

15 

16# Possible future states (for internal use by the futures package). 

17PENDING = 'PENDING' 

18RUNNING = 'RUNNING' 

19# The future was cancelled by the user... 

20CANCELLED = 'CANCELLED' 

21# ...and _Waiter.add_cancelled() was called by a worker. 

22CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' 

23FINISHED = 'FINISHED' 

24 

25_FUTURE_STATES = [ 

26 PENDING, 

27 RUNNING, 

28 CANCELLED, 

29 CANCELLED_AND_NOTIFIED, 

30 FINISHED 

31] 

32 

33_STATE_TO_DESCRIPTION_MAP = { 

34 PENDING: "pending", 

35 RUNNING: "running", 

36 CANCELLED: "cancelled", 

37 CANCELLED_AND_NOTIFIED: "cancelled", 

38 FINISHED: "finished" 

39} 

40 

41# Logger for internal use by the futures package. 

42LOGGER = logging.getLogger("concurrent.futures") 

43 

class Error(Exception):
    """Common ancestor of every exception raised by the futures package."""

47 

class CancelledError(Error):
    """Raised when a result is requested from a Future that was cancelled."""

51 

class TimeoutError(Error):
    """Raised when an operation did not complete before its deadline."""

55 

class InvalidStateError(Error):
    """Raised when an operation is attempted in a state that forbids it."""

59 

60class _Waiter(object): 

61 """Provides the event that wait() and as_completed() block on.""" 

62 def __init__(self): 

63 self.event = threading.Event() 

64 self.finished_futures = [] 

65 

66 def add_result(self, future): 

67 self.finished_futures.append(future) 

68 

69 def add_exception(self, future): 

70 self.finished_futures.append(future) 

71 

72 def add_cancelled(self, future): 

73 self.finished_futures.append(future) 

74 

75class _AsCompletedWaiter(_Waiter): 

76 """Used by as_completed().""" 

77 

78 def __init__(self): 

79 super(_AsCompletedWaiter, self).__init__() 

80 self.lock = threading.Lock() 

81 

82 def add_result(self, future): 

83 with self.lock: 

84 super(_AsCompletedWaiter, self).add_result(future) 

85 self.event.set() 

86 

87 def add_exception(self, future): 

88 with self.lock: 

89 super(_AsCompletedWaiter, self).add_exception(future) 

90 self.event.set() 

91 

92 def add_cancelled(self, future): 

93 with self.lock: 

94 super(_AsCompletedWaiter, self).add_cancelled(future) 

95 self.event.set() 

96 

97class _FirstCompletedWaiter(_Waiter): 

98 """Used by wait(return_when=FIRST_COMPLETED).""" 

99 

100 def add_result(self, future): 

101 super().add_result(future) 

102 self.event.set() 

103 

104 def add_exception(self, future): 

105 super().add_exception(future) 

106 self.event.set() 

107 

108 def add_cancelled(self, future): 

109 super().add_cancelled(future) 

110 self.event.set() 

111 

112class _AllCompletedWaiter(_Waiter): 

113 """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" 

114 

115 def __init__(self, num_pending_calls, stop_on_exception): 

116 self.num_pending_calls = num_pending_calls 

117 self.stop_on_exception = stop_on_exception 

118 self.lock = threading.Lock() 

119 super().__init__() 

120 

121 def _decrement_pending_calls(self): 

122 with self.lock: 

123 self.num_pending_calls -= 1 

124 if not self.num_pending_calls: 

125 self.event.set() 

126 

127 def add_result(self, future): 

128 super().add_result(future) 

129 self._decrement_pending_calls() 

130 

131 def add_exception(self, future): 

132 super().add_exception(future) 

133 if self.stop_on_exception: 

134 self.event.set() 

135 else: 

136 self._decrement_pending_calls() 

137 

138 def add_cancelled(self, future): 

139 super().add_cancelled(future) 

140 self._decrement_pending_calls() 

141 

142class _AcquireFutures(object): 

143 """A context manager that does an ordered acquire of Future conditions.""" 

144 

145 def __init__(self, futures): 

146 self.futures = sorted(futures, key=id) 

147 

148 def __enter__(self): 

149 for future in self.futures: 

150 future._condition.acquire() 

151 

152 def __exit__(self, *args): 

153 for future in self.futures: 

154 future._condition.release() 

155 

156def _create_and_install_waiters(fs, return_when): 

157 if return_when == _AS_COMPLETED: 

158 waiter = _AsCompletedWaiter() 

159 elif return_when == FIRST_COMPLETED: 

160 waiter = _FirstCompletedWaiter() 

161 else: 

162 pending_count = sum( 

163 f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) 

164 

165 if return_when == FIRST_EXCEPTION: 

166 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) 

167 elif return_when == ALL_COMPLETED: 

168 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) 

169 else: 

170 raise ValueError("Invalid return condition: %r" % return_when) 

171 

172 for f in fs: 

173 f._waiters.append(waiter) 

174 

175 return waiter 

176 

177 

178def _yield_finished_futures(fs, waiter, ref_collect): 

179 """ 

180 Iterate on the list *fs*, yielding finished futures one by one in 

181 reverse order. 

182 Before yielding a future, *waiter* is removed from its waiters 

183 and the future is removed from each set in the collection of sets 

184 *ref_collect*. 

185 

186 The aim of this function is to avoid keeping stale references after 

187 the future is yielded and before the iterator resumes. 

188 """ 

189 while fs: 

190 f = fs[-1] 

191 for futures_set in ref_collect: 

192 futures_set.remove(f) 

193 with f._condition: 

194 f._waiters.remove(waiter) 

195 del f 

196 # Careful not to keep a reference to the popped value 

197 yield fs.pop() 

198 

199 

def as_completed(fs, timeout=None):
    """Iterate over the given futures, yielding each one once it is done.

    Args:
        fs: The sequence of Futures (possibly created by different Executors)
            to iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished
        or cancelled). A Future that appears more than once in *fs* is
        yielded only once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        end_time = timeout + time.monotonic()

    # Rebinding the parameter drops the reference to the caller's sequence
    # and removes duplicates.
    fs = set(fs)
    total_futures = len(fs)
    with _AcquireFutures(fs):
        already_done = {
            fut for fut in fs
            if fut._state in (CANCELLED_AND_NOTIFIED, FINISHED)}
        pending = fs - already_done
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    batch = list(already_done)
    try:
        yield from _yield_finished_futures(batch, waiter, ref_collect=(fs,))

        while pending:
            if timeout is not None:
                wait_timeout = end_time - time.monotonic()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))
            else:
                wait_timeout = None

            waiter.event.wait(wait_timeout)

            # Take everything recorded so far and reset the waiter while
            # holding its lock.
            with waiter.lock:
                batch = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            # reverse to keep finishing order
            batch.reverse()
            yield from _yield_finished_futures(batch, waiter,
                                               ref_collect=(fs, pending))

    finally:
        # Remove waiter from unfinished futures
        for fut in fs:
            with fut._condition:
                fut._waiters.remove(waiter)

261 

# Result type of wait(): two sets, the completed futures and the rest.
DoneAndNotDoneFutures = collections.namedtuple(
    'DoneAndNotDoneFutures', ['done', 'not_done'])

def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The iterable of Futures (possibly created by different Executors)
            to wait upon. Duplicates are ignored; any iterable, including a
            one-shot iterator, is accepted.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.

    Raises:
        ValueError: If *return_when* is not one of the options above and a
            waiter has to be installed.
    """
    # Materialise and de-duplicate the input once. Without this, a repeated
    # future that is already done makes len(done) differ from len(fs) while
    # nothing is pending, so the waiter event would never be set; and a
    # one-shot iterator would be exhausted by the first pass over it.
    fs = set(fs)
    with _AcquireFutures(fs):
        done = {f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
        not_done = fs - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        # Nothing left to wait for.
        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    # Detach the waiter again, whether or not the wait timed out.
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)

312 

class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        # Guards every state change; result() and exception() block on it.
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        # Waiter objects notified from set_result(), set_exception() and
        # set_running_or_notify_cancel().
        self._waiters = []
        # Callables registered through add_done_callback().
        self._done_callbacks = []

    def _invoke_callbacks(self):
        """Call every registered done-callback, logging any that raise."""
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                # Compare against None: an exception instance may be falsy.
                if self._exception is not None:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                    self.__class__.__name__,
                    id(self),
                   _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        # Callbacks run outside the condition.
        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        """Return the stored result, or raise the stored exception."""
        # Compare against None rather than by truthiness: an exception whose
        # class defines __bool__ or __len__ may be falsy and must still be
        # raised, consistently with what exception() returns.
        if self._exception is not None:
            try:
                raise self._exception
            finally:
                # Break a reference cycle with the exception in self._exception
                self = None
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Already done: call immediately, outside the condition.
        try:
            fn(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        try:
            with self._condition:
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()

                self._condition.wait(timeout)

                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()
                else:
                    raise TimeoutError()
        finally:
            # Break a reference cycle with the exception in self._exception
            self = None

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """

        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (through calls
        to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.

        Raises:
            InvalidStateError: If the future is already cancelled or finished.
        """
        with self._condition:
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError('{}: {!r}'.format(self._state, self))
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.

        Raises:
            InvalidStateError: If the future is already cancelled or finished.
        """
        with self._condition:
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError('{}: {!r}'.format(self._state, self))
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()

546 

class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(*args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        # The signature is fully variadic so that 'self' and 'fn' can be
        # validated by hand; passing 'fn' by keyword is accepted but warned
        # about.
        if not args:
            raise TypeError("descriptor 'submit' of 'Executor' object "
                            "needs an argument")
        if len(args) < 2:
            if 'fn' not in kwargs:
                raise TypeError('submit expected at least 1 positional argument, '
                                'got %d' % (len(args)-1))
            import warnings
            warnings.warn("Passing 'fn' as keyword argument is deprecated",
                          DeprecationWarning, stacklevel=2)

        # The base class has no way of running the call.
        raise NotImplementedError()
    submit.__text_signature__ = '($self, fn, /, *args, **kwargs)'

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        deadline = None
        if timeout is not None:
            deadline = timeout + time.monotonic()

        # All calls are submitted eagerly, before any result is requested.
        fs = [self.submit(fn, *call_args) for call_args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # Reversed so that pop() hands the futures back in
                # submission order.
                fs.reverse()
                while fs:
                    if deadline is None:
                        remaining = None
                    else:
                        remaining = deadline - time.monotonic()
                    # Careful not to keep a reference to the popped future
                    yield fs.pop().result(remaining)
            finally:
                # Whatever was not consumed is cancelled.
                for leftover in fs:
                    leftover.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        return None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Leaving the with-block waits for shutdown; returning False lets any
        # exception propagate.
        self.shutdown(wait=True)
        return False

638 

639 

class BrokenExecutor(RuntimeError):
    """Raised when an executor has become non-functional after a severe failure."""