Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/exceptiongroup/_catch.py: 21%

78 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:05 +0000

1from __future__ import annotations 

2 

3import inspect 

4import sys 

5from collections.abc import Callable, Iterable, Mapping 

6from contextlib import AbstractContextManager 

7from types import TracebackType 

8from typing import TYPE_CHECKING, Any 

9 

10if sys.version_info < (3, 11): 

11 from ._exceptions import BaseExceptionGroup 

12 

13if TYPE_CHECKING: 

14 _Handler = Callable[[BaseException], Any] 

15 

16 

17class _Catcher: 

18 def __init__(self, handler_map: Mapping[tuple[type[BaseException], ...], _Handler]): 

19 self._handler_map = handler_map 

20 

21 def __enter__(self) -> None: 

22 pass 

23 

24 def __exit__( 

25 self, 

26 etype: type[BaseException] | None, 

27 exc: BaseException | None, 

28 tb: TracebackType | None, 

29 ) -> bool: 

30 if exc is not None: 

31 unhandled = self.handle_exception(exc) 

32 if unhandled is exc: 

33 return False 

34 elif unhandled is None: 

35 return True 

36 else: 

37 if isinstance(exc, BaseExceptionGroup): 

38 try: 

39 raise unhandled from exc.__cause__ 

40 except BaseExceptionGroup: 

41 # Change __context__ to __cause__ because Python 3.11 does this 

42 # too 

43 unhandled.__context__ = exc.__cause__ 

44 raise 

45 

46 raise unhandled from exc 

47 

48 return False 

49 

50 def handle_exception(self, exc: BaseException) -> BaseException | None: 

51 excgroup: BaseExceptionGroup | None 

52 if isinstance(exc, BaseExceptionGroup): 

53 excgroup = exc 

54 else: 

55 excgroup = BaseExceptionGroup("", [exc]) 

56 

57 new_exceptions: list[BaseException] = [] 

58 for exc_types, handler in self._handler_map.items(): 

59 matched, excgroup = excgroup.split(exc_types) 

60 if matched: 

61 try: 

62 try: 

63 raise matched 

64 except BaseExceptionGroup: 

65 result = handler(matched) 

66 except BaseExceptionGroup as new_exc: 

67 new_exceptions.extend(new_exc.exceptions) 

68 except BaseException as new_exc: 

69 new_exceptions.append(new_exc) 

70 else: 

71 if inspect.iscoroutine(result): 

72 raise TypeError( 

73 f"Error trying to handle {matched!r} with {handler!r}. " 

74 "Exception handler must be a sync function." 

75 ) from exc 

76 

77 if not excgroup: 

78 break 

79 

80 if new_exceptions: 

81 if len(new_exceptions) == 1: 

82 return new_exceptions[0] 

83 

84 return BaseExceptionGroup("", new_exceptions) 

85 elif ( 

86 excgroup and len(excgroup.exceptions) == 1 and excgroup.exceptions[0] is exc 

87 ): 

88 return exc 

89 else: 

90 return excgroup 

91 

92 

93def catch( 

94 __handlers: Mapping[type[BaseException] | Iterable[type[BaseException]], _Handler] 

95) -> AbstractContextManager[None]: 

96 if not isinstance(__handlers, Mapping): 

97 raise TypeError("the argument must be a mapping") 

98 

99 handler_map: dict[ 

100 tuple[type[BaseException], ...], Callable[[BaseExceptionGroup]] 

101 ] = {} 

102 for type_or_iterable, handler in __handlers.items(): 

103 iterable: tuple[type[BaseException]] 

104 if isinstance(type_or_iterable, type) and issubclass( 

105 type_or_iterable, BaseException 

106 ): 

107 iterable = (type_or_iterable,) 

108 elif isinstance(type_or_iterable, Iterable): 

109 iterable = tuple(type_or_iterable) 

110 else: 

111 raise TypeError( 

112 "each key must be either an exception classes or an iterable thereof" 

113 ) 

114 

115 if not callable(handler): 

116 raise TypeError("handlers must be callable") 

117 

118 for exc_type in iterable: 

119 if not isinstance(exc_type, type) or not issubclass( 

120 exc_type, BaseException 

121 ): 

122 raise TypeError( 

123 "each key must be either an exception classes or an iterable " 

124 "thereof" 

125 ) 

126 

127 if issubclass(exc_type, BaseExceptionGroup): 

128 raise TypeError( 

129 "catching ExceptionGroup with catch() is not allowed. " 

130 "Use except instead." 

131 ) 

132 

133 handler_map[iterable] = handler 

134 

135 return _Catcher(handler_map)