Coverage for /pythoncovmergedfiles/medio/medio/usr/lib/python3.9/contextlib.py: 2%

307 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-10-20 07:00 +0000

1"""Utilities for with-statement contexts. See PEP 343.""" 

2import abc 

3import sys 

4import _collections_abc 

5from collections import deque 

6from functools import wraps 

7from types import MethodType, GenericAlias 

8 

9__all__ = ["asynccontextmanager", "contextmanager", "closing", "nullcontext", 

10 "AbstractContextManager", "AbstractAsyncContextManager", 

11 "AsyncExitStack", "ContextDecorator", "ExitStack", 

12 "redirect_stdout", "redirect_stderr", "suppress"] 

13 

14 

15class AbstractContextManager(abc.ABC): 

16 

17 """An abstract base class for context managers.""" 

18 

19 __class_getitem__ = classmethod(GenericAlias) 

20 

21 def __enter__(self): 

22 """Return `self` upon entering the runtime context.""" 

23 return self 

24 

25 @abc.abstractmethod 

26 def __exit__(self, exc_type, exc_value, traceback): 

27 """Raise any exception triggered within the runtime context.""" 

28 return None 

29 

30 @classmethod 

31 def __subclasshook__(cls, C): 

32 if cls is AbstractContextManager: 

33 return _collections_abc._check_methods(C, "__enter__", "__exit__") 

34 return NotImplemented 

35 

36 

37class AbstractAsyncContextManager(abc.ABC): 

38 

39 """An abstract base class for asynchronous context managers.""" 

40 

41 __class_getitem__ = classmethod(GenericAlias) 

42 

43 async def __aenter__(self): 

44 """Return `self` upon entering the runtime context.""" 

45 return self 

46 

47 @abc.abstractmethod 

48 async def __aexit__(self, exc_type, exc_value, traceback): 

49 """Raise any exception triggered within the runtime context.""" 

50 return None 

51 

52 @classmethod 

53 def __subclasshook__(cls, C): 

54 if cls is AbstractAsyncContextManager: 

55 return _collections_abc._check_methods(C, "__aenter__", 

56 "__aexit__") 

57 return NotImplemented 

58 

59 

60class ContextDecorator(object): 

61 "A base class or mixin that enables context managers to work as decorators." 

62 

63 def _recreate_cm(self): 

64 """Return a recreated instance of self. 

65 

66 Allows an otherwise one-shot context manager like 

67 _GeneratorContextManager to support use as 

68 a decorator via implicit recreation. 

69 

70 This is a private interface just for _GeneratorContextManager. 

71 See issue #11647 for details. 

72 """ 

73 return self 

74 

75 def __call__(self, func): 

76 @wraps(func) 

77 def inner(*args, **kwds): 

78 with self._recreate_cm(): 

79 return func(*args, **kwds) 

80 return inner 

81 

82 

83class _GeneratorContextManagerBase: 

84 """Shared functionality for @contextmanager and @asynccontextmanager.""" 

85 

86 def __init__(self, func, args, kwds): 

87 self.gen = func(*args, **kwds) 

88 self.func, self.args, self.kwds = func, args, kwds 

89 # Issue 19330: ensure context manager instances have good docstrings 

90 doc = getattr(func, "__doc__", None) 

91 if doc is None: 

92 doc = type(self).__doc__ 

93 self.__doc__ = doc 

94 # Unfortunately, this still doesn't provide good help output when 

95 # inspecting the created context manager instances, since pydoc 

96 # currently bypasses the instance docstring and shows the docstring 

97 # for the class instead. 

98 # See http://bugs.python.org/issue19404 for more details. 

99 

100 

101class _GeneratorContextManager(_GeneratorContextManagerBase, 

102 AbstractContextManager, 

103 ContextDecorator): 

104 """Helper for @contextmanager decorator.""" 

105 

106 def _recreate_cm(self): 

107 # _GCM instances are one-shot context managers, so the 

108 # CM must be recreated each time a decorated function is 

109 # called 

110 return self.__class__(self.func, self.args, self.kwds) 

111 

112 def __enter__(self): 

113 # do not keep args and kwds alive unnecessarily 

114 # they are only needed for recreation, which is not possible anymore 

115 del self.args, self.kwds, self.func 

116 try: 

117 return next(self.gen) 

118 except StopIteration: 

119 raise RuntimeError("generator didn't yield") from None 

120 

121 def __exit__(self, type, value, traceback): 

122 if type is None: 

123 try: 

124 next(self.gen) 

125 except StopIteration: 

126 return False 

127 else: 

128 raise RuntimeError("generator didn't stop") 

129 else: 

130 if value is None: 

131 # Need to force instantiation so we can reliably 

132 # tell if we get the same exception back 

133 value = type() 

134 try: 

135 self.gen.throw(type, value, traceback) 

136 except StopIteration as exc: 

137 # Suppress StopIteration *unless* it's the same exception that 

138 # was passed to throw(). This prevents a StopIteration 

139 # raised inside the "with" statement from being suppressed. 

140 return exc is not value 

141 except RuntimeError as exc: 

142 # Don't re-raise the passed in exception. (issue27122) 

143 if exc is value: 

144 return False 

145 # Likewise, avoid suppressing if a StopIteration exception 

146 # was passed to throw() and later wrapped into a RuntimeError 

147 # (see PEP 479). 

148 if type is StopIteration and exc.__cause__ is value: 

149 return False 

150 raise 

151 except: 

152 # only re-raise if it's *not* the exception that was 

153 # passed to throw(), because __exit__() must not raise 

154 # an exception unless __exit__() itself failed. But throw() 

155 # has to raise the exception to signal propagation, so this 

156 # fixes the impedance mismatch between the throw() protocol 

157 # and the __exit__() protocol. 

158 # 

159 # This cannot use 'except BaseException as exc' (as in the 

160 # async implementation) to maintain compatibility with 

161 # Python 2, where old-style class exceptions are not caught 

162 # by 'except BaseException'. 

163 if sys.exc_info()[1] is value: 

164 return False 

165 raise 

166 raise RuntimeError("generator didn't stop after throw()") 

167 

168 

169class _AsyncGeneratorContextManager(_GeneratorContextManagerBase, 

170 AbstractAsyncContextManager): 

171 """Helper for @asynccontextmanager.""" 

172 

173 async def __aenter__(self): 

174 try: 

175 return await self.gen.__anext__() 

176 except StopAsyncIteration: 

177 raise RuntimeError("generator didn't yield") from None 

178 

179 async def __aexit__(self, typ, value, traceback): 

180 if typ is None: 

181 try: 

182 await self.gen.__anext__() 

183 except StopAsyncIteration: 

184 return 

185 else: 

186 raise RuntimeError("generator didn't stop") 

187 else: 

188 if value is None: 

189 value = typ() 

190 # See _GeneratorContextManager.__exit__ for comments on subtleties 

191 # in this implementation 

192 try: 

193 await self.gen.athrow(typ, value, traceback) 

194 raise RuntimeError("generator didn't stop after athrow()") 

195 except StopAsyncIteration as exc: 

196 return exc is not value 

197 except RuntimeError as exc: 

198 if exc is value: 

199 return False 

200 # Avoid suppressing if a StopIteration exception 

201 # was passed to throw() and later wrapped into a RuntimeError 

202 # (see PEP 479 for sync generators; async generators also 

203 # have this behavior). But do this only if the exception wrapped 

204 # by the RuntimeError is actully Stop(Async)Iteration (see 

205 # issue29692). 

206 if isinstance(value, (StopIteration, StopAsyncIteration)): 

207 if exc.__cause__ is value: 

208 return False 

209 raise 

210 except BaseException as exc: 

211 if exc is not value: 

212 raise 

213 

214 

215def contextmanager(func): 

216 """@contextmanager decorator. 

217 

218 Typical usage: 

219 

220 @contextmanager 

221 def some_generator(<arguments>): 

222 <setup> 

223 try: 

224 yield <value> 

225 finally: 

226 <cleanup> 

227 

228 This makes this: 

229 

230 with some_generator(<arguments>) as <variable>: 

231 <body> 

232 

233 equivalent to this: 

234 

235 <setup> 

236 try: 

237 <variable> = <value> 

238 <body> 

239 finally: 

240 <cleanup> 

241 """ 

242 @wraps(func) 

243 def helper(*args, **kwds): 

244 return _GeneratorContextManager(func, args, kwds) 

245 return helper 

246 

247 

248def asynccontextmanager(func): 

249 """@asynccontextmanager decorator. 

250 

251 Typical usage: 

252 

253 @asynccontextmanager 

254 async def some_async_generator(<arguments>): 

255 <setup> 

256 try: 

257 yield <value> 

258 finally: 

259 <cleanup> 

260 

261 This makes this: 

262 

263 async with some_async_generator(<arguments>) as <variable>: 

264 <body> 

265 

266 equivalent to this: 

267 

268 <setup> 

269 try: 

270 <variable> = <value> 

271 <body> 

272 finally: 

273 <cleanup> 

274 """ 

275 @wraps(func) 

276 def helper(*args, **kwds): 

277 return _AsyncGeneratorContextManager(func, args, kwds) 

278 return helper 

279 

280 

281class closing(AbstractContextManager): 

282 """Context to automatically close something at the end of a block. 

283 

284 Code like this: 

285 

286 with closing(<module>.open(<arguments>)) as f: 

287 <block> 

288 

289 is equivalent to this: 

290 

291 f = <module>.open(<arguments>) 

292 try: 

293 <block> 

294 finally: 

295 f.close() 

296 

297 """ 

298 def __init__(self, thing): 

299 self.thing = thing 

300 def __enter__(self): 

301 return self.thing 

302 def __exit__(self, *exc_info): 

303 self.thing.close() 

304 

305 

306class _RedirectStream(AbstractContextManager): 

307 

308 _stream = None 

309 

310 def __init__(self, new_target): 

311 self._new_target = new_target 

312 # We use a list of old targets to make this CM re-entrant 

313 self._old_targets = [] 

314 

315 def __enter__(self): 

316 self._old_targets.append(getattr(sys, self._stream)) 

317 setattr(sys, self._stream, self._new_target) 

318 return self._new_target 

319 

320 def __exit__(self, exctype, excinst, exctb): 

321 setattr(sys, self._stream, self._old_targets.pop()) 

322 

323 

324class redirect_stdout(_RedirectStream): 

325 """Context manager for temporarily redirecting stdout to another file. 

326 

327 # How to send help() to stderr 

328 with redirect_stdout(sys.stderr): 

329 help(dir) 

330 

331 # How to write help() to a file 

332 with open('help.txt', 'w') as f: 

333 with redirect_stdout(f): 

334 help(pow) 

335 """ 

336 

337 _stream = "stdout" 

338 

339 

340class redirect_stderr(_RedirectStream): 

341 """Context manager for temporarily redirecting stderr to another file.""" 

342 

343 _stream = "stderr" 

344 

345 

346class suppress(AbstractContextManager): 

347 """Context manager to suppress specified exceptions 

348 

349 After the exception is suppressed, execution proceeds with the next 

350 statement following the with statement. 

351 

352 with suppress(FileNotFoundError): 

353 os.remove(somefile) 

354 # Execution still resumes here if the file was already removed 

355 """ 

356 

357 def __init__(self, *exceptions): 

358 self._exceptions = exceptions 

359 

360 def __enter__(self): 

361 pass 

362 

363 def __exit__(self, exctype, excinst, exctb): 

364 # Unlike isinstance and issubclass, CPython exception handling 

365 # currently only looks at the concrete type hierarchy (ignoring 

366 # the instance and subclass checking hooks). While Guido considers 

367 # that a bug rather than a feature, it's a fairly hard one to fix 

368 # due to various internal implementation details. suppress provides 

369 # the simpler issubclass based semantics, rather than trying to 

370 # exactly reproduce the limitations of the CPython interpreter. 

371 # 

372 # See http://bugs.python.org/issue12029 for more details 

373 return exctype is not None and issubclass(exctype, self._exceptions) 

374 

375 

376class _BaseExitStack: 

377 """A base class for ExitStack and AsyncExitStack.""" 

378 

379 @staticmethod 

380 def _create_exit_wrapper(cm, cm_exit): 

381 return MethodType(cm_exit, cm) 

382 

383 @staticmethod 

384 def _create_cb_wrapper(callback, /, *args, **kwds): 

385 def _exit_wrapper(exc_type, exc, tb): 

386 callback(*args, **kwds) 

387 return _exit_wrapper 

388 

389 def __init__(self): 

390 self._exit_callbacks = deque() 

391 

392 def pop_all(self): 

393 """Preserve the context stack by transferring it to a new instance.""" 

394 new_stack = type(self)() 

395 new_stack._exit_callbacks = self._exit_callbacks 

396 self._exit_callbacks = deque() 

397 return new_stack 

398 

399 def push(self, exit): 

400 """Registers a callback with the standard __exit__ method signature. 

401 

402 Can suppress exceptions the same way __exit__ method can. 

403 Also accepts any object with an __exit__ method (registering a call 

404 to the method instead of the object itself). 

405 """ 

406 # We use an unbound method rather than a bound method to follow 

407 # the standard lookup behaviour for special methods. 

408 _cb_type = type(exit) 

409 

410 try: 

411 exit_method = _cb_type.__exit__ 

412 except AttributeError: 

413 # Not a context manager, so assume it's a callable. 

414 self._push_exit_callback(exit) 

415 else: 

416 self._push_cm_exit(exit, exit_method) 

417 return exit # Allow use as a decorator. 

418 

419 def enter_context(self, cm): 

420 """Enters the supplied context manager. 

421 

422 If successful, also pushes its __exit__ method as a callback and 

423 returns the result of the __enter__ method. 

424 """ 

425 # We look up the special methods on the type to match the with 

426 # statement. 

427 _cm_type = type(cm) 

428 _exit = _cm_type.__exit__ 

429 result = _cm_type.__enter__(cm) 

430 self._push_cm_exit(cm, _exit) 

431 return result 

432 

433 def callback(self, callback, /, *args, **kwds): 

434 """Registers an arbitrary callback and arguments. 

435 

436 Cannot suppress exceptions. 

437 """ 

438 _exit_wrapper = self._create_cb_wrapper(callback, *args, **kwds) 

439 

440 # We changed the signature, so using @wraps is not appropriate, but 

441 # setting __wrapped__ may still help with introspection. 

442 _exit_wrapper.__wrapped__ = callback 

443 self._push_exit_callback(_exit_wrapper) 

444 return callback # Allow use as a decorator 

445 

446 def _push_cm_exit(self, cm, cm_exit): 

447 """Helper to correctly register callbacks to __exit__ methods.""" 

448 _exit_wrapper = self._create_exit_wrapper(cm, cm_exit) 

449 self._push_exit_callback(_exit_wrapper, True) 

450 

451 def _push_exit_callback(self, callback, is_sync=True): 

452 self._exit_callbacks.append((is_sync, callback)) 

453 

454 

455# Inspired by discussions on http://bugs.python.org/issue13585 

456class ExitStack(_BaseExitStack, AbstractContextManager): 

457 """Context manager for dynamic management of a stack of exit callbacks. 

458 

459 For example: 

460 with ExitStack() as stack: 

461 files = [stack.enter_context(open(fname)) for fname in filenames] 

462 # All opened files will automatically be closed at the end of 

463 # the with statement, even if attempts to open files later 

464 # in the list raise an exception. 

465 """ 

466 

467 def __enter__(self): 

468 return self 

469 

470 def __exit__(self, *exc_details): 

471 received_exc = exc_details[0] is not None 

472 

473 # We manipulate the exception state so it behaves as though 

474 # we were actually nesting multiple with statements 

475 frame_exc = sys.exc_info()[1] 

476 def _fix_exception_context(new_exc, old_exc): 

477 # Context may not be correct, so find the end of the chain 

478 while 1: 

479 exc_context = new_exc.__context__ 

480 if exc_context is old_exc: 

481 # Context is already set correctly (see issue 20317) 

482 return 

483 if exc_context is None or exc_context is frame_exc: 

484 break 

485 new_exc = exc_context 

486 # Change the end of the chain to point to the exception 

487 # we expect it to reference 

488 new_exc.__context__ = old_exc 

489 

490 # Callbacks are invoked in LIFO order to match the behaviour of 

491 # nested context managers 

492 suppressed_exc = False 

493 pending_raise = False 

494 while self._exit_callbacks: 

495 is_sync, cb = self._exit_callbacks.pop() 

496 assert is_sync 

497 try: 

498 if cb(*exc_details): 

499 suppressed_exc = True 

500 pending_raise = False 

501 exc_details = (None, None, None) 

502 except: 

503 new_exc_details = sys.exc_info() 

504 # simulate the stack of exceptions by setting the context 

505 _fix_exception_context(new_exc_details[1], exc_details[1]) 

506 pending_raise = True 

507 exc_details = new_exc_details 

508 if pending_raise: 

509 try: 

510 # bare "raise exc_details[1]" replaces our carefully 

511 # set-up context 

512 fixed_ctx = exc_details[1].__context__ 

513 raise exc_details[1] 

514 except BaseException: 

515 exc_details[1].__context__ = fixed_ctx 

516 raise 

517 return received_exc and suppressed_exc 

518 

519 def close(self): 

520 """Immediately unwind the context stack.""" 

521 self.__exit__(None, None, None) 

522 

523 

524# Inspired by discussions on https://bugs.python.org/issue29302 

525class AsyncExitStack(_BaseExitStack, AbstractAsyncContextManager): 

526 """Async context manager for dynamic management of a stack of exit 

527 callbacks. 

528 

529 For example: 

530 async with AsyncExitStack() as stack: 

531 connections = [await stack.enter_async_context(get_connection()) 

532 for i in range(5)] 

533 # All opened connections will automatically be released at the 

534 # end of the async with statement, even if attempts to open a 

535 # connection later in the list raise an exception. 

536 """ 

537 

538 @staticmethod 

539 def _create_async_exit_wrapper(cm, cm_exit): 

540 return MethodType(cm_exit, cm) 

541 

542 @staticmethod 

543 def _create_async_cb_wrapper(callback, /, *args, **kwds): 

544 async def _exit_wrapper(exc_type, exc, tb): 

545 await callback(*args, **kwds) 

546 return _exit_wrapper 

547 

548 async def enter_async_context(self, cm): 

549 """Enters the supplied async context manager. 

550 

551 If successful, also pushes its __aexit__ method as a callback and 

552 returns the result of the __aenter__ method. 

553 """ 

554 _cm_type = type(cm) 

555 _exit = _cm_type.__aexit__ 

556 result = await _cm_type.__aenter__(cm) 

557 self._push_async_cm_exit(cm, _exit) 

558 return result 

559 

560 def push_async_exit(self, exit): 

561 """Registers a coroutine function with the standard __aexit__ method 

562 signature. 

563 

564 Can suppress exceptions the same way __aexit__ method can. 

565 Also accepts any object with an __aexit__ method (registering a call 

566 to the method instead of the object itself). 

567 """ 

568 _cb_type = type(exit) 

569 try: 

570 exit_method = _cb_type.__aexit__ 

571 except AttributeError: 

572 # Not an async context manager, so assume it's a coroutine function 

573 self._push_exit_callback(exit, False) 

574 else: 

575 self._push_async_cm_exit(exit, exit_method) 

576 return exit # Allow use as a decorator 

577 

578 def push_async_callback(self, callback, /, *args, **kwds): 

579 """Registers an arbitrary coroutine function and arguments. 

580 

581 Cannot suppress exceptions. 

582 """ 

583 _exit_wrapper = self._create_async_cb_wrapper(callback, *args, **kwds) 

584 

585 # We changed the signature, so using @wraps is not appropriate, but 

586 # setting __wrapped__ may still help with introspection. 

587 _exit_wrapper.__wrapped__ = callback 

588 self._push_exit_callback(_exit_wrapper, False) 

589 return callback # Allow use as a decorator 

590 

591 async def aclose(self): 

592 """Immediately unwind the context stack.""" 

593 await self.__aexit__(None, None, None) 

594 

595 def _push_async_cm_exit(self, cm, cm_exit): 

596 """Helper to correctly register coroutine function to __aexit__ 

597 method.""" 

598 _exit_wrapper = self._create_async_exit_wrapper(cm, cm_exit) 

599 self._push_exit_callback(_exit_wrapper, False) 

600 

601 async def __aenter__(self): 

602 return self 

603 

604 async def __aexit__(self, *exc_details): 

605 received_exc = exc_details[0] is not None 

606 

607 # We manipulate the exception state so it behaves as though 

608 # we were actually nesting multiple with statements 

609 frame_exc = sys.exc_info()[1] 

610 def _fix_exception_context(new_exc, old_exc): 

611 # Context may not be correct, so find the end of the chain 

612 while 1: 

613 exc_context = new_exc.__context__ 

614 if exc_context is old_exc: 

615 # Context is already set correctly (see issue 20317) 

616 return 

617 if exc_context is None or exc_context is frame_exc: 

618 break 

619 new_exc = exc_context 

620 # Change the end of the chain to point to the exception 

621 # we expect it to reference 

622 new_exc.__context__ = old_exc 

623 

624 # Callbacks are invoked in LIFO order to match the behaviour of 

625 # nested context managers 

626 suppressed_exc = False 

627 pending_raise = False 

628 while self._exit_callbacks: 

629 is_sync, cb = self._exit_callbacks.pop() 

630 try: 

631 if is_sync: 

632 cb_suppress = cb(*exc_details) 

633 else: 

634 cb_suppress = await cb(*exc_details) 

635 

636 if cb_suppress: 

637 suppressed_exc = True 

638 pending_raise = False 

639 exc_details = (None, None, None) 

640 except: 

641 new_exc_details = sys.exc_info() 

642 # simulate the stack of exceptions by setting the context 

643 _fix_exception_context(new_exc_details[1], exc_details[1]) 

644 pending_raise = True 

645 exc_details = new_exc_details 

646 if pending_raise: 

647 try: 

648 # bare "raise exc_details[1]" replaces our carefully 

649 # set-up context 

650 fixed_ctx = exc_details[1].__context__ 

651 raise exc_details[1] 

652 except BaseException: 

653 exc_details[1].__context__ = fixed_ctx 

654 raise 

655 return received_exc and suppressed_exc 

656 

657 

658class nullcontext(AbstractContextManager): 

659 """Context manager that does no additional processing. 

660 

661 Used as a stand-in for a normal context manager, when a particular 

662 block of code is only sometimes used with a normal context manager: 

663 

664 cm = optional_cm if condition else nullcontext() 

665 with cm: 

666 # Perform operation, using optional_cm if condition is True 

667 """ 

668 

669 def __init__(self, enter_result=None): 

670 self.enter_result = enter_result 

671 

672 def __enter__(self): 

673 return self.enter_result 

674 

675 def __exit__(self, *excinfo): 

676 pass