Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_app.py: 14%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

226 statements  

1import inspect 

2import selectors 

3import socket 

4import threading 

5import time 

6from typing import Any, Callable, Optional, Union 

7 

8from . import _logging 

9from ._abnf import ABNF 

10from ._core import WebSocket, getdefaulttimeout 

11from ._exceptions import ( 

12 WebSocketConnectionClosedException, 

13 WebSocketException, 

14 WebSocketTimeoutException, 

15) 

16from ._ssl_compat import SSLEOFError 

17from ._url import parse_url 

18from ._dispatcher import * 

19 

20""" 

21_app.py 

22websocket - WebSocket client library for Python 

23 

24Copyright 2024 engn33r 

25 

26Licensed under the Apache License, Version 2.0 (the "License"); 

27you may not use this file except in compliance with the License. 

28You may obtain a copy of the License at 

29 

30 http://www.apache.org/licenses/LICENSE-2.0 

31 

32Unless required by applicable law or agreed to in writing, software 

33distributed under the License is distributed on an "AS IS" BASIS, 

34WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

35See the License for the specific language governing permissions and 

36limitations under the License. 

37""" 

38 

39__all__ = ["WebSocketApp"] 

40 

41RECONNECT = 0 

42 

43 

def setReconnect(reconnectInterval: int) -> None:
    """
    Set the module-wide default reconnect interval.

    Parameters
    ----------
    reconnectInterval: int
        Value stored in the module-level ``RECONNECT`` variable, which
        ``WebSocketApp.run_forever`` falls back to when its ``reconnect``
        argument is None.
    """
    global RECONNECT
    RECONNECT = reconnectInterval

47 

48 

49class WebSocketApp: 

50 """ 

51 Higher level of APIs are provided. The interface is like JavaScript WebSocket object. 

52 """ 

53 

54 def __init__( 

55 self, 

56 url: str, 

57 header: Union[list, dict, Callable, None] = None, 

58 on_open: Optional[Callable[["WebSocketApp"], None]] = None, 

59 on_reconnect: Optional[Callable[["WebSocketApp"], None]] = None, 

60 on_message: Optional[Callable[["WebSocketApp", Any], None]] = None, 

61 on_error: Optional[Callable[["WebSocketApp", Any], None]] = None, 

62 on_close: Optional[Callable[["WebSocketApp", Any, Any], None]] = None, 

63 on_ping: Optional[Callable] = None, 

64 on_pong: Optional[Callable] = None, 

65 on_cont_message: Optional[Callable] = None, 

66 keep_running: bool = True, 

67 get_mask_key: Optional[Callable] = None, 

68 cookie: Optional[str] = None, 

69 subprotocols: Optional[list] = None, 

70 on_data: Optional[Callable] = None, 

71 socket: Optional[socket.socket] = None, 

72 ) -> None: 

73 """ 

74 WebSocketApp initialization 

75 

76 Parameters 

77 ---------- 

78 url: str 

79 Websocket url. 

80 header: list or dict or Callable 

81 Custom header for websocket handshake. 

82 If the parameter is a callable object, it is called just before the connection attempt. 

83 The returned dict or list is used as custom header value. 

84 This could be useful in order to properly setup timestamp dependent headers. 

85 on_open: function 

86 Callback object which is called at opening websocket. 

87 on_open has one argument. 

88 The 1st argument is this class object. 

89 on_reconnect: function 

90 Callback object which is called at reconnecting websocket. 

91 on_reconnect has one argument. 

92 The 1st argument is this class object. 

93 on_message: function 

94 Callback object which is called when received data. 

95 on_message has 2 arguments. 

96 The 1st argument is this class object. 

97 The 2nd argument is utf-8 data received from the server. 

98 on_error: function 

99 Callback object which is called when we get error. 

100 on_error has 2 arguments. 

101 The 1st argument is this class object. 

102 The 2nd argument is exception object. 

103 on_close: function 

104 Callback object which is called when connection is closed. 

105 on_close has 3 arguments. 

106 The 1st argument is this class object. 

107 The 2nd argument is close_status_code. 

108 The 3rd argument is close_msg. 

109 on_cont_message: function 

110 Callback object which is called when a continuation 

111 frame is received. 

112 on_cont_message has 3 arguments. 

113 The 1st argument is this class object. 

114 The 2nd argument is utf-8 string which we get from the server. 

115 The 3rd argument is continue flag. if 0, the data continue 

116 to next frame data 

117 on_data: function 

118 Callback object which is called when a message received. 

119 This is called before on_message or on_cont_message, 

120 and then on_message or on_cont_message is called. 

121 on_data has 4 argument. 

122 The 1st argument is this class object. 

123 The 2nd argument is utf-8 string which we get from the server. 

124 The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came. 

125 The 4th argument is continue flag. If 0, the data continue 

126 keep_running: bool 

127 This parameter is obsolete and ignored. 

128 get_mask_key: function 

129 A callable function to get new mask keys, see the 

130 WebSocket.set_mask_key's docstring for more information. 

131 cookie: str 

132 Cookie value. 

133 subprotocols: list 

134 List of available sub protocols. Default is None. 

135 socket: socket 

136 Pre-initialized stream socket. 

137 """ 

138 self.url = url 

139 self.header = header if header is not None else [] 

140 self.cookie = cookie 

141 

142 self.on_open = on_open 

143 self.on_reconnect = on_reconnect 

144 self.on_message = on_message 

145 self.on_data = on_data 

146 self.on_error = on_error 

147 self.on_close = on_close 

148 self.on_ping = on_ping 

149 self.on_pong = on_pong 

150 self.on_cont_message = on_cont_message 

151 self.keep_running = False 

152 self.get_mask_key = get_mask_key 

153 self.sock: Optional[WebSocket] = None 

154 self.last_ping_tm = float(0) 

155 self.last_pong_tm = float(0) 

156 self.ping_thread: Optional[threading.Thread] = None 

157 self.stop_ping: Optional[threading.Event] = None 

158 self.ping_interval = float(0) 

159 self.ping_timeout: Union[float, int, None] = None 

160 self.ping_payload = "" 

161 self.subprotocols = subprotocols 

162 self.prepared_socket = socket 

163 self.has_errored = False 

164 self.has_done_teardown = False 

165 self.has_done_teardown_lock = threading.Lock() 

166 

167 def send(self, data: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> None: 

168 """ 

169 send message 

170 

171 Parameters 

172 ---------- 

173 data: str 

174 Message to send. If you set opcode to OPCODE_TEXT, 

175 data must be utf-8 string or unicode. 

176 opcode: int 

177 Operation code of data. Default is OPCODE_TEXT. 

178 """ 

179 

180 if not self.sock or self.sock.send(data, opcode) == 0: 

181 raise WebSocketConnectionClosedException("Connection is already closed.") 

182 

183 def send_text(self, text_data: str) -> None: 

184 """ 

185 Sends UTF-8 encoded text. 

186 """ 

187 if not self.sock or self.sock.send(text_data, ABNF.OPCODE_TEXT) == 0: 

188 raise WebSocketConnectionClosedException("Connection is already closed.") 

189 

190 def send_bytes(self, data: Union[bytes, bytearray]) -> None: 

191 """ 

192 Sends a sequence of bytes. 

193 """ 

194 if not self.sock or self.sock.send(data, ABNF.OPCODE_BINARY) == 0: 

195 raise WebSocketConnectionClosedException("Connection is already closed.") 

196 

197 def close(self, **kwargs) -> None: 

198 """ 

199 Close websocket connection. 

200 """ 

201 self.keep_running = False 

202 if self.sock: 

203 self.sock.close(**kwargs) 

204 self.sock = None 

205 

206 def _start_ping_thread(self) -> None: 

207 self.last_ping_tm = self.last_pong_tm = float(0) 

208 self.stop_ping = threading.Event() 

209 self.ping_thread = threading.Thread(target=self._send_ping) 

210 self.ping_thread.daemon = True 

211 self.ping_thread.start() 

212 

213 def _stop_ping_thread(self) -> None: 

214 if self.stop_ping: 

215 self.stop_ping.set() 

216 if self.ping_thread and self.ping_thread.is_alive(): 

217 self.ping_thread.join(3) 

218 self.last_ping_tm = self.last_pong_tm = float(0) 

219 

220 def _send_ping(self) -> None: 

221 if self.stop_ping.wait(self.ping_interval) or self.keep_running is False: 

222 return 

223 while not self.stop_ping.wait(self.ping_interval) and self.keep_running is True: 

224 if self.sock: 

225 self.last_ping_tm = time.time() 

226 try: 

227 _logging.debug("Sending ping") 

228 self.sock.ping(self.ping_payload) 

229 except Exception as e: 

230 _logging.debug(f"Failed to send ping: {e}") 

231 

232 def ready(self): 

233 return self.sock and self.sock.connected 

234 

    def run_forever(
        self,
        sockopt: tuple = None,
        sslopt: dict = None,
        ping_interval: Union[float, int] = 0,
        ping_timeout: Union[float, int, None] = None,
        ping_payload: str = "",
        http_proxy_host: str = None,
        http_proxy_port: Union[int, str] = None,
        http_no_proxy: list = None,
        http_proxy_auth: tuple = None,
        http_proxy_timeout: Optional[float] = None,
        skip_utf8_validation: bool = False,
        host: str = None,
        origin: str = None,
        dispatcher=None,
        suppress_origin: bool = False,
        proxy_type: str = None,
        reconnect: int = None,
    ) -> bool:
        """
        Run event loop for WebSocket framework.

        Connects the socket, hands reading over to a dispatcher and, when
        ``reconnect`` is truthy, keeps reconnecting while ``keep_running`` is
        set. The connection is torn down (and ``on_close`` fired) at most once
        per call.

        Parameters
        ----------
        sockopt: tuple
            Values for socket.setsockopt.
            sockopt must be tuple
            and each element is argument of sock.setsockopt.
        sslopt: dict
            Optional dict object for ssl socket option.
        ping_interval: int or float
            Automatically send "ping" command
            every specified period (in seconds).
            If set to 0, no ping is sent periodically.
        ping_timeout: int or float
            Timeout (in seconds) if the pong message is not received.
            Must be greater than 0 and, when ping_interval is also set,
            smaller than ping_interval.
        ping_payload: str
            Payload message to send with each ping.
        http_proxy_host: str
            HTTP proxy host name.
        http_proxy_port: int or str
            HTTP proxy port. If not set, set to 80.
        http_no_proxy: list
            Whitelisted host names that don't use the proxy.
        http_proxy_timeout: int or float
            HTTP proxy timeout, default is 60 sec as per python-socks.
        http_proxy_auth: tuple
            HTTP proxy auth information. tuple of username and password. Default is None.
        skip_utf8_validation: bool
            skip utf8 validation. When set, text frames are also passed to
            the callbacks without being decoded.
        host: str
            update host header.
        origin: str
            update origin header.
        dispatcher: Dispatcher object
            customize reading data from socket.
        suppress_origin: bool
            suppress outputting origin header.
        proxy_type: str
            type of proxy from: http, socks4, socks4a, socks5, socks5h
        reconnect: int
            delay interval when reconnecting. When None, the module-level
            RECONNECT value is used; a falsy value disables reconnecting.

        Returns
        -------
        teardown: bool
            The value of ``self.has_errored``, which is set to True whenever
            the internal disconnect handler runs.
            NOTE(review): has_errored is not reset at the start of this
            method, so a later call on the same object may return True
            because of an earlier run - confirm whether that is intended.

        Raises
        ------
        WebSocketException
            If ping_timeout <= 0, ping_interval < 0,
            ping_interval <= ping_timeout (both set), or a socket is
            already open.
        """

        if reconnect is None:
            reconnect = RECONNECT

        # Validate the ping configuration before touching any state.
        if ping_timeout is not None and ping_timeout <= 0:
            raise WebSocketException("Ensure ping_timeout > 0")
        if ping_interval is not None and ping_interval < 0:
            raise WebSocketException("Ensure ping_interval >= 0")
        if ping_timeout and ping_interval and ping_interval <= ping_timeout:
            raise WebSocketException("Ensure ping_interval > ping_timeout")
        if not sockopt:
            sockopt = ()
        if not sslopt:
            sslopt = {}
        if self.sock:
            raise WebSocketException("socket is already opened")

        self.ping_interval = ping_interval
        self.ping_timeout = ping_timeout
        self.ping_payload = ping_payload
        # Re-arm the once-only teardown guard for this run.
        self.has_done_teardown = False
        self.keep_running = True

        def teardown(close_frame: ABNF = None):
            """
            Tears down the connection.

            Parameters
            ----------
            close_frame: ABNF frame
                If close_frame is set, the on_close handler is invoked
                with the statusCode and reason from the provided frame.
            """

            # teardown() is called in many code paths to ensure resources are cleaned up and on_close is fired.
            # To ensure the work is only done once, we use this bool and lock.
            with self.has_done_teardown_lock:
                if self.has_done_teardown:
                    return
                self.has_done_teardown = True

            self._stop_ping_thread()
            self.keep_running = False
            if self.sock:
                self.sock.close()
            close_status_code, close_reason = self._get_close_args(
                close_frame if close_frame else None
            )
            self.sock = None

            # Finally call the callback AFTER all teardown is complete
            self._callback(self.on_close, close_status_code, close_reason)

        def setSock(reconnecting: bool = False) -> None:
            """
            Create a new WebSocket, connect it and start reading from it.

            ``reconnecting`` selects on_reconnect instead of on_open (when an
            on_reconnect callback is set) and is forwarded to
            handleDisconnect if the attempt fails.
            """
            # Shut down the previous socket before replacing it.
            if reconnecting and self.sock:
                self.sock.shutdown()

            self.sock = WebSocket(
                self.get_mask_key,
                sockopt=sockopt,
                sslopt=sslopt,
                fire_cont_frame=self.on_cont_message is not None,
                skip_utf8_validation=skip_utf8_validation,
                enable_multithread=True,
                dispatcher=dispatcher,
            )

            self.sock.settimeout(getdefaulttimeout())
            try:
                # A callable header is evaluated on every connection attempt.
                header = self.header() if callable(self.header) else self.header

                self.sock.connect(
                    self.url,
                    header=header,
                    cookie=self.cookie,
                    http_proxy_host=http_proxy_host,
                    http_proxy_port=http_proxy_port,
                    http_no_proxy=http_no_proxy,
                    http_proxy_auth=http_proxy_auth,
                    http_proxy_timeout=http_proxy_timeout,
                    subprotocols=self.subprotocols,
                    host=host,
                    origin=origin,
                    suppress_origin=suppress_origin,
                    proxy_type=proxy_type,
                    socket=self.prepared_socket,
                )

                _logging.info("Websocket connected")

                if self.ping_interval:
                    self._start_ping_thread()

                if reconnecting and self.on_reconnect:
                    self._callback(self.on_reconnect)
                else:
                    self._callback(self.on_open)

                # Hand the raw socket to the dispatcher together with the
                # read and check callbacks defined below.
                dispatcher.read(self.sock.sock, read, check)
            except (
                WebSocketConnectionClosedException,
                ConnectionRefusedError,
                KeyboardInterrupt,
                SystemExit,
                Exception,
            ) as e:
                handleDisconnect(e, reconnecting)

        def read() -> bool:
            """
            Receive one frame and dispatch it to the matching callbacks.

            Returns True after a data/ping/pong frame was handled. The
            teardown and close paths return whatever teardown() / closed()
            return (teardown() has no return value).
            """
            if not self.keep_running:
                return teardown()

            try:
                op_code, frame = self.sock.recv_data_frame(True)
            except (
                WebSocketConnectionClosedException,
                KeyboardInterrupt,
                SSLEOFError,
            ) as e:
                # With a custom dispatcher the disconnect is handled here;
                # otherwise the exception is re-raised to the caller.
                if custom_dispatcher:
                    return closed(e)
                else:
                    raise e

            if op_code == ABNF.OPCODE_CLOSE:
                # The close frame itself is passed on as the "error" object.
                return closed(frame)
            elif op_code == ABNF.OPCODE_PING:
                self._callback(self.on_ping, frame.data)
            elif op_code == ABNF.OPCODE_PONG:
                # Recorded so that check() can detect a missing or late pong.
                self.last_pong_tm = time.time()
                self._callback(self.on_pong, frame.data)
            elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:
                self._callback(self.on_data, frame.data, frame.opcode, frame.fin)
                self._callback(self.on_cont_message, frame.data, frame.fin)
            else:
                data = frame.data
                # Text frames are decoded unless UTF-8 validation is skipped.
                if op_code == ABNF.OPCODE_TEXT and not skip_utf8_validation:
                    data = data.decode("utf-8")
                self._callback(self.on_data, data, frame.opcode, True)
                self._callback(self.on_message, data)

            return True

        def check() -> bool:
            """
            Enforce the ping timeout.

            Raises WebSocketTimeoutException when a ping has been sent, more
            than ping_timeout seconds have passed since it, and the last pong
            is either older than that ping or arrived more than ping_timeout
            seconds after it. Otherwise returns True.
            """
            if self.ping_timeout:
                has_timeout_expired = (
                    time.time() - self.last_ping_tm > self.ping_timeout
                )
                has_pong_not_arrived_after_last_ping = (
                    self.last_pong_tm - self.last_ping_tm < 0
                )
                has_pong_arrived_too_late = (
                    self.last_pong_tm - self.last_ping_tm > self.ping_timeout
                )

                if (
                    self.last_ping_tm
                    and has_timeout_expired
                    and (
                        has_pong_not_arrived_after_last_ping
                        or has_pong_arrived_too_late
                    )
                ):
                    raise WebSocketTimeoutException("ping/pong timed out")
            return True

        def closed(
            e: Union[
                WebSocketConnectionClosedException,
                ConnectionRefusedError,
                KeyboardInterrupt,
                SystemExit,
                Exception,
                str,
            ] = "closed unexpectedly"
        ) -> bool:
            """
            Handle a closed connection.

            A plain string is wrapped in WebSocketConnectionClosedException;
            the result is passed to handleDisconnect with ``reconnecting``
            set to whether a reconnect interval is configured.
            """
            if type(e) is str:
                e = WebSocketConnectionClosedException(e)
            return handleDisconnect(e, bool(reconnect))

        def handleDisconnect(
            e: Union[
                WebSocketConnectionClosedException,
                ConnectionRefusedError,
                KeyboardInterrupt,
                SystemExit,
                Exception,
            ],
            reconnecting: bool = False,
        ) -> bool:
            """
            React to a disconnect: report it, then reconnect or tear down.

            on_error is only invoked when ``reconnecting`` is False.
            """
            self.has_errored = True
            self._stop_ping_thread()
            if not reconnecting:
                self._callback(self.on_error, e)

            if isinstance(e, (KeyboardInterrupt, SystemExit)):
                teardown()
                # Propagate further
                # NOTE(review): the bare raise re-raises the exception that is
                # currently being handled, so it relies on this function being
                # called from inside an except block - confirm for all callers.
                raise

            if reconnect:
                _logging.info(f"{e} - reconnect")
                # Only a custom dispatcher is asked to reconnect from here; in
                # the default case the loop at the end of run_forever does it.
                if custom_dispatcher:
                    _logging.debug(
                        f"Calling custom dispatcher reconnect [{len(inspect.stack())} frames in stack]"
                    )
                    dispatcher.reconnect(reconnect, setSock)
            else:
                _logging.error(f"{e} - goodbye")
                teardown()

        # Remember whether the caller supplied a dispatcher before the name is
        # rebound to the dispatcher object built by create_dispatcher().
        custom_dispatcher = bool(dispatcher)
        dispatcher = self.create_dispatcher(
            ping_timeout, dispatcher, parse_url(self.url)[3], closed
        )

        try:
            setSock()
            # Default dispatcher with reconnect enabled: keep reconnecting
            # until keep_running is cleared (by close() or teardown()).
            if not custom_dispatcher and reconnect:
                while self.keep_running:
                    _logging.debug(
                        f"Calling dispatcher reconnect [{len(inspect.stack())} frames in stack]"
                    )
                    dispatcher.reconnect(reconnect, setSock)
        except (KeyboardInterrupt, Exception) as e:
            _logging.info(f"tearing down on exception {e}")
            teardown()
        finally:
            if not custom_dispatcher:
                # Ensure teardown was called before returning from run_forever
                teardown()

        return self.has_errored

540 

541 def create_dispatcher( 

542 self, 

543 ping_timeout: Union[float, int, None], 

544 dispatcher: Optional[DispatcherBase] = None, 

545 is_ssl: bool = False, 

546 handleDisconnect: Callable = None, 

547 ) -> Union[Dispatcher, SSLDispatcher, WrappedDispatcher]: 

548 if dispatcher: # If custom dispatcher is set, use WrappedDispatcher 

549 return WrappedDispatcher(self, ping_timeout, dispatcher, handleDisconnect) 

550 timeout = ping_timeout or 10 

551 if is_ssl: 

552 return SSLDispatcher(self, timeout) 

553 return Dispatcher(self, timeout) 

554 

555 def _get_close_args(self, close_frame: ABNF) -> list: 

556 """ 

557 _get_close_args extracts the close code and reason from the close body 

558 if it exists (RFC6455 says WebSocket Connection Close Code is optional) 

559 """ 

560 # Need to catch the case where close_frame is None 

561 # Otherwise the following if statement causes an error 

562 if not self.on_close or not close_frame: 

563 return [None, None] 

564 

565 # Extract close frame status code 

566 if close_frame.data and len(close_frame.data) >= 2: 

567 close_status_code = 256 * int(close_frame.data[0]) + int( 

568 close_frame.data[1] 

569 ) 

570 reason = close_frame.data[2:] 

571 if isinstance(reason, bytes): 

572 reason = reason.decode("utf-8") 

573 return [close_status_code, reason] 

574 else: 

575 # Most likely reached this because len(close_frame_data.data) < 2 

576 return [None, None] 

577 

578 def _callback(self, callback, *args) -> None: 

579 if callback: 

580 try: 

581 callback(self, *args) 

582 

583 except Exception as e: 

584 _logging.error(f"error from callback {callback}: {e}") 

585 if self.on_error: 

586 self.on_error(self, e)