Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/contextlib.py: 14%

327 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1"""Utilities for with-statement contexts. See PEP 343.""" 

2import abc 

3import sys 

4import _collections_abc 

5from collections import deque 

6from functools import wraps 

7from types import MethodType 

8 

# Names exported by a star-import of this module.
__all__ = ["asynccontextmanager", "contextmanager", "closing", "nullcontext",
           "AbstractContextManager", "AbstractAsyncContextManager",
           "AsyncExitStack", "ContextDecorator", "ExitStack",
           "redirect_stdout", "redirect_stderr", "suppress"]

13 

14 

class AbstractContextManager(abc.ABC):

    """An abstract base class for context managers.

    ``__enter__`` has a default implementation that returns the instance;
    ``__exit__`` is abstract and must be provided by concrete subclasses.
    """

    def __enter__(self):
        """Return `self` upon entering the runtime context."""
        return self

    @abc.abstractmethod
    def __exit__(self, exc_type, exc_value, traceback):
        """Raise any exception triggered within the runtime context."""
        return None

    @classmethod
    def __subclasshook__(cls, C):
        # Only answer the structural check for this exact ABC; for any
        # subclass, defer to the regular subclass machinery.
        if cls is AbstractContextManager:
            return _collections_abc._check_methods(C, "__enter__", "__exit__")
        return NotImplemented

33 

34 

class AbstractAsyncContextManager(abc.ABC):

    """An abstract base class for asynchronous context managers.

    ``__aenter__`` has a default implementation that returns the instance;
    ``__aexit__`` is abstract and must be provided by concrete subclasses.
    """

    async def __aenter__(self):
        """Return `self` upon entering the runtime context."""
        return self

    @abc.abstractmethod
    async def __aexit__(self, exc_type, exc_value, traceback):
        """Raise any exception triggered within the runtime context."""
        return None

    @classmethod
    def __subclasshook__(cls, C):
        # Only answer the structural check for this exact ABC; for any
        # subclass, defer to the regular subclass machinery.
        if cls is AbstractAsyncContextManager:
            return _collections_abc._check_methods(C, "__aenter__",
                                                   "__aexit__")
        return NotImplemented

54 

55 

class ContextDecorator(object):
    "A base class or mixin that enables context managers to work as decorators."

    def _recreate_cm(self):
        """Return a recreated instance of self.

        Allows an otherwise one-shot context manager like
        _GeneratorContextManager to support use as
        a decorator via implicit recreation.

        This is a private interface just for _GeneratorContextManager.
        See issue #11647 for details.
        """
        return self

    def __call__(self, func):
        """Return a wrapper that runs *func* inside this context manager.

        The context manager used for each call is obtained from
        ``self._recreate_cm()``; the wrapper forwards all positional and
        keyword arguments to *func* and returns its result.
        """
        @wraps(func)
        def inner(*args, **kwds):
            with self._recreate_cm():
                return func(*args, **kwds)
        return inner

77 

78 

class _GeneratorContextManagerBase:
    """Shared functionality for @contextmanager and @asynccontextmanager."""

    def __init__(self, func, args, kwds):
        """Call *func* with *args*/*kwds* and keep the result as ``self.gen``.

        The function and its arguments are also stored on the instance as
        ``func``, ``args`` and ``kwds``.
        """
        self.gen = func(*args, **kwds)
        self.func, self.args, self.kwds = func, args, kwds
        # Issue 19330: ensure context manager instances have good docstrings
        doc = getattr(func, "__doc__", None)
        if doc is None:
            doc = type(self).__doc__
        self.__doc__ = doc
        # Unfortunately, this still doesn't provide good help output when
        # inspecting the created context manager instances, since pydoc
        # currently bypasses the instance docstring and shows the docstring
        # for the class instead.
        # See http://bugs.python.org/issue19404 for more details.

95 

96 

class _GeneratorContextManager(_GeneratorContextManagerBase,
                               AbstractContextManager,
                               ContextDecorator):
    """Helper for @contextmanager decorator."""

    def _recreate_cm(self):
        # _GCM instances are one-shot context managers, so the
        # CM must be recreated each time a decorated function is
        # called
        return self.__class__(self.func, self.args, self.kwds)

    def __enter__(self):
        """Advance the generator to its first yield and return that value.

        Raises RuntimeError if the generator finishes without yielding.
        """
        # do not keep args and kwds alive unnecessarily
        # they are only needed for recreation, which is not possible anymore
        del self.args, self.kwds, self.func
        try:
            return next(self.gen)
        except StopIteration:
            raise RuntimeError("generator didn't yield") from None

    def __exit__(self, type, value, traceback):
        """Resume the generator, throwing the active exception into it if any.

        Returns a true value when the exception was handled inside the
        generator, and False when the original exception should keep
        propagating. Raises RuntimeError if the generator yields again.
        """
        if type is None:
            try:
                next(self.gen)
            except StopIteration:
                return False
            else:
                raise RuntimeError("generator didn't stop")
        else:
            if value is None:
                # Need to force instantiation so we can reliably
                # tell if we get the same exception back
                value = type()
            try:
                self.gen.throw(type, value, traceback)
            except StopIteration as exc:
                # Suppress StopIteration *unless* it's the same exception that
                # was passed to throw().  This prevents a StopIteration
                # raised inside the "with" statement from being suppressed.
                return exc is not value
            except RuntimeError as exc:
                # Don't re-raise the passed in exception. (issue27122)
                if exc is value:
                    return False
                # Likewise, avoid suppressing if a StopIteration exception
                # was passed to throw() and later wrapped into a RuntimeError
                # (see PEP 479).
                if type is StopIteration and exc.__cause__ is value:
                    return False
                raise
            except:
                # only re-raise if it's *not* the exception that was
                # passed to throw(), because __exit__() must not raise
                # an exception unless __exit__() itself failed.  But throw()
                # has to raise the exception to signal propagation, so this
                # fixes the impedance mismatch between the throw() protocol
                # and the __exit__() protocol.
                #
                # This cannot use 'except BaseException as exc' (as in the
                # async implementation) to maintain compatibility with
                # Python 2, where old-style class exceptions are not caught
                # by 'except BaseException'.
                if sys.exc_info()[1] is value:
                    return False
                raise
            raise RuntimeError("generator didn't stop after throw()")

163 

164 

class _AsyncGeneratorContextManager(_GeneratorContextManagerBase,
                                    AbstractAsyncContextManager):
    """Helper for @asynccontextmanager."""

    async def __aenter__(self):
        """Advance the async generator to its first yield and return the value.

        Raises RuntimeError if the generator finishes without yielding.
        """
        try:
            return await self.gen.__anext__()
        except StopAsyncIteration:
            raise RuntimeError("generator didn't yield") from None

    async def __aexit__(self, typ, value, traceback):
        """Resume the async generator, throwing in the active exception if any.

        Returns a true value when the exception was handled inside the
        generator and a false value when it should keep propagating.
        Raises RuntimeError if the generator yields again.
        """
        if typ is None:
            try:
                await self.gen.__anext__()
            except StopAsyncIteration:
                return
            else:
                raise RuntimeError("generator didn't stop")
        else:
            if value is None:
                value = typ()
            # See _GeneratorContextManager.__exit__ for comments on subtleties
            # in this implementation
            try:
                await self.gen.athrow(typ, value, traceback)
                raise RuntimeError("generator didn't stop after athrow()")
            except StopAsyncIteration as exc:
                return exc is not value
            except RuntimeError as exc:
                if exc is value:
                    return False
                # Avoid suppressing if a StopIteration exception
                # was passed to throw() and later wrapped into a RuntimeError
                # (see PEP 479 for sync generators; async generators also
                # have this behavior). But do this only if the exception wrapped
                # by the RuntimeError is actually Stop(Async)Iteration (see
                # issue29692).
                if isinstance(value, (StopIteration, StopAsyncIteration)):
                    if exc.__cause__ is value:
                        return False
                raise
            except BaseException as exc:
                # The same exception coming back means "not handled": fall
                # through and return None so the caller keeps propagating it.
                if exc is not value:
                    raise

209 

210 

def contextmanager(func):
    """@contextmanager decorator.

    Typical usage:

        @contextmanager
        def some_generator(<arguments>):
            <setup>
            try:
                yield <value>
            finally:
                <cleanup>

    This makes this:

        with some_generator(<arguments>) as <variable>:
            <body>

    equivalent to this:

        <setup>
        try:
            <variable> = <value>
            <body>
        finally:
            <cleanup>
    """
    @wraps(func)
    def helper(*args, **kwds):
        # Each call builds a fresh one-shot context manager around a new
        # invocation of the generator function.
        return _GeneratorContextManager(func, args, kwds)
    return helper

242 

243 

def asynccontextmanager(func):
    """@asynccontextmanager decorator.

    Typical usage:

        @asynccontextmanager
        async def some_async_generator(<arguments>):
            <setup>
            try:
                yield <value>
            finally:
                <cleanup>

    This makes this:

        async with some_async_generator(<arguments>) as <variable>:
            <body>

    equivalent to this:

        <setup>
        try:
            <variable> = <value>
            <body>
        finally:
            <cleanup>
    """
    @wraps(func)
    def helper(*args, **kwds):
        # Each call builds a fresh context manager around a new invocation
        # of the async generator function.
        return _AsyncGeneratorContextManager(func, args, kwds)
    return helper

275 

276 

class closing(AbstractContextManager):
    """Context to automatically close something at the end of a block.

    Code like this:

        with closing(<module>.open(<arguments>)) as f:
            <block>

    is equivalent to this:

        f = <module>.open(<arguments>)
        try:
            <block>
        finally:
            f.close()

    """

    def __init__(self, thing):
        """Remember *thing*, the object whose ``close()`` is called on exit."""
        self.thing = thing

    def __enter__(self):
        """Return the wrapped object."""
        return self.thing

    def __exit__(self, *exc_info):
        """Call ``close()`` on the wrapped object; exceptions are not suppressed."""
        self.thing.close()

300 

301 

class _RedirectStream(AbstractContextManager):
    """Base for context managers that temporarily rebind an attribute of `sys`.

    Subclasses set ``_stream`` to the name of the `sys` attribute to replace.
    """

    # Name of the attribute on `sys` to redirect; set by subclasses.
    _stream = None

    def __init__(self, new_target):
        self._new_target = new_target
        # We use a list of old targets to make this CM re-entrant
        self._old_targets = []

    def __enter__(self):
        """Save the current stream, install the new target and return it."""
        self._old_targets.append(getattr(sys, self._stream))
        setattr(sys, self._stream, self._new_target)
        return self._new_target

    def __exit__(self, exctype, excinst, exctb):
        """Restore the stream that was active at the matching ``__enter__``."""
        setattr(sys, self._stream, self._old_targets.pop())

318 

319 

class redirect_stdout(_RedirectStream):
    """Context manager for temporarily redirecting stdout to another file.

        # How to send help() to stderr
        with redirect_stdout(sys.stderr):
            help(dir)

        # How to write help() to a file
        with open('help.txt', 'w') as f:
            with redirect_stdout(f):
                help(pow)
    """

    # Name of the `sys` attribute that the base class swaps out.
    _stream = "stdout"

334 

335 

class redirect_stderr(_RedirectStream):
    """Context manager for temporarily redirecting stderr to another file."""

    # Name of the `sys` attribute that the base class swaps out.
    _stream = "stderr"

340 

341 

class suppress(AbstractContextManager):
    """Context manager to suppress specified exceptions

    After the exception is suppressed, execution proceeds with the next
    statement following the with statement.

         with suppress(FileNotFoundError):
             os.remove(somefile)
         # Execution still resumes here if the file was already removed
    """

    def __init__(self, *exceptions):
        """Store the exception classes that should be suppressed."""
        self._exceptions = exceptions

    def __enter__(self):
        """Do nothing on entry; the ``as`` target, if any, is bound to None."""
        pass

    def __exit__(self, exctype, excinst, exctb):
        """Return True when *exctype* is a subclass of a suppressed exception."""
        # Unlike isinstance and issubclass, CPython exception handling
        # currently only looks at the concrete type hierarchy (ignoring
        # the instance and subclass checking hooks). While Guido considers
        # that a bug rather than a feature, it's a fairly hard one to fix
        # due to various internal implementation details. suppress provides
        # the simpler issubclass based semantics, rather than trying to
        # exactly reproduce the limitations of the CPython interpreter.
        #
        # See http://bugs.python.org/issue12029 for more details
        return exctype is not None and issubclass(exctype, self._exceptions)

370 

371 

class _BaseExitStack:
    """A base class for ExitStack and AsyncExitStack."""

    @staticmethod
    def _create_exit_wrapper(cm, cm_exit):
        # Bind the exit function looked up on the type to the instance.
        return MethodType(cm_exit, cm)

    @staticmethod
    def _create_cb_wrapper(callback, /, *args, **kwds):
        # Adapt an arbitrary callback to the (exc_type, exc, tb) signature;
        # the wrapper ignores the exception details and returns None.
        def _exit_wrapper(exc_type, exc, tb):
            callback(*args, **kwds)
        return _exit_wrapper

    def __init__(self):
        # Entries are (is_sync, callback) tuples.
        self._exit_callbacks = deque()

    def pop_all(self):
        """Preserve the context stack by transferring it to a new instance."""
        new_stack = type(self)()
        new_stack._exit_callbacks = self._exit_callbacks
        self._exit_callbacks = deque()
        return new_stack

    def push(self, exit):
        """Registers a callback with the standard __exit__ method signature.

        Can suppress exceptions the same way __exit__ method can.
        Also accepts any object with an __exit__ method (registering a call
        to the method instead of the object itself).
        """
        # We use an unbound method rather than a bound method to follow
        # the standard lookup behaviour for special methods.
        _cb_type = type(exit)

        try:
            exit_method = _cb_type.__exit__
        except AttributeError:
            # Not a context manager, so assume it's a callable.
            self._push_exit_callback(exit)
        else:
            self._push_cm_exit(exit, exit_method)
        return exit  # Allow use as a decorator.

    def enter_context(self, cm):
        """Enters the supplied context manager.

        If successful, also pushes its __exit__ method as a callback and
        returns the result of the __enter__ method.
        """
        # We look up the special methods on the type to match the with
        # statement.
        _cm_type = type(cm)
        _exit = _cm_type.__exit__
        result = _cm_type.__enter__(cm)
        self._push_cm_exit(cm, _exit)
        return result

    def callback(*args, **kwds):
        """Registers an arbitrary callback and arguments.

        Cannot suppress exceptions.
        """
        # The signature is (*args, **kwds) so that 'self' and 'callback'
        # can still be used as keyword names for the registered callback.
        if len(args) >= 2:
            self, callback, *args = args
        elif not args:
            raise TypeError("descriptor 'callback' of '_BaseExitStack' object "
                            "needs an argument")
        elif 'callback' in kwds:
            callback = kwds.pop('callback')
            self, *args = args
            import warnings
            warnings.warn("Passing 'callback' as keyword argument is deprecated",
                          DeprecationWarning, stacklevel=2)
        else:
            raise TypeError('callback expected at least 1 positional argument, '
                            'got %d' % (len(args)-1))

        _exit_wrapper = self._create_cb_wrapper(callback, *args, **kwds)

        # We changed the signature, so using @wraps is not appropriate, but
        # setting __wrapped__ may still help with introspection.
        _exit_wrapper.__wrapped__ = callback
        self._push_exit_callback(_exit_wrapper)
        return callback  # Allow use as a decorator
    callback.__text_signature__ = '($self, callback, /, *args, **kwds)'

    def _push_cm_exit(self, cm, cm_exit):
        """Helper to correctly register callbacks to __exit__ methods."""
        _exit_wrapper = self._create_exit_wrapper(cm, cm_exit)
        self._push_exit_callback(_exit_wrapper, True)

    def _push_exit_callback(self, callback, is_sync=True):
        # Record the callback together with a flag telling the unwinding
        # code whether it is called directly or has to be awaited.
        self._exit_callbacks.append((is_sync, callback))

465 

466 

# Inspired by discussions on http://bugs.python.org/issue13585
class ExitStack(_BaseExitStack, AbstractContextManager):
    """Context manager for dynamic management of a stack of exit callbacks.

    For example:
        with ExitStack() as stack:
            files = [stack.enter_context(open(fname)) for fname in filenames]
            # All opened files will automatically be closed at the end of
            # the with statement, even if attempts to open files later
            # in the list raise an exception.
    """

    def __enter__(self):
        """Return the stack itself."""
        return self

    def __exit__(self, *exc_details):
        """Run the registered callbacks in LIFO order.

        Each callback receives the current exception details. A callback
        that returns a true value clears them for the remaining callbacks;
        a callback that raises replaces them with its own exception. If an
        exception is still pending at the end, it is re-raised. Otherwise
        the result is true only when an exception was passed in and some
        callback suppressed it.
        """
        received_exc = exc_details[0] is not None

        # We manipulate the exception state so it behaves as though
        # we were actually nesting multiple with statements
        frame_exc = sys.exc_info()[1]
        def _fix_exception_context(new_exc, old_exc):
            # Context may not be correct, so find the end of the chain
            while 1:
                exc_context = new_exc.__context__
                if exc_context is old_exc:
                    # Context is already set correctly (see issue 20317)
                    return
                if exc_context is None or exc_context is frame_exc:
                    break
                new_exc = exc_context
            # Change the end of the chain to point to the exception
            # we expect it to reference
            new_exc.__context__ = old_exc

        # Callbacks are invoked in LIFO order to match the behaviour of
        # nested context managers
        suppressed_exc = False
        pending_raise = False
        while self._exit_callbacks:
            is_sync, cb = self._exit_callbacks.pop()
            assert is_sync
            try:
                if cb(*exc_details):
                    suppressed_exc = True
                    pending_raise = False
                    exc_details = (None, None, None)
            except:
                new_exc_details = sys.exc_info()
                # simulate the stack of exceptions by setting the context
                _fix_exception_context(new_exc_details[1], exc_details[1])
                pending_raise = True
                exc_details = new_exc_details
        if pending_raise:
            try:
                # bare "raise exc_details[1]" replaces our carefully
                # set-up context
                fixed_ctx = exc_details[1].__context__
                raise exc_details[1]
            except BaseException:
                exc_details[1].__context__ = fixed_ctx
                raise
        return received_exc and suppressed_exc

    def close(self):
        """Immediately unwind the context stack."""
        self.__exit__(None, None, None)

534 

535 

# Inspired by discussions on https://bugs.python.org/issue29302
class AsyncExitStack(_BaseExitStack, AbstractAsyncContextManager):
    """Async context manager for dynamic management of a stack of exit
    callbacks.

    For example:
        async with AsyncExitStack() as stack:
            connections = [await stack.enter_async_context(get_connection())
                for i in range(5)]
            # All opened connections will automatically be released at the
            # end of the async with statement, even if attempts to open a
            # connection later in the list raise an exception.
    """

    @staticmethod
    def _create_async_exit_wrapper(cm, cm_exit):
        # Bind the exit function looked up on the type to the instance.
        return MethodType(cm_exit, cm)

    @staticmethod
    def _create_async_cb_wrapper(callback, /, *args, **kwds):
        # Adapt an arbitrary coroutine function to the (exc_type, exc, tb)
        # signature; the wrapper ignores the exception details.
        async def _exit_wrapper(exc_type, exc, tb):
            await callback(*args, **kwds)
        return _exit_wrapper

    async def enter_async_context(self, cm):
        """Enters the supplied async context manager.

        If successful, also pushes its __aexit__ method as a callback and
        returns the result of the __aenter__ method.
        """
        _cm_type = type(cm)
        _exit = _cm_type.__aexit__
        result = await _cm_type.__aenter__(cm)
        self._push_async_cm_exit(cm, _exit)
        return result

    def push_async_exit(self, exit):
        """Registers a coroutine function with the standard __aexit__ method
        signature.

        Can suppress exceptions the same way __aexit__ method can.
        Also accepts any object with an __aexit__ method (registering a call
        to the method instead of the object itself).
        """
        _cb_type = type(exit)
        try:
            exit_method = _cb_type.__aexit__
        except AttributeError:
            # Not an async context manager, so assume it's a coroutine function
            self._push_exit_callback(exit, False)
        else:
            self._push_async_cm_exit(exit, exit_method)
        return exit  # Allow use as a decorator

    def push_async_callback(*args, **kwds):
        """Registers an arbitrary coroutine function and arguments.

        Cannot suppress exceptions.
        """
        # The signature is (*args, **kwds) so that 'self' and 'callback'
        # can still be used as keyword names for the registered callback.
        if len(args) >= 2:
            self, callback, *args = args
        elif not args:
            raise TypeError("descriptor 'push_async_callback' of "
                            "'AsyncExitStack' object needs an argument")
        elif 'callback' in kwds:
            callback = kwds.pop('callback')
            self, *args = args
            import warnings
            warnings.warn("Passing 'callback' as keyword argument is deprecated",
                          DeprecationWarning, stacklevel=2)
        else:
            raise TypeError('push_async_callback expected at least 1 '
                            'positional argument, got %d' % (len(args)-1))

        _exit_wrapper = self._create_async_cb_wrapper(callback, *args, **kwds)

        # We changed the signature, so using @wraps is not appropriate, but
        # setting __wrapped__ may still help with introspection.
        _exit_wrapper.__wrapped__ = callback
        self._push_exit_callback(_exit_wrapper, False)
        return callback  # Allow use as a decorator
    push_async_callback.__text_signature__ = '($self, callback, /, *args, **kwds)'

    async def aclose(self):
        """Immediately unwind the context stack."""
        await self.__aexit__(None, None, None)

    def _push_async_cm_exit(self, cm, cm_exit):
        """Helper to correctly register coroutine function to __aexit__
        method."""
        _exit_wrapper = self._create_async_exit_wrapper(cm, cm_exit)
        self._push_exit_callback(_exit_wrapper, False)

    async def __aenter__(self):
        """Return the stack itself."""
        return self

    async def __aexit__(self, *exc_details):
        """Run the registered callbacks in LIFO order.

        Callbacks registered as synchronous are called directly; the others
        are awaited. A callback that returns a true value clears the
        exception details for the remaining callbacks; a callback that
        raises replaces them with its own exception. If an exception is
        still pending at the end, it is re-raised. Otherwise the result is
        true only when an exception was passed in and some callback
        suppressed it.
        """
        received_exc = exc_details[0] is not None

        # We manipulate the exception state so it behaves as though
        # we were actually nesting multiple with statements
        frame_exc = sys.exc_info()[1]
        def _fix_exception_context(new_exc, old_exc):
            # Context may not be correct, so find the end of the chain
            while 1:
                exc_context = new_exc.__context__
                if exc_context is old_exc:
                    # Context is already set correctly (see issue 20317)
                    return
                if exc_context is None or exc_context is frame_exc:
                    break
                new_exc = exc_context
            # Change the end of the chain to point to the exception
            # we expect it to reference
            new_exc.__context__ = old_exc

        # Callbacks are invoked in LIFO order to match the behaviour of
        # nested context managers
        suppressed_exc = False
        pending_raise = False
        while self._exit_callbacks:
            is_sync, cb = self._exit_callbacks.pop()
            try:
                if is_sync:
                    cb_suppress = cb(*exc_details)
                else:
                    cb_suppress = await cb(*exc_details)

                if cb_suppress:
                    suppressed_exc = True
                    pending_raise = False
                    exc_details = (None, None, None)
            except:
                new_exc_details = sys.exc_info()
                # simulate the stack of exceptions by setting the context
                _fix_exception_context(new_exc_details[1], exc_details[1])
                pending_raise = True
                exc_details = new_exc_details
        if pending_raise:
            try:
                # bare "raise exc_details[1]" replaces our carefully
                # set-up context
                fixed_ctx = exc_details[1].__context__
                raise exc_details[1]
            except BaseException:
                exc_details[1].__context__ = fixed_ctx
                raise
        return received_exc and suppressed_exc

684 

685 

class nullcontext(AbstractContextManager):
    """Context manager that does no additional processing.

    Used as a stand-in for a normal context manager, when a particular
    block of code is only sometimes used with a normal context manager:

    cm = optional_cm if condition else nullcontext()
    with cm:
        # Perform operation, using optional_cm if condition is True
    """

    def __init__(self, enter_result=None):
        """Store *enter_result*, the value that ``__enter__`` will return."""
        self.enter_result = enter_result

    def __enter__(self):
        """Return the value given at construction time."""
        return self.enter_result

    def __exit__(self, *excinfo):
        """Do nothing; exceptions from the block are not suppressed."""
        pass