Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_app.py: 13%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

236 statements  

1import inspect 

2import socket 

3import threading 

4import time 

5from typing import Any, Callable, Optional, Union 

6 

7from . import _logging 

8from ._abnf import ABNF 

9from ._core import WebSocket, getdefaulttimeout 

10from ._exceptions import ( 

11 WebSocketConnectionClosedException, 

12 WebSocketException, 

13 WebSocketTimeoutException, 

14) 

15from ._ssl_compat import SSLEOFError 

16from ._url import parse_url 

17from ._dispatcher import Dispatcher, DispatcherBase, SSLDispatcher, WrappedDispatcher 

18 

19""" 

20_app.py 

21websocket - WebSocket client library for Python 

22 

23Copyright 2025 engn33r 

24 

25Licensed under the Apache License, Version 2.0 (the "License"); 

26you may not use this file except in compliance with the License. 

27You may obtain a copy of the License at 

28 

29 http://www.apache.org/licenses/LICENSE-2.0 

30 

31Unless required by applicable law or agreed to in writing, software 

32distributed under the License is distributed on an "AS IS" BASIS, 

33WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

34See the License for the specific language governing permissions and 

35limitations under the License. 

36""" 

37 

# Public names exported by this module.
__all__ = ["WebSocketApp"]

# Module-wide default reconnect delay. WebSocketApp.run_forever() falls back
# to this value when its ``reconnect`` argument is None; a falsy value (the
# default 0) means no reconnect is attempted. Updated via set_reconnect().
RECONNECT = 0

41 

42 

def set_reconnect(reconnectInterval: int) -> None:
    """
    Set the module-wide default reconnect interval.

    Parameters
    ----------
    reconnectInterval: int
        New value for the module-level ``RECONNECT`` default, which
        ``WebSocketApp.run_forever`` uses when it is called without an
        explicit ``reconnect`` argument.
    """
    global RECONNECT
    RECONNECT = reconnectInterval

46 

47 

48class WebSocketApp: 

49 """ 

50 Higher level of APIs are provided. The interface is like JavaScript WebSocket object. 

51 """ 

52 

53 def __init__( 

54 self, 

55 url: str, 

56 header: Optional[ 

57 Union[ 

58 list[str], 

59 dict[str, str], 

60 Callable[[], Union[list[str], dict[str, str]]], 

61 ] 

62 ] = None, 

63 on_open: Optional[Callable[["WebSocketApp"], None]] = None, 

64 on_reconnect: Optional[Callable[["WebSocketApp"], None]] = None, 

65 on_message: Optional[Callable[["WebSocketApp", Any], None]] = None, 

66 on_error: Optional[Callable[["WebSocketApp", Any], None]] = None, 

67 on_close: Optional[Callable[["WebSocketApp", Any, Any], None]] = None, 

68 on_ping: Optional[Callable] = None, 

69 on_pong: Optional[Callable] = None, 

70 on_cont_message: Optional[Callable] = None, 

71 keep_running: bool = True, 

72 get_mask_key: Optional[Callable] = None, 

73 cookie: Optional[str] = None, 

74 subprotocols: Optional[list[str]] = None, 

75 on_data: Optional[Callable] = None, 

76 socket: Optional[socket.socket] = None, 

77 ) -> None: 

78 """ 

79 WebSocketApp initialization 

80 

81 Parameters 

82 ---------- 

83 url: str 

84 Websocket url. 

85 header: list or dict or Callable 

86 Custom header for websocket handshake. 

87 If the parameter is a callable object, it is called just before the connection attempt. 

88 The returned dict or list is used as custom header value. 

89 This could be useful in order to properly setup timestamp dependent headers. 

90 on_open: function 

91 Callback object which is called at opening websocket. 

92 on_open has one argument. 

93 The 1st argument is this class object. 

94 on_reconnect: function 

95 Callback object which is called at reconnecting websocket. 

96 on_reconnect has one argument. 

97 The 1st argument is this class object. 

98 on_message: function 

99 Callback object which is called when received data. 

100 on_message has 2 arguments. 

101 The 1st argument is this class object. 

102 The 2nd argument is utf-8 data received from the server. 

103 on_error: function 

104 Callback object which is called when we get error. 

105 on_error has 2 arguments. 

106 The 1st argument is this class object. 

107 The 2nd argument is exception object. 

108 on_close: function 

109 Callback object which is called when connection is closed. 

110 on_close has 3 arguments. 

111 The 1st argument is this class object. 

112 The 2nd argument is close_status_code. 

113 The 3rd argument is close_msg. 

114 on_cont_message: function 

115 Callback object which is called when a continuation 

116 frame is received. 

117 on_cont_message has 3 arguments. 

118 The 1st argument is this class object. 

119 The 2nd argument is utf-8 string which we get from the server. 

120 The 3rd argument is continue flag. if 0, the data continue 

121 to next frame data 

122 on_data: function 

123 Callback object which is called when a message received. 

124 This is called before on_message or on_cont_message, 

125 and then on_message or on_cont_message is called. 

126 on_data has 4 argument. 

127 The 1st argument is this class object. 

128 The 2nd argument is utf-8 string which we get from the server. 

129 The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came. 

130 The 4th argument is continue flag. If 0, the data continue 

131 keep_running: bool 

132 This parameter is obsolete and ignored. 

133 get_mask_key: function 

134 A callable function to get new mask keys, see the 

135 WebSocket.set_mask_key's docstring for more information. 

136 cookie: str 

137 Cookie value. 

138 subprotocols: list 

139 List of available sub protocols. Default is None. 

140 socket: socket 

141 Pre-initialized stream socket. 

142 """ 

143 self.url = url 

144 self.header = header if header is not None else [] 

145 self.cookie = cookie 

146 

147 self.on_open = on_open 

148 self.on_reconnect = on_reconnect 

149 self.on_message = on_message 

150 self.on_data = on_data 

151 self.on_error = on_error 

152 self.on_close = on_close 

153 self.on_ping = on_ping 

154 self.on_pong = on_pong 

155 self.on_cont_message = on_cont_message 

156 self.keep_running = False 

157 self.get_mask_key = get_mask_key 

158 self.sock: Optional[WebSocket] = None 

159 self.last_ping_tm = float(0) 

160 self.last_pong_tm = float(0) 

161 self.ping_thread: Optional[threading.Thread] = None 

162 self.stop_ping: Optional[threading.Event] = None 

163 self.ping_interval = float(0) 

164 self.ping_timeout: Optional[Union[float, int]] = None 

165 self.ping_payload = "" 

166 self.subprotocols = subprotocols 

167 self.prepared_socket = socket 

168 self.has_errored = False 

169 self.has_done_teardown = False 

170 self.has_done_teardown_lock = threading.Lock() 

171 

172 def send(self, data: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> None: 

173 """ 

174 send message 

175 

176 Parameters 

177 ---------- 

178 data: str 

179 Message to send. If you set opcode to OPCODE_TEXT, 

180 data must be utf-8 string or unicode. 

181 opcode: int 

182 Operation code of data. Default is OPCODE_TEXT. 

183 """ 

184 

185 if not self.sock or self.sock.send(data, opcode) == 0: 

186 raise WebSocketConnectionClosedException("Connection is already closed.") 

187 

188 def send_text(self, text_data: str) -> None: 

189 """ 

190 Sends UTF-8 encoded text. 

191 """ 

192 if not self.sock or self.sock.send(text_data, ABNF.OPCODE_TEXT) == 0: 

193 raise WebSocketConnectionClosedException("Connection is already closed.") 

194 

195 def send_bytes(self, data: Union[bytes, bytearray]) -> None: 

196 """ 

197 Sends a sequence of bytes. 

198 """ 

199 if not self.sock or self.sock.send(data, ABNF.OPCODE_BINARY) == 0: 

200 raise WebSocketConnectionClosedException("Connection is already closed.") 

201 

202 def close(self, **kwargs) -> None: 

203 """ 

204 Close websocket connection. 

205 """ 

206 self.keep_running = False 

207 if self.sock: 

208 self.sock.close(**kwargs) 

209 self.sock = None 

210 

211 def _start_ping_thread(self) -> None: 

212 self.last_ping_tm = self.last_pong_tm = float(0) 

213 self.stop_ping = threading.Event() 

214 self.ping_thread = threading.Thread(target=self._send_ping) 

215 self.ping_thread.daemon = True 

216 self.ping_thread.start() 

217 

218 def _stop_ping_thread(self) -> None: 

219 if self.stop_ping: 

220 self.stop_ping.set() 

221 if self.ping_thread and self.ping_thread.is_alive(): 

222 self.ping_thread.join(3) 

223 # Handle thread leak - if thread doesn't terminate within timeout, 

224 # force cleanup and log warning instead of abandoning the thread 

225 if self.ping_thread.is_alive(): 

226 _logging.warning( 

227 "Ping thread failed to terminate within 3 seconds, " 

228 "forcing cleanup. Thread may be blocked." 

229 ) 

230 # Force cleanup by clearing references even if thread is still alive 

231 # The daemon thread will eventually be cleaned up by Python's GC 

232 # but we prevent resource leaks by not holding references 

233 

234 # Always clean up references regardless of thread state 

235 self.ping_thread = None 

236 self.stop_ping = None 

237 self.last_ping_tm = self.last_pong_tm = float(0) 

238 

239 def _send_ping(self) -> None: 

240 if self.stop_ping is None: 

241 return 

242 if self.stop_ping.wait(self.ping_interval) or self.keep_running is False: 

243 return 

244 while not self.stop_ping.wait(self.ping_interval) and self.keep_running is True: 

245 if self.sock: 

246 self.last_ping_tm = time.time() 

247 try: 

248 _logging.debug("Sending ping") 

249 self.sock.ping(self.ping_payload) 

250 except Exception as e: 

251 _logging.debug(f"Failed to send ping: {e}") 

252 

253 def ready(self): 

254 return self.sock and self.sock.connected 

255 

256 def run_forever( 

257 self, 

258 sockopt: tuple = None, 

259 sslopt: dict = None, 

260 ping_interval: Union[float, int] = 0, 

261 ping_timeout: Optional[Union[float, int]] = None, 

262 ping_payload: str = "", 

263 http_proxy_host: str = None, 

264 http_proxy_port: Union[int, str] = None, 

265 http_no_proxy: list = None, 

266 http_proxy_auth: tuple = None, 

267 http_proxy_timeout: Optional[float] = None, 

268 skip_utf8_validation: bool = False, 

269 host: str = None, 

270 origin: str = None, 

271 dispatcher=None, 

272 suppress_origin: bool = False, 

273 proxy_type: str = None, 

274 reconnect: int = None, 

275 ) -> bool: 

276 """ 

277 Run event loop for WebSocket framework. 

278 

279 This loop is an infinite loop and is alive while websocket is available. 

280 

281 Parameters 

282 ---------- 

283 sockopt: tuple 

284 Values for socket.setsockopt. 

285 sockopt must be tuple 

286 and each element is argument of sock.setsockopt. 

287 sslopt: dict 

288 Optional dict object for ssl socket option. 

289 ping_interval: int or float 

290 Automatically send "ping" command 

291 every specified period (in seconds). 

292 If set to 0, no ping is sent periodically. 

293 ping_timeout: int or float 

294 Timeout (in seconds) if the pong message is not received. 

295 ping_payload: str 

296 Payload message to send with each ping. 

297 http_proxy_host: str 

298 HTTP proxy host name. 

299 http_proxy_port: int or str 

300 HTTP proxy port. If not set, set to 80. 

301 http_no_proxy: list 

302 Whitelisted host names that don't use the proxy. 

303 http_proxy_timeout: int or float 

304 HTTP proxy timeout, default is 60 sec as per python-socks. 

305 http_proxy_auth: tuple 

306 HTTP proxy auth information. tuple of username and password. Default is None. 

307 skip_utf8_validation: bool 

308 skip utf8 validation. 

309 host: str 

310 update host header. 

311 origin: str 

312 update origin header. 

313 dispatcher: Dispatcher object 

314 customize reading data from socket. 

315 suppress_origin: bool 

316 suppress outputting origin header. 

317 proxy_type: str 

318 type of proxy from: http, socks4, socks4a, socks5, socks5h 

319 reconnect: int 

320 delay interval when reconnecting 

321 

322 Returns 

323 ------- 

324 teardown: bool 

325 False if the `WebSocketApp` is closed or caught KeyboardInterrupt, 

326 True if any other exception was raised during a loop. 

327 """ 

328 

329 if reconnect is None: 

330 reconnect = RECONNECT 

331 

332 if ping_timeout is not None and ping_timeout <= 0: 

333 raise WebSocketException("Ensure ping_timeout > 0") 

334 if ping_interval is not None and ping_interval < 0: 

335 raise WebSocketException("Ensure ping_interval >= 0") 

336 if ping_timeout and ping_interval and ping_interval <= ping_timeout: 

337 raise WebSocketException("Ensure ping_interval > ping_timeout") 

338 if not sockopt: 

339 sockopt = () 

340 if not sslopt: 

341 sslopt = {} 

342 if self.sock: 

343 raise WebSocketException("socket is already opened") 

344 

345 self.ping_interval = ping_interval 

346 self.ping_timeout = ping_timeout 

347 self.ping_payload = ping_payload 

348 self.has_done_teardown = False 

349 self.keep_running = True 

350 

351 def teardown(close_frame: ABNF = None): 

352 """ 

353 Tears down the connection. 

354 

355 Parameters 

356 ---------- 

357 close_frame: ABNF frame 

358 If close_frame is set, the on_close handler is invoked 

359 with the statusCode and reason from the provided frame. 

360 """ 

361 

362 # teardown() is called in many code paths to ensure resources are cleaned up and on_close is fired. 

363 # To ensure the work is only done once, we use this bool and lock. 

364 with self.has_done_teardown_lock: 

365 if self.has_done_teardown: 

366 return 

367 self.has_done_teardown = True 

368 

369 self._stop_ping_thread() 

370 self.keep_running = False 

371 

372 if self.sock: 

373 # in cases like handleDisconnect, the "on_error" callback is called first. If the WebSocketApp 

374 # is being used in a multithreaded application, we nee to make sure that "self.sock" is cleared 

375 # before calling close, otherwise logic built around the sock being set can cause issues - 

376 # specifically calling "run_forever" again, since is checks if "self.sock" is set. 

377 current_sock = self.sock 

378 self.sock = None 

379 current_sock.close() 

380 

381 close_status_code, close_reason = self._get_close_args( 

382 close_frame if close_frame else None 

383 ) 

384 # Finally call the callback AFTER all teardown is complete 

385 self._callback(self.on_close, close_status_code, close_reason) 

386 

387 def initialize_socket(reconnecting: bool = False) -> None: 

388 if reconnecting and self.sock: 

389 self.sock.shutdown() 

390 

391 self.sock = WebSocket( 

392 self.get_mask_key, 

393 sockopt=sockopt, 

394 sslopt=sslopt, 

395 fire_cont_frame=self.on_cont_message is not None, 

396 skip_utf8_validation=skip_utf8_validation, 

397 enable_multithread=True, 

398 dispatcher=dispatcher, 

399 ) 

400 

401 self.sock.settimeout(getdefaulttimeout()) 

402 try: 

403 header = self.header() if callable(self.header) else self.header 

404 

405 self.sock.connect( 

406 self.url, 

407 header=header, 

408 cookie=self.cookie, 

409 http_proxy_host=http_proxy_host, 

410 http_proxy_port=http_proxy_port, 

411 http_no_proxy=http_no_proxy, 

412 http_proxy_auth=http_proxy_auth, 

413 http_proxy_timeout=http_proxy_timeout, 

414 subprotocols=self.subprotocols, 

415 host=host, 

416 origin=origin, 

417 suppress_origin=suppress_origin, 

418 proxy_type=proxy_type, 

419 socket=self.prepared_socket, 

420 ) 

421 

422 _logging.info("Websocket connected") 

423 

424 if self.ping_interval: 

425 self._start_ping_thread() 

426 

427 if reconnecting and self.on_reconnect: 

428 self._callback(self.on_reconnect) 

429 else: 

430 self._callback(self.on_open) 

431 

432 dispatcher.read(self.sock.sock, read, check) 

433 except ( 

434 WebSocketConnectionClosedException, 

435 ConnectionRefusedError, 

436 KeyboardInterrupt, 

437 SystemExit, 

438 Exception, 

439 ) as e: 

440 handleDisconnect(e, reconnecting) 

441 

442 def read() -> bool: 

443 if not self.keep_running: 

444 teardown() 

445 return False 

446 

447 if self.sock is None: 

448 return False 

449 

450 try: 

451 op_code, frame = self.sock.recv_data_frame(True) 

452 except ( 

453 WebSocketConnectionClosedException, 

454 KeyboardInterrupt, 

455 SSLEOFError, 

456 ) as e: 

457 if custom_dispatcher: 

458 return closed(e) 

459 else: 

460 raise e 

461 

462 if op_code == ABNF.OPCODE_CLOSE: 

463 return closed(frame) 

464 elif op_code == ABNF.OPCODE_PING: 

465 self._callback(self.on_ping, frame.data) 

466 elif op_code == ABNF.OPCODE_PONG: 

467 self.last_pong_tm = time.time() 

468 self._callback(self.on_pong, frame.data) 

469 elif op_code == ABNF.OPCODE_CONT and self.on_cont_message: 

470 self._callback(self.on_data, frame.data, frame.opcode, frame.fin) 

471 self._callback(self.on_cont_message, frame.data, frame.fin) 

472 else: 

473 data = frame.data 

474 if op_code == ABNF.OPCODE_TEXT and not skip_utf8_validation: 

475 data = data.decode("utf-8") 

476 self._callback(self.on_data, data, frame.opcode, True) 

477 self._callback(self.on_message, data) 

478 

479 return True 

480 

481 def check() -> bool: 

482 if self.ping_timeout: 

483 has_timeout_expired = ( 

484 time.time() - self.last_ping_tm > self.ping_timeout 

485 ) 

486 has_pong_not_arrived_after_last_ping = ( 

487 self.last_pong_tm - self.last_ping_tm < 0 

488 ) 

489 has_pong_arrived_too_late = ( 

490 self.last_pong_tm - self.last_ping_tm > self.ping_timeout 

491 ) 

492 

493 if ( 

494 self.last_ping_tm 

495 and has_timeout_expired 

496 and ( 

497 has_pong_not_arrived_after_last_ping 

498 or has_pong_arrived_too_late 

499 ) 

500 ): 

501 raise WebSocketTimeoutException("ping/pong timed out") 

502 return True 

503 

504 def closed( 

505 e: Union[ 

506 WebSocketConnectionClosedException, 

507 ConnectionRefusedError, 

508 KeyboardInterrupt, 

509 SystemExit, 

510 Exception, 

511 str, 

512 ] = "closed unexpectedly", 

513 ) -> bool: 

514 if type(e) is str: 

515 e = WebSocketConnectionClosedException(e) 

516 return handleDisconnect(e, bool(reconnect)) # type: ignore[arg-type] 

517 

518 def handleDisconnect( 

519 e: Union[ 

520 WebSocketConnectionClosedException, 

521 ConnectionRefusedError, 

522 KeyboardInterrupt, 

523 SystemExit, 

524 Exception, 

525 ], 

526 reconnecting: bool = False, 

527 ) -> bool: 

528 self.has_errored = True 

529 self._stop_ping_thread() 

530 if not reconnecting: 

531 self._callback(self.on_error, e) 

532 

533 if isinstance(e, (KeyboardInterrupt, SystemExit)): 

534 teardown() 

535 # Propagate further 

536 raise 

537 

538 if reconnect: 

539 _logging.info(f"{e} - reconnect") 

540 if custom_dispatcher: 

541 _logging.debug( 

542 f"Calling custom dispatcher reconnect [{len(inspect.stack())} frames in stack]" 

543 ) 

544 dispatcher.reconnect(reconnect, initialize_socket) 

545 else: 

546 _logging.error(f"{e} - goodbye") 

547 teardown() 

548 return self.has_errored 

549 

550 custom_dispatcher = bool(dispatcher) 

551 dispatcher = self.create_dispatcher( 

552 ping_timeout, dispatcher, parse_url(self.url)[3], closed 

553 ) 

554 

555 try: 

556 initialize_socket() 

557 if not custom_dispatcher and reconnect: 

558 while self.keep_running: 

559 _logging.debug( 

560 f"Calling dispatcher reconnect [{len(inspect.stack())} frames in stack]" 

561 ) 

562 dispatcher.reconnect(reconnect, initialize_socket) 

563 except (KeyboardInterrupt, Exception) as e: 

564 _logging.info(f"tearing down on exception {e}") 

565 teardown() 

566 finally: 

567 if not custom_dispatcher: 

568 # Ensure teardown was called before returning from run_forever 

569 teardown() 

570 

571 return self.has_errored 

572 

573 def create_dispatcher( 

574 self, 

575 ping_timeout: Optional[Union[float, int]], 

576 dispatcher: Optional[DispatcherBase] = None, 

577 is_ssl: bool = False, 

578 handleDisconnect: Callable = None, 

579 ) -> Union[Dispatcher, SSLDispatcher, WrappedDispatcher]: 

580 if dispatcher: # If custom dispatcher is set, use WrappedDispatcher 

581 return WrappedDispatcher(self, ping_timeout, dispatcher, handleDisconnect) 

582 timeout = ping_timeout or 10 

583 if is_ssl: 

584 return SSLDispatcher(self, timeout) 

585 return Dispatcher(self, timeout) 

586 

587 def _get_close_args(self, close_frame: ABNF) -> list: 

588 """ 

589 _get_close_args extracts the close code and reason from the close body 

590 if it exists (RFC6455 says WebSocket Connection Close Code is optional) 

591 """ 

592 # Need to catch the case where close_frame is None 

593 # Otherwise the following if statement causes an error 

594 if not self.on_close or not close_frame: 

595 return [None, None] 

596 

597 # Extract close frame status code 

598 if close_frame.data and len(close_frame.data) >= 2: 

599 close_status_code = 256 * int(close_frame.data[0]) + int( 

600 close_frame.data[1] 

601 ) 

602 reason = close_frame.data[2:] 

603 if isinstance(reason, bytes): 

604 reason = reason.decode("utf-8") 

605 return [close_status_code, reason] 

606 else: 

607 # Most likely reached this because len(close_frame_data.data) < 2 

608 return [None, None] 

609 

610 def _callback(self, callback, *args) -> None: 

611 if callback: 

612 try: 

613 callback(self, *args) 

614 

615 except Exception as e: 

616 _logging.error(f"error from callback {callback}: {e}") 

617 # Bug fix: Prevent infinite recursion by not calling on_error 

618 # when the failing callback IS on_error itself 

619 if self.on_error and callback is not self.on_error: 

620 self.on_error(self, e)