Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_async/socks_proxy.py: 2%
120 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import ssl
2import typing
4from socksio import socks5
6from .._exceptions import ConnectionNotAvailable, ProxyError
7from .._models import URL, Origin, Request, Response, enforce_bytes, enforce_url
8from .._ssl import default_ssl_context
9from .._synchronization import AsyncLock
10from .._trace import Trace
11from ..backends.auto import AutoBackend
12from ..backends.base import AsyncNetworkBackend, AsyncNetworkStream
13from .connection_pool import AsyncConnectionPool
14from .http11 import AsyncHTTP11Connection
15from .interfaces import AsyncConnectionInterface
17AUTH_METHODS = {
18 b"\x00": "NO AUTHENTICATION REQUIRED",
19 b"\x01": "GSSAPI",
20 b"\x02": "USERNAME/PASSWORD",
21 b"\xff": "NO ACCEPTABLE METHODS",
22}
24REPLY_CODES = {
25 b"\x00": "Succeeded",
26 b"\x01": "General SOCKS server failure",
27 b"\x02": "Connection not allowed by ruleset",
28 b"\x03": "Network unreachable",
29 b"\x04": "Host unreachable",
30 b"\x05": "Connection refused",
31 b"\x06": "TTL expired",
32 b"\x07": "Command not supported",
33 b"\x08": "Address type not supported",
34}
37async def _init_socks5_connection(
38 stream: AsyncNetworkStream,
39 *,
40 host: bytes,
41 port: int,
42 auth: typing.Optional[typing.Tuple[bytes, bytes]] = None,
43) -> None:
44 conn = socks5.SOCKS5Connection()
46 # Auth method request
47 auth_method = (
48 socks5.SOCKS5AuthMethod.NO_AUTH_REQUIRED
49 if auth is None
50 else socks5.SOCKS5AuthMethod.USERNAME_PASSWORD
51 )
52 conn.send(socks5.SOCKS5AuthMethodsRequest([auth_method]))
53 outgoing_bytes = conn.data_to_send()
54 await stream.write(outgoing_bytes)
56 # Auth method response
57 incoming_bytes = await stream.read(max_bytes=4096)
58 response = conn.receive_data(incoming_bytes)
59 assert isinstance(response, socks5.SOCKS5AuthReply)
60 if response.method != auth_method:
61 requested = AUTH_METHODS.get(auth_method, "UNKNOWN")
62 responded = AUTH_METHODS.get(response.method, "UNKNOWN")
63 raise ProxyError(
64 f"Requested {requested} from proxy server, but got {responded}."
65 )
67 if response.method == socks5.SOCKS5AuthMethod.USERNAME_PASSWORD:
68 # Username/password request
69 assert auth is not None
70 username, password = auth
71 conn.send(socks5.SOCKS5UsernamePasswordRequest(username, password))
72 outgoing_bytes = conn.data_to_send()
73 await stream.write(outgoing_bytes)
75 # Username/password response
76 incoming_bytes = await stream.read(max_bytes=4096)
77 response = conn.receive_data(incoming_bytes)
78 assert isinstance(response, socks5.SOCKS5UsernamePasswordReply)
79 if not response.success:
80 raise ProxyError("Invalid username/password")
82 # Connect request
83 conn.send(
84 socks5.SOCKS5CommandRequest.from_address(
85 socks5.SOCKS5Command.CONNECT, (host, port)
86 )
87 )
88 outgoing_bytes = conn.data_to_send()
89 await stream.write(outgoing_bytes)
91 # Connect response
92 incoming_bytes = await stream.read(max_bytes=4096)
93 response = conn.receive_data(incoming_bytes)
94 assert isinstance(response, socks5.SOCKS5Reply)
95 if response.reply_code != socks5.SOCKS5ReplyCode.SUCCEEDED:
96 reply_code = REPLY_CODES.get(response.reply_code, "UNKOWN")
97 raise ProxyError(f"Proxy Server could not connect: {reply_code}.")
100class AsyncSOCKSProxy(AsyncConnectionPool):
101 """
102 A connection pool that sends requests via an HTTP proxy.
103 """
105 def __init__(
106 self,
107 proxy_url: typing.Union[URL, bytes, str],
108 proxy_auth: typing.Optional[
109 typing.Tuple[typing.Union[bytes, str], typing.Union[bytes, str]]
110 ] = None,
111 ssl_context: typing.Optional[ssl.SSLContext] = None,
112 max_connections: typing.Optional[int] = 10,
113 max_keepalive_connections: typing.Optional[int] = None,
114 keepalive_expiry: typing.Optional[float] = None,
115 http1: bool = True,
116 http2: bool = False,
117 network_backend: typing.Optional[AsyncNetworkBackend] = None,
118 ) -> None:
119 """
120 A connection pool for making HTTP requests.
122 Parameters:
123 proxy_url: The URL to use when connecting to the proxy server.
124 For example `"http://127.0.0.1:8080/"`.
125 ssl_context: An SSL context to use for verifying connections.
126 If not specified, the default `httpcore.default_ssl_context()`
127 will be used.
128 max_connections: The maximum number of concurrent HTTP connections that
129 the pool should allow. Any attempt to send a request on a pool that
130 would exceed this amount will block until a connection is available.
131 max_keepalive_connections: The maximum number of idle HTTP connections
132 that will be maintained in the pool.
133 keepalive_expiry: The duration in seconds that an idle HTTP connection
134 may be maintained for before being expired from the pool.
135 http1: A boolean indicating if HTTP/1.1 requests should be supported
136 by the connection pool. Defaults to True.
137 http2: A boolean indicating if HTTP/2 requests should be supported by
138 the connection pool. Defaults to False.
139 retries: The maximum number of retries when trying to establish
140 a connection.
141 local_address: Local address to connect from. Can also be used to
142 connect using a particular address family. Using
143 `local_address="0.0.0.0"` will connect using an `AF_INET` address
144 (IPv4), while using `local_address="::"` will connect using an
145 `AF_INET6` address (IPv6).
146 uds: Path to a Unix Domain Socket to use instead of TCP sockets.
147 network_backend: A backend instance to use for handling network I/O.
148 """
149 super().__init__(
150 ssl_context=ssl_context,
151 max_connections=max_connections,
152 max_keepalive_connections=max_keepalive_connections,
153 keepalive_expiry=keepalive_expiry,
154 http1=http1,
155 http2=http2,
156 network_backend=network_backend,
157 )
158 self._ssl_context = ssl_context
159 self._proxy_url = enforce_url(proxy_url, name="proxy_url")
160 if proxy_auth is not None:
161 username, password = proxy_auth
162 username_bytes = enforce_bytes(username, name="proxy_auth")
163 password_bytes = enforce_bytes(password, name="proxy_auth")
164 self._proxy_auth: typing.Optional[typing.Tuple[bytes, bytes]] = (
165 username_bytes,
166 password_bytes,
167 )
168 else:
169 self._proxy_auth = None
171 def create_connection(self, origin: Origin) -> AsyncConnectionInterface:
172 return AsyncSocks5Connection(
173 proxy_origin=self._proxy_url.origin,
174 remote_origin=origin,
175 proxy_auth=self._proxy_auth,
176 ssl_context=self._ssl_context,
177 keepalive_expiry=self._keepalive_expiry,
178 http1=self._http1,
179 http2=self._http2,
180 network_backend=self._network_backend,
181 )
184class AsyncSocks5Connection(AsyncConnectionInterface):
185 def __init__(
186 self,
187 proxy_origin: Origin,
188 remote_origin: Origin,
189 proxy_auth: typing.Optional[typing.Tuple[bytes, bytes]] = None,
190 ssl_context: typing.Optional[ssl.SSLContext] = None,
191 keepalive_expiry: typing.Optional[float] = None,
192 http1: bool = True,
193 http2: bool = False,
194 network_backend: typing.Optional[AsyncNetworkBackend] = None,
195 ) -> None:
196 self._proxy_origin = proxy_origin
197 self._remote_origin = remote_origin
198 self._proxy_auth = proxy_auth
199 self._ssl_context = ssl_context
200 self._keepalive_expiry = keepalive_expiry
201 self._http1 = http1
202 self._http2 = http2
204 self._network_backend: AsyncNetworkBackend = (
205 AutoBackend() if network_backend is None else network_backend
206 )
207 self._connect_lock = AsyncLock()
208 self._connection: typing.Optional[AsyncConnectionInterface] = None
209 self._connect_failed = False
211 async def handle_async_request(self, request: Request) -> Response:
212 timeouts = request.extensions.get("timeout", {})
213 timeout = timeouts.get("connect", None)
215 async with self._connect_lock:
216 if self._connection is None:
217 try:
218 # Connect to the proxy
219 kwargs = {
220 "host": self._proxy_origin.host.decode("ascii"),
221 "port": self._proxy_origin.port,
222 "timeout": timeout,
223 }
224 with Trace("connection.connect_tcp", request, kwargs) as trace:
225 stream = await self._network_backend.connect_tcp(**kwargs)
226 trace.return_value = stream
228 # Connect to the remote host using socks5
229 kwargs = {
230 "stream": stream,
231 "host": self._remote_origin.host.decode("ascii"),
232 "port": self._remote_origin.port,
233 "auth": self._proxy_auth,
234 }
235 with Trace(
236 "connection.setup_socks5_connection", request, kwargs
237 ) as trace:
238 await _init_socks5_connection(**kwargs)
239 trace.return_value = stream
241 # Upgrade the stream to SSL
242 if self._remote_origin.scheme == b"https":
243 ssl_context = (
244 default_ssl_context()
245 if self._ssl_context is None
246 else self._ssl_context
247 )
248 alpn_protocols = (
249 ["http/1.1", "h2"] if self._http2 else ["http/1.1"]
250 )
251 ssl_context.set_alpn_protocols(alpn_protocols)
253 kwargs = {
254 "ssl_context": ssl_context,
255 "server_hostname": self._remote_origin.host.decode("ascii"),
256 "timeout": timeout,
257 }
258 async with Trace(
259 "connection.start_tls", request, kwargs
260 ) as trace:
261 stream = await stream.start_tls(**kwargs)
262 trace.return_value = stream
264 # Determine if we should be using HTTP/1.1 or HTTP/2
265 ssl_object = stream.get_extra_info("ssl_object")
266 http2_negotiated = (
267 ssl_object is not None
268 and ssl_object.selected_alpn_protocol() == "h2"
269 )
271 # Create the HTTP/1.1 or HTTP/2 connection
272 if http2_negotiated or (
273 self._http2 and not self._http1
274 ): # pragma: nocover
275 from .http2 import AsyncHTTP2Connection
277 self._connection = AsyncHTTP2Connection(
278 origin=self._remote_origin,
279 stream=stream,
280 keepalive_expiry=self._keepalive_expiry,
281 )
282 else:
283 self._connection = AsyncHTTP11Connection(
284 origin=self._remote_origin,
285 stream=stream,
286 keepalive_expiry=self._keepalive_expiry,
287 )
288 except Exception as exc:
289 self._connect_failed = True
290 raise exc
291 elif not self._connection.is_available(): # pragma: nocover
292 raise ConnectionNotAvailable()
294 return await self._connection.handle_async_request(request)
296 def can_handle_request(self, origin: Origin) -> bool:
297 return origin == self._remote_origin
299 async def aclose(self) -> None:
300 if self._connection is not None:
301 await self._connection.aclose()
303 def is_available(self) -> bool:
304 if self._connection is None: # pragma: nocover
305 # If HTTP/2 support is enabled, and the resulting connection could
306 # end up as HTTP/2 then we should indicate the connection as being
307 # available to service multiple requests.
308 return (
309 self._http2
310 and (self._remote_origin.scheme == b"https" or not self._http1)
311 and not self._connect_failed
312 )
313 return self._connection.is_available()
315 def has_expired(self) -> bool:
316 if self._connection is None: # pragma: nocover
317 return self._connect_failed
318 return self._connection.has_expired()
320 def is_idle(self) -> bool:
321 if self._connection is None: # pragma: nocover
322 return self._connect_failed
323 return self._connection.is_idle()
325 def is_closed(self) -> bool:
326 if self._connection is None: # pragma: nocover
327 return self._connect_failed
328 return self._connection.is_closed()
330 def info(self) -> str:
331 if self._connection is None: # pragma: nocover
332 return "CONNECTION FAILED" if self._connect_failed else "CONNECTING"
333 return self._connection.info()
335 def __repr__(self) -> str:
336 return f"<{self.__class__.__name__} [{self.info()}]>"