Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

113 statements  

1""" 

2Custom transports, with nicely configured defaults. 

3 

4The following additional keyword arguments are currently supported by httpcore... 

5 

6* uds: str 

7* local_address: str 

8* retries: int 

9 

10Example usages... 

11 

12# Disable HTTP/2 on a single specific domain. 

13mounts = { 

14 "all://": httpx.HTTPTransport(http2=True), 

15 "all://*example.org": httpx.HTTPTransport() 

16} 

17 

18# Using advanced httpcore configuration, with connection retries. 

19transport = httpx.HTTPTransport(retries=1) 

20client = httpx.Client(transport=transport) 

21 

22# Using advanced httpcore configuration, with unix domain sockets. 

23transport = httpx.HTTPTransport(uds="socket.uds") 

24client = httpx.Client(transport=transport) 

25""" 

26 

27from __future__ import annotations 

28 

29import contextlib 

30import typing 

31from types import TracebackType 

32 

33if typing.TYPE_CHECKING: 

34 import ssl # pragma: no cover 

35 

36 import httpx # pragma: no cover 

37 

38from .._config import DEFAULT_LIMITS, Limits, Proxy, create_ssl_context 

39from .._exceptions import ( 

40 ConnectError, 

41 ConnectTimeout, 

42 LocalProtocolError, 

43 NetworkError, 

44 PoolTimeout, 

45 ProtocolError, 

46 ProxyError, 

47 ReadError, 

48 ReadTimeout, 

49 RemoteProtocolError, 

50 TimeoutException, 

51 UnsupportedProtocol, 

52 WriteError, 

53 WriteTimeout, 

54) 

55from .._models import Request, Response 

56from .._types import AsyncByteStream, CertTypes, ProxyTypes, SyncByteStream 

57from .._urls import URL 

58from .base import AsyncBaseTransport, BaseTransport 

59 

# Self-type variables for `__enter__` / `__aenter__`, so that entering a
# subclass of either transport is typed as returning that subclass.
T = typing.TypeVar("T", bound="HTTPTransport")
A = typing.TypeVar("A", bound="AsyncHTTPTransport")

# Accepted shapes for one entry of the `socket_options` argument.
# NOTE(review): presumably these mirror the argument forms of
# `socket.setsockopt()` - confirm against httpcore.
SOCKET_OPTION = typing.Union[
    typing.Tuple[int, int, int],
    typing.Tuple[int, int, typing.Union[bytes, bytearray]],
    typing.Tuple[int, int, None, int],
]

__all__ = ["AsyncHTTPTransport", "HTTPTransport"]

# Maps httpcore exception classes to httpx exception classes. Starts empty and
# is filled by `map_httpcore_exceptions()` the first time it runs, which keeps
# the `httpcore` import out of module import time.
HTTPCORE_EXC_MAP: dict[type[Exception], type[httpx.HTTPError]] = {}

72 

73 

def _load_httpcore_exceptions() -> dict[type[Exception], type[httpx.HTTPError]]:
    """
    Build the table that pairs each httpcore exception class with the httpx
    exception class of the same name.

    `httpcore` is imported here, inside the function, rather than at module
    level.
    """
    import httpcore

    # Listed base-class-first within each family (timeouts, network errors,
    # protocol errors), matching the insertion order callers iterate in.
    exception_pairs = [
        (httpcore.TimeoutException, TimeoutException),
        (httpcore.ConnectTimeout, ConnectTimeout),
        (httpcore.ReadTimeout, ReadTimeout),
        (httpcore.WriteTimeout, WriteTimeout),
        (httpcore.PoolTimeout, PoolTimeout),
        (httpcore.NetworkError, NetworkError),
        (httpcore.ConnectError, ConnectError),
        (httpcore.ReadError, ReadError),
        (httpcore.WriteError, WriteError),
        (httpcore.ProxyError, ProxyError),
        (httpcore.UnsupportedProtocol, UnsupportedProtocol),
        (httpcore.ProtocolError, ProtocolError),
        (httpcore.LocalProtocolError, LocalProtocolError),
        (httpcore.RemoteProtocolError, RemoteProtocolError),
    ]
    return dict(exception_pairs)

93 

94 

@contextlib.contextmanager
def map_httpcore_exceptions() -> typing.Iterator[None]:
    """
    Context manager that re-raises httpcore exceptions as httpx exceptions.

    Any `Exception` raised inside the `with` body is looked up in
    `HTTPCORE_EXC_MAP`. If it matches, the corresponding httpx exception is
    raised with the same message and the original attached as its cause.
    Exceptions with no match propagate unchanged.
    """
    global HTTPCORE_EXC_MAP
    # The table starts out empty; fill it on first use.
    if not HTTPCORE_EXC_MAP:
        HTTPCORE_EXC_MAP = _load_httpcore_exceptions()
    try:
        yield
    except Exception as exc:
        best_match = None

        for core_type, httpx_type in HTTPCORE_EXC_MAP.items():
            # Prefer the most specific match: an `httpcore.ReadTimeout` should
            # become `httpx.ReadTimeout`, not merely `httpx.TimeoutException`.
            # A candidate only replaces the current pick when it is a subclass
            # of that pick.
            if isinstance(exc, core_type) and (
                best_match is None or issubclass(httpx_type, best_match)
            ):
                best_match = httpx_type

        if best_match is None:  # pragma: no cover
            raise

        raise best_match(str(exc)) from exc

119 

120 

class ResponseStream(SyncByteStream):
    """
    Byte stream wrapping an httpcore response body, translating httpcore
    exceptions raised during iteration into httpx exceptions.
    """

    def __init__(self, httpcore_stream: typing.Iterable[bytes]) -> None:
        self._httpcore_stream = httpcore_stream

    def __iter__(self) -> typing.Iterator[bytes]:
        """Yield the body chunks produced by the wrapped stream."""
        with map_httpcore_exceptions():
            for chunk in self._httpcore_stream:
                yield chunk

    def close(self) -> None:
        """Close the wrapped stream, if it provides a `close` attribute."""
        wrapped = self._httpcore_stream
        if not hasattr(wrapped, "close"):
            return
        wrapped.close()

133 

134 

class HTTPTransport(BaseTransport):
    """
    Synchronous transport that sends requests through an httpcore pool.

    Depending on `proxy`, the pool is an `httpcore.ConnectionPool`, an
    `httpcore.HTTPProxy` or an `httpcore.SOCKSProxy`.
    """

    def __init__(
        self,
        verify: ssl.SSLContext | str | bool = True,
        cert: CertTypes | None = None,
        trust_env: bool = True,
        http1: bool = True,
        http2: bool = False,
        limits: Limits = DEFAULT_LIMITS,
        proxy: ProxyTypes | None = None,
        uds: str | None = None,
        local_address: str | None = None,
        retries: int = 0,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> None:
        """
        Create the underlying httpcore pool.

        `uds`, `local_address` and `retries` are only forwarded when no proxy
        is configured; `socket_options` is forwarded for direct and HTTP(S)
        proxy connections but not for SOCKS proxies.

        Raises:
            ImportError: a SOCKS proxy was requested but `socksio` is missing.
            ValueError: the proxy URL scheme is not one of the supported ones.
        """
        import httpcore

        # A bare URL (string or `URL`) is shorthand for a `Proxy` with defaults.
        if isinstance(proxy, (str, URL)):
            proxy = Proxy(url=proxy)
        ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)

        # Options that every pool flavour below is given.
        pool_options: dict[str, typing.Any] = {
            "ssl_context": ssl_context,
            "max_connections": limits.max_connections,
            "max_keepalive_connections": limits.max_keepalive_connections,
            "keepalive_expiry": limits.keepalive_expiry,
            "http1": http1,
            "http2": http2,
        }

        if proxy is None:
            self._pool = httpcore.ConnectionPool(
                uds=uds,
                local_address=local_address,
                retries=retries,
                socket_options=socket_options,
                **pool_options,
            )
            return

        scheme = proxy.url.scheme

        if scheme in ("http", "https"):
            self._pool = httpcore.HTTPProxy(
                proxy_url=httpcore.URL(
                    scheme=proxy.url.raw_scheme,
                    host=proxy.url.raw_host,
                    port=proxy.url.port,
                    target=proxy.url.raw_path,
                ),
                proxy_auth=proxy.raw_auth,
                proxy_headers=proxy.headers.raw,
                proxy_ssl_context=proxy.ssl_context,
                socket_options=socket_options,
                **pool_options,
            )
            return

        if scheme not in ("socks5", "socks5h"):  # pragma: no cover
            raise ValueError(
                "Proxy protocol must be either 'http', 'https', 'socks5', or 'socks5h',"
                f" but got {proxy.url.scheme!r}."
            )

        # Check for the optional SOCKS dependency up front so the user gets an
        # actionable message.
        try:
            import socksio  # noqa
        except ImportError:  # pragma: no cover
            raise ImportError(
                "Using SOCKS proxy, but the 'socksio' package is not installed. "
                "Make sure to install httpx using `pip install httpx[socks]`."
            ) from None

        self._pool = httpcore.SOCKSProxy(
            proxy_url=httpcore.URL(
                scheme=proxy.url.raw_scheme,
                host=proxy.url.raw_host,
                port=proxy.url.port,
                target=proxy.url.raw_path,
            ),
            proxy_auth=proxy.raw_auth,
            **pool_options,
        )

    def __enter__(self: T) -> T:  # Use generics for subclass support.
        self._pool.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        with map_httpcore_exceptions():
            self._pool.__exit__(exc_type, exc_value, traceback)

    def handle_request(
        self,
        request: Request,
    ) -> Response:
        """
        Send `request` through the pool and return the httpx `Response`.

        httpcore exceptions raised while the pool handles the request are
        re-raised as httpx exceptions. The response body is wrapped in a
        `ResponseStream`.
        """
        assert isinstance(request.stream, SyncByteStream)
        import httpcore

        # Translate the httpx request into its httpcore counterpart.
        core_request = httpcore.Request(
            method=request.method,
            url=httpcore.URL(
                scheme=request.url.raw_scheme,
                host=request.url.raw_host,
                port=request.url.port,
                target=request.url.raw_path,
            ),
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )
        with map_httpcore_exceptions():
            core_response = self._pool.handle_request(core_request)

        assert isinstance(core_response.stream, typing.Iterable)

        body = ResponseStream(core_response.stream)
        return Response(
            status_code=core_response.status,
            headers=core_response.headers,
            stream=body,
            extensions=core_response.extensions,
        )

    def close(self) -> None:
        """Close the underlying pool."""
        self._pool.close()

263 

264 

class AsyncResponseStream(AsyncByteStream):
    """
    Async byte stream wrapping an httpcore response body, translating httpcore
    exceptions raised during iteration into httpx exceptions.
    """

    def __init__(self, httpcore_stream: typing.AsyncIterable[bytes]) -> None:
        self._httpcore_stream = httpcore_stream

    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
        """Yield the body chunks produced by the wrapped stream."""
        with map_httpcore_exceptions():
            async for chunk in self._httpcore_stream:
                yield chunk

    async def aclose(self) -> None:
        """Close the wrapped stream, if it provides an `aclose` attribute."""
        wrapped = self._httpcore_stream
        if not hasattr(wrapped, "aclose"):
            return
        await wrapped.aclose()

277 

278 

class AsyncHTTPTransport(AsyncBaseTransport):
    """
    Asynchronous transport that sends requests through an httpcore pool.

    Depending on `proxy`, the pool is an `httpcore.AsyncConnectionPool`, an
    `httpcore.AsyncHTTPProxy` or an `httpcore.AsyncSOCKSProxy`.
    """

    def __init__(
        self,
        verify: ssl.SSLContext | str | bool = True,
        cert: CertTypes | None = None,
        trust_env: bool = True,
        http1: bool = True,
        http2: bool = False,
        limits: Limits = DEFAULT_LIMITS,
        proxy: ProxyTypes | None = None,
        uds: str | None = None,
        local_address: str | None = None,
        retries: int = 0,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> None:
        """
        Create the underlying httpcore pool.

        `uds`, `local_address` and `retries` are only forwarded when no proxy
        is configured; `socket_options` is forwarded for direct and HTTP(S)
        proxy connections but not for SOCKS proxies.

        Raises:
            ImportError: a SOCKS proxy was requested but `socksio` is missing.
            ValueError: the proxy URL scheme is not one of the supported ones.
        """
        import httpcore

        # A bare URL (string or `URL`) is shorthand for a `Proxy` with defaults.
        proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
        ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)

        if proxy is None:
            self._pool = httpcore.AsyncConnectionPool(
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                uds=uds,
                local_address=local_address,
                retries=retries,
                socket_options=socket_options,
            )
        elif proxy.url.scheme in ("http", "https"):
            self._pool = httpcore.AsyncHTTPProxy(
                proxy_url=httpcore.URL(
                    scheme=proxy.url.raw_scheme,
                    host=proxy.url.raw_host,
                    port=proxy.url.port,
                    target=proxy.url.raw_path,
                ),
                proxy_auth=proxy.raw_auth,
                proxy_headers=proxy.headers.raw,
                proxy_ssl_context=proxy.ssl_context,
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                socket_options=socket_options,
            )
        elif proxy.url.scheme in ("socks5", "socks5h"):
            # Check for the optional SOCKS dependency up front so the user
            # gets an actionable message.
            try:
                import socksio  # noqa
            except ImportError:  # pragma: no cover
                raise ImportError(
                    "Using SOCKS proxy, but the 'socksio' package is not installed. "
                    "Make sure to install httpx using `pip install httpx[socks]`."
                ) from None

            self._pool = httpcore.AsyncSOCKSProxy(
                proxy_url=httpcore.URL(
                    scheme=proxy.url.raw_scheme,
                    host=proxy.url.raw_host,
                    port=proxy.url.port,
                    target=proxy.url.raw_path,
                ),
                proxy_auth=proxy.raw_auth,
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
            )
        else:  # pragma: no cover
            # The second literal must be an f-string: without the prefix the
            # message shows the placeholder text instead of the actual scheme.
            raise ValueError(
                "Proxy protocol must be either 'http', 'https', 'socks5', or 'socks5h',"
                f" but got {proxy.url.scheme!r}."
            )

    async def __aenter__(self: A) -> A:  # Use generics for subclass support.
        await self._pool.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        with map_httpcore_exceptions():
            await self._pool.__aexit__(exc_type, exc_value, traceback)

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        """
        Send `request` through the pool and return the httpx `Response`.

        httpcore exceptions raised while the pool handles the request are
        re-raised as httpx exceptions. The response body is wrapped in an
        `AsyncResponseStream`.
        """
        assert isinstance(request.stream, AsyncByteStream)
        import httpcore

        # Translate the httpx request into its httpcore counterpart.
        req = httpcore.Request(
            method=request.method,
            url=httpcore.URL(
                scheme=request.url.raw_scheme,
                host=request.url.raw_host,
                port=request.url.port,
                target=request.url.raw_path,
            ),
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )
        with map_httpcore_exceptions():
            resp = await self._pool.handle_async_request(req)

        assert isinstance(resp.stream, typing.AsyncIterable)

        return Response(
            status_code=resp.status,
            headers=resp.headers,
            stream=AsyncResponseStream(resp.stream),
            extensions=resp.extensions,
        )

    async def aclose(self) -> None:
        """Close the underlying pool."""
        await self._pool.aclose()