Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/websocket/_core.py: 23%
208 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:34 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:34 +0000
1import socket
2import struct
3import threading
4import time
6from typing import Optional, Union
8# websocket modules
9from ._abnf import *
10from ._exceptions import *
11from ._handshake import *
12from ._http import *
13from ._logging import *
14from ._socket import *
15from ._ssl_compat import *
16from ._utils import *
18"""
19_core.py
20websocket - WebSocket client library for Python
22Copyright 2023 engn33r
24Licensed under the Apache License, Version 2.0 (the "License");
25you may not use this file except in compliance with the License.
26You may obtain a copy of the License at
28 http://www.apache.org/licenses/LICENSE-2.0
30Unless required by applicable law or agreed to in writing, software
31distributed under the License is distributed on an "AS IS" BASIS,
32WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33See the License for the specific language governing permissions and
34limitations under the License.
35"""
37__all__ = ['WebSocket', 'create_connection']
40class WebSocket:
41 """
42 Low level WebSocket interface.
44 This class is based on the WebSocket protocol `draft-hixie-thewebsocketprotocol-76 <http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76>`_
46 We can connect to the websocket server and send/receive data.
47 The following example is an echo client.
49 >>> import websocket
50 >>> ws = websocket.WebSocket()
51 >>> ws.connect("ws://echo.websocket.events")
52 >>> ws.recv()
53 'echo.websocket.events sponsored by Lob.com'
54 >>> ws.send("Hello, Server")
55 19
56 >>> ws.recv()
57 'Hello, Server'
58 >>> ws.close()
60 Parameters
61 ----------
62 get_mask_key: func
63 A callable function to get new mask keys, see the
64 WebSocket.set_mask_key's docstring for more information.
65 sockopt: tuple
66 Values for socket.setsockopt.
67 sockopt must be tuple and each element is argument of sock.setsockopt.
68 sslopt: dict
69 Optional dict object for ssl socket options. See FAQ for details.
70 fire_cont_frame: bool
71 Fire recv event for each cont frame. Default is False.
72 enable_multithread: bool
73 If set to True, lock send method.
74 skip_utf8_validation: bool
75 Skip utf8 validation.
76 """
78 def __init__(self, get_mask_key=None, sockopt=None, sslopt=None,
79 fire_cont_frame: bool = False, enable_multithread: bool = True,
80 skip_utf8_validation: bool = False, **_):
81 """
82 Initialize WebSocket object.
84 Parameters
85 ----------
86 sslopt: dict
87 Optional dict object for ssl socket options. See FAQ for details.
88 """
89 self.sock_opt = sock_opt(sockopt, sslopt)
90 self.handshake_response = None
91 self.sock = None
93 self.connected = False
94 self.get_mask_key = get_mask_key
95 # These buffer over the build-up of a single frame.
96 self.frame_buffer = frame_buffer(self._recv, skip_utf8_validation)
97 self.cont_frame = continuous_frame(
98 fire_cont_frame, skip_utf8_validation)
100 if enable_multithread:
101 self.lock = threading.Lock()
102 self.readlock = threading.Lock()
103 else:
104 self.lock = NoLock()
105 self.readlock = NoLock()
107 def __iter__(self):
108 """
109 Allow iteration over websocket, implying sequential `recv` executions.
110 """
111 while True:
112 yield self.recv()
114 def __next__(self):
115 return self.recv()
117 def next(self):
118 return self.__next__()
120 def fileno(self):
121 return self.sock.fileno()
123 def set_mask_key(self, func):
124 """
125 Set function to create mask key. You can customize mask key generator.
126 Mainly, this is for testing purpose.
128 Parameters
129 ----------
130 func: func
131 callable object. the func takes 1 argument as integer.
132 The argument means length of mask key.
133 This func must return string(byte array),
134 which length is argument specified.
135 """
136 self.get_mask_key = func
138 def gettimeout(self) -> float:
139 """
140 Get the websocket timeout (in seconds) as an int or float
142 Returns
143 ----------
144 timeout: int or float
145 returns timeout value (in seconds). This value could be either float/integer.
146 """
147 return self.sock_opt.timeout
149 def settimeout(self, timeout: Optional[float]):
150 """
151 Set the timeout to the websocket.
153 Parameters
154 ----------
155 timeout: int or float
156 timeout time (in seconds). This value could be either float/integer.
157 """
158 self.sock_opt.timeout = timeout
159 if self.sock:
160 self.sock.settimeout(timeout)
162 timeout = property(gettimeout, settimeout)
164 def getsubprotocol(self):
165 """
166 Get subprotocol
167 """
168 if self.handshake_response:
169 return self.handshake_response.subprotocol
170 else:
171 return None
173 subprotocol = property(getsubprotocol)
175 def getstatus(self):
176 """
177 Get handshake status
178 """
179 if self.handshake_response:
180 return self.handshake_response.status
181 else:
182 return None
184 status = property(getstatus)
186 def getheaders(self):
187 """
188 Get handshake response header
189 """
190 if self.handshake_response:
191 return self.handshake_response.headers
192 else:
193 return None
195 def is_ssl(self):
196 try:
197 return isinstance(self.sock, ssl.SSLSocket)
198 except:
199 return False
201 headers = property(getheaders)
203 def connect(self, url, **options):
204 """
205 Connect to url. url is websocket url scheme.
206 ie. ws://host:port/resource
207 You can customize using 'options'.
208 If you set "header" list object, you can set your own custom header.
210 >>> ws = WebSocket()
211 >>> ws.connect("ws://echo.websocket.events",
212 ... header=["User-Agent: MyProgram",
213 ... "x-custom: header"])
215 Parameters
216 ----------
217 header: list or dict
218 Custom http header list or dict.
219 cookie: str
220 Cookie value.
221 origin: str
222 Custom origin url.
223 connection: str
224 Custom connection header value.
225 Default value "Upgrade" set in _handshake.py
226 suppress_origin: bool
227 Suppress outputting origin header.
228 host: str
229 Custom host header string.
230 timeout: int or float
231 Socket timeout time. This value is an integer or float.
232 If you set None for this value, it means "use default_timeout value"
233 http_proxy_host: str
234 HTTP proxy host name.
235 http_proxy_port: str or int
236 HTTP proxy port. Default is 80.
237 http_no_proxy: list
238 Whitelisted host names that don't use the proxy.
239 http_proxy_auth: tuple
240 HTTP proxy auth information. Tuple of username and password. Default is None.
241 http_proxy_timeout: int or float
242 HTTP proxy timeout, default is 60 sec as per python-socks.
243 redirect_limit: int
244 Number of redirects to follow.
245 subprotocols: list
246 List of available subprotocols. Default is None.
247 socket: socket
248 Pre-initialized stream socket.
249 """
250 self.sock_opt.timeout = options.get('timeout', self.sock_opt.timeout)
251 self.sock, addrs = connect(url, self.sock_opt, proxy_info(**options),
252 options.pop('socket', None))
254 try:
255 self.handshake_response = handshake(self.sock, url, *addrs, **options)
256 for attempt in range(options.pop('redirect_limit', 3)):
257 if self.handshake_response.status in SUPPORTED_REDIRECT_STATUSES:
258 url = self.handshake_response.headers['location']
259 self.sock.close()
260 self.sock, addrs = connect(url, self.sock_opt, proxy_info(**options),
261 options.pop('socket', None))
262 self.handshake_response = handshake(self.sock, url, *addrs, **options)
263 self.connected = True
264 except:
265 if self.sock:
266 self.sock.close()
267 self.sock = None
268 raise
270 def send(self, payload: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> int:
271 """
272 Send the data as string.
274 Parameters
275 ----------
276 payload: str
277 Payload must be utf-8 string or unicode,
278 If the opcode is OPCODE_TEXT.
279 Otherwise, it must be string(byte array).
280 opcode: int
281 Operation code (opcode) to send.
282 """
284 frame = ABNF.create_frame(payload, opcode)
285 return self.send_frame(frame)
287 def send_frame(self, frame) -> int:
288 """
289 Send the data frame.
291 >>> ws = create_connection("ws://echo.websocket.events")
292 >>> frame = ABNF.create_frame("Hello", ABNF.OPCODE_TEXT)
293 >>> ws.send_frame(frame)
294 >>> cont_frame = ABNF.create_frame("My name is ", ABNF.OPCODE_CONT, 0)
295 >>> ws.send_frame(frame)
296 >>> cont_frame = ABNF.create_frame("Foo Bar", ABNF.OPCODE_CONT, 1)
297 >>> ws.send_frame(frame)
299 Parameters
300 ----------
301 frame: ABNF frame
302 frame data created by ABNF.create_frame
303 """
304 if self.get_mask_key:
305 frame.get_mask_key = self.get_mask_key
306 data = frame.format()
307 length = len(data)
308 if (isEnabledForTrace()):
309 trace("++Sent raw: " + repr(data))
310 trace("++Sent decoded: " + frame.__str__())
311 with self.lock:
312 while data:
313 l = self._send(data)
314 data = data[l:]
316 return length
318 def send_binary(self, payload: bytes) -> int:
319 """
320 Send a binary message (OPCODE_BINARY).
322 Parameters
323 ----------
324 payload: bytes
325 payload of message to send.
326 """
327 return self.send(payload, ABNF.OPCODE_BINARY)
329 def ping(self, payload: Union[str, bytes] = ""):
330 """
331 Send ping data.
333 Parameters
334 ----------
335 payload: str
336 data payload to send server.
337 """
338 if isinstance(payload, str):
339 payload = payload.encode("utf-8")
340 self.send(payload, ABNF.OPCODE_PING)
342 def pong(self, payload: Union[str, bytes] = ""):
343 """
344 Send pong data.
346 Parameters
347 ----------
348 payload: str
349 data payload to send server.
350 """
351 if isinstance(payload, str):
352 payload = payload.encode("utf-8")
353 self.send(payload, ABNF.OPCODE_PONG)
355 def recv(self) -> Union[str, bytes]:
356 """
357 Receive string data(byte array) from the server.
359 Returns
360 ----------
361 data: string (byte array) value.
362 """
363 with self.readlock:
364 opcode, data = self.recv_data()
365 if opcode == ABNF.OPCODE_TEXT:
366 return data.decode("utf-8")
367 elif opcode == ABNF.OPCODE_TEXT or opcode == ABNF.OPCODE_BINARY:
368 return data
369 else:
370 return ''
372 def recv_data(self, control_frame: bool = False) -> tuple:
373 """
374 Receive data with operation code.
376 Parameters
377 ----------
378 control_frame: bool
379 a boolean flag indicating whether to return control frame
380 data, defaults to False
382 Returns
383 -------
384 opcode, frame.data: tuple
385 tuple of operation code and string(byte array) value.
386 """
387 opcode, frame = self.recv_data_frame(control_frame)
388 return opcode, frame.data
390 def recv_data_frame(self, control_frame: bool = False):
391 """
392 Receive data with operation code.
394 If a valid ping message is received, a pong response is sent.
396 Parameters
397 ----------
398 control_frame: bool
399 a boolean flag indicating whether to return control frame
400 data, defaults to False
402 Returns
403 -------
404 frame.opcode, frame: tuple
405 tuple of operation code and string(byte array) value.
406 """
407 while True:
408 frame = self.recv_frame()
409 if (isEnabledForTrace()):
410 trace("++Rcv raw: " + repr(frame.format()))
411 trace("++Rcv decoded: " + frame.__str__())
412 if not frame:
413 # handle error:
414 # 'NoneType' object has no attribute 'opcode'
415 raise WebSocketProtocolException(
416 "Not a valid frame {frame}".format(frame=frame))
417 elif frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY, ABNF.OPCODE_CONT):
418 self.cont_frame.validate(frame)
419 self.cont_frame.add(frame)
421 if self.cont_frame.is_fire(frame):
422 return self.cont_frame.extract(frame)
424 elif frame.opcode == ABNF.OPCODE_CLOSE:
425 self.send_close()
426 return frame.opcode, frame
427 elif frame.opcode == ABNF.OPCODE_PING:
428 if len(frame.data) < 126:
429 self.pong(frame.data)
430 else:
431 raise WebSocketProtocolException(
432 "Ping message is too long")
433 if control_frame:
434 return frame.opcode, frame
435 elif frame.opcode == ABNF.OPCODE_PONG:
436 if control_frame:
437 return frame.opcode, frame
439 def recv_frame(self):
440 """
441 Receive data as frame from server.
443 Returns
444 -------
445 self.frame_buffer.recv_frame(): ABNF frame object
446 """
447 return self.frame_buffer.recv_frame()
449 def send_close(self, status: int = STATUS_NORMAL, reason: bytes = b""):
450 """
451 Send close data to the server.
453 Parameters
454 ----------
455 status: int
456 Status code to send. See STATUS_XXX.
457 reason: str or bytes
458 The reason to close. This must be string or UTF-8 bytes.
459 """
460 if status < 0 or status >= ABNF.LENGTH_16:
461 raise ValueError("code is invalid range")
462 self.connected = False
463 self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE)
465 def close(self, status: int = STATUS_NORMAL, reason: bytes = b"", timeout: float = 3):
466 """
467 Close Websocket object
469 Parameters
470 ----------
471 status: int
472 Status code to send. See VALID_CLOSE_STATUS in ABNF.
473 reason: bytes
474 The reason to close in UTF-8.
475 timeout: int or float
476 Timeout until receive a close frame.
477 If None, it will wait forever until receive a close frame.
478 """
479 if self.connected:
480 if status < 0 or status >= ABNF.LENGTH_16:
481 raise ValueError("code is invalid range")
483 try:
484 self.connected = False
485 self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE)
486 sock_timeout = self.sock.gettimeout()
487 self.sock.settimeout(timeout)
488 start_time = time.time()
489 while timeout is None or time.time() - start_time < timeout:
490 try:
491 frame = self.recv_frame()
492 if frame.opcode != ABNF.OPCODE_CLOSE:
493 continue
494 if isEnabledForError():
495 recv_status = struct.unpack("!H", frame.data[0:2])[0]
496 if recv_status >= 3000 and recv_status <= 4999:
497 debug("close status: " + repr(recv_status))
498 elif recv_status != STATUS_NORMAL:
499 error("close status: " + repr(recv_status))
500 break
501 except:
502 break
503 self.sock.settimeout(sock_timeout)
504 self.sock.shutdown(socket.SHUT_RDWR)
505 except:
506 pass
508 self.shutdown()
510 def abort(self):
511 """
512 Low-level asynchronous abort, wakes up other threads that are waiting in recv_*
513 """
514 if self.connected:
515 self.sock.shutdown(socket.SHUT_RDWR)
517 def shutdown(self):
518 """
519 close socket, immediately.
520 """
521 if self.sock:
522 self.sock.close()
523 self.sock = None
524 self.connected = False
526 def _send(self, data: Union[str, bytes]):
527 return send(self.sock, data)
529 def _recv(self, bufsize):
530 try:
531 return recv(self.sock, bufsize)
532 except WebSocketConnectionClosedException:
533 if self.sock:
534 self.sock.close()
535 self.sock = None
536 self.connected = False
537 raise
540def create_connection(url: str, timeout=None, class_=WebSocket, **options):
541 """
542 Connect to url and return websocket object.
544 Connect to url and return the WebSocket object.
545 Passing optional timeout parameter will set the timeout on the socket.
546 If no timeout is supplied,
547 the global default timeout setting returned by getdefaulttimeout() is used.
548 You can customize using 'options'.
549 If you set "header" list object, you can set your own custom header.
551 >>> conn = create_connection("ws://echo.websocket.events",
552 ... header=["User-Agent: MyProgram",
553 ... "x-custom: header"])
555 Parameters
556 ----------
557 class_: class
558 class to instantiate when creating the connection. It has to implement
559 settimeout and connect. It's __init__ should be compatible with
560 WebSocket.__init__, i.e. accept all of it's kwargs.
561 header: list or dict
562 custom http header list or dict.
563 cookie: str
564 Cookie value.
565 origin: str
566 custom origin url.
567 suppress_origin: bool
568 suppress outputting origin header.
569 host: str
570 custom host header string.
571 timeout: int or float
572 socket timeout time. This value could be either float/integer.
573 If set to None, it uses the default_timeout value.
574 http_proxy_host: str
575 HTTP proxy host name.
576 http_proxy_port: str or int
577 HTTP proxy port. If not set, set to 80.
578 http_no_proxy: list
579 Whitelisted host names that don't use the proxy.
580 http_proxy_auth: tuple
581 HTTP proxy auth information. tuple of username and password. Default is None.
582 http_proxy_timeout: int or float
583 HTTP proxy timeout, default is 60 sec as per python-socks.
584 enable_multithread: bool
585 Enable lock for multithread.
586 redirect_limit: int
587 Number of redirects to follow.
588 sockopt: tuple
589 Values for socket.setsockopt.
590 sockopt must be a tuple and each element is an argument of sock.setsockopt.
591 sslopt: dict
592 Optional dict object for ssl socket options. See FAQ for details.
593 subprotocols: list
594 List of available subprotocols. Default is None.
595 skip_utf8_validation: bool
596 Skip utf8 validation.
597 socket: socket
598 Pre-initialized stream socket.
599 """
600 sockopt = options.pop("sockopt", [])
601 sslopt = options.pop("sslopt", {})
602 fire_cont_frame = options.pop("fire_cont_frame", False)
603 enable_multithread = options.pop("enable_multithread", True)
604 skip_utf8_validation = options.pop("skip_utf8_validation", False)
605 websock = class_(sockopt=sockopt, sslopt=sslopt,
606 fire_cont_frame=fire_cont_frame,
607 enable_multithread=enable_multithread,
608 skip_utf8_validation=skip_utf8_validation, **options)
609 websock.settimeout(timeout if timeout is not None else getdefaulttimeout())
610 websock.connect(url, **options)
611 return websock