Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_core.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

230 statements  

1import socket 

2import struct 

3import threading 

4import time 

5from typing import Optional, Union 

6 

7# websocket modules 

8from ._abnf import ABNF, STATUS_NORMAL, continuous_frame, frame_buffer 

9from ._exceptions import ( 

10 WebSocketProtocolException, 

11 WebSocketConnectionClosedException, 

12 WebSocketTimeoutException, 

13) 

14from ._handshake import SUPPORTED_REDIRECT_STATUSES, handshake 

15from ._http import connect, proxy_info 

16from ._logging import debug, error, trace, isEnabledForError, isEnabledForTrace 

17from ._socket import getdefaulttimeout, recv, send, sock_opt 

18from ._ssl_compat import ssl 

19from ._utils import NoLock 

20from ._dispatcher import DispatcherBase, WrappedDispatcher 

21 

22""" 

23_core.py 

24websocket - WebSocket client library for Python 

25 

26Copyright 2025 engn33r 

27 

28Licensed under the Apache License, Version 2.0 (the "License"); 

29you may not use this file except in compliance with the License. 

30You may obtain a copy of the License at 

31 

32 http://www.apache.org/licenses/LICENSE-2.0 

33 

34Unless required by applicable law or agreed to in writing, software 

35distributed under the License is distributed on an "AS IS" BASIS, 

36WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

37See the License for the specific language governing permissions and 

38limitations under the License. 

39""" 

40 

41__all__ = ["WebSocket", "create_connection"] 

42 

43 

class WebSocket:
    """
    Low level WebSocket interface.

    This class is based on the WebSocket protocol `draft-hixie-thewebsocketprotocol-76 <http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76>`_

    We can connect to the websocket server and send/receive data.
    The following example is an echo client.

    >>> import websocket
    >>> ws = websocket.WebSocket()
    >>> ws.connect("ws://echo.websocket.events")
    >>> ws.recv()
    'echo.websocket.events sponsored by Lob.com'
    >>> ws.send("Hello, Server")
    19
    >>> ws.recv()
    'Hello, Server'
    >>> ws.close()

    Parameters
    ----------
    get_mask_key: func
        A callable function to get new mask keys, see the
        WebSocket.set_mask_key's docstring for more information.
    sockopt: tuple
        Values for socket.setsockopt.
        sockopt must be tuple and each element is argument of sock.setsockopt.
    sslopt: dict
        Optional dict object for ssl socket options. See FAQ for details.
    fire_cont_frame: bool
        Fire recv event for each cont frame. Default is False.
    enable_multithread: bool
        If True (the default), sending and receiving are each guarded by a
        ``threading.Lock``; otherwise no-op ``NoLock`` objects are used.
    skip_utf8_validation: bool
        Skip utf8 validation.
    dispatcher: DispatcherBase or WrappedDispatcher
        Optional dispatcher. When set, outgoing data is written through
        ``dispatcher.send(sock, data)`` instead of the plain socket send.
    """

    def __init__(
        self,
        get_mask_key=None,
        sockopt=None,
        sslopt=None,
        fire_cont_frame: bool = False,
        enable_multithread: bool = True,
        skip_utf8_validation: bool = False,
        dispatcher: Optional[Union[DispatcherBase, WrappedDispatcher]] = None,
        **_,
    ):
        """
        Initialize WebSocket object.

        Any additional keyword arguments are accepted and ignored.

        Parameters
        ----------
        sslopt: dict
            Optional dict object for ssl socket options. See FAQ for details.
        """
        self.sock_opt = sock_opt(sockopt, sslopt)
        self.handshake_response = None
        self.sock: Optional[socket.socket] = None

        self.connected = False
        self.get_mask_key = get_mask_key
        # These buffer over the build-up of a single frame.
        self.frame_buffer = frame_buffer(self._recv, skip_utf8_validation)
        self.cont_frame = continuous_frame(fire_cont_frame, skip_utf8_validation)
        self.dispatcher = dispatcher

        if enable_multithread:
            self.lock = threading.Lock()
            self.readlock = threading.Lock()
        else:
            self.lock = NoLock()  # type: ignore[assignment]
            self.readlock = NoLock()  # type: ignore[assignment]

    def __iter__(self):
        """
        Allow iteration over websocket, implying sequential `recv` executions.
        """
        while True:
            yield self.recv()

    def __next__(self):
        """Return the next received message (same as ``recv()``)."""
        return self.recv()

    def next(self):
        """Alias of ``__next__``."""
        return self.__next__()

    def fileno(self):
        """Return the file descriptor of the underlying socket."""
        return self.sock.fileno()

    def set_mask_key(self, func):
        """
        Set function to create mask key. You can customize mask key generator.
        Mainly, this is for testing purpose.

        Parameters
        ----------
        func: func
            callable object. the func takes 1 argument as integer.
            The argument means length of mask key.
            This func must return string(byte array),
            which length is argument specified.
        """
        self.get_mask_key = func

    def gettimeout(self) -> Optional[Union[float, int]]:
        """
        Get the websocket timeout (in seconds) as an int or float

        Returns
        ----------
        timeout: int or float
            returns timeout value (in seconds). This value could be either float/integer.
        """
        return self.sock_opt.timeout

    def settimeout(self, timeout: Optional[Union[float, int]]):
        """
        Set the timeout to the websocket.

        The value is stored in the socket options and, if a socket is
        currently open, applied to it immediately.

        Parameters
        ----------
        timeout: int or float
            timeout time (in seconds). This value could be either float/integer.
        """
        self.sock_opt.timeout = timeout
        if self.sock:
            self.sock.settimeout(timeout)

    timeout = property(gettimeout, settimeout)

    def getsubprotocol(self):
        """
        Get subprotocol (None if no handshake response is stored)
        """
        if self.handshake_response:
            return self.handshake_response.subprotocol
        return None

    subprotocol = property(getsubprotocol)

    def getstatus(self):
        """
        Get handshake status (None if no handshake response is stored)
        """
        if self.handshake_response:
            return self.handshake_response.status
        return None

    status = property(getstatus)

    def getheaders(self):
        """
        Get handshake response header (None if no handshake response is stored)
        """
        if self.handshake_response:
            return self.handshake_response.headers
        return None

    headers = property(getheaders)

    def is_ssl(self):
        """Return True if the current socket is an ``ssl.SSLSocket``."""
        try:
            return isinstance(self.sock, ssl.SSLSocket)
        except (AttributeError, NameError):
            # The ssl compatibility shim may not expose SSLSocket.
            return False

    def connect(self, url, **options):
        """
        Connect to url. url is websocket url scheme.
        ie. ws://host:port/resource
        You can customize using 'options'.
        If you set "header" list object, you can set your own custom header.

        >>> ws = WebSocket()
        >>> ws.connect("ws://echo.websocket.events",
        ...     header=["User-Agent: MyProgram",
        ...             "x-custom: header"])

        Parameters
        ----------
        header: list or dict
            Custom http header list or dict.
        cookie: str
            Cookie value.
        origin: str
            Custom origin url.
        connection: str
            Custom connection header value.
            Default value "Upgrade" set in _handshake.py
        suppress_origin: bool
            Suppress outputting origin header.
        host: str
            Custom host header string.
        timeout: int or float
            Socket timeout time. This value is an integer or float.
            If you set None for this value, it means "use default_timeout value"
        http_proxy_host: str
            HTTP proxy host name.
        http_proxy_port: str or int
            HTTP proxy port. Default is 80.
        http_no_proxy: list
            Whitelisted host names that don't use the proxy.
        http_proxy_auth: tuple
            HTTP proxy auth information. Tuple of username and password. Default is None.
        http_proxy_timeout: int or float
            HTTP proxy timeout, default is 60 sec as per python-socks.
        redirect_limit: int
            Number of redirects to follow.
        subprotocols: list
            List of available subprotocols. Default is None.
        socket: socket
            Pre-initialized stream socket.
        """
        self.sock_opt.timeout = options.get("timeout", self.sock_opt.timeout)
        self.sock, addrs = connect(
            url, self.sock_opt, proxy_info(**options), options.pop("socket", None)
        )

        try:
            self.handshake_response = handshake(self.sock, url, *addrs, **options)
            for _ in range(options.pop("redirect_limit", 3)):
                if self.handshake_response.status not in SUPPORTED_REDIRECT_STATUSES:
                    # Final (non-redirect) response reached; nothing left to follow.
                    break
                url = self.handshake_response.headers["location"]
                self.sock.close()
                self.sock, addrs = connect(
                    url,
                    self.sock_opt,
                    proxy_info(**options),
                    options.pop("socket", None),
                )
                self.handshake_response = handshake(
                    self.sock, url, *addrs, **options
                )
            self.connected = True
        except BaseException:
            # Never leave a half-open socket behind, whatever interrupted the
            # handshake; the original exception is always re-raised.
            if self.sock:
                self.sock.close()
                self.sock = None
            raise

    def send(self, payload: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> int:
        """
        Send the data as string.

        Parameters
        ----------
        payload: str
            Payload must be utf-8 string or unicode,
            If the opcode is OPCODE_TEXT.
            Otherwise, it must be string(byte array).
        opcode: int
            Operation code (opcode) to send.

        Returns
        -------
        length: int
            Length of the formatted frame, as returned by send_frame.
        """
        frame = ABNF.create_frame(payload, opcode)
        return self.send_frame(frame)

    def send_text(self, text_data: str) -> int:
        """
        Sends UTF-8 encoded text.
        """
        return self.send(text_data, ABNF.OPCODE_TEXT)

    def send_bytes(self, data: Union[bytes, bytearray]) -> int:
        """
        Sends a sequence of bytes.
        """
        return self.send(data, ABNF.OPCODE_BINARY)

    def send_frame(self, frame) -> int:
        """
        Send the data frame.

        >>> ws = create_connection("ws://echo.websocket.events")
        >>> frame = ABNF.create_frame("Hello", ABNF.OPCODE_TEXT)
        >>> ws.send_frame(frame)
        >>> cont_frame = ABNF.create_frame("My name is ", ABNF.OPCODE_CONT, 0)
        >>> ws.send_frame(cont_frame)
        >>> cont_frame = ABNF.create_frame("Foo Bar", ABNF.OPCODE_CONT, 1)
        >>> ws.send_frame(cont_frame)

        Parameters
        ----------
        frame: ABNF frame
            frame data created by ABNF.create_frame

        Returns
        -------
        length: int
            Length of the formatted frame data.
        """
        if self.get_mask_key:
            frame.get_mask_key = self.get_mask_key
        data = frame.format()
        length = len(data)
        if isEnabledForTrace():
            trace(f"++Sent raw: {repr(data)}")
            trace(f"++Sent decoded: {frame.__str__()}")
        with self.lock:
            # _send may write only part of the buffer; loop until it is drained.
            while data:
                bytes_sent = self._send(data)
                data = data[bytes_sent:]

        return length

    def send_binary(self, payload: bytes) -> int:
        """
        Send a binary message (OPCODE_BINARY).

        Parameters
        ----------
        payload: bytes
            payload of message to send.
        """
        return self.send(payload, ABNF.OPCODE_BINARY)

    def ping(self, payload: Union[str, bytes] = ""):
        """
        Send ping data.

        Parameters
        ----------
        payload: str
            data payload to send server. A str is encoded as UTF-8 first.
        """
        if isinstance(payload, str):
            payload = payload.encode("utf-8")
        self.send(payload, ABNF.OPCODE_PING)

    def pong(self, payload: Union[str, bytes] = ""):
        """
        Send pong data.

        Parameters
        ----------
        payload: str
            data payload to send server. A str is encoded as UTF-8 first.
        """
        if isinstance(payload, str):
            payload = payload.encode("utf-8")
        self.send(payload, ABNF.OPCODE_PONG)

    def recv(self) -> Union[str, bytes]:
        """
        Receive string data(byte array) from the server.

        Returns
        ----------
        data: str for text messages (bytes are decoded as UTF-8), bytes for
            binary messages, and "" for any other opcode.
        """
        with self.readlock:
            opcode, data = self.recv_data()
        if opcode == ABNF.OPCODE_TEXT:
            data_received: Union[bytes, str] = data
            if isinstance(data_received, bytes):
                return data_received.decode("utf-8")
            elif isinstance(data_received, str):
                return data_received
            # Any other payload type falls through and yields None, as before.
        elif opcode == ABNF.OPCODE_BINARY:
            data_binary: bytes = data
            return data_binary
        else:
            return ""

    def recv_data(self, control_frame: bool = False) -> tuple:
        """
        Receive data with operation code.

        Parameters
        ----------
        control_frame: bool
            a boolean flag indicating whether to return control frame
            data, defaults to False

        Returns
        -------
        opcode, frame.data: tuple
            tuple of operation code and string(byte array) value.
        """
        opcode, frame = self.recv_data_frame(control_frame)
        return opcode, frame.data

    def recv_data_frame(self, control_frame: bool = False) -> tuple:
        """
        Receive data with operation code.

        If a valid ping message is received, a pong response is sent.
        If a close frame is received, a close frame is sent back and the
        received frame is returned.

        Parameters
        ----------
        control_frame: bool
            a boolean flag indicating whether to return control frame
            data, defaults to False

        Returns
        -------
        frame.opcode, frame: tuple
            tuple of operation code and string(byte array) value.

        Raises
        ------
        WebSocketProtocolException
            If no valid frame was received, or a ping payload is 126 bytes
            or longer.
        """
        while True:
            frame = self.recv_frame()
            if not frame:
                # Checked before tracing: the trace calls below dereference
                # the frame and would fail on None with an AttributeError.
                raise WebSocketProtocolException(f"Not a valid frame {frame}")
            if isEnabledForTrace():
                trace(f"++Rcv raw: {repr(frame.format())}")
                trace(f"++Rcv decoded: {frame.__str__()}")

            if frame.opcode in (
                ABNF.OPCODE_TEXT,
                ABNF.OPCODE_BINARY,
                ABNF.OPCODE_CONT,
            ):
                self.cont_frame.validate(frame)
                self.cont_frame.add(frame)

                if self.cont_frame.is_fire(frame):
                    return self.cont_frame.extract(frame)

            elif frame.opcode == ABNF.OPCODE_CLOSE:
                self.send_close()
                return frame.opcode, frame
            elif frame.opcode == ABNF.OPCODE_PING:
                if len(frame.data) < 126:
                    self.pong(frame.data)
                else:
                    raise WebSocketProtocolException("Ping message is too long")
                if control_frame:
                    return frame.opcode, frame
            elif frame.opcode == ABNF.OPCODE_PONG:
                if control_frame:
                    return frame.opcode, frame

    def recv_frame(self):
        """
        Receive data as frame from server.

        Returns
        -------
        self.frame_buffer.recv_frame(): ABNF frame object
        """
        return self.frame_buffer.recv_frame()

    def send_close(
        self, status: int = STATUS_NORMAL, reason: Union[str, bytes] = b""
    ):
        """
        Send close data to the server.

        Parameters
        ----------
        status: int
            Status code to send. See STATUS_XXX.
        reason: str or bytes
            The reason to close. This must be string or UTF-8 bytes.

        Raises
        ------
        ValueError
            If status is negative or not smaller than ABNF.LENGTH_16.
        """
        if status < 0 or status >= ABNF.LENGTH_16:
            raise ValueError("code is invalid range")
        if isinstance(reason, str):
            # The packed status is bytes, so a text reason must be encoded
            # before it can be appended.
            reason = reason.encode("utf-8")
        self.connected = False
        self.send(struct.pack("!H", status) + reason, ABNF.OPCODE_CLOSE)

    def close(
        self,
        status: int = STATUS_NORMAL,
        reason: Union[str, bytes] = b"",
        timeout: Optional[Union[float, int]] = 3,
    ):
        """
        Close Websocket object

        Sends a close frame, waits up to ``timeout`` seconds for the peer's
        close frame, then shuts the socket down. Does nothing if the
        websocket is not connected.

        Parameters
        ----------
        status: int
            Status code to send. See VALID_CLOSE_STATUS in ABNF.
        reason: str or bytes
            The reason to close, as a string or UTF-8 bytes.
        timeout: int or float
            Timeout until receive a close frame.
            If None, it will wait forever until receive a close frame.

        Raises
        ------
        ValueError
            If status is negative or not smaller than ABNF.LENGTH_16.
        """
        if not self.connected:
            return
        if status < 0 or status >= ABNF.LENGTH_16:
            raise ValueError("code is invalid range")
        if isinstance(reason, str):
            reason = reason.encode("utf-8")

        try:
            self.connected = False
            self.send(struct.pack("!H", status) + reason, ABNF.OPCODE_CLOSE)
            if self.sock is None:
                return
            sock_timeout = self.sock.gettimeout()
            self.sock.settimeout(timeout)
            # Monotonic clock: the wait must not be affected by wall-clock jumps.
            start_time = time.monotonic()
            while timeout is None or time.monotonic() - start_time < timeout:
                try:
                    frame = self.recv_frame()
                    if frame.opcode != ABNF.OPCODE_CLOSE:
                        continue
                    if isEnabledForError():
                        recv_status = struct.unpack("!H", frame.data[0:2])[0]
                        if 3000 <= recv_status <= 4999:
                            debug(f"close status: {repr(recv_status)}")
                        elif recv_status != STATUS_NORMAL:
                            error(f"close status: {repr(recv_status)}")
                    break
                except (
                    WebSocketConnectionClosedException,
                    WebSocketTimeoutException,
                    struct.error,
                ):
                    break
            if self.sock is not None:
                self.sock.settimeout(sock_timeout)
                self.sock.shutdown(socket.SHUT_RDWR)
        except Exception:
            # Closing is best-effort: errors during the close handshake are
            # ignored on purpose, the socket is torn down below regardless.
            pass
        finally:
            self.shutdown()

    def abort(self):
        """
        Low-level asynchronous abort, wakes up other threads that are waiting in recv_*
        """
        sock = self.sock
        if self.connected and sock is not None:
            sock.shutdown(socket.SHUT_RDWR)

    def shutdown(self):
        """
        close socket, immediately.
        """
        if self.sock:
            self.sock.close()
            self.sock = None
            self.connected = False

    def _send(self, data: Union[str, bytes]):
        """
        Write data to the socket, via the dispatcher when one is configured.

        Raises WebSocketConnectionClosedException if there is no socket.
        """
        if self.sock is None:
            raise WebSocketConnectionClosedException("socket is already closed.")
        if self.dispatcher:
            return self.dispatcher.send(self.sock, data)
        return send(self.sock, data)

    def _recv(self, bufsize):
        """
        Read up to bufsize bytes from the socket.

        If the connection turns out to be closed, the socket is released and
        the object marked as disconnected before the exception is re-raised.
        """
        try:
            return recv(self.sock, bufsize)
        except WebSocketConnectionClosedException:
            if self.sock:
                self.sock.close()
            self.sock = None
            self.connected = False
            raise

588 

589 

def create_connection(url: str, timeout=None, class_=WebSocket, **options):
    """
    Connect to url and return the WebSocket object.

    An instance of ``class_`` is created, its timeout is set, and it is
    connected to ``url``. When no timeout is supplied, the global default
    returned by getdefaulttimeout() is used.
    You can customize using 'options'.
    If you set "header" list object, you can set your own custom header.

    >>> conn = create_connection("ws://echo.websocket.events",
    ...     header=["User-Agent: MyProgram",
    ...             "x-custom: header"])

    Parameters
    ----------
    class_: class
        class to instantiate when creating the connection. It has to implement
        settimeout and connect. Its __init__ should be compatible with
        WebSocket.__init__, i.e. accept all of its kwargs.
    header: list or dict
        custom http header list or dict.
    cookie: str
        Cookie value.
    origin: str
        custom origin url.
    suppress_origin: bool
        suppress outputting origin header.
    host: str
        custom host header string.
    timeout: int or float
        socket timeout time. This value could be either float/integer.
        If set to None, it uses the default_timeout value.
    http_proxy_host: str
        HTTP proxy host name.
    http_proxy_port: str or int
        HTTP proxy port. If not set, set to 80.
    http_no_proxy: list
        Whitelisted host names that don't use the proxy.
    http_proxy_auth: tuple
        HTTP proxy auth information. tuple of username and password. Default is None.
    http_proxy_timeout: int or float
        HTTP proxy timeout, default is 60 sec as per python-socks.
    enable_multithread: bool
        Enable lock for multithread.
    redirect_limit: int
        Number of redirects to follow.
    sockopt: tuple
        Values for socket.setsockopt.
        sockopt must be a tuple and each element is an argument of sock.setsockopt.
    sslopt: dict
        Optional dict object for ssl socket options. See FAQ for details.
    subprotocols: list
        List of available subprotocols. Default is None.
    skip_utf8_validation: bool
        Skip utf8 validation.
    socket: socket
        Pre-initialized stream socket.

    Returns
    -------
    websock: instance of ``class_``, after ``connect(url, **options)`` was called.
    """
    # Constructor-only settings are popped so they are not forwarded to
    # connect(); whatever remains in ``options`` goes to both calls.
    init_kwargs = {
        "sockopt": options.pop("sockopt", []),
        "sslopt": options.pop("sslopt", {}),
        "fire_cont_frame": options.pop("fire_cont_frame", False),
        "enable_multithread": options.pop("enable_multithread", True),
        "skip_utf8_validation": options.pop("skip_utf8_validation", False),
    }
    websock = class_(**init_kwargs, **options)

    effective_timeout = getdefaulttimeout() if timeout is None else timeout
    websock.settimeout(effective_timeout)

    websock.connect(url, **options)
    return websock