Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/websocket/_app.py: 16%
250 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
import inspect
import selectors
import socket
import sys
import threading
import time
import traceback
from typing import Any, Callable, Union

from . import _logging
from ._abnf import ABNF
from ._core import WebSocket, getdefaulttimeout
from ._exceptions import *
from ._url import parse_url
"""
_app.py
websocket - WebSocket client library for Python

Copyright 2022 engn33r

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
# Names exported by ``from <module> import *``.
__all__ = ["WebSocketApp"]

# Module-wide default reconnect delay in seconds. WebSocketApp.run_forever
# falls back to this value when its ``reconnect`` argument is None; a falsy
# value (the default 0) means "do not reconnect".
RECONNECT = 0
def setReconnect(reconnectInterval: int) -> None:
    """
    Set the module-wide default reconnect delay.

    Parameters
    ----------
    reconnectInterval: int
        Value stored in the module-level ``RECONNECT`` global.
    """
    global RECONNECT
    RECONNECT = reconnectInterval
class DispatcherBase:
    """
    DispatcherBase

    Shared state and blocking helpers for the built-in dispatchers.
    """

    def __init__(self, app: Any, ping_timeout: Union[int, float]) -> None:
        """
        Parameters
        ----------
        app: Any
            Object stored as ``self.app``; subclasses read ``app.keep_running``
            and ``app.sock`` from it.
        ping_timeout: int or float
            Stored as ``self.ping_timeout``; subclasses pass it to
            ``selector.select()`` as the wait timeout.
        """
        self.app = app
        self.ping_timeout = ping_timeout

    def timeout(self, seconds: Union[int, float], callback: Callable) -> None:
        """
        Sleep for ``seconds`` on the calling thread, then invoke ``callback()``.
        """
        time.sleep(seconds)
        callback()

    def reconnect(self, seconds: Union[int, float], reconnector: Callable) -> None:
        """
        Sleep for ``seconds`` and then call ``reconnector(reconnecting=True)``.

        A KeyboardInterrupt raised while sleeping or reconnecting is logged
        and swallowed here rather than propagated.
        """
        try:
            # The current stack depth is logged alongside the delay.
            _logging.info("reconnect() - retrying in {seconds_count} seconds [{frame_count} frames in stack]".format(
                seconds_count=seconds, frame_count=len(inspect.stack())))
            time.sleep(seconds)
            reconnector(reconnecting=True)
        except KeyboardInterrupt as e:
            _logging.info("User exited {err}".format(err=e))
class Dispatcher(DispatcherBase):
    """
    Dispatcher

    Read loop that waits for readability of ``self.app.sock.sock`` with a
    selector.
    """

    def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None:
        """
        Loop while ``self.app.keep_running`` is true.

        Parameters
        ----------
        sock: socket.socket
            Not used by this implementation; the socket is read from
            ``self.app.sock.sock`` on every iteration.
        read_callback: Callable
            Called with no arguments when the socket is readable. A falsy
            return value ends the loop.
        check_callback: Callable
            Called with no arguments after every wait (readable or timed out).
        """
        while self.app.keep_running:
            # A fresh selector per iteration, closed as soon as the wait is
            # over so it is released even when the loop breaks or a callback
            # raises.
            with selectors.DefaultSelector() as sel:
                sel.register(self.app.sock.sock, selectors.EVENT_READ)
                r = sel.select(self.ping_timeout)

            if r:
                if not read_callback():
                    break
            check_callback()
class SSLDispatcher(DispatcherBase):
    """
    SSLDispatcher

    Read loop for sockets exposing ``pending()``: buffered data is checked
    before waiting on the selector.
    """

    def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None:
        """
        Loop while ``self.app.keep_running`` is true.

        Parameters
        ----------
        sock: socket.socket
            Not used by this implementation; ``select()`` reads the socket
            from ``self.app.sock.sock``.
        read_callback: Callable
            Called with no arguments when data is available. A falsy return
            value ends the loop.
        check_callback: Callable
            Called with no arguments after every wait.
        """
        while self.app.keep_running:
            r = self.select()
            if r:
                if not read_callback():
                    break
            check_callback()

    def select(self) -> list:
        """
        Wait until the socket has data or ``self.ping_timeout`` elapses.

        Returns
        -------
        list
            ``[sock]`` when ``sock.pending()`` is truthy or the selector
            reports the socket readable, otherwise an empty list.
        """
        sock = self.app.sock.sock
        # Data already buffered on the socket object: no need to wait.
        if sock.pending():
            return [sock]

        # The context manager closes the selector even if select() raises.
        with selectors.DefaultSelector() as sel:
            sel.register(sock, selectors.EVENT_READ)
            r = sel.select(self.ping_timeout)

        return [sock] if r else []
class WrappedDispatcher:
    """
    WrappedDispatcher

    Adapter around a caller-supplied dispatcher object. The wrapped object
    must provide ``signal``, ``abort``, ``read`` and ``timeout``.
    """

    def __init__(self, app: Any, ping_timeout: Union[int, float, None], dispatcher: Any) -> None:
        """
        Parameters
        ----------
        app: Any
            Stored as ``self.app``.
        ping_timeout: int, float or None
            When truthy, ``read`` schedules ``check_callback`` after this
            many seconds.
        dispatcher: Any
            The custom dispatcher to delegate to.
        """
        self.app = app
        self.ping_timeout = ping_timeout
        self.dispatcher = dispatcher
        dispatcher.signal(2, dispatcher.abort)  # keyboard interrupt

    def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None:
        """
        Hand ``sock`` and ``read_callback`` to the wrapped dispatcher and,
        if a ping timeout is configured, schedule ``check_callback``.
        """
        self.dispatcher.read(sock, read_callback)
        if self.ping_timeout:
            self.timeout(self.ping_timeout, check_callback)

    def timeout(self, seconds: Union[int, float], callback: Callable) -> None:
        """
        Delegate to the wrapped dispatcher's ``timeout(seconds, callback)``.
        """
        self.dispatcher.timeout(seconds, callback)

    def reconnect(self, seconds: Union[int, float], reconnector: Callable) -> None:
        """
        Schedule ``reconnector`` through ``timeout``; unlike
        ``DispatcherBase.reconnect`` no ``reconnecting=True`` is passed here.
        """
        self.timeout(seconds, reconnector)
class WebSocketApp:
    """
    Higher level of APIs are provided. The interface is like JavaScript WebSocket object.
    """

    def __init__(self, url: str, header: Union[list, dict] = None,
                 on_open: Callable = None, on_message: Callable = None, on_error: Callable = None,
                 on_close: Callable = None, on_ping: Callable = None, on_pong: Callable = None,
                 on_cont_message: Callable = None,
                 keep_running: bool = True, get_mask_key: Callable = None, cookie: str = None,
                 subprotocols: list = None,
                 on_data: Callable = None,
                 socket: socket.socket = None) -> None:
        """
        WebSocketApp initialization

        Parameters
        ----------
        url: str
            Websocket url.
        header: list or dict
            Custom header for websocket handshake. Defaults to an empty list.
        on_open: function
            Callback object which is called at opening websocket.
            on_open has one argument.
            The 1st argument is this class object.
        on_message: function
            Callback object which is called when received data.
            on_message has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the frame payload; text frames are decoded
            as utf-8 unless run_forever was called with
            skip_utf8_validation=True.
        on_error: function
            Callback object which is called when we get error.
            on_error has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is exception object.
        on_close: function
            Callback object which is called when connection is closed.
            on_close has 3 arguments.
            The 1st argument is this class object.
            The 2nd argument is close_status_code.
            The 3rd argument is close_msg.
        on_ping: function
            Callback object which is called when a ping frame is received.
            on_ping has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the frame payload.
        on_pong: function
            Callback object which is called when a pong frame is received.
            on_pong has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the frame payload.
        on_cont_message: function
            Callback object which is called when a continuation
            frame is received.
            on_cont_message has 3 arguments.
            The 1st argument is this class object.
            The 2nd argument is the frame payload.
            The 3rd argument is continue flag. If 0, the data continues
            in the next frame.
        on_data: function
            Callback object which is called when a message received.
            This is called before on_message or on_cont_message,
            and then on_message or on_cont_message is called.
            on_data has 4 arguments.
            The 1st argument is this class object.
            The 2nd argument is the frame payload.
            The 3rd argument is data type (the frame opcode), e.g.
            ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY.
            The 4th argument is continue flag. If 0, the data continues
            in the next frame.
        keep_running: bool
            This parameter is obsolete and ignored.
        get_mask_key: function
            A callable function to get new mask keys, see the
            WebSocket.set_mask_key's docstring for more information.
        cookie: str
            Cookie value.
        subprotocols: list
            List of available sub protocols. Default is None.
        socket: socket
            Pre-initialized stream socket.
        """
        self.url = url
        self.header = header if header is not None else []
        self.cookie = cookie

        self.on_open = on_open
        self.on_message = on_message
        self.on_data = on_data
        self.on_error = on_error
        self.on_close = on_close
        self.on_ping = on_ping
        self.on_pong = on_pong
        self.on_cont_message = on_cont_message
        # Always starts False regardless of the obsolete keep_running argument;
        # run_forever() switches it on.
        self.keep_running = False
        self.get_mask_key = get_mask_key
        self.sock = None
        self.last_ping_tm = 0
        self.last_pong_tm = 0
        self.ping_thread = None
        self.stop_ping = None
        self.ping_interval = 0
        self.ping_timeout = None
        self.ping_payload = ""
        self.subprotocols = subprotocols
        self.prepared_socket = socket
        self.has_errored = False

    def send(self, data: str, opcode: int = ABNF.OPCODE_TEXT) -> None:
        """
        send message

        Parameters
        ----------
        data: str
            Message to send. If you set opcode to OPCODE_TEXT,
            data must be utf-8 string or unicode.
        opcode: int
            Operation code of data. Default is OPCODE_TEXT.

        Raises
        ------
        WebSocketConnectionClosedException
            If there is no socket or the underlying send reports 0.
        """

        if not self.sock or self.sock.send(data, opcode) == 0:
            raise WebSocketConnectionClosedException(
                "Connection is already closed.")

    def close(self, **kwargs) -> None:
        """
        Close websocket connection.

        Clears ``keep_running`` and, if a socket exists, closes it with
        ``**kwargs`` and drops the reference.
        """
        self.keep_running = False
        if self.sock:
            self.sock.close(**kwargs)
            self.sock = None

    def _start_ping_thread(self) -> None:
        """Reset ping/pong timestamps and start the daemon ping thread."""
        self.last_ping_tm = self.last_pong_tm = 0
        self.stop_ping = threading.Event()
        self.ping_thread = threading.Thread(target=self._send_ping)
        self.ping_thread.daemon = True
        self.ping_thread.start()

    def _stop_ping_thread(self) -> None:
        """Signal the ping thread to stop, wait up to 3 seconds for it, reset timestamps."""
        if self.stop_ping:
            self.stop_ping.set()
        if self.ping_thread and self.ping_thread.is_alive():
            self.ping_thread.join(3)
        self.last_ping_tm = self.last_pong_tm = 0

    def _send_ping(self) -> None:
        """
        Ping thread body: send a ping every ``ping_interval`` seconds until
        ``stop_ping`` is set.

        One full interval is waited before the loop starts and another at the
        top of each iteration. Failures to send are logged at debug level and
        do not end the loop.
        """
        if self.stop_ping.wait(self.ping_interval):
            return
        while not self.stop_ping.wait(self.ping_interval):
            if self.sock:
                self.last_ping_tm = time.time()
                try:
                    _logging.debug("Sending ping")
                    self.sock.ping(self.ping_payload)
                except Exception as e:
                    _logging.debug("Failed to send ping: {err}".format(err=e))

    def run_forever(self, sockopt: tuple = None, sslopt: dict = None,
                    ping_interval: Union[int, float] = 0, ping_timeout: Union[int, float] = None,
                    ping_payload: str = "",
                    http_proxy_host: str = None, http_proxy_port: Union[int, str] = None,
                    http_no_proxy: list = None, http_proxy_auth: tuple = None,
                    http_proxy_timeout: Union[int, float] = None,
                    skip_utf8_validation: bool = False,
                    host: str = None, origin: str = None, dispatcher: Any = None,
                    suppress_origin: bool = False, proxy_type: str = None, reconnect: int = None) -> bool:
        """
        Run event loop for WebSocket framework.

        This loop is an infinite loop and is alive while websocket is available.

        Parameters
        ----------
        sockopt: tuple
            Values for socket.setsockopt.
            sockopt must be tuple
            and each element is argument of sock.setsockopt.
        sslopt: dict
            Optional dict object for ssl socket option.
        ping_interval: int or float
            Automatically send "ping" command
            every specified period (in seconds).
            If set to 0, no ping is sent periodically.
        ping_timeout: int or float
            Timeout (in seconds) if the pong message is not received.
        ping_payload: str
            Payload message to send with each ping.
        http_proxy_host: str
            HTTP proxy host name.
        http_proxy_port: int or str
            HTTP proxy port. If not set, set to 80.
        http_no_proxy: list
            Whitelisted host names that don't use the proxy.
        http_proxy_timeout: int or float
            HTTP proxy timeout, default is 60 sec as per python-socks.
        http_proxy_auth: tuple
            HTTP proxy auth information. tuple of username and password. Default is None.
        skip_utf8_validation: bool
            skip utf8 validation.
        host: str
            update host header.
        origin: str
            update origin header.
        dispatcher: Dispatcher object
            customize reading data from socket. When given, it is wrapped in
            a WrappedDispatcher.
        suppress_origin: bool
            suppress outputting origin header.
        proxy_type: str
            type of proxy from: http, socks4, socks4a, socks5, socks5h
        reconnect: int
            delay interval when reconnecting. When None, the module-level
            RECONNECT value is used; a falsy value disables reconnecting.

        Returns
        -------
        teardown: bool
            The value of ``self.has_errored``, which is set to True whenever
            a disconnect/exception was handled during the loop.

        Raises
        ------
        WebSocketException
            If ping_timeout <= 0, ping_interval < 0, ping_interval is not
            greater than ping_timeout (when both are set), or a socket is
            already open.
        """

        if reconnect is None:
            reconnect = RECONNECT

        if ping_timeout is not None and ping_timeout <= 0:
            raise WebSocketException("Ensure ping_timeout > 0")
        if ping_interval is not None and ping_interval < 0:
            raise WebSocketException("Ensure ping_interval >= 0")
        if ping_timeout and ping_interval and ping_interval <= ping_timeout:
            raise WebSocketException("Ensure ping_interval > ping_timeout")
        if not sockopt:
            sockopt = []
        if not sslopt:
            sslopt = {}
        if self.sock:
            raise WebSocketException("socket is already opened")

        self.ping_interval = ping_interval
        self.ping_timeout = ping_timeout
        self.ping_payload = ping_payload
        self.keep_running = True

        def teardown(close_frame: ABNF = None) -> None:
            """
            Tears down the connection.

            Parameters
            ----------
            close_frame: ABNF frame
                If close_frame is set, the on_close handler is invoked
                with the statusCode and reason from the provided frame.
            """

            self._stop_ping_thread()
            self.keep_running = False
            if self.sock:
                self.sock.close()
            close_status_code, close_reason = self._get_close_args(
                close_frame if close_frame else None)
            self.sock = None

            # Finally call the callback AFTER all teardown is complete
            self._callback(self.on_close, close_status_code, close_reason)

        def setSock(reconnecting: bool = False) -> None:
            """Create and connect a new WebSocket, then hand it to the dispatcher."""
            if reconnecting and self.sock:
                self.sock.shutdown()

            self.sock = WebSocket(
                self.get_mask_key, sockopt=sockopt, sslopt=sslopt,
                fire_cont_frame=self.on_cont_message is not None,
                skip_utf8_validation=skip_utf8_validation,
                enable_multithread=True)

            self.sock.settimeout(getdefaulttimeout())
            try:
                self.sock.connect(
                    self.url, header=self.header, cookie=self.cookie,
                    http_proxy_host=http_proxy_host,
                    http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy,
                    http_proxy_auth=http_proxy_auth, http_proxy_timeout=http_proxy_timeout,
                    subprotocols=self.subprotocols,
                    host=host, origin=origin, suppress_origin=suppress_origin,
                    proxy_type=proxy_type, socket=self.prepared_socket)

                _logging.info("Websocket connected")

                if self.ping_interval:
                    self._start_ping_thread()

                self._callback(self.on_open)

                dispatcher.read(self.sock.sock, read, check)
            except (WebSocketConnectionClosedException, ConnectionRefusedError, KeyboardInterrupt, SystemExit, Exception) as e:
                handleDisconnect(e, reconnecting)

        def read() -> bool:
            """Receive one frame and dispatch it; a falsy return stops the dispatcher loop."""
            if not self.keep_running:
                return teardown()

            try:
                op_code, frame = self.sock.recv_data_frame(True)
            except (WebSocketConnectionClosedException, KeyboardInterrupt) as e:
                # With a custom dispatcher nothing above us catches the error,
                # so handle the disconnect here; otherwise let setSock() do it.
                if custom_dispatcher:
                    return handleDisconnect(e)
                else:
                    raise e

            if op_code == ABNF.OPCODE_CLOSE:
                return teardown(frame)
            elif op_code == ABNF.OPCODE_PING:
                self._callback(self.on_ping, frame.data)
            elif op_code == ABNF.OPCODE_PONG:
                self.last_pong_tm = time.time()
                self._callback(self.on_pong, frame.data)
            elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:
                self._callback(self.on_data, frame.data,
                               frame.opcode, frame.fin)
                self._callback(self.on_cont_message,
                               frame.data, frame.fin)
            else:
                data = frame.data
                if op_code == ABNF.OPCODE_TEXT and not skip_utf8_validation:
                    data = data.decode("utf-8")
                self._callback(self.on_data, data, frame.opcode, True)
                self._callback(self.on_message, data)

            return True

        def check() -> bool:
            """Raise WebSocketTimeoutException when the last ping was not answered in time."""
            if (self.ping_timeout):
                has_timeout_expired = time.time() - self.last_ping_tm > self.ping_timeout
                has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0
                has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > self.ping_timeout

                if (self.last_ping_tm and
                        has_timeout_expired and
                        (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)):
                    raise WebSocketTimeoutException("ping/pong timed out")
            return True

        def handleDisconnect(e: Exception, reconnecting: bool = False) -> None:
            """Record the error, notify on_error, then reconnect or tear down."""
            self.has_errored = True
            self._stop_ping_thread()
            if not reconnecting:
                self._callback(self.on_error, e)

            if isinstance(e, (KeyboardInterrupt, SystemExit)):
                teardown()
                # Propagate further
                # (bare raise: every call site is inside an ``except`` block)
                raise

            if reconnect:
                _logging.info("{err} - reconnect".format(err=e))
                if custom_dispatcher:
                    _logging.debug("Calling custom dispatcher reconnect [{frame_count} frames in stack]".format(frame_count=len(inspect.stack())))
                    dispatcher.reconnect(reconnect, setSock)
            else:
                _logging.error("{err} - goodbye".format(err=e))
                teardown()

        custom_dispatcher = bool(dispatcher)
        dispatcher = self.create_dispatcher(ping_timeout, dispatcher, parse_url(self.url)[3])

        setSock()
        if not custom_dispatcher and reconnect:
            while self.keep_running:
                _logging.debug("Calling dispatcher reconnect [{frame_count} frames in stack]".format(frame_count=len(inspect.stack())))
                dispatcher.reconnect(reconnect, setSock)

        return self.has_errored

    def create_dispatcher(self, ping_timeout: Union[int, float, None], dispatcher: Any = None,
                          is_ssl: bool = False) -> Union[DispatcherBase, WrappedDispatcher]:
        """
        Pick the dispatcher used by run_forever.

        A custom ``dispatcher`` is wrapped in WrappedDispatcher. Otherwise an
        SSLDispatcher (when ``is_ssl``) or Dispatcher is created with
        ``ping_timeout``, or 10 when ``ping_timeout`` is falsy.
        """
        if dispatcher:  # If custom dispatcher is set, use WrappedDispatcher
            return WrappedDispatcher(self, ping_timeout, dispatcher)
        timeout = ping_timeout or 10
        if is_ssl:
            return SSLDispatcher(self, timeout)

        return Dispatcher(self, timeout)

    def _get_close_args(self, close_frame: ABNF) -> list:
        """
        _get_close_args extracts the close code and reason from the close body
        if it exists (RFC6455 says WebSocket Connection Close Code is optional)

        Returns ``[close_status_code, reason]``, or ``[None, None]`` when no
        on_close handler is set, no frame was given, or the frame body is
        shorter than two bytes.
        """
        # Need to catch the case where close_frame is None
        # Otherwise the following if statement causes an error
        if not self.on_close or not close_frame:
            return [None, None]

        # Extract close frame status code
        if close_frame.data and len(close_frame.data) >= 2:
            # First two bytes are the big-endian status code, the rest is the reason.
            close_status_code = 256 * close_frame.data[0] + close_frame.data[1]
            reason = close_frame.data[2:].decode('utf-8')
            return [close_status_code, reason]
        else:
            # Most likely reached this because len(close_frame_data.data) < 2
            return [None, None]

    def _callback(self, callback: Callable, *args: Any) -> None:
        """
        Invoke ``callback(self, *args)`` if a callback is set.

        Exceptions raised by the callback are logged and forwarded to
        ``on_error``. When the failing callback is ``on_error`` itself it is
        not invoked a second time, so a faulty error handler cannot raise
        out of this method.
        """
        if callback:
            try:
                callback(self, *args)

            except Exception as e:
                _logging.error("error from callback {callback}: {err}".format(callback=callback, err=e))
                if self.on_error and callback is not self.on_error:
                    self.on_error(self, e)
531 self.on_error(self, e)