Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_core.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

249 statements  

1import socket 

2import struct 

3import threading 

4import time 

5from typing import Any, Callable, Optional, Type, Union 

6 

7# websocket modules 

8from ._abnf import ABNF, STATUS_NORMAL, continuous_frame, frame_buffer 

9from ._exceptions import ( 

10 WebSocketProtocolException, 

11 WebSocketConnectionClosedException, 

12 WebSocketTimeoutException, 

13 WebSocketException, 

14) 

15from ._handshake import SUPPORTED_REDIRECT_STATUSES, handshake, handshake_response 

16from ._http import connect, proxy_info 

17from ._logging import debug, error, trace, isEnabledForError, isEnabledForTrace 

18from ._socket import getdefaulttimeout, recv, send, sock_opt 

19from ._ssl_compat import ssl 

20from ._utils import NoLock 

21from ._dispatcher import DispatcherBase, WrappedDispatcher 

22 

23""" 

24_core.py 

25websocket - WebSocket client library for Python 

26 

27Copyright 2025 engn33r 

28 

29Licensed under the Apache License, Version 2.0 (the "License"); 

30you may not use this file except in compliance with the License. 

31You may obtain a copy of the License at 

32 

33 http://www.apache.org/licenses/LICENSE-2.0 

34 

35Unless required by applicable law or agreed to in writing, software 

36distributed under the License is distributed on an "AS IS" BASIS, 

37WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

38See the License for the specific language governing permissions and 

39limitations under the License. 

40""" 

41 

42__all__ = ["WebSocket", "create_connection"] 

43 

44 

45class WebSocket: 

46 """ 

47 Low level WebSocket interface. 

48 

49 This class is based on the WebSocket protocol `draft-hixie-thewebsocketprotocol-76 <http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76>`_ 

50 

51 We can connect to the websocket server and send/receive data. 

52 The following example is an echo client. 

53 

54 >>> import websocket 

55 >>> ws = websocket.WebSocket() 

56 >>> ws.connect("ws://echo.websocket.events") 

57 >>> ws.recv() 

58 'echo.websocket.events sponsored by Lob.com' 

59 >>> ws.send("Hello, Server") 

60 19 

61 >>> ws.recv() 

62 'Hello, Server' 

63 >>> ws.close() 

64 

65 Parameters 

66 ---------- 

67 get_mask_key: func 

68 A callable function to get new mask keys, see the 

69 WebSocket.set_mask_key's docstring for more information. 

70 sockopt: tuple 

71 Values for socket.setsockopt. 

72 sockopt must be tuple and each element is argument of sock.setsockopt. 

73 sslopt: dict 

74 Optional dict object for ssl socket options. See FAQ for details. 

75 fire_cont_frame: bool 

76 Fire recv event for each cont frame. Default is False. 

77 enable_multithread: bool 

78 If set to True, lock send method. 

79 skip_utf8_validation: bool 

80 Skip utf8 validation. 

81 """ 

82 

83 def __init__( 

84 self, 

85 get_mask_key: Optional[Callable] = None, 

86 sockopt: Optional[list] = None, 

87 sslopt: Optional[dict] = None, 

88 fire_cont_frame: bool = False, 

89 enable_multithread: bool = True, 

90 skip_utf8_validation: bool = False, 

91 dispatcher: Optional[Union[DispatcherBase, WrappedDispatcher]] = None, 

92 **_: Any, 

93 ) -> None: 

94 """ 

95 Initialize WebSocket object. 

96 

97 Parameters 

98 ---------- 

99 sslopt: dict 

100 Optional dict object for ssl socket options. See FAQ for details. 

101 """ 

102 self.sock_opt = sock_opt(sockopt, sslopt) 

103 self.handshake_response: Optional[handshake_response] = None 

104 self.sock: Optional[socket.socket] = None 

105 

106 self.connected = False 

107 self.close_frame: Optional[ABNF] = None 

108 self.get_mask_key = get_mask_key 

109 # These buffer over the build-up of a single frame. 

110 self.frame_buffer = frame_buffer(self._recv, skip_utf8_validation) 

111 self.cont_frame = continuous_frame(fire_cont_frame, skip_utf8_validation) 

112 self.dispatcher = dispatcher 

113 

114 if enable_multithread: 

115 self.lock = threading.Lock() 

116 self.readlock = threading.Lock() 

117 else: 

118 self.lock = NoLock() # type: ignore[assignment] 

119 self.readlock = NoLock() # type: ignore[assignment] 

120 

121 def __iter__(self): 

122 """ 

123 Allow iteration over websocket, implying sequential `recv` executions. 

124 """ 

125 while True: 

126 yield self.recv() 

127 

128 def __next__(self): 

129 return self.recv() 

130 

131 def next(self): 

132 return self.__next__() 

133 

134 def fileno(self): 

135 if self.sock is None: 

136 raise WebSocketException("Connection not established") 

137 return self.sock.fileno() 

138 

139 def set_mask_key(self, func): 

140 """ 

141 Set function to create mask key. You can customize mask key generator. 

142 Mainly, this is for testing purpose. 

143 

144 Parameters 

145 ---------- 

146 func: func 

147 callable object. the func takes 1 argument as integer. 

148 The argument means length of mask key. 

149 This func must return string(byte array), 

150 which length is argument specified. 

151 """ 

152 self.get_mask_key = func 

153 

154 def gettimeout(self) -> Optional[Union[float, int]]: 

155 """ 

156 Get the websocket timeout (in seconds) as an int or float 

157 

158 Returns 

159 ---------- 

160 timeout: int or float 

161 returns timeout value (in seconds). This value could be either float/integer. 

162 """ 

163 return self.sock_opt.timeout 

164 

165 def settimeout(self, timeout: Optional[Union[float, int]]) -> None: 

166 """ 

167 Set the timeout to the websocket. 

168 

169 Parameters 

170 ---------- 

171 timeout: int or float 

172 timeout time (in seconds). This value could be either float/integer. 

173 """ 

174 self.sock_opt.timeout = timeout 

175 if self.sock: 

176 self.sock.settimeout(timeout) 

177 

178 timeout = property(gettimeout, settimeout) 

179 

180 def getsubprotocol(self) -> Optional[str]: 

181 """ 

182 Get subprotocol 

183 """ 

184 if self.handshake_response: 

185 return self.handshake_response.subprotocol 

186 else: 

187 return None 

188 

189 subprotocol = property(getsubprotocol) 

190 

191 def getstatus(self) -> Optional[int]: 

192 """ 

193 Get handshake status 

194 """ 

195 if self.handshake_response: 

196 return self.handshake_response.status 

197 else: 

198 return None 

199 

200 status = property(getstatus) 

201 

202 def getheaders(self) -> Optional[dict]: 

203 """ 

204 Get handshake response header 

205 """ 

206 if self.handshake_response: 

207 return self.handshake_response.headers 

208 else: 

209 return None 

210 

211 def is_ssl(self): 

212 try: 

213 return isinstance(self.sock, ssl.SSLSocket) 

214 except (AttributeError, NameError): 

215 return False 

216 

217 headers = property(getheaders) 

218 

219 def connect(self, url, **options): 

220 """ 

221 Connect to url. url is websocket url scheme. 

222 ie. ws://host:port/resource 

223 You can customize using 'options'. 

224 If you set "header" list object, you can set your own custom header. 

225 

226 >>> ws = WebSocket() 

227 >>> ws.connect("ws://echo.websocket.events", 

228 ... header=["User-Agent: MyProgram", 

229 ... "x-custom: header"]) 

230 

231 Parameters 

232 ---------- 

233 header: list or dict 

234 Custom http header list or dict. 

235 cookie: str 

236 Cookie value. 

237 origin: str 

238 Custom origin url. 

239 connection: str 

240 Custom connection header value. 

241 Default value "Upgrade" set in _handshake.py 

242 suppress_origin: bool 

243 Suppress outputting origin header. 

244 suppress_host: bool 

245 Suppress outputting host header. 

246 host: str 

247 Custom host header string. 

248 timeout: int or float 

249 Socket timeout time. This value is an integer or float. 

250 If you set None for this value, it means "use default_timeout value" 

251 http_proxy_host: str 

252 HTTP proxy host name. 

253 http_proxy_port: str or int 

254 HTTP proxy port. Default is 80. 

255 http_no_proxy: list 

256 Whitelisted host names that don't use the proxy. 

257 http_proxy_auth: tuple 

258 HTTP proxy auth information. Tuple of username and password. Default is None. 

259 http_proxy_timeout: int or float 

260 HTTP proxy timeout, default is 60 sec as per python-socks. 

261 redirect_limit: int 

262 Number of redirects to follow. 

263 subprotocols: list 

264 List of available subprotocols. Default is None. 

265 socket: socket 

266 Pre-initialized stream socket. 

267 """ 

268 self.sock_opt.timeout = options.get("timeout", self.sock_opt.timeout) 

269 self.sock, addrs = connect( 

270 url, self.sock_opt, proxy_info(**options), options.pop("socket", None) 

271 ) 

272 

273 try: 

274 self.handshake_response = handshake(self.sock, url, *addrs, **options) 

275 for _ in range(options.pop("redirect_limit", 3)): 

276 if ( 

277 self.handshake_response is not None 

278 and self.handshake_response.status in SUPPORTED_REDIRECT_STATUSES 

279 ): 

280 url = self.handshake_response.headers["location"] 

281 self.sock.close() 

282 self.sock, addrs = connect( 

283 url, 

284 self.sock_opt, 

285 proxy_info(**options), 

286 options.pop("socket", None), 

287 ) 

288 self.handshake_response = handshake( 

289 self.sock, url, *addrs, **options 

290 ) 

291 self.connected = True 

292 except: 

293 if self.sock: 

294 self.sock.close() 

295 self.sock = None 

296 raise 

297 

298 def send(self, payload: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> int: 

299 """ 

300 Send the data as string. 

301 

302 Parameters 

303 ---------- 

304 payload: str 

305 Payload must be utf-8 string or unicode, 

306 If the opcode is OPCODE_TEXT. 

307 Otherwise, it must be string(byte array). 

308 opcode: int 

309 Operation code (opcode) to send. 

310 """ 

311 

312 frame = ABNF.create_frame(payload, opcode) 

313 return self.send_frame(frame) 

314 

315 def send_text(self, text_data: str) -> int: 

316 """ 

317 Sends UTF-8 encoded text. 

318 """ 

319 return self.send(text_data, ABNF.OPCODE_TEXT) 

320 

321 def send_bytes(self, data: Union[bytes, bytearray]) -> int: 

322 """ 

323 Sends a sequence of bytes. 

324 """ 

325 return self.send(data, ABNF.OPCODE_BINARY) 

326 

327 def send_frame(self, frame: ABNF) -> int: 

328 """ 

329 Send the data frame. 

330 

331 >>> ws = create_connection("ws://echo.websocket.events") 

332 >>> frame = ABNF.create_frame("Hello", ABNF.OPCODE_TEXT) 

333 >>> ws.send_frame(frame) 

334 >>> cont_frame = ABNF.create_frame("My name is ", ABNF.OPCODE_CONT, 0) 

335 >>> ws.send_frame(frame) 

336 >>> cont_frame = ABNF.create_frame("Foo Bar", ABNF.OPCODE_CONT, 1) 

337 >>> ws.send_frame(frame) 

338 

339 Parameters 

340 ---------- 

341 frame: ABNF frame 

342 frame data created by ABNF.create_frame 

343 """ 

344 if self.get_mask_key: 

345 frame.get_mask_key = self.get_mask_key 

346 data = frame.format() 

347 length = len(data) 

348 if isEnabledForTrace(): 

349 trace(f"++Sent raw: {repr(data)}") 

350 trace(f"++Sent decoded: {frame.__str__()}") 

351 with self.lock: 

352 while data: 

353 bytes_sent = self._send(data) 

354 data = data[bytes_sent:] 

355 

356 return length 

357 

358 def send_binary(self, payload: bytes) -> int: 

359 """ 

360 Send a binary message (OPCODE_BINARY). 

361 

362 Parameters 

363 ---------- 

364 payload: bytes 

365 payload of message to send. 

366 """ 

367 return self.send(payload, ABNF.OPCODE_BINARY) 

368 

369 def ping(self, payload: Union[str, bytes] = "") -> None: 

370 """ 

371 Send ping data. 

372 

373 Parameters 

374 ---------- 

375 payload: str 

376 data payload to send server. 

377 """ 

378 if isinstance(payload, str): 

379 payload = payload.encode("utf-8") 

380 self.send(payload, ABNF.OPCODE_PING) 

381 

382 def pong(self, payload: Union[str, bytes] = "") -> None: 

383 """ 

384 Send pong data. 

385 

386 Parameters 

387 ---------- 

388 payload: str 

389 data payload to send server. 

390 """ 

391 if isinstance(payload, str): 

392 payload = payload.encode("utf-8") 

393 self.send(payload, ABNF.OPCODE_PONG) 

394 

395 def recv(self) -> Union[str, bytes]: 

396 """ 

397 Receive string data(byte array) from the server. 

398 

399 Returns 

400 ---------- 

401 data: string (byte array) value. 

402 """ 

403 with self.readlock: 

404 opcode, data = self.recv_data() 

405 if opcode == ABNF.OPCODE_TEXT: 

406 data_received: Union[bytes, str] = data 

407 if isinstance(data_received, bytes): 

408 return data_received.decode("utf-8") 

409 elif isinstance(data_received, str): 

410 return data_received 

411 elif opcode == ABNF.OPCODE_BINARY: 

412 data_binary: bytes = data 

413 return data_binary 

414 else: 

415 return "" 

416 

417 def recv_data(self, control_frame: bool = False) -> tuple: 

418 """ 

419 Receive data with operation code. 

420 

421 Parameters 

422 ---------- 

423 control_frame: bool 

424 a boolean flag indicating whether to return control frame 

425 data, defaults to False 

426 

427 Returns 

428 ------- 

429 opcode, frame.data: tuple 

430 tuple of operation code and string(byte array) value. 

431 """ 

432 opcode, frame = self.recv_data_frame(control_frame) 

433 return opcode, frame.data 

434 

435 def recv_data_frame(self, control_frame: bool = False) -> tuple: 

436 """ 

437 Receive data with operation code. 

438 

439 If a valid ping message is received, a pong response is sent. 

440 

441 Parameters 

442 ---------- 

443 control_frame: bool 

444 a boolean flag indicating whether to return control frame 

445 data, defaults to False 

446 

447 Returns 

448 ------- 

449 frame.opcode, frame: tuple 

450 tuple of operation code and string(byte array) value. 

451 """ 

452 while True: 

453 frame = self.recv_frame() 

454 if isEnabledForTrace(): 

455 trace(f"++Rcv raw: {repr(frame.format())}") 

456 trace(f"++Rcv decoded: {frame.__str__()}") 

457 if not frame: 

458 # handle error: 

459 # 'NoneType' object has no attribute 'opcode' 

460 raise WebSocketProtocolException(f"Not a valid frame {frame}") 

461 elif frame.opcode in ( 

462 ABNF.OPCODE_TEXT, 

463 ABNF.OPCODE_BINARY, 

464 ABNF.OPCODE_CONT, 

465 ): 

466 self.cont_frame.validate(frame) 

467 self.cont_frame.add(frame) 

468 

469 if self.cont_frame.is_fire(frame): 

470 return self.cont_frame.extract(frame) 

471 

472 elif frame.opcode == ABNF.OPCODE_CLOSE: 

473 self.send_close() 

474 return frame.opcode, frame 

475 elif frame.opcode == ABNF.OPCODE_PING: 

476 if len(frame.data) < 126: 

477 self.pong(frame.data) 

478 else: 

479 raise WebSocketProtocolException("Ping message is too long") 

480 if control_frame: 

481 return frame.opcode, frame 

482 elif frame.opcode == ABNF.OPCODE_PONG: 

483 if control_frame: 

484 return frame.opcode, frame 

485 

486 def recv_frame(self): 

487 """ 

488 Receive data as frame from server. 

489 

490 Returns 

491 ------- 

492 self.frame_buffer.recv_frame(): ABNF frame object 

493 """ 

494 return self.frame_buffer.recv_frame() 

495 

496 def send_close( 

497 self, status: int = STATUS_NORMAL, reason: Union[str, bytes] = b"" 

498 ) -> None: 

499 """ 

500 Send close data to the server. 

501 

502 Parameters 

503 ---------- 

504 status: int 

505 Status code to send. See STATUS_XXX. 

506 reason: str or bytes 

507 The reason to close. This must be string or UTF-8 bytes. 

508 """ 

509 if status < 0 or status >= ABNF.LENGTH_16: 

510 raise ValueError("code is invalid range") 

511 

512 if reason is None: 

513 reason_bytes = b"" 

514 elif isinstance(reason, str): 

515 reason_bytes = reason.encode("utf-8") 

516 elif isinstance(reason, bytes): 

517 reason_bytes = reason 

518 else: 

519 reason_bytes = bytes(reason) 

520 

521 self.connected = False 

522 self.send(struct.pack("!H", status) + reason_bytes, ABNF.OPCODE_CLOSE) 

523 

524 def close( 

525 self, 

526 status: int = STATUS_NORMAL, 

527 reason: Union[str, bytes] = b"", 

528 timeout: int = 3, 

529 ) -> None: 

530 """ 

531 Close Websocket object 

532 

533 Parameters 

534 ---------- 

535 status: int 

536 Status code to send. See VALID_CLOSE_STATUS in ABNF. 

537 reason: bytes 

538 The reason to close in UTF-8. 

539 timeout: int or float 

540 Timeout until receive a close frame. 

541 If None, it will wait forever until receive a close frame. 

542 """ 

543 if not self.connected: 

544 return 

545 if status < 0 or status >= ABNF.LENGTH_16: 

546 raise ValueError("code is invalid range") 

547 

548 # Reset close_frame to avoid stale data from previous connections 

549 self.close_frame = None 

550 

551 try: 

552 self.connected = False 

553 self.send(struct.pack("!H", status) + reason, ABNF.OPCODE_CLOSE) 

554 if self.sock is None: 

555 return 

556 sock_timeout = self.sock.gettimeout() 

557 self.sock.settimeout(timeout) 

558 start_time = time.time() 

559 while timeout is None or time.time() - start_time < timeout: 

560 try: 

561 frame = self.recv_frame() 

562 if frame.opcode != ABNF.OPCODE_CLOSE: 

563 continue 

564 # Store the peer's close frame for access by higher-level APIs 

565 self.close_frame = frame 

566 if isEnabledForError(): 

567 recv_status = struct.unpack("!H", frame.data[0:2])[0] 

568 if recv_status >= 3000 and recv_status <= 4999: 

569 debug(f"close status: {repr(recv_status)}") 

570 elif recv_status != STATUS_NORMAL: 

571 error(f"close status: {repr(recv_status)}") 

572 break 

573 except ( 

574 WebSocketConnectionClosedException, 

575 WebSocketTimeoutException, 

576 struct.error, 

577 ): 

578 break 

579 if self.sock is not None: 

580 self.sock.settimeout(sock_timeout) 

581 self.sock.shutdown(socket.SHUT_RDWR) 

582 except: 

583 pass 

584 

585 self.shutdown() 

586 

587 def abort(self): 

588 """ 

589 Low-level asynchronous abort, wakes up other threads that are waiting in recv_* 

590 """ 

591 if self.connected and self.sock is not None: 

592 self.sock.shutdown(socket.SHUT_RDWR) 

593 

594 def shutdown(self): 

595 """ 

596 close socket, immediately. 

597 """ 

598 if self.sock: 

599 try: 

600 # Check if socket is still open before closing 

601 if not self.sock._closed: 

602 self.sock.close() 

603 except (OSError, AttributeError): 

604 # Socket already closed or invalid file descriptor - this can happen 

605 # during reconnection scenarios when network failures occur 

606 debug("Socket already closed during shutdown") 

607 pass 

608 finally: 

609 self.sock = None 

610 self.connected = False 

611 

612 def _send(self, data: Union[str, bytes]) -> int: 

613 if self.sock is None: 

614 raise WebSocketConnectionClosedException("socket is already closed.") 

615 if self.dispatcher: 

616 return self.dispatcher.send(self.sock, data) 

617 return send(self.sock, data) 

618 

619 def _recv(self, bufsize): 

620 if self.sock is None: 

621 raise WebSocketConnectionClosedException("Connection is closed") 

622 try: 

623 return recv(self.sock, bufsize) 

624 except WebSocketConnectionClosedException: 

625 if self.sock: 

626 self.sock.close() 

627 self.sock = None 

628 self.connected = False 

629 raise 

630 

631 

632def create_connection( 

633 url: str, 

634 timeout: Optional[Union[float, int]] = None, 

635 class_: Type[WebSocket] = WebSocket, 

636 **options: Any, 

637) -> WebSocket: 

638 """ 

639 Connect to url and return websocket object. 

640 

641 Connect to url and return the WebSocket object. 

642 Passing optional timeout parameter will set the timeout on the socket. 

643 If no timeout is supplied, 

644 the global default timeout setting returned by getdefaulttimeout() is used. 

645 You can customize using 'options'. 

646 If you set "header" list object, you can set your own custom header. 

647 

648 >>> conn = create_connection("ws://echo.websocket.events", 

649 ... header=["User-Agent: MyProgram", 

650 ... "x-custom: header"]) 

651 

652 Parameters 

653 ---------- 

654 class_: class 

655 class to instantiate when creating the connection. It has to implement 

656 settimeout and connect. It's __init__ should be compatible with 

657 WebSocket.__init__, i.e. accept all of it's kwargs. 

658 header: list or dict 

659 custom http header list or dict. 

660 cookie: str 

661 Cookie value. 

662 origin: str 

663 custom origin url. 

664 suppress_origin: bool 

665 suppress outputting origin header. 

666 host: str 

667 custom host header string. 

668 timeout: int or float 

669 socket timeout time. This value could be either float/integer. 

670 If set to None, it uses the default_timeout value. 

671 http_proxy_host: str 

672 HTTP proxy host name. 

673 http_proxy_port: str or int 

674 HTTP proxy port. If not set, set to 80. 

675 http_no_proxy: list 

676 Whitelisted host names that don't use the proxy. 

677 http_proxy_auth: tuple 

678 HTTP proxy auth information. tuple of username and password. Default is None. 

679 http_proxy_timeout: int or float 

680 HTTP proxy timeout, default is 60 sec as per python-socks. 

681 enable_multithread: bool 

682 Enable lock for multithread. 

683 redirect_limit: int 

684 Number of redirects to follow. 

685 sockopt: tuple 

686 Values for socket.setsockopt. 

687 sockopt must be a tuple and each element is an argument of sock.setsockopt. 

688 sslopt: dict 

689 Optional dict object for ssl socket options. See FAQ for details. 

690 subprotocols: list 

691 List of available subprotocols. Default is None. 

692 skip_utf8_validation: bool 

693 Skip utf8 validation. 

694 socket: socket 

695 Pre-initialized stream socket. 

696 """ 

697 sockopt = options.pop("sockopt", []) 

698 sslopt = options.pop("sslopt", {}) 

699 fire_cont_frame = options.pop("fire_cont_frame", False) 

700 enable_multithread = options.pop("enable_multithread", True) 

701 skip_utf8_validation = options.pop("skip_utf8_validation", False) 

702 websock = class_( 

703 sockopt=sockopt, 

704 sslopt=sslopt, 

705 fire_cont_frame=fire_cont_frame, 

706 enable_multithread=enable_multithread, 

707 skip_utf8_validation=skip_utf8_validation, 

708 **options, 

709 ) 

710 websock.settimeout(timeout if timeout is not None else getdefaulttimeout()) 

711 websock.connect(url, **options) 

712 return websock