Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/websocket/_core.py: 23%
207 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
1import socket
2import struct
3import threading
4import time
6# websocket modules
7from ._abnf import *
8from ._exceptions import *
9from ._handshake import *
10from ._http import *
11from ._logging import *
12from ._socket import *
13from ._ssl_compat import *
14from ._utils import *
16"""
17_core.py
18websocket - WebSocket client library for Python
20Copyright 2022 engn33r
22Licensed under the Apache License, Version 2.0 (the "License");
23you may not use this file except in compliance with the License.
24You may obtain a copy of the License at
26 http://www.apache.org/licenses/LICENSE-2.0
28Unless required by applicable law or agreed to in writing, software
29distributed under the License is distributed on an "AS IS" BASIS,
30WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
31See the License for the specific language governing permissions and
32limitations under the License.
33"""
35__all__ = ['WebSocket', 'create_connection']
38class WebSocket:
39 """
40 Low level WebSocket interface.
42 This class is based on the WebSocket protocol `draft-hixie-thewebsocketprotocol-76 <http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76>`_
44 We can connect to the websocket server and send/receive data.
45 The following example is an echo client.
47 >>> import websocket
48 >>> ws = websocket.WebSocket()
49 >>> ws.connect("ws://echo.websocket.events")
50 >>> ws.recv()
51 'echo.websocket.events sponsored by Lob.com'
52 >>> ws.send("Hello, Server")
53 19
54 >>> ws.recv()
55 'Hello, Server'
56 >>> ws.close()
58 Parameters
59 ----------
60 get_mask_key: func
61 A callable function to get new mask keys, see the
62 WebSocket.set_mask_key's docstring for more information.
63 sockopt: tuple
64 Values for socket.setsockopt.
65 sockopt must be tuple and each element is argument of sock.setsockopt.
66 sslopt: dict
67 Optional dict object for ssl socket options. See FAQ for details.
68 fire_cont_frame: bool
69 Fire recv event for each cont frame. Default is False.
70 enable_multithread: bool
71 If set to True, lock send method.
72 skip_utf8_validation: bool
73 Skip utf8 validation.
74 """
76 def __init__(self, get_mask_key=None, sockopt=None, sslopt=None,
77 fire_cont_frame=False, enable_multithread=True,
78 skip_utf8_validation=False, **_):
79 """
80 Initialize WebSocket object.
82 Parameters
83 ----------
84 sslopt: dict
85 Optional dict object for ssl socket options. See FAQ for details.
86 """
87 self.sock_opt = sock_opt(sockopt, sslopt)
88 self.handshake_response = None
89 self.sock = None
91 self.connected = False
92 self.get_mask_key = get_mask_key
93 # These buffer over the build-up of a single frame.
94 self.frame_buffer = frame_buffer(self._recv, skip_utf8_validation)
95 self.cont_frame = continuous_frame(
96 fire_cont_frame, skip_utf8_validation)
98 if enable_multithread:
99 self.lock = threading.Lock()
100 self.readlock = threading.Lock()
101 else:
102 self.lock = NoLock()
103 self.readlock = NoLock()
105 def __iter__(self):
106 """
107 Allow iteration over websocket, implying sequential `recv` executions.
108 """
109 while True:
110 yield self.recv()
112 def __next__(self):
113 return self.recv()
115 def next(self):
116 return self.__next__()
118 def fileno(self):
119 return self.sock.fileno()
121 def set_mask_key(self, func):
122 """
123 Set function to create mask key. You can customize mask key generator.
124 Mainly, this is for testing purpose.
126 Parameters
127 ----------
128 func: func
129 callable object. the func takes 1 argument as integer.
130 The argument means length of mask key.
131 This func must return string(byte array),
132 which length is argument specified.
133 """
134 self.get_mask_key = func
136 def gettimeout(self):
137 """
138 Get the websocket timeout (in seconds) as an int or float
140 Returns
141 ----------
142 timeout: int or float
143 returns timeout value (in seconds). This value could be either float/integer.
144 """
145 return self.sock_opt.timeout
147 def settimeout(self, timeout):
148 """
149 Set the timeout to the websocket.
151 Parameters
152 ----------
153 timeout: int or float
154 timeout time (in seconds). This value could be either float/integer.
155 """
156 self.sock_opt.timeout = timeout
157 if self.sock:
158 self.sock.settimeout(timeout)
160 timeout = property(gettimeout, settimeout)
162 def getsubprotocol(self):
163 """
164 Get subprotocol
165 """
166 if self.handshake_response:
167 return self.handshake_response.subprotocol
168 else:
169 return None
171 subprotocol = property(getsubprotocol)
173 def getstatus(self):
174 """
175 Get handshake status
176 """
177 if self.handshake_response:
178 return self.handshake_response.status
179 else:
180 return None
182 status = property(getstatus)
184 def getheaders(self):
185 """
186 Get handshake response header
187 """
188 if self.handshake_response:
189 return self.handshake_response.headers
190 else:
191 return None
193 def is_ssl(self):
194 try:
195 return isinstance(self.sock, ssl.SSLSocket)
196 except:
197 return False
199 headers = property(getheaders)
201 def connect(self, url, **options):
202 """
203 Connect to url. url is websocket url scheme.
204 ie. ws://host:port/resource
205 You can customize using 'options'.
206 If you set "header" list object, you can set your own custom header.
208 >>> ws = WebSocket()
209 >>> ws.connect("ws://echo.websocket.events",
210 ... header=["User-Agent: MyProgram",
211 ... "x-custom: header"])
213 Parameters
214 ----------
215 header: list or dict
216 Custom http header list or dict.
217 cookie: str
218 Cookie value.
219 origin: str
220 Custom origin url.
221 connection: str
222 Custom connection header value.
223 Default value "Upgrade" set in _handshake.py
224 suppress_origin: bool
225 Suppress outputting origin header.
226 host: str
227 Custom host header string.
228 timeout: int or float
229 Socket timeout time. This value is an integer or float.
230 If you set None for this value, it means "use default_timeout value"
231 http_proxy_host: str
232 HTTP proxy host name.
233 http_proxy_port: str or int
234 HTTP proxy port. Default is 80.
235 http_no_proxy: list
236 Whitelisted host names that don't use the proxy.
237 http_proxy_auth: tuple
238 HTTP proxy auth information. Tuple of username and password. Default is None.
239 http_proxy_timeout: int or float
240 HTTP proxy timeout, default is 60 sec as per python-socks.
241 redirect_limit: int
242 Number of redirects to follow.
243 subprotocols: list
244 List of available subprotocols. Default is None.
245 socket: socket
246 Pre-initialized stream socket.
247 """
248 self.sock_opt.timeout = options.get('timeout', self.sock_opt.timeout)
249 self.sock, addrs = connect(url, self.sock_opt, proxy_info(**options),
250 options.pop('socket', None))
252 try:
253 self.handshake_response = handshake(self.sock, url, *addrs, **options)
254 for attempt in range(options.pop('redirect_limit', 3)):
255 if self.handshake_response.status in SUPPORTED_REDIRECT_STATUSES:
256 url = self.handshake_response.headers['location']
257 self.sock.close()
258 self.sock, addrs = connect(url, self.sock_opt, proxy_info(**options),
259 options.pop('socket', None))
260 self.handshake_response = handshake(self.sock, url, *addrs, **options)
261 self.connected = True
262 except:
263 if self.sock:
264 self.sock.close()
265 self.sock = None
266 raise
268 def send(self, payload, opcode=ABNF.OPCODE_TEXT):
269 """
270 Send the data as string.
272 Parameters
273 ----------
274 payload: str
275 Payload must be utf-8 string or unicode,
276 If the opcode is OPCODE_TEXT.
277 Otherwise, it must be string(byte array).
278 opcode: int
279 Operation code (opcode) to send.
280 """
282 frame = ABNF.create_frame(payload, opcode)
283 return self.send_frame(frame)
285 def send_frame(self, frame):
286 """
287 Send the data frame.
289 >>> ws = create_connection("ws://echo.websocket.events")
290 >>> frame = ABNF.create_frame("Hello", ABNF.OPCODE_TEXT)
291 >>> ws.send_frame(frame)
292 >>> cont_frame = ABNF.create_frame("My name is ", ABNF.OPCODE_CONT, 0)
293 >>> ws.send_frame(frame)
294 >>> cont_frame = ABNF.create_frame("Foo Bar", ABNF.OPCODE_CONT, 1)
295 >>> ws.send_frame(frame)
297 Parameters
298 ----------
299 frame: ABNF frame
300 frame data created by ABNF.create_frame
301 """
302 if self.get_mask_key:
303 frame.get_mask_key = self.get_mask_key
304 data = frame.format()
305 length = len(data)
306 if (isEnabledForTrace()):
307 trace("++Sent raw: " + repr(data))
308 trace("++Sent decoded: " + frame.__str__())
309 with self.lock:
310 while data:
311 l = self._send(data)
312 data = data[l:]
314 return length
316 def send_binary(self, payload):
317 """
318 Send a binary message (OPCODE_BINARY).
320 Parameters
321 ----------
322 payload: bytes
323 payload of message to send.
324 """
325 return self.send(payload, ABNF.OPCODE_BINARY)
327 def ping(self, payload=""):
328 """
329 Send ping data.
331 Parameters
332 ----------
333 payload: str
334 data payload to send server.
335 """
336 if isinstance(payload, str):
337 payload = payload.encode("utf-8")
338 self.send(payload, ABNF.OPCODE_PING)
340 def pong(self, payload=""):
341 """
342 Send pong data.
344 Parameters
345 ----------
346 payload: str
347 data payload to send server.
348 """
349 if isinstance(payload, str):
350 payload = payload.encode("utf-8")
351 self.send(payload, ABNF.OPCODE_PONG)
353 def recv(self):
354 """
355 Receive string data(byte array) from the server.
357 Returns
358 ----------
359 data: string (byte array) value.
360 """
361 with self.readlock:
362 opcode, data = self.recv_data()
363 if opcode == ABNF.OPCODE_TEXT:
364 return data.decode("utf-8")
365 elif opcode == ABNF.OPCODE_TEXT or opcode == ABNF.OPCODE_BINARY:
366 return data
367 else:
368 return ''
370 def recv_data(self, control_frame=False):
371 """
372 Receive data with operation code.
374 Parameters
375 ----------
376 control_frame: bool
377 a boolean flag indicating whether to return control frame
378 data, defaults to False
380 Returns
381 -------
382 opcode, frame.data: tuple
383 tuple of operation code and string(byte array) value.
384 """
385 opcode, frame = self.recv_data_frame(control_frame)
386 return opcode, frame.data
388 def recv_data_frame(self, control_frame=False):
389 """
390 Receive data with operation code.
392 If a valid ping message is received, a pong response is sent.
394 Parameters
395 ----------
396 control_frame: bool
397 a boolean flag indicating whether to return control frame
398 data, defaults to False
400 Returns
401 -------
402 frame.opcode, frame: tuple
403 tuple of operation code and string(byte array) value.
404 """
405 while True:
406 frame = self.recv_frame()
407 if (isEnabledForTrace()):
408 trace("++Rcv raw: " + repr(frame.format()))
409 trace("++Rcv decoded: " + frame.__str__())
410 if not frame:
411 # handle error:
412 # 'NoneType' object has no attribute 'opcode'
413 raise WebSocketProtocolException(
414 "Not a valid frame {frame}".format(frame=frame))
415 elif frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY, ABNF.OPCODE_CONT):
416 self.cont_frame.validate(frame)
417 self.cont_frame.add(frame)
419 if self.cont_frame.is_fire(frame):
420 return self.cont_frame.extract(frame)
422 elif frame.opcode == ABNF.OPCODE_CLOSE:
423 self.send_close()
424 return frame.opcode, frame
425 elif frame.opcode == ABNF.OPCODE_PING:
426 if len(frame.data) < 126:
427 self.pong(frame.data)
428 else:
429 raise WebSocketProtocolException(
430 "Ping message is too long")
431 if control_frame:
432 return frame.opcode, frame
433 elif frame.opcode == ABNF.OPCODE_PONG:
434 if control_frame:
435 return frame.opcode, frame
437 def recv_frame(self):
438 """
439 Receive data as frame from server.
441 Returns
442 -------
443 self.frame_buffer.recv_frame(): ABNF frame object
444 """
445 return self.frame_buffer.recv_frame()
447 def send_close(self, status=STATUS_NORMAL, reason=b""):
448 """
449 Send close data to the server.
451 Parameters
452 ----------
453 status: int
454 Status code to send. See STATUS_XXX.
455 reason: str or bytes
456 The reason to close. This must be string or UTF-8 bytes.
457 """
458 if status < 0 or status >= ABNF.LENGTH_16:
459 raise ValueError("code is invalid range")
460 self.connected = False
461 self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE)
463 def close(self, status=STATUS_NORMAL, reason=b"", timeout=3):
464 """
465 Close Websocket object
467 Parameters
468 ----------
469 status: int
470 Status code to send. See STATUS_XXX.
471 reason: bytes
472 The reason to close in UTF-8.
473 timeout: int or float
474 Timeout until receive a close frame.
475 If None, it will wait forever until receive a close frame.
476 """
477 if self.connected:
478 if status < 0 or status >= ABNF.LENGTH_16:
479 raise ValueError("code is invalid range")
481 try:
482 self.connected = False
483 self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE)
484 sock_timeout = self.sock.gettimeout()
485 self.sock.settimeout(timeout)
486 start_time = time.time()
487 while timeout is None or time.time() - start_time < timeout:
488 try:
489 frame = self.recv_frame()
490 if frame.opcode != ABNF.OPCODE_CLOSE:
491 continue
492 if isEnabledForError():
493 recv_status = struct.unpack("!H", frame.data[0:2])[0]
494 if recv_status >= 3000 and recv_status <= 4999:
495 debug("close status: " + repr(recv_status))
496 elif recv_status != STATUS_NORMAL:
497 error("close status: " + repr(recv_status))
498 break
499 except:
500 break
501 self.sock.settimeout(sock_timeout)
502 self.sock.shutdown(socket.SHUT_RDWR)
503 except:
504 pass
506 self.shutdown()
508 def abort(self):
509 """
510 Low-level asynchronous abort, wakes up other threads that are waiting in recv_*
511 """
512 if self.connected:
513 self.sock.shutdown(socket.SHUT_RDWR)
515 def shutdown(self):
516 """
517 close socket, immediately.
518 """
519 if self.sock:
520 self.sock.close()
521 self.sock = None
522 self.connected = False
524 def _send(self, data):
525 return send(self.sock, data)
527 def _recv(self, bufsize):
528 try:
529 return recv(self.sock, bufsize)
530 except WebSocketConnectionClosedException:
531 if self.sock:
532 self.sock.close()
533 self.sock = None
534 self.connected = False
535 raise
538def create_connection(url, timeout=None, class_=WebSocket, **options):
539 """
540 Connect to url and return websocket object.
542 Connect to url and return the WebSocket object.
543 Passing optional timeout parameter will set the timeout on the socket.
544 If no timeout is supplied,
545 the global default timeout setting returned by getdefaulttimeout() is used.
546 You can customize using 'options'.
547 If you set "header" list object, you can set your own custom header.
549 >>> conn = create_connection("ws://echo.websocket.events",
550 ... header=["User-Agent: MyProgram",
551 ... "x-custom: header"])
553 Parameters
554 ----------
555 class_: class
556 class to instantiate when creating the connection. It has to implement
557 settimeout and connect. It's __init__ should be compatible with
558 WebSocket.__init__, i.e. accept all of it's kwargs.
559 header: list or dict
560 custom http header list or dict.
561 cookie: str
562 Cookie value.
563 origin: str
564 custom origin url.
565 suppress_origin: bool
566 suppress outputting origin header.
567 host: str
568 custom host header string.
569 timeout: int or float
570 socket timeout time. This value could be either float/integer.
571 If set to None, it uses the default_timeout value.
572 http_proxy_host: str
573 HTTP proxy host name.
574 http_proxy_port: str or int
575 HTTP proxy port. If not set, set to 80.
576 http_no_proxy: list
577 Whitelisted host names that don't use the proxy.
578 http_proxy_auth: tuple
579 HTTP proxy auth information. tuple of username and password. Default is None.
580 http_proxy_timeout: int or float
581 HTTP proxy timeout, default is 60 sec as per python-socks.
582 enable_multithread: bool
583 Enable lock for multithread.
584 redirect_limit: int
585 Number of redirects to follow.
586 sockopt: tuple
587 Values for socket.setsockopt.
588 sockopt must be a tuple and each element is an argument of sock.setsockopt.
589 sslopt: dict
590 Optional dict object for ssl socket options. See FAQ for details.
591 subprotocols: list
592 List of available subprotocols. Default is None.
593 skip_utf8_validation: bool
594 Skip utf8 validation.
595 socket: socket
596 Pre-initialized stream socket.
597 """
598 sockopt = options.pop("sockopt", [])
599 sslopt = options.pop("sslopt", {})
600 fire_cont_frame = options.pop("fire_cont_frame", False)
601 enable_multithread = options.pop("enable_multithread", True)
602 skip_utf8_validation = options.pop("skip_utf8_validation", False)
603 websock = class_(sockopt=sockopt, sslopt=sslopt,
604 fire_cont_frame=fire_cont_frame,
605 enable_multithread=enable_multithread,
606 skip_utf8_validation=skip_utf8_validation, **options)
607 websock.settimeout(timeout if timeout is not None else getdefaulttimeout())
608 websock.connect(url, **options)
609 return websock