Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_dispatcher.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

71 statements  

1import time 

2import socket 

3import inspect 

4import selectors 

5from typing import Any, Callable, Optional, Union 

6from . import _logging 

7from ._socket import send 

8 

9 

class DispatcherBase:
    """
    DispatcherBase

    Common state and blocking helpers shared by the dispatchers in this
    module: a delayed callback, a delayed reconnect and a thin ``send``
    wrapper around the module-level ``send`` helper.
    """

    def __init__(self, app: Any, ping_timeout: Union[float, int, None]) -> None:
        """
        Store the owning application object and the ping timeout.

        Parameters
        ----------
        app: Any
            Object this dispatcher works for; kept as ``self.app``.
        ping_timeout: float | int | None
            Timeout value kept as ``self.ping_timeout``.
        """
        self.app = app
        self.ping_timeout = ping_timeout

    def timeout(self, seconds: Union[float, int, None], callback: Callable) -> None:
        """
        Block for ``seconds`` and then invoke ``callback`` with no arguments.

        Parameters
        ----------
        seconds: float | int | None
            Delay before the callback runs. ``None`` means "no delay":
            ``time.sleep`` rejects ``None`` with ``TypeError``, so the sleep
            is skipped instead of crashing.
        callback: Callable
            Zero-argument callable to run after the delay.
        """
        if seconds is not None:
            time.sleep(seconds)
        callback()

    def reconnect(self, seconds: int, reconnector: Callable) -> None:
        """
        Log the retry, sleep for ``seconds`` and call
        ``reconnector(reconnecting=True)``.

        Raises
        ------
        KeyboardInterrupt
            Logged and re-raised when the user interrupts the wait or the
            reconnect attempt.
        """
        try:
            _logging.info(
                f"reconnect() - retrying in {seconds} seconds [{len(inspect.stack())} frames in stack]"
            )
            time.sleep(seconds)
            reconnector(reconnecting=True)
        except KeyboardInterrupt as e:
            _logging.info(f"User exited {e}")
            # Bare raise re-raises the active exception unchanged.
            raise

    def send(self, sock: socket.socket, data: Union[str, bytes]) -> int:
        """
        Send ``data`` on ``sock`` through the module-level ``send`` helper
        and return that helper's result.

        NOTE(review): the ``int`` return annotation assumes the helper
        returns the number of bytes written - confirm against ``_socket.send``.
        """
        return send(sock, data)

36 

37 

class Dispatcher(DispatcherBase):
    """
    Dispatcher

    Blocking read loop that waits for the application's socket to become
    readable by means of a selector.
    """

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Run the read loop until ``self.app.keep_running`` turns falsy or
        ``read_callback`` returns a falsy value.

        Parameters
        ----------
        sock: socket.socket
            Not used by this method; the socket that is actually watched is
            ``self.app.sock.sock``.
        read_callback: Callable
            Called with no arguments whenever the selector reports the
            socket as readable. A falsy return value ends the loop
            immediately (``check_callback`` is not called for that round).
        check_callback: Callable
            Called with no arguments after every other round, whether the
            wait ended with a readable socket or with a timeout.
        """
        # The context manager closes the selector on every exit path,
        # including a failure inside register() (e.g. an unusable socket),
        # which would otherwise leak the selector.
        with selectors.DefaultSelector() as sel:
            sel.register(self.app.sock.sock, selectors.EVENT_READ)
            while self.app.keep_running:
                # select() waits at most ping_timeout; an empty result
                # means nothing became readable in that time.
                if sel.select(self.ping_timeout) and not read_callback():
                    break
                check_callback()

59 

60 

class SSLDispatcher(DispatcherBase):
    """
    SSLDispatcher

    Read loop variant that asks the socket's ``pending()`` method before
    falling back to a selector wait.
    """

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Run the read loop until ``self.app.keep_running`` turns falsy or
        ``read_callback`` returns a falsy value.

        Parameters
        ----------
        sock: socket.socket
            Ignored; it is replaced by ``self.app.sock.sock``.
        read_callback: Callable
            Called with no arguments whenever ``self.select`` reports data.
            A falsy return value ends the loop immediately.
        check_callback: Callable
            Called with no arguments after every other round.
        """
        sock = self.app.sock.sock
        # The context manager closes the selector on every exit path,
        # including a failure inside register(), which would otherwise
        # leak the selector.
        with selectors.DefaultSelector() as sel:
            sel.register(sock, selectors.EVENT_READ)
            while self.app.keep_running:
                if self.select(sock, sel) and not read_callback():
                    break
                check_callback()

    def select(self, sock, sel: selectors.DefaultSelector):
        """
        Report whether the application's socket has something to read.

        Parameters
        ----------
        sock
            Ignored; it is replaced by ``self.app.sock.sock``.
        sel: selectors.DefaultSelector
            Selector on which the socket is already registered.

        Returns
        -------
        A one-element list holding the socket when ``sock.pending()`` is
        truthy, otherwise the first element of the first entry returned by
        ``sel.select(self.ping_timeout)``, or ``None`` when that wait
        returned nothing. Callers in this class only use the truthiness.
        """
        sock = self.app.sock.sock
        # Check pending() first so a selector wait is skipped when the socket
        # already reports data. Presumably this covers bytes buffered inside
        # the SSL layer that the selector cannot see - confirm against
        # ssl.SSLSocket.pending.
        if sock.pending():
            return [
                sock,
            ]

        r = sel.select(self.ping_timeout)

        if r:
            return r[0][0]
        return None

95 

96 

class WrappedDispatcher:
    """
    WrappedDispatcher

    Adapter that forwards reads, writes and timers to an externally supplied
    ``dispatcher`` object. The methods below call ``signal``, ``abort``,
    ``read``, ``timeout`` and ``buffwrite`` on that object.
    """

    def __init__(
        self, app, ping_timeout: Union[float, int, None], dispatcher, handleDisconnect
    ) -> None:
        """
        Store the collaborators and register the dispatcher's ``abort`` as
        the handler for signal number 2.

        Parameters
        ----------
        app
            Owning application object; kept as ``self.app``.
        ping_timeout: float | int | None
            Interval passed to ``timeout`` after each ``read``; a falsy value
            disables that timer.
        dispatcher
            External dispatcher object that does the real work.
        handleDisconnect
            Passed through to ``dispatcher.buffwrite`` on every ``send``.
        """
        self.app = app
        self.ping_timeout = ping_timeout
        self.dispatcher = dispatcher
        self.handleDisconnect = handleDisconnect
        dispatcher.signal(2, dispatcher.abort)  # keyboard interrupt

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Register ``read_callback`` for ``sock`` with the wrapped dispatcher
        and, when a ping timeout is configured, schedule ``check_callback``
        after ``self.ping_timeout``.
        """
        self.dispatcher.read(sock, read_callback)
        # Explicit branch instead of a short-circuit ``and`` expression used
        # only for its side effect.
        if self.ping_timeout:
            self.timeout(self.ping_timeout, check_callback)

    def send(self, sock: socket.socket, data: Union[str, bytes]) -> int:
        """
        Hand ``data`` to ``dispatcher.buffwrite`` together with the
        module-level ``send`` helper and the disconnect handler.

        Returns
        -------
        int
            ``len(data)``.
        """
        self.dispatcher.buffwrite(sock, data, send, self.handleDisconnect)
        return len(data)

    def timeout(self, seconds: float, callback: Callable, *args) -> None:
        """
        Forward ``seconds``, ``callback`` and any extra positional arguments
        to ``dispatcher.timeout``.
        """
        self.dispatcher.timeout(seconds, callback, *args)

    def reconnect(self, seconds: int, reconnector: Callable) -> None:
        """
        Schedule ``reconnector`` through ``timeout`` after ``seconds``, with
        ``True`` as its single extra positional argument.
        """
        self.timeout(seconds, reconnector, True)