Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_async/socks_proxy.py: 2%

120 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import ssl 

2import typing 

3 

4from socksio import socks5 

5 

6from .._exceptions import ConnectionNotAvailable, ProxyError 

7from .._models import URL, Origin, Request, Response, enforce_bytes, enforce_url 

8from .._ssl import default_ssl_context 

9from .._synchronization import AsyncLock 

10from .._trace import Trace 

11from ..backends.auto import AutoBackend 

12from ..backends.base import AsyncNetworkBackend, AsyncNetworkStream 

13from .connection_pool import AsyncConnectionPool 

14from .http11 import AsyncHTTP11Connection 

15from .interfaces import AsyncConnectionInterface 

16 

17AUTH_METHODS = { 

18 b"\x00": "NO AUTHENTICATION REQUIRED", 

19 b"\x01": "GSSAPI", 

20 b"\x02": "USERNAME/PASSWORD", 

21 b"\xff": "NO ACCEPTABLE METHODS", 

22} 

23 

24REPLY_CODES = { 

25 b"\x00": "Succeeded", 

26 b"\x01": "General SOCKS server failure", 

27 b"\x02": "Connection not allowed by ruleset", 

28 b"\x03": "Network unreachable", 

29 b"\x04": "Host unreachable", 

30 b"\x05": "Connection refused", 

31 b"\x06": "TTL expired", 

32 b"\x07": "Command not supported", 

33 b"\x08": "Address type not supported", 

34} 

35 

36 

37async def _init_socks5_connection( 

38 stream: AsyncNetworkStream, 

39 *, 

40 host: bytes, 

41 port: int, 

42 auth: typing.Optional[typing.Tuple[bytes, bytes]] = None, 

43) -> None: 

44 conn = socks5.SOCKS5Connection() 

45 

46 # Auth method request 

47 auth_method = ( 

48 socks5.SOCKS5AuthMethod.NO_AUTH_REQUIRED 

49 if auth is None 

50 else socks5.SOCKS5AuthMethod.USERNAME_PASSWORD 

51 ) 

52 conn.send(socks5.SOCKS5AuthMethodsRequest([auth_method])) 

53 outgoing_bytes = conn.data_to_send() 

54 await stream.write(outgoing_bytes) 

55 

56 # Auth method response 

57 incoming_bytes = await stream.read(max_bytes=4096) 

58 response = conn.receive_data(incoming_bytes) 

59 assert isinstance(response, socks5.SOCKS5AuthReply) 

60 if response.method != auth_method: 

61 requested = AUTH_METHODS.get(auth_method, "UNKNOWN") 

62 responded = AUTH_METHODS.get(response.method, "UNKNOWN") 

63 raise ProxyError( 

64 f"Requested {requested} from proxy server, but got {responded}." 

65 ) 

66 

67 if response.method == socks5.SOCKS5AuthMethod.USERNAME_PASSWORD: 

68 # Username/password request 

69 assert auth is not None 

70 username, password = auth 

71 conn.send(socks5.SOCKS5UsernamePasswordRequest(username, password)) 

72 outgoing_bytes = conn.data_to_send() 

73 await stream.write(outgoing_bytes) 

74 

75 # Username/password response 

76 incoming_bytes = await stream.read(max_bytes=4096) 

77 response = conn.receive_data(incoming_bytes) 

78 assert isinstance(response, socks5.SOCKS5UsernamePasswordReply) 

79 if not response.success: 

80 raise ProxyError("Invalid username/password") 

81 

82 # Connect request 

83 conn.send( 

84 socks5.SOCKS5CommandRequest.from_address( 

85 socks5.SOCKS5Command.CONNECT, (host, port) 

86 ) 

87 ) 

88 outgoing_bytes = conn.data_to_send() 

89 await stream.write(outgoing_bytes) 

90 

91 # Connect response 

92 incoming_bytes = await stream.read(max_bytes=4096) 

93 response = conn.receive_data(incoming_bytes) 

94 assert isinstance(response, socks5.SOCKS5Reply) 

95 if response.reply_code != socks5.SOCKS5ReplyCode.SUCCEEDED: 

96 reply_code = REPLY_CODES.get(response.reply_code, "UNKOWN") 

97 raise ProxyError(f"Proxy Server could not connect: {reply_code}.") 

98 

99 

100class AsyncSOCKSProxy(AsyncConnectionPool): 

101 """ 

102 A connection pool that sends requests via an HTTP proxy. 

103 """ 

104 

105 def __init__( 

106 self, 

107 proxy_url: typing.Union[URL, bytes, str], 

108 proxy_auth: typing.Optional[ 

109 typing.Tuple[typing.Union[bytes, str], typing.Union[bytes, str]] 

110 ] = None, 

111 ssl_context: typing.Optional[ssl.SSLContext] = None, 

112 max_connections: typing.Optional[int] = 10, 

113 max_keepalive_connections: typing.Optional[int] = None, 

114 keepalive_expiry: typing.Optional[float] = None, 

115 http1: bool = True, 

116 http2: bool = False, 

117 network_backend: typing.Optional[AsyncNetworkBackend] = None, 

118 ) -> None: 

119 """ 

120 A connection pool for making HTTP requests. 

121 

122 Parameters: 

123 proxy_url: The URL to use when connecting to the proxy server. 

124 For example `"http://127.0.0.1:8080/"`. 

125 ssl_context: An SSL context to use for verifying connections. 

126 If not specified, the default `httpcore.default_ssl_context()` 

127 will be used. 

128 max_connections: The maximum number of concurrent HTTP connections that 

129 the pool should allow. Any attempt to send a request on a pool that 

130 would exceed this amount will block until a connection is available. 

131 max_keepalive_connections: The maximum number of idle HTTP connections 

132 that will be maintained in the pool. 

133 keepalive_expiry: The duration in seconds that an idle HTTP connection 

134 may be maintained for before being expired from the pool. 

135 http1: A boolean indicating if HTTP/1.1 requests should be supported 

136 by the connection pool. Defaults to True. 

137 http2: A boolean indicating if HTTP/2 requests should be supported by 

138 the connection pool. Defaults to False. 

139 retries: The maximum number of retries when trying to establish 

140 a connection. 

141 local_address: Local address to connect from. Can also be used to 

142 connect using a particular address family. Using 

143 `local_address="0.0.0.0"` will connect using an `AF_INET` address 

144 (IPv4), while using `local_address="::"` will connect using an 

145 `AF_INET6` address (IPv6). 

146 uds: Path to a Unix Domain Socket to use instead of TCP sockets. 

147 network_backend: A backend instance to use for handling network I/O. 

148 """ 

149 super().__init__( 

150 ssl_context=ssl_context, 

151 max_connections=max_connections, 

152 max_keepalive_connections=max_keepalive_connections, 

153 keepalive_expiry=keepalive_expiry, 

154 http1=http1, 

155 http2=http2, 

156 network_backend=network_backend, 

157 ) 

158 self._ssl_context = ssl_context 

159 self._proxy_url = enforce_url(proxy_url, name="proxy_url") 

160 if proxy_auth is not None: 

161 username, password = proxy_auth 

162 username_bytes = enforce_bytes(username, name="proxy_auth") 

163 password_bytes = enforce_bytes(password, name="proxy_auth") 

164 self._proxy_auth: typing.Optional[typing.Tuple[bytes, bytes]] = ( 

165 username_bytes, 

166 password_bytes, 

167 ) 

168 else: 

169 self._proxy_auth = None 

170 

171 def create_connection(self, origin: Origin) -> AsyncConnectionInterface: 

172 return AsyncSocks5Connection( 

173 proxy_origin=self._proxy_url.origin, 

174 remote_origin=origin, 

175 proxy_auth=self._proxy_auth, 

176 ssl_context=self._ssl_context, 

177 keepalive_expiry=self._keepalive_expiry, 

178 http1=self._http1, 

179 http2=self._http2, 

180 network_backend=self._network_backend, 

181 ) 

182 

183 

184class AsyncSocks5Connection(AsyncConnectionInterface): 

185 def __init__( 

186 self, 

187 proxy_origin: Origin, 

188 remote_origin: Origin, 

189 proxy_auth: typing.Optional[typing.Tuple[bytes, bytes]] = None, 

190 ssl_context: typing.Optional[ssl.SSLContext] = None, 

191 keepalive_expiry: typing.Optional[float] = None, 

192 http1: bool = True, 

193 http2: bool = False, 

194 network_backend: typing.Optional[AsyncNetworkBackend] = None, 

195 ) -> None: 

196 self._proxy_origin = proxy_origin 

197 self._remote_origin = remote_origin 

198 self._proxy_auth = proxy_auth 

199 self._ssl_context = ssl_context 

200 self._keepalive_expiry = keepalive_expiry 

201 self._http1 = http1 

202 self._http2 = http2 

203 

204 self._network_backend: AsyncNetworkBackend = ( 

205 AutoBackend() if network_backend is None else network_backend 

206 ) 

207 self._connect_lock = AsyncLock() 

208 self._connection: typing.Optional[AsyncConnectionInterface] = None 

209 self._connect_failed = False 

210 

211 async def handle_async_request(self, request: Request) -> Response: 

212 timeouts = request.extensions.get("timeout", {}) 

213 timeout = timeouts.get("connect", None) 

214 

215 async with self._connect_lock: 

216 if self._connection is None: 

217 try: 

218 # Connect to the proxy 

219 kwargs = { 

220 "host": self._proxy_origin.host.decode("ascii"), 

221 "port": self._proxy_origin.port, 

222 "timeout": timeout, 

223 } 

224 with Trace("connection.connect_tcp", request, kwargs) as trace: 

225 stream = await self._network_backend.connect_tcp(**kwargs) 

226 trace.return_value = stream 

227 

228 # Connect to the remote host using socks5 

229 kwargs = { 

230 "stream": stream, 

231 "host": self._remote_origin.host.decode("ascii"), 

232 "port": self._remote_origin.port, 

233 "auth": self._proxy_auth, 

234 } 

235 with Trace( 

236 "connection.setup_socks5_connection", request, kwargs 

237 ) as trace: 

238 await _init_socks5_connection(**kwargs) 

239 trace.return_value = stream 

240 

241 # Upgrade the stream to SSL 

242 if self._remote_origin.scheme == b"https": 

243 ssl_context = ( 

244 default_ssl_context() 

245 if self._ssl_context is None 

246 else self._ssl_context 

247 ) 

248 alpn_protocols = ( 

249 ["http/1.1", "h2"] if self._http2 else ["http/1.1"] 

250 ) 

251 ssl_context.set_alpn_protocols(alpn_protocols) 

252 

253 kwargs = { 

254 "ssl_context": ssl_context, 

255 "server_hostname": self._remote_origin.host.decode("ascii"), 

256 "timeout": timeout, 

257 } 

258 async with Trace( 

259 "connection.start_tls", request, kwargs 

260 ) as trace: 

261 stream = await stream.start_tls(**kwargs) 

262 trace.return_value = stream 

263 

264 # Determine if we should be using HTTP/1.1 or HTTP/2 

265 ssl_object = stream.get_extra_info("ssl_object") 

266 http2_negotiated = ( 

267 ssl_object is not None 

268 and ssl_object.selected_alpn_protocol() == "h2" 

269 ) 

270 

271 # Create the HTTP/1.1 or HTTP/2 connection 

272 if http2_negotiated or ( 

273 self._http2 and not self._http1 

274 ): # pragma: nocover 

275 from .http2 import AsyncHTTP2Connection 

276 

277 self._connection = AsyncHTTP2Connection( 

278 origin=self._remote_origin, 

279 stream=stream, 

280 keepalive_expiry=self._keepalive_expiry, 

281 ) 

282 else: 

283 self._connection = AsyncHTTP11Connection( 

284 origin=self._remote_origin, 

285 stream=stream, 

286 keepalive_expiry=self._keepalive_expiry, 

287 ) 

288 except Exception as exc: 

289 self._connect_failed = True 

290 raise exc 

291 elif not self._connection.is_available(): # pragma: nocover 

292 raise ConnectionNotAvailable() 

293 

294 return await self._connection.handle_async_request(request) 

295 

296 def can_handle_request(self, origin: Origin) -> bool: 

297 return origin == self._remote_origin 

298 

299 async def aclose(self) -> None: 

300 if self._connection is not None: 

301 await self._connection.aclose() 

302 

303 def is_available(self) -> bool: 

304 if self._connection is None: # pragma: nocover 

305 # If HTTP/2 support is enabled, and the resulting connection could 

306 # end up as HTTP/2 then we should indicate the connection as being 

307 # available to service multiple requests. 

308 return ( 

309 self._http2 

310 and (self._remote_origin.scheme == b"https" or not self._http1) 

311 and not self._connect_failed 

312 ) 

313 return self._connection.is_available() 

314 

315 def has_expired(self) -> bool: 

316 if self._connection is None: # pragma: nocover 

317 return self._connect_failed 

318 return self._connection.has_expired() 

319 

320 def is_idle(self) -> bool: 

321 if self._connection is None: # pragma: nocover 

322 return self._connect_failed 

323 return self._connection.is_idle() 

324 

325 def is_closed(self) -> bool: 

326 if self._connection is None: # pragma: nocover 

327 return self._connect_failed 

328 return self._connection.is_closed() 

329 

330 def info(self) -> str: 

331 if self._connection is None: # pragma: nocover 

332 return "CONNECTION FAILED" if self._connect_failed else "CONNECTING" 

333 return self._connection.info() 

334 

335 def __repr__(self) -> str: 

336 return f"<{self.__class__.__name__} [{self.info()}]>"