Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_core.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import socket
2import struct
3import threading
4import time
5from typing import Optional, Union
7# websocket modules
8from ._abnf import ABNF, STATUS_NORMAL, continuous_frame, frame_buffer
9from ._exceptions import (
10 WebSocketProtocolException,
11 WebSocketConnectionClosedException,
12 WebSocketTimeoutException,
13)
14from ._handshake import SUPPORTED_REDIRECT_STATUSES, handshake
15from ._http import connect, proxy_info
16from ._logging import debug, error, trace, isEnabledForError, isEnabledForTrace
17from ._socket import getdefaulttimeout, recv, send, sock_opt
18from ._ssl_compat import ssl
19from ._utils import NoLock
20from ._dispatcher import DispatcherBase, WrappedDispatcher
22"""
23_core.py
24websocket - WebSocket client library for Python
26Copyright 2025 engn33r
28Licensed under the Apache License, Version 2.0 (the "License");
29you may not use this file except in compliance with the License.
30You may obtain a copy of the License at
32 http://www.apache.org/licenses/LICENSE-2.0
34Unless required by applicable law or agreed to in writing, software
35distributed under the License is distributed on an "AS IS" BASIS,
36WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
37See the License for the specific language governing permissions and
38limitations under the License.
39"""
41__all__ = ["WebSocket", "create_connection"]
44class WebSocket:
45 """
46 Low level WebSocket interface.
48 This class is based on the WebSocket protocol `draft-hixie-thewebsocketprotocol-76 <http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76>`_
50 We can connect to the websocket server and send/receive data.
51 The following example is an echo client.
53 >>> import websocket
54 >>> ws = websocket.WebSocket()
55 >>> ws.connect("ws://echo.websocket.events")
56 >>> ws.recv()
57 'echo.websocket.events sponsored by Lob.com'
58 >>> ws.send("Hello, Server")
59 19
60 >>> ws.recv()
61 'Hello, Server'
62 >>> ws.close()
64 Parameters
65 ----------
66 get_mask_key: func
67 A callable function to get new mask keys, see the
68 WebSocket.set_mask_key's docstring for more information.
69 sockopt: tuple
70 Values for socket.setsockopt.
71 sockopt must be tuple and each element is argument of sock.setsockopt.
72 sslopt: dict
73 Optional dict object for ssl socket options. See FAQ for details.
74 fire_cont_frame: bool
75 Fire recv event for each cont frame. Default is False.
76 enable_multithread: bool
77 If set to True, lock send method.
78 skip_utf8_validation: bool
79 Skip utf8 validation.
80 """
82 def __init__(
83 self,
84 get_mask_key=None,
85 sockopt=None,
86 sslopt=None,
87 fire_cont_frame: bool = False,
88 enable_multithread: bool = True,
89 skip_utf8_validation: bool = False,
90 dispatcher: Union[DispatcherBase, WrappedDispatcher] = None,
91 **_,
92 ):
93 """
94 Initialize WebSocket object.
96 Parameters
97 ----------
98 sslopt: dict
99 Optional dict object for ssl socket options. See FAQ for details.
100 """
101 self.sock_opt = sock_opt(sockopt, sslopt)
102 self.handshake_response = None
103 self.sock: Optional[socket.socket] = None
105 self.connected = False
106 self.get_mask_key = get_mask_key
107 # These buffer over the build-up of a single frame.
108 self.frame_buffer = frame_buffer(self._recv, skip_utf8_validation)
109 self.cont_frame = continuous_frame(fire_cont_frame, skip_utf8_validation)
110 self.dispatcher = dispatcher
112 if enable_multithread:
113 self.lock = threading.Lock()
114 self.readlock = threading.Lock()
115 else:
116 self.lock = NoLock() # type: ignore[assignment]
117 self.readlock = NoLock() # type: ignore[assignment]
119 def __iter__(self):
120 """
121 Allow iteration over websocket, implying sequential `recv` executions.
122 """
123 while True:
124 yield self.recv()
126 def __next__(self):
127 return self.recv()
129 def next(self):
130 return self.__next__()
132 def fileno(self):
133 return self.sock.fileno()
135 def set_mask_key(self, func):
136 """
137 Set function to create mask key. You can customize mask key generator.
138 Mainly, this is for testing purpose.
140 Parameters
141 ----------
142 func: func
143 callable object. the func takes 1 argument as integer.
144 The argument means length of mask key.
145 This func must return string(byte array),
146 which length is argument specified.
147 """
148 self.get_mask_key = func
150 def gettimeout(self) -> Optional[Union[float, int]]:
151 """
152 Get the websocket timeout (in seconds) as an int or float
154 Returns
155 ----------
156 timeout: int or float
157 returns timeout value (in seconds). This value could be either float/integer.
158 """
159 return self.sock_opt.timeout
161 def settimeout(self, timeout: Optional[Union[float, int]]):
162 """
163 Set the timeout to the websocket.
165 Parameters
166 ----------
167 timeout: int or float
168 timeout time (in seconds). This value could be either float/integer.
169 """
170 self.sock_opt.timeout = timeout
171 if self.sock:
172 self.sock.settimeout(timeout)
174 timeout = property(gettimeout, settimeout)
176 def getsubprotocol(self):
177 """
178 Get subprotocol
179 """
180 if self.handshake_response:
181 return self.handshake_response.subprotocol
182 else:
183 return None
185 subprotocol = property(getsubprotocol)
187 def getstatus(self):
188 """
189 Get handshake status
190 """
191 if self.handshake_response:
192 return self.handshake_response.status
193 else:
194 return None
196 status = property(getstatus)
198 def getheaders(self):
199 """
200 Get handshake response header
201 """
202 if self.handshake_response:
203 return self.handshake_response.headers
204 else:
205 return None
207 def is_ssl(self):
208 try:
209 return isinstance(self.sock, ssl.SSLSocket)
210 except (AttributeError, NameError):
211 return False
213 headers = property(getheaders)
215 def connect(self, url, **options):
216 """
217 Connect to url. url is websocket url scheme.
218 ie. ws://host:port/resource
219 You can customize using 'options'.
220 If you set "header" list object, you can set your own custom header.
222 >>> ws = WebSocket()
223 >>> ws.connect("ws://echo.websocket.events",
224 ... header=["User-Agent: MyProgram",
225 ... "x-custom: header"])
227 Parameters
228 ----------
229 header: list or dict
230 Custom http header list or dict.
231 cookie: str
232 Cookie value.
233 origin: str
234 Custom origin url.
235 connection: str
236 Custom connection header value.
237 Default value "Upgrade" set in _handshake.py
238 suppress_origin: bool
239 Suppress outputting origin header.
240 host: str
241 Custom host header string.
242 timeout: int or float
243 Socket timeout time. This value is an integer or float.
244 If you set None for this value, it means "use default_timeout value"
245 http_proxy_host: str
246 HTTP proxy host name.
247 http_proxy_port: str or int
248 HTTP proxy port. Default is 80.
249 http_no_proxy: list
250 Whitelisted host names that don't use the proxy.
251 http_proxy_auth: tuple
252 HTTP proxy auth information. Tuple of username and password. Default is None.
253 http_proxy_timeout: int or float
254 HTTP proxy timeout, default is 60 sec as per python-socks.
255 redirect_limit: int
256 Number of redirects to follow.
257 subprotocols: list
258 List of available subprotocols. Default is None.
259 socket: socket
260 Pre-initialized stream socket.
261 """
262 self.sock_opt.timeout = options.get("timeout", self.sock_opt.timeout)
263 self.sock, addrs = connect(
264 url, self.sock_opt, proxy_info(**options), options.pop("socket", None)
265 )
267 try:
268 self.handshake_response = handshake(self.sock, url, *addrs, **options)
269 for _ in range(options.pop("redirect_limit", 3)):
270 if self.handshake_response.status in SUPPORTED_REDIRECT_STATUSES:
271 url = self.handshake_response.headers["location"]
272 self.sock.close()
273 self.sock, addrs = connect(
274 url,
275 self.sock_opt,
276 proxy_info(**options),
277 options.pop("socket", None),
278 )
279 self.handshake_response = handshake(
280 self.sock, url, *addrs, **options
281 )
282 self.connected = True
283 except:
284 if self.sock:
285 self.sock.close()
286 self.sock = None
287 raise
289 def send(self, payload: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> int:
290 """
291 Send the data as string.
293 Parameters
294 ----------
295 payload: str
296 Payload must be utf-8 string or unicode,
297 If the opcode is OPCODE_TEXT.
298 Otherwise, it must be string(byte array).
299 opcode: int
300 Operation code (opcode) to send.
301 """
303 frame = ABNF.create_frame(payload, opcode)
304 return self.send_frame(frame)
306 def send_text(self, text_data: str) -> int:
307 """
308 Sends UTF-8 encoded text.
309 """
310 return self.send(text_data, ABNF.OPCODE_TEXT)
312 def send_bytes(self, data: Union[bytes, bytearray]) -> int:
313 """
314 Sends a sequence of bytes.
315 """
316 return self.send(data, ABNF.OPCODE_BINARY)
318 def send_frame(self, frame) -> int:
319 """
320 Send the data frame.
322 >>> ws = create_connection("ws://echo.websocket.events")
323 >>> frame = ABNF.create_frame("Hello", ABNF.OPCODE_TEXT)
324 >>> ws.send_frame(frame)
325 >>> cont_frame = ABNF.create_frame("My name is ", ABNF.OPCODE_CONT, 0)
326 >>> ws.send_frame(frame)
327 >>> cont_frame = ABNF.create_frame("Foo Bar", ABNF.OPCODE_CONT, 1)
328 >>> ws.send_frame(frame)
330 Parameters
331 ----------
332 frame: ABNF frame
333 frame data created by ABNF.create_frame
334 """
335 if self.get_mask_key:
336 frame.get_mask_key = self.get_mask_key
337 data = frame.format()
338 length = len(data)
339 if isEnabledForTrace():
340 trace(f"++Sent raw: {repr(data)}")
341 trace(f"++Sent decoded: {frame.__str__()}")
342 with self.lock:
343 while data:
344 bytes_sent = self._send(data)
345 data = data[bytes_sent:]
347 return length
349 def send_binary(self, payload: bytes) -> int:
350 """
351 Send a binary message (OPCODE_BINARY).
353 Parameters
354 ----------
355 payload: bytes
356 payload of message to send.
357 """
358 return self.send(payload, ABNF.OPCODE_BINARY)
360 def ping(self, payload: Union[str, bytes] = ""):
361 """
362 Send ping data.
364 Parameters
365 ----------
366 payload: str
367 data payload to send server.
368 """
369 if isinstance(payload, str):
370 payload = payload.encode("utf-8")
371 self.send(payload, ABNF.OPCODE_PING)
373 def pong(self, payload: Union[str, bytes] = ""):
374 """
375 Send pong data.
377 Parameters
378 ----------
379 payload: str
380 data payload to send server.
381 """
382 if isinstance(payload, str):
383 payload = payload.encode("utf-8")
384 self.send(payload, ABNF.OPCODE_PONG)
386 def recv(self) -> Union[str, bytes]:
387 """
388 Receive string data(byte array) from the server.
390 Returns
391 ----------
392 data: string (byte array) value.
393 """
394 with self.readlock:
395 opcode, data = self.recv_data()
396 if opcode == ABNF.OPCODE_TEXT:
397 data_received: Union[bytes, str] = data
398 if isinstance(data_received, bytes):
399 return data_received.decode("utf-8")
400 elif isinstance(data_received, str):
401 return data_received
402 elif opcode == ABNF.OPCODE_BINARY:
403 data_binary: bytes = data
404 return data_binary
405 else:
406 return ""
408 def recv_data(self, control_frame: bool = False) -> tuple:
409 """
410 Receive data with operation code.
412 Parameters
413 ----------
414 control_frame: bool
415 a boolean flag indicating whether to return control frame
416 data, defaults to False
418 Returns
419 -------
420 opcode, frame.data: tuple
421 tuple of operation code and string(byte array) value.
422 """
423 opcode, frame = self.recv_data_frame(control_frame)
424 return opcode, frame.data
426 def recv_data_frame(self, control_frame: bool = False) -> tuple:
427 """
428 Receive data with operation code.
430 If a valid ping message is received, a pong response is sent.
432 Parameters
433 ----------
434 control_frame: bool
435 a boolean flag indicating whether to return control frame
436 data, defaults to False
438 Returns
439 -------
440 frame.opcode, frame: tuple
441 tuple of operation code and string(byte array) value.
442 """
443 while True:
444 frame = self.recv_frame()
445 if isEnabledForTrace():
446 trace(f"++Rcv raw: {repr(frame.format())}")
447 trace(f"++Rcv decoded: {frame.__str__()}")
448 if not frame:
449 # handle error:
450 # 'NoneType' object has no attribute 'opcode'
451 raise WebSocketProtocolException(f"Not a valid frame {frame}")
452 elif frame.opcode in (
453 ABNF.OPCODE_TEXT,
454 ABNF.OPCODE_BINARY,
455 ABNF.OPCODE_CONT,
456 ):
457 self.cont_frame.validate(frame)
458 self.cont_frame.add(frame)
460 if self.cont_frame.is_fire(frame):
461 return self.cont_frame.extract(frame)
463 elif frame.opcode == ABNF.OPCODE_CLOSE:
464 self.send_close()
465 return frame.opcode, frame
466 elif frame.opcode == ABNF.OPCODE_PING:
467 if len(frame.data) < 126:
468 self.pong(frame.data)
469 else:
470 raise WebSocketProtocolException("Ping message is too long")
471 if control_frame:
472 return frame.opcode, frame
473 elif frame.opcode == ABNF.OPCODE_PONG:
474 if control_frame:
475 return frame.opcode, frame
477 def recv_frame(self):
478 """
479 Receive data as frame from server.
481 Returns
482 -------
483 self.frame_buffer.recv_frame(): ABNF frame object
484 """
485 return self.frame_buffer.recv_frame()
487 def send_close(self, status: int = STATUS_NORMAL, reason: bytes = b""):
488 """
489 Send close data to the server.
491 Parameters
492 ----------
493 status: int
494 Status code to send. See STATUS_XXX.
495 reason: str or bytes
496 The reason to close. This must be string or UTF-8 bytes.
497 """
498 if status < 0 or status >= ABNF.LENGTH_16:
499 raise ValueError("code is invalid range")
500 self.connected = False
501 self.send(struct.pack("!H", status) + reason, ABNF.OPCODE_CLOSE)
503 def close(self, status: int = STATUS_NORMAL, reason: bytes = b"", timeout: int = 3):
504 """
505 Close Websocket object
507 Parameters
508 ----------
509 status: int
510 Status code to send. See VALID_CLOSE_STATUS in ABNF.
511 reason: bytes
512 The reason to close in UTF-8.
513 timeout: int or float
514 Timeout until receive a close frame.
515 If None, it will wait forever until receive a close frame.
516 """
517 if not self.connected:
518 return
519 if status < 0 or status >= ABNF.LENGTH_16:
520 raise ValueError("code is invalid range")
522 try:
523 self.connected = False
524 self.send(struct.pack("!H", status) + reason, ABNF.OPCODE_CLOSE)
525 if self.sock is None:
526 return
527 sock_timeout = self.sock.gettimeout()
528 self.sock.settimeout(timeout)
529 start_time = time.time()
530 while timeout is None or time.time() - start_time < timeout:
531 try:
532 frame = self.recv_frame()
533 if frame.opcode != ABNF.OPCODE_CLOSE:
534 continue
535 if isEnabledForError():
536 recv_status = struct.unpack("!H", frame.data[0:2])[0]
537 if recv_status >= 3000 and recv_status <= 4999:
538 debug(f"close status: {repr(recv_status)}")
539 elif recv_status != STATUS_NORMAL:
540 error(f"close status: {repr(recv_status)}")
541 break
542 except (
543 WebSocketConnectionClosedException,
544 WebSocketTimeoutException,
545 struct.error,
546 ):
547 break
548 if self.sock is not None:
549 self.sock.settimeout(sock_timeout)
550 self.sock.shutdown(socket.SHUT_RDWR)
551 except:
552 pass
554 self.shutdown()
556 def abort(self):
557 """
558 Low-level asynchronous abort, wakes up other threads that are waiting in recv_*
559 """
560 if self.connected:
561 self.sock.shutdown(socket.SHUT_RDWR)
563 def shutdown(self):
564 """
565 close socket, immediately.
566 """
567 if self.sock:
568 self.sock.close()
569 self.sock = None
570 self.connected = False
572 def _send(self, data: Union[str, bytes]):
573 if self.sock is None:
574 raise WebSocketConnectionClosedException("socket is already closed.")
575 if self.dispatcher:
576 return self.dispatcher.send(self.sock, data)
577 return send(self.sock, data)
579 def _recv(self, bufsize):
580 try:
581 return recv(self.sock, bufsize)
582 except WebSocketConnectionClosedException:
583 if self.sock:
584 self.sock.close()
585 self.sock = None
586 self.connected = False
587 raise
590def create_connection(url: str, timeout=None, class_=WebSocket, **options):
591 """
592 Connect to url and return websocket object.
594 Connect to url and return the WebSocket object.
595 Passing optional timeout parameter will set the timeout on the socket.
596 If no timeout is supplied,
597 the global default timeout setting returned by getdefaulttimeout() is used.
598 You can customize using 'options'.
599 If you set "header" list object, you can set your own custom header.
601 >>> conn = create_connection("ws://echo.websocket.events",
602 ... header=["User-Agent: MyProgram",
603 ... "x-custom: header"])
605 Parameters
606 ----------
607 class_: class
608 class to instantiate when creating the connection. It has to implement
609 settimeout and connect. It's __init__ should be compatible with
610 WebSocket.__init__, i.e. accept all of it's kwargs.
611 header: list or dict
612 custom http header list or dict.
613 cookie: str
614 Cookie value.
615 origin: str
616 custom origin url.
617 suppress_origin: bool
618 suppress outputting origin header.
619 host: str
620 custom host header string.
621 timeout: int or float
622 socket timeout time. This value could be either float/integer.
623 If set to None, it uses the default_timeout value.
624 http_proxy_host: str
625 HTTP proxy host name.
626 http_proxy_port: str or int
627 HTTP proxy port. If not set, set to 80.
628 http_no_proxy: list
629 Whitelisted host names that don't use the proxy.
630 http_proxy_auth: tuple
631 HTTP proxy auth information. tuple of username and password. Default is None.
632 http_proxy_timeout: int or float
633 HTTP proxy timeout, default is 60 sec as per python-socks.
634 enable_multithread: bool
635 Enable lock for multithread.
636 redirect_limit: int
637 Number of redirects to follow.
638 sockopt: tuple
639 Values for socket.setsockopt.
640 sockopt must be a tuple and each element is an argument of sock.setsockopt.
641 sslopt: dict
642 Optional dict object for ssl socket options. See FAQ for details.
643 subprotocols: list
644 List of available subprotocols. Default is None.
645 skip_utf8_validation: bool
646 Skip utf8 validation.
647 socket: socket
648 Pre-initialized stream socket.
649 """
650 sockopt = options.pop("sockopt", [])
651 sslopt = options.pop("sslopt", {})
652 fire_cont_frame = options.pop("fire_cont_frame", False)
653 enable_multithread = options.pop("enable_multithread", True)
654 skip_utf8_validation = options.pop("skip_utf8_validation", False)
655 websock = class_(
656 sockopt=sockopt,
657 sslopt=sslopt,
658 fire_cont_frame=fire_cont_frame,
659 enable_multithread=enable_multithread,
660 skip_utf8_validation=skip_utf8_validation,
661 **options,
662 )
663 websock.settimeout(timeout if timeout is not None else getdefaulttimeout())
664 websock.connect(url, **options)
665 return websock