Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_synchronization.py: 38%

96 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1import threading 

2from types import TracebackType 

3from typing import Optional, Type 

4 

5import sniffio 

6 

7from ._exceptions import ExceptionMapping, PoolTimeout, map_exceptions 

8 

9# Our async synchronization primatives use either 'anyio' or 'trio' depending 

10# on if they're running under asyncio or trio. 

11 

12try: 

13 import trio 

14except ImportError: # pragma: nocover 

15 trio = None # type: ignore 

16 

17try: 

18 import anyio 

19except ImportError: # pragma: nocover 

20 anyio = None # type: ignore 

21 

22 

23class AsyncLock: 

24 def __init__(self) -> None: 

25 self._backend = "" 

26 

27 def setup(self) -> None: 

28 """ 

29 Detect if we're running under 'asyncio' or 'trio' and create 

30 a lock with the correct implementation. 

31 """ 

32 self._backend = sniffio.current_async_library() 

33 if self._backend == "trio": 

34 if trio is None: # pragma: nocover 

35 raise RuntimeError( 

36 "Running under trio, requires the 'trio' package to be installed." 

37 ) 

38 self._trio_lock = trio.Lock() 

39 else: 

40 if anyio is None: # pragma: nocover 

41 raise RuntimeError( 

42 "Running under asyncio requires the 'anyio' package to be installed." 

43 ) 

44 self._anyio_lock = anyio.Lock() 

45 

46 async def __aenter__(self) -> "AsyncLock": 

47 if not self._backend: 

48 self.setup() 

49 

50 if self._backend == "trio": 

51 await self._trio_lock.acquire() 

52 else: 

53 await self._anyio_lock.acquire() 

54 

55 return self 

56 

57 async def __aexit__( 

58 self, 

59 exc_type: Optional[Type[BaseException]] = None, 

60 exc_value: Optional[BaseException] = None, 

61 traceback: Optional[TracebackType] = None, 

62 ) -> None: 

63 if self._backend == "trio": 

64 self._trio_lock.release() 

65 else: 

66 self._anyio_lock.release() 

67 

68 

69class AsyncEvent: 

70 def __init__(self) -> None: 

71 self._backend = "" 

72 

73 def setup(self) -> None: 

74 """ 

75 Detect if we're running under 'asyncio' or 'trio' and create 

76 a lock with the correct implementation. 

77 """ 

78 self._backend = sniffio.current_async_library() 

79 if self._backend == "trio": 

80 if trio is None: # pragma: nocover 

81 raise RuntimeError( 

82 "Running under trio requires the 'trio' package to be installed." 

83 ) 

84 self._trio_event = trio.Event() 

85 else: 

86 if anyio is None: # pragma: nocover 

87 raise RuntimeError( 

88 "Running under asyncio requires the 'anyio' package to be installed." 

89 ) 

90 self._anyio_event = anyio.Event() 

91 

92 def set(self) -> None: 

93 if not self._backend: 

94 self.setup() 

95 

96 if self._backend == "trio": 

97 self._trio_event.set() 

98 else: 

99 self._anyio_event.set() 

100 

101 async def wait(self, timeout: Optional[float] = None) -> None: 

102 if not self._backend: 

103 self.setup() 

104 

105 if self._backend == "trio": 

106 if trio is None: # pragma: nocover 

107 raise RuntimeError( 

108 "Running under trio requires the 'trio' package to be installed." 

109 ) 

110 

111 trio_exc_map: ExceptionMapping = {trio.TooSlowError: PoolTimeout} 

112 timeout_or_inf = float("inf") if timeout is None else timeout 

113 with map_exceptions(trio_exc_map): 

114 with trio.fail_after(timeout_or_inf): 

115 await self._trio_event.wait() 

116 else: 

117 if anyio is None: # pragma: nocover 

118 raise RuntimeError( 

119 "Running under asyncio requires the 'anyio' package to be installed." 

120 ) 

121 

122 anyio_exc_map: ExceptionMapping = {TimeoutError: PoolTimeout} 

123 with map_exceptions(anyio_exc_map): 

124 with anyio.fail_after(timeout): 

125 await self._anyio_event.wait() 

126 

127 

128class AsyncSemaphore: 

129 def __init__(self, bound: int) -> None: 

130 self._bound = bound 

131 self._backend = "" 

132 

133 def setup(self) -> None: 

134 """ 

135 Detect if we're running under 'asyncio' or 'trio' and create 

136 a semaphore with the correct implementation. 

137 """ 

138 self._backend = sniffio.current_async_library() 

139 if self._backend == "trio": 

140 if trio is None: # pragma: nocover 

141 raise RuntimeError( 

142 "Running under trio requires the 'trio' package to be installed." 

143 ) 

144 

145 self._trio_semaphore = trio.Semaphore( 

146 initial_value=self._bound, max_value=self._bound 

147 ) 

148 else: 

149 if anyio is None: # pragma: nocover 

150 raise RuntimeError( 

151 "Running under asyncio requires the 'anyio' package to be installed." 

152 ) 

153 

154 self._anyio_semaphore = anyio.Semaphore( 

155 initial_value=self._bound, max_value=self._bound 

156 ) 

157 

158 async def acquire(self) -> None: 

159 if not self._backend: 

160 self.setup() 

161 

162 if self._backend == "trio": 

163 await self._trio_semaphore.acquire() 

164 else: 

165 await self._anyio_semaphore.acquire() 

166 

167 async def release(self) -> None: 

168 if self._backend == "trio": 

169 self._trio_semaphore.release() 

170 else: 

171 self._anyio_semaphore.release() 

172 

173 

174# Our thread-based synchronization primitives... 

175 

176 

177class Lock: 

178 def __init__(self) -> None: 

179 self._lock = threading.Lock() 

180 

181 def __enter__(self) -> "Lock": 

182 self._lock.acquire() 

183 return self 

184 

185 def __exit__( 

186 self, 

187 exc_type: Optional[Type[BaseException]] = None, 

188 exc_value: Optional[BaseException] = None, 

189 traceback: Optional[TracebackType] = None, 

190 ) -> None: 

191 self._lock.release() 

192 

193 

194class Event: 

195 def __init__(self) -> None: 

196 self._event = threading.Event() 

197 

198 def set(self) -> None: 

199 self._event.set() 

200 

201 def wait(self, timeout: Optional[float] = None) -> None: 

202 if not self._event.wait(timeout=timeout): 

203 raise PoolTimeout() # pragma: nocover 

204 

205 

206class Semaphore: 

207 def __init__(self, bound: int) -> None: 

208 self._semaphore = threading.Semaphore(value=bound) 

209 

210 def acquire(self) -> None: 

211 self._semaphore.acquire() 

212 

213 def release(self) -> None: 

214 self._semaphore.release()