Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_core.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import socket
2import struct
3import threading
4import time
5from typing import Optional, Union
7# websocket modules
8from ._abnf import ABNF, STATUS_NORMAL, continuous_frame, frame_buffer
9from ._exceptions import WebSocketProtocolException, WebSocketConnectionClosedException
10from ._handshake import SUPPORTED_REDIRECT_STATUSES, handshake
11from ._http import connect, proxy_info
12from ._logging import debug, error, trace, isEnabledForError, isEnabledForTrace
13from ._socket import getdefaulttimeout, recv, send, sock_opt
14from ._ssl_compat import ssl
15from ._utils import NoLock
16from ._dispatcher import DispatcherBase, WrappedDispatcher
18"""
19_core.py
20websocket - WebSocket client library for Python
22Copyright 2024 engn33r
24Licensed under the Apache License, Version 2.0 (the "License");
25you may not use this file except in compliance with the License.
26You may obtain a copy of the License at
28 http://www.apache.org/licenses/LICENSE-2.0
30Unless required by applicable law or agreed to in writing, software
31distributed under the License is distributed on an "AS IS" BASIS,
32WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33See the License for the specific language governing permissions and
34limitations under the License.
35"""
37__all__ = ["WebSocket", "create_connection"]
40class WebSocket:
41 """
42 Low level WebSocket interface.
44 This class is based on the WebSocket protocol `draft-hixie-thewebsocketprotocol-76 <http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76>`_
46 We can connect to the websocket server and send/receive data.
47 The following example is an echo client.
49 >>> import websocket
50 >>> ws = websocket.WebSocket()
51 >>> ws.connect("ws://echo.websocket.events")
52 >>> ws.recv()
53 'echo.websocket.events sponsored by Lob.com'
54 >>> ws.send("Hello, Server")
55 19
56 >>> ws.recv()
57 'Hello, Server'
58 >>> ws.close()
60 Parameters
61 ----------
62 get_mask_key: func
63 A callable function to get new mask keys, see the
64 WebSocket.set_mask_key's docstring for more information.
65 sockopt: tuple
66 Values for socket.setsockopt.
67 sockopt must be tuple and each element is argument of sock.setsockopt.
68 sslopt: dict
69 Optional dict object for ssl socket options. See FAQ for details.
70 fire_cont_frame: bool
71 Fire recv event for each cont frame. Default is False.
72 enable_multithread: bool
73 If set to True, lock send method.
74 skip_utf8_validation: bool
75 Skip utf8 validation.
76 """
78 def __init__(
79 self,
80 get_mask_key=None,
81 sockopt=None,
82 sslopt=None,
83 fire_cont_frame: bool = False,
84 enable_multithread: bool = True,
85 skip_utf8_validation: bool = False,
86 dispatcher: Union[DispatcherBase, WrappedDispatcher] = None,
87 **_,
88 ):
89 """
90 Initialize WebSocket object.
92 Parameters
93 ----------
94 sslopt: dict
95 Optional dict object for ssl socket options. See FAQ for details.
96 """
97 self.sock_opt = sock_opt(sockopt, sslopt)
98 self.handshake_response = None
99 self.sock: Optional[socket.socket] = None
101 self.connected = False
102 self.get_mask_key = get_mask_key
103 # These buffer over the build-up of a single frame.
104 self.frame_buffer = frame_buffer(self._recv, skip_utf8_validation)
105 self.cont_frame = continuous_frame(fire_cont_frame, skip_utf8_validation)
106 self.dispatcher = dispatcher
108 if enable_multithread:
109 self.lock = threading.Lock()
110 self.readlock = threading.Lock()
111 else:
112 self.lock = NoLock()
113 self.readlock = NoLock()
115 def __iter__(self):
116 """
117 Allow iteration over websocket, implying sequential `recv` executions.
118 """
119 while True:
120 yield self.recv()
122 def __next__(self):
123 return self.recv()
125 def next(self):
126 return self.__next__()
128 def fileno(self):
129 return self.sock.fileno()
131 def set_mask_key(self, func):
132 """
133 Set function to create mask key. You can customize mask key generator.
134 Mainly, this is for testing purpose.
136 Parameters
137 ----------
138 func: func
139 callable object. the func takes 1 argument as integer.
140 The argument means length of mask key.
141 This func must return string(byte array),
142 which length is argument specified.
143 """
144 self.get_mask_key = func
146 def gettimeout(self) -> Union[float, int, None]:
147 """
148 Get the websocket timeout (in seconds) as an int or float
150 Returns
151 ----------
152 timeout: int or float
153 returns timeout value (in seconds). This value could be either float/integer.
154 """
155 return self.sock_opt.timeout
157 def settimeout(self, timeout: Union[float, int, None]):
158 """
159 Set the timeout to the websocket.
161 Parameters
162 ----------
163 timeout: int or float
164 timeout time (in seconds). This value could be either float/integer.
165 """
166 self.sock_opt.timeout = timeout
167 if self.sock:
168 self.sock.settimeout(timeout)
170 timeout = property(gettimeout, settimeout)
172 def getsubprotocol(self):
173 """
174 Get subprotocol
175 """
176 if self.handshake_response:
177 return self.handshake_response.subprotocol
178 else:
179 return None
181 subprotocol = property(getsubprotocol)
183 def getstatus(self):
184 """
185 Get handshake status
186 """
187 if self.handshake_response:
188 return self.handshake_response.status
189 else:
190 return None
192 status = property(getstatus)
194 def getheaders(self):
195 """
196 Get handshake response header
197 """
198 if self.handshake_response:
199 return self.handshake_response.headers
200 else:
201 return None
203 def is_ssl(self):
204 try:
205 return isinstance(self.sock, ssl.SSLSocket)
206 except:
207 return False
209 headers = property(getheaders)
211 def connect(self, url, **options):
212 """
213 Connect to url. url is websocket url scheme.
214 ie. ws://host:port/resource
215 You can customize using 'options'.
216 If you set "header" list object, you can set your own custom header.
218 >>> ws = WebSocket()
219 >>> ws.connect("ws://echo.websocket.events",
220 ... header=["User-Agent: MyProgram",
221 ... "x-custom: header"])
223 Parameters
224 ----------
225 header: list or dict
226 Custom http header list or dict.
227 cookie: str
228 Cookie value.
229 origin: str
230 Custom origin url.
231 connection: str
232 Custom connection header value.
233 Default value "Upgrade" set in _handshake.py
234 suppress_origin: bool
235 Suppress outputting origin header.
236 host: str
237 Custom host header string.
238 timeout: int or float
239 Socket timeout time. This value is an integer or float.
240 If you set None for this value, it means "use default_timeout value"
241 http_proxy_host: str
242 HTTP proxy host name.
243 http_proxy_port: str or int
244 HTTP proxy port. Default is 80.
245 http_no_proxy: list
246 Whitelisted host names that don't use the proxy.
247 http_proxy_auth: tuple
248 HTTP proxy auth information. Tuple of username and password. Default is None.
249 http_proxy_timeout: int or float
250 HTTP proxy timeout, default is 60 sec as per python-socks.
251 redirect_limit: int
252 Number of redirects to follow.
253 subprotocols: list
254 List of available subprotocols. Default is None.
255 socket: socket
256 Pre-initialized stream socket.
257 """
258 self.sock_opt.timeout = options.get("timeout", self.sock_opt.timeout)
259 self.sock, addrs = connect(
260 url, self.sock_opt, proxy_info(**options), options.pop("socket", None)
261 )
263 try:
264 self.handshake_response = handshake(self.sock, url, *addrs, **options)
265 for _ in range(options.pop("redirect_limit", 3)):
266 if self.handshake_response.status in SUPPORTED_REDIRECT_STATUSES:
267 url = self.handshake_response.headers["location"]
268 self.sock.close()
269 self.sock, addrs = connect(
270 url,
271 self.sock_opt,
272 proxy_info(**options),
273 options.pop("socket", None),
274 )
275 self.handshake_response = handshake(
276 self.sock, url, *addrs, **options
277 )
278 self.connected = True
279 except:
280 if self.sock:
281 self.sock.close()
282 self.sock = None
283 raise
285 def send(self, payload: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> int:
286 """
287 Send the data as string.
289 Parameters
290 ----------
291 payload: str
292 Payload must be utf-8 string or unicode,
293 If the opcode is OPCODE_TEXT.
294 Otherwise, it must be string(byte array).
295 opcode: int
296 Operation code (opcode) to send.
297 """
299 frame = ABNF.create_frame(payload, opcode)
300 return self.send_frame(frame)
302 def send_text(self, text_data: str) -> int:
303 """
304 Sends UTF-8 encoded text.
305 """
306 return self.send(text_data, ABNF.OPCODE_TEXT)
308 def send_bytes(self, data: Union[bytes, bytearray]) -> int:
309 """
310 Sends a sequence of bytes.
311 """
312 return self.send(data, ABNF.OPCODE_BINARY)
314 def send_frame(self, frame) -> int:
315 """
316 Send the data frame.
318 >>> ws = create_connection("ws://echo.websocket.events")
319 >>> frame = ABNF.create_frame("Hello", ABNF.OPCODE_TEXT)
320 >>> ws.send_frame(frame)
321 >>> cont_frame = ABNF.create_frame("My name is ", ABNF.OPCODE_CONT, 0)
322 >>> ws.send_frame(frame)
323 >>> cont_frame = ABNF.create_frame("Foo Bar", ABNF.OPCODE_CONT, 1)
324 >>> ws.send_frame(frame)
326 Parameters
327 ----------
328 frame: ABNF frame
329 frame data created by ABNF.create_frame
330 """
331 if self.get_mask_key:
332 frame.get_mask_key = self.get_mask_key
333 data = frame.format()
334 length = len(data)
335 if isEnabledForTrace():
336 trace(f"++Sent raw: {repr(data)}")
337 trace(f"++Sent decoded: {frame.__str__()}")
338 with self.lock:
339 while data:
340 l = self._send(data)
341 data = data[l:]
343 return length
345 def send_binary(self, payload: bytes) -> int:
346 """
347 Send a binary message (OPCODE_BINARY).
349 Parameters
350 ----------
351 payload: bytes
352 payload of message to send.
353 """
354 return self.send(payload, ABNF.OPCODE_BINARY)
356 def ping(self, payload: Union[str, bytes] = ""):
357 """
358 Send ping data.
360 Parameters
361 ----------
362 payload: str
363 data payload to send server.
364 """
365 if isinstance(payload, str):
366 payload = payload.encode("utf-8")
367 self.send(payload, ABNF.OPCODE_PING)
369 def pong(self, payload: Union[str, bytes] = ""):
370 """
371 Send pong data.
373 Parameters
374 ----------
375 payload: str
376 data payload to send server.
377 """
378 if isinstance(payload, str):
379 payload = payload.encode("utf-8")
380 self.send(payload, ABNF.OPCODE_PONG)
382 def recv(self) -> Union[str, bytes]:
383 """
384 Receive string data(byte array) from the server.
386 Returns
387 ----------
388 data: string (byte array) value.
389 """
390 with self.readlock:
391 opcode, data = self.recv_data()
392 if opcode == ABNF.OPCODE_TEXT:
393 data_received: Union[bytes, str] = data
394 if isinstance(data_received, bytes):
395 return data_received.decode("utf-8")
396 elif isinstance(data_received, str):
397 return data_received
398 elif opcode == ABNF.OPCODE_BINARY:
399 data_binary: bytes = data
400 return data_binary
401 else:
402 return ""
404 def recv_data(self, control_frame: bool = False) -> tuple:
405 """
406 Receive data with operation code.
408 Parameters
409 ----------
410 control_frame: bool
411 a boolean flag indicating whether to return control frame
412 data, defaults to False
414 Returns
415 -------
416 opcode, frame.data: tuple
417 tuple of operation code and string(byte array) value.
418 """
419 opcode, frame = self.recv_data_frame(control_frame)
420 return opcode, frame.data
422 def recv_data_frame(self, control_frame: bool = False) -> tuple:
423 """
424 Receive data with operation code.
426 If a valid ping message is received, a pong response is sent.
428 Parameters
429 ----------
430 control_frame: bool
431 a boolean flag indicating whether to return control frame
432 data, defaults to False
434 Returns
435 -------
436 frame.opcode, frame: tuple
437 tuple of operation code and string(byte array) value.
438 """
439 while True:
440 frame = self.recv_frame()
441 if isEnabledForTrace():
442 trace(f"++Rcv raw: {repr(frame.format())}")
443 trace(f"++Rcv decoded: {frame.__str__()}")
444 if not frame:
445 # handle error:
446 # 'NoneType' object has no attribute 'opcode'
447 raise WebSocketProtocolException(f"Not a valid frame {frame}")
448 elif frame.opcode in (
449 ABNF.OPCODE_TEXT,
450 ABNF.OPCODE_BINARY,
451 ABNF.OPCODE_CONT,
452 ):
453 self.cont_frame.validate(frame)
454 self.cont_frame.add(frame)
456 if self.cont_frame.is_fire(frame):
457 return self.cont_frame.extract(frame)
459 elif frame.opcode == ABNF.OPCODE_CLOSE:
460 self.send_close()
461 return frame.opcode, frame
462 elif frame.opcode == ABNF.OPCODE_PING:
463 if len(frame.data) < 126:
464 self.pong(frame.data)
465 else:
466 raise WebSocketProtocolException("Ping message is too long")
467 if control_frame:
468 return frame.opcode, frame
469 elif frame.opcode == ABNF.OPCODE_PONG:
470 if control_frame:
471 return frame.opcode, frame
473 def recv_frame(self):
474 """
475 Receive data as frame from server.
477 Returns
478 -------
479 self.frame_buffer.recv_frame(): ABNF frame object
480 """
481 return self.frame_buffer.recv_frame()
483 def send_close(self, status: int = STATUS_NORMAL, reason: bytes = b""):
484 """
485 Send close data to the server.
487 Parameters
488 ----------
489 status: int
490 Status code to send. See STATUS_XXX.
491 reason: str or bytes
492 The reason to close. This must be string or UTF-8 bytes.
493 """
494 if status < 0 or status >= ABNF.LENGTH_16:
495 raise ValueError("code is invalid range")
496 self.connected = False
497 self.send(struct.pack("!H", status) + reason, ABNF.OPCODE_CLOSE)
499 def close(self, status: int = STATUS_NORMAL, reason: bytes = b"", timeout: int = 3):
500 """
501 Close Websocket object
503 Parameters
504 ----------
505 status: int
506 Status code to send. See VALID_CLOSE_STATUS in ABNF.
507 reason: bytes
508 The reason to close in UTF-8.
509 timeout: int or float
510 Timeout until receive a close frame.
511 If None, it will wait forever until receive a close frame.
512 """
513 if not self.connected:
514 return
515 if status < 0 or status >= ABNF.LENGTH_16:
516 raise ValueError("code is invalid range")
518 try:
519 self.connected = False
520 self.send(struct.pack("!H", status) + reason, ABNF.OPCODE_CLOSE)
521 sock_timeout = self.sock.gettimeout()
522 self.sock.settimeout(timeout)
523 start_time = time.time()
524 while timeout is None or time.time() - start_time < timeout:
525 try:
526 frame = self.recv_frame()
527 if frame.opcode != ABNF.OPCODE_CLOSE:
528 continue
529 if isEnabledForError():
530 recv_status = struct.unpack("!H", frame.data[0:2])[0]
531 if recv_status >= 3000 and recv_status <= 4999:
532 debug(f"close status: {repr(recv_status)}")
533 elif recv_status != STATUS_NORMAL:
534 error(f"close status: {repr(recv_status)}")
535 break
536 except:
537 break
538 self.sock.settimeout(sock_timeout)
539 self.sock.shutdown(socket.SHUT_RDWR)
540 except:
541 pass
543 self.shutdown()
545 def abort(self):
546 """
547 Low-level asynchronous abort, wakes up other threads that are waiting in recv_*
548 """
549 if self.connected:
550 self.sock.shutdown(socket.SHUT_RDWR)
552 def shutdown(self):
553 """
554 close socket, immediately.
555 """
556 if self.sock:
557 self.sock.close()
558 self.sock = None
559 self.connected = False
561 def _send(self, data: Union[str, bytes]):
562 if self.dispatcher:
563 return self.dispatcher.send(self.sock, data)
564 return send(self.sock, data)
566 def _recv(self, bufsize):
567 try:
568 return recv(self.sock, bufsize)
569 except WebSocketConnectionClosedException:
570 if self.sock:
571 self.sock.close()
572 self.sock = None
573 self.connected = False
574 raise
577def create_connection(url: str, timeout=None, class_=WebSocket, **options):
578 """
579 Connect to url and return websocket object.
581 Connect to url and return the WebSocket object.
582 Passing optional timeout parameter will set the timeout on the socket.
583 If no timeout is supplied,
584 the global default timeout setting returned by getdefaulttimeout() is used.
585 You can customize using 'options'.
586 If you set "header" list object, you can set your own custom header.
588 >>> conn = create_connection("ws://echo.websocket.events",
589 ... header=["User-Agent: MyProgram",
590 ... "x-custom: header"])
592 Parameters
593 ----------
594 class_: class
595 class to instantiate when creating the connection. It has to implement
596 settimeout and connect. It's __init__ should be compatible with
597 WebSocket.__init__, i.e. accept all of it's kwargs.
598 header: list or dict
599 custom http header list or dict.
600 cookie: str
601 Cookie value.
602 origin: str
603 custom origin url.
604 suppress_origin: bool
605 suppress outputting origin header.
606 host: str
607 custom host header string.
608 timeout: int or float
609 socket timeout time. This value could be either float/integer.
610 If set to None, it uses the default_timeout value.
611 http_proxy_host: str
612 HTTP proxy host name.
613 http_proxy_port: str or int
614 HTTP proxy port. If not set, set to 80.
615 http_no_proxy: list
616 Whitelisted host names that don't use the proxy.
617 http_proxy_auth: tuple
618 HTTP proxy auth information. tuple of username and password. Default is None.
619 http_proxy_timeout: int or float
620 HTTP proxy timeout, default is 60 sec as per python-socks.
621 enable_multithread: bool
622 Enable lock for multithread.
623 redirect_limit: int
624 Number of redirects to follow.
625 sockopt: tuple
626 Values for socket.setsockopt.
627 sockopt must be a tuple and each element is an argument of sock.setsockopt.
628 sslopt: dict
629 Optional dict object for ssl socket options. See FAQ for details.
630 subprotocols: list
631 List of available subprotocols. Default is None.
632 skip_utf8_validation: bool
633 Skip utf8 validation.
634 socket: socket
635 Pre-initialized stream socket.
636 """
637 sockopt = options.pop("sockopt", [])
638 sslopt = options.pop("sslopt", {})
639 fire_cont_frame = options.pop("fire_cont_frame", False)
640 enable_multithread = options.pop("enable_multithread", True)
641 skip_utf8_validation = options.pop("skip_utf8_validation", False)
642 websock = class_(
643 sockopt=sockopt,
644 sslopt=sslopt,
645 fire_cont_frame=fire_cont_frame,
646 enable_multithread=enable_multithread,
647 skip_utf8_validation=skip_utf8_validation,
648 **options,
649 )
650 websock.settimeout(timeout if timeout is not None else getdefaulttimeout())
651 websock.connect(url, **options)
652 return websock