Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

103 statements  

1""" 

2Custom transports, with nicely configured defaults. 

3 

4The following additional keyword arguments are currently supported by httpcore... 

5 

6* uds: str 

7* local_address: str 

8* retries: int 

9 

10Example usages... 

11 

12# Disable HTTP/2 on a single specific domain. 

13mounts = { 

14 "all://": httpx.HTTPTransport(http2=True), 

15 "all://*example.org": httpx.HTTPTransport() 

16} 

17 

18# Using advanced httpcore configuration, with connection retries. 

19transport = httpx.HTTPTransport(retries=1) 

20client = httpx.Client(transport=transport) 

21 

22# Using advanced httpcore configuration, with unix domain sockets. 

23transport = httpx.HTTPTransport(uds="socket.uds") 

24client = httpx.Client(transport=transport) 

25""" 

26 

27from __future__ import annotations 

28 

29import contextlib 

30import typing 

31from types import TracebackType 

32 

33import httpcore 

34 

35from .._config import DEFAULT_LIMITS, Limits, Proxy, create_ssl_context 

36from .._exceptions import ( 

37 ConnectError, 

38 ConnectTimeout, 

39 LocalProtocolError, 

40 NetworkError, 

41 PoolTimeout, 

42 ProtocolError, 

43 ProxyError, 

44 ReadError, 

45 ReadTimeout, 

46 RemoteProtocolError, 

47 TimeoutException, 

48 UnsupportedProtocol, 

49 WriteError, 

50 WriteTimeout, 

51) 

52from .._models import Request, Response 

53from .._types import AsyncByteStream, CertTypes, ProxyTypes, SyncByteStream, VerifyTypes 

54from .._urls import URL 

55from .base import AsyncBaseTransport, BaseTransport 

56 

57T = typing.TypeVar("T", bound="HTTPTransport") 

58A = typing.TypeVar("A", bound="AsyncHTTPTransport") 

59 

60SOCKET_OPTION = typing.Union[ 

61 typing.Tuple[int, int, int], 

62 typing.Tuple[int, int, typing.Union[bytes, bytearray]], 

63 typing.Tuple[int, int, None, int], 

64] 

65 

66__all__ = ["AsyncHTTPTransport", "HTTPTransport"] 

67 

68 

69@contextlib.contextmanager 

70def map_httpcore_exceptions() -> typing.Iterator[None]: 

71 try: 

72 yield 

73 except Exception as exc: 

74 mapped_exc = None 

75 

76 for from_exc, to_exc in HTTPCORE_EXC_MAP.items(): 

77 if not isinstance(exc, from_exc): 

78 continue 

79 # We want to map to the most specific exception we can find. 

80 # Eg if `exc` is an `httpcore.ReadTimeout`, we want to map to 

81 # `httpx.ReadTimeout`, not just `httpx.TimeoutException`. 

82 if mapped_exc is None or issubclass(to_exc, mapped_exc): 

83 mapped_exc = to_exc 

84 

85 if mapped_exc is None: # pragma: no cover 

86 raise 

87 

88 message = str(exc) 

89 raise mapped_exc(message) from exc 

90 

91 

92HTTPCORE_EXC_MAP = { 

93 httpcore.TimeoutException: TimeoutException, 

94 httpcore.ConnectTimeout: ConnectTimeout, 

95 httpcore.ReadTimeout: ReadTimeout, 

96 httpcore.WriteTimeout: WriteTimeout, 

97 httpcore.PoolTimeout: PoolTimeout, 

98 httpcore.NetworkError: NetworkError, 

99 httpcore.ConnectError: ConnectError, 

100 httpcore.ReadError: ReadError, 

101 httpcore.WriteError: WriteError, 

102 httpcore.ProxyError: ProxyError, 

103 httpcore.UnsupportedProtocol: UnsupportedProtocol, 

104 httpcore.ProtocolError: ProtocolError, 

105 httpcore.LocalProtocolError: LocalProtocolError, 

106 httpcore.RemoteProtocolError: RemoteProtocolError, 

107} 

108 

109 

110class ResponseStream(SyncByteStream): 

111 def __init__(self, httpcore_stream: typing.Iterable[bytes]) -> None: 

112 self._httpcore_stream = httpcore_stream 

113 

114 def __iter__(self) -> typing.Iterator[bytes]: 

115 with map_httpcore_exceptions(): 

116 for part in self._httpcore_stream: 

117 yield part 

118 

119 def close(self) -> None: 

120 if hasattr(self._httpcore_stream, "close"): 

121 self._httpcore_stream.close() 

122 

123 

124class HTTPTransport(BaseTransport): 

125 def __init__( 

126 self, 

127 verify: VerifyTypes = True, 

128 cert: CertTypes | None = None, 

129 http1: bool = True, 

130 http2: bool = False, 

131 limits: Limits = DEFAULT_LIMITS, 

132 trust_env: bool = True, 

133 proxy: ProxyTypes | None = None, 

134 uds: str | None = None, 

135 local_address: str | None = None, 

136 retries: int = 0, 

137 socket_options: typing.Iterable[SOCKET_OPTION] | None = None, 

138 ) -> None: 

139 ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env) 

140 proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy 

141 

142 if proxy is None: 

143 self._pool = httpcore.ConnectionPool( 

144 ssl_context=ssl_context, 

145 max_connections=limits.max_connections, 

146 max_keepalive_connections=limits.max_keepalive_connections, 

147 keepalive_expiry=limits.keepalive_expiry, 

148 http1=http1, 

149 http2=http2, 

150 uds=uds, 

151 local_address=local_address, 

152 retries=retries, 

153 socket_options=socket_options, 

154 ) 

155 elif proxy.url.scheme in ("http", "https"): 

156 self._pool = httpcore.HTTPProxy( 

157 proxy_url=httpcore.URL( 

158 scheme=proxy.url.raw_scheme, 

159 host=proxy.url.raw_host, 

160 port=proxy.url.port, 

161 target=proxy.url.raw_path, 

162 ), 

163 proxy_auth=proxy.raw_auth, 

164 proxy_headers=proxy.headers.raw, 

165 ssl_context=ssl_context, 

166 proxy_ssl_context=proxy.ssl_context, 

167 max_connections=limits.max_connections, 

168 max_keepalive_connections=limits.max_keepalive_connections, 

169 keepalive_expiry=limits.keepalive_expiry, 

170 http1=http1, 

171 http2=http2, 

172 socket_options=socket_options, 

173 ) 

174 elif proxy.url.scheme == "socks5": 

175 try: 

176 import socksio # noqa 

177 except ImportError: # pragma: no cover 

178 raise ImportError( 

179 "Using SOCKS proxy, but the 'socksio' package is not installed. " 

180 "Make sure to install httpx using `pip install httpx[socks]`." 

181 ) from None 

182 

183 self._pool = httpcore.SOCKSProxy( 

184 proxy_url=httpcore.URL( 

185 scheme=proxy.url.raw_scheme, 

186 host=proxy.url.raw_host, 

187 port=proxy.url.port, 

188 target=proxy.url.raw_path, 

189 ), 

190 proxy_auth=proxy.raw_auth, 

191 ssl_context=ssl_context, 

192 max_connections=limits.max_connections, 

193 max_keepalive_connections=limits.max_keepalive_connections, 

194 keepalive_expiry=limits.keepalive_expiry, 

195 http1=http1, 

196 http2=http2, 

197 ) 

198 else: # pragma: no cover 

199 raise ValueError( 

200 "Proxy protocol must be either 'http', 'https', or 'socks5'," 

201 f" but got {proxy.url.scheme!r}." 

202 ) 

203 

204 def __enter__(self: T) -> T: # Use generics for subclass support. 

205 self._pool.__enter__() 

206 return self 

207 

208 def __exit__( 

209 self, 

210 exc_type: type[BaseException] | None = None, 

211 exc_value: BaseException | None = None, 

212 traceback: TracebackType | None = None, 

213 ) -> None: 

214 with map_httpcore_exceptions(): 

215 self._pool.__exit__(exc_type, exc_value, traceback) 

216 

217 def handle_request( 

218 self, 

219 request: Request, 

220 ) -> Response: 

221 assert isinstance(request.stream, SyncByteStream) 

222 

223 req = httpcore.Request( 

224 method=request.method, 

225 url=httpcore.URL( 

226 scheme=request.url.raw_scheme, 

227 host=request.url.raw_host, 

228 port=request.url.port, 

229 target=request.url.raw_path, 

230 ), 

231 headers=request.headers.raw, 

232 content=request.stream, 

233 extensions=request.extensions, 

234 ) 

235 with map_httpcore_exceptions(): 

236 resp = self._pool.handle_request(req) 

237 

238 assert isinstance(resp.stream, typing.Iterable) 

239 

240 return Response( 

241 status_code=resp.status, 

242 headers=resp.headers, 

243 stream=ResponseStream(resp.stream), 

244 extensions=resp.extensions, 

245 ) 

246 

247 def close(self) -> None: 

248 self._pool.close() 

249 

250 

251class AsyncResponseStream(AsyncByteStream): 

252 def __init__(self, httpcore_stream: typing.AsyncIterable[bytes]) -> None: 

253 self._httpcore_stream = httpcore_stream 

254 

255 async def __aiter__(self) -> typing.AsyncIterator[bytes]: 

256 with map_httpcore_exceptions(): 

257 async for part in self._httpcore_stream: 

258 yield part 

259 

260 async def aclose(self) -> None: 

261 if hasattr(self._httpcore_stream, "aclose"): 

262 await self._httpcore_stream.aclose() 

263 

264 

265class AsyncHTTPTransport(AsyncBaseTransport): 

266 def __init__( 

267 self, 

268 verify: VerifyTypes = True, 

269 cert: CertTypes | None = None, 

270 http1: bool = True, 

271 http2: bool = False, 

272 limits: Limits = DEFAULT_LIMITS, 

273 trust_env: bool = True, 

274 proxy: ProxyTypes | None = None, 

275 uds: str | None = None, 

276 local_address: str | None = None, 

277 retries: int = 0, 

278 socket_options: typing.Iterable[SOCKET_OPTION] | None = None, 

279 ) -> None: 

280 ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env) 

281 proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy 

282 

283 if proxy is None: 

284 self._pool = httpcore.AsyncConnectionPool( 

285 ssl_context=ssl_context, 

286 max_connections=limits.max_connections, 

287 max_keepalive_connections=limits.max_keepalive_connections, 

288 keepalive_expiry=limits.keepalive_expiry, 

289 http1=http1, 

290 http2=http2, 

291 uds=uds, 

292 local_address=local_address, 

293 retries=retries, 

294 socket_options=socket_options, 

295 ) 

296 elif proxy.url.scheme in ("http", "https"): 

297 self._pool = httpcore.AsyncHTTPProxy( 

298 proxy_url=httpcore.URL( 

299 scheme=proxy.url.raw_scheme, 

300 host=proxy.url.raw_host, 

301 port=proxy.url.port, 

302 target=proxy.url.raw_path, 

303 ), 

304 proxy_auth=proxy.raw_auth, 

305 proxy_headers=proxy.headers.raw, 

306 proxy_ssl_context=proxy.ssl_context, 

307 ssl_context=ssl_context, 

308 max_connections=limits.max_connections, 

309 max_keepalive_connections=limits.max_keepalive_connections, 

310 keepalive_expiry=limits.keepalive_expiry, 

311 http1=http1, 

312 http2=http2, 

313 socket_options=socket_options, 

314 ) 

315 elif proxy.url.scheme == "socks5": 

316 try: 

317 import socksio # noqa 

318 except ImportError: # pragma: no cover 

319 raise ImportError( 

320 "Using SOCKS proxy, but the 'socksio' package is not installed. " 

321 "Make sure to install httpx using `pip install httpx[socks]`." 

322 ) from None 

323 

324 self._pool = httpcore.AsyncSOCKSProxy( 

325 proxy_url=httpcore.URL( 

326 scheme=proxy.url.raw_scheme, 

327 host=proxy.url.raw_host, 

328 port=proxy.url.port, 

329 target=proxy.url.raw_path, 

330 ), 

331 proxy_auth=proxy.raw_auth, 

332 ssl_context=ssl_context, 

333 max_connections=limits.max_connections, 

334 max_keepalive_connections=limits.max_keepalive_connections, 

335 keepalive_expiry=limits.keepalive_expiry, 

336 http1=http1, 

337 http2=http2, 

338 ) 

339 else: # pragma: no cover 

340 raise ValueError( 

341 "Proxy protocol must be either 'http', 'https', or 'socks5'," 

342 " but got {proxy.url.scheme!r}." 

343 ) 

344 

345 async def __aenter__(self: A) -> A: # Use generics for subclass support. 

346 await self._pool.__aenter__() 

347 return self 

348 

349 async def __aexit__( 

350 self, 

351 exc_type: type[BaseException] | None = None, 

352 exc_value: BaseException | None = None, 

353 traceback: TracebackType | None = None, 

354 ) -> None: 

355 with map_httpcore_exceptions(): 

356 await self._pool.__aexit__(exc_type, exc_value, traceback) 

357 

358 async def handle_async_request( 

359 self, 

360 request: Request, 

361 ) -> Response: 

362 assert isinstance(request.stream, AsyncByteStream) 

363 

364 req = httpcore.Request( 

365 method=request.method, 

366 url=httpcore.URL( 

367 scheme=request.url.raw_scheme, 

368 host=request.url.raw_host, 

369 port=request.url.port, 

370 target=request.url.raw_path, 

371 ), 

372 headers=request.headers.raw, 

373 content=request.stream, 

374 extensions=request.extensions, 

375 ) 

376 with map_httpcore_exceptions(): 

377 resp = await self._pool.handle_async_request(req) 

378 

379 assert isinstance(resp.stream, typing.AsyncIterable) 

380 

381 return Response( 

382 status_code=resp.status, 

383 headers=resp.headers, 

384 stream=AsyncResponseStream(resp.stream), 

385 extensions=resp.extensions, 

386 ) 

387 

388 async def aclose(self) -> None: 

389 await self._pool.aclose()