Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/websocket/_app.py: 16%

250 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

import inspect
import selectors
import socket
import sys
import threading
import time
import traceback

from typing import Any, Callable, Union

from . import _logging
from ._abnf import ABNF
from ._url import parse_url
from ._core import WebSocket, getdefaulttimeout
from ._exceptions import *

16 

17""" 

18_app.py 

19websocket - WebSocket client library for Python 

20 

21Copyright 2022 engn33r 

22 

23Licensed under the Apache License, Version 2.0 (the "License"); 

24you may not use this file except in compliance with the License. 

25You may obtain a copy of the License at 

26 

27 http://www.apache.org/licenses/LICENSE-2.0 

28 

29Unless required by applicable law or agreed to in writing, software 

30distributed under the License is distributed on an "AS IS" BASIS, 

31WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

32See the License for the specific language governing permissions and 

33limitations under the License. 

34""" 

35 

36__all__ = ["WebSocketApp"] 

37 

# Module-wide default reconnect delay in seconds. WebSocketApp.run_forever()
# falls back to this value when its ``reconnect`` argument is None; a falsy
# value (the initial 0) means "do not reconnect".
RECONNECT = 0


def setReconnect(reconnectInterval: int) -> None:
    """
    Set the module-wide default reconnect delay.

    Parameters
    ----------
    reconnectInterval: int
        New value stored in the module-level ``RECONNECT`` variable.
    """
    global RECONNECT
    RECONNECT = reconnectInterval

44 

45 

class DispatcherBase:
    """
    Shared state and blocking helpers for the built-in dispatchers.

    Subclasses add a ``read`` method that drives the receive loop; this base
    class only stores the owning app and the ping timeout, and provides the
    sleep-based ``timeout`` and ``reconnect`` helpers.
    """
    def __init__(self, app: Any, ping_timeout: float) -> None:
        """
        Parameters
        ----------
        app: Any
            Object owning this dispatcher; stored as ``self.app``.
        ping_timeout: int or float
            Timeout in seconds; stored as ``self.ping_timeout``.
        """
        # NOTE: the annotation used to be ``int or float``, which is a runtime
        # expression that evaluates to plain ``int``. ``float`` also accepts
        # ints for type checkers.
        self.app = app
        self.ping_timeout = ping_timeout

    def timeout(self, seconds: int, callback: Callable) -> None:
        """
        Block the calling thread for ``seconds``, then invoke ``callback()``.
        """
        time.sleep(seconds)
        callback()

    def reconnect(self, seconds: int, reconnector: Callable) -> None:
        """
        Wait ``seconds`` and then call ``reconnector(reconnecting=True)``.

        A KeyboardInterrupt raised while logging, sleeping or reconnecting is
        logged and swallowed here rather than propagated.
        """
        try:
            _logging.info("reconnect() - retrying in {seconds_count} seconds [{frame_count} frames in stack]".format(
                seconds_count=seconds, frame_count=len(inspect.stack())))
            time.sleep(seconds)
            reconnector(reconnecting=True)
        except KeyboardInterrupt as e:
            _logging.info("User exited {err}".format(err=e))

66 

67 

class Dispatcher(DispatcherBase):
    """
    Default dispatcher: waits for the app's socket to become readable with a
    selector and then hands control to the read/check callbacks.
    """
    def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None:
        """
        Run the receive loop while ``self.app.keep_running`` is true.

        Parameters
        ----------
        sock: socket.socket
            Not used by this implementation; the socket is always taken from
            ``self.app.sock.sock`` on each pass.
        read_callback: Callable
            Called with no arguments when the socket is readable. A falsy
            return value ends the loop.
        check_callback: Callable
            Called with no arguments after every pass (readable or timed
            out), unless the loop was ended by ``read_callback``.
        """
        while self.app.keep_running:
            # The selector is only needed for the wait itself. Using it as a
            # context manager guarantees it is closed on every path; the
            # previous explicit close() at the end of the loop body was
            # skipped by ``break`` and by exceptions raised from the callbacks.
            with selectors.DefaultSelector() as sel:
                sel.register(self.app.sock.sock, selectors.EVENT_READ)
                ready = sel.select(self.ping_timeout)

            if ready and not read_callback():
                break
            check_callback()

83 

84 

class SSLDispatcher(DispatcherBase):
    """
    Dispatcher variant that checks the socket's ``pending()`` data before
    falling back to a selector wait.
    """
    def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None:
        """
        Run the receive loop while ``self.app.keep_running`` is true.

        Parameters
        ----------
        sock: socket.socket
            Not used by this implementation; ``select()`` reads the socket
            from ``self.app.sock.sock``.
        read_callback: Callable
            Called with no arguments when ``select()`` reports readiness. A
            falsy return value ends the loop.
        check_callback: Callable
            Called with no arguments after every pass, unless the loop was
            ended by ``read_callback``.
        """
        while self.app.keep_running:
            if self.select() and not read_callback():
                break
            check_callback()

    def select(self) -> list:
        """
        Wait until the app's socket has data to read.

        Returns
        -------
        list
            ``[sock]`` when data is available, an empty list when the wait
            timed out after ``self.ping_timeout`` seconds. (Earlier versions
            returned a list, a SelectorKey or None depending on the path;
            truthiness is unchanged.)
        """
        sock = self.app.sock.sock
        # pending() is truthy when data is already buffered on the socket
        # object (presumably an ssl.SSLSocket - confirm against caller); in
        # that case waiting on the file descriptor could block although a
        # read would succeed immediately.
        if sock.pending():
            return [sock]

        # Context manager closes the selector even if select() raises.
        with selectors.DefaultSelector() as sel:
            sel.register(sock, selectors.EVENT_READ)
            ready = sel.select(self.ping_timeout)

        return [key.fileobj for key, _ in ready]

110 

111 

class WrappedDispatcher:
    """
    Adapter that lets a caller-supplied dispatcher object be used through the
    same ``read`` / ``timeout`` / ``reconnect`` interface as the built-in
    dispatchers.

    The wrapped object must provide ``signal(signum, handler)``, ``abort``,
    ``read(sock, callback)`` and ``timeout(seconds, callback)``.
    """
    def __init__(self, app, ping_timeout: float, dispatcher: Any) -> None:
        """
        Parameters
        ----------
        app:
            Object owning this dispatcher; stored as ``self.app``.
        ping_timeout: int or float
            Delay passed to the wrapped dispatcher's ``timeout`` in ``read``;
            a falsy value disables that scheduling.
        dispatcher: Any
            The custom dispatcher to delegate to. It is not an instance of
            the ``Dispatcher`` class in this module (which has no ``signal``
            or ``abort``), hence the ``Any`` annotation.
        """
        self.app = app
        self.ping_timeout = ping_timeout
        self.dispatcher = dispatcher
        # Signal number 2 is SIGINT on POSIX: abort on keyboard interrupt.
        dispatcher.signal(2, dispatcher.abort)  # keyboard interrupt

    def read(self, sock: socket.socket, read_callback: Callable, check_callback: Callable) -> None:
        """
        Register ``read_callback`` for ``sock`` with the wrapped dispatcher
        and, when a ping timeout is configured, schedule ``check_callback``
        through the wrapped dispatcher's ``timeout``.
        """
        self.dispatcher.read(sock, read_callback)
        if self.ping_timeout:
            self.timeout(self.ping_timeout, check_callback)

    def timeout(self, seconds: int, callback: Callable) -> None:
        """
        Delegate to the wrapped dispatcher's ``timeout(seconds, callback)``.
        """
        self.dispatcher.timeout(seconds, callback)

    def reconnect(self, seconds: int, reconnector: Callable) -> None:
        """
        Schedule ``reconnector`` after ``seconds`` via ``timeout``.

        Unlike ``DispatcherBase.reconnect`` the callable is handed over
        as-is, so it is later invoked without ``reconnecting=True``.
        """
        self.timeout(seconds, reconnector)

131 

132 

class WebSocketApp:
    """
    Higher level of APIs are provided. The interface is like JavaScript WebSocket object.
    """

    def __init__(self, url: str, header: Union[list, dict] = None,
                 on_open: Callable = None, on_message: Callable = None, on_error: Callable = None,
                 on_close: Callable = None, on_ping: Callable = None, on_pong: Callable = None,
                 on_cont_message: Callable = None,
                 keep_running: bool = True, get_mask_key: Callable = None, cookie: str = None,
                 subprotocols: list = None,
                 on_data: Callable = None,
                 socket: socket.socket = None) -> None:
        """
        WebSocketApp initialization

        Parameters
        ----------
        url: str
            Websocket url.
        header: list or dict
            Custom header for websocket handshake.
        on_open: function
            Callback object which is called at opening websocket.
            on_open has one argument.
            The 1st argument is this class object.
        on_message: function
            Callback object which is called when received data.
            on_message has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is utf-8 data received from the server.
        on_error: function
            Callback object which is called when we get error.
            on_error has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is exception object.
        on_close: function
            Callback object which is called when connection is closed.
            on_close has 3 arguments.
            The 1st argument is this class object.
            The 2nd argument is close_status_code.
            The 3rd argument is close_msg.
        on_ping: function
            Callback object which is called when a ping frame is received.
            on_ping has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the data of the ping frame.
        on_pong: function
            Callback object which is called when a pong frame is received.
            on_pong has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the data of the pong frame.
        on_cont_message: function
            Callback object which is called when a continuation
            frame is received.
            on_cont_message has 3 arguments.
            The 1st argument is this class object.
            The 2nd argument is utf-8 string which we get from the server.
            The 3rd argument is continue flag. If 0, the data continues
            in the next frame.
        on_data: function
            Callback object which is called when a message is received.
            This is called before on_message or on_cont_message,
            and then on_message or on_cont_message is called.
            on_data has 4 arguments.
            The 1st argument is this class object.
            The 2nd argument is utf-8 string which we get from the server.
            The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be passed.
            The 4th argument is continue flag. If 0, the data continues
            in the next frame.
        keep_running: bool
            This parameter is obsolete and ignored.
        get_mask_key: function
            A callable function to get new mask keys, see the
            WebSocket.set_mask_key's docstring for more information.
        cookie: str
            Cookie value.
        subprotocols: list
            List of available sub protocols. Default is None.
        socket: socket
            Pre-initialized stream socket.
        """
        self.url = url
        self.header = header if header is not None else []
        self.cookie = cookie

        self.on_open = on_open
        self.on_message = on_message
        self.on_data = on_data
        self.on_error = on_error
        self.on_close = on_close
        self.on_ping = on_ping
        self.on_pong = on_pong
        self.on_cont_message = on_cont_message
        # The ``keep_running`` argument is intentionally ignored; the flag is
        # only switched on by run_forever().
        self.keep_running = False
        self.get_mask_key = get_mask_key
        self.sock = None
        self.last_ping_tm = 0
        self.last_pong_tm = 0
        self.ping_thread = None
        self.stop_ping = None
        self.ping_interval = 0
        self.ping_timeout = None
        self.ping_payload = ""
        self.subprotocols = subprotocols
        self.prepared_socket = socket
        self.has_errored = False

    def send(self, data: str, opcode: int = ABNF.OPCODE_TEXT) -> None:
        """
        send message

        Parameters
        ----------
        data: str
            Message to send. If you set opcode to OPCODE_TEXT,
            data must be utf-8 string or unicode.
        opcode: int
            Operation code of data. Default is OPCODE_TEXT.

        Raises
        ------
        WebSocketConnectionClosedException
            If there is no open socket or the underlying send reports 0.
        """

        if not self.sock or self.sock.send(data, opcode) == 0:
            raise WebSocketConnectionClosedException(
                "Connection is already closed.")

    def close(self, **kwargs) -> None:
        """
        Close websocket connection.

        Clears ``keep_running`` so the dispatcher loop stops, closes the
        socket (forwarding ``kwargs`` to its ``close``) and drops the
        reference to it.
        """
        self.keep_running = False
        if self.sock:
            self.sock.close(**kwargs)
            self.sock = None

    def _start_ping_thread(self) -> None:
        """
        Reset the ping/pong timestamps and start the daemon ping thread.
        """
        self.last_ping_tm = self.last_pong_tm = 0
        self.stop_ping = threading.Event()
        self.ping_thread = threading.Thread(target=self._send_ping)
        self.ping_thread.daemon = True
        self.ping_thread.start()

    def _stop_ping_thread(self) -> None:
        """
        Signal the ping thread to stop, wait up to 3 seconds for it and
        reset the ping/pong timestamps.
        """
        if self.stop_ping:
            self.stop_ping.set()
        if self.ping_thread and self.ping_thread.is_alive():
            self.ping_thread.join(3)
        self.last_ping_tm = self.last_pong_tm = 0

    def _send_ping(self) -> None:
        """
        Body of the ping thread: send a ping every ``ping_interval`` seconds
        until ``stop_ping`` is set.

        Note that one interval is waited before the loop starts and another
        one at the top of each iteration, so the first ping goes out after
        two intervals.
        """
        if self.stop_ping.wait(self.ping_interval):
            return
        while not self.stop_ping.wait(self.ping_interval):
            if self.sock:
                self.last_ping_tm = time.time()
                try:
                    _logging.debug("Sending ping")
                    self.sock.ping(self.ping_payload)
                except Exception as e:
                    # Best effort: a failed ping is only logged; a missing
                    # pong is detected by the timeout check in run_forever().
                    _logging.debug("Failed to send ping: {err}".format(err=e))

    def run_forever(self, sockopt: tuple = None, sslopt: dict = None,
                    ping_interval: float = 0, ping_timeout: float = None,
                    ping_payload: str = "",
                    http_proxy_host: str = None, http_proxy_port: Union[int, str] = None,
                    http_no_proxy: list = None, http_proxy_auth: tuple = None,
                    http_proxy_timeout: float = None,
                    skip_utf8_validation: bool = False,
                    host: str = None, origin: str = None, dispatcher: Any = None,
                    suppress_origin: bool = False, proxy_type: str = None, reconnect: int = None) -> bool:
        """
        Run event loop for WebSocket framework.

        This loop is an infinite loop and is alive while websocket is available.

        Parameters
        ----------
        sockopt: tuple
            Values for socket.setsockopt.
            sockopt must be tuple
            and each element is argument of sock.setsockopt.
        sslopt: dict
            Optional dict object for ssl socket option.
        ping_interval: int or float
            Automatically send "ping" command
            every specified period (in seconds).
            If set to 0, no ping is sent periodically.
        ping_timeout: int or float
            Timeout (in seconds) if the pong message is not received.
        ping_payload: str
            Payload message to send with each ping.
        http_proxy_host: str
            HTTP proxy host name.
        http_proxy_port: int or str
            HTTP proxy port. If not set, set to 80.
        http_no_proxy: list
            Whitelisted host names that don't use the proxy.
        http_proxy_timeout: int or float
            HTTP proxy timeout, default is 60 sec as per python-socks.
        http_proxy_auth: tuple
            HTTP proxy auth information. tuple of username and password. Default is None.
        skip_utf8_validation: bool
            skip utf8 validation.
        host: str
            update host header.
        origin: str
            update origin header.
        dispatcher: Dispatcher object
            customize reading data from socket.
        suppress_origin: bool
            suppress outputting origin header.
        proxy_type: str
            type of proxy from: http, socks4, socks4a, socks5, socks5h
        reconnect: int
            delay interval when reconnecting. When None, the module-level
            RECONNECT value is used.

        Returns
        -------
        has_errored: bool
            The value of ``self.has_errored``: True once a disconnect or
            error has been handled, False otherwise.

        Raises
        ------
        WebSocketException
            If the ping settings are inconsistent or a socket is already open.
        KeyboardInterrupt, SystemExit
            Re-raised after teardown when they interrupt the loop.
        """

        if reconnect is None:
            reconnect = RECONNECT

        if ping_timeout is not None and ping_timeout <= 0:
            raise WebSocketException("Ensure ping_timeout > 0")
        if ping_interval is not None and ping_interval < 0:
            raise WebSocketException("Ensure ping_interval >= 0")
        if ping_timeout and ping_interval and ping_interval <= ping_timeout:
            raise WebSocketException("Ensure ping_interval > ping_timeout")
        if not sockopt:
            sockopt = []
        if not sslopt:
            sslopt = {}
        if self.sock:
            raise WebSocketException("socket is already opened")

        self.ping_interval = ping_interval
        self.ping_timeout = ping_timeout
        self.ping_payload = ping_payload
        self.keep_running = True

        def teardown(close_frame: ABNF = None) -> None:
            """
            Tears down the connection.

            Parameters
            ----------
            close_frame: ABNF frame
                If close_frame is set, the on_close handler is invoked
                with the statusCode and reason from the provided frame.
            """

            self._stop_ping_thread()
            self.keep_running = False
            if self.sock:
                self.sock.close()
            # Computed unconditionally so that on_close can be invoked even
            # when the socket reference has already been dropped.
            close_status_code, close_reason = self._get_close_args(
                close_frame if close_frame else None)
            self.sock = None

            # Finally call the callback AFTER all teardown is complete
            self._callback(self.on_close, close_status_code, close_reason)

        def setSock(reconnecting: bool = False) -> None:
            """Create, connect and start reading from a fresh WebSocket."""
            if reconnecting and self.sock:
                self.sock.shutdown()

            self.sock = WebSocket(
                self.get_mask_key, sockopt=sockopt, sslopt=sslopt,
                fire_cont_frame=self.on_cont_message is not None,
                skip_utf8_validation=skip_utf8_validation,
                enable_multithread=True)

            self.sock.settimeout(getdefaulttimeout())
            try:
                self.sock.connect(
                    self.url, header=self.header, cookie=self.cookie,
                    http_proxy_host=http_proxy_host,
                    http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy,
                    http_proxy_auth=http_proxy_auth, http_proxy_timeout=http_proxy_timeout,
                    subprotocols=self.subprotocols,
                    host=host, origin=origin, suppress_origin=suppress_origin,
                    proxy_type=proxy_type, socket=self.prepared_socket)

                _logging.info("Websocket connected")

                if self.ping_interval:
                    self._start_ping_thread()

                self._callback(self.on_open)

                dispatcher.read(self.sock.sock, read, check)
            # Exception already covers the connection errors that used to be
            # listed separately; KeyboardInterrupt and SystemExit derive from
            # BaseException and must be named explicitly.
            except (Exception, KeyboardInterrupt, SystemExit) as e:
                handleDisconnect(e, reconnecting)

        def read() -> bool:
            """
            Receive one frame and dispatch it to the matching callbacks.

            Returns True to keep reading; a falsy value (the None returned by
            teardown/handleDisconnect) tells the dispatcher to stop.
            """
            if not self.keep_running:
                return teardown()

            try:
                op_code, frame = self.sock.recv_data_frame(True)
            except (WebSocketConnectionClosedException, KeyboardInterrupt) as e:
                if custom_dispatcher:
                    return handleDisconnect(e)
                else:
                    # Let setSock()'s handler deal with it.
                    raise

            if op_code == ABNF.OPCODE_CLOSE:
                return teardown(frame)
            elif op_code == ABNF.OPCODE_PING:
                self._callback(self.on_ping, frame.data)
            elif op_code == ABNF.OPCODE_PONG:
                self.last_pong_tm = time.time()
                self._callback(self.on_pong, frame.data)
            elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:
                self._callback(self.on_data, frame.data,
                               frame.opcode, frame.fin)
                self._callback(self.on_cont_message,
                               frame.data, frame.fin)
            else:
                data = frame.data
                if op_code == ABNF.OPCODE_TEXT and not skip_utf8_validation:
                    data = data.decode("utf-8")
                self._callback(self.on_data, data, frame.opcode, True)
                self._callback(self.on_message, data)

            return True

        def check() -> bool:
            """
            Raise WebSocketTimeoutException when a ping was sent, the ping
            timeout has elapsed, and the pong is missing or arrived too late.
            """
            if self.ping_timeout:
                has_timeout_expired = time.time() - self.last_ping_tm > self.ping_timeout
                has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0
                has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > self.ping_timeout

                if (self.last_ping_tm and
                        has_timeout_expired and
                        (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)):
                    raise WebSocketTimeoutException("ping/pong timed out")
            return True

        def handleDisconnect(e: Exception, reconnecting: bool = False) -> None:
            """
            React to an error or disconnect: report it, then either schedule
            a reconnect or tear the connection down.
            """
            self.has_errored = True
            self._stop_ping_thread()
            if not reconnecting:
                self._callback(self.on_error, e)

            if isinstance(e, (KeyboardInterrupt, SystemExit)):
                teardown()
                # Propagate further. The bare raise relies on this function
                # being called from inside an ``except`` block.
                raise

            if reconnect:
                _logging.info("{err} - reconnect".format(err=e))
                if custom_dispatcher:
                    _logging.debug("Calling custom dispatcher reconnect [{frame_count} frames in stack]".format(frame_count=len(inspect.stack())))
                    dispatcher.reconnect(reconnect, setSock)
            else:
                _logging.error("{err} - goodbye".format(err=e))
                teardown()

        custom_dispatcher = bool(dispatcher)
        dispatcher = self.create_dispatcher(ping_timeout, dispatcher, parse_url(self.url)[3])

        setSock()
        # With a built-in dispatcher, reconnecting is driven from here; a
        # custom dispatcher is asked to reconnect from handleDisconnect().
        if not custom_dispatcher and reconnect:
            while self.keep_running:
                _logging.debug("Calling dispatcher reconnect [{frame_count} frames in stack]".format(frame_count=len(inspect.stack())))
                dispatcher.reconnect(reconnect, setSock)

        return self.has_errored

    def create_dispatcher(self, ping_timeout: float, dispatcher: Any = None,
                          is_ssl: bool = False) -> Union[DispatcherBase, WrappedDispatcher]:
        """
        Build the dispatcher used by run_forever().

        A custom ``dispatcher`` is wrapped in a WrappedDispatcher. Otherwise
        an SSLDispatcher (when ``is_ssl`` is true) or a Dispatcher is created
        with a select timeout of ``ping_timeout`` or 10 seconds when no ping
        timeout is set.
        """
        if dispatcher:  # If custom dispatcher is set, use WrappedDispatcher
            return WrappedDispatcher(self, ping_timeout, dispatcher)
        timeout = ping_timeout or 10
        if is_ssl:
            return SSLDispatcher(self, timeout)

        return Dispatcher(self, timeout)

    def _get_close_args(self, close_frame: ABNF) -> list:
        """
        _get_close_args extracts the close code and reason from the close body
        if it exists (RFC6455 says WebSocket Connection Close Code is optional)

        Returns
        -------
        list
            ``[close_status_code, reason]``, or ``[None, None]`` when there is
            no on_close handler, no frame, or fewer than 2 bytes of data.
        """
        # Need to catch the case where close_frame is None
        # Otherwise the following if statement causes an error
        if not self.on_close or not close_frame:
            return [None, None]

        # Extract close frame status code
        if close_frame.data and len(close_frame.data) >= 2:
            # First two bytes: big-endian status code; the rest: utf-8 reason.
            close_status_code = 256 * close_frame.data[0] + close_frame.data[1]
            reason = close_frame.data[2:].decode('utf-8')
            return [close_status_code, reason]
        else:
            # Most likely reached this because len(close_frame_data.data) < 2
            return [None, None]

    def _callback(self, callback: Callable, *args: Any) -> None:
        """
        Invoke ``callback(self, *args)`` if a callback is set.

        An Exception raised by the callback is logged and forwarded to
        ``on_error`` (when set) instead of propagating.
        """
        if callback:
            try:
                callback(self, *args)

            except Exception as e:
                _logging.error("error from callback {callback}: {err}".format(callback=callback, err=e))
                if self.on_error:
                    self.on_error(self, e)