Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_async/connection.py: 26%

118 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1import itertools 

2import logging 

3import ssl 

4from types import TracebackType 

5from typing import Iterable, Iterator, Optional, Type 

6 

7from .._exceptions import ConnectError, ConnectionNotAvailable, ConnectTimeout 

8from .._models import Origin, Request, Response 

9from .._ssl import default_ssl_context 

10from .._synchronization import AsyncLock 

11from .._trace import Trace 

12from ..backends.auto import AutoBackend 

13from ..backends.base import SOCKET_OPTION, AsyncNetworkBackend, AsyncNetworkStream 

14from .http11 import AsyncHTTP11Connection 

15from .interfaces import AsyncConnectionInterface 

16 

# Multiplier handed to exponential_backoff() when spacing out connection
# retries. With 0.5 the successive waits are 0s, 0.5s, 1s, 2s, 4s, etc.
RETRIES_BACKOFF_FACTOR = 0.5


# Logger passed to every Trace(...) call made while connecting and closing.
logger = logging.getLogger("httpcore.connection")

21 

22 

def exponential_backoff(factor: float) -> Iterator[float]:
    """
    Yield an endless series of retry delays.

    The first delay is always 0, so the first retry happens immediately.
    Every later delay is `factor` multiplied by successive powers of two:
    factor, 2 * factor, 4 * factor, and so on.
    """
    yield 0
    for exponent in itertools.count():
        yield factor * (2**exponent)

27 

28 

class AsyncHTTPConnection(AsyncConnectionInterface):
    """
    A lazily-established connection to a single origin.

    No socket is opened at construction time. The first call to
    `handle_async_request()` connects (TCP or Unix domain socket, then TLS
    for `https` origins) and wraps the resulting stream in either an
    HTTP/1.1 or an HTTP/2 connection, to which all further calls delegate.
    """

    def __init__(
        self,
        origin: Origin,
        ssl_context: Optional[ssl.SSLContext] = None,
        keepalive_expiry: Optional[float] = None,
        http1: bool = True,
        http2: bool = False,
        retries: int = 0,
        local_address: Optional[str] = None,
        uds: Optional[str] = None,
        network_backend: Optional[AsyncNetworkBackend] = None,
        socket_options: Optional[Iterable[SOCKET_OPTION]] = None,
    ) -> None:
        """
        Store the connection settings; nothing is connected yet.

        Parameters:
            origin: The origin this connection serves.
            ssl_context: TLS context used for `https` origins. When `None`,
                `default_ssl_context()` is used at connect time.
            keepalive_expiry: Passed through to the underlying HTTP/1.1 or
                HTTP/2 connection.
            http1: Whether HTTP/1.1 may be used.
            http2: Whether HTTP/2 may be used ("h2" is offered via ALPN).
            retries: How many additional connect attempts are made after a
                `ConnectError` or `ConnectTimeout`.
            local_address: Forwarded to the backend's `connect_tcp()`.
            uds: Path of a Unix domain socket to connect to instead of TCP.
            network_backend: Backend used for I/O; `AutoBackend()` if `None`.
            socket_options: Forwarded to the backend when connecting.
        """
        self._origin = origin
        self._ssl_context = ssl_context
        self._keepalive_expiry = keepalive_expiry
        self._http1 = http1
        self._http2 = http2
        self._retries = retries
        self._local_address = local_address
        self._uds = uds

        self._network_backend: AsyncNetworkBackend = (
            AutoBackend() if network_backend is None else network_backend
        )
        # Set once the first request has successfully connected.
        self._connection: Optional[AsyncConnectionInterface] = None
        # Set when a connect attempt raised; drives the state reported by
        # is_closed() / has_expired() / is_idle() / info() while
        # `_connection` is still None.
        self._connect_failed: bool = False
        self._request_lock = AsyncLock()
        self._socket_options = socket_options

    async def handle_async_request(self, request: Request) -> Response:
        """
        Send `request`, connecting first if no connection exists yet.

        Raises:
            RuntimeError: If the request's origin differs from this
                connection's origin.
            ConnectionNotAvailable: If a connection already exists but
                reports that it is not available.
        """
        if not self.can_handle_request(request.url.origin):
            raise RuntimeError(
                f"Attempted to send request to {request.url.origin} on connection to {self._origin}"
            )

        async with self._request_lock:
            if self._connection is None:
                try:
                    stream = await self._connect(request)

                    ssl_object = stream.get_extra_info("ssl_object")
                    http2_negotiated = (
                        ssl_object is not None
                        and ssl_object.selected_alpn_protocol() == "h2"
                    )
                    if http2_negotiated or (self._http2 and not self._http1):
                        # Imported lazily, only when HTTP/2 is actually used.
                        from .http2 import AsyncHTTP2Connection

                        self._connection = AsyncHTTP2Connection(
                            origin=self._origin,
                            stream=stream,
                            keepalive_expiry=self._keepalive_expiry,
                        )
                    else:
                        self._connection = AsyncHTTP11Connection(
                            origin=self._origin,
                            stream=stream,
                            keepalive_expiry=self._keepalive_expiry,
                        )
                except BaseException:
                    # BaseException rather than Exception: task cancellation
                    # is delivered as a BaseException subclass. If it were
                    # not recorded here, the connection would keep reporting
                    # "CONNECTING" and never appear closed or expired.
                    self._connect_failed = True
                    raise
            elif not self._connection.is_available():
                raise ConnectionNotAvailable()

        return await self._connection.handle_async_request(request)

    async def _connect(self, request: Request) -> AsyncNetworkStream:
        """
        Open the network stream for this connection and return it.

        Connects over TCP, or over the Unix domain socket when `uds` was
        given, then upgrades to TLS for `https` origins. The connect timeout
        and an optional SNI hostname override are read from
        `request.extensions`.

        `ConnectError` and `ConnectTimeout` trigger a retry while retries
        remain, sleeping for the next backoff delay in between; once retries
        are exhausted the exception propagates.
        """
        timeouts = request.extensions.get("timeout", {})
        sni_hostname = request.extensions.get("sni_hostname", None)
        timeout = timeouts.get("connect", None)

        retries_left = self._retries
        delays = exponential_backoff(factor=RETRIES_BACKOFF_FACTOR)

        while True:
            try:
                if self._uds is None:
                    kwargs = {
                        "host": self._origin.host.decode("ascii"),
                        "port": self._origin.port,
                        "local_address": self._local_address,
                        "timeout": timeout,
                        "socket_options": self._socket_options,
                    }
                    async with Trace("connect_tcp", logger, request, kwargs) as trace:
                        stream = await self._network_backend.connect_tcp(**kwargs)
                        trace.return_value = stream
                else:
                    kwargs = {
                        "path": self._uds,
                        "timeout": timeout,
                        "socket_options": self._socket_options,
                    }
                    async with Trace(
                        "connect_unix_socket", logger, request, kwargs
                    ) as trace:
                        stream = await self._network_backend.connect_unix_socket(
                            **kwargs
                        )
                        trace.return_value = stream

                if self._origin.scheme == b"https":
                    ssl_context = (
                        default_ssl_context()
                        if self._ssl_context is None
                        else self._ssl_context
                    )
                    alpn_protocols = ["http/1.1", "h2"] if self._http2 else ["http/1.1"]
                    # NOTE: this also mutates a caller-supplied ssl_context.
                    ssl_context.set_alpn_protocols(alpn_protocols)

                    kwargs = {
                        "ssl_context": ssl_context,
                        "server_hostname": sni_hostname
                        or self._origin.host.decode("ascii"),
                        "timeout": timeout,
                    }
                    async with Trace("start_tls", logger, request, kwargs) as trace:
                        stream = await stream.start_tls(**kwargs)
                        trace.return_value = stream
                return stream
            except (ConnectError, ConnectTimeout):
                if retries_left <= 0:
                    raise
                retries_left -= 1
                delay = next(delays)
                # `kwargs` here is whichever dict was built last before the
                # failure (the socket connect or the TLS handshake).
                async with Trace("retry", logger, request, kwargs):
                    await self._network_backend.sleep(delay)

    def can_handle_request(self, origin: Origin) -> bool:
        """Return True if `origin` equals this connection's origin."""
        return origin == self._origin

    async def aclose(self) -> None:
        """Close the underlying connection, if one was ever established."""
        if self._connection is not None:
            async with Trace("close", logger, None, {}):
                await self._connection.aclose()

    def is_available(self) -> bool:
        """Return True if the connection can currently accept a request."""
        if self._connection is None:
            # If HTTP/2 support is enabled, and the resulting connection could
            # end up as HTTP/2 then we should indicate the connection as being
            # available to service multiple requests.
            return (
                self._http2
                and (self._origin.scheme == b"https" or not self._http1)
                and not self._connect_failed
            )
        return self._connection.is_available()

    def has_expired(self) -> bool:
        """
        Return the underlying connection's expiry state.

        Before a connection exists, this is True only if connecting failed.
        """
        if self._connection is None:
            return self._connect_failed
        return self._connection.has_expired()

    def is_idle(self) -> bool:
        """
        Return the underlying connection's idle state.

        Before a connection exists, this is True only if connecting failed.
        """
        if self._connection is None:
            return self._connect_failed
        return self._connection.is_idle()

    def is_closed(self) -> bool:
        """
        Return the underlying connection's closed state.

        Before a connection exists, this is True only if connecting failed.
        """
        if self._connection is None:
            return self._connect_failed
        return self._connection.is_closed()

    def info(self) -> str:
        """
        Return a short human-readable status string.

        Before a connection exists this is "CONNECTION FAILED" or
        "CONNECTING"; afterwards it is the underlying connection's info.
        """
        if self._connection is None:
            return "CONNECTION FAILED" if self._connect_failed else "CONNECTING"
        return self._connection.info()

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} [{self.info()}]>"

    # These context managers are not used in the standard flow, but are
    # useful for testing or working with connection instances directly.

    async def __aenter__(self) -> "AsyncHTTPConnection":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        await self.aclose()