Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_core.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import socket
2import struct
3import threading
4import time
5from typing import Any, Callable, Optional, Type, Union
7# websocket modules
8from ._abnf import ABNF, STATUS_NORMAL, continuous_frame, frame_buffer
9from ._exceptions import (
10 WebSocketProtocolException,
11 WebSocketConnectionClosedException,
12 WebSocketTimeoutException,
13 WebSocketException,
14)
15from ._handshake import SUPPORTED_REDIRECT_STATUSES, handshake, handshake_response
16from ._http import connect, proxy_info
17from ._logging import debug, error, trace, isEnabledForError, isEnabledForTrace
18from ._socket import getdefaulttimeout, recv, send, sock_opt
19from ._ssl_compat import ssl
20from ._utils import NoLock
21from ._dispatcher import DispatcherBase, WrappedDispatcher
23"""
24_core.py
25websocket - WebSocket client library for Python
27Copyright 2025 engn33r
29Licensed under the Apache License, Version 2.0 (the "License");
30you may not use this file except in compliance with the License.
31You may obtain a copy of the License at
33 http://www.apache.org/licenses/LICENSE-2.0
35Unless required by applicable law or agreed to in writing, software
36distributed under the License is distributed on an "AS IS" BASIS,
37WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
38See the License for the specific language governing permissions and
39limitations under the License.
40"""
# Names exported by a star-import of this module.
__all__ = ["WebSocket", "create_connection"]
class WebSocket:
    """
    Low level WebSocket interface.

    This class is based on the WebSocket protocol `draft-hixie-thewebsocketprotocol-76 <http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76>`_

    We can connect to the websocket server and send/receive data.
    The following example is an echo client.

    >>> import websocket
    >>> ws = websocket.WebSocket()
    >>> ws.connect("ws://echo.websocket.events")
    >>> ws.recv()
    'echo.websocket.events sponsored by Lob.com'
    >>> ws.send("Hello, Server")
    19
    >>> ws.recv()
    'Hello, Server'
    >>> ws.close()

    Parameters
    ----------
    get_mask_key: func
        A callable function to get new mask keys, see the
        WebSocket.set_mask_key's docstring for more information.
    sockopt: list
        Values for socket.setsockopt.
        Each element is the argument list of one sock.setsockopt call.
    sslopt: dict
        Optional dict object for ssl socket options. See FAQ for details.
    fire_cont_frame: bool
        Fire recv event for each cont frame. Default is False.
    enable_multithread: bool
        If set to True (the default), guard send and recv with locks.
    skip_utf8_validation: bool
        Skip utf8 validation.
    dispatcher: DispatcherBase or WrappedDispatcher
        Optional object whose ``send(sock, data)`` is used to write to the
        socket instead of the plain ``send`` helper.
    """

    def __init__(
        self,
        get_mask_key: Optional[Callable] = None,
        sockopt: Optional[list] = None,
        sslopt: Optional[dict] = None,
        fire_cont_frame: bool = False,
        enable_multithread: bool = True,
        skip_utf8_validation: bool = False,
        dispatcher: Optional[Union[DispatcherBase, WrappedDispatcher]] = None,
        **_: Any,
    ) -> None:
        """
        Initialize WebSocket object.

        No network activity happens here; call ``connect`` to open the
        connection. Unknown keyword arguments are accepted and ignored.

        Parameters
        ----------
        sslopt: dict
            Optional dict object for ssl socket options. See FAQ for details.
        """
        self.sock_opt = sock_opt(sockopt, sslopt)
        self.handshake_response: Optional[handshake_response] = None
        self.sock: Optional[socket.socket] = None

        self.connected = False
        self.get_mask_key = get_mask_key
        # These buffer over the build-up of a single frame.
        self.frame_buffer = frame_buffer(self._recv, skip_utf8_validation)
        self.cont_frame = continuous_frame(fire_cont_frame, skip_utf8_validation)
        self.dispatcher = dispatcher

        if enable_multithread:
            self.lock = threading.Lock()
            self.readlock = threading.Lock()
        else:
            # NoLock stands in for a lock when multithread support is disabled.
            self.lock = NoLock()  # type: ignore[assignment]
            self.readlock = NoLock()  # type: ignore[assignment]

    def __iter__(self):
        """
        Allow iteration over websocket, implying sequential `recv` executions.
        """
        while True:
            yield self.recv()

    def __next__(self):
        """Return the next received message (same as ``recv``)."""
        return self.recv()

    def next(self):
        """Alias of ``__next__``."""
        return self.__next__()

    def fileno(self):
        """
        Return the file descriptor of the underlying socket.

        Raises
        ------
        WebSocketException
            If no socket has been established yet.
        """
        if self.sock is None:
            raise WebSocketException("Connection not established")
        return self.sock.fileno()

    def set_mask_key(self, func):
        """
        Set function to create mask key. You can customize mask key generator.
        Mainly, this is for testing purpose.

        Parameters
        ----------
        func: func
            callable object. the func takes 1 argument as integer.
            The argument means length of mask key.
            This func must return string(byte array),
            which length is argument specified.
        """
        self.get_mask_key = func

    def gettimeout(self) -> Optional[Union[float, int]]:
        """
        Get the websocket timeout (in seconds) as an int or float

        Returns
        ----------
        timeout: int or float
            returns timeout value (in seconds). This value could be either float/integer.
        """
        return self.sock_opt.timeout

    def settimeout(self, timeout: Optional[Union[float, int]]) -> None:
        """
        Set the timeout to the websocket.

        The value is stored in the socket options and, if a socket already
        exists, applied to it immediately.

        Parameters
        ----------
        timeout: int or float
            timeout time (in seconds). This value could be either float/integer.
        """
        self.sock_opt.timeout = timeout
        if self.sock:
            self.sock.settimeout(timeout)

    timeout = property(gettimeout, settimeout)

    def getsubprotocol(self) -> Optional[str]:
        """
        Get subprotocol from the handshake response, or None before a handshake.
        """
        if self.handshake_response:
            return self.handshake_response.subprotocol
        return None

    subprotocol = property(getsubprotocol)

    def getstatus(self) -> Optional[int]:
        """
        Get handshake status, or None before a handshake.
        """
        if self.handshake_response:
            return self.handshake_response.status
        return None

    status = property(getstatus)

    def getheaders(self) -> Optional[dict]:
        """
        Get handshake response header, or None before a handshake.
        """
        if self.handshake_response:
            return self.handshake_response.headers
        return None

    headers = property(getheaders)

    def is_ssl(self):
        """Return True when the current socket is an ``ssl.SSLSocket``."""
        try:
            return isinstance(self.sock, ssl.SSLSocket)
        except (AttributeError, NameError):
            # The ssl compatibility shim may not expose SSLSocket.
            return False

    def connect(self, url, **options):
        """
        Connect to url. url is websocket url scheme.
        ie. ws://host:port/resource
        You can customize using 'options'.
        If you set "header" list object, you can set your own custom header.

        >>> ws = WebSocket()
        >>> ws.connect("ws://echo.websocket.events",
        ...     header=["User-Agent: MyProgram",
        ...             "x-custom: header"])

        Parameters
        ----------
        header: list or dict
            Custom http header list or dict.
        cookie: str
            Cookie value.
        origin: str
            Custom origin url.
        connection: str
            Custom connection header value.
            Default value "Upgrade" set in _handshake.py
        suppress_origin: bool
            Suppress outputting origin header.
        suppress_host: bool
            Suppress outputting host header.
        host: str
            Custom host header string.
        timeout: int or float
            Socket timeout time. This value is an integer or float.
            If you set None for this value, it means "use default_timeout value"
        http_proxy_host: str
            HTTP proxy host name.
        http_proxy_port: str or int
            HTTP proxy port. Default is 80.
        http_no_proxy: list
            Whitelisted host names that don't use the proxy.
        http_proxy_auth: tuple
            HTTP proxy auth information. Tuple of username and password. Default is None.
        http_proxy_timeout: int or float
            HTTP proxy timeout, default is 60 sec as per python-socks.
        redirect_limit: int
            Number of redirects to follow.
        subprotocols: list
            List of available subprotocols. Default is None.
        socket: socket
            Pre-initialized stream socket.
        """
        self.sock_opt.timeout = options.get("timeout", self.sock_opt.timeout)
        # "socket" is popped only after proxy_info(**options) has seen it,
        # matching left-to-right argument evaluation.
        self.sock, addrs = connect(
            url, self.sock_opt, proxy_info(**options), options.pop("socket", None)
        )

        try:
            self.handshake_response = handshake(self.sock, url, *addrs, **options)
            for _ in range(options.pop("redirect_limit", 3)):
                response = self.handshake_response
                if (
                    response is None
                    or response.status not in SUPPORTED_REDIRECT_STATUSES
                ):
                    # Not a redirect: nothing further to follow.
                    break
                url = response.headers["location"]
                self.sock.close()
                self.sock, addrs = connect(
                    url,
                    self.sock_opt,
                    proxy_info(**options),
                    options.pop("socket", None),
                )
                self.handshake_response = handshake(
                    self.sock, url, *addrs, **options
                )
            self.connected = True
        except BaseException:
            # Release the socket on any failure, then let the error propagate.
            if self.sock:
                self.sock.close()
                self.sock = None
            raise

    def send(self, payload: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> int:
        """
        Send the data as string.

        Parameters
        ----------
        payload: str
            Payload must be utf-8 string or unicode,
            If the opcode is OPCODE_TEXT.
            Otherwise, it must be string(byte array).
        opcode: int
            Operation code (opcode) to send.

        Returns
        -------
        int
            Length of the formatted frame, as returned by ``send_frame``.
        """
        frame = ABNF.create_frame(payload, opcode)
        return self.send_frame(frame)

    def send_text(self, text_data: str) -> int:
        """
        Sends UTF-8 encoded text.
        """
        return self.send(text_data, ABNF.OPCODE_TEXT)

    def send_bytes(self, data: Union[bytes, bytearray]) -> int:
        """
        Sends a sequence of bytes.
        """
        return self.send(data, ABNF.OPCODE_BINARY)

    def send_frame(self, frame: ABNF) -> int:
        """
        Send the data frame.

        >>> ws = create_connection("ws://echo.websocket.events")
        >>> frame = ABNF.create_frame("Hello", ABNF.OPCODE_TEXT)
        >>> ws.send_frame(frame)
        >>> cont_frame = ABNF.create_frame("My name is ", ABNF.OPCODE_CONT, 0)
        >>> ws.send_frame(cont_frame)
        >>> cont_frame = ABNF.create_frame("Foo Bar", ABNF.OPCODE_CONT, 1)
        >>> ws.send_frame(cont_frame)

        Parameters
        ----------
        frame: ABNF frame
            frame data created by ABNF.create_frame

        Returns
        -------
        int
            Length in bytes of the formatted frame.
        """
        if self.get_mask_key:
            frame.get_mask_key = self.get_mask_key
        data = frame.format()
        length = len(data)
        if isEnabledForTrace():
            trace(f"++Sent raw: {repr(data)}")
            trace(f"++Sent decoded: {str(frame)}")
        with self.lock:
            # _send may write only part of the buffer; loop until it is drained.
            while data:
                bytes_sent = self._send(data)
                data = data[bytes_sent:]

        return length

    def send_binary(self, payload: bytes) -> int:
        """
        Send a binary message (OPCODE_BINARY).

        Parameters
        ----------
        payload: bytes
            payload of message to send.
        """
        return self.send(payload, ABNF.OPCODE_BINARY)

    def ping(self, payload: Union[str, bytes] = "") -> None:
        """
        Send ping data.

        Parameters
        ----------
        payload: str or bytes
            data payload to send server. A str is encoded as UTF-8.
        """
        if isinstance(payload, str):
            payload = payload.encode("utf-8")
        self.send(payload, ABNF.OPCODE_PING)

    def pong(self, payload: Union[str, bytes] = "") -> None:
        """
        Send pong data.

        Parameters
        ----------
        payload: str or bytes
            data payload to send server. A str is encoded as UTF-8.
        """
        if isinstance(payload, str):
            payload = payload.encode("utf-8")
        self.send(payload, ABNF.OPCODE_PONG)

    def recv(self) -> Union[str, bytes]:
        """
        Receive string data(byte array) from the server.

        Returns
        ----------
        data: str for text messages (bytes payloads are decoded as UTF-8),
            bytes for binary messages, and "" for any other opcode.
        """
        with self.readlock:
            opcode, data = self.recv_data()
        if opcode == ABNF.OPCODE_TEXT:
            if isinstance(data, bytes):
                return data.decode("utf-8")
            # Already text (or an unexpected type): hand it back unchanged
            # rather than silently returning None.
            return data
        if opcode == ABNF.OPCODE_BINARY:
            return data
        return ""

    def recv_data(self, control_frame: bool = False) -> tuple:
        """
        Receive data with operation code.

        Parameters
        ----------
        control_frame: bool
            a boolean flag indicating whether to return control frame
            data, defaults to False

        Returns
        -------
        opcode, frame.data: tuple
            tuple of operation code and string(byte array) value.
        """
        opcode, frame = self.recv_data_frame(control_frame)
        return opcode, frame.data

    def recv_data_frame(self, control_frame: bool = False) -> tuple:
        """
        Receive data with operation code.

        If a valid ping message is received, a pong response is sent.
        A close frame is answered with ``send_close`` and then returned.
        Ping and pong frames are returned only when ``control_frame`` is True;
        otherwise the loop keeps reading.

        Parameters
        ----------
        control_frame: bool
            a boolean flag indicating whether to return control frame
            data, defaults to False

        Returns
        -------
        frame.opcode, frame: tuple
            tuple of operation code and the frame object.

        Raises
        ------
        WebSocketProtocolException
            If no valid frame was received or a ping payload is too long.
        """
        while True:
            frame = self.recv_frame()
            if not frame:
                # Checked before tracing so a missing frame surfaces as a
                # protocol error instead of
                # 'NoneType' object has no attribute 'format'.
                raise WebSocketProtocolException(f"Not a valid frame {frame}")
            if isEnabledForTrace():
                trace(f"++Rcv raw: {repr(frame.format())}")
                trace(f"++Rcv decoded: {str(frame)}")

            if frame.opcode in (
                ABNF.OPCODE_TEXT,
                ABNF.OPCODE_BINARY,
                ABNF.OPCODE_CONT,
            ):
                self.cont_frame.validate(frame)
                self.cont_frame.add(frame)

                if self.cont_frame.is_fire(frame):
                    return self.cont_frame.extract(frame)

            elif frame.opcode == ABNF.OPCODE_CLOSE:
                self.send_close()
                return frame.opcode, frame
            elif frame.opcode == ABNF.OPCODE_PING:
                if len(frame.data) < 126:
                    self.pong(frame.data)
                else:
                    raise WebSocketProtocolException("Ping message is too long")
                if control_frame:
                    return frame.opcode, frame
            elif frame.opcode == ABNF.OPCODE_PONG:
                if control_frame:
                    return frame.opcode, frame

    def recv_frame(self):
        """
        Receive data as frame from server.

        Returns
        -------
        self.frame_buffer.recv_frame(): ABNF frame object
        """
        return self.frame_buffer.recv_frame()

    @staticmethod
    def _encode_close_reason(reason) -> bytes:
        """Return the close reason as bytes (None -> b"", str -> UTF-8)."""
        if reason is None:
            return b""
        if isinstance(reason, str):
            return reason.encode("utf-8")
        if isinstance(reason, bytes):
            return reason
        return bytes(reason)

    def send_close(
        self, status: int = STATUS_NORMAL, reason: Union[str, bytes] = b""
    ) -> None:
        """
        Send close data to the server.

        Marks the connection as no longer connected before sending.

        Parameters
        ----------
        status: int
            Status code to send. See STATUS_XXX.
        reason: str or bytes
            The reason to close. This must be string or UTF-8 bytes.

        Raises
        ------
        ValueError
            If status is negative or not below ABNF.LENGTH_16.
        """
        if status < 0 or status >= ABNF.LENGTH_16:
            raise ValueError("code is invalid range")

        reason_bytes = self._encode_close_reason(reason)

        self.connected = False
        self.send(struct.pack("!H", status) + reason_bytes, ABNF.OPCODE_CLOSE)

    def close(
        self,
        status: int = STATUS_NORMAL,
        reason: Union[str, bytes] = b"",
        timeout: Optional[Union[int, float]] = 3,
    ) -> None:
        """
        Close Websocket object

        Sends a close frame, waits up to ``timeout`` seconds for the peer's
        close frame, then shuts the socket down. Errors during this exchange
        are logged at debug level and do not prevent the socket from being
        released. Does nothing if the connection is not marked connected.

        Parameters
        ----------
        status: int
            Status code to send. See VALID_CLOSE_STATUS in ABNF.
        reason: str or bytes
            The reason to close. A str is encoded as UTF-8.
        timeout: int or float
            Timeout until receive a close frame.
            If None, it will wait forever until receive a close frame.

        Raises
        ------
        ValueError
            If status is negative or not below ABNF.LENGTH_16.
        """
        if not self.connected:
            return
        if status < 0 or status >= ABNF.LENGTH_16:
            raise ValueError("code is invalid range")

        try:
            self.connected = False
            # Normalise the reason first: concatenating a str onto the packed
            # status would raise TypeError and the close frame would be lost.
            close_payload = struct.pack("!H", status) + self._encode_close_reason(
                reason
            )
            self.send(close_payload, ABNF.OPCODE_CLOSE)
            if self.sock is None:
                return
            sock_timeout = self.sock.gettimeout()
            self.sock.settimeout(timeout)
            # Monotonic clock: the wait must not be affected by wall-clock jumps.
            started = time.monotonic()
            while timeout is None or time.monotonic() - started < timeout:
                try:
                    frame = self.recv_frame()
                    if frame.opcode != ABNF.OPCODE_CLOSE:
                        continue
                    if isEnabledForError():
                        recv_status = struct.unpack("!H", frame.data[0:2])[0]
                        if recv_status >= 3000 and recv_status <= 4999:
                            debug(f"close status: {repr(recv_status)}")
                        elif recv_status != STATUS_NORMAL:
                            error(f"close status: {repr(recv_status)}")
                    break
                except (
                    WebSocketConnectionClosedException,
                    WebSocketTimeoutException,
                    struct.error,
                ):
                    break
            # _recv may have dropped the socket while we were waiting.
            if self.sock is not None:
                self.sock.settimeout(sock_timeout)
                self.sock.shutdown(socket.SHUT_RDWR)
        except Exception as exc:
            # Closing is best effort; KeyboardInterrupt/SystemExit still propagate.
            debug(f"error during close: {repr(exc)}")
        finally:
            self.shutdown()

    def abort(self):
        """
        Low-level asynchronous abort, wakes up other threads that are waiting in recv_*
        """
        if self.connected and self.sock is not None:
            self.sock.shutdown(socket.SHUT_RDWR)

    def shutdown(self):
        """
        close socket, immediately.
        """
        if self.sock:
            try:
                # `_closed` is a private attribute; socket-like objects that
                # lack it are treated as still open so they do get closed.
                if not getattr(self.sock, "_closed", False):
                    self.sock.close()
            except (OSError, AttributeError):
                # Socket already closed or invalid file descriptor - this can happen
                # during reconnection scenarios when network failures occur
                debug("Socket already closed during shutdown")
            finally:
                self.sock = None
                self.connected = False

    def _send(self, data: Union[str, bytes]) -> int:
        """Write data via the dispatcher if one is set, else via ``send``.

        Raises WebSocketConnectionClosedException when there is no socket.
        """
        if self.sock is None:
            raise WebSocketConnectionClosedException("socket is already closed.")
        if self.dispatcher:
            return self.dispatcher.send(self.sock, data)
        return send(self.sock, data)

    def _recv(self, bufsize):
        """Read up to bufsize bytes from the socket.

        If the read reports a closed connection, the socket is closed and
        cleared and the exception is re-raised.
        """
        if self.sock is None:
            raise WebSocketConnectionClosedException("Connection is closed")
        try:
            return recv(self.sock, bufsize)
        except WebSocketConnectionClosedException:
            if self.sock:
                self.sock.close()
            self.sock = None
            self.connected = False
            raise
def create_connection(
    url: str,
    timeout: Optional[Union[float, int]] = None,
    class_: Type[WebSocket] = WebSocket,
    **options: Any,
) -> WebSocket:
    """
    Build a websocket object, connect it to ``url`` and return it.

    The constructor-only options are taken out of ``options`` and handed to
    ``class_``; whatever remains is passed both to the constructor and to
    ``connect``. When ``timeout`` is None the value returned by
    getdefaulttimeout() is applied instead.
    If you set "header" list object, you can set your own custom header.

    >>> conn = create_connection("ws://echo.websocket.events",
    ...     header=["User-Agent: MyProgram",
    ...             "x-custom: header"])

    Parameters
    ----------
    class_: class
        class to instantiate when creating the connection. It has to implement
        settimeout and connect. Its __init__ should be compatible with
        WebSocket.__init__, i.e. accept all of its kwargs.
    header: list or dict
        custom http header list or dict.
    cookie: str
        Cookie value.
    origin: str
        custom origin url.
    suppress_origin: bool
        suppress outputting origin header.
    host: str
        custom host header string.
    timeout: int or float
        socket timeout time. This value could be either float/integer.
        If set to None, it uses the default_timeout value.
    http_proxy_host: str
        HTTP proxy host name.
    http_proxy_port: str or int
        HTTP proxy port. If not set, set to 80.
    http_no_proxy: list
        Whitelisted host names that don't use the proxy.
    http_proxy_auth: tuple
        HTTP proxy auth information. tuple of username and password. Default is None.
    http_proxy_timeout: int or float
        HTTP proxy timeout, default is 60 sec as per python-socks.
    enable_multithread: bool
        Enable lock for multithread.
    redirect_limit: int
        Number of redirects to follow.
    sockopt: tuple
        Values for socket.setsockopt.
        sockopt must be a tuple and each element is an argument of sock.setsockopt.
    sslopt: dict
        Optional dict object for ssl socket options. See FAQ for details.
    subprotocols: list
        List of available subprotocols. Default is None.
    skip_utf8_validation: bool
        Skip utf8 validation.
    socket: socket
        Pre-initialized stream socket.
    """
    # Constructor-only settings are removed from `options` so that connect()
    # does not receive them as well.
    constructor_kwargs = {
        "sockopt": options.pop("sockopt", []),
        "sslopt": options.pop("sslopt", {}),
        "fire_cont_frame": options.pop("fire_cont_frame", False),
        "enable_multithread": options.pop("enable_multithread", True),
        "skip_utf8_validation": options.pop("skip_utf8_validation", False),
    }
    websock = class_(**constructor_kwargs, **options)

    if timeout is None:
        effective_timeout = getdefaulttimeout()
    else:
        effective_timeout = timeout
    websock.settimeout(effective_timeout)

    websock.connect(url, **options)
    return websock