Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_app.py: 11%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

280 statements  

1import inspect 

2import socket 

3import threading 

4import time 

5from typing import Any, Callable, List, Optional, Tuple, Union 

6 

7from ._logging import debug, error, info, warning 

8from ._abnf import ABNF 

9from ._core import WebSocket, getdefaulttimeout 

10from ._exceptions import ( 

11 WebSocketConnectionClosedException, 

12 WebSocketException, 

13 WebSocketTimeoutException, 

14) 

15from ._ssl_compat import SSLError 

16from ._url import parse_url 

17from ._dispatcher import Dispatcher, DispatcherBase, SSLDispatcher, WrappedDispatcher 

18 

19""" 

20_app.py 

21websocket - WebSocket client library for Python 

22 

23Copyright 2025 engn33r 

24 

25Licensed under the Apache License, Version 2.0 (the "License"); 

26you may not use this file except in compliance with the License. 

27You may obtain a copy of the License at 

28 

29 http://www.apache.org/licenses/LICENSE-2.0 

30 

31Unless required by applicable law or agreed to in writing, software 

32distributed under the License is distributed on an "AS IS" BASIS, 

33WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

34See the License for the specific language governing permissions and 

35limitations under the License. 

36""" 

37 

# Public names exported by a star-import of this module.
__all__ = ["WebSocketApp"]

39 

# Module-wide default reconnect delay; WebSocketApp.run_forever falls back to
# this value when its ``reconnect`` argument is None.
RECONNECT: int = 0


def set_reconnect(reconnectInterval: int) -> None:
    """
    Replace the module-wide default reconnect delay.

    Parameters
    ----------
    reconnectInterval: int
        New value stored in the module-level ``RECONNECT`` variable.
    """
    global RECONNECT
    RECONNECT = reconnectInterval

46 

47 

class WebSocketApp:
    """
    Higher level of APIs are provided. The interface is like JavaScript WebSocket object.

    An instance stores the user callbacks and connection options; the
    connection itself is created and driven by ``run_forever``.
    """

    def __init__(
        self,
        url: str,
        header: Optional[
            Union[
                list[str],
                dict[str, str],
                Callable[[], Union[list[str], dict[str, str]]],
            ]
        ] = None,
        on_open: Optional[Callable[["WebSocketApp"], None]] = None,
        on_reconnect: Optional[Callable[["WebSocketApp"], None]] = None,
        on_message: Optional[Callable[["WebSocketApp", Any], None]] = None,
        on_error: Optional[Callable[["WebSocketApp", Any], None]] = None,
        on_close: Optional[Callable[["WebSocketApp", Any, Any], None]] = None,
        on_ping: Optional[Callable] = None,
        on_pong: Optional[Callable] = None,
        on_cont_message: Optional[Callable] = None,
        keep_running: bool = True,
        get_mask_key: Optional[Callable] = None,
        cookie: Optional[str] = None,
        subprotocols: Optional[list[str]] = None,
        on_data: Optional[Callable] = None,
        socket: Optional[socket.socket] = None,
    ) -> None:
        """
        WebSocketApp initialization

        Parameters
        ----------
        url: str
            Websocket url.
        header: list or dict or Callable
            Custom header for websocket handshake.
            If the parameter is a callable object, it is called just before the connection attempt.
            The returned dict or list is used as custom header value.
            This could be useful in order to properly setup timestamp dependent headers.
        on_open: function
            Callback object which is called at opening websocket.
            on_open has one argument.
            The 1st argument is this class object.
        on_reconnect: function
            Callback object which is called at reconnecting websocket.
            on_reconnect has one argument.
            The 1st argument is this class object.
        on_message: function
            Callback object which is called when received data.
            on_message has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is utf-8 data received from the server.
        on_error: function
            Callback object which is called when we get error.
            on_error has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is exception object.
        on_close: function
            Callback object which is called when connection is closed.
            on_close has 3 arguments.
            The 1st argument is this class object.
            The 2nd argument is close_status_code.
            The 3rd argument is close_msg.
        on_ping: function
            Callback object which is called when a ping frame is received.
            on_ping has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the payload of the ping frame.
        on_pong: function
            Callback object which is called when a pong frame is received.
            on_pong has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the payload of the pong frame.
        on_cont_message: function
            Callback object which is called when a continuation
            frame is received.
            on_cont_message has 3 arguments.
            The 1st argument is this class object.
            The 2nd argument is utf-8 string which we get from the server.
            The 3rd argument is continue flag. if 0, the data continue
            to next frame data
        on_data: function
            Callback object which is called when a message received.
            This is called before on_message or on_cont_message,
            and then on_message or on_cont_message is called.
            on_data has 4 arguments.
            The 1st argument is this class object.
            The 2nd argument is utf-8 string which we get from the server.
            The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be passed.
            The 4th argument is continue flag. If 0, the data continue
            to next frame data
        keep_running: bool
            This parameter is obsolete and ignored.
        get_mask_key: function
            A callable function to get new mask keys, see the
            WebSocket.set_mask_key's docstring for more information.
        cookie: str
            Cookie value.
        subprotocols: list
            List of available sub protocols. Default is None.
        socket: socket
            Pre-initialized stream socket.
        """
        self.url = url
        self.header = header if header is not None else []
        self.cookie = cookie

        self.on_open = on_open
        self.on_reconnect = on_reconnect
        self.on_message = on_message
        self.on_data = on_data
        self.on_error = on_error
        self.on_close = on_close
        self.on_ping = on_ping
        self.on_pong = on_pong
        self.on_cont_message = on_cont_message
        # The ``keep_running`` argument is ignored; run_forever() turns the flag on.
        self.keep_running = False
        self.get_mask_key = get_mask_key
        self.sock: Optional[WebSocket] = None
        self.last_ping_tm = float(0)
        self.last_pong_tm = float(0)
        self.ping_thread: Optional[threading.Thread] = None
        self.stop_ping: Optional[threading.Event] = None
        self.ping_interval = float(0)
        self.ping_timeout: Optional[Union[float, int]] = None
        self.ping_payload = ""
        self.subprotocols = subprotocols
        self.prepared_socket = socket
        self.has_errored = False
        self.has_done_teardown = False
        self.has_done_teardown_lock = threading.Lock()
        self.last_close_frame: Optional[ABNF] = None

    def send(self, data: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> None:
        """
        send message

        Parameters
        ----------
        data: str
            Message to send. If you set opcode to OPCODE_TEXT,
            data must be utf-8 string or unicode.
        opcode: int
            Operation code of data. Default is OPCODE_TEXT.

        Raises
        ------
        WebSocketConnectionClosedException
            If there is no socket or the underlying send reports 0.
        """

        if not self.sock or self.sock.send(data, opcode) == 0:
            raise WebSocketConnectionClosedException("Connection is already closed.")

    def send_text(self, text_data: str) -> None:
        """
        Sends UTF-8 encoded text.

        Raises
        ------
        WebSocketConnectionClosedException
            If there is no socket or the underlying send reports 0.
        """
        if not self.sock or self.sock.send(text_data, ABNF.OPCODE_TEXT) == 0:
            raise WebSocketConnectionClosedException("Connection is already closed.")

    def send_bytes(self, data: Union[bytes, bytearray]) -> None:
        """
        Sends a sequence of bytes.

        Raises
        ------
        WebSocketConnectionClosedException
            If there is no socket or the underlying send reports 0.
        """
        if not self.sock or self.sock.send(data, ABNF.OPCODE_BINARY) == 0:
            raise WebSocketConnectionClosedException("Connection is already closed.")

    def close(self, **kwargs: Any) -> None:
        """
        Close websocket connection.

        Keyword arguments are forwarded unchanged to the socket's ``close``.
        """
        self.keep_running = False
        # Work on a local reference: teardown() running on another thread may
        # set self.sock to None while the close call below is still in flight.
        sock = self.sock
        if sock:
            sock.close(**kwargs)
            # Capture the peer's close frame before clearing socket reference
            if sock.close_frame is not None:
                self.last_close_frame = sock.close_frame
            self.sock = None

    def _start_ping_thread(self) -> None:
        """Reset the ping/pong timestamps and start the daemon ping thread."""
        self.last_ping_tm = self.last_pong_tm = float(0)
        self.stop_ping = threading.Event()
        self.ping_thread = threading.Thread(target=self._send_ping)
        self.ping_thread.daemon = True
        self.ping_thread.start()

    def _stop_ping_thread(self) -> None:
        """Signal the ping thread to stop, wait up to 3 seconds, then drop all references to it."""
        if self.stop_ping:
            self.stop_ping.set()
        if self.ping_thread and self.ping_thread.is_alive():
            self.ping_thread.join(3)
            # The join timed out: warn, then fall through and release our
            # references anyway instead of waiting on the thread forever.
            if self.ping_thread.is_alive():
                warning(
                    "Ping thread failed to terminate within 3 seconds, "
                    "forcing cleanup. Thread may be blocked."
                )

        # Always clean up references regardless of thread state
        self.ping_thread = None
        self.stop_ping = None
        self.last_ping_tm = self.last_pong_tm = float(0)

    def _send_ping(self) -> None:
        """
        Ping-thread body: send a ping every ``ping_interval`` seconds until
        the stop event is set or ``keep_running`` is no longer True.
        """
        # Bind the event once. _stop_ping_thread() sets self.stop_ping to None
        # (and a reconnect installs a new Event); re-reading the attribute each
        # iteration would crash on None or make this thread follow the new event.
        stop_event = self.stop_ping
        if stop_event is None:
            return
        if self.keep_running is False:
            return
        while not stop_event.wait(self.ping_interval) and self.keep_running is True:
            if self.sock:
                self.last_ping_tm = time.time()
                try:
                    debug("Sending ping")
                    self.sock.ping(self.ping_payload)
                except Exception as e:
                    # Best effort: a failed ping is logged, not raised.
                    debug(f"Failed to send ping: {e}")

    def ready(self) -> bool:
        """Return True when a socket exists and its ``connected`` attribute is truthy."""
        return bool(self.sock and self.sock.connected)

    def run_forever(
        self,
        sockopt: Optional[list] = None,
        sslopt: Optional[dict] = None,
        ping_interval: Union[float, int] = 0,
        ping_timeout: Optional[Union[float, int]] = None,
        ping_payload: str = "",
        http_proxy_host: Optional[str] = None,
        http_proxy_port: Optional[Union[int, str]] = None,
        http_no_proxy: Optional[list] = None,
        http_proxy_auth: Optional[tuple] = None,
        http_proxy_timeout: Optional[float] = None,
        skip_utf8_validation: bool = False,
        host: Optional[str] = None,
        origin: Optional[str] = None,
        dispatcher: Any = None,
        suppress_origin: bool = False,
        suppress_host: bool = False,
        proxy_type: Optional[str] = None,
        reconnect: Optional[int] = None,
    ) -> bool:
        """
        Run event loop for WebSocket framework.

        This loop is an infinite loop and is alive while websocket is available.

        Parameters
        ----------
        sockopt: tuple
            Values for socket.setsockopt.
            sockopt must be tuple
            and each element is argument of sock.setsockopt.
        sslopt: dict
            Optional dict object for ssl socket option.
        ping_interval: int or float
            Automatically send "ping" command
            every specified period (in seconds).
            If set to 0, no ping is sent periodically.
        ping_timeout: int or float
            Timeout (in seconds) if the pong message is not received.
        ping_payload: str
            Payload message to send with each ping.
        http_proxy_host: str
            HTTP proxy host name.
        http_proxy_port: int or str
            HTTP proxy port. If not set, set to 80.
        http_no_proxy: list
            Whitelisted host names that don't use the proxy.
        http_proxy_timeout: int or float
            HTTP proxy timeout, default is 60 sec as per python-socks.
        http_proxy_auth: tuple
            HTTP proxy auth information. tuple of username and password. Default is None.
        skip_utf8_validation: bool
            skip utf8 validation.
        host: str
            update host header.
        origin: str
            update origin header.
        dispatcher: Dispatcher object
            customize reading data from socket.
        suppress_origin: bool
            suppress outputting origin header.
        suppress_host: bool
            suppress outputting host header.
        proxy_type: str
            type of proxy from: http, socks4, socks4a, socks5, socks5h
        reconnect: int
            delay interval when reconnecting.
            If None, the module-level RECONNECT value is used.

        Returns
        -------
        teardown: bool
            The value of ``self.has_errored`` when the loop ends: True if a
            disconnect was handled without a peer close frame, False otherwise.

        Raises
        ------
        WebSocketException
            If ping_timeout <= 0, ping_interval < 0, ping_interval is not
            greater than ping_timeout (when both are set), or a socket is
            already open on this object.
        """

        if reconnect is None:
            reconnect = RECONNECT

        if ping_timeout is not None and ping_timeout <= 0:
            raise WebSocketException("Ensure ping_timeout > 0")
        if ping_interval is not None and ping_interval < 0:
            raise WebSocketException("Ensure ping_interval >= 0")
        if ping_timeout and ping_interval and ping_interval <= ping_timeout:
            raise WebSocketException("Ensure ping_interval > ping_timeout")
        if not sockopt:
            sockopt = []
        if not sslopt:
            sslopt = {}
        if self.sock:
            raise WebSocketException("socket is already opened")

        self.ping_interval = ping_interval
        self.ping_timeout = ping_timeout
        self.ping_payload = ping_payload
        self.has_done_teardown = False
        self.has_errored = False
        self.keep_running = True

        def teardown(close_frame: Optional[ABNF] = None) -> None:
            """
            Tears down the connection.

            Parameters
            ----------
            close_frame: ABNF frame
                If close_frame is set, the on_close handler is invoked
                with the statusCode and reason from the provided frame.
            """

            # teardown() is called in many code paths to ensure resources are cleaned up and on_close is fired.
            # To ensure the work is only done once, we use this bool and lock.
            with self.has_done_teardown_lock:
                if self.has_done_teardown:
                    return
                self.has_done_teardown = True

            self._stop_ping_thread()
            self.keep_running = False

            if self.sock:
                # In cases like handleDisconnect, the "on_error" callback is called first. If the WebSocketApp
                # is being used in a multithreaded application, we need to make sure that "self.sock" is cleared
                # before calling close, otherwise logic built around the sock being set can cause issues -
                # specifically calling "run_forever" again, since it checks if "self.sock" is set.
                current_sock = self.sock
                self.sock = None
                current_sock.close()

            # Use stored close frame as fallback if none provided (e.g., client-initiated close)
            effective_close_frame = (
                close_frame if close_frame else self.last_close_frame
            )
            close_status_code, close_reason = self._get_close_args(
                effective_close_frame
            )
            # Finally call the callback AFTER all teardown is complete
            self._callback(self.on_close, close_status_code, close_reason)

        def initialize_socket(reconnecting: bool = False) -> None:
            """Create a fresh WebSocket, connect it, fire on_open/on_reconnect and start reading."""
            if reconnecting and self.sock:
                self.sock.shutdown()

            # Reset close frame to avoid stale data from previous connections
            self.last_close_frame = None

            self.sock = WebSocket(
                self.get_mask_key,
                sockopt=sockopt,
                sslopt=sslopt,
                fire_cont_frame=self.on_cont_message is not None,
                skip_utf8_validation=skip_utf8_validation,
                enable_multithread=True,
                dispatcher=dispatcher,
            )

            self.sock.settimeout(getdefaulttimeout())
            try:
                # A callable header is evaluated right before each connection attempt.
                header = self.header() if callable(self.header) else self.header

                self.sock.connect(
                    self.url,
                    header=header,
                    cookie=self.cookie,
                    http_proxy_host=http_proxy_host,
                    http_proxy_port=http_proxy_port,
                    http_no_proxy=http_no_proxy,
                    http_proxy_auth=http_proxy_auth,
                    http_proxy_timeout=http_proxy_timeout,
                    subprotocols=self.subprotocols,
                    host=host,
                    origin=origin,
                    suppress_origin=suppress_origin,
                    suppress_host=suppress_host,
                    proxy_type=proxy_type,
                    socket=self.prepared_socket,
                )

                info("Websocket connected")

                if self.ping_interval:
                    self._start_ping_thread()

                # on_reconnect is only used when reconnecting AND it is set;
                # otherwise on_open is fired, including on reconnects.
                if reconnecting and self.on_reconnect:
                    self._callback(self.on_reconnect)
                else:
                    self._callback(self.on_open)

                assert dispatcher is not None
                dispatcher.read(self.sock.sock, read, check)
            except (Exception, KeyboardInterrupt, SystemExit) as e:
                # Same set as before: the formerly listed WebSocket/connection
                # exception classes are all Exception subclasses.
                handleDisconnect(e, reconnecting)

        def read() -> bool:
            """Receive one frame and dispatch it; return False once reading should stop."""
            if not self.keep_running:
                teardown()
                return False

            if self.sock is None:
                return False

            try:
                op_code, frame = self.sock.recv_data_frame(True)
            except (
                WebSocketConnectionClosedException,
                KeyboardInterrupt,
                SSLError,
                ConnectionResetError,
                WebSocketTimeoutException,
            ) as e:
                # With a custom dispatcher the error is handled here; otherwise it
                # propagates to the except clause in initialize_socket().
                if custom_dispatcher:
                    return closed(e)
                else:
                    raise e

            if op_code == ABNF.OPCODE_CLOSE:
                return closed(frame)
            elif op_code == ABNF.OPCODE_PING:
                self._callback(self.on_ping, frame.data)
            elif op_code == ABNF.OPCODE_PONG:
                self.last_pong_tm = time.time()
                self._callback(self.on_pong, frame.data)
            elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:
                self._callback(self.on_data, frame.data, frame.opcode, frame.fin)
                self._callback(self.on_cont_message, frame.data, frame.fin)
            else:
                data = frame.data
                if op_code == ABNF.OPCODE_TEXT and not skip_utf8_validation:
                    data = data.decode("utf-8")
                self._callback(self.on_data, data, frame.opcode, True)
                self._callback(self.on_message, data)

            return True

        def check() -> bool:
            """Raise WebSocketTimeoutException if the last ping was not answered in time; else return True."""
            if self.ping_timeout:
                has_timeout_expired = (
                    time.time() - self.last_ping_tm > self.ping_timeout
                )
                has_pong_not_arrived_after_last_ping = (
                    self.last_pong_tm - self.last_ping_tm < 0
                )
                has_pong_arrived_too_late = (
                    self.last_pong_tm - self.last_ping_tm > self.ping_timeout
                )

                if (
                    self.last_ping_tm
                    and has_timeout_expired
                    and (
                        has_pong_not_arrived_after_last_ping
                        or has_pong_arrived_too_late
                    )
                ):
                    raise WebSocketTimeoutException("ping/pong timed out")
            return True

        def closed(
            e: Union[
                WebSocketConnectionClosedException,
                ConnectionRefusedError,
                KeyboardInterrupt,
                SystemExit,
                Exception,
                str,
                "ABNF",  # Close frames are accepted as well
            ] = "closed unexpectedly",
        ) -> bool:
            """Normalise a string, close frame or exception into an exception and hand it to handleDisconnect."""
            close_frame: Optional[ABNF] = None
            if isinstance(e, str):
                e = WebSocketConnectionClosedException(e)
            elif isinstance(e, ABNF) and e.opcode == ABNF.OPCODE_CLOSE:
                close_frame = e
                # Convert close frames to a descriptive exception for on_error callback
                close_status_code, close_reason = self._parse_close_frame(e)
                if close_status_code is None:
                    message = "Connection closed"
                elif close_status_code == 1000:
                    message = "Connection closed normally (code 1000)"
                else:
                    message = f"Connection closed (code {close_status_code})"
                if close_reason:
                    message = f"{message}: {close_reason}"
                converted = WebSocketConnectionClosedException(message)
                setattr(converted, "status_code", close_status_code)
                setattr(converted, "reason", close_reason)
                e = converted
            return handleDisconnect(e, bool(reconnect), close_frame=close_frame)  # type: ignore[arg-type]

        def handleDisconnect(
            e: Union[
                WebSocketConnectionClosedException,
                ConnectionRefusedError,
                KeyboardInterrupt,
                SystemExit,
                Exception,
            ],
            reconnecting: bool = False,
            close_frame: Optional[ABNF] = None,
        ) -> bool:
            """Report the disconnect, then either schedule a reconnect or tear down; returns has_errored."""
            # A disconnect without a peer close frame counts as an error.
            if close_frame is None:
                self.has_errored = True
            self._stop_ping_thread()
            if not reconnecting:
                self._callback(self.on_error, e)

            if isinstance(e, (KeyboardInterrupt, SystemExit)):
                teardown(close_frame)
                # Propagate further. Raise the object explicitly so this works
                # even if the caller is not inside an active except block.
                raise e

            if reconnect:
                info(f"{e} - reconnect")
                # The built-in dispatchers reconnect from the loop at the end of
                # run_forever(); only a custom dispatcher is asked to do it here.
                if custom_dispatcher:
                    debug(
                        f"Calling custom dispatcher reconnect [{len(inspect.stack())} frames in stack]"
                    )
                    assert dispatcher is not None
                    dispatcher.reconnect(reconnect, initialize_socket)
            else:
                error(f"{e} - goodbye")
                teardown(close_frame)
            return self.has_errored

        # Remember whether the caller supplied a dispatcher before the name is
        # rebound to the (possibly wrapping) dispatcher actually used.
        custom_dispatcher = bool(dispatcher)
        dispatcher = self.create_dispatcher(
            ping_timeout, dispatcher, parse_url(self.url)[3], closed
        )

        try:
            initialize_socket()
            if not custom_dispatcher and reconnect:
                while self.keep_running:
                    debug(
                        f"Calling dispatcher reconnect [{len(inspect.stack())} frames in stack]"
                    )
                    dispatcher.reconnect(reconnect, initialize_socket)
        except (KeyboardInterrupt, Exception) as e:
            info(f"tearing down on exception {e}")
            teardown()
        finally:
            if not custom_dispatcher:
                # Ensure teardown was called before returning from run_forever
                teardown()

        return self.has_errored

    def create_dispatcher(
        self,
        ping_timeout: Optional[Union[float, int]],
        dispatcher: Optional[DispatcherBase] = None,
        is_ssl: bool = False,
        handleDisconnect: Optional[Callable] = None,
    ) -> Union[Dispatcher, SSLDispatcher, WrappedDispatcher]:
        """
        Pick the dispatcher that drives the read loop.

        A supplied ``dispatcher`` is wrapped in WrappedDispatcher together with
        ``handleDisconnect``. Otherwise an SSLDispatcher (``is_ssl`` true) or a
        Dispatcher is built with ``ping_timeout`` as its timeout, or 10 when
        ``ping_timeout`` is falsy.
        """
        if dispatcher:  # If custom dispatcher is set, use WrappedDispatcher
            return WrappedDispatcher(self, ping_timeout, dispatcher, handleDisconnect)
        timeout = ping_timeout or 10
        if is_ssl:
            return SSLDispatcher(self, timeout)
        return Dispatcher(self, timeout)

    def _get_close_args(
        self, close_frame: Optional[ABNF]
    ) -> List[Optional[Union[int, str]]]:
        """
        _get_close_args extracts the close code and reason from the close body
        if it exists (RFC6455 says WebSocket Connection Close Code is optional)

        Returns ``[None, None]`` when there is no close frame or no on_close
        callback is registered, otherwise ``[status_code, reason]``.
        """
        if not close_frame or not self.on_close:
            return [None, None]
        close_status_code, reason = self._parse_close_frame(close_frame)
        return [close_status_code, reason]

    def _parse_close_frame(
        self, close_frame: Optional[ABNF]
    ) -> Tuple[Optional[int], Optional[str]]:
        """
        Parse a close frame into status code and UTF-8 reason text.

        Returns ``(None, None)`` when the frame is missing, has no data, or
        carries fewer than two bytes. The reason is None when no bytes follow
        the status code; undecodable bytes are replaced rather than raising.
        """
        if not close_frame or not getattr(close_frame, "data", None):
            return (None, None)

        data = close_frame.data
        if isinstance(data, bytes):
            data_bytes = data
        elif isinstance(data, str):
            data_bytes = data.encode("utf-8")
        else:
            data_bytes = bytes(data)

        if len(data_bytes) < 2:
            return (None, None)

        # The first two bytes are the status code, most significant byte first.
        close_status_code = int.from_bytes(data_bytes[:2], "big")
        reason_bytes = data_bytes[2:]

        reason: Optional[str] = None
        if reason_bytes:
            # errors="replace" yields the same text as a strict decode for valid
            # UTF-8 and substitutes U+FFFD for invalid sequences.
            reason = reason_bytes.decode("utf-8", errors="replace")

        return (close_status_code, reason)

    def _callback(self, callback: Optional[Callable], *args: Any) -> None:
        """
        Invoke ``callback(self, *args)`` if a callback is set.

        An Exception raised by the callback is logged and passed to on_error,
        unless the failing callback is on_error itself.
        """
        if not callback:
            return
        try:
            callback(self, *args)
        except Exception as e:
            error(f"error from callback {callback}: {e}")
            # Prevent infinite recursion by not calling on_error
            # when the failing callback IS on_error itself
            if self.on_error and callback is not self.on_error:
                self.on_error(self, e)

689 if self.on_error and callback is not self.on_error: 

690 self.on_error(self, e)