Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_http.py: 32%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2_http.py
3websocket - WebSocket client library for Python
5Copyright 2025 engn33r
7Licensed under the Apache License, Version 2.0 (the "License");
8you may not use this file except in compliance with the License.
9You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13Unless required by applicable law or agreed to in writing, software
14distributed under the License is distributed on an "AS IS" BASIS,
15WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16See the License for the specific language governing permissions and
17limitations under the License.
18"""
20import errno
21import os
22import socket
23from base64 import encodebytes as base64encode
24from typing import Any, Tuple
26from ._exceptions import (
27 WebSocketAddressException,
28 WebSocketException,
29 WebSocketProxyException,
30)
31from ._logging import debug, dump, trace
32from ._socket import DEFAULT_SOCKET_OPTION, recv_line, send
33from ._ssl_compat import HAVE_SSL, ssl
34from ._url import get_proxy_info, parse_url
36__all__ = ["proxy_info", "connect", "read_headers"]
# Import python_socks if available, otherwise define fallback classes.
# HAVE_PYTHON_SOCKS records which branch was taken so that callers can check
# it before touching anything that only the real package provides.
try:
    from python_socks._errors import ProxyConnectionError, ProxyError, ProxyTimeoutError
    from python_socks._types import ProxyType
    from python_socks.sync import Proxy

    HAVE_PYTHON_SOCKS = True
except ImportError:
    HAVE_PYTHON_SOCKS = False

    # Placeholder types so the names above still exist at module level.
    # Note that no placeholder is defined for ``Proxy``, and the placeholder
    # ``ProxyType`` has no SOCKS4/SOCKS5 members.
    class ProxyError(Exception):  # type: ignore[no-redef]
        pass

    class ProxyTimeoutError(Exception):  # type: ignore[no-redef]
        pass

    class ProxyConnectionError(Exception):  # type: ignore[no-redef]
        pass

    class ProxyType:  # type: ignore[no-redef]
        pass
class proxy_info:
    """Proxy settings extracted from keyword options.

    Recognised options:
        http_proxy_host: proxy host; when falsy, no proxy is configured.
        http_proxy_port: proxy port (default ``0``).
        http_proxy_auth: credentials for the proxy (default ``None``).
        http_no_proxy: value stored as ``no_proxy`` (default ``None``).
        proxy_type: one of ``http``, ``socks4``, ``socks4a``, ``socks5``,
            ``socks5h`` (default ``"http"``).
        http_proxy_timeout: timeout stored as ``proxy_timeout``
            (default ``None``).

    Raises:
        ProxyError: if a proxy host is given together with an unsupported
            ``proxy_type``.
    """

    def __init__(self, **options):
        self.proxy_host = options.get("http_proxy_host", None)
        if self.proxy_host:
            self.proxy_port = options.get("http_proxy_port", 0)
            self.auth = options.get("http_proxy_auth", None)
            self.no_proxy = options.get("http_no_proxy", None)
            self.proxy_protocol = options.get("proxy_type", "http")
            # Note: If timeout not specified, default python-socks timeout is 60 seconds
            self.proxy_timeout = options.get("http_proxy_timeout", None)
            if self.proxy_protocol not in (
                "http",
                "socks4",
                "socks4a",
                "socks5",
                "socks5h",
            ):
                raise ProxyError(
                    "Only http, socks4, socks5 proxy protocols are supported"
                )
        else:
            self.proxy_port = 0
            self.auth = None
            self.no_proxy = None
            self.proxy_protocol = "http"
            # Keep the attribute set identical in both branches so that
            # reading proxy_timeout never raises AttributeError.
            self.proxy_timeout = None
def _start_proxied_socket(
    url: str, options: Any, proxy: Any
) -> Tuple[socket.socket, Tuple[str, int, str]]:
    """Connect to ``url`` through a SOCKS proxy using python-socks.

    Args:
        url: websocket URL; parsed with ``parse_url``.
        options: object whose ``sslopt`` attribute is passed to ``_ssl_socket``
            when the URL is secure.
        proxy: object exposing ``proxy_protocol``, ``proxy_host``,
            ``proxy_port``, ``auth`` and ``proxy_timeout``.

    Returns:
        ``(sock, (hostname, port, resource))``.

    Raises:
        WebSocketException: if python-socks is not installed, the proxy
            protocol is not a SOCKS variant, or the URL is secure but SSL is
            not available.
    """
    if not HAVE_PYTHON_SOCKS:
        raise WebSocketException(
            "Python Socks is needed for SOCKS proxying but is not available"
        )

    hostname, port, resource, is_secure = parse_url(url)

    # protocol name -> (python-socks proxy type, resolve DNS through the proxy)
    # socks4a / socks5h send DNS through the proxy.
    socks_modes = {
        "socks4": (ProxyType.SOCKS4, False),
        "socks4a": (ProxyType.SOCKS4, True),
        "socks5": (ProxyType.SOCKS5, False),
        "socks5h": (ProxyType.SOCKS5, True),
    }
    try:
        proxy_type, rdns = socks_modes[proxy.proxy_protocol]
    except KeyError:
        # Previously this fell through and failed later with UnboundLocalError.
        raise WebSocketException(
            f"Unsupported SOCKS proxy protocol: {proxy.proxy_protocol}"
        ) from None

    ws_proxy = Proxy.create(
        proxy_type=proxy_type,
        host=proxy.proxy_host,
        port=int(proxy.proxy_port),
        username=proxy.auth[0] if proxy.auth else None,
        password=proxy.auth[1] if proxy.auth else None,
        rdns=rdns,
    )

    sock = ws_proxy.connect(hostname, port, timeout=proxy.proxy_timeout)

    if is_secure:
        try:
            if not HAVE_SSL:
                raise WebSocketException("SSL not available.")
            sock = _ssl_socket(sock, options.sslopt, hostname)
        except BaseException:
            # Do not leak the already-established proxy connection when the
            # TLS layer cannot be set up.
            sock.close()
            raise

    return sock, (hostname, port, resource)
def connect(
    url: str, options: Any, proxy: Any, socket: Any
) -> Tuple[socket.socket, Tuple[str, int, str]]:
    """Return a socket for ``url`` together with ``(hostname, port, resource)``.

    If ``socket`` is truthy it is returned unchanged. Otherwise a new
    connection is opened: through ``_start_proxied_socket`` for non-http proxy
    protocols, or directly / through an HTTP CONNECT tunnel for everything
    else, with TLS applied when the URL is secure.

    Raises:
        WebSocketException: if no address could be resolved, or the URL is
            secure but SSL is not available.
    """
    # Use _start_proxied_socket() only for socks4 or socks5 proxy
    # Use _tunnel() for http proxy
    # TODO: Use python-socks for http protocol also, to standardize flow
    if proxy.proxy_host and not socket and proxy.proxy_protocol != "http":
        return _start_proxied_socket(url, options, proxy)

    hostname, port_from_url, resource, is_secure = parse_url(url)
    endpoint = (hostname, port_from_url, resource)

    # A caller-supplied socket is handed back as-is.
    if socket:
        return socket, endpoint

    addrinfo_list, need_tunnel, auth = _get_addrinfo_list(
        hostname, port_from_url, is_secure, proxy
    )
    if not addrinfo_list:
        raise WebSocketException(f"Host not found.: {hostname}:{port_from_url}")

    sock = None
    try:
        sock = _open_socket(addrinfo_list, options.sockopt, options.timeout)
        if need_tunnel:
            sock = _tunnel(sock, hostname, port_from_url, auth)

        if is_secure:
            if not HAVE_SSL:
                raise WebSocketException("SSL not available.")
            sock = _ssl_socket(sock, options.sslopt, hostname)

        return sock, endpoint
    except BaseException:
        # Cleanup only: close whatever was opened, then propagate unchanged.
        if sock:
            sock.close()
        raise
def _get_addrinfo_list(
    hostname: str, port: int, is_secure: bool, proxy: Any
) -> Tuple[list, bool, Any]:
    """Resolve the address to connect to, either the target or its proxy.

    Returns:
        ``(addrinfo_list, need_tunnel, auth)``. When ``get_proxy_info`` yields
        no proxy host the target itself is resolved and the result is
        ``(addrinfo_list, False, None)``; otherwise the proxy is resolved
        (port defaulting to 80) and the result is
        ``(addrinfo_list, True, proxy_auth)``.

    Raises:
        WebSocketAddressException: on ``TypeError`` from ``get_proxy_info`` or
            on ``socket.gaierror`` / ``TypeError`` during resolution.
    """
    try:
        phost, pport, pauth = get_proxy_info(
            hostname,
            is_secure,
            proxy.proxy_host,
            proxy.proxy_port,
            proxy.auth,
            proxy.no_proxy,
        )
    except TypeError as e:
        raise WebSocketAddressException(e)

    try:
        if phost:
            need_tunnel = True
            lookup_host, lookup_port, auth = phost, pport or 80, pauth
        else:
            need_tunnel = False
            lookup_host, lookup_port, auth = hostname, port, None

        # when running on windows 10, getaddrinfo without socktype returns a socktype 0.
        # This generates an error exception: `_on_error: exception Socket type must be stream or datagram, not 0`
        # or `OSError: [Errno 22] Invalid argument` when creating socket. Force the socket type to SOCK_STREAM.
        addrinfo_list = socket.getaddrinfo(
            lookup_host, lookup_port, 0, socket.SOCK_STREAM, socket.SOL_TCP
        )
    except (socket.gaierror, TypeError) as e:
        raise WebSocketAddressException(e)

    return addrinfo_list, need_tunnel, auth
def _open_socket(addrinfo_list, sockopt, timeout):
    """Open a TCP connection to the first reachable address.

    Each entry of ``addrinfo_list`` (as returned by ``socket.getaddrinfo``) is
    tried in order. A connection-refused or network-unreachable error moves on
    to the next entry; any other socket error is raised immediately.

    Args:
        addrinfo_list: iterable of ``getaddrinfo``-style tuples.
        sockopt: iterable of argument tuples for ``socket.setsockopt``,
            applied after ``DEFAULT_SOCKET_OPTION``.
        timeout: value passed to ``socket.settimeout``.

    Returns:
        The connected socket.

    Raises:
        ValueError: if ``addrinfo_list`` yields no entries.
        OSError: the error from the last attempt when every address was
            refused/unreachable, or the first non-retryable error. The
            exception carries a ``remote_ip`` attribute.
    """
    # errno values that mean "try the next address" rather than "give up".
    retryable_errnos = (
        errno.ECONNREFUSED,
        getattr(errno, "WSAECONNREFUSED", errno.ECONNREFUSED),
        errno.ENETUNREACH,
    )

    last_error = None
    for addrinfo in addrinfo_list:
        family, socktype, proto = addrinfo[:3]
        address = addrinfo[4]

        sock = socket.socket(family, socktype, proto)
        try:
            sock.settimeout(timeout)
            for opts in DEFAULT_SOCKET_OPTION:
                sock.setsockopt(*opts)
            for opts in sockopt:
                sock.setsockopt(*opts)
        except BaseException:
            # Configuration failed: release the descriptor before propagating.
            sock.close()
            raise

        try:
            sock.connect(address)
        except OSError as error:
            sock.close()
            error.remote_ip = str(address[0])  # type: ignore[attr-defined]
            if error.errno not in retryable_errnos:
                raise
            last_error = error
        else:
            return sock

    if last_error is None:
        # Nothing to iterate over; previously this surfaced as UnboundLocalError.
        raise ValueError("addrinfo_list is empty; no address to connect to")
    raise last_error
def _wrap_sni_socket(
    sock: socket.socket, sslopt: dict, hostname: str, check_hostname: bool
) -> Any:
    """Wrap ``sock`` in TLS, sending ``hostname`` as the SNI server name.

    When ``sslopt`` carries no ``"context"``, a new ``ssl.SSLContext`` is
    built and configured from the other ``sslopt`` keys (``ssl_version``,
    ``cert_reqs``, ``ca_certs``, ``ca_cert_path``, ``certfile``, ``keyfile``,
    ``password``, ``check_hostname``, ``ciphers``, ``cert_chain``,
    ``ecdh_curve``).

    The ``check_hostname`` parameter is not read here; the value in
    ``sslopt["check_hostname"]`` is consulted instead.

    Raises:
        WebSocketException: when loading certificates, ciphers or the ECDH
            curve fails.
        ValueError: when ``sslopt["cert_chain"]`` is not a 3-item tuple/list.
    """
    # NOTE(review): the extracted source lost its indentation; everything down
    # to the ecdh_curve handling is assumed to sit under ``if not ctx`` -
    # confirm against the original file.
    ctx = sslopt.get("context", None)
    if not ctx:
        ctx = ssl.SSLContext(sslopt.get("ssl_version", ssl.PROTOCOL_TLS_CLIENT))
        # Non default context need to manually enable SSLKEYLOGFILE support by setting the keylog_filename attribute.
        # For more details see also:
        # * https://docs.python.org/3.8/library/ssl.html?highlight=sslkeylogfile#context-creation
        # * https://docs.python.org/3.8/library/ssl.html?highlight=sslkeylogfile#ssl.SSLContext.keylog_filename
        keylog_path = os.environ.get("SSLKEYLOGFILE")
        if keylog_path is not None:
            ctx.keylog_filename = keylog_path

        if sslopt.get("cert_reqs", ssl.CERT_NONE) != ssl.CERT_NONE:
            ca_file = sslopt.get("ca_certs", None)
            ca_dir = sslopt.get("ca_cert_path", None)
            if ca_file or ca_dir:
                try:
                    ctx.load_verify_locations(cafile=ca_file, capath=ca_dir)
                except (FileNotFoundError, ssl.SSLError, ValueError) as exc:
                    raise WebSocketException(
                        f"SSL CA certificate loading failed: {exc}"
                    )
            elif hasattr(ctx, "load_default_certs"):
                try:
                    ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
                except ssl.SSLError as exc:
                    raise WebSocketException(
                        f"SSL default certificate loading failed: {exc}"
                    )

        if sslopt.get("certfile", None):
            try:
                ctx.load_cert_chain(
                    sslopt["certfile"],
                    sslopt.get("keyfile", None),
                    sslopt.get("password", None),
                )
            except (FileNotFoundError, ValueError, ssl.SSLError) as exc:
                raise WebSocketException(
                    f"SSL client certificate loading failed: {exc}"
                )

        # Python 3.10 switch to PROTOCOL_TLS_CLIENT defaults to "cert_reqs = ssl.CERT_REQUIRED" and "check_hostname = True"
        # If both disabled, set check_hostname before verify_mode
        # see https://github.com/liris/websocket-client/commit/b96a2e8fa765753e82eea531adb19716b52ca3ca#commitcomment-10803153
        verification_off = sslopt.get(
            "cert_reqs", ssl.CERT_NONE
        ) == ssl.CERT_NONE and not sslopt.get("check_hostname", False)
        if verification_off:
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
        else:
            ctx.check_hostname = sslopt.get("check_hostname", True)
            ctx.verify_mode = sslopt.get("cert_reqs", ssl.CERT_REQUIRED)

        if "ciphers" in sslopt:
            try:
                ctx.set_ciphers(sslopt["ciphers"])
            except ssl.SSLError as exc:
                raise WebSocketException(f"SSL cipher configuration failed: {exc}")

        if "cert_chain" in sslopt:
            chain = sslopt["cert_chain"]
            # Shape errors, and any ValueError from loading, propagate unwrapped.
            if not isinstance(chain, (tuple, list)) or len(chain) != 3:
                raise ValueError(
                    "cert_chain must be a tuple/list of (certfile, keyfile, password)"
                )
            chain_cert, chain_key, chain_password = chain
            try:
                ctx.load_cert_chain(chain_cert, chain_key, chain_password)
            except (FileNotFoundError, ssl.SSLError) as exc:
                raise WebSocketException(
                    f"SSL client certificate configuration failed: {exc}"
                )

        if "ecdh_curve" in sslopt:
            try:
                ctx.set_ecdh_curve(sslopt["ecdh_curve"])
            except ValueError as exc:
                raise WebSocketException(
                    f"SSL ECDH curve configuration failed: {exc}"
                )

    return ctx.wrap_socket(
        sock,
        do_handshake_on_connect=sslopt.get("do_handshake_on_connect", True),
        suppress_ragged_eofs=sslopt.get("suppress_ragged_eofs", True),
        server_hostname=hostname,
    )
def _ssl_socket(sock: socket.socket, user_sslopt: dict, hostname: str) -> Any:
    """Merge SSL option defaults with ``user_sslopt`` and wrap ``sock`` in TLS.

    ``cert_reqs`` defaults to ``ssl.CERT_REQUIRED``. The
    ``WEBSOCKET_CLIENT_CA_BUNDLE`` environment variable supplies ``ca_certs``
    (if it names a file) or ``ca_cert_path`` (if it names a directory), unless
    the caller already provided that option. A truthy
    ``sslopt["server_hostname"]`` replaces ``hostname``.
    """
    sslopt: dict = {"cert_reqs": ssl.CERT_REQUIRED}
    sslopt.update(user_sslopt)

    ca_bundle = os.environ.get("WEBSOCKET_CLIENT_CA_BUNDLE")
    if ca_bundle:
        # Options set explicitly by the caller win over the environment.
        if os.path.isfile(ca_bundle):
            if user_sslopt.get("ca_certs", None) is None:
                sslopt["ca_certs"] = ca_bundle
        elif os.path.isdir(ca_bundle):
            if user_sslopt.get("ca_cert_path", None) is None:
                sslopt["ca_cert_path"] = ca_bundle

    hostname = sslopt.get("server_hostname", None) or hostname

    verify_host = sslopt.get("check_hostname", True)
    return _wrap_sni_socket(sock, sslopt, hostname, verify_host)
def _tunnel(sock: socket.socket, host: str, port: int, auth: Any) -> socket.socket:
    """Send an HTTP ``CONNECT`` request for ``host:port`` over ``sock``.

    When ``auth`` has a truthy first element, a Basic ``Proxy-Authorization``
    header is added, built from ``auth[0]`` and, if truthy, ``auth[1]``.

    Returns:
        ``sock`` itself, once the proxy has answered with status 200.

    Raises:
        WebSocketProxyException: if reading the reply fails or the status is
            not 200.
    """
    debug("Connecting proxy...")
    authority = f"{host}:{port}"
    request_lines = [
        f"CONNECT {authority} HTTP/1.1",
        f"Host: {authority}",
    ]

    # TODO: support digest auth.
    if auth and auth[0]:
        credentials = auth[0]
        if auth[1]:
            credentials = f"{credentials}:{auth[1]}"
        # encodebytes inserts newlines into long output; strip them all.
        token = base64encode(credentials.encode()).strip().decode().replace("\n", "")
        request_lines.append(f"Proxy-Authorization: Basic {token}")

    # Each line ends with CRLF and an empty line terminates the header block.
    connect_header = "\r\n".join(request_lines) + "\r\n\r\n"
    dump("request header", connect_header)

    send(sock, connect_header)

    try:
        status, _, _ = read_headers(sock)
    except (socket.error, WebSocketException) as e:
        raise WebSocketProxyException(str(e))

    if status != 200:
        raise WebSocketProxyException(f"failed CONNECT via proxy status: {status}")

    return sock
def read_headers(sock: socket.socket) -> tuple:
    """Read an HTTP response head from ``sock``, up to the first empty line.

    The first non-empty line is parsed as the status line; every following
    line must have the form ``key: value``. Header names are lower-cased and
    values stripped. Repeated ``Set-Cookie`` headers are joined with ``"; "``;
    any other repeated header keeps its last value.

    Returns:
        ``(status, headers, status_message)``: the integer status code (or
        ``None`` if nothing was read), the header dict, and the reason phrase
        (or ``None`` if absent).

    Raises:
        WebSocketException: if the status line has no integer status code or
            a header line contains no ``":"``.
    """
    status = None
    status_message = None
    headers: dict = {}
    trace("--- response header ---")

    while True:
        line = recv_line(sock).decode("utf-8").strip()
        if not line:
            break
        trace(line)

        # Compare with None: a parsed status of 0 must not make the next
        # header line be treated as a second status line.
        if status is None:
            status_info = line.split(" ", 2)
            try:
                status = int(status_info[1])
            except (IndexError, ValueError) as exc:
                # Report malformed replies with the library's own exception
                # type instead of a bare IndexError/ValueError.
                raise WebSocketException(f"Invalid status line: {line!r}") from exc
            if len(status_info) > 2:
                status_message = status_info[2]
            continue

        name, separator, raw_value = line.partition(":")
        if not separator:
            raise WebSocketException("Invalid header")
        name = name.lower()
        value = raw_value.strip()

        if name == "set-cookie" and headers.get("set-cookie"):
            headers[name] = headers[name] + "; " + value
        else:
            headers[name] = value

    trace("-----------------------")

    return status, headers, status_message