Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/websocket/_app.py: 15%

267 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:34 +0000

1import inspect 

2import selectors 

3import socket 

4import sys 

5import threading 

6import time 

7import traceback 

8 

9from typing import Any, Callable, Optional, Union 

10 

11from . import _logging 

12from ._abnf import ABNF 

13from ._url import parse_url 

14from ._core import WebSocket, getdefaulttimeout 

15from ._exceptions import * 

16 

17""" 

18_app.py 

19websocket - WebSocket client library for Python 

20 

21Copyright 2023 engn33r 

22 

23Licensed under the Apache License, Version 2.0 (the "License"); 

24you may not use this file except in compliance with the License. 

25You may obtain a copy of the License at 

26 

27 http://www.apache.org/licenses/LICENSE-2.0 

28 

29Unless required by applicable law or agreed to in writing, software 

30distributed under the License is distributed on an "AS IS" BASIS, 

31WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

32See the License for the specific language governing permissions and 

33limitations under the License. 

34""" 

35 

36__all__ = ["WebSocketApp"] 

37 

38RECONNECT = 0 

39 

40 

41def setReconnect(reconnectInterval: int) -> None: 

42 global RECONNECT 

43 RECONNECT = reconnectInterval 

44 

45 

46class DispatcherBase: 

47 """ 

48 DispatcherBase 

49 """ 

50 def __init__(self, app: Any, ping_timeout: float) -> None: 

51 self.app = app 

52 self.ping_timeout = ping_timeout 

53 

54 def timeout(self, seconds: int, callback: Callable) -> None: 

55 time.sleep(seconds) 

56 callback() 

57 

58 def reconnect(self, seconds: int, reconnector: Callable) -> None: 

59 try: 

60 _logging.info("reconnect() - retrying in {seconds_count} seconds [{frame_count} frames in stack]".format( 

61 seconds_count=seconds, frame_count=len(inspect.stack()))) 

62 time.sleep(seconds) 

63 reconnector(reconnecting=True) 

64 except KeyboardInterrupt as e: 

65 _logging.info("User exited {err}".format(err=e)) 

66 raise e 

67 

68 

69class Dispatcher(DispatcherBase): 

70 """ 

71 Dispatcher 

72 """ 

73 def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None: 

74 sel = selectors.DefaultSelector() 

75 sel.register(self.app.sock.sock, selectors.EVENT_READ) 

76 try: 

77 while self.app.keep_running: 

78 r = sel.select(self.ping_timeout) 

79 if r: 

80 if not read_callback(): 

81 break 

82 check_callback() 

83 finally: 

84 sel.close() 

85 

86 

87class SSLDispatcher(DispatcherBase): 

88 """ 

89 SSLDispatcher 

90 """ 

91 def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None: 

92 sock = self.app.sock.sock 

93 sel = selectors.DefaultSelector() 

94 sel.register(sock, selectors.EVENT_READ) 

95 try: 

96 while self.app.keep_running: 

97 r = self.select(sock, sel) 

98 if r: 

99 if not read_callback(): 

100 break 

101 check_callback() 

102 finally: 

103 sel.close() 

104 

105 def select(self, sock, sel:selectors.DefaultSelector): 

106 sock = self.app.sock.sock 

107 if sock.pending(): 

108 return [sock,] 

109 

110 r = sel.select(self.ping_timeout) 

111 

112 if len(r) > 0: 

113 return r[0][0] 

114 

115 

116class WrappedDispatcher: 

117 """ 

118 WrappedDispatcher 

119 """ 

120 def __init__(self, app, ping_timeout: float, dispatcher: Dispatcher) -> None: 

121 self.app = app 

122 self.ping_timeout = ping_timeout 

123 self.dispatcher = dispatcher 

124 dispatcher.signal(2, dispatcher.abort) # keyboard interrupt 

125 

126 def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None: 

127 self.dispatcher.read(sock, read_callback) 

128 self.ping_timeout and self.timeout(self.ping_timeout, check_callback) 

129 

130 def timeout(self, seconds: int, callback: Callable) -> None: 

131 self.dispatcher.timeout(seconds, callback) 

132 

133 def reconnect(self, seconds: int, reconnector: Callable) -> None: 

134 self.timeout(seconds, reconnector) 

135 

136 

137class WebSocketApp: 

138 """ 

139 Higher level of APIs are provided. The interface is like JavaScript WebSocket object. 

140 """ 

141 

142 def __init__(self, url: str, header: Union[list, dict, Callable] = None, 

143 on_open: Callable = None, on_message: Callable = None, on_error: Callable = None, 

144 on_close: Callable = None, on_ping: Callable = None, on_pong: Callable = None, 

145 on_cont_message: Callable = None, 

146 keep_running: bool = True, get_mask_key: Callable = None, cookie: str = None, 

147 subprotocols: list = None, 

148 on_data: Callable = None, 

149 socket: socket.socket = None) -> None: 

150 """ 

151 WebSocketApp initialization 

152 

153 Parameters 

154 ---------- 

155 url: str 

156 Websocket url. 

157 header: list or dict or Callable 

158 Custom header for websocket handshake. 

159 If the parameter is a callable object, it is called just before the connection attempt. 

160 The returned dict or list is used as custom header value. 

161 This could be useful in order to properly setup timestamp dependent headers. 

162 on_open: function 

163 Callback object which is called at opening websocket. 

164 on_open has one argument. 

165 The 1st argument is this class object. 

166 on_message: function 

167 Callback object which is called when received data. 

168 on_message has 2 arguments. 

169 The 1st argument is this class object. 

170 The 2nd argument is utf-8 data received from the server. 

171 on_error: function 

172 Callback object which is called when we get error. 

173 on_error has 2 arguments. 

174 The 1st argument is this class object. 

175 The 2nd argument is exception object. 

176 on_close: function 

177 Callback object which is called when connection is closed. 

178 on_close has 3 arguments. 

179 The 1st argument is this class object. 

180 The 2nd argument is close_status_code. 

181 The 3rd argument is close_msg. 

182 on_cont_message: function 

183 Callback object which is called when a continuation 

184 frame is received. 

185 on_cont_message has 3 arguments. 

186 The 1st argument is this class object. 

187 The 2nd argument is utf-8 string which we get from the server. 

188 The 3rd argument is continue flag. if 0, the data continue 

189 to next frame data 

190 on_data: function 

191 Callback object which is called when a message received. 

192 This is called before on_message or on_cont_message, 

193 and then on_message or on_cont_message is called. 

194 on_data has 4 argument. 

195 The 1st argument is this class object. 

196 The 2nd argument is utf-8 string which we get from the server. 

197 The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came. 

198 The 4th argument is continue flag. If 0, the data continue 

199 keep_running: bool 

200 This parameter is obsolete and ignored. 

201 get_mask_key: function 

202 A callable function to get new mask keys, see the 

203 WebSocket.set_mask_key's docstring for more information. 

204 cookie: str 

205 Cookie value. 

206 subprotocols: list 

207 List of available sub protocols. Default is None. 

208 socket: socket 

209 Pre-initialized stream socket. 

210 """ 

211 self.url = url 

212 self.header = header if header is not None else [] 

213 self.cookie = cookie 

214 

215 self.on_open = on_open 

216 self.on_message = on_message 

217 self.on_data = on_data 

218 self.on_error = on_error 

219 self.on_close = on_close 

220 self.on_ping = on_ping 

221 self.on_pong = on_pong 

222 self.on_cont_message = on_cont_message 

223 self.keep_running = False 

224 self.get_mask_key = get_mask_key 

225 self.sock = None 

226 self.last_ping_tm = 0 

227 self.last_pong_tm = 0 

228 self.ping_thread = None 

229 self.stop_ping = None 

230 self.ping_interval = 0 

231 self.ping_timeout = None 

232 self.ping_payload = "" 

233 self.subprotocols = subprotocols 

234 self.prepared_socket = socket 

235 self.has_errored = False 

236 self.has_done_teardown = False 

237 self.has_done_teardown_lock = threading.Lock() 

238 

239 def send(self, data: str, opcode: int = ABNF.OPCODE_TEXT) -> None: 

240 """ 

241 send message 

242 

243 Parameters 

244 ---------- 

245 data: str 

246 Message to send. If you set opcode to OPCODE_TEXT, 

247 data must be utf-8 string or unicode. 

248 opcode: int 

249 Operation code of data. Default is OPCODE_TEXT. 

250 """ 

251 

252 if not self.sock or self.sock.send(data, opcode) == 0: 

253 raise WebSocketConnectionClosedException( 

254 "Connection is already closed.") 

255 

256 def close(self, **kwargs) -> None: 

257 """ 

258 Close websocket connection. 

259 """ 

260 self.keep_running = False 

261 if self.sock: 

262 self.sock.close(**kwargs) 

263 self.sock = None 

264 

265 def _start_ping_thread(self) -> None: 

266 self.last_ping_tm = self.last_pong_tm = 0 

267 self.stop_ping = threading.Event() 

268 self.ping_thread = threading.Thread(target=self._send_ping) 

269 self.ping_thread.daemon = True 

270 self.ping_thread.start() 

271 

272 def _stop_ping_thread(self) -> None: 

273 if self.stop_ping: 

274 self.stop_ping.set() 

275 if self.ping_thread and self.ping_thread.is_alive(): 

276 self.ping_thread.join(3) 

277 self.last_ping_tm = self.last_pong_tm = 0 

278 

279 def _send_ping(self) -> None: 

280 if self.stop_ping.wait(self.ping_interval) or self.keep_running is False: 

281 return 

282 while not self.stop_ping.wait(self.ping_interval) and self.keep_running is True: 

283 if self.sock: 

284 self.last_ping_tm = time.time() 

285 try: 

286 _logging.debug("Sending ping") 

287 self.sock.ping(self.ping_payload) 

288 except Exception as e: 

289 _logging.debug("Failed to send ping: {err}".format(err=e)) 

290 

291 def run_forever(self, sockopt: tuple = None, sslopt: dict = None, 

292 ping_interval: float = 0, ping_timeout: Optional[float] = None, 

293 ping_payload: str = "", 

294 http_proxy_host: str = None, http_proxy_port: Union[int, str] = None, 

295 http_no_proxy: list = None, http_proxy_auth: tuple = None, 

296 http_proxy_timeout: float = None, 

297 skip_utf8_validation: bool = False, 

298 host: str = None, origin: str = None, dispatcher: Dispatcher = None, 

299 suppress_origin: bool = False, proxy_type: str = None, reconnect: int = None) -> bool: 

300 """ 

301 Run event loop for WebSocket framework. 

302 

303 This loop is an infinite loop and is alive while websocket is available. 

304 

305 Parameters 

306 ---------- 

307 sockopt: tuple 

308 Values for socket.setsockopt. 

309 sockopt must be tuple 

310 and each element is argument of sock.setsockopt. 

311 sslopt: dict 

312 Optional dict object for ssl socket option. 

313 ping_interval: int or float 

314 Automatically send "ping" command 

315 every specified period (in seconds). 

316 If set to 0, no ping is sent periodically. 

317 ping_timeout: int or float 

318 Timeout (in seconds) if the pong message is not received. 

319 ping_payload: str 

320 Payload message to send with each ping. 

321 http_proxy_host: str 

322 HTTP proxy host name. 

323 http_proxy_port: int or str 

324 HTTP proxy port. If not set, set to 80. 

325 http_no_proxy: list 

326 Whitelisted host names that don't use the proxy. 

327 http_proxy_timeout: int or float 

328 HTTP proxy timeout, default is 60 sec as per python-socks. 

329 http_proxy_auth: tuple 

330 HTTP proxy auth information. tuple of username and password. Default is None. 

331 skip_utf8_validation: bool 

332 skip utf8 validation. 

333 host: str 

334 update host header. 

335 origin: str 

336 update origin header. 

337 dispatcher: Dispatcher object 

338 customize reading data from socket. 

339 suppress_origin: bool 

340 suppress outputting origin header. 

341 proxy_type: str 

342 type of proxy from: http, socks4, socks4a, socks5, socks5h 

343 reconnect: int 

344 delay interval when reconnecting 

345 

346 Returns 

347 ------- 

348 teardown: bool 

349 False if the `WebSocketApp` is closed or caught KeyboardInterrupt, 

350 True if any other exception was raised during a loop. 

351 """ 

352 

353 if reconnect is None: 

354 reconnect = RECONNECT 

355 

356 if ping_timeout is not None and ping_timeout <= 0: 

357 raise WebSocketException("Ensure ping_timeout > 0") 

358 if ping_interval is not None and ping_interval < 0: 

359 raise WebSocketException("Ensure ping_interval >= 0") 

360 if ping_timeout and ping_interval and ping_interval <= ping_timeout: 

361 raise WebSocketException("Ensure ping_interval > ping_timeout") 

362 if not sockopt: 

363 sockopt = [] 

364 if not sslopt: 

365 sslopt = {} 

366 if self.sock: 

367 raise WebSocketException("socket is already opened") 

368 

369 self.ping_interval = ping_interval 

370 self.ping_timeout = ping_timeout 

371 self.ping_payload = ping_payload 

372 self.keep_running = True 

373 

374 def teardown(close_frame: ABNF = None): 

375 """ 

376 Tears down the connection. 

377 

378 Parameters 

379 ---------- 

380 close_frame: ABNF frame 

381 If close_frame is set, the on_close handler is invoked 

382 with the statusCode and reason from the provided frame. 

383 """ 

384 

385 # teardown() is called in many code paths to ensure resources are cleaned up and on_close is fired. 

386 # To ensure the work is only done once, we use this bool and lock. 

387 with self.has_done_teardown_lock: 

388 if self.has_done_teardown: 

389 return 

390 self.has_done_teardown = True 

391 

392 self._stop_ping_thread() 

393 self.keep_running = False 

394 if self.sock: 

395 self.sock.close() 

396 close_status_code, close_reason = self._get_close_args( 

397 close_frame if close_frame else None) 

398 self.sock = None 

399 

400 # Finally call the callback AFTER all teardown is complete 

401 self._callback(self.on_close, close_status_code, close_reason) 

402 

403 def setSock(reconnecting: bool = False) -> None: 

404 if reconnecting and self.sock: 

405 self.sock.shutdown() 

406 

407 self.sock = WebSocket( 

408 self.get_mask_key, sockopt=sockopt, sslopt=sslopt, 

409 fire_cont_frame=self.on_cont_message is not None, 

410 skip_utf8_validation=skip_utf8_validation, 

411 enable_multithread=True) 

412 

413 self.sock.settimeout(getdefaulttimeout()) 

414 try: 

415 

416 header = self.header() if callable(self.header) else self.header 

417 

418 self.sock.connect( 

419 self.url, header=header, cookie=self.cookie, 

420 http_proxy_host=http_proxy_host, 

421 http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy, 

422 http_proxy_auth=http_proxy_auth, http_proxy_timeout=http_proxy_timeout, 

423 subprotocols=self.subprotocols, 

424 host=host, origin=origin, suppress_origin=suppress_origin, 

425 proxy_type=proxy_type, socket=self.prepared_socket) 

426 

427 _logging.info("Websocket connected") 

428 

429 if self.ping_interval: 

430 self._start_ping_thread() 

431 

432 self._callback(self.on_open) 

433 

434 dispatcher.read(self.sock.sock, read, check) 

435 except (WebSocketConnectionClosedException, ConnectionRefusedError, KeyboardInterrupt, SystemExit, Exception) as e: 

436 handleDisconnect(e, reconnecting) 

437 

438 def read() -> bool: 

439 if not self.keep_running: 

440 return teardown() 

441 

442 try: 

443 op_code, frame = self.sock.recv_data_frame(True) 

444 except (WebSocketConnectionClosedException, KeyboardInterrupt) as e: 

445 if custom_dispatcher: 

446 return handleDisconnect(e) 

447 else: 

448 raise e 

449 

450 if op_code == ABNF.OPCODE_CLOSE: 

451 return teardown(frame) 

452 elif op_code == ABNF.OPCODE_PING: 

453 self._callback(self.on_ping, frame.data) 

454 elif op_code == ABNF.OPCODE_PONG: 

455 self.last_pong_tm = time.time() 

456 self._callback(self.on_pong, frame.data) 

457 elif op_code == ABNF.OPCODE_CONT and self.on_cont_message: 

458 self._callback(self.on_data, frame.data, 

459 frame.opcode, frame.fin) 

460 self._callback(self.on_cont_message, 

461 frame.data, frame.fin) 

462 else: 

463 data = frame.data 

464 if op_code == ABNF.OPCODE_TEXT and not skip_utf8_validation: 

465 data = data.decode("utf-8") 

466 self._callback(self.on_data, data, frame.opcode, True) 

467 self._callback(self.on_message, data) 

468 

469 return True 

470 

471 def check() -> bool: 

472 if (self.ping_timeout): 

473 has_timeout_expired = time.time() - self.last_ping_tm > self.ping_timeout 

474 has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0 

475 has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > self.ping_timeout 

476 

477 if (self.last_ping_tm and 

478 has_timeout_expired and 

479 (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)): 

480 raise WebSocketTimeoutException("ping/pong timed out") 

481 return True 

482 

483 def handleDisconnect(e: Exception, reconnecting: bool = False) -> bool: 

484 self.has_errored = True 

485 self._stop_ping_thread() 

486 if not reconnecting: 

487 self._callback(self.on_error, e) 

488 

489 if isinstance(e, (KeyboardInterrupt, SystemExit)): 

490 teardown() 

491 # Propagate further 

492 raise 

493 

494 if reconnect: 

495 _logging.info("{err} - reconnect".format(err=e)) 

496 if custom_dispatcher: 

497 _logging.debug("Calling custom dispatcher reconnect [{frame_count} frames in stack]".format(frame_count=len(inspect.stack()))) 

498 dispatcher.reconnect(reconnect, setSock) 

499 else: 

500 _logging.error("{err} - goodbye".format(err=e)) 

501 teardown() 

502 

503 custom_dispatcher = bool(dispatcher) 

504 dispatcher = self.create_dispatcher(ping_timeout, dispatcher, parse_url(self.url)[3]) 

505 

506 try: 

507 setSock() 

508 if not custom_dispatcher and reconnect: 

509 while self.keep_running: 

510 _logging.debug("Calling dispatcher reconnect [{frame_count} frames in stack]".format(frame_count=len(inspect.stack()))) 

511 dispatcher.reconnect(reconnect, setSock) 

512 except (KeyboardInterrupt, Exception) as e: 

513 _logging.info("tearing down on exception {err}".format(err=e)) 

514 teardown() 

515 finally: 

516 if not custom_dispatcher: 

517 # Ensure teardown was called before returning from run_forever 

518 teardown() 

519 

520 return self.has_errored 

521 

522 def create_dispatcher(self, ping_timeout: int, dispatcher: Dispatcher = None, is_ssl: bool = False) -> DispatcherBase: 

523 if dispatcher: # If custom dispatcher is set, use WrappedDispatcher 

524 return WrappedDispatcher(self, ping_timeout, dispatcher) 

525 timeout = ping_timeout or 10 

526 if is_ssl: 

527 return SSLDispatcher(self, timeout) 

528 

529 return Dispatcher(self, timeout) 

530 

531 def _get_close_args(self, close_frame: ABNF) -> list: 

532 """ 

533 _get_close_args extracts the close code and reason from the close body 

534 if it exists (RFC6455 says WebSocket Connection Close Code is optional) 

535 """ 

536 # Need to catch the case where close_frame is None 

537 # Otherwise the following if statement causes an error 

538 if not self.on_close or not close_frame: 

539 return [None, None] 

540 

541 # Extract close frame status code 

542 if close_frame.data and len(close_frame.data) >= 2: 

543 close_status_code = 256 * close_frame.data[0] + close_frame.data[1] 

544 reason = close_frame.data[2:].decode('utf-8') 

545 return [close_status_code, reason] 

546 else: 

547 # Most likely reached this because len(close_frame_data.data) < 2 

548 return [None, None] 

549 

550 def _callback(self, callback, *args) -> None: 

551 if callback: 

552 try: 

553 callback(self, *args) 

554 

555 except Exception as e: 

556 _logging.error("error from callback {callback}: {err}".format(callback=callback, err=e)) 

557 if self.on_error: 

558 self.on_error(self, e)