Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_app.py: 11%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import inspect
2import socket
3import threading
4import time
5from typing import Any, Callable, List, Optional, Tuple, Union
7from ._logging import debug, error, info, warning
8from ._abnf import ABNF
9from ._core import WebSocket, getdefaulttimeout
10from ._exceptions import (
11 WebSocketConnectionClosedException,
12 WebSocketException,
13 WebSocketTimeoutException,
14)
15from ._ssl_compat import SSLEOFError
16from ._url import parse_url
17from ._dispatcher import Dispatcher, DispatcherBase, SSLDispatcher, WrappedDispatcher
19"""
20_app.py
21websocket - WebSocket client library for Python
23Copyright 2025 engn33r
25Licensed under the Apache License, Version 2.0 (the "License");
26you may not use this file except in compliance with the License.
27You may obtain a copy of the License at
29 http://www.apache.org/licenses/LICENSE-2.0
31Unless required by applicable law or agreed to in writing, software
32distributed under the License is distributed on an "AS IS" BASIS,
33WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
34See the License for the specific language governing permissions and
35limitations under the License.
36"""
38__all__ = ["WebSocketApp"]
# Module-wide default reconnect delay. WebSocketApp.run_forever() falls back
# to this value when it is called without an explicit ``reconnect`` argument;
# a falsy value means no reconnect attempt is made.
RECONNECT = 0


def set_reconnect(reconnectInterval: int) -> None:
    """
    Set the module-wide default reconnect interval.

    Parameters
    ----------
    reconnectInterval: int
        New value for the module-level ``RECONNECT`` default.
    """
    global RECONNECT
    RECONNECT = reconnectInterval
48class WebSocketApp:
49 """
50 Higher level of APIs are provided. The interface is like JavaScript WebSocket object.
51 """
53 def __init__(
54 self,
55 url: str,
56 header: Optional[
57 Union[
58 list[str],
59 dict[str, str],
60 Callable[[], Union[list[str], dict[str, str]]],
61 ]
62 ] = None,
63 on_open: Optional[Callable[["WebSocketApp"], None]] = None,
64 on_reconnect: Optional[Callable[["WebSocketApp"], None]] = None,
65 on_message: Optional[Callable[["WebSocketApp", Any], None]] = None,
66 on_error: Optional[Callable[["WebSocketApp", Any], None]] = None,
67 on_close: Optional[Callable[["WebSocketApp", Any, Any], None]] = None,
68 on_ping: Optional[Callable] = None,
69 on_pong: Optional[Callable] = None,
70 on_cont_message: Optional[Callable] = None,
71 keep_running: bool = True,
72 get_mask_key: Optional[Callable] = None,
73 cookie: Optional[str] = None,
74 subprotocols: Optional[list[str]] = None,
75 on_data: Optional[Callable] = None,
76 socket: Optional[socket.socket] = None,
77 ) -> None:
78 """
79 WebSocketApp initialization
81 Parameters
82 ----------
83 url: str
84 Websocket url.
85 header: list or dict or Callable
86 Custom header for websocket handshake.
87 If the parameter is a callable object, it is called just before the connection attempt.
88 The returned dict or list is used as custom header value.
89 This could be useful in order to properly setup timestamp dependent headers.
90 on_open: function
91 Callback object which is called at opening websocket.
92 on_open has one argument.
93 The 1st argument is this class object.
94 on_reconnect: function
95 Callback object which is called at reconnecting websocket.
96 on_reconnect has one argument.
97 The 1st argument is this class object.
98 on_message: function
99 Callback object which is called when received data.
100 on_message has 2 arguments.
101 The 1st argument is this class object.
102 The 2nd argument is utf-8 data received from the server.
103 on_error: function
104 Callback object which is called when we get error.
105 on_error has 2 arguments.
106 The 1st argument is this class object.
107 The 2nd argument is exception object.
108 on_close: function
109 Callback object which is called when connection is closed.
110 on_close has 3 arguments.
111 The 1st argument is this class object.
112 The 2nd argument is close_status_code.
113 The 3rd argument is close_msg.
114 on_cont_message: function
115 Callback object which is called when a continuation
116 frame is received.
117 on_cont_message has 3 arguments.
118 The 1st argument is this class object.
119 The 2nd argument is utf-8 string which we get from the server.
120 The 3rd argument is continue flag. if 0, the data continue
121 to next frame data
122 on_data: function
123 Callback object which is called when a message received.
124 This is called before on_message or on_cont_message,
125 and then on_message or on_cont_message is called.
126 on_data has 4 argument.
127 The 1st argument is this class object.
128 The 2nd argument is utf-8 string which we get from the server.
129 The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came.
130 The 4th argument is continue flag. If 0, the data continue
131 keep_running: bool
132 This parameter is obsolete and ignored.
133 get_mask_key: function
134 A callable function to get new mask keys, see the
135 WebSocket.set_mask_key's docstring for more information.
136 cookie: str
137 Cookie value.
138 subprotocols: list
139 List of available sub protocols. Default is None.
140 socket: socket
141 Pre-initialized stream socket.
142 """
143 self.url = url
144 self.header = header if header is not None else []
145 self.cookie = cookie
147 self.on_open = on_open
148 self.on_reconnect = on_reconnect
149 self.on_message = on_message
150 self.on_data = on_data
151 self.on_error = on_error
152 self.on_close = on_close
153 self.on_ping = on_ping
154 self.on_pong = on_pong
155 self.on_cont_message = on_cont_message
156 self.keep_running = False
157 self.get_mask_key = get_mask_key
158 self.sock: Optional[WebSocket] = None
159 self.last_ping_tm = float(0)
160 self.last_pong_tm = float(0)
161 self.ping_thread: Optional[threading.Thread] = None
162 self.stop_ping: Optional[threading.Event] = None
163 self.ping_interval = float(0)
164 self.ping_timeout: Optional[Union[float, int]] = None
165 self.ping_payload = ""
166 self.subprotocols = subprotocols
167 self.prepared_socket = socket
168 self.has_errored = False
169 self.has_done_teardown = False
170 self.has_done_teardown_lock = threading.Lock()
171 self.last_close_frame: Optional[ABNF] = None
173 def send(self, data: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> None:
174 """
175 send message
177 Parameters
178 ----------
179 data: str
180 Message to send. If you set opcode to OPCODE_TEXT,
181 data must be utf-8 string or unicode.
182 opcode: int
183 Operation code of data. Default is OPCODE_TEXT.
184 """
186 if not self.sock or self.sock.send(data, opcode) == 0:
187 raise WebSocketConnectionClosedException("Connection is already closed.")
189 def send_text(self, text_data: str) -> None:
190 """
191 Sends UTF-8 encoded text.
192 """
193 if not self.sock or self.sock.send(text_data, ABNF.OPCODE_TEXT) == 0:
194 raise WebSocketConnectionClosedException("Connection is already closed.")
196 def send_bytes(self, data: Union[bytes, bytearray]) -> None:
197 """
198 Sends a sequence of bytes.
199 """
200 if not self.sock or self.sock.send(data, ABNF.OPCODE_BINARY) == 0:
201 raise WebSocketConnectionClosedException("Connection is already closed.")
203 def close(self, **kwargs: Any) -> None:
204 """
205 Close websocket connection.
206 """
207 self.keep_running = False
208 if self.sock:
209 self.sock.close(**kwargs)
210 # Capture the peer's close frame before clearing socket reference
211 if self.sock.close_frame is not None:
212 self.last_close_frame = self.sock.close_frame
213 self.sock = None
215 def _start_ping_thread(self) -> None:
216 self.last_ping_tm = self.last_pong_tm = float(0)
217 self.stop_ping = threading.Event()
218 self.ping_thread = threading.Thread(target=self._send_ping)
219 self.ping_thread.daemon = True
220 self.ping_thread.start()
222 def _stop_ping_thread(self) -> None:
223 if self.stop_ping:
224 self.stop_ping.set()
225 if self.ping_thread and self.ping_thread.is_alive():
226 self.ping_thread.join(3)
227 # Handle thread leak - if thread doesn't terminate within timeout,
228 # force cleanup and log warning instead of abandoning the thread
229 if self.ping_thread.is_alive():
230 warning(
231 "Ping thread failed to terminate within 3 seconds, "
232 "forcing cleanup. Thread may be blocked."
233 )
234 # Force cleanup by clearing references even if thread is still alive
235 # The daemon thread will eventually be cleaned up by Python's GC
236 # but we prevent resource leaks by not holding references
238 # Always clean up references regardless of thread state
239 self.ping_thread = None
240 self.stop_ping = None
241 self.last_ping_tm = self.last_pong_tm = float(0)
243 def _send_ping(self) -> None:
244 if self.stop_ping is None:
245 return
246 if self.keep_running is False:
247 return
248 while not self.stop_ping.wait(self.ping_interval) and self.keep_running is True:
249 if self.sock:
250 self.last_ping_tm = time.time()
251 try:
252 debug("Sending ping")
253 self.sock.ping(self.ping_payload)
254 except Exception as e:
255 debug(f"Failed to send ping: {e}")
257 def ready(self):
258 return self.sock and self.sock.connected
    def run_forever(
        self,
        sockopt: Optional[list] = None,
        sslopt: Optional[dict] = None,
        ping_interval: Union[float, int] = 0,
        ping_timeout: Optional[Union[float, int]] = None,
        ping_payload: str = "",
        http_proxy_host: Optional[str] = None,
        http_proxy_port: Optional[Union[int, str]] = None,
        http_no_proxy: Optional[list] = None,
        http_proxy_auth: Optional[tuple] = None,
        http_proxy_timeout: Optional[float] = None,
        skip_utf8_validation: bool = False,
        host: Optional[str] = None,
        origin: Optional[str] = None,
        dispatcher: Any = None,
        suppress_origin: bool = False,
        suppress_host: bool = False,
        proxy_type: Optional[str] = None,
        reconnect: Optional[int] = None,
    ) -> bool:
        """
        Run event loop for WebSocket framework.

        Connects, then hands the socket to a dispatcher that repeatedly calls
        the nested ``read`` and ``check`` functions. With the built-in
        dispatcher and a truthy ``reconnect``, the connection is re-established
        in a loop for as long as ``keep_running`` stays True.

        Parameters
        ----------
        sockopt: list
            Values for socket.setsockopt.
            Each element is the argument of one sock.setsockopt call.
        sslopt: dict
            Optional dict object for ssl socket option.
        ping_interval: int or float
            Automatically send "ping" command
            every specified period (in seconds).
            If set to 0, no ping is sent periodically.
        ping_timeout: int or float
            Timeout (in seconds) if the pong message is not received.
            Must be greater than 0 and smaller than ping_interval.
        ping_payload: str
            Payload message to send with each ping.
        http_proxy_host: str
            HTTP proxy host name.
        http_proxy_port: int or str
            HTTP proxy port. If not set, set to 80.
        http_no_proxy: list
            Whitelisted host names that don't use the proxy.
        http_proxy_timeout: int or float
            HTTP proxy timeout, default is 60 sec as per python-socks.
        http_proxy_auth: tuple
            HTTP proxy auth information. tuple of username and password. Default is None.
        skip_utf8_validation: bool
            Skip utf8 validation. When set, text frames are also passed to
            the callbacks without being decoded.
        host: str
            update host header.
        origin: str
            update origin header.
        dispatcher: Dispatcher object
            customize reading data from socket.
        suppress_origin: bool
            suppress outputting origin header.
        suppress_host: bool
            suppress outputting host header.
        proxy_type: str
            type of proxy from: http, socks4, socks4a, socks5, socks5h
        reconnect: int
            Delay interval when reconnecting. When None, the module-level
            RECONNECT value is used; a falsy value disables reconnecting.

        Returns
        -------
        has_errored: bool
            The value of ``self.has_errored``, which is set to True whenever
            the nested ``handleDisconnect`` runs (connection failure, read
            failure, or a close frame handled through ``closed``).
            NOTE(review): the flag is not reset at the start of this method,
            so True can carry over from an earlier run - confirm intended.

        Raises
        ------
        WebSocketException
            If ping_timeout <= 0, ping_interval < 0, ping_interval is not
            greater than ping_timeout (when both are set), or a socket is
            already open on this object.
        """

        if reconnect is None:
            reconnect = RECONNECT

        # Validate the ping configuration before touching any state
        if ping_timeout is not None and ping_timeout <= 0:
            raise WebSocketException("Ensure ping_timeout > 0")
        if ping_interval is not None and ping_interval < 0:
            raise WebSocketException("Ensure ping_interval >= 0")
        if ping_timeout and ping_interval and ping_interval <= ping_timeout:
            raise WebSocketException("Ensure ping_interval > ping_timeout")
        if not sockopt:
            sockopt = []
        if not sslopt:
            sslopt = {}
        if self.sock:
            raise WebSocketException("socket is already opened")

        self.ping_interval = ping_interval
        self.ping_timeout = ping_timeout
        self.ping_payload = ping_payload
        self.has_done_teardown = False
        self.keep_running = True

        def teardown(close_frame: Optional[ABNF] = None) -> None:
            """
            Tears down the connection.

            Stops the ping thread, clears ``keep_running``, closes the socket
            and fires ``on_close``. Only the first call does any work.

            Parameters
            ----------
            close_frame: ABNF frame
                If close_frame is set, the on_close handler is invoked
                with the statusCode and reason from the provided frame.
                Otherwise ``self.last_close_frame`` is used as a fallback.
            """

            # teardown() is called in many code paths to ensure resources are cleaned up and on_close is fired.
            # To ensure the work is only done once, we use this bool and lock.
            with self.has_done_teardown_lock:
                if self.has_done_teardown:
                    return
                self.has_done_teardown = True

            self._stop_ping_thread()
            self.keep_running = False

            if self.sock:
                # In cases like handleDisconnect, the "on_error" callback is called first. If the WebSocketApp
                # is being used in a multithreaded application, we need to make sure that "self.sock" is cleared
                # before calling close, otherwise logic built around the sock being set can cause issues -
                # specifically calling "run_forever" again, since it checks if "self.sock" is set.
                current_sock = self.sock
                self.sock = None
                current_sock.close()

            # Use stored close frame as fallback if none provided (e.g., client-initiated close)
            effective_close_frame = (
                close_frame if close_frame else self.last_close_frame
            )
            close_status_code, close_reason = self._get_close_args(
                effective_close_frame
            )
            # Finally call the callback AFTER all teardown is complete
            self._callback(self.on_close, close_status_code, close_reason)

        def initialize_socket(reconnecting: bool = False) -> None:
            """
            Create and connect a new WebSocket, then start the dispatcher read loop.

            After connecting, starts the ping thread when ``ping_interval`` is
            truthy and fires ``on_reconnect`` (only when reconnecting and that
            callback is set) or else ``on_open``. Any exception raised here is
            passed to ``handleDisconnect``.
            """
            if reconnecting and self.sock:
                self.sock.shutdown()

            # Reset close frame to avoid stale data from previous connections
            self.last_close_frame = None

            self.sock = WebSocket(
                self.get_mask_key,
                sockopt=sockopt,
                sslopt=sslopt,
                fire_cont_frame=self.on_cont_message is not None,
                skip_utf8_validation=skip_utf8_validation,
                enable_multithread=True,
                dispatcher=dispatcher,
            )

            self.sock.settimeout(getdefaulttimeout())
            try:
                # A callable header is evaluated on every (re)connect attempt
                header = self.header() if callable(self.header) else self.header

                self.sock.connect(
                    self.url,
                    header=header,
                    cookie=self.cookie,
                    http_proxy_host=http_proxy_host,
                    http_proxy_port=http_proxy_port,
                    http_no_proxy=http_no_proxy,
                    http_proxy_auth=http_proxy_auth,
                    http_proxy_timeout=http_proxy_timeout,
                    subprotocols=self.subprotocols,
                    host=host,
                    origin=origin,
                    suppress_origin=suppress_origin,
                    suppress_host=suppress_host,
                    proxy_type=proxy_type,
                    socket=self.prepared_socket,
                )

                info("Websocket connected")

                if self.ping_interval:
                    self._start_ping_thread()

                if reconnecting and self.on_reconnect:
                    self._callback(self.on_reconnect)
                else:
                    self._callback(self.on_open)

                assert dispatcher is not None
                dispatcher.read(self.sock.sock, read, check)
            except (
                WebSocketConnectionClosedException,
                ConnectionRefusedError,
                KeyboardInterrupt,
                SystemExit,
                Exception,
            ) as e:
                handleDisconnect(e, reconnecting)

        def read() -> bool:
            """
            Receive one frame and dispatch it to the matching callbacks.

            Returns False when ``keep_running`` has been cleared (after
            tearing down) or when there is no socket, True after a frame has
            been dispatched, and the result of ``closed`` for a close frame.
            """
            if not self.keep_running:
                teardown()
                return False

            if self.sock is None:
                return False

            try:
                op_code, frame = self.sock.recv_data_frame(True)
            except (
                WebSocketConnectionClosedException,
                KeyboardInterrupt,
                SSLEOFError,
            ) as e:
                # With a custom dispatcher the failure is handled here;
                # otherwise it is re-raised to the caller of read()
                if custom_dispatcher:
                    return closed(e)
                else:
                    raise e

            if op_code == ABNF.OPCODE_CLOSE:
                return closed(frame)
            elif op_code == ABNF.OPCODE_PING:
                self._callback(self.on_ping, frame.data)
            elif op_code == ABNF.OPCODE_PONG:
                # Recorded for the ping/pong timeout test in check()
                self.last_pong_tm = time.time()
                self._callback(self.on_pong, frame.data)
            elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:
                self._callback(self.on_data, frame.data, frame.opcode, frame.fin)
                self._callback(self.on_cont_message, frame.data, frame.fin)
            else:
                data = frame.data
                # Text frames are decoded to str unless validation is skipped
                if op_code == ABNF.OPCODE_TEXT and not skip_utf8_validation:
                    data = data.decode("utf-8")
                self._callback(self.on_data, data, frame.opcode, True)
                self._callback(self.on_message, data)

            return True

        def check() -> bool:
            """
            Raise WebSocketTimeoutException when the ping/pong exchange timed out.

            Only active when ``ping_timeout`` is truthy and a ping has been
            sent. The timeout fires once more than ``ping_timeout`` seconds
            have passed since the last ping and the last pong is either older
            than that ping or arrived more than ``ping_timeout`` after it.
            Returns True otherwise.
            """
            if self.ping_timeout:
                has_timeout_expired = (
                    time.time() - self.last_ping_tm > self.ping_timeout
                )
                has_pong_not_arrived_after_last_ping = (
                    self.last_pong_tm - self.last_ping_tm < 0
                )
                has_pong_arrived_too_late = (
                    self.last_pong_tm - self.last_ping_tm > self.ping_timeout
                )

                if (
                    self.last_ping_tm
                    and has_timeout_expired
                    and (
                        has_pong_not_arrived_after_last_ping
                        or has_pong_arrived_too_late
                    )
                ):
                    raise WebSocketTimeoutException("ping/pong timed out")
            return True

        def closed(
            e: Union[
                WebSocketConnectionClosedException,
                ConnectionRefusedError,
                KeyboardInterrupt,
                SystemExit,
                Exception,
                str,
                "ABNF",  # Now explicitly handle ABNF frame objects
            ] = "closed unexpectedly",
        ) -> bool:
            """
            Normalise a close event and pass it to ``handleDisconnect``.

            A str becomes a WebSocketConnectionClosedException with that
            message. A close frame becomes a WebSocketConnectionClosedException
            carrying ``status_code`` and ``reason`` attributes, and the frame
            itself is forwarded as ``close_frame``. Exceptions pass through
            unchanged.
            """
            close_frame: Optional[ABNF] = None
            if type(e) is str:
                e = WebSocketConnectionClosedException(e)
            elif isinstance(e, ABNF) and e.opcode == ABNF.OPCODE_CLOSE:
                close_frame = e
                # Convert close frames to a descriptive exception for on_error callback
                close_status_code, close_reason = self._parse_close_frame(e)
                reason_parts: List[str] = []
                if close_status_code is None:
                    message = "Connection closed"
                elif close_status_code == 1000:
                    message = "Connection closed normally (code 1000)"
                else:
                    message = f"Connection closed (code {close_status_code})"
                if close_reason:
                    reason_parts.append(close_reason)
                if reason_parts:
                    message = f"{message}: {'; '.join(reason_parts)}"
                converted = WebSocketConnectionClosedException(message)
                setattr(converted, "status_code", close_status_code)
                setattr(converted, "reason", close_reason)
                e = converted
            return handleDisconnect(e, bool(reconnect), close_frame=close_frame)  # type: ignore[arg-type]

        def handleDisconnect(
            e: Union[
                WebSocketConnectionClosedException,
                ConnectionRefusedError,
                KeyboardInterrupt,
                SystemExit,
                Exception,
            ],
            reconnecting: bool = False,
            close_frame: Optional[ABNF] = None,
        ) -> bool:
            """
            React to a lost or failed connection.

            Marks the app as errored, stops the ping thread and fires
            ``on_error`` unless ``reconnecting`` is True. KeyboardInterrupt
            and SystemExit trigger a teardown and are re-raised. Otherwise the
            connection is either re-established (custom dispatcher and truthy
            ``reconnect``) or torn down (falsy ``reconnect``).
            Returns ``self.has_errored``.
            """
            self.has_errored = True
            self._stop_ping_thread()
            if not reconnecting:
                self._callback(self.on_error, e)

            if isinstance(e, (KeyboardInterrupt, SystemExit)):
                teardown(close_frame)
                # Propagate further
                # NOTE(review): the bare raise re-raises the exception currently
                # being handled, so it relies on this function being called from
                # inside an except block - confirm for all dispatcher call paths.
                raise

            if reconnect:
                info(f"{e} - reconnect")
                # With the built-in dispatcher the reconnect is driven by the
                # loop at the end of run_forever instead
                if custom_dispatcher:
                    debug(
                        f"Calling custom dispatcher reconnect [{len(inspect.stack())} frames in stack]"
                    )
                    assert dispatcher is not None
                    dispatcher.reconnect(reconnect, initialize_socket)
            else:
                error(f"{e} - goodbye")
                teardown(close_frame)
            return self.has_errored

        # Remember whether the caller supplied a dispatcher before the name is
        # rebound to the dispatcher object actually used
        custom_dispatcher = bool(dispatcher)
        # Element 3 of parse_url()'s result is passed as create_dispatcher's is_ssl
        dispatcher = self.create_dispatcher(
            ping_timeout, dispatcher, parse_url(self.url)[3], closed
        )

        try:
            initialize_socket()
            if not custom_dispatcher and reconnect:
                while self.keep_running:
                    debug(
                        f"Calling dispatcher reconnect [{len(inspect.stack())} frames in stack]"
                    )
                    dispatcher.reconnect(reconnect, initialize_socket)
        except (KeyboardInterrupt, Exception) as e:
            info(f"tearing down on exception {e}")
            teardown()
        finally:
            if not custom_dispatcher:
                # Ensure teardown was called before returning from run_forever
                teardown()

        return self.has_errored
612 def create_dispatcher(
613 self,
614 ping_timeout: Optional[Union[float, int]],
615 dispatcher: Optional[DispatcherBase] = None,
616 is_ssl: bool = False,
617 handleDisconnect: Callable = None,
618 ) -> Union[Dispatcher, SSLDispatcher, WrappedDispatcher]:
619 if dispatcher: # If custom dispatcher is set, use WrappedDispatcher
620 return WrappedDispatcher(self, ping_timeout, dispatcher, handleDisconnect)
621 timeout = ping_timeout or 10
622 if is_ssl:
623 return SSLDispatcher(self, timeout)
624 return Dispatcher(self, timeout)
626 def _get_close_args(
627 self, close_frame: Optional[ABNF]
628 ) -> List[Optional[Union[int, str]]]:
629 """
630 _get_close_args extracts the close code and reason from the close body
631 if it exists (RFC6455 says WebSocket Connection Close Code is optional)
632 """
633 # Need to catch the case where close_frame is None
634 # Otherwise the following if statement causes an error
635 if not close_frame:
636 return [None, None]
637 close_status_code, reason = self._parse_close_frame(close_frame)
638 if not self.on_close:
639 return [None, None]
640 return [close_status_code, reason]
642 def _parse_close_frame(
643 self, close_frame: Optional[ABNF]
644 ) -> Tuple[Optional[int], Optional[str]]:
645 """
646 Parse a close frame into status code and UTF-8 reason text.
647 """
648 if not close_frame or not getattr(close_frame, "data", None):
649 return (None, None)
651 data = close_frame.data
652 if isinstance(data, bytes):
653 data_bytes = data
654 elif isinstance(data, str):
655 data_bytes = data.encode("utf-8")
656 else:
657 data_bytes = bytes(data)
659 if len(data_bytes) < 2:
660 return (None, None)
662 close_status_code = 256 * int(data_bytes[0]) + int(data_bytes[1])
663 reason_bytes = data_bytes[2:]
665 reason: Optional[str]
666 if not reason_bytes:
667 reason = None
668 else:
669 try:
670 reason = reason_bytes.decode("utf-8")
671 except UnicodeDecodeError:
672 reason = reason_bytes.decode("utf-8", errors="replace")
674 return (close_status_code, reason)
676 def _callback(self, callback: Optional[Callable], *args: Any) -> None:
677 if callback:
678 try:
679 callback(self, *args)
681 except Exception as e:
682 error(f"error from callback {callback}: {e}")
683 # Bug fix: Prevent infinite recursion by not calling on_error
684 # when the failing callback IS on_error itself
685 if self.on_error and callback is not self.on_error:
686 self.on_error(self, e)