Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_app.py: 14%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import inspect
2import selectors
3import socket
4import threading
5import time
6from typing import Any, Callable, Optional, Union
8from . import _logging
9from ._abnf import ABNF
10from ._core import WebSocket, getdefaulttimeout
11from ._exceptions import (
12 WebSocketConnectionClosedException,
13 WebSocketException,
14 WebSocketTimeoutException,
15)
16from ._ssl_compat import SSLEOFError
17from ._url import parse_url
18from ._dispatcher import *
20"""
21_app.py
22websocket - WebSocket client library for Python
24Copyright 2024 engn33r
26Licensed under the Apache License, Version 2.0 (the "License");
27you may not use this file except in compliance with the License.
28You may obtain a copy of the License at
30 http://www.apache.org/licenses/LICENSE-2.0
32Unless required by applicable law or agreed to in writing, software
33distributed under the License is distributed on an "AS IS" BASIS,
34WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
35See the License for the specific language governing permissions and
36limitations under the License.
37"""
39__all__ = ["WebSocketApp"]
41RECONNECT = 0
44def setReconnect(reconnectInterval: int) -> None:
45 global RECONNECT
46 RECONNECT = reconnectInterval
49class WebSocketApp:
50 """
51 Higher level of APIs are provided. The interface is like JavaScript WebSocket object.
52 """
54 def __init__(
55 self,
56 url: str,
57 header: Union[list, dict, Callable, None] = None,
58 on_open: Optional[Callable[["WebSocketApp"], None]] = None,
59 on_reconnect: Optional[Callable[["WebSocketApp"], None]] = None,
60 on_message: Optional[Callable[["WebSocketApp", Any], None]] = None,
61 on_error: Optional[Callable[["WebSocketApp", Any], None]] = None,
62 on_close: Optional[Callable[["WebSocketApp", Any, Any], None]] = None,
63 on_ping: Optional[Callable] = None,
64 on_pong: Optional[Callable] = None,
65 on_cont_message: Optional[Callable] = None,
66 keep_running: bool = True,
67 get_mask_key: Optional[Callable] = None,
68 cookie: Optional[str] = None,
69 subprotocols: Optional[list] = None,
70 on_data: Optional[Callable] = None,
71 socket: Optional[socket.socket] = None,
72 ) -> None:
73 """
74 WebSocketApp initialization
76 Parameters
77 ----------
78 url: str
79 Websocket url.
80 header: list or dict or Callable
81 Custom header for websocket handshake.
82 If the parameter is a callable object, it is called just before the connection attempt.
83 The returned dict or list is used as custom header value.
84 This could be useful in order to properly setup timestamp dependent headers.
85 on_open: function
86 Callback object which is called at opening websocket.
87 on_open has one argument.
88 The 1st argument is this class object.
89 on_reconnect: function
90 Callback object which is called at reconnecting websocket.
91 on_reconnect has one argument.
92 The 1st argument is this class object.
93 on_message: function
94 Callback object which is called when received data.
95 on_message has 2 arguments.
96 The 1st argument is this class object.
97 The 2nd argument is utf-8 data received from the server.
98 on_error: function
99 Callback object which is called when we get error.
100 on_error has 2 arguments.
101 The 1st argument is this class object.
102 The 2nd argument is exception object.
103 on_close: function
104 Callback object which is called when connection is closed.
105 on_close has 3 arguments.
106 The 1st argument is this class object.
107 The 2nd argument is close_status_code.
108 The 3rd argument is close_msg.
109 on_cont_message: function
110 Callback object which is called when a continuation
111 frame is received.
112 on_cont_message has 3 arguments.
113 The 1st argument is this class object.
114 The 2nd argument is utf-8 string which we get from the server.
115 The 3rd argument is continue flag. if 0, the data continue
116 to next frame data
117 on_data: function
118 Callback object which is called when a message received.
119 This is called before on_message or on_cont_message,
120 and then on_message or on_cont_message is called.
121 on_data has 4 argument.
122 The 1st argument is this class object.
123 The 2nd argument is utf-8 string which we get from the server.
124 The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came.
125 The 4th argument is continue flag. If 0, the data continue
126 keep_running: bool
127 This parameter is obsolete and ignored.
128 get_mask_key: function
129 A callable function to get new mask keys, see the
130 WebSocket.set_mask_key's docstring for more information.
131 cookie: str
132 Cookie value.
133 subprotocols: list
134 List of available sub protocols. Default is None.
135 socket: socket
136 Pre-initialized stream socket.
137 """
138 self.url = url
139 self.header = header if header is not None else []
140 self.cookie = cookie
142 self.on_open = on_open
143 self.on_reconnect = on_reconnect
144 self.on_message = on_message
145 self.on_data = on_data
146 self.on_error = on_error
147 self.on_close = on_close
148 self.on_ping = on_ping
149 self.on_pong = on_pong
150 self.on_cont_message = on_cont_message
151 self.keep_running = False
152 self.get_mask_key = get_mask_key
153 self.sock: Optional[WebSocket] = None
154 self.last_ping_tm = float(0)
155 self.last_pong_tm = float(0)
156 self.ping_thread: Optional[threading.Thread] = None
157 self.stop_ping: Optional[threading.Event] = None
158 self.ping_interval = float(0)
159 self.ping_timeout: Union[float, int, None] = None
160 self.ping_payload = ""
161 self.subprotocols = subprotocols
162 self.prepared_socket = socket
163 self.has_errored = False
164 self.has_done_teardown = False
165 self.has_done_teardown_lock = threading.Lock()
167 def send(self, data: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> None:
168 """
169 send message
171 Parameters
172 ----------
173 data: str
174 Message to send. If you set opcode to OPCODE_TEXT,
175 data must be utf-8 string or unicode.
176 opcode: int
177 Operation code of data. Default is OPCODE_TEXT.
178 """
180 if not self.sock or self.sock.send(data, opcode) == 0:
181 raise WebSocketConnectionClosedException("Connection is already closed.")
183 def send_text(self, text_data: str) -> None:
184 """
185 Sends UTF-8 encoded text.
186 """
187 if not self.sock or self.sock.send(text_data, ABNF.OPCODE_TEXT) == 0:
188 raise WebSocketConnectionClosedException("Connection is already closed.")
190 def send_bytes(self, data: Union[bytes, bytearray]) -> None:
191 """
192 Sends a sequence of bytes.
193 """
194 if not self.sock or self.sock.send(data, ABNF.OPCODE_BINARY) == 0:
195 raise WebSocketConnectionClosedException("Connection is already closed.")
197 def close(self, **kwargs) -> None:
198 """
199 Close websocket connection.
200 """
201 self.keep_running = False
202 if self.sock:
203 self.sock.close(**kwargs)
204 self.sock = None
206 def _start_ping_thread(self) -> None:
207 self.last_ping_tm = self.last_pong_tm = float(0)
208 self.stop_ping = threading.Event()
209 self.ping_thread = threading.Thread(target=self._send_ping)
210 self.ping_thread.daemon = True
211 self.ping_thread.start()
213 def _stop_ping_thread(self) -> None:
214 if self.stop_ping:
215 self.stop_ping.set()
216 if self.ping_thread and self.ping_thread.is_alive():
217 self.ping_thread.join(3)
218 self.last_ping_tm = self.last_pong_tm = float(0)
220 def _send_ping(self) -> None:
221 if self.stop_ping.wait(self.ping_interval) or self.keep_running is False:
222 return
223 while not self.stop_ping.wait(self.ping_interval) and self.keep_running is True:
224 if self.sock:
225 self.last_ping_tm = time.time()
226 try:
227 _logging.debug("Sending ping")
228 self.sock.ping(self.ping_payload)
229 except Exception as e:
230 _logging.debug(f"Failed to send ping: {e}")
232 def ready(self):
233 return self.sock and self.sock.connected
235 def run_forever(
236 self,
237 sockopt: tuple = None,
238 sslopt: dict = None,
239 ping_interval: Union[float, int] = 0,
240 ping_timeout: Union[float, int, None] = None,
241 ping_payload: str = "",
242 http_proxy_host: str = None,
243 http_proxy_port: Union[int, str] = None,
244 http_no_proxy: list = None,
245 http_proxy_auth: tuple = None,
246 http_proxy_timeout: Optional[float] = None,
247 skip_utf8_validation: bool = False,
248 host: str = None,
249 origin: str = None,
250 dispatcher=None,
251 suppress_origin: bool = False,
252 proxy_type: str = None,
253 reconnect: int = None,
254 ) -> bool:
255 """
256 Run event loop for WebSocket framework.
258 This loop is an infinite loop and is alive while websocket is available.
260 Parameters
261 ----------
262 sockopt: tuple
263 Values for socket.setsockopt.
264 sockopt must be tuple
265 and each element is argument of sock.setsockopt.
266 sslopt: dict
267 Optional dict object for ssl socket option.
268 ping_interval: int or float
269 Automatically send "ping" command
270 every specified period (in seconds).
271 If set to 0, no ping is sent periodically.
272 ping_timeout: int or float
273 Timeout (in seconds) if the pong message is not received.
274 ping_payload: str
275 Payload message to send with each ping.
276 http_proxy_host: str
277 HTTP proxy host name.
278 http_proxy_port: int or str
279 HTTP proxy port. If not set, set to 80.
280 http_no_proxy: list
281 Whitelisted host names that don't use the proxy.
282 http_proxy_timeout: int or float
283 HTTP proxy timeout, default is 60 sec as per python-socks.
284 http_proxy_auth: tuple
285 HTTP proxy auth information. tuple of username and password. Default is None.
286 skip_utf8_validation: bool
287 skip utf8 validation.
288 host: str
289 update host header.
290 origin: str
291 update origin header.
292 dispatcher: Dispatcher object
293 customize reading data from socket.
294 suppress_origin: bool
295 suppress outputting origin header.
296 proxy_type: str
297 type of proxy from: http, socks4, socks4a, socks5, socks5h
298 reconnect: int
299 delay interval when reconnecting
301 Returns
302 -------
303 teardown: bool
304 False if the `WebSocketApp` is closed or caught KeyboardInterrupt,
305 True if any other exception was raised during a loop.
306 """
308 if reconnect is None:
309 reconnect = RECONNECT
311 if ping_timeout is not None and ping_timeout <= 0:
312 raise WebSocketException("Ensure ping_timeout > 0")
313 if ping_interval is not None and ping_interval < 0:
314 raise WebSocketException("Ensure ping_interval >= 0")
315 if ping_timeout and ping_interval and ping_interval <= ping_timeout:
316 raise WebSocketException("Ensure ping_interval > ping_timeout")
317 if not sockopt:
318 sockopt = ()
319 if not sslopt:
320 sslopt = {}
321 if self.sock:
322 raise WebSocketException("socket is already opened")
324 self.ping_interval = ping_interval
325 self.ping_timeout = ping_timeout
326 self.ping_payload = ping_payload
327 self.has_done_teardown = False
328 self.keep_running = True
330 def teardown(close_frame: ABNF = None):
331 """
332 Tears down the connection.
334 Parameters
335 ----------
336 close_frame: ABNF frame
337 If close_frame is set, the on_close handler is invoked
338 with the statusCode and reason from the provided frame.
339 """
341 # teardown() is called in many code paths to ensure resources are cleaned up and on_close is fired.
342 # To ensure the work is only done once, we use this bool and lock.
343 with self.has_done_teardown_lock:
344 if self.has_done_teardown:
345 return
346 self.has_done_teardown = True
348 self._stop_ping_thread()
349 self.keep_running = False
350 if self.sock:
351 self.sock.close()
352 close_status_code, close_reason = self._get_close_args(
353 close_frame if close_frame else None
354 )
355 self.sock = None
357 # Finally call the callback AFTER all teardown is complete
358 self._callback(self.on_close, close_status_code, close_reason)
360 def setSock(reconnecting: bool = False) -> None:
361 if reconnecting and self.sock:
362 self.sock.shutdown()
364 self.sock = WebSocket(
365 self.get_mask_key,
366 sockopt=sockopt,
367 sslopt=sslopt,
368 fire_cont_frame=self.on_cont_message is not None,
369 skip_utf8_validation=skip_utf8_validation,
370 enable_multithread=True,
371 dispatcher=dispatcher,
372 )
374 self.sock.settimeout(getdefaulttimeout())
375 try:
376 header = self.header() if callable(self.header) else self.header
378 self.sock.connect(
379 self.url,
380 header=header,
381 cookie=self.cookie,
382 http_proxy_host=http_proxy_host,
383 http_proxy_port=http_proxy_port,
384 http_no_proxy=http_no_proxy,
385 http_proxy_auth=http_proxy_auth,
386 http_proxy_timeout=http_proxy_timeout,
387 subprotocols=self.subprotocols,
388 host=host,
389 origin=origin,
390 suppress_origin=suppress_origin,
391 proxy_type=proxy_type,
392 socket=self.prepared_socket,
393 )
395 _logging.info("Websocket connected")
397 if self.ping_interval:
398 self._start_ping_thread()
400 if reconnecting and self.on_reconnect:
401 self._callback(self.on_reconnect)
402 else:
403 self._callback(self.on_open)
405 dispatcher.read(self.sock.sock, read, check)
406 except (
407 WebSocketConnectionClosedException,
408 ConnectionRefusedError,
409 KeyboardInterrupt,
410 SystemExit,
411 Exception,
412 ) as e:
413 handleDisconnect(e, reconnecting)
415 def read() -> bool:
416 if not self.keep_running:
417 return teardown()
419 try:
420 op_code, frame = self.sock.recv_data_frame(True)
421 except (
422 WebSocketConnectionClosedException,
423 KeyboardInterrupt,
424 SSLEOFError,
425 ) as e:
426 if custom_dispatcher:
427 return closed(e)
428 else:
429 raise e
431 if op_code == ABNF.OPCODE_CLOSE:
432 return closed(frame)
433 elif op_code == ABNF.OPCODE_PING:
434 self._callback(self.on_ping, frame.data)
435 elif op_code == ABNF.OPCODE_PONG:
436 self.last_pong_tm = time.time()
437 self._callback(self.on_pong, frame.data)
438 elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:
439 self._callback(self.on_data, frame.data, frame.opcode, frame.fin)
440 self._callback(self.on_cont_message, frame.data, frame.fin)
441 else:
442 data = frame.data
443 if op_code == ABNF.OPCODE_TEXT and not skip_utf8_validation:
444 data = data.decode("utf-8")
445 self._callback(self.on_data, data, frame.opcode, True)
446 self._callback(self.on_message, data)
448 return True
450 def check() -> bool:
451 if self.ping_timeout:
452 has_timeout_expired = (
453 time.time() - self.last_ping_tm > self.ping_timeout
454 )
455 has_pong_not_arrived_after_last_ping = (
456 self.last_pong_tm - self.last_ping_tm < 0
457 )
458 has_pong_arrived_too_late = (
459 self.last_pong_tm - self.last_ping_tm > self.ping_timeout
460 )
462 if (
463 self.last_ping_tm
464 and has_timeout_expired
465 and (
466 has_pong_not_arrived_after_last_ping
467 or has_pong_arrived_too_late
468 )
469 ):
470 raise WebSocketTimeoutException("ping/pong timed out")
471 return True
473 def closed(
474 e: Union[
475 WebSocketConnectionClosedException,
476 ConnectionRefusedError,
477 KeyboardInterrupt,
478 SystemExit,
479 Exception,
480 str,
481 ] = "closed unexpectedly"
482 ) -> bool:
483 if type(e) is str:
484 e = WebSocketConnectionClosedException(e)
485 return handleDisconnect(e, bool(reconnect))
487 def handleDisconnect(
488 e: Union[
489 WebSocketConnectionClosedException,
490 ConnectionRefusedError,
491 KeyboardInterrupt,
492 SystemExit,
493 Exception,
494 ],
495 reconnecting: bool = False,
496 ) -> bool:
497 self.has_errored = True
498 self._stop_ping_thread()
499 if not reconnecting:
500 self._callback(self.on_error, e)
502 if isinstance(e, (KeyboardInterrupt, SystemExit)):
503 teardown()
504 # Propagate further
505 raise
507 if reconnect:
508 _logging.info(f"{e} - reconnect")
509 if custom_dispatcher:
510 _logging.debug(
511 f"Calling custom dispatcher reconnect [{len(inspect.stack())} frames in stack]"
512 )
513 dispatcher.reconnect(reconnect, setSock)
514 else:
515 _logging.error(f"{e} - goodbye")
516 teardown()
518 custom_dispatcher = bool(dispatcher)
519 dispatcher = self.create_dispatcher(
520 ping_timeout, dispatcher, parse_url(self.url)[3], closed
521 )
523 try:
524 setSock()
525 if not custom_dispatcher and reconnect:
526 while self.keep_running:
527 _logging.debug(
528 f"Calling dispatcher reconnect [{len(inspect.stack())} frames in stack]"
529 )
530 dispatcher.reconnect(reconnect, setSock)
531 except (KeyboardInterrupt, Exception) as e:
532 _logging.info(f"tearing down on exception {e}")
533 teardown()
534 finally:
535 if not custom_dispatcher:
536 # Ensure teardown was called before returning from run_forever
537 teardown()
539 return self.has_errored
541 def create_dispatcher(
542 self,
543 ping_timeout: Union[float, int, None],
544 dispatcher: Optional[DispatcherBase] = None,
545 is_ssl: bool = False,
546 handleDisconnect: Callable = None,
547 ) -> Union[Dispatcher, SSLDispatcher, WrappedDispatcher]:
548 if dispatcher: # If custom dispatcher is set, use WrappedDispatcher
549 return WrappedDispatcher(self, ping_timeout, dispatcher, handleDisconnect)
550 timeout = ping_timeout or 10
551 if is_ssl:
552 return SSLDispatcher(self, timeout)
553 return Dispatcher(self, timeout)
555 def _get_close_args(self, close_frame: ABNF) -> list:
556 """
557 _get_close_args extracts the close code and reason from the close body
558 if it exists (RFC6455 says WebSocket Connection Close Code is optional)
559 """
560 # Need to catch the case where close_frame is None
561 # Otherwise the following if statement causes an error
562 if not self.on_close or not close_frame:
563 return [None, None]
565 # Extract close frame status code
566 if close_frame.data and len(close_frame.data) >= 2:
567 close_status_code = 256 * int(close_frame.data[0]) + int(
568 close_frame.data[1]
569 )
570 reason = close_frame.data[2:]
571 if isinstance(reason, bytes):
572 reason = reason.decode("utf-8")
573 return [close_status_code, reason]
574 else:
575 # Most likely reached this because len(close_frame_data.data) < 2
576 return [None, None]
578 def _callback(self, callback, *args) -> None:
579 if callback:
580 try:
581 callback(self, *args)
583 except Exception as e:
584 _logging.error(f"error from callback {callback}: {e}")
585 if self.on_error:
586 self.on_error(self, e)