# Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/exceptiongroup/_catch.py: 21%
# 78 statements
# coverage.py v7.3.1, created at 2023-09-25 06:05 +0000
1from __future__ import annotations
3import inspect
4import sys
5from collections.abc import Callable, Iterable, Mapping
6from contextlib import AbstractContextManager
7from types import TracebackType
8from typing import TYPE_CHECKING, Any
10if sys.version_info < (3, 11):
11 from ._exceptions import BaseExceptionGroup
13if TYPE_CHECKING:
14 _Handler = Callable[[BaseException], Any]
17class _Catcher:
18 def __init__(self, handler_map: Mapping[tuple[type[BaseException], ...], _Handler]):
19 self._handler_map = handler_map
21 def __enter__(self) -> None:
22 pass
24 def __exit__(
25 self,
26 etype: type[BaseException] | None,
27 exc: BaseException | None,
28 tb: TracebackType | None,
29 ) -> bool:
30 if exc is not None:
31 unhandled = self.handle_exception(exc)
32 if unhandled is exc:
33 return False
34 elif unhandled is None:
35 return True
36 else:
37 if isinstance(exc, BaseExceptionGroup):
38 try:
39 raise unhandled from exc.__cause__
40 except BaseExceptionGroup:
41 # Change __context__ to __cause__ because Python 3.11 does this
42 # too
43 unhandled.__context__ = exc.__cause__
44 raise
46 raise unhandled from exc
48 return False
50 def handle_exception(self, exc: BaseException) -> BaseException | None:
51 excgroup: BaseExceptionGroup | None
52 if isinstance(exc, BaseExceptionGroup):
53 excgroup = exc
54 else:
55 excgroup = BaseExceptionGroup("", [exc])
57 new_exceptions: list[BaseException] = []
58 for exc_types, handler in self._handler_map.items():
59 matched, excgroup = excgroup.split(exc_types)
60 if matched:
61 try:
62 try:
63 raise matched
64 except BaseExceptionGroup:
65 result = handler(matched)
66 except BaseExceptionGroup as new_exc:
67 new_exceptions.extend(new_exc.exceptions)
68 except BaseException as new_exc:
69 new_exceptions.append(new_exc)
70 else:
71 if inspect.iscoroutine(result):
72 raise TypeError(
73 f"Error trying to handle {matched!r} with {handler!r}. "
74 "Exception handler must be a sync function."
75 ) from exc
77 if not excgroup:
78 break
80 if new_exceptions:
81 if len(new_exceptions) == 1:
82 return new_exceptions[0]
84 return BaseExceptionGroup("", new_exceptions)
85 elif (
86 excgroup and len(excgroup.exceptions) == 1 and excgroup.exceptions[0] is exc
87 ):
88 return exc
89 else:
90 return excgroup
def catch(
    __handlers: Mapping[type[BaseException] | Iterable[type[BaseException]], _Handler]
) -> AbstractContextManager[None]:
    """Build a context manager that dispatches exceptions to ``__handlers``.

    Each key of the mapping is either a single exception class or an iterable
    of exception classes; each value is the callable invoked with the matching
    exception group.  Keys are normalized to tuples before being handed to
    ``_Catcher``.

    Raises:
        TypeError: if the argument is not a mapping, a key is neither an
            exception class nor an iterable of exception classes, a handler
            is not callable, or a key names ``BaseExceptionGroup`` or one of
            its subclasses.
    """
    if not isinstance(__handlers, Mapping):
        raise TypeError("the argument must be a mapping")

    handler_map: dict[tuple[type[BaseException], ...], _Handler] = {}
    for type_or_iterable, handler in __handlers.items():
        iterable: tuple[type[BaseException], ...]
        if isinstance(type_or_iterable, type) and issubclass(
            type_or_iterable, BaseException
        ):
            iterable = (type_or_iterable,)
        elif isinstance(type_or_iterable, Iterable):
            iterable = tuple(type_or_iterable)
        else:
            raise TypeError(
                "each key must be either an exception classes or an iterable thereof"
            )

        if not callable(handler):
            raise TypeError("handlers must be callable")

        for exc_type in iterable:
            if not isinstance(exc_type, type) or not issubclass(
                exc_type, BaseException
            ):
                raise TypeError(
                    "each key must be either an exception classes or an iterable "
                    "thereof"
                )

            # Handlers always receive a group, so matching on the group types
            # themselves is rejected.
            if issubclass(exc_type, BaseExceptionGroup):
                raise TypeError(
                    "catching ExceptionGroup with catch() is not allowed. "
                    "Use except instead."
                )

        handler_map[iterable] = handler

    return _Catcher(handler_map)