Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/websocket/_app.py: 15%
267 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:34 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:34 +0000
1import inspect
2import selectors
3import socket
4import sys
5import threading
6import time
7import traceback
9from typing import Any, Callable, Optional, Union
11from . import _logging
12from ._abnf import ABNF
13from ._url import parse_url
14from ._core import WebSocket, getdefaulttimeout
15from ._exceptions import *
17"""
18_app.py
19websocket - WebSocket client library for Python
21Copyright 2023 engn33r
23Licensed under the Apache License, Version 2.0 (the "License");
24you may not use this file except in compliance with the License.
25You may obtain a copy of the License at
27 http://www.apache.org/licenses/LICENSE-2.0
29Unless required by applicable law or agreed to in writing, software
30distributed under the License is distributed on an "AS IS" BASIS,
31WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
32See the License for the specific language governing permissions and
33limitations under the License.
34"""
36__all__ = ["WebSocketApp"]
38RECONNECT = 0
def setReconnect(reconnectInterval: int) -> None:
    """
    Set the module-wide default reconnect delay.

    The value is stored in the module-level ``RECONNECT`` variable, which
    ``WebSocketApp.run_forever`` uses when its ``reconnect`` argument is None.

    Parameters
    ----------
    reconnectInterval: int
        New value for ``RECONNECT``.
    """
    global RECONNECT
    RECONNECT = reconnectInterval
class DispatcherBase:
    """
    DispatcherBase

    Holds the owning app and the ping timeout, and provides blocking
    ``timeout`` and ``reconnect`` helpers for the dispatcher subclasses.
    """
    def __init__(self, app: Any, ping_timeout: float) -> None:
        """
        Parameters
        ----------
        app: Any
            Object stored as ``self.app``.
        ping_timeout: float
            Value stored as ``self.ping_timeout``.
        """
        self.ping_timeout = ping_timeout
        self.app = app

    def timeout(self, seconds: int, callback: Callable) -> None:
        """
        Sleep for ``seconds`` on the calling thread, then call ``callback()``.
        """
        time.sleep(seconds)
        callback()

    def reconnect(self, seconds: int, reconnector: Callable) -> None:
        """
        Log the retry, sleep for ``seconds`` and call ``reconnector(reconnecting=True)``.

        A KeyboardInterrupt raised while sleeping or reconnecting is logged and re-raised.
        """
        try:
            # The stack depth is logged so growth across repeated reconnects is visible.
            stack_depth = len(inspect.stack())
            _logging.info("reconnect() - retrying in {seconds_count} seconds [{frame_count} frames in stack]".format(
                seconds_count=seconds, frame_count=stack_depth))
            time.sleep(seconds)
            reconnector(reconnecting=True)
        except KeyboardInterrupt as interrupt:
            _logging.info("User exited {err}".format(err=interrupt))
            raise interrupt
class Dispatcher(DispatcherBase):
    """
    Dispatcher

    Selector-driven read loop; ``WebSocketApp.create_dispatcher`` picks this
    class when the URL is not SSL and no custom dispatcher is supplied.
    """
    def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None:
        """
        Poll the app's socket until the app stops running or a read fails.

        Parameters
        ----------
        sock: socket.socket
            Unused; the socket is always taken from ``self.app.sock.sock``.
        read_callback: Callable
            Called with no arguments when the socket is readable.
            A falsy return value ends the loop.
        check_callback: Callable
            Called with no arguments after every poll, whether or not
            data was available.
        """
        # Using the selector as a context manager closes it on every exit path,
        # including when register() itself raises (e.g. on an already-closed socket).
        with selectors.DefaultSelector() as sel:
            sel.register(self.app.sock.sock, selectors.EVENT_READ)
            while self.app.keep_running:
                # Wait at most ping_timeout seconds so check_callback runs periodically.
                if sel.select(self.ping_timeout) and not read_callback():
                    break
                check_callback()
class SSLDispatcher(DispatcherBase):
    """
    SSLDispatcher

    Selector-driven read loop; ``WebSocketApp.create_dispatcher`` picks this
    class when the URL is SSL and no custom dispatcher is supplied.
    """
    def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None:
        """
        Poll the app's socket until the app stops running or a read fails.

        Parameters
        ----------
        sock: socket.socket
            Unused; the socket is always taken from ``self.app.sock.sock``.
        read_callback: Callable
            Called with no arguments when data is available.
            A falsy return value ends the loop.
        check_callback: Callable
            Called with no arguments after every poll, whether or not
            data was available.
        """
        sock = self.app.sock.sock
        # Using the selector as a context manager closes it on every exit path,
        # including when register() itself raises (e.g. on an already-closed socket).
        with selectors.DefaultSelector() as sel:
            sel.register(sock, selectors.EVENT_READ)
            while self.app.keep_running:
                if self.select(sock, sel) and not read_callback():
                    break
                check_callback()

    def select(self, sock, sel:selectors.DefaultSelector):
        """
        Report whether the app's socket has data to read.

        Returns a one-element list holding the socket when ``sock.pending()``
        is truthy, otherwise the first ``SelectorKey`` reported by
        ``sel.select(self.ping_timeout)``, or None when the wait timed out.
        ``read`` only uses the truthiness of the result.
        """
        # The argument is ignored; the current socket is re-read from the app on every call.
        sock = self.app.sock.sock
        # NOTE(review): presumably an ssl.SSLSocket, where pending() counts bytes
        # already buffered and therefore invisible to the selector - confirm.
        if sock.pending():
            return [sock,]

        r = sel.select(self.ping_timeout)

        if len(r) > 0:
            return r[0][0]
        return None
class WrappedDispatcher:
    """
    WrappedDispatcher

    Adapter that forwards read, timeout and reconnect requests to a
    caller-supplied dispatcher object.
    """
    def __init__(self, app, ping_timeout: float, dispatcher: Dispatcher) -> None:
        """
        Store the collaborators and register ``dispatcher.abort`` for signal 2.

        The wrapped dispatcher must provide ``signal``, ``abort``, ``read``
        and ``timeout``.
        """
        self.app = app
        self.ping_timeout = ping_timeout
        self.dispatcher = dispatcher
        dispatcher.signal(2, dispatcher.abort)  # keyboard interrupt

    def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None:
        """
        Hand ``sock`` and ``read_callback`` to the wrapped dispatcher.

        ``check_callback`` is scheduled through ``timeout`` only when
        ``ping_timeout`` is truthy.
        """
        self.dispatcher.read(sock, read_callback)
        if self.ping_timeout:
            self.timeout(self.ping_timeout, check_callback)

    def timeout(self, seconds: int, callback: Callable) -> None:
        """
        Forward the timer request to the wrapped dispatcher.
        """
        self.dispatcher.timeout(seconds, callback)

    def reconnect(self, seconds: int, reconnector: Callable) -> None:
        """
        Schedule ``reconnector`` through ``timeout`` after ``seconds``.
        """
        self.timeout(seconds, reconnector)
class WebSocketApp:
    """
    Higher level of APIs are provided. The interface is like JavaScript WebSocket object.
    """

    def __init__(self, url: str, header: Union[list, dict, Callable] = None,
                 on_open: Callable = None, on_message: Callable = None, on_error: Callable = None,
                 on_close: Callable = None, on_ping: Callable = None, on_pong: Callable = None,
                 on_cont_message: Callable = None,
                 keep_running: bool = True, get_mask_key: Callable = None, cookie: str = None,
                 subprotocols: list = None,
                 on_data: Callable = None,
                 socket: socket.socket = None) -> None:
        """
        WebSocketApp initialization

        Parameters
        ----------
        url: str
            Websocket url.
        header: list or dict or Callable
            Custom header for websocket handshake.
            If the parameter is a callable object, it is called just before the connection attempt.
            The returned dict or list is used as custom header value.
            This could be useful in order to properly setup timestamp dependent headers.
        on_open: function
            Callback object which is called at opening websocket.
            on_open has one argument.
            The 1st argument is this class object.
        on_message: function
            Callback object which is called when received data.
            on_message has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is utf-8 data received from the server.
        on_error: function
            Callback object which is called when we get error.
            on_error has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is exception object.
        on_close: function
            Callback object which is called when connection is closed.
            on_close has 3 arguments.
            The 1st argument is this class object.
            The 2nd argument is close_status_code.
            The 3rd argument is close_msg.
        on_ping: function
            Callback object which is called when a ping frame is received.
            on_ping has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the data of the ping frame.
        on_pong: function
            Callback object which is called when a pong frame is received.
            on_pong has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the data of the pong frame.
        on_cont_message: function
            Callback object which is called when a continuation
            frame is received.
            on_cont_message has 3 arguments.
            The 1st argument is this class object.
            The 2nd argument is utf-8 string which we get from the server.
            The 3rd argument is continue flag. if 0, the data continue
            to next frame data
        on_data: function
            Callback object which is called when a message received.
            This is called before on_message or on_cont_message,
            and then on_message or on_cont_message is called.
            on_data has 4 argument.
            The 1st argument is this class object.
            The 2nd argument is utf-8 string which we get from the server.
            The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came.
            The 4th argument is continue flag. If 0, the data continue
            to next frame data
        keep_running: bool
            This parameter is obsolete and ignored.
        get_mask_key: function
            A callable function to get new mask keys, see the
            WebSocket.set_mask_key's docstring for more information.
        cookie: str
            Cookie value.
        subprotocols: list
            List of available sub protocols. Default is None.
        socket: socket
            Pre-initialized stream socket.
        """
        self.url = url
        self.header = header if header is not None else []
        self.cookie = cookie

        self.on_open = on_open
        self.on_message = on_message
        self.on_data = on_data
        self.on_error = on_error
        self.on_close = on_close
        self.on_ping = on_ping
        self.on_pong = on_pong
        self.on_cont_message = on_cont_message
        # Always starts False regardless of the (obsolete) keep_running argument;
        # run_forever() switches it on.
        self.keep_running = False
        self.get_mask_key = get_mask_key
        self.sock = None
        self.last_ping_tm = 0
        self.last_pong_tm = 0
        self.ping_thread = None
        self.stop_ping = None
        self.ping_interval = 0
        self.ping_timeout = None
        self.ping_payload = ""
        self.subprotocols = subprotocols
        self.prepared_socket = socket
        self.has_errored = False
        # Guarded by has_done_teardown_lock so teardown work runs once per run_forever() call.
        self.has_done_teardown = False
        self.has_done_teardown_lock = threading.Lock()

    def send(self, data: str, opcode: int = ABNF.OPCODE_TEXT) -> None:
        """
        send message

        Parameters
        ----------
        data: str
            Message to send. If you set opcode to OPCODE_TEXT,
            data must be utf-8 string or unicode.
        opcode: int
            Operation code of data. Default is OPCODE_TEXT.

        Raises
        ------
        WebSocketConnectionClosedException
            If there is no socket, or the socket's send() returns 0.
        """

        if not self.sock or self.sock.send(data, opcode) == 0:
            raise WebSocketConnectionClosedException(
                "Connection is already closed.")

    def close(self, **kwargs) -> None:
        """
        Close websocket connection.

        Clears ``keep_running`` and, if a socket exists, closes it with
        ``kwargs`` and drops the reference.
        """
        self.keep_running = False
        if self.sock:
            self.sock.close(**kwargs)
            self.sock = None

    def _start_ping_thread(self) -> None:
        """
        Reset the ping/pong timestamps and start the daemon thread running _send_ping.
        """
        self.last_ping_tm = self.last_pong_tm = 0
        self.stop_ping = threading.Event()
        self.ping_thread = threading.Thread(target=self._send_ping)
        self.ping_thread.daemon = True
        self.ping_thread.start()

    def _stop_ping_thread(self) -> None:
        """
        Signal the ping thread to stop, wait up to 3 seconds for it, and reset the timestamps.
        """
        if self.stop_ping:
            self.stop_ping.set()
        if self.ping_thread and self.ping_thread.is_alive():
            self.ping_thread.join(3)
        self.last_ping_tm = self.last_pong_tm = 0

    def _send_ping(self) -> None:
        """
        Ping-thread body: send a ping every ``ping_interval`` seconds until stopped.

        Failures while sending are logged at debug level and do not end the loop.
        """
        # Initial wait: leave immediately if stopped or no longer running.
        if self.stop_ping.wait(self.ping_interval) or self.keep_running is False:
            return
        # Each iteration waits another interval before the ping is sent.
        while not self.stop_ping.wait(self.ping_interval) and self.keep_running is True:
            if self.sock:
                self.last_ping_tm = time.time()
                try:
                    _logging.debug("Sending ping")
                    self.sock.ping(self.ping_payload)
                except Exception as e:
                    _logging.debug("Failed to send ping: {err}".format(err=e))

    def run_forever(self, sockopt: tuple = None, sslopt: dict = None,
                    ping_interval: float = 0, ping_timeout: Optional[float] = None,
                    ping_payload: str = "",
                    http_proxy_host: str = None, http_proxy_port: Union[int, str] = None,
                    http_no_proxy: list = None, http_proxy_auth: tuple = None,
                    http_proxy_timeout: float = None,
                    skip_utf8_validation: bool = False,
                    host: str = None, origin: str = None, dispatcher: Dispatcher = None,
                    suppress_origin: bool = False, proxy_type: str = None, reconnect: int = None) -> bool:
        """
        Run event loop for WebSocket framework.

        This loop is an infinite loop and is alive while websocket is available.

        Parameters
        ----------
        sockopt: tuple
            Values for socket.setsockopt.
            sockopt must be tuple
            and each element is argument of sock.setsockopt.
        sslopt: dict
            Optional dict object for ssl socket option.
        ping_interval: int or float
            Automatically send "ping" command
            every specified period (in seconds).
            If set to 0, no ping is sent periodically.
        ping_timeout: int or float
            Timeout (in seconds) if the pong message is not received.
        ping_payload: str
            Payload message to send with each ping.
        http_proxy_host: str
            HTTP proxy host name.
        http_proxy_port: int or str
            HTTP proxy port. If not set, set to 80.
        http_no_proxy: list
            Whitelisted host names that don't use the proxy.
        http_proxy_timeout: int or float
            HTTP proxy timeout, default is 60 sec as per python-socks.
        http_proxy_auth: tuple
            HTTP proxy auth information. tuple of username and password. Default is None.
        skip_utf8_validation: bool
            skip utf8 validation.
        host: str
            update host header.
        origin: str
            update origin header.
        dispatcher: Dispatcher object
            customize reading data from socket.
        suppress_origin: bool
            suppress outputting origin header.
        proxy_type: str
            type of proxy from: http, socks4, socks4a, socks5, socks5h
        reconnect: int
            delay interval when reconnecting

        Returns
        -------
        teardown: bool
            False if the `WebSocketApp` is closed or caught KeyboardInterrupt,
            True if any other exception was raised during a loop.

        Raises
        ------
        WebSocketException
            If the ping settings are inconsistent or a socket is already open.
        """

        if reconnect is None:
            reconnect = RECONNECT

        if ping_timeout is not None and ping_timeout <= 0:
            raise WebSocketException("Ensure ping_timeout > 0")
        if ping_interval is not None and ping_interval < 0:
            raise WebSocketException("Ensure ping_interval >= 0")
        if ping_timeout and ping_interval and ping_interval <= ping_timeout:
            raise WebSocketException("Ensure ping_interval > ping_timeout")
        if not sockopt:
            sockopt = []
        if not sslopt:
            sslopt = {}
        if self.sock:
            raise WebSocketException("socket is already opened")

        self.ping_interval = ping_interval
        self.ping_timeout = ping_timeout
        self.ping_payload = ping_payload
        # Re-arm teardown for this run. Without the reset, a second run_forever()
        # on the same instance would skip teardown and never fire on_close.
        with self.has_done_teardown_lock:
            self.has_done_teardown = False
        self.keep_running = True

        def teardown(close_frame: ABNF = None):
            """
            Tears down the connection.

            Parameters
            ----------
            close_frame: ABNF frame
                If close_frame is set, the on_close handler is invoked
                with the statusCode and reason from the provided frame.
            """

            # teardown() is called in many code paths to ensure resources are cleaned up and on_close is fired.
            # To ensure the work is only done once, we use this bool and lock.
            with self.has_done_teardown_lock:
                if self.has_done_teardown:
                    return
                self.has_done_teardown = True

            self._stop_ping_thread()
            self.keep_running = False
            if self.sock:
                self.sock.close()
            close_status_code, close_reason = self._get_close_args(
                close_frame if close_frame else None)
            self.sock = None

            # Finally call the callback AFTER all teardown is complete
            self._callback(self.on_close, close_status_code, close_reason)

        def setSock(reconnecting: bool = False) -> None:
            """
            Create and connect a new WebSocket, then hand it to the dispatcher's read loop.

            Any exception raised on the way is routed to handleDisconnect.
            """
            if reconnecting and self.sock:
                self.sock.shutdown()

            self.sock = WebSocket(
                self.get_mask_key, sockopt=sockopt, sslopt=sslopt,
                fire_cont_frame=self.on_cont_message is not None,
                skip_utf8_validation=skip_utf8_validation,
                enable_multithread=True)

            self.sock.settimeout(getdefaulttimeout())
            try:

                # A callable header is evaluated on every connection attempt.
                header = self.header() if callable(self.header) else self.header

                self.sock.connect(
                    self.url, header=header, cookie=self.cookie,
                    http_proxy_host=http_proxy_host,
                    http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy,
                    http_proxy_auth=http_proxy_auth, http_proxy_timeout=http_proxy_timeout,
                    subprotocols=self.subprotocols,
                    host=host, origin=origin, suppress_origin=suppress_origin,
                    proxy_type=proxy_type, socket=self.prepared_socket)

                _logging.info("Websocket connected")

                if self.ping_interval:
                    self._start_ping_thread()

                self._callback(self.on_open)

                dispatcher.read(self.sock.sock, read, check)
            except (WebSocketConnectionClosedException, ConnectionRefusedError, KeyboardInterrupt, SystemExit, Exception) as e:
                handleDisconnect(e, reconnecting)

        def read() -> bool:
            """
            Receive one frame and dispatch it to the matching callbacks.

            Returns True to keep reading; the teardown paths return None.
            """
            if not self.keep_running:
                return teardown()

            try:
                op_code, frame = self.sock.recv_data_frame(True)
            except (WebSocketConnectionClosedException, KeyboardInterrupt) as e:
                if custom_dispatcher:
                    return handleDisconnect(e)
                else:
                    raise e

            if op_code == ABNF.OPCODE_CLOSE:
                return teardown(frame)
            elif op_code == ABNF.OPCODE_PING:
                self._callback(self.on_ping, frame.data)
            elif op_code == ABNF.OPCODE_PONG:
                self.last_pong_tm = time.time()
                self._callback(self.on_pong, frame.data)
            elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:
                self._callback(self.on_data, frame.data,
                               frame.opcode, frame.fin)
                self._callback(self.on_cont_message,
                               frame.data, frame.fin)
            else:
                data = frame.data
                if op_code == ABNF.OPCODE_TEXT and not skip_utf8_validation:
                    data = data.decode("utf-8")
                self._callback(self.on_data, data, frame.opcode, True)
                self._callback(self.on_message, data)

            return True

        def check() -> bool:
            """
            Raise WebSocketTimeoutException when the last ping went unanswered for too long.
            """
            if (self.ping_timeout):
                has_timeout_expired = time.time() - self.last_ping_tm > self.ping_timeout
                has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0
                has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > self.ping_timeout

                # last_ping_tm is 0 until the first ping is sent, which disables the check.
                if (self.last_ping_tm and
                        has_timeout_expired and
                        (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)):
                    raise WebSocketTimeoutException("ping/pong timed out")
            return True

        def handleDisconnect(e: Exception, reconnecting: bool = False) -> bool:
            """
            React to a failed connection: report it, then reconnect or tear down.

            Must be called from inside an ``except`` block, because the bare
            ``raise`` below re-raises the exception currently being handled.
            """
            self.has_errored = True
            self._stop_ping_thread()
            if not reconnecting:
                self._callback(self.on_error, e)

            if isinstance(e, (KeyboardInterrupt, SystemExit)):
                teardown()
                # Propagate further
                raise

            if reconnect:
                _logging.info("{err} - reconnect".format(err=e))
                if custom_dispatcher:
                    _logging.debug("Calling custom dispatcher reconnect [{frame_count} frames in stack]".format(frame_count=len(inspect.stack())))
                    dispatcher.reconnect(reconnect, setSock)
            else:
                _logging.error("{err} - goodbye".format(err=e))
                teardown()

        # Record whether the caller supplied a dispatcher before the name is rebound.
        custom_dispatcher = bool(dispatcher)
        dispatcher = self.create_dispatcher(ping_timeout, dispatcher, parse_url(self.url)[3])

        try:
            setSock()
            if not custom_dispatcher and reconnect:
                while self.keep_running:
                    _logging.debug("Calling dispatcher reconnect [{frame_count} frames in stack]".format(frame_count=len(inspect.stack())))
                    dispatcher.reconnect(reconnect, setSock)
        except (KeyboardInterrupt, Exception) as e:
            _logging.info("tearing down on exception {err}".format(err=e))
            teardown()
        finally:
            if not custom_dispatcher:
                # Ensure teardown was called before returning from run_forever
                teardown()

        return self.has_errored

    def create_dispatcher(self, ping_timeout: int, dispatcher: Dispatcher = None, is_ssl: bool = False) -> DispatcherBase:
        """
        Pick the dispatcher used by run_forever.

        A supplied ``dispatcher`` is wrapped in WrappedDispatcher. Otherwise an
        SSLDispatcher (when ``is_ssl``) or a Dispatcher is built with
        ``ping_timeout``, or 10 when that is falsy.
        """
        if dispatcher:  # If custom dispatcher is set, use WrappedDispatcher
            return WrappedDispatcher(self, ping_timeout, dispatcher)
        timeout = ping_timeout or 10
        if is_ssl:
            return SSLDispatcher(self, timeout)

        return Dispatcher(self, timeout)

    def _get_close_args(self, close_frame: ABNF) -> list:
        """
        _get_close_args extracts the close code and reason from the close body
        if it exists (RFC6455 says WebSocket Connection Close Code is optional)

        Returns ``[close_status_code, reason]``, or ``[None, None]`` when there
        is no on_close handler, no frame, or fewer than 2 bytes of frame data.
        """
        # Need to catch the case where close_frame is None
        # Otherwise the following if statement causes an error
        if not self.on_close or not close_frame:
            return [None, None]

        # Extract close frame status code
        if close_frame.data and len(close_frame.data) >= 2:
            # The first two bytes are combined as a big-endian integer; the rest is the reason text.
            close_status_code = 256 * close_frame.data[0] + close_frame.data[1]
            reason = close_frame.data[2:].decode('utf-8')
            return [close_status_code, reason]
        else:
            # Most likely reached this because len(close_frame_data.data) < 2
            return [None, None]

    def _callback(self, callback, *args) -> None:
        """
        Call ``callback(self, *args)`` if a callback is set.

        An Exception raised by the callback is logged and passed to on_error
        (when set) instead of propagating.
        """
        if callback:
            try:
                callback(self, *args)

            except Exception as e:
                _logging.error("error from callback {callback}: {err}".format(callback=callback, err=e))
                if self.on_error:
                    self.on_error(self, e)