Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_core.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

246 statements  

1import socket 

2import struct 

3import threading 

4import time 

5from typing import Any, Callable, Optional, Type, Union 

6 

7# websocket modules 

8from ._abnf import ABNF, STATUS_NORMAL, continuous_frame, frame_buffer 

9from ._exceptions import ( 

10 WebSocketProtocolException, 

11 WebSocketConnectionClosedException, 

12 WebSocketTimeoutException, 

13 WebSocketException, 

14) 

15from ._handshake import SUPPORTED_REDIRECT_STATUSES, handshake, handshake_response 

16from ._http import connect, proxy_info 

17from ._logging import debug, error, trace, isEnabledForError, isEnabledForTrace 

18from ._socket import getdefaulttimeout, recv, send, sock_opt 

19from ._ssl_compat import ssl 

20from ._utils import NoLock 

21from ._dispatcher import DispatcherBase, WrappedDispatcher 

22 

23""" 

24_core.py 

25websocket - WebSocket client library for Python 

26 

27Copyright 2025 engn33r 

28 

29Licensed under the Apache License, Version 2.0 (the "License"); 

30you may not use this file except in compliance with the License. 

31You may obtain a copy of the License at 

32 

33 http://www.apache.org/licenses/LICENSE-2.0 

34 

35Unless required by applicable law or agreed to in writing, software 

36distributed under the License is distributed on an "AS IS" BASIS, 

37WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

38See the License for the specific language governing permissions and 

39limitations under the License. 

40""" 

41 

42__all__ = ["WebSocket", "create_connection"] 

43 

44 

45class WebSocket: 

46 """ 

47 Low level WebSocket interface. 

48 

49 This class is based on the WebSocket protocol `draft-hixie-thewebsocketprotocol-76 <http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76>`_ 

50 

51 We can connect to the websocket server and send/receive data. 

52 The following example is an echo client. 

53 

54 >>> import websocket 

55 >>> ws = websocket.WebSocket() 

56 >>> ws.connect("ws://echo.websocket.events") 

57 >>> ws.recv() 

58 'echo.websocket.events sponsored by Lob.com' 

59 >>> ws.send("Hello, Server") 

60 19 

61 >>> ws.recv() 

62 'Hello, Server' 

63 >>> ws.close() 

64 

65 Parameters 

66 ---------- 

67 get_mask_key: func 

68 A callable function to get new mask keys, see the 

69 WebSocket.set_mask_key's docstring for more information. 

70 sockopt: tuple 

71 Values for socket.setsockopt. 

72 sockopt must be tuple and each element is argument of sock.setsockopt. 

73 sslopt: dict 

74 Optional dict object for ssl socket options. See FAQ for details. 

75 fire_cont_frame: bool 

76 Fire recv event for each cont frame. Default is False. 

77 enable_multithread: bool 

78 If set to True, lock send method. 

79 skip_utf8_validation: bool 

80 Skip utf8 validation. 

81 """ 

82 

83 def __init__( 

84 self, 

85 get_mask_key: Optional[Callable] = None, 

86 sockopt: Optional[list] = None, 

87 sslopt: Optional[dict] = None, 

88 fire_cont_frame: bool = False, 

89 enable_multithread: bool = True, 

90 skip_utf8_validation: bool = False, 

91 dispatcher: Optional[Union[DispatcherBase, WrappedDispatcher]] = None, 

92 **_: Any, 

93 ) -> None: 

94 """ 

95 Initialize WebSocket object. 

96 

97 Parameters 

98 ---------- 

99 sslopt: dict 

100 Optional dict object for ssl socket options. See FAQ for details. 

101 """ 

102 self.sock_opt = sock_opt(sockopt, sslopt) 

103 self.handshake_response: Optional[handshake_response] = None 

104 self.sock: Optional[socket.socket] = None 

105 

106 self.connected = False 

107 self.get_mask_key = get_mask_key 

108 # These buffer over the build-up of a single frame. 

109 self.frame_buffer = frame_buffer(self._recv, skip_utf8_validation) 

110 self.cont_frame = continuous_frame(fire_cont_frame, skip_utf8_validation) 

111 self.dispatcher = dispatcher 

112 

113 if enable_multithread: 

114 self.lock = threading.Lock() 

115 self.readlock = threading.Lock() 

116 else: 

117 self.lock = NoLock() # type: ignore[assignment] 

118 self.readlock = NoLock() # type: ignore[assignment] 

119 

120 def __iter__(self): 

121 """ 

122 Allow iteration over websocket, implying sequential `recv` executions. 

123 """ 

124 while True: 

125 yield self.recv() 

126 

127 def __next__(self): 

128 return self.recv() 

129 

130 def next(self): 

131 return self.__next__() 

132 

133 def fileno(self): 

134 if self.sock is None: 

135 raise WebSocketException("Connection not established") 

136 return self.sock.fileno() 

137 

138 def set_mask_key(self, func): 

139 """ 

140 Set function to create mask key. You can customize mask key generator. 

141 Mainly, this is for testing purpose. 

142 

143 Parameters 

144 ---------- 

145 func: func 

146 callable object. the func takes 1 argument as integer. 

147 The argument means length of mask key. 

148 This func must return string(byte array), 

149 which length is argument specified. 

150 """ 

151 self.get_mask_key = func 

152 

153 def gettimeout(self) -> Optional[Union[float, int]]: 

154 """ 

155 Get the websocket timeout (in seconds) as an int or float 

156 

157 Returns 

158 ---------- 

159 timeout: int or float 

160 returns timeout value (in seconds). This value could be either float/integer. 

161 """ 

162 return self.sock_opt.timeout 

163 

164 def settimeout(self, timeout: Optional[Union[float, int]]) -> None: 

165 """ 

166 Set the timeout to the websocket. 

167 

168 Parameters 

169 ---------- 

170 timeout: int or float 

171 timeout time (in seconds). This value could be either float/integer. 

172 """ 

173 self.sock_opt.timeout = timeout 

174 if self.sock: 

175 self.sock.settimeout(timeout) 

176 

177 timeout = property(gettimeout, settimeout) 

178 

179 def getsubprotocol(self) -> Optional[str]: 

180 """ 

181 Get subprotocol 

182 """ 

183 if self.handshake_response: 

184 return self.handshake_response.subprotocol 

185 else: 

186 return None 

187 

188 subprotocol = property(getsubprotocol) 

189 

190 def getstatus(self) -> Optional[int]: 

191 """ 

192 Get handshake status 

193 """ 

194 if self.handshake_response: 

195 return self.handshake_response.status 

196 else: 

197 return None 

198 

199 status = property(getstatus) 

200 

201 def getheaders(self) -> Optional[dict]: 

202 """ 

203 Get handshake response header 

204 """ 

205 if self.handshake_response: 

206 return self.handshake_response.headers 

207 else: 

208 return None 

209 

210 def is_ssl(self): 

211 try: 

212 return isinstance(self.sock, ssl.SSLSocket) 

213 except (AttributeError, NameError): 

214 return False 

215 

216 headers = property(getheaders) 

217 

218 def connect(self, url, **options): 

219 """ 

220 Connect to url. url is websocket url scheme. 

221 ie. ws://host:port/resource 

222 You can customize using 'options'. 

223 If you set "header" list object, you can set your own custom header. 

224 

225 >>> ws = WebSocket() 

226 >>> ws.connect("ws://echo.websocket.events", 

227 ... header=["User-Agent: MyProgram", 

228 ... "x-custom: header"]) 

229 

230 Parameters 

231 ---------- 

232 header: list or dict 

233 Custom http header list or dict. 

234 cookie: str 

235 Cookie value. 

236 origin: str 

237 Custom origin url. 

238 connection: str 

239 Custom connection header value. 

240 Default value "Upgrade" set in _handshake.py 

241 suppress_origin: bool 

242 Suppress outputting origin header. 

243 suppress_host: bool 

244 Suppress outputting host header. 

245 host: str 

246 Custom host header string. 

247 timeout: int or float 

248 Socket timeout time. This value is an integer or float. 

249 If you set None for this value, it means "use default_timeout value" 

250 http_proxy_host: str 

251 HTTP proxy host name. 

252 http_proxy_port: str or int 

253 HTTP proxy port. Default is 80. 

254 http_no_proxy: list 

255 Whitelisted host names that don't use the proxy. 

256 http_proxy_auth: tuple 

257 HTTP proxy auth information. Tuple of username and password. Default is None. 

258 http_proxy_timeout: int or float 

259 HTTP proxy timeout, default is 60 sec as per python-socks. 

260 redirect_limit: int 

261 Number of redirects to follow. 

262 subprotocols: list 

263 List of available subprotocols. Default is None. 

264 socket: socket 

265 Pre-initialized stream socket. 

266 """ 

267 self.sock_opt.timeout = options.get("timeout", self.sock_opt.timeout) 

268 self.sock, addrs = connect( 

269 url, self.sock_opt, proxy_info(**options), options.pop("socket", None) 

270 ) 

271 

272 try: 

273 self.handshake_response = handshake(self.sock, url, *addrs, **options) 

274 for _ in range(options.pop("redirect_limit", 3)): 

275 if ( 

276 self.handshake_response is not None 

277 and self.handshake_response.status in SUPPORTED_REDIRECT_STATUSES 

278 ): 

279 url = self.handshake_response.headers["location"] 

280 self.sock.close() 

281 self.sock, addrs = connect( 

282 url, 

283 self.sock_opt, 

284 proxy_info(**options), 

285 options.pop("socket", None), 

286 ) 

287 self.handshake_response = handshake( 

288 self.sock, url, *addrs, **options 

289 ) 

290 self.connected = True 

291 except: 

292 if self.sock: 

293 self.sock.close() 

294 self.sock = None 

295 raise 

296 

297 def send(self, payload: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> int: 

298 """ 

299 Send the data as string. 

300 

301 Parameters 

302 ---------- 

303 payload: str 

304 Payload must be utf-8 string or unicode, 

305 If the opcode is OPCODE_TEXT. 

306 Otherwise, it must be string(byte array). 

307 opcode: int 

308 Operation code (opcode) to send. 

309 """ 

310 

311 frame = ABNF.create_frame(payload, opcode) 

312 return self.send_frame(frame) 

313 

314 def send_text(self, text_data: str) -> int: 

315 """ 

316 Sends UTF-8 encoded text. 

317 """ 

318 return self.send(text_data, ABNF.OPCODE_TEXT) 

319 

320 def send_bytes(self, data: Union[bytes, bytearray]) -> int: 

321 """ 

322 Sends a sequence of bytes. 

323 """ 

324 return self.send(data, ABNF.OPCODE_BINARY) 

325 

326 def send_frame(self, frame: ABNF) -> int: 

327 """ 

328 Send the data frame. 

329 

330 >>> ws = create_connection("ws://echo.websocket.events") 

331 >>> frame = ABNF.create_frame("Hello", ABNF.OPCODE_TEXT) 

332 >>> ws.send_frame(frame) 

333 >>> cont_frame = ABNF.create_frame("My name is ", ABNF.OPCODE_CONT, 0) 

334 >>> ws.send_frame(frame) 

335 >>> cont_frame = ABNF.create_frame("Foo Bar", ABNF.OPCODE_CONT, 1) 

336 >>> ws.send_frame(frame) 

337 

338 Parameters 

339 ---------- 

340 frame: ABNF frame 

341 frame data created by ABNF.create_frame 

342 """ 

343 if self.get_mask_key: 

344 frame.get_mask_key = self.get_mask_key 

345 data = frame.format() 

346 length = len(data) 

347 if isEnabledForTrace(): 

348 trace(f"++Sent raw: {repr(data)}") 

349 trace(f"++Sent decoded: {frame.__str__()}") 

350 with self.lock: 

351 while data: 

352 bytes_sent = self._send(data) 

353 data = data[bytes_sent:] 

354 

355 return length 

356 

357 def send_binary(self, payload: bytes) -> int: 

358 """ 

359 Send a binary message (OPCODE_BINARY). 

360 

361 Parameters 

362 ---------- 

363 payload: bytes 

364 payload of message to send. 

365 """ 

366 return self.send(payload, ABNF.OPCODE_BINARY) 

367 

368 def ping(self, payload: Union[str, bytes] = "") -> None: 

369 """ 

370 Send ping data. 

371 

372 Parameters 

373 ---------- 

374 payload: str 

375 data payload to send server. 

376 """ 

377 if isinstance(payload, str): 

378 payload = payload.encode("utf-8") 

379 self.send(payload, ABNF.OPCODE_PING) 

380 

381 def pong(self, payload: Union[str, bytes] = "") -> None: 

382 """ 

383 Send pong data. 

384 

385 Parameters 

386 ---------- 

387 payload: str 

388 data payload to send server. 

389 """ 

390 if isinstance(payload, str): 

391 payload = payload.encode("utf-8") 

392 self.send(payload, ABNF.OPCODE_PONG) 

393 

394 def recv(self) -> Union[str, bytes]: 

395 """ 

396 Receive string data(byte array) from the server. 

397 

398 Returns 

399 ---------- 

400 data: string (byte array) value. 

401 """ 

402 with self.readlock: 

403 opcode, data = self.recv_data() 

404 if opcode == ABNF.OPCODE_TEXT: 

405 data_received: Union[bytes, str] = data 

406 if isinstance(data_received, bytes): 

407 return data_received.decode("utf-8") 

408 elif isinstance(data_received, str): 

409 return data_received 

410 elif opcode == ABNF.OPCODE_BINARY: 

411 data_binary: bytes = data 

412 return data_binary 

413 else: 

414 return "" 

415 

416 def recv_data(self, control_frame: bool = False) -> tuple: 

417 """ 

418 Receive data with operation code. 

419 

420 Parameters 

421 ---------- 

422 control_frame: bool 

423 a boolean flag indicating whether to return control frame 

424 data, defaults to False 

425 

426 Returns 

427 ------- 

428 opcode, frame.data: tuple 

429 tuple of operation code and string(byte array) value. 

430 """ 

431 opcode, frame = self.recv_data_frame(control_frame) 

432 return opcode, frame.data 

433 

434 def recv_data_frame(self, control_frame: bool = False) -> tuple: 

435 """ 

436 Receive data with operation code. 

437 

438 If a valid ping message is received, a pong response is sent. 

439 

440 Parameters 

441 ---------- 

442 control_frame: bool 

443 a boolean flag indicating whether to return control frame 

444 data, defaults to False 

445 

446 Returns 

447 ------- 

448 frame.opcode, frame: tuple 

449 tuple of operation code and string(byte array) value. 

450 """ 

451 while True: 

452 frame = self.recv_frame() 

453 if isEnabledForTrace(): 

454 trace(f"++Rcv raw: {repr(frame.format())}") 

455 trace(f"++Rcv decoded: {frame.__str__()}") 

456 if not frame: 

457 # handle error: 

458 # 'NoneType' object has no attribute 'opcode' 

459 raise WebSocketProtocolException(f"Not a valid frame {frame}") 

460 elif frame.opcode in ( 

461 ABNF.OPCODE_TEXT, 

462 ABNF.OPCODE_BINARY, 

463 ABNF.OPCODE_CONT, 

464 ): 

465 self.cont_frame.validate(frame) 

466 self.cont_frame.add(frame) 

467 

468 if self.cont_frame.is_fire(frame): 

469 return self.cont_frame.extract(frame) 

470 

471 elif frame.opcode == ABNF.OPCODE_CLOSE: 

472 self.send_close() 

473 return frame.opcode, frame 

474 elif frame.opcode == ABNF.OPCODE_PING: 

475 if len(frame.data) < 126: 

476 self.pong(frame.data) 

477 else: 

478 raise WebSocketProtocolException("Ping message is too long") 

479 if control_frame: 

480 return frame.opcode, frame 

481 elif frame.opcode == ABNF.OPCODE_PONG: 

482 if control_frame: 

483 return frame.opcode, frame 

484 

485 def recv_frame(self): 

486 """ 

487 Receive data as frame from server. 

488 

489 Returns 

490 ------- 

491 self.frame_buffer.recv_frame(): ABNF frame object 

492 """ 

493 return self.frame_buffer.recv_frame() 

494 

495 def send_close( 

496 self, status: int = STATUS_NORMAL, reason: Union[str, bytes] = b"" 

497 ) -> None: 

498 """ 

499 Send close data to the server. 

500 

501 Parameters 

502 ---------- 

503 status: int 

504 Status code to send. See STATUS_XXX. 

505 reason: str or bytes 

506 The reason to close. This must be string or UTF-8 bytes. 

507 """ 

508 if status < 0 or status >= ABNF.LENGTH_16: 

509 raise ValueError("code is invalid range") 

510 

511 if reason is None: 

512 reason_bytes = b"" 

513 elif isinstance(reason, str): 

514 reason_bytes = reason.encode("utf-8") 

515 elif isinstance(reason, bytes): 

516 reason_bytes = reason 

517 else: 

518 reason_bytes = bytes(reason) 

519 

520 self.connected = False 

521 self.send(struct.pack("!H", status) + reason_bytes, ABNF.OPCODE_CLOSE) 

522 

523 def close( 

524 self, 

525 status: int = STATUS_NORMAL, 

526 reason: Union[str, bytes] = b"", 

527 timeout: int = 3, 

528 ) -> None: 

529 """ 

530 Close Websocket object 

531 

532 Parameters 

533 ---------- 

534 status: int 

535 Status code to send. See VALID_CLOSE_STATUS in ABNF. 

536 reason: bytes 

537 The reason to close in UTF-8. 

538 timeout: int or float 

539 Timeout until receive a close frame. 

540 If None, it will wait forever until receive a close frame. 

541 """ 

542 if not self.connected: 

543 return 

544 if status < 0 or status >= ABNF.LENGTH_16: 

545 raise ValueError("code is invalid range") 

546 

547 try: 

548 self.connected = False 

549 self.send(struct.pack("!H", status) + reason, ABNF.OPCODE_CLOSE) 

550 if self.sock is None: 

551 return 

552 sock_timeout = self.sock.gettimeout() 

553 self.sock.settimeout(timeout) 

554 start_time = time.time() 

555 while timeout is None or time.time() - start_time < timeout: 

556 try: 

557 frame = self.recv_frame() 

558 if frame.opcode != ABNF.OPCODE_CLOSE: 

559 continue 

560 if isEnabledForError(): 

561 recv_status = struct.unpack("!H", frame.data[0:2])[0] 

562 if recv_status >= 3000 and recv_status <= 4999: 

563 debug(f"close status: {repr(recv_status)}") 

564 elif recv_status != STATUS_NORMAL: 

565 error(f"close status: {repr(recv_status)}") 

566 break 

567 except ( 

568 WebSocketConnectionClosedException, 

569 WebSocketTimeoutException, 

570 struct.error, 

571 ): 

572 break 

573 if self.sock is not None: 

574 self.sock.settimeout(sock_timeout) 

575 self.sock.shutdown(socket.SHUT_RDWR) 

576 except: 

577 pass 

578 

579 self.shutdown() 

580 

581 def abort(self): 

582 """ 

583 Low-level asynchronous abort, wakes up other threads that are waiting in recv_* 

584 """ 

585 if self.connected and self.sock is not None: 

586 self.sock.shutdown(socket.SHUT_RDWR) 

587 

588 def shutdown(self): 

589 """ 

590 close socket, immediately. 

591 """ 

592 if self.sock: 

593 try: 

594 # Check if socket is still open before closing 

595 if not self.sock._closed: 

596 self.sock.close() 

597 except (OSError, AttributeError): 

598 # Socket already closed or invalid file descriptor - this can happen 

599 # during reconnection scenarios when network failures occur 

600 debug("Socket already closed during shutdown") 

601 pass 

602 finally: 

603 self.sock = None 

604 self.connected = False 

605 

606 def _send(self, data: Union[str, bytes]) -> int: 

607 if self.sock is None: 

608 raise WebSocketConnectionClosedException("socket is already closed.") 

609 if self.dispatcher: 

610 return self.dispatcher.send(self.sock, data) 

611 return send(self.sock, data) 

612 

613 def _recv(self, bufsize): 

614 if self.sock is None: 

615 raise WebSocketConnectionClosedException("Connection is closed") 

616 try: 

617 return recv(self.sock, bufsize) 

618 except WebSocketConnectionClosedException: 

619 if self.sock: 

620 self.sock.close() 

621 self.sock = None 

622 self.connected = False 

623 raise 

624 

625 

626def create_connection( 

627 url: str, 

628 timeout: Optional[Union[float, int]] = None, 

629 class_: Type[WebSocket] = WebSocket, 

630 **options: Any, 

631) -> WebSocket: 

632 """ 

633 Connect to url and return websocket object. 

634 

635 Connect to url and return the WebSocket object. 

636 Passing optional timeout parameter will set the timeout on the socket. 

637 If no timeout is supplied, 

638 the global default timeout setting returned by getdefaulttimeout() is used. 

639 You can customize using 'options'. 

640 If you set "header" list object, you can set your own custom header. 

641 

642 >>> conn = create_connection("ws://echo.websocket.events", 

643 ... header=["User-Agent: MyProgram", 

644 ... "x-custom: header"]) 

645 

646 Parameters 

647 ---------- 

648 class_: class 

649 class to instantiate when creating the connection. It has to implement 

650 settimeout and connect. It's __init__ should be compatible with 

651 WebSocket.__init__, i.e. accept all of it's kwargs. 

652 header: list or dict 

653 custom http header list or dict. 

654 cookie: str 

655 Cookie value. 

656 origin: str 

657 custom origin url. 

658 suppress_origin: bool 

659 suppress outputting origin header. 

660 host: str 

661 custom host header string. 

662 timeout: int or float 

663 socket timeout time. This value could be either float/integer. 

664 If set to None, it uses the default_timeout value. 

665 http_proxy_host: str 

666 HTTP proxy host name. 

667 http_proxy_port: str or int 

668 HTTP proxy port. If not set, set to 80. 

669 http_no_proxy: list 

670 Whitelisted host names that don't use the proxy. 

671 http_proxy_auth: tuple 

672 HTTP proxy auth information. tuple of username and password. Default is None. 

673 http_proxy_timeout: int or float 

674 HTTP proxy timeout, default is 60 sec as per python-socks. 

675 enable_multithread: bool 

676 Enable lock for multithread. 

677 redirect_limit: int 

678 Number of redirects to follow. 

679 sockopt: tuple 

680 Values for socket.setsockopt. 

681 sockopt must be a tuple and each element is an argument of sock.setsockopt. 

682 sslopt: dict 

683 Optional dict object for ssl socket options. See FAQ for details. 

684 subprotocols: list 

685 List of available subprotocols. Default is None. 

686 skip_utf8_validation: bool 

687 Skip utf8 validation. 

688 socket: socket 

689 Pre-initialized stream socket. 

690 """ 

691 sockopt = options.pop("sockopt", []) 

692 sslopt = options.pop("sslopt", {}) 

693 fire_cont_frame = options.pop("fire_cont_frame", False) 

694 enable_multithread = options.pop("enable_multithread", True) 

695 skip_utf8_validation = options.pop("skip_utf8_validation", False) 

696 websock = class_( 

697 sockopt=sockopt, 

698 sslopt=sslopt, 

699 fire_cont_frame=fire_cont_frame, 

700 enable_multithread=enable_multithread, 

701 skip_utf8_validation=skip_utf8_validation, 

702 **options, 

703 ) 

704 websock.settimeout(timeout if timeout is not None else getdefaulttimeout()) 

705 websock.connect(url, **options) 

706 return websock