Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py: 35%

97 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1""" 

2Custom transports, with nicely configured defaults. 

3 

4The following additional keyword arguments are currently supported by httpcore... 

5 

6* uds: str 

7* local_address: str 

8* retries: int 

9 

10Example usages... 

11 

12# Disable HTTP/2 on a single specific domain. 

13mounts = { 

14 "all://": httpx.HTTPTransport(http2=True), 

15 "all://*example.org": httpx.HTTPTransport() 

16} 

17 

18# Using advanced httpcore configuration, with connection retries. 

19transport = httpx.HTTPTransport(retries=1) 

20client = httpx.Client(transport=transport) 

21 

22# Using advanced httpcore configuration, with unix domain sockets. 

23transport = httpx.HTTPTransport(uds="socket.uds") 

24client = httpx.Client(transport=transport) 

25""" 

26import contextlib 

27import typing 

28from types import TracebackType 

29 

30import httpcore 

31 

32from .._config import DEFAULT_LIMITS, Limits, Proxy, create_ssl_context 

33from .._exceptions import ( 

34 ConnectError, 

35 ConnectTimeout, 

36 LocalProtocolError, 

37 NetworkError, 

38 PoolTimeout, 

39 ProtocolError, 

40 ProxyError, 

41 ReadError, 

42 ReadTimeout, 

43 RemoteProtocolError, 

44 TimeoutException, 

45 UnsupportedProtocol, 

46 WriteError, 

47 WriteTimeout, 

48) 

49from .._models import Request, Response 

50from .._types import AsyncByteStream, CertTypes, SyncByteStream, VerifyTypes 

51from .base import AsyncBaseTransport, BaseTransport 

52 

53T = typing.TypeVar("T", bound="HTTPTransport") 

54A = typing.TypeVar("A", bound="AsyncHTTPTransport") 

55 

56 

57@contextlib.contextmanager 

58def map_httpcore_exceptions() -> typing.Iterator[None]: 

59 try: 

60 yield 

61 except Exception as exc: # noqa: PIE-786 

62 mapped_exc = None 

63 

64 for from_exc, to_exc in HTTPCORE_EXC_MAP.items(): 

65 if not isinstance(exc, from_exc): 

66 continue 

67 # We want to map to the most specific exception we can find. 

68 # Eg if `exc` is an `httpcore.ReadTimeout`, we want to map to 

69 # `httpx.ReadTimeout`, not just `httpx.TimeoutException`. 

70 if mapped_exc is None or issubclass(to_exc, mapped_exc): 

71 mapped_exc = to_exc 

72 

73 if mapped_exc is None: # pragma: no cover 

74 raise 

75 

76 message = str(exc) 

77 raise mapped_exc(message) from exc 

78 

79 

80HTTPCORE_EXC_MAP = { 

81 httpcore.TimeoutException: TimeoutException, 

82 httpcore.ConnectTimeout: ConnectTimeout, 

83 httpcore.ReadTimeout: ReadTimeout, 

84 httpcore.WriteTimeout: WriteTimeout, 

85 httpcore.PoolTimeout: PoolTimeout, 

86 httpcore.NetworkError: NetworkError, 

87 httpcore.ConnectError: ConnectError, 

88 httpcore.ReadError: ReadError, 

89 httpcore.WriteError: WriteError, 

90 httpcore.ProxyError: ProxyError, 

91 httpcore.UnsupportedProtocol: UnsupportedProtocol, 

92 httpcore.ProtocolError: ProtocolError, 

93 httpcore.LocalProtocolError: LocalProtocolError, 

94 httpcore.RemoteProtocolError: RemoteProtocolError, 

95} 

96 

97 

98class ResponseStream(SyncByteStream): 

99 def __init__(self, httpcore_stream: typing.Iterable[bytes]): 

100 self._httpcore_stream = httpcore_stream 

101 

102 def __iter__(self) -> typing.Iterator[bytes]: 

103 with map_httpcore_exceptions(): 

104 for part in self._httpcore_stream: 

105 yield part 

106 

107 def close(self) -> None: 

108 if hasattr(self._httpcore_stream, "close"): 

109 self._httpcore_stream.close() # type: ignore 

110 

111 

112class HTTPTransport(BaseTransport): 

113 def __init__( 

114 self, 

115 verify: VerifyTypes = True, 

116 cert: typing.Optional[CertTypes] = None, 

117 http1: bool = True, 

118 http2: bool = False, 

119 limits: Limits = DEFAULT_LIMITS, 

120 trust_env: bool = True, 

121 proxy: typing.Optional[Proxy] = None, 

122 uds: typing.Optional[str] = None, 

123 local_address: typing.Optional[str] = None, 

124 retries: int = 0, 

125 ) -> None: 

126 ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env) 

127 

128 if proxy is None: 

129 self._pool = httpcore.ConnectionPool( 

130 ssl_context=ssl_context, 

131 max_connections=limits.max_connections, 

132 max_keepalive_connections=limits.max_keepalive_connections, 

133 keepalive_expiry=limits.keepalive_expiry, 

134 http1=http1, 

135 http2=http2, 

136 uds=uds, 

137 local_address=local_address, 

138 retries=retries, 

139 ) 

140 elif proxy.url.scheme in ("http", "https"): 

141 self._pool = httpcore.HTTPProxy( 

142 proxy_url=httpcore.URL( 

143 scheme=proxy.url.raw_scheme, 

144 host=proxy.url.raw_host, 

145 port=proxy.url.port, 

146 target=proxy.url.raw_path, 

147 ), 

148 proxy_auth=proxy.raw_auth, 

149 proxy_headers=proxy.headers.raw, 

150 ssl_context=ssl_context, 

151 max_connections=limits.max_connections, 

152 max_keepalive_connections=limits.max_keepalive_connections, 

153 keepalive_expiry=limits.keepalive_expiry, 

154 http1=http1, 

155 http2=http2, 

156 ) 

157 elif proxy.url.scheme == "socks5": 

158 try: 

159 import socksio # noqa 

160 except ImportError: # pragma: no cover 

161 raise ImportError( 

162 "Using SOCKS proxy, but the 'socksio' package is not installed. " 

163 "Make sure to install httpx using `pip install httpx[socks]`." 

164 ) from None 

165 

166 self._pool = httpcore.SOCKSProxy( 

167 proxy_url=httpcore.URL( 

168 scheme=proxy.url.raw_scheme, 

169 host=proxy.url.raw_host, 

170 port=proxy.url.port, 

171 target=proxy.url.raw_path, 

172 ), 

173 proxy_auth=proxy.raw_auth, 

174 ssl_context=ssl_context, 

175 max_connections=limits.max_connections, 

176 max_keepalive_connections=limits.max_keepalive_connections, 

177 keepalive_expiry=limits.keepalive_expiry, 

178 http1=http1, 

179 http2=http2, 

180 ) 

181 else: # pragma: no cover 

182 raise ValueError( 

183 f"Proxy protocol must be either 'http', 'https', or 'socks5', but got {proxy.url.scheme!r}." 

184 ) 

185 

186 def __enter__(self: T) -> T: # Use generics for subclass support. 

187 self._pool.__enter__() 

188 return self 

189 

190 def __exit__( 

191 self, 

192 exc_type: typing.Optional[typing.Type[BaseException]] = None, 

193 exc_value: typing.Optional[BaseException] = None, 

194 traceback: typing.Optional[TracebackType] = None, 

195 ) -> None: 

196 with map_httpcore_exceptions(): 

197 self._pool.__exit__(exc_type, exc_value, traceback) 

198 

199 def handle_request( 

200 self, 

201 request: Request, 

202 ) -> Response: 

203 assert isinstance(request.stream, SyncByteStream) 

204 

205 req = httpcore.Request( 

206 method=request.method, 

207 url=httpcore.URL( 

208 scheme=request.url.raw_scheme, 

209 host=request.url.raw_host, 

210 port=request.url.port, 

211 target=request.url.raw_path, 

212 ), 

213 headers=request.headers.raw, 

214 content=request.stream, 

215 extensions=request.extensions, 

216 ) 

217 with map_httpcore_exceptions(): 

218 resp = self._pool.handle_request(req) 

219 

220 assert isinstance(resp.stream, typing.Iterable) 

221 

222 return Response( 

223 status_code=resp.status, 

224 headers=resp.headers, 

225 stream=ResponseStream(resp.stream), 

226 extensions=resp.extensions, 

227 ) 

228 

229 def close(self) -> None: 

230 self._pool.close() 

231 

232 

233class AsyncResponseStream(AsyncByteStream): 

234 def __init__(self, httpcore_stream: typing.AsyncIterable[bytes]): 

235 self._httpcore_stream = httpcore_stream 

236 

237 async def __aiter__(self) -> typing.AsyncIterator[bytes]: 

238 with map_httpcore_exceptions(): 

239 async for part in self._httpcore_stream: 

240 yield part 

241 

242 async def aclose(self) -> None: 

243 if hasattr(self._httpcore_stream, "aclose"): 

244 await self._httpcore_stream.aclose() # type: ignore 

245 

246 

247class AsyncHTTPTransport(AsyncBaseTransport): 

248 def __init__( 

249 self, 

250 verify: VerifyTypes = True, 

251 cert: typing.Optional[CertTypes] = None, 

252 http1: bool = True, 

253 http2: bool = False, 

254 limits: Limits = DEFAULT_LIMITS, 

255 trust_env: bool = True, 

256 proxy: typing.Optional[Proxy] = None, 

257 uds: typing.Optional[str] = None, 

258 local_address: typing.Optional[str] = None, 

259 retries: int = 0, 

260 ) -> None: 

261 ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env) 

262 

263 if proxy is None: 

264 self._pool = httpcore.AsyncConnectionPool( 

265 ssl_context=ssl_context, 

266 max_connections=limits.max_connections, 

267 max_keepalive_connections=limits.max_keepalive_connections, 

268 keepalive_expiry=limits.keepalive_expiry, 

269 http1=http1, 

270 http2=http2, 

271 uds=uds, 

272 local_address=local_address, 

273 retries=retries, 

274 ) 

275 elif proxy.url.scheme in ("http", "https"): 

276 self._pool = httpcore.AsyncHTTPProxy( 

277 proxy_url=httpcore.URL( 

278 scheme=proxy.url.raw_scheme, 

279 host=proxy.url.raw_host, 

280 port=proxy.url.port, 

281 target=proxy.url.raw_path, 

282 ), 

283 proxy_auth=proxy.raw_auth, 

284 proxy_headers=proxy.headers.raw, 

285 ssl_context=ssl_context, 

286 max_connections=limits.max_connections, 

287 max_keepalive_connections=limits.max_keepalive_connections, 

288 keepalive_expiry=limits.keepalive_expiry, 

289 http1=http1, 

290 http2=http2, 

291 ) 

292 elif proxy.url.scheme == "socks5": 

293 try: 

294 import socksio # noqa 

295 except ImportError: # pragma: no cover 

296 raise ImportError( 

297 "Using SOCKS proxy, but the 'socksio' package is not installed. " 

298 "Make sure to install httpx using `pip install httpx[socks]`." 

299 ) from None 

300 

301 self._pool = httpcore.AsyncSOCKSProxy( 

302 proxy_url=httpcore.URL( 

303 scheme=proxy.url.raw_scheme, 

304 host=proxy.url.raw_host, 

305 port=proxy.url.port, 

306 target=proxy.url.raw_path, 

307 ), 

308 proxy_auth=proxy.raw_auth, 

309 ssl_context=ssl_context, 

310 max_connections=limits.max_connections, 

311 max_keepalive_connections=limits.max_keepalive_connections, 

312 keepalive_expiry=limits.keepalive_expiry, 

313 http1=http1, 

314 http2=http2, 

315 ) 

316 else: # pragma: no cover 

317 raise ValueError( 

318 f"Proxy protocol must be either 'http', 'https', or 'socks5', but got {proxy.url.scheme!r}." 

319 ) 

320 

321 async def __aenter__(self: A) -> A: # Use generics for subclass support. 

322 await self._pool.__aenter__() 

323 return self 

324 

325 async def __aexit__( 

326 self, 

327 exc_type: typing.Optional[typing.Type[BaseException]] = None, 

328 exc_value: typing.Optional[BaseException] = None, 

329 traceback: typing.Optional[TracebackType] = None, 

330 ) -> None: 

331 with map_httpcore_exceptions(): 

332 await self._pool.__aexit__(exc_type, exc_value, traceback) 

333 

334 async def handle_async_request( 

335 self, 

336 request: Request, 

337 ) -> Response: 

338 assert isinstance(request.stream, AsyncByteStream) 

339 

340 req = httpcore.Request( 

341 method=request.method, 

342 url=httpcore.URL( 

343 scheme=request.url.raw_scheme, 

344 host=request.url.raw_host, 

345 port=request.url.port, 

346 target=request.url.raw_path, 

347 ), 

348 headers=request.headers.raw, 

349 content=request.stream, 

350 extensions=request.extensions, 

351 ) 

352 with map_httpcore_exceptions(): 

353 resp = await self._pool.handle_async_request(req) 

354 

355 assert isinstance(resp.stream, typing.AsyncIterable) 

356 

357 return Response( 

358 status_code=resp.status, 

359 headers=resp.headers, 

360 stream=AsyncResponseStream(resp.stream), 

361 extensions=resp.extensions, 

362 ) 

363 

364 async def aclose(self) -> None: 

365 await self._pool.aclose()