Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_core.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import socket
2import struct
3import threading
4import time
5from typing import Any, Callable, Optional, Type, Union
7# websocket modules
8from ._abnf import ABNF, STATUS_NORMAL, continuous_frame, frame_buffer
9from ._exceptions import (
10 WebSocketProtocolException,
11 WebSocketConnectionClosedException,
12 WebSocketTimeoutException,
13 WebSocketException,
14)
15from ._handshake import SUPPORTED_REDIRECT_STATUSES, handshake, handshake_response
16from ._http import connect, proxy_info
17from ._logging import debug, error, trace, isEnabledForError, isEnabledForTrace
18from ._socket import getdefaulttimeout, recv, send, sock_opt
19from ._ssl_compat import ssl
20from ._utils import NoLock
21from ._dispatcher import DispatcherBase, WrappedDispatcher
23"""
24_core.py
25websocket - WebSocket client library for Python
27Copyright 2025 engn33r
29Licensed under the Apache License, Version 2.0 (the "License");
30you may not use this file except in compliance with the License.
31You may obtain a copy of the License at
33 http://www.apache.org/licenses/LICENSE-2.0
35Unless required by applicable law or agreed to in writing, software
36distributed under the License is distributed on an "AS IS" BASIS,
37WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
38See the License for the specific language governing permissions and
39limitations under the License.
40"""
42__all__ = ["WebSocket", "create_connection"]
45class WebSocket:
46 """
47 Low level WebSocket interface.
49 This class is based on the WebSocket protocol `draft-hixie-thewebsocketprotocol-76 <http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76>`_
51 We can connect to the websocket server and send/receive data.
52 The following example is an echo client.
54 >>> import websocket
55 >>> ws = websocket.WebSocket()
56 >>> ws.connect("ws://echo.websocket.events")
57 >>> ws.recv()
58 'echo.websocket.events sponsored by Lob.com'
59 >>> ws.send("Hello, Server")
60 19
61 >>> ws.recv()
62 'Hello, Server'
63 >>> ws.close()
65 Parameters
66 ----------
67 get_mask_key: func
68 A callable function to get new mask keys, see the
69 WebSocket.set_mask_key's docstring for more information.
70 sockopt: tuple
71 Values for socket.setsockopt.
72 sockopt must be tuple and each element is argument of sock.setsockopt.
73 sslopt: dict
74 Optional dict object for ssl socket options. See FAQ for details.
75 fire_cont_frame: bool
76 Fire recv event for each cont frame. Default is False.
77 enable_multithread: bool
78 If set to True, lock send method.
79 skip_utf8_validation: bool
80 Skip utf8 validation.
81 """
83 def __init__(
84 self,
85 get_mask_key: Optional[Callable] = None,
86 sockopt: Optional[list] = None,
87 sslopt: Optional[dict] = None,
88 fire_cont_frame: bool = False,
89 enable_multithread: bool = True,
90 skip_utf8_validation: bool = False,
91 dispatcher: Optional[Union[DispatcherBase, WrappedDispatcher]] = None,
92 **_: Any,
93 ) -> None:
94 """
95 Initialize WebSocket object.
97 Parameters
98 ----------
99 sslopt: dict
100 Optional dict object for ssl socket options. See FAQ for details.
101 """
102 self.sock_opt = sock_opt(sockopt, sslopt)
103 self.handshake_response: Optional[handshake_response] = None
104 self.sock: Optional[socket.socket] = None
106 self.connected = False
107 self.close_frame: Optional[ABNF] = None
108 self.get_mask_key = get_mask_key
109 # These buffer over the build-up of a single frame.
110 self.frame_buffer = frame_buffer(self._recv, skip_utf8_validation)
111 self.cont_frame = continuous_frame(fire_cont_frame, skip_utf8_validation)
112 self.dispatcher = dispatcher
114 if enable_multithread:
115 self.lock = threading.Lock()
116 self.readlock = threading.Lock()
117 else:
118 self.lock = NoLock() # type: ignore[assignment]
119 self.readlock = NoLock() # type: ignore[assignment]
121 def __iter__(self):
122 """
123 Allow iteration over websocket, implying sequential `recv` executions.
124 """
125 while True:
126 yield self.recv()
128 def __next__(self):
129 return self.recv()
131 def next(self):
132 return self.__next__()
134 def fileno(self):
135 if self.sock is None:
136 raise WebSocketException("Connection not established")
137 return self.sock.fileno()
139 def set_mask_key(self, func):
140 """
141 Set function to create mask key. You can customize mask key generator.
142 Mainly, this is for testing purpose.
144 Parameters
145 ----------
146 func: func
147 callable object. the func takes 1 argument as integer.
148 The argument means length of mask key.
149 This func must return string(byte array),
150 which length is argument specified.
151 """
152 self.get_mask_key = func
154 def gettimeout(self) -> Optional[Union[float, int]]:
155 """
156 Get the websocket timeout (in seconds) as an int or float
158 Returns
159 ----------
160 timeout: int or float
161 returns timeout value (in seconds). This value could be either float/integer.
162 """
163 return self.sock_opt.timeout
165 def settimeout(self, timeout: Optional[Union[float, int]]) -> None:
166 """
167 Set the timeout to the websocket.
169 Parameters
170 ----------
171 timeout: int or float
172 timeout time (in seconds). This value could be either float/integer.
173 """
174 self.sock_opt.timeout = timeout
175 if self.sock:
176 self.sock.settimeout(timeout)
178 timeout = property(gettimeout, settimeout)
180 def getsubprotocol(self) -> Optional[str]:
181 """
182 Get subprotocol
183 """
184 if self.handshake_response:
185 return self.handshake_response.subprotocol
186 else:
187 return None
189 subprotocol = property(getsubprotocol)
191 def getstatus(self) -> Optional[int]:
192 """
193 Get handshake status
194 """
195 if self.handshake_response:
196 return self.handshake_response.status
197 else:
198 return None
200 status = property(getstatus)
202 def getheaders(self) -> Optional[dict]:
203 """
204 Get handshake response header
205 """
206 if self.handshake_response:
207 return self.handshake_response.headers
208 else:
209 return None
211 def is_ssl(self):
212 try:
213 return isinstance(self.sock, ssl.SSLSocket)
214 except (AttributeError, NameError):
215 return False
217 headers = property(getheaders)
219 def connect(self, url, **options):
220 """
221 Connect to url. url is websocket url scheme.
222 ie. ws://host:port/resource
223 You can customize using 'options'.
224 If you set "header" list object, you can set your own custom header.
226 >>> ws = WebSocket()
227 >>> ws.connect("ws://echo.websocket.events",
228 ... header=["User-Agent: MyProgram",
229 ... "x-custom: header"])
231 Parameters
232 ----------
233 header: list or dict
234 Custom http header list or dict.
235 cookie: str
236 Cookie value.
237 origin: str
238 Custom origin url.
239 connection: str
240 Custom connection header value.
241 Default value "Upgrade" set in _handshake.py
242 suppress_origin: bool
243 Suppress outputting origin header.
244 suppress_host: bool
245 Suppress outputting host header.
246 host: str
247 Custom host header string.
248 timeout: int or float
249 Socket timeout time. This value is an integer or float.
250 If you set None for this value, it means "use default_timeout value"
251 http_proxy_host: str
252 HTTP proxy host name.
253 http_proxy_port: str or int
254 HTTP proxy port. Default is 80.
255 http_no_proxy: list
256 Whitelisted host names that don't use the proxy.
257 http_proxy_auth: tuple
258 HTTP proxy auth information. Tuple of username and password. Default is None.
259 http_proxy_timeout: int or float
260 HTTP proxy timeout, default is 60 sec as per python-socks.
261 redirect_limit: int
262 Number of redirects to follow.
263 subprotocols: list
264 List of available subprotocols. Default is None.
265 socket: socket
266 Pre-initialized stream socket.
267 """
268 self.sock_opt.timeout = options.get("timeout", self.sock_opt.timeout)
269 self.sock, addrs = connect(
270 url, self.sock_opt, proxy_info(**options), options.pop("socket", None)
271 )
273 try:
274 self.handshake_response = handshake(self.sock, url, *addrs, **options)
275 for _ in range(options.pop("redirect_limit", 3)):
276 if (
277 self.handshake_response is not None
278 and self.handshake_response.status in SUPPORTED_REDIRECT_STATUSES
279 ):
280 url = self.handshake_response.headers["location"]
281 self.sock.close()
282 self.sock, addrs = connect(
283 url,
284 self.sock_opt,
285 proxy_info(**options),
286 options.pop("socket", None),
287 )
288 self.handshake_response = handshake(
289 self.sock, url, *addrs, **options
290 )
291 self.connected = True
292 except:
293 if self.sock:
294 self.sock.close()
295 self.sock = None
296 raise
298 def send(self, payload: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> int:
299 """
300 Send the data as string.
302 Parameters
303 ----------
304 payload: str
305 Payload must be utf-8 string or unicode,
306 If the opcode is OPCODE_TEXT.
307 Otherwise, it must be string(byte array).
308 opcode: int
309 Operation code (opcode) to send.
310 """
312 frame = ABNF.create_frame(payload, opcode)
313 return self.send_frame(frame)
315 def send_text(self, text_data: str) -> int:
316 """
317 Sends UTF-8 encoded text.
318 """
319 return self.send(text_data, ABNF.OPCODE_TEXT)
321 def send_bytes(self, data: Union[bytes, bytearray]) -> int:
322 """
323 Sends a sequence of bytes.
324 """
325 return self.send(data, ABNF.OPCODE_BINARY)
327 def send_frame(self, frame: ABNF) -> int:
328 """
329 Send the data frame.
331 >>> ws = create_connection("ws://echo.websocket.events")
332 >>> frame = ABNF.create_frame("Hello", ABNF.OPCODE_TEXT)
333 >>> ws.send_frame(frame)
334 >>> cont_frame = ABNF.create_frame("My name is ", ABNF.OPCODE_CONT, 0)
335 >>> ws.send_frame(frame)
336 >>> cont_frame = ABNF.create_frame("Foo Bar", ABNF.OPCODE_CONT, 1)
337 >>> ws.send_frame(frame)
339 Parameters
340 ----------
341 frame: ABNF frame
342 frame data created by ABNF.create_frame
343 """
344 if self.get_mask_key:
345 frame.get_mask_key = self.get_mask_key
346 data = frame.format()
347 length = len(data)
348 if isEnabledForTrace():
349 trace(f"++Sent raw: {repr(data)}")
350 trace(f"++Sent decoded: {frame.__str__()}")
351 with self.lock:
352 while data:
353 bytes_sent = self._send(data)
354 data = data[bytes_sent:]
356 return length
358 def send_binary(self, payload: bytes) -> int:
359 """
360 Send a binary message (OPCODE_BINARY).
362 Parameters
363 ----------
364 payload: bytes
365 payload of message to send.
366 """
367 return self.send(payload, ABNF.OPCODE_BINARY)
369 def ping(self, payload: Union[str, bytes] = "") -> None:
370 """
371 Send ping data.
373 Parameters
374 ----------
375 payload: str
376 data payload to send server.
377 """
378 if isinstance(payload, str):
379 payload = payload.encode("utf-8")
380 self.send(payload, ABNF.OPCODE_PING)
382 def pong(self, payload: Union[str, bytes] = "") -> None:
383 """
384 Send pong data.
386 Parameters
387 ----------
388 payload: str
389 data payload to send server.
390 """
391 if isinstance(payload, str):
392 payload = payload.encode("utf-8")
393 self.send(payload, ABNF.OPCODE_PONG)
395 def recv(self) -> Union[str, bytes]:
396 """
397 Receive string data(byte array) from the server.
399 Returns
400 ----------
401 data: string (byte array) value.
402 """
403 with self.readlock:
404 opcode, data = self.recv_data()
405 if opcode == ABNF.OPCODE_TEXT:
406 data_received: Union[bytes, str] = data
407 if isinstance(data_received, bytes):
408 return data_received.decode("utf-8")
409 elif isinstance(data_received, str):
410 return data_received
411 elif opcode == ABNF.OPCODE_BINARY:
412 data_binary: bytes = data
413 return data_binary
414 else:
415 return ""
417 def recv_data(self, control_frame: bool = False) -> tuple:
418 """
419 Receive data with operation code.
421 Parameters
422 ----------
423 control_frame: bool
424 a boolean flag indicating whether to return control frame
425 data, defaults to False
427 Returns
428 -------
429 opcode, frame.data: tuple
430 tuple of operation code and string(byte array) value.
431 """
432 opcode, frame = self.recv_data_frame(control_frame)
433 return opcode, frame.data
435 def recv_data_frame(self, control_frame: bool = False) -> tuple:
436 """
437 Receive data with operation code.
439 If a valid ping message is received, a pong response is sent.
441 Parameters
442 ----------
443 control_frame: bool
444 a boolean flag indicating whether to return control frame
445 data, defaults to False
447 Returns
448 -------
449 frame.opcode, frame: tuple
450 tuple of operation code and string(byte array) value.
451 """
452 while True:
453 frame = self.recv_frame()
454 if isEnabledForTrace():
455 trace(f"++Rcv raw: {repr(frame.format())}")
456 trace(f"++Rcv decoded: {frame.__str__()}")
457 if not frame:
458 # handle error:
459 # 'NoneType' object has no attribute 'opcode'
460 raise WebSocketProtocolException(f"Not a valid frame {frame}")
461 elif frame.opcode in (
462 ABNF.OPCODE_TEXT,
463 ABNF.OPCODE_BINARY,
464 ABNF.OPCODE_CONT,
465 ):
466 self.cont_frame.validate(frame)
467 self.cont_frame.add(frame)
469 if self.cont_frame.is_fire(frame):
470 return self.cont_frame.extract(frame)
472 elif frame.opcode == ABNF.OPCODE_CLOSE:
473 self.send_close()
474 return frame.opcode, frame
475 elif frame.opcode == ABNF.OPCODE_PING:
476 if len(frame.data) < 126:
477 self.pong(frame.data)
478 else:
479 raise WebSocketProtocolException("Ping message is too long")
480 if control_frame:
481 return frame.opcode, frame
482 elif frame.opcode == ABNF.OPCODE_PONG:
483 if control_frame:
484 return frame.opcode, frame
486 def recv_frame(self):
487 """
488 Receive data as frame from server.
490 Returns
491 -------
492 self.frame_buffer.recv_frame(): ABNF frame object
493 """
494 return self.frame_buffer.recv_frame()
496 def send_close(
497 self, status: int = STATUS_NORMAL, reason: Union[str, bytes] = b""
498 ) -> None:
499 """
500 Send close data to the server.
502 Parameters
503 ----------
504 status: int
505 Status code to send. See STATUS_XXX.
506 reason: str or bytes
507 The reason to close. This must be string or UTF-8 bytes.
508 """
509 if status < 0 or status >= ABNF.LENGTH_16:
510 raise ValueError("code is invalid range")
512 if reason is None:
513 reason_bytes = b""
514 elif isinstance(reason, str):
515 reason_bytes = reason.encode("utf-8")
516 elif isinstance(reason, bytes):
517 reason_bytes = reason
518 else:
519 reason_bytes = bytes(reason)
521 self.connected = False
522 self.send(struct.pack("!H", status) + reason_bytes, ABNF.OPCODE_CLOSE)
524 def close(
525 self,
526 status: int = STATUS_NORMAL,
527 reason: Union[str, bytes] = b"",
528 timeout: int = 3,
529 ) -> None:
530 """
531 Close Websocket object
533 Parameters
534 ----------
535 status: int
536 Status code to send. See VALID_CLOSE_STATUS in ABNF.
537 reason: bytes
538 The reason to close in UTF-8.
539 timeout: int or float
540 Timeout until receive a close frame.
541 If None, it will wait forever until receive a close frame.
542 """
543 if not self.connected:
544 return
545 if status < 0 or status >= ABNF.LENGTH_16:
546 raise ValueError("code is invalid range")
548 # Reset close_frame to avoid stale data from previous connections
549 self.close_frame = None
551 try:
552 self.connected = False
553 self.send(struct.pack("!H", status) + reason, ABNF.OPCODE_CLOSE)
554 if self.sock is None:
555 return
556 sock_timeout = self.sock.gettimeout()
557 self.sock.settimeout(timeout)
558 start_time = time.time()
559 while timeout is None or time.time() - start_time < timeout:
560 try:
561 frame = self.recv_frame()
562 if frame.opcode != ABNF.OPCODE_CLOSE:
563 continue
564 # Store the peer's close frame for access by higher-level APIs
565 self.close_frame = frame
566 if isEnabledForError():
567 recv_status = struct.unpack("!H", frame.data[0:2])[0]
568 if recv_status >= 3000 and recv_status <= 4999:
569 debug(f"close status: {repr(recv_status)}")
570 elif recv_status != STATUS_NORMAL:
571 error(f"close status: {repr(recv_status)}")
572 break
573 except (
574 WebSocketConnectionClosedException,
575 WebSocketTimeoutException,
576 struct.error,
577 ):
578 break
579 if self.sock is not None:
580 self.sock.settimeout(sock_timeout)
581 self.sock.shutdown(socket.SHUT_RDWR)
582 except:
583 pass
585 self.shutdown()
587 def abort(self):
588 """
589 Low-level asynchronous abort, wakes up other threads that are waiting in recv_*
590 """
591 if self.connected and self.sock is not None:
592 self.sock.shutdown(socket.SHUT_RDWR)
594 def shutdown(self):
595 """
596 close socket, immediately.
597 """
598 if self.sock:
599 try:
600 # Check if socket is still open before closing
601 if not self.sock._closed:
602 self.sock.close()
603 except (OSError, AttributeError):
604 # Socket already closed or invalid file descriptor - this can happen
605 # during reconnection scenarios when network failures occur
606 debug("Socket already closed during shutdown")
607 pass
608 finally:
609 self.sock = None
610 self.connected = False
612 def _send(self, data: Union[str, bytes]) -> int:
613 if self.sock is None:
614 raise WebSocketConnectionClosedException("socket is already closed.")
615 if self.dispatcher:
616 return self.dispatcher.send(self.sock, data)
617 return send(self.sock, data)
619 def _recv(self, bufsize):
620 if self.sock is None:
621 raise WebSocketConnectionClosedException("Connection is closed")
622 try:
623 return recv(self.sock, bufsize)
624 except WebSocketConnectionClosedException:
625 if self.sock:
626 self.sock.close()
627 self.sock = None
628 self.connected = False
629 raise
632def create_connection(
633 url: str,
634 timeout: Optional[Union[float, int]] = None,
635 class_: Type[WebSocket] = WebSocket,
636 **options: Any,
637) -> WebSocket:
638 """
639 Connect to url and return websocket object.
641 Connect to url and return the WebSocket object.
642 Passing optional timeout parameter will set the timeout on the socket.
643 If no timeout is supplied,
644 the global default timeout setting returned by getdefaulttimeout() is used.
645 You can customize using 'options'.
646 If you set "header" list object, you can set your own custom header.
648 >>> conn = create_connection("ws://echo.websocket.events",
649 ... header=["User-Agent: MyProgram",
650 ... "x-custom: header"])
652 Parameters
653 ----------
654 class_: class
655 class to instantiate when creating the connection. It has to implement
656 settimeout and connect. It's __init__ should be compatible with
657 WebSocket.__init__, i.e. accept all of it's kwargs.
658 header: list or dict
659 custom http header list or dict.
660 cookie: str
661 Cookie value.
662 origin: str
663 custom origin url.
664 suppress_origin: bool
665 suppress outputting origin header.
666 host: str
667 custom host header string.
668 timeout: int or float
669 socket timeout time. This value could be either float/integer.
670 If set to None, it uses the default_timeout value.
671 http_proxy_host: str
672 HTTP proxy host name.
673 http_proxy_port: str or int
674 HTTP proxy port. If not set, set to 80.
675 http_no_proxy: list
676 Whitelisted host names that don't use the proxy.
677 http_proxy_auth: tuple
678 HTTP proxy auth information. tuple of username and password. Default is None.
679 http_proxy_timeout: int or float
680 HTTP proxy timeout, default is 60 sec as per python-socks.
681 enable_multithread: bool
682 Enable lock for multithread.
683 redirect_limit: int
684 Number of redirects to follow.
685 sockopt: tuple
686 Values for socket.setsockopt.
687 sockopt must be a tuple and each element is an argument of sock.setsockopt.
688 sslopt: dict
689 Optional dict object for ssl socket options. See FAQ for details.
690 subprotocols: list
691 List of available subprotocols. Default is None.
692 skip_utf8_validation: bool
693 Skip utf8 validation.
694 socket: socket
695 Pre-initialized stream socket.
696 """
697 sockopt = options.pop("sockopt", [])
698 sslopt = options.pop("sslopt", {})
699 fire_cont_frame = options.pop("fire_cont_frame", False)
700 enable_multithread = options.pop("enable_multithread", True)
701 skip_utf8_validation = options.pop("skip_utf8_validation", False)
702 websock = class_(
703 sockopt=sockopt,
704 sslopt=sslopt,
705 fire_cont_frame=fire_cont_frame,
706 enable_multithread=enable_multithread,
707 skip_utf8_validation=skip_utf8_validation,
708 **options,
709 )
710 websock.settimeout(timeout if timeout is not None else getdefaulttimeout())
711 websock.connect(url, **options)
712 return websock