Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_app.py: 11%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

273 statements  

1import inspect 

2import socket 

3import threading 

4import time 

5from typing import Any, Callable, List, Optional, Tuple, Union 

6 

7from ._logging import debug, error, info, warning 

8from ._abnf import ABNF 

9from ._core import WebSocket, getdefaulttimeout 

10from ._exceptions import ( 

11 WebSocketConnectionClosedException, 

12 WebSocketException, 

13 WebSocketTimeoutException, 

14) 

15from ._ssl_compat import SSLEOFError 

16from ._url import parse_url 

17from ._dispatcher import Dispatcher, DispatcherBase, SSLDispatcher, WrappedDispatcher 

18 

19""" 

20_app.py 

21websocket - WebSocket client library for Python 

22 

23Copyright 2025 engn33r 

24 

25Licensed under the Apache License, Version 2.0 (the "License"); 

26you may not use this file except in compliance with the License. 

27You may obtain a copy of the License at 

28 

29 http://www.apache.org/licenses/LICENSE-2.0 

30 

31Unless required by applicable law or agreed to in writing, software 

32distributed under the License is distributed on an "AS IS" BASIS, 

33WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

34See the License for the specific language governing permissions and 

35limitations under the License. 

36""" 

37 

38__all__ = ["WebSocketApp"] 

39 

40RECONNECT = 0 

41 

42 

43def set_reconnect(reconnectInterval: int) -> None: 

44 global RECONNECT 

45 RECONNECT = reconnectInterval 

46 

47 

48class WebSocketApp: 

49 """ 

50 Higher level of APIs are provided. The interface is like JavaScript WebSocket object. 

51 """ 

52 

53 def __init__( 

54 self, 

55 url: str, 

56 header: Optional[ 

57 Union[ 

58 list[str], 

59 dict[str, str], 

60 Callable[[], Union[list[str], dict[str, str]]], 

61 ] 

62 ] = None, 

63 on_open: Optional[Callable[["WebSocketApp"], None]] = None, 

64 on_reconnect: Optional[Callable[["WebSocketApp"], None]] = None, 

65 on_message: Optional[Callable[["WebSocketApp", Any], None]] = None, 

66 on_error: Optional[Callable[["WebSocketApp", Any], None]] = None, 

67 on_close: Optional[Callable[["WebSocketApp", Any, Any], None]] = None, 

68 on_ping: Optional[Callable] = None, 

69 on_pong: Optional[Callable] = None, 

70 on_cont_message: Optional[Callable] = None, 

71 keep_running: bool = True, 

72 get_mask_key: Optional[Callable] = None, 

73 cookie: Optional[str] = None, 

74 subprotocols: Optional[list[str]] = None, 

75 on_data: Optional[Callable] = None, 

76 socket: Optional[socket.socket] = None, 

77 ) -> None: 

78 """ 

79 WebSocketApp initialization 

80 

81 Parameters 

82 ---------- 

83 url: str 

84 Websocket url. 

85 header: list or dict or Callable 

86 Custom header for websocket handshake. 

87 If the parameter is a callable object, it is called just before the connection attempt. 

88 The returned dict or list is used as custom header value. 

89 This could be useful in order to properly setup timestamp dependent headers. 

90 on_open: function 

91 Callback object which is called at opening websocket. 

92 on_open has one argument. 

93 The 1st argument is this class object. 

94 on_reconnect: function 

95 Callback object which is called at reconnecting websocket. 

96 on_reconnect has one argument. 

97 The 1st argument is this class object. 

98 on_message: function 

99 Callback object which is called when received data. 

100 on_message has 2 arguments. 

101 The 1st argument is this class object. 

102 The 2nd argument is utf-8 data received from the server. 

103 on_error: function 

104 Callback object which is called when we get error. 

105 on_error has 2 arguments. 

106 The 1st argument is this class object. 

107 The 2nd argument is exception object. 

108 on_close: function 

109 Callback object which is called when connection is closed. 

110 on_close has 3 arguments. 

111 The 1st argument is this class object. 

112 The 2nd argument is close_status_code. 

113 The 3rd argument is close_msg. 

114 on_cont_message: function 

115 Callback object which is called when a continuation 

116 frame is received. 

117 on_cont_message has 3 arguments. 

118 The 1st argument is this class object. 

119 The 2nd argument is utf-8 string which we get from the server. 

120 The 3rd argument is continue flag. if 0, the data continue 

121 to next frame data 

122 on_data: function 

123 Callback object which is called when a message received. 

124 This is called before on_message or on_cont_message, 

125 and then on_message or on_cont_message is called. 

126 on_data has 4 argument. 

127 The 1st argument is this class object. 

128 The 2nd argument is utf-8 string which we get from the server. 

129 The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came. 

130 The 4th argument is continue flag. If 0, the data continue 

131 keep_running: bool 

132 This parameter is obsolete and ignored. 

133 get_mask_key: function 

134 A callable function to get new mask keys, see the 

135 WebSocket.set_mask_key's docstring for more information. 

136 cookie: str 

137 Cookie value. 

138 subprotocols: list 

139 List of available sub protocols. Default is None. 

140 socket: socket 

141 Pre-initialized stream socket. 

142 """ 

143 self.url = url 

144 self.header = header if header is not None else [] 

145 self.cookie = cookie 

146 

147 self.on_open = on_open 

148 self.on_reconnect = on_reconnect 

149 self.on_message = on_message 

150 self.on_data = on_data 

151 self.on_error = on_error 

152 self.on_close = on_close 

153 self.on_ping = on_ping 

154 self.on_pong = on_pong 

155 self.on_cont_message = on_cont_message 

156 self.keep_running = False 

157 self.get_mask_key = get_mask_key 

158 self.sock: Optional[WebSocket] = None 

159 self.last_ping_tm = float(0) 

160 self.last_pong_tm = float(0) 

161 self.ping_thread: Optional[threading.Thread] = None 

162 self.stop_ping: Optional[threading.Event] = None 

163 self.ping_interval = float(0) 

164 self.ping_timeout: Optional[Union[float, int]] = None 

165 self.ping_payload = "" 

166 self.subprotocols = subprotocols 

167 self.prepared_socket = socket 

168 self.has_errored = False 

169 self.has_done_teardown = False 

170 self.has_done_teardown_lock = threading.Lock() 

171 

172 def send(self, data: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> None: 

173 """ 

174 send message 

175 

176 Parameters 

177 ---------- 

178 data: str 

179 Message to send. If you set opcode to OPCODE_TEXT, 

180 data must be utf-8 string or unicode. 

181 opcode: int 

182 Operation code of data. Default is OPCODE_TEXT. 

183 """ 

184 

185 if not self.sock or self.sock.send(data, opcode) == 0: 

186 raise WebSocketConnectionClosedException("Connection is already closed.") 

187 

188 def send_text(self, text_data: str) -> None: 

189 """ 

190 Sends UTF-8 encoded text. 

191 """ 

192 if not self.sock or self.sock.send(text_data, ABNF.OPCODE_TEXT) == 0: 

193 raise WebSocketConnectionClosedException("Connection is already closed.") 

194 

195 def send_bytes(self, data: Union[bytes, bytearray]) -> None: 

196 """ 

197 Sends a sequence of bytes. 

198 """ 

199 if not self.sock or self.sock.send(data, ABNF.OPCODE_BINARY) == 0: 

200 raise WebSocketConnectionClosedException("Connection is already closed.") 

201 

202 def close(self, **kwargs: Any) -> None: 

203 """ 

204 Close websocket connection. 

205 """ 

206 self.keep_running = False 

207 if self.sock: 

208 self.sock.close(**kwargs) 

209 self.sock = None 

210 

211 def _start_ping_thread(self) -> None: 

212 self.last_ping_tm = self.last_pong_tm = float(0) 

213 self.stop_ping = threading.Event() 

214 self.ping_thread = threading.Thread(target=self._send_ping) 

215 self.ping_thread.daemon = True 

216 self.ping_thread.start() 

217 

218 def _stop_ping_thread(self) -> None: 

219 if self.stop_ping: 

220 self.stop_ping.set() 

221 if self.ping_thread and self.ping_thread.is_alive(): 

222 self.ping_thread.join(3) 

223 # Handle thread leak - if thread doesn't terminate within timeout, 

224 # force cleanup and log warning instead of abandoning the thread 

225 if self.ping_thread.is_alive(): 

226 warning( 

227 "Ping thread failed to terminate within 3 seconds, " 

228 "forcing cleanup. Thread may be blocked." 

229 ) 

230 # Force cleanup by clearing references even if thread is still alive 

231 # The daemon thread will eventually be cleaned up by Python's GC 

232 # but we prevent resource leaks by not holding references 

233 

234 # Always clean up references regardless of thread state 

235 self.ping_thread = None 

236 self.stop_ping = None 

237 self.last_ping_tm = self.last_pong_tm = float(0) 

238 

239 def _send_ping(self) -> None: 

240 if self.stop_ping is None: 

241 return 

242 if self.keep_running is False: 

243 return 

244 while not self.stop_ping.wait(self.ping_interval) and self.keep_running is True: 

245 if self.sock: 

246 self.last_ping_tm = time.time() 

247 try: 

248 debug("Sending ping") 

249 self.sock.ping(self.ping_payload) 

250 except Exception as e: 

251 debug(f"Failed to send ping: {e}") 

252 

253 def ready(self): 

254 return self.sock and self.sock.connected 

255 

256 def run_forever( 

257 self, 

258 sockopt: Optional[list] = None, 

259 sslopt: Optional[dict] = None, 

260 ping_interval: Union[float, int] = 0, 

261 ping_timeout: Optional[Union[float, int]] = None, 

262 ping_payload: str = "", 

263 http_proxy_host: Optional[str] = None, 

264 http_proxy_port: Optional[Union[int, str]] = None, 

265 http_no_proxy: Optional[list] = None, 

266 http_proxy_auth: Optional[tuple] = None, 

267 http_proxy_timeout: Optional[float] = None, 

268 skip_utf8_validation: bool = False, 

269 host: Optional[str] = None, 

270 origin: Optional[str] = None, 

271 dispatcher: Any = None, 

272 suppress_origin: bool = False, 

273 suppress_host: bool = False, 

274 proxy_type: Optional[str] = None, 

275 reconnect: Optional[int] = None, 

276 ) -> bool: 

277 """ 

278 Run event loop for WebSocket framework. 

279 

280 This loop is an infinite loop and is alive while websocket is available. 

281 

282 Parameters 

283 ---------- 

284 sockopt: tuple 

285 Values for socket.setsockopt. 

286 sockopt must be tuple 

287 and each element is argument of sock.setsockopt. 

288 sslopt: dict 

289 Optional dict object for ssl socket option. 

290 ping_interval: int or float 

291 Automatically send "ping" command 

292 every specified period (in seconds). 

293 If set to 0, no ping is sent periodically. 

294 ping_timeout: int or float 

295 Timeout (in seconds) if the pong message is not received. 

296 ping_payload: str 

297 Payload message to send with each ping. 

298 http_proxy_host: str 

299 HTTP proxy host name. 

300 http_proxy_port: int or str 

301 HTTP proxy port. If not set, set to 80. 

302 http_no_proxy: list 

303 Whitelisted host names that don't use the proxy. 

304 http_proxy_timeout: int or float 

305 HTTP proxy timeout, default is 60 sec as per python-socks. 

306 http_proxy_auth: tuple 

307 HTTP proxy auth information. tuple of username and password. Default is None. 

308 skip_utf8_validation: bool 

309 skip utf8 validation. 

310 host: str 

311 update host header. 

312 origin: str 

313 update origin header. 

314 dispatcher: Dispatcher object 

315 customize reading data from socket. 

316 suppress_origin: bool 

317 suppress outputting origin header. 

318 suppress_host: bool 

319 suppress outputting host header. 

320 proxy_type: str 

321 type of proxy from: http, socks4, socks4a, socks5, socks5h 

322 reconnect: int 

323 delay interval when reconnecting 

324 

325 Returns 

326 ------- 

327 teardown: bool 

328 False if the `WebSocketApp` is closed or caught KeyboardInterrupt, 

329 True if any other exception was raised during a loop. 

330 """ 

331 

332 if reconnect is None: 

333 reconnect = RECONNECT 

334 

335 if ping_timeout is not None and ping_timeout <= 0: 

336 raise WebSocketException("Ensure ping_timeout > 0") 

337 if ping_interval is not None and ping_interval < 0: 

338 raise WebSocketException("Ensure ping_interval >= 0") 

339 if ping_timeout and ping_interval and ping_interval <= ping_timeout: 

340 raise WebSocketException("Ensure ping_interval > ping_timeout") 

341 if not sockopt: 

342 sockopt = [] 

343 if not sslopt: 

344 sslopt = {} 

345 if self.sock: 

346 raise WebSocketException("socket is already opened") 

347 

348 self.ping_interval = ping_interval 

349 self.ping_timeout = ping_timeout 

350 self.ping_payload = ping_payload 

351 self.has_done_teardown = False 

352 self.keep_running = True 

353 

354 def teardown(close_frame: Optional[ABNF] = None) -> None: 

355 """ 

356 Tears down the connection. 

357 

358 Parameters 

359 ---------- 

360 close_frame: ABNF frame 

361 If close_frame is set, the on_close handler is invoked 

362 with the statusCode and reason from the provided frame. 

363 """ 

364 

365 # teardown() is called in many code paths to ensure resources are cleaned up and on_close is fired. 

366 # To ensure the work is only done once, we use this bool and lock. 

367 with self.has_done_teardown_lock: 

368 if self.has_done_teardown: 

369 return 

370 self.has_done_teardown = True 

371 

372 self._stop_ping_thread() 

373 self.keep_running = False 

374 

375 if self.sock: 

376 # in cases like handleDisconnect, the "on_error" callback is called first. If the WebSocketApp 

377 # is being used in a multithreaded application, we nee to make sure that "self.sock" is cleared 

378 # before calling close, otherwise logic built around the sock being set can cause issues - 

379 # specifically calling "run_forever" again, since is checks if "self.sock" is set. 

380 current_sock = self.sock 

381 self.sock = None 

382 current_sock.close() 

383 

384 close_status_code, close_reason = self._get_close_args( 

385 close_frame if close_frame else None 

386 ) 

387 # Finally call the callback AFTER all teardown is complete 

388 self._callback(self.on_close, close_status_code, close_reason) 

389 

390 def initialize_socket(reconnecting: bool = False) -> None: 

391 if reconnecting and self.sock: 

392 self.sock.shutdown() 

393 

394 self.sock = WebSocket( 

395 self.get_mask_key, 

396 sockopt=sockopt, 

397 sslopt=sslopt, 

398 fire_cont_frame=self.on_cont_message is not None, 

399 skip_utf8_validation=skip_utf8_validation, 

400 enable_multithread=True, 

401 dispatcher=dispatcher, 

402 ) 

403 

404 self.sock.settimeout(getdefaulttimeout()) 

405 try: 

406 header = self.header() if callable(self.header) else self.header 

407 

408 self.sock.connect( 

409 self.url, 

410 header=header, 

411 cookie=self.cookie, 

412 http_proxy_host=http_proxy_host, 

413 http_proxy_port=http_proxy_port, 

414 http_no_proxy=http_no_proxy, 

415 http_proxy_auth=http_proxy_auth, 

416 http_proxy_timeout=http_proxy_timeout, 

417 subprotocols=self.subprotocols, 

418 host=host, 

419 origin=origin, 

420 suppress_origin=suppress_origin, 

421 suppress_host=suppress_host, 

422 proxy_type=proxy_type, 

423 socket=self.prepared_socket, 

424 ) 

425 

426 info("Websocket connected") 

427 

428 if self.ping_interval: 

429 self._start_ping_thread() 

430 

431 if reconnecting and self.on_reconnect: 

432 self._callback(self.on_reconnect) 

433 else: 

434 self._callback(self.on_open) 

435 

436 assert dispatcher is not None 

437 dispatcher.read(self.sock.sock, read, check) 

438 except ( 

439 WebSocketConnectionClosedException, 

440 ConnectionRefusedError, 

441 KeyboardInterrupt, 

442 SystemExit, 

443 Exception, 

444 ) as e: 

445 handleDisconnect(e, reconnecting) 

446 

447 def read() -> bool: 

448 if not self.keep_running: 

449 teardown() 

450 return False 

451 

452 if self.sock is None: 

453 return False 

454 

455 try: 

456 op_code, frame = self.sock.recv_data_frame(True) 

457 except ( 

458 WebSocketConnectionClosedException, 

459 KeyboardInterrupt, 

460 SSLEOFError, 

461 ) as e: 

462 if custom_dispatcher: 

463 return closed(e) 

464 else: 

465 raise e 

466 

467 if op_code == ABNF.OPCODE_CLOSE: 

468 return closed(frame) 

469 elif op_code == ABNF.OPCODE_PING: 

470 self._callback(self.on_ping, frame.data) 

471 elif op_code == ABNF.OPCODE_PONG: 

472 self.last_pong_tm = time.time() 

473 self._callback(self.on_pong, frame.data) 

474 elif op_code == ABNF.OPCODE_CONT and self.on_cont_message: 

475 self._callback(self.on_data, frame.data, frame.opcode, frame.fin) 

476 self._callback(self.on_cont_message, frame.data, frame.fin) 

477 else: 

478 data = frame.data 

479 if op_code == ABNF.OPCODE_TEXT and not skip_utf8_validation: 

480 data = data.decode("utf-8") 

481 self._callback(self.on_data, data, frame.opcode, True) 

482 self._callback(self.on_message, data) 

483 

484 return True 

485 

486 def check() -> bool: 

487 if self.ping_timeout: 

488 has_timeout_expired = ( 

489 time.time() - self.last_ping_tm > self.ping_timeout 

490 ) 

491 has_pong_not_arrived_after_last_ping = ( 

492 self.last_pong_tm - self.last_ping_tm < 0 

493 ) 

494 has_pong_arrived_too_late = ( 

495 self.last_pong_tm - self.last_ping_tm > self.ping_timeout 

496 ) 

497 

498 if ( 

499 self.last_ping_tm 

500 and has_timeout_expired 

501 and ( 

502 has_pong_not_arrived_after_last_ping 

503 or has_pong_arrived_too_late 

504 ) 

505 ): 

506 raise WebSocketTimeoutException("ping/pong timed out") 

507 return True 

508 

509 def closed( 

510 e: Union[ 

511 WebSocketConnectionClosedException, 

512 ConnectionRefusedError, 

513 KeyboardInterrupt, 

514 SystemExit, 

515 Exception, 

516 str, 

517 "ABNF", # Now explicitly handle ABNF frame objects 

518 ] = "closed unexpectedly", 

519 ) -> bool: 

520 close_frame: Optional[ABNF] = None 

521 if type(e) is str: 

522 e = WebSocketConnectionClosedException(e) 

523 elif isinstance(e, ABNF) and e.opcode == ABNF.OPCODE_CLOSE: 

524 close_frame = e 

525 # Convert close frames to a descriptive exception for on_error callback 

526 close_status_code, close_reason = self._parse_close_frame(e) 

527 reason_parts: List[str] = [] 

528 if close_status_code is None: 

529 message = "Connection closed" 

530 elif close_status_code == 1000: 

531 message = "Connection closed normally (code 1000)" 

532 else: 

533 message = f"Connection closed (code {close_status_code})" 

534 if close_reason: 

535 reason_parts.append(close_reason) 

536 if reason_parts: 

537 message = f"{message}: {'; '.join(reason_parts)}" 

538 converted = WebSocketConnectionClosedException(message) 

539 setattr(converted, "status_code", close_status_code) 

540 setattr(converted, "reason", close_reason) 

541 e = converted 

542 return handleDisconnect(e, bool(reconnect), close_frame=close_frame) # type: ignore[arg-type] 

543 

544 def handleDisconnect( 

545 e: Union[ 

546 WebSocketConnectionClosedException, 

547 ConnectionRefusedError, 

548 KeyboardInterrupt, 

549 SystemExit, 

550 Exception, 

551 ], 

552 reconnecting: bool = False, 

553 close_frame: Optional[ABNF] = None, 

554 ) -> bool: 

555 self.has_errored = True 

556 self._stop_ping_thread() 

557 if not reconnecting: 

558 self._callback(self.on_error, e) 

559 

560 if isinstance(e, (KeyboardInterrupt, SystemExit)): 

561 teardown(close_frame) 

562 # Propagate further 

563 raise 

564 

565 if reconnect: 

566 info(f"{e} - reconnect") 

567 if custom_dispatcher: 

568 debug( 

569 f"Calling custom dispatcher reconnect [{len(inspect.stack())} frames in stack]" 

570 ) 

571 assert dispatcher is not None 

572 dispatcher.reconnect(reconnect, initialize_socket) 

573 else: 

574 error(f"{e} - goodbye") 

575 teardown(close_frame) 

576 return self.has_errored 

577 

578 custom_dispatcher = bool(dispatcher) 

579 dispatcher = self.create_dispatcher( 

580 ping_timeout, dispatcher, parse_url(self.url)[3], closed 

581 ) 

582 

583 try: 

584 initialize_socket() 

585 if not custom_dispatcher and reconnect: 

586 while self.keep_running: 

587 debug( 

588 f"Calling dispatcher reconnect [{len(inspect.stack())} frames in stack]" 

589 ) 

590 dispatcher.reconnect(reconnect, initialize_socket) 

591 except (KeyboardInterrupt, Exception) as e: 

592 info(f"tearing down on exception {e}") 

593 teardown() 

594 finally: 

595 if not custom_dispatcher: 

596 # Ensure teardown was called before returning from run_forever 

597 teardown() 

598 

599 return self.has_errored 

600 

601 def create_dispatcher( 

602 self, 

603 ping_timeout: Optional[Union[float, int]], 

604 dispatcher: Optional[DispatcherBase] = None, 

605 is_ssl: bool = False, 

606 handleDisconnect: Callable = None, 

607 ) -> Union[Dispatcher, SSLDispatcher, WrappedDispatcher]: 

608 if dispatcher: # If custom dispatcher is set, use WrappedDispatcher 

609 return WrappedDispatcher(self, ping_timeout, dispatcher, handleDisconnect) 

610 timeout = ping_timeout or 10 

611 if is_ssl: 

612 return SSLDispatcher(self, timeout) 

613 return Dispatcher(self, timeout) 

614 

615 def _get_close_args( 

616 self, close_frame: Optional[ABNF] 

617 ) -> List[Optional[Union[int, str]]]: 

618 """ 

619 _get_close_args extracts the close code and reason from the close body 

620 if it exists (RFC6455 says WebSocket Connection Close Code is optional) 

621 """ 

622 # Need to catch the case where close_frame is None 

623 # Otherwise the following if statement causes an error 

624 if not close_frame: 

625 return [None, None] 

626 close_status_code, reason = self._parse_close_frame(close_frame) 

627 if not self.on_close: 

628 return [None, None] 

629 return [close_status_code, reason] 

630 

631 def _parse_close_frame( 

632 self, close_frame: Optional[ABNF] 

633 ) -> Tuple[Optional[int], Optional[str]]: 

634 """ 

635 Parse a close frame into status code and UTF-8 reason text. 

636 """ 

637 if not close_frame or not getattr(close_frame, "data", None): 

638 return (None, None) 

639 

640 data = close_frame.data 

641 if isinstance(data, bytes): 

642 data_bytes = data 

643 elif isinstance(data, str): 

644 data_bytes = data.encode("utf-8") 

645 else: 

646 data_bytes = bytes(data) 

647 

648 if len(data_bytes) < 2: 

649 return (None, None) 

650 

651 close_status_code = 256 * int(data_bytes[0]) + int(data_bytes[1]) 

652 reason_bytes = data_bytes[2:] 

653 

654 reason: Optional[str] 

655 if not reason_bytes: 

656 reason = None 

657 else: 

658 try: 

659 reason = reason_bytes.decode("utf-8") 

660 except UnicodeDecodeError: 

661 reason = reason_bytes.decode("utf-8", errors="replace") 

662 

663 return (close_status_code, reason) 

664 

665 def _callback(self, callback: Optional[Callable], *args: Any) -> None: 

666 if callback: 

667 try: 

668 callback(self, *args) 

669 

670 except Exception as e: 

671 error(f"error from callback {callback}: {e}") 

672 # Bug fix: Prevent infinite recursion by not calling on_error 

673 # when the failing callback IS on_error itself 

674 if self.on_error and callback is not self.on_error: 

675 self.on_error(self, e)