Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_dispatcher.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

83 statements  

1import time 

2import socket 

3import inspect 

4import selectors 

5from typing import Any, TYPE_CHECKING, Callable, Optional, Union 

6 

7if TYPE_CHECKING: 

8 from ._app import WebSocketApp 

9from ._logging import info 

10from ._socket import send 

11 

12""" 

13_dispatcher.py 

14websocket - WebSocket client library for Python 

15 

16Copyright 2025 engn33r 

17 

18Licensed under the Apache License, Version 2.0 (the "License"); 

19you may not use this file except in compliance with the License. 

20You may obtain a copy of the License at 

21 

22 http://www.apache.org/licenses/LICENSE-2.0 

23 

24Unless required by applicable law or agreed to in writing, software 

25distributed under the License is distributed on an "AS IS" BASIS, 

26WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

27See the License for the specific language governing permissions and 

28limitations under the License. 

29""" 

30 

31 

class DispatcherBase:
    """
    Shared base for the dispatchers that block in the calling thread.

    Stores the owning app and the ping timeout, and provides the
    sleep-based ``timeout`` and ``reconnect`` helpers plus ``send``.
    """

    def __init__(
        self, app: "WebSocketApp", ping_timeout: Optional[Union[float, int]]
    ) -> None:
        """Remember the owning ``app`` and the ``ping_timeout`` value."""
        self.ping_timeout = ping_timeout
        self.app = app

    def timeout(self, seconds: Optional[Union[float, int]], callback: Callable) -> None:
        """
        Invoke ``callback`` after sleeping for ``seconds``.

        When ``seconds`` is ``None`` the sleep is skipped and ``callback``
        runs immediately. The sleep blocks the calling thread.
        """
        if seconds is None:
            callback()
            return
        time.sleep(seconds)
        callback()

    def reconnect(self, seconds: int, reconnector: Callable) -> None:
        """
        Log the retry, sleep for ``seconds``, then call
        ``reconnector(reconnecting=True)``.

        A ``KeyboardInterrupt`` raised anywhere in that sequence is logged
        and re-raised to the caller.
        """
        try:
            # The message includes the current call-stack depth.
            message = f"reconnect() - retrying in {seconds} seconds [{len(inspect.stack())} frames in stack]"
            info(message)
            time.sleep(seconds)
            reconnector(reconnecting=True)
        except KeyboardInterrupt as e:
            info(f"User exited {e}")
            raise e

    def send(self, sock: socket.socket, data: Union[str, bytes]) -> int:
        """Pass ``data`` to the module-level ``send`` for ``sock`` and return its result."""
        result = send(sock, data)
        return result

61 

62 

class Dispatcher(DispatcherBase):
    """
    Dispatcher that waits for readability of the app's socket with a
    selector and drives the read/check callbacks from a blocking loop.
    """

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Run the read loop until the app stops or ``read_callback`` is falsy.

        ``sock`` is accepted for interface compatibility only; the socket
        that is actually watched is ``self.app.sock.sock``. Returns
        immediately when that socket is not available.

        Each iteration waits up to ``self.ping_timeout`` for the socket to
        become readable. If it is, ``read_callback()`` is called and a
        falsy result ends the loop. Otherwise ``check_callback()`` runs
        once per iteration, whether or not data arrived.
        """
        if self.app.sock is None or self.app.sock.sock is None:
            return
        # The context manager closes the selector on every exit path,
        # including a failing register() (e.g. an already-closed socket).
        with selectors.DefaultSelector() as sel:
            sel.register(self.app.sock.sock, selectors.EVENT_READ)
            while self.app.keep_running:
                if sel.select(self.ping_timeout):
                    if not read_callback():
                        break
                check_callback()

86 

87 

class SSLDispatcher(DispatcherBase):
    """
    Dispatcher variant that checks the socket's ``pending()`` data before
    falling back to a selector wait.
    """

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Run the read loop until the app stops or ``read_callback`` is falsy.

        The ``sock`` argument is ignored; it is replaced by
        ``self.app.sock.sock``. Returns immediately when that socket is not
        available.

        Each iteration asks ``self.select`` whether data is ready. If so,
        ``read_callback()`` is called and a falsy result ends the loop.
        Otherwise ``check_callback()`` runs once per iteration.
        """
        if self.app.sock is None or self.app.sock.sock is None:
            return
        sock = self.app.sock.sock
        # The context manager closes the selector on every exit path,
        # including a failing register() (e.g. an already-closed socket).
        with selectors.DefaultSelector() as sel:
            sel.register(sock, selectors.EVENT_READ)
            while self.app.keep_running:
                if self.select(sock, sel):
                    if not read_callback():
                        break
                check_callback()

    def select(self, sock: Any, sel: selectors.DefaultSelector) -> Any:
        """
        Report whether the app's socket has data to read.

        The ``sock`` argument is ignored and re-read from
        ``self.app.sock.sock``. Returns ``None`` when ``self.app.sock`` is
        ``None`` or when the wait of up to ``self.ping_timeout`` finds
        nothing ready. Returns a one-element list holding the socket when
        ``sock.pending()`` is truthy, and otherwise the first ready
        selector key.
        """
        if self.app.sock is None:
            return None
        sock = self.app.sock.sock
        # NOTE(review): pending() presumably is ssl.SSLSocket.pending(),
        # i.e. already-decrypted bytes that a selector would not report.
        if sock.pending():
            return [sock]

        ready = sel.select(self.ping_timeout)
        if ready:
            # Each entry is a (key, events) pair; hand back the key.
            return ready[0][0]
        return None

127 

128 

class WrappedDispatcher:
    """
    Adapter that forwards read, send, timeout and reconnect requests to
    an externally supplied ``dispatcher`` object.
    """

    def __init__(
        self,
        app: "WebSocketApp",
        ping_timeout: Optional[Union[float, int]],
        dispatcher: Any,
        handleDisconnect: Any,
    ) -> None:
        """
        Store the collaborators and register ``dispatcher.abort`` as the
        handler for signal number 2 on the wrapped dispatcher.
        """
        self.app, self.ping_timeout = app, ping_timeout
        self.dispatcher, self.handleDisconnect = dispatcher, handleDisconnect
        dispatcher.signal(2, dispatcher.abort)  # keyboard interrupt

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Hand ``sock`` and ``read_callback`` to the wrapped dispatcher's
        ``read``. When ``self.ping_timeout`` is truthy, also schedule
        ``check_callback`` after that many seconds via ``self.timeout``.
        """
        self.dispatcher.read(sock, read_callback)
        if not self.ping_timeout:
            return
        self.timeout(self.ping_timeout, check_callback)

    def send(self, sock: socket.socket, data: Union[str, bytes]) -> int:
        """
        Pass ``data`` to the wrapped dispatcher's ``buffwrite`` along with
        the module-level ``send`` and the disconnect handler, then return
        ``len(data)``.
        """
        self.dispatcher.buffwrite(sock, data, send, self.handleDisconnect)
        size = len(data)
        return size

    def timeout(self, seconds: float, callback: Callable, *args: Any) -> None:
        """Forward ``seconds``, ``callback`` and any extra ``args`` to the wrapped dispatcher's ``timeout``."""
        self.dispatcher.timeout(seconds, callback, *args)

    def reconnect(self, seconds: int, reconnector: Callable) -> None:
        """Schedule ``reconnector`` via ``self.timeout`` with ``True`` as its single extra argument."""
        self.timeout(seconds, reconnector, True)