Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py: 36%

98 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1""" 

2Custom transports, with nicely configured defaults. 

3 

4The following additional keyword arguments are currently supported by httpcore... 

5 

6* uds: str 

7* local_address: str 

8* retries: int 

9 

10Example usages... 

11 

12# Disable HTTP/2 on a single specific domain. 

13mounts = { 

14 "all://": httpx.HTTPTransport(http2=True), 

15 "all://*example.org": httpx.HTTPTransport() 

16} 

17 

18# Using advanced httpcore configuration, with connection retries. 

19transport = httpx.HTTPTransport(retries=1) 

20client = httpx.Client(transport=transport) 

21 

22# Using advanced httpcore configuration, with unix domain sockets. 

23transport = httpx.HTTPTransport(uds="socket.uds") 

24client = httpx.Client(transport=transport) 

25""" 

26import contextlib 

27import typing 

28from types import TracebackType 

29 

30import httpcore 

31 

32from .._config import DEFAULT_LIMITS, Limits, Proxy, create_ssl_context 

33from .._exceptions import ( 

34 ConnectError, 

35 ConnectTimeout, 

36 LocalProtocolError, 

37 NetworkError, 

38 PoolTimeout, 

39 ProtocolError, 

40 ProxyError, 

41 ReadError, 

42 ReadTimeout, 

43 RemoteProtocolError, 

44 TimeoutException, 

45 UnsupportedProtocol, 

46 WriteError, 

47 WriteTimeout, 

48) 

49from .._models import Request, Response 

50from .._types import AsyncByteStream, CertTypes, SyncByteStream, VerifyTypes 

51from .base import AsyncBaseTransport, BaseTransport 

52 

# Self-type variables: they let `__enter__` / `__aenter__` be annotated as
# returning the caller's own (sub)class instead of the base transport class.
T = typing.TypeVar("T", bound="HTTPTransport")
A = typing.TypeVar("A", bound="AsyncHTTPTransport")

# Accepted tuple shapes for one entry of the `socket_options` argument.
# NOTE(review): these look like the argument forms of `socket.setsockopt`
# (level, optname, value) -- confirm against httpcore.
SOCKET_OPTION = typing.Union[
    typing.Tuple[int, int, int],
    typing.Tuple[int, int, typing.Union[bytes, bytearray]],
    typing.Tuple[int, int, None, int],
]

61 

62 

@contextlib.contextmanager
def map_httpcore_exceptions() -> typing.Iterator[None]:
    """
    Re-raise exceptions listed in `HTTPCORE_EXC_MAP` as their mapped class.

    Any `Exception` raised inside the managed block is checked against every
    key of `HTTPCORE_EXC_MAP`. When at least one key matches, the mapped
    exception is raised with the same message, chained to the original.
    Exceptions that match no key propagate unchanged.
    """
    try:
        yield
    except Exception as error:  # noqa: PIE-786
        target = None

        for source_type, candidate in HTTPCORE_EXC_MAP.items():
            # Prefer the most specific mapping available: a candidate only
            # replaces the current choice when it is a subclass of it.
            # Eg. an `httpcore.ReadTimeout` should become `httpx.ReadTimeout`,
            # not merely `httpx.TimeoutException`.
            if isinstance(error, source_type) and (
                target is None or issubclass(candidate, target)
            ):
                target = candidate

        if target is None:  # pragma: no cover
            raise

        raise target(str(error)) from error

84 

85 

# Translation table consumed by `map_httpcore_exceptions`: each key is an
# httpcore exception class and each value is the exception class raised in
# its place. General and specific classes are both listed; the lookup walks
# every entry and keeps the most specific matching value.
HTTPCORE_EXC_MAP = {
    httpcore.TimeoutException: TimeoutException,
    httpcore.ConnectTimeout: ConnectTimeout,
    httpcore.ReadTimeout: ReadTimeout,
    httpcore.WriteTimeout: WriteTimeout,
    httpcore.PoolTimeout: PoolTimeout,
    httpcore.NetworkError: NetworkError,
    httpcore.ConnectError: ConnectError,
    httpcore.ReadError: ReadError,
    httpcore.WriteError: WriteError,
    httpcore.ProxyError: ProxyError,
    httpcore.UnsupportedProtocol: UnsupportedProtocol,
    httpcore.ProtocolError: ProtocolError,
    httpcore.LocalProtocolError: LocalProtocolError,
    httpcore.RemoteProtocolError: RemoteProtocolError,
}

102 

103 

class ResponseStream(SyncByteStream):
    """
    Byte stream wrapping the iterable body of an httpcore response.

    Exceptions raised while iterating are passed through
    `map_httpcore_exceptions`.
    """

    def __init__(self, httpcore_stream: typing.Iterable[bytes]):
        """Store the wrapped iterable; nothing is read until iteration."""
        self._httpcore_stream = httpcore_stream

    def __iter__(self) -> typing.Iterator[bytes]:
        """Yield each chunk of the wrapped stream, mapping exceptions."""
        with map_httpcore_exceptions():
            # Deliberately an explicit loop: the wrapped stream is only
            # iterated here, never closed as a side effect of this generator.
            for chunk in self._httpcore_stream:
                yield chunk

    def close(self) -> None:
        """Close the wrapped stream when it exposes a `close` attribute."""
        if not hasattr(self._httpcore_stream, "close"):
            return
        self._httpcore_stream.close()

116 

117 

class HTTPTransport(BaseTransport):
    """
    Synchronous transport that sends requests through an httpcore pool.

    The pool stored on `self._pool` is a plain connection pool, an HTTP(S)
    proxy pool, or a SOCKS5 proxy pool, depending on the `proxy` argument.
    """

    def __init__(
        self,
        verify: VerifyTypes = True,
        cert: typing.Optional[CertTypes] = None,
        http1: bool = True,
        http2: bool = False,
        limits: Limits = DEFAULT_LIMITS,
        trust_env: bool = True,
        proxy: typing.Optional[Proxy] = None,
        uds: typing.Optional[str] = None,
        local_address: typing.Optional[str] = None,
        retries: int = 0,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> None:
        """
        Build the underlying httpcore pool.

        `verify`, `cert` and `trust_env` are used to create the SSL context.
        `limits`, `http1` and `http2` are forwarded to every pool type.
        `uds`, `local_address` and `retries` are forwarded only when no
        proxy is configured. `socket_options` is forwarded to the direct
        pool and the HTTP(S) proxy pool, but not to the SOCKS5 pool.

        Raises `ImportError` when a SOCKS5 proxy is requested without the
        `socksio` package, and `ValueError` for any other proxy scheme.
        """
        ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)

        # Direct connections: no proxy configured.
        if proxy is None:
            self._pool = httpcore.ConnectionPool(
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                uds=uds,
                local_address=local_address,
                retries=retries,
                socket_options=socket_options,
            )
            return

        # HTTP or HTTPS proxy.
        if proxy.url.scheme in ("http", "https"):
            http_proxy_url = httpcore.URL(
                scheme=proxy.url.raw_scheme,
                host=proxy.url.raw_host,
                port=proxy.url.port,
                target=proxy.url.raw_path,
            )
            self._pool = httpcore.HTTPProxy(
                proxy_url=http_proxy_url,
                proxy_auth=proxy.raw_auth,
                proxy_headers=proxy.headers.raw,
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                socket_options=socket_options,
            )
            return

        # Anything that is not SOCKS5 at this point is unsupported.
        if proxy.url.scheme != "socks5":  # pragma: no cover
            raise ValueError(
                f"Proxy protocol must be either 'http', 'https', or 'socks5', but got {proxy.url.scheme!r}."
            )

        # The import is only a presence check for the optional dependency.
        try:
            import socksio  # noqa
        except ImportError:  # pragma: no cover
            raise ImportError(
                "Using SOCKS proxy, but the 'socksio' package is not installed. "
                "Make sure to install httpx using `pip install httpx[socks]`."
            ) from None

        socks_proxy_url = httpcore.URL(
            scheme=proxy.url.raw_scheme,
            host=proxy.url.raw_host,
            port=proxy.url.port,
            target=proxy.url.raw_path,
        )
        self._pool = httpcore.SOCKSProxy(
            proxy_url=socks_proxy_url,
            proxy_auth=proxy.raw_auth,
            ssl_context=ssl_context,
            max_connections=limits.max_connections,
            max_keepalive_connections=limits.max_keepalive_connections,
            keepalive_expiry=limits.keepalive_expiry,
            http1=http1,
            http2=http2,
        )

    def __enter__(self: T) -> T:  # Use generics for subclass support.
        """Enter the underlying pool's context and return this transport."""
        self._pool.__enter__()
        return self

    def __exit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]] = None,
        exc_value: typing.Optional[BaseException] = None,
        traceback: typing.Optional[TracebackType] = None,
    ) -> None:
        """Exit the underlying pool's context, mapping httpcore exceptions."""
        with map_httpcore_exceptions():
            self._pool.__exit__(exc_type, exc_value, traceback)

    def handle_request(
        self,
        request: Request,
    ) -> Response:
        """
        Send `request` through the pool and wrap the result in a `Response`.

        The request is converted to an `httpcore.Request`, dispatched with
        httpcore exceptions mapped, and the resulting stream is wrapped in a
        `ResponseStream`.
        """
        assert isinstance(request.stream, SyncByteStream)

        core_url = httpcore.URL(
            scheme=request.url.raw_scheme,
            host=request.url.raw_host,
            port=request.url.port,
            target=request.url.raw_path,
        )
        core_request = httpcore.Request(
            method=request.method,
            url=core_url,
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )

        with map_httpcore_exceptions():
            core_response = self._pool.handle_request(core_request)

        assert isinstance(core_response.stream, typing.Iterable)

        body = ResponseStream(core_response.stream)
        return Response(
            status_code=core_response.status,
            headers=core_response.headers,
            stream=body,
            extensions=core_response.extensions,
        )

    def close(self) -> None:
        """Close the underlying pool."""
        self._pool.close()

240 

241 

class AsyncResponseStream(AsyncByteStream):
    """
    Async byte stream wrapping the async-iterable body of an httpcore response.

    Exceptions raised while iterating are passed through
    `map_httpcore_exceptions`.
    """

    def __init__(self, httpcore_stream: typing.AsyncIterable[bytes]):
        """Store the wrapped async iterable; nothing is read until iteration."""
        self._httpcore_stream = httpcore_stream

    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
        """Yield each chunk of the wrapped stream, mapping exceptions."""
        with map_httpcore_exceptions():
            async for chunk in self._httpcore_stream:
                yield chunk

    async def aclose(self) -> None:
        """Close the wrapped stream when it exposes an `aclose` attribute."""
        if not hasattr(self._httpcore_stream, "aclose"):
            return
        await self._httpcore_stream.aclose()

254 

255 

class AsyncHTTPTransport(AsyncBaseTransport):
    """
    Asynchronous transport that sends requests through an httpcore async pool.

    The pool stored on `self._pool` is a plain async connection pool, an
    HTTP(S) proxy pool, or a SOCKS5 proxy pool, depending on the `proxy`
    argument.
    """

    def __init__(
        self,
        verify: VerifyTypes = True,
        cert: typing.Optional[CertTypes] = None,
        http1: bool = True,
        http2: bool = False,
        limits: Limits = DEFAULT_LIMITS,
        trust_env: bool = True,
        proxy: typing.Optional[Proxy] = None,
        uds: typing.Optional[str] = None,
        local_address: typing.Optional[str] = None,
        retries: int = 0,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> None:
        """
        Build the underlying httpcore async pool.

        `verify`, `cert` and `trust_env` are used to create the SSL context.
        `limits`, `http1` and `http2` are forwarded to every pool type.
        `uds`, `local_address` and `retries` are forwarded only when no
        proxy is configured. `socket_options` is forwarded to the direct
        pool and the HTTP(S) proxy pool, but not to the SOCKS5 pool.

        Raises `ImportError` when a SOCKS5 proxy is requested without the
        `socksio` package, and `ValueError` for any other proxy scheme.
        """
        ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)

        # Direct connections: no proxy configured.
        if proxy is None:
            self._pool = httpcore.AsyncConnectionPool(
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                uds=uds,
                local_address=local_address,
                retries=retries,
                socket_options=socket_options,
            )
            return

        # HTTP or HTTPS proxy.
        if proxy.url.scheme in ("http", "https"):
            http_proxy_url = httpcore.URL(
                scheme=proxy.url.raw_scheme,
                host=proxy.url.raw_host,
                port=proxy.url.port,
                target=proxy.url.raw_path,
            )
            self._pool = httpcore.AsyncHTTPProxy(
                proxy_url=http_proxy_url,
                proxy_auth=proxy.raw_auth,
                proxy_headers=proxy.headers.raw,
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                socket_options=socket_options,
            )
            return

        # Anything that is not SOCKS5 at this point is unsupported.
        if proxy.url.scheme != "socks5":  # pragma: no cover
            raise ValueError(
                f"Proxy protocol must be either 'http', 'https', or 'socks5', but got {proxy.url.scheme!r}."
            )

        # The import is only a presence check for the optional dependency.
        try:
            import socksio  # noqa
        except ImportError:  # pragma: no cover
            raise ImportError(
                "Using SOCKS proxy, but the 'socksio' package is not installed. "
                "Make sure to install httpx using `pip install httpx[socks]`."
            ) from None

        socks_proxy_url = httpcore.URL(
            scheme=proxy.url.raw_scheme,
            host=proxy.url.raw_host,
            port=proxy.url.port,
            target=proxy.url.raw_path,
        )
        self._pool = httpcore.AsyncSOCKSProxy(
            proxy_url=socks_proxy_url,
            proxy_auth=proxy.raw_auth,
            ssl_context=ssl_context,
            max_connections=limits.max_connections,
            max_keepalive_connections=limits.max_keepalive_connections,
            keepalive_expiry=limits.keepalive_expiry,
            http1=http1,
            http2=http2,
        )

    async def __aenter__(self: A) -> A:  # Use generics for subclass support.
        """Enter the underlying pool's async context and return this transport."""
        await self._pool.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]] = None,
        exc_value: typing.Optional[BaseException] = None,
        traceback: typing.Optional[TracebackType] = None,
    ) -> None:
        """Exit the underlying pool's async context, mapping httpcore exceptions."""
        with map_httpcore_exceptions():
            await self._pool.__aexit__(exc_type, exc_value, traceback)

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        """
        Send `request` through the pool and wrap the result in a `Response`.

        The request is converted to an `httpcore.Request`, dispatched with
        httpcore exceptions mapped, and the resulting stream is wrapped in an
        `AsyncResponseStream`.
        """
        assert isinstance(request.stream, AsyncByteStream)

        core_url = httpcore.URL(
            scheme=request.url.raw_scheme,
            host=request.url.raw_host,
            port=request.url.port,
            target=request.url.raw_path,
        )
        core_request = httpcore.Request(
            method=request.method,
            url=core_url,
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )

        with map_httpcore_exceptions():
            core_response = await self._pool.handle_async_request(core_request)

        assert isinstance(core_response.stream, typing.AsyncIterable)

        body = AsyncResponseStream(core_response.stream)
        return Response(
            status_code=core_response.status,
            headers=core_response.headers,
            stream=body,
            extensions=core_response.extensions,
        )

    async def aclose(self) -> None:
        """Close the underlying pool."""
        await self._pool.aclose()