Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

102 statements  

"""
Custom transports, with nicely configured defaults.

The following additional keyword arguments are currently supported by httpcore...

* uds: str
* local_address: str
* retries: int

Example usages...

# Disable HTTP/2 on a single specific domain.
mounts = {
    "all://": httpx.HTTPTransport(http2=True),
    "all://*example.org": httpx.HTTPTransport()
}

# Using advanced httpcore configuration, with connection retries.
transport = httpx.HTTPTransport(retries=1)
client = httpx.Client(transport=transport)

# Using advanced httpcore configuration, with unix domain sockets.
transport = httpx.HTTPTransport(uds="socket.uds")
client = httpx.Client(transport=transport)
"""

26from __future__ import annotations 

27 

28import contextlib 

29import typing 

30from types import TracebackType 

31 

32import httpcore 

33 

34from .._config import DEFAULT_LIMITS, Limits, Proxy, create_ssl_context 

35from .._exceptions import ( 

36 ConnectError, 

37 ConnectTimeout, 

38 LocalProtocolError, 

39 NetworkError, 

40 PoolTimeout, 

41 ProtocolError, 

42 ProxyError, 

43 ReadError, 

44 ReadTimeout, 

45 RemoteProtocolError, 

46 TimeoutException, 

47 UnsupportedProtocol, 

48 WriteError, 

49 WriteTimeout, 

50) 

51from .._models import Request, Response 

52from .._types import AsyncByteStream, CertTypes, ProxyTypes, SyncByteStream, VerifyTypes 

53from .._urls import URL 

54from .base import AsyncBaseTransport, BaseTransport 

55 

# Type variables bound to the two transport classes. They annotate `self` in
# `HTTPTransport.__enter__` and `AsyncHTTPTransport.__aenter__` so that the
# declared return type follows the (sub)class the method is called on.
T = typing.TypeVar("T", bound="HTTPTransport")
A = typing.TypeVar("A", bound="AsyncHTTPTransport")

# Accepted tuple shapes for one entry of the `socket_options` argument taken
# by the transports below.
# NOTE(review): presumably these mirror the argument forms of
# `socket.setsockopt` -- confirm against the httpcore documentation.
SOCKET_OPTION = typing.Union[
    typing.Tuple[int, int, int],
    typing.Tuple[int, int, typing.Union[bytes, bytearray]],
    typing.Tuple[int, int, None, int],
]

64 

65 

@contextlib.contextmanager
def map_httpcore_exceptions() -> typing.Iterator[None]:
    """
    Context manager that re-raises httpcore exceptions as httpx exceptions.

    Any `Exception` raised inside the managed block is looked up in
    `HTTPCORE_EXC_MAP`. When one or more entries match, the most specific
    mapped class is raised with the original message, chained to the
    original exception via `from`. Exceptions with no matching entry
    propagate unchanged.
    """
    try:
        yield
    except Exception as error:
        best_match = None

        for core_type, httpx_type in HTTPCORE_EXC_MAP.items():
            # Several entries may match, since base classes are listed too.
            # Only move to a candidate that narrows the current choice, so
            # that e.g. `httpcore.ReadTimeout` ends up as `httpx.ReadTimeout`
            # rather than the broader `httpx.TimeoutException`.
            if isinstance(error, core_type) and (
                best_match is None or issubclass(httpx_type, best_match)
            ):
                best_match = httpx_type

        if best_match is not None:
            raise best_match(str(error)) from error

        # Not an exception we know how to translate: let it through as-is.
        raise  # pragma: no cover

87 

88 

# Lookup table consumed by `map_httpcore_exceptions`: each key is an httpcore
# exception class and each value is the httpx exception class raised in its
# place. Both broad and narrow classes are listed; the lookup code is what
# picks the most specific match, so the order of entries here is not relied on
# for specificity.
HTTPCORE_EXC_MAP = {
    # Timeouts.
    httpcore.TimeoutException: TimeoutException,
    httpcore.ConnectTimeout: ConnectTimeout,
    httpcore.ReadTimeout: ReadTimeout,
    httpcore.WriteTimeout: WriteTimeout,
    httpcore.PoolTimeout: PoolTimeout,
    # Network errors.
    httpcore.NetworkError: NetworkError,
    httpcore.ConnectError: ConnectError,
    httpcore.ReadError: ReadError,
    httpcore.WriteError: WriteError,
    # Proxy and unsupported-protocol errors.
    httpcore.ProxyError: ProxyError,
    httpcore.UnsupportedProtocol: UnsupportedProtocol,
    # Protocol errors.
    httpcore.ProtocolError: ProtocolError,
    httpcore.LocalProtocolError: LocalProtocolError,
    httpcore.RemoteProtocolError: RemoteProtocolError,
}

105 

106 

class ResponseStream(SyncByteStream):
    """
    Synchronous byte stream wrapping the body iterable of an httpcore response.

    Iteration runs inside `map_httpcore_exceptions()`, so httpcore errors
    raised while reading the body surface as their httpx equivalents.
    """

    def __init__(self, httpcore_stream: typing.Iterable[bytes]) -> None:
        # The wrapped iterable; it is consumed lazily by `__iter__`.
        self._httpcore_stream = httpcore_stream

    def __iter__(self) -> typing.Iterator[bytes]:
        """Yield each chunk of the wrapped stream, translating httpcore errors."""
        with map_httpcore_exceptions():
            for chunk in self._httpcore_stream:
                yield chunk

    def close(self) -> None:
        """Close the wrapped stream if it exposes a `close` attribute."""
        wrapped = self._httpcore_stream
        if not hasattr(wrapped, "close"):
            # Plain iterables have nothing to release.
            return
        wrapped.close()

119 

120 

class HTTPTransport(BaseTransport):
    """
    Synchronous transport that sends requests through an httpcore pool.

    The pool stored on `self._pool` depends on the `proxy` argument:

    * no proxy              -> `httpcore.ConnectionPool`
    * "http"/"https" proxy  -> `httpcore.HTTPProxy`
    * "socks5" proxy        -> `httpcore.SOCKSProxy` (needs the `socksio` package)
    """

    def __init__(
        self,
        verify: VerifyTypes = True,
        cert: CertTypes | None = None,
        http1: bool = True,
        http2: bool = False,
        limits: Limits = DEFAULT_LIMITS,
        trust_env: bool = True,
        proxy: ProxyTypes | None = None,
        uds: str | None = None,
        local_address: str | None = None,
        retries: int = 0,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> None:
        """
        Build the underlying httpcore pool.

        `verify`, `cert` and `trust_env` are passed to `create_ssl_context`.
        `http1`, `http2` and the three `limits` fields are forwarded to
        whichever pool is created. `uds`, `local_address` and `retries` are
        only forwarded when no proxy is configured. `socket_options` is
        forwarded to the direct pool and the HTTP proxy pool, but not to the
        SOCKS proxy pool.

        Raises:
            ImportError: a "socks5" proxy was requested but `socksio` cannot
                be imported.
            ValueError: the proxy URL scheme is not "http", "https" or "socks5".
        """
        ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)

        # Plain strings and URL instances are shorthand for a Proxy object.
        if isinstance(proxy, (str, URL)):
            proxy = Proxy(url=proxy)

        if proxy is None:
            # Direct connections to the target host.
            self._pool = httpcore.ConnectionPool(
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                uds=uds,
                local_address=local_address,
                retries=retries,
                socket_options=socket_options,
            )
            return

        if proxy.url.scheme in ("http", "https"):
            http_proxy_url = httpcore.URL(
                scheme=proxy.url.raw_scheme,
                host=proxy.url.raw_host,
                port=proxy.url.port,
                target=proxy.url.raw_path,
            )
            self._pool = httpcore.HTTPProxy(
                proxy_url=http_proxy_url,
                proxy_auth=proxy.raw_auth,
                proxy_headers=proxy.headers.raw,
                ssl_context=ssl_context,
                proxy_ssl_context=proxy.ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                socket_options=socket_options,
            )
            return

        if proxy.url.scheme == "socks5":
            # SOCKS support is an optional extra; fail early with a helpful
            # message rather than on first use.
            try:
                import socksio  # noqa
            except ImportError:  # pragma: no cover
                raise ImportError(
                    "Using SOCKS proxy, but the 'socksio' package is not installed. "
                    "Make sure to install httpx using `pip install httpx[socks]`."
                ) from None

            socks_proxy_url = httpcore.URL(
                scheme=proxy.url.raw_scheme,
                host=proxy.url.raw_host,
                port=proxy.url.port,
                target=proxy.url.raw_path,
            )
            self._pool = httpcore.SOCKSProxy(
                proxy_url=socks_proxy_url,
                proxy_auth=proxy.raw_auth,
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
            )
            return

        # Any other proxy scheme is unsupported.
        raise ValueError(  # pragma: no cover
            "Proxy protocol must be either 'http', 'https', or 'socks5',"
            f" but got {proxy.url.scheme!r}."
        )

    def __enter__(self: T) -> T:  # Use generics for subclass support.
        """Enter the underlying pool's context and return this transport."""
        self._pool.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Exit the underlying pool's context, translating httpcore errors."""
        with map_httpcore_exceptions():
            self._pool.__exit__(exc_type, exc_value, traceback)

    def handle_request(
        self,
        request: Request,
    ) -> Response:
        """
        Send `request` through the pool and wrap the result in a `Response`.

        The httpx request is converted to an `httpcore.Request`, dispatched
        inside `map_httpcore_exceptions()`, and the httpcore response body is
        wrapped in a `ResponseStream`.
        """
        assert isinstance(request.stream, SyncByteStream)

        target_url = httpcore.URL(
            scheme=request.url.raw_scheme,
            host=request.url.raw_host,
            port=request.url.port,
            target=request.url.raw_path,
        )
        core_request = httpcore.Request(
            method=request.method,
            url=target_url,
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )

        with map_httpcore_exceptions():
            core_response = self._pool.handle_request(core_request)

        assert isinstance(core_response.stream, typing.Iterable)

        body = ResponseStream(core_response.stream)
        return Response(
            status_code=core_response.status,
            headers=core_response.headers,
            stream=body,
            extensions=core_response.extensions,
        )

    def close(self) -> None:
        """Close the underlying pool."""
        self._pool.close()

246 

247 

class AsyncResponseStream(AsyncByteStream):
    """
    Asynchronous byte stream wrapping the body iterable of an httpcore response.

    Iteration runs inside `map_httpcore_exceptions()`, so httpcore errors
    raised while reading the body surface as their httpx equivalents.
    """

    def __init__(self, httpcore_stream: typing.AsyncIterable[bytes]) -> None:
        # The wrapped async iterable; it is consumed lazily by `__aiter__`.
        self._httpcore_stream = httpcore_stream

    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
        """Yield each chunk of the wrapped stream, translating httpcore errors."""
        with map_httpcore_exceptions():
            async for chunk in self._httpcore_stream:
                yield chunk

    async def aclose(self) -> None:
        """Close the wrapped stream if it exposes an `aclose` attribute."""
        wrapped = self._httpcore_stream
        if not hasattr(wrapped, "aclose"):
            # Plain async iterables have nothing to release.
            return
        await wrapped.aclose()

260 

261 

class AsyncHTTPTransport(AsyncBaseTransport):
    """
    Asynchronous transport that sends requests through an httpcore pool.

    The pool stored on `self._pool` depends on the `proxy` argument:

    * no proxy              -> `httpcore.AsyncConnectionPool`
    * "http"/"https" proxy  -> `httpcore.AsyncHTTPProxy`
    * "socks5" proxy        -> `httpcore.AsyncSOCKSProxy` (needs the `socksio` package)
    """

    def __init__(
        self,
        verify: VerifyTypes = True,
        cert: CertTypes | None = None,
        http1: bool = True,
        http2: bool = False,
        limits: Limits = DEFAULT_LIMITS,
        trust_env: bool = True,
        proxy: ProxyTypes | None = None,
        uds: str | None = None,
        local_address: str | None = None,
        retries: int = 0,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> None:
        """
        Build the underlying httpcore pool.

        `verify`, `cert` and `trust_env` are passed to `create_ssl_context`.
        `http1`, `http2` and the three `limits` fields are forwarded to
        whichever pool is created. `uds`, `local_address` and `retries` are
        only forwarded when no proxy is configured. `socket_options` is
        forwarded to the direct pool and the HTTP proxy pool, but not to the
        SOCKS proxy pool.

        Raises:
            ImportError: a "socks5" proxy was requested but `socksio` cannot
                be imported.
            ValueError: the proxy URL scheme is not "http", "https" or "socks5".
        """
        ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
        # Plain strings and URL instances are shorthand for a Proxy object.
        proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy

        if proxy is None:
            # Direct connections to the target host.
            self._pool = httpcore.AsyncConnectionPool(
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                uds=uds,
                local_address=local_address,
                retries=retries,
                socket_options=socket_options,
            )
        elif proxy.url.scheme in ("http", "https"):
            self._pool = httpcore.AsyncHTTPProxy(
                proxy_url=httpcore.URL(
                    scheme=proxy.url.raw_scheme,
                    host=proxy.url.raw_host,
                    port=proxy.url.port,
                    target=proxy.url.raw_path,
                ),
                proxy_auth=proxy.raw_auth,
                proxy_headers=proxy.headers.raw,
                ssl_context=ssl_context,
                proxy_ssl_context=proxy.ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                socket_options=socket_options,
            )
        elif proxy.url.scheme == "socks5":
            # SOCKS support is an optional extra; fail early with a helpful
            # message rather than on first use.
            try:
                import socksio  # noqa
            except ImportError:  # pragma: no cover
                raise ImportError(
                    "Using SOCKS proxy, but the 'socksio' package is not installed. "
                    "Make sure to install httpx using `pip install httpx[socks]`."
                ) from None

            self._pool = httpcore.AsyncSOCKSProxy(
                proxy_url=httpcore.URL(
                    scheme=proxy.url.raw_scheme,
                    host=proxy.url.raw_host,
                    port=proxy.url.port,
                    target=proxy.url.raw_path,
                ),
                proxy_auth=proxy.raw_auth,
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
            )
        else:  # pragma: no cover
            # The second fragment must be an f-string: without the prefix the
            # message would contain the literal text "{proxy.url.scheme!r}"
            # instead of the offending scheme.
            raise ValueError(
                "Proxy protocol must be either 'http', 'https', or 'socks5',"
                f" but got {proxy.url.scheme!r}."
            )

    async def __aenter__(self: A) -> A:  # Use generics for subclass support.
        """Enter the underlying pool's async context and return this transport."""
        await self._pool.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Exit the underlying pool's async context, translating httpcore errors."""
        with map_httpcore_exceptions():
            await self._pool.__aexit__(exc_type, exc_value, traceback)

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        """
        Send `request` through the pool and wrap the result in a `Response`.

        The httpx request is converted to an `httpcore.Request`, dispatched
        inside `map_httpcore_exceptions()`, and the httpcore response body is
        wrapped in an `AsyncResponseStream`.
        """
        assert isinstance(request.stream, AsyncByteStream)

        req = httpcore.Request(
            method=request.method,
            url=httpcore.URL(
                scheme=request.url.raw_scheme,
                host=request.url.raw_host,
                port=request.url.port,
                target=request.url.raw_path,
            ),
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )
        with map_httpcore_exceptions():
            resp = await self._pool.handle_async_request(req)

        assert isinstance(resp.stream, typing.AsyncIterable)

        return Response(
            status_code=resp.status,
            headers=resp.headers,
            stream=AsyncResponseStream(resp.stream),
            extensions=resp.extensions,
        )

    async def aclose(self) -> None:
        """Close the underlying pool."""
        await self._pool.aclose()