Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_dispatcher.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

83 statements  

1import time 

2import socket 

3import inspect 

4import selectors 

5from typing import TYPE_CHECKING, Callable, Optional, Union 

6 

7if TYPE_CHECKING: 

8 from ._app import WebSocketApp 

9from . import _logging 

10from ._socket import send 

11 

12""" 

13_dispatcher.py 

14websocket - WebSocket client library for Python 

15 

16Copyright 2025 engn33r 

17 

18Licensed under the Apache License, Version 2.0 (the "License"); 

19you may not use this file except in compliance with the License. 

20You may obtain a copy of the License at 

21 

22 http://www.apache.org/licenses/LICENSE-2.0 

23 

24Unless required by applicable law or agreed to in writing, software 

25distributed under the License is distributed on an "AS IS" BASIS, 

26WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

27See the License for the specific language governing permissions and 

28limitations under the License. 

29""" 

30 

class DispatcherBase:
    """
    DispatcherBase

    Shared state and blocking helpers for the dispatchers in this module.
    Keeps a reference to the owning app and the ping timeout, and offers
    sleep-based ``timeout``/``reconnect`` helpers plus a ``send`` pass-through.
    """

    def __init__(
        self, app: "WebSocketApp", ping_timeout: Optional[Union[float, int]]
    ) -> None:
        """Store the owning ``app`` and the ``ping_timeout`` for later use."""
        self.app = app
        self.ping_timeout = ping_timeout

    def timeout(self, seconds: Optional[Union[float, int]], callback: Callable) -> None:
        """
        Block for ``seconds`` and then invoke ``callback`` with no arguments.

        When ``seconds`` is ``None`` the sleep is skipped and ``callback`` is
        invoked immediately.
        """
        if seconds is not None:
            time.sleep(seconds)
        callback()

    def reconnect(self, seconds: int, reconnector: Callable) -> None:
        """
        Log the retry, block for ``seconds``, then call
        ``reconnector(reconnecting=True)``.

        A ``KeyboardInterrupt`` raised while logging, sleeping or reconnecting
        is logged and re-raised to the caller.
        """
        try:
            # Only the number of frames is reported, so ask inspect for zero
            # lines of source context instead of the default one line per frame.
            _logging.info(
                f"reconnect() - retrying in {seconds} seconds [{len(inspect.stack(0))} frames in stack]"
            )
            time.sleep(seconds)
            reconnector(reconnecting=True)
        except KeyboardInterrupt as e:
            _logging.info(f"User exited {e}")
            raise

    def send(self, sock: socket.socket, data: Union[str, bytes]) -> int:
        """Forward ``data`` to the module-level ``send`` helper and return its result."""
        return send(sock, data)

60 

61 

class Dispatcher(DispatcherBase):
    """
    Dispatcher

    Read loop driven by a selector registered on the app's underlying socket.
    """

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Run the read loop while ``self.app.keep_running`` is truthy.

        Each iteration waits on the selector with ``self.ping_timeout`` as the
        timeout. If the socket is reported ready, ``read_callback()`` is called
        and a falsy result ends the loop. Otherwise (or after a truthy result)
        ``check_callback()`` is called.

        The ``sock`` argument is not used; the socket is always taken from
        ``self.app.sock.sock``. Returns immediately when that socket is absent.
        """
        if self.app.sock is None or self.app.sock.sock is None:
            return
        # The selector is entered as a context manager *before* registering,
        # so it is closed even when register() itself raises.
        with selectors.DefaultSelector() as sel:
            sel.register(self.app.sock.sock, selectors.EVENT_READ)
            while self.app.keep_running:
                if sel.select(self.ping_timeout) and not read_callback():
                    break
                check_callback()

85 

86 

class SSLDispatcher(DispatcherBase):
    """
    SSLDispatcher

    Read loop that checks the socket's ``pending()`` data before falling back
    to a selector wait.
    """

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Run the read loop while ``self.app.keep_running`` is truthy.

        Each iteration asks ``self.select`` whether data is available. If so,
        ``read_callback()`` is called and a falsy result ends the loop.
        Otherwise (or after a truthy result) ``check_callback()`` is called.

        The ``sock`` argument is ignored and replaced by
        ``self.app.sock.sock``. Returns immediately when that socket is absent.
        """
        if self.app.sock is None or self.app.sock.sock is None:
            return
        sock = self.app.sock.sock
        # Enter the selector as a context manager before registering so it is
        # closed even when register() itself raises.
        with selectors.DefaultSelector() as sel:
            sel.register(sock, selectors.EVENT_READ)
            while self.app.keep_running:
                if self.select(sock, sel) and not read_callback():
                    break
                check_callback()

    def select(self, sock, sel: selectors.DefaultSelector):
        """
        Report whether the app's current socket has data to read.

        Returns a one-element list holding the socket when ``pending()`` is
        truthy, the first ready selector key when ``sel.select`` reports
        readiness within ``self.ping_timeout``, and ``None`` otherwise.
        Callers in this class only use the truthiness of the result.

        The ``sock`` argument is ignored; the socket is re-read from
        ``self.app.sock`` on every call. ``None`` is returned when either the
        app's socket wrapper or its underlying socket is gone, matching the
        guard used in ``read``.
        """
        wrapper = self.app.sock
        sock = wrapper.sock if wrapper is not None else None
        if sock is None:
            return None
        # NOTE(review): presumably an ssl.SSLSocket, whose pending() reports
        # already-decrypted bytes a selector would not see - confirm.
        if sock.pending():
            return [
                sock,
            ]

        r = sel.select(self.ping_timeout)

        if r:
            return r[0][0]
        return None

126 

127 

class WrappedDispatcher:
    """
    WrappedDispatcher

    Adapter that forwards read, send, timeout and reconnect requests to an
    externally supplied dispatcher object.
    """

    def __init__(
        self,
        app: "WebSocketApp",
        ping_timeout: Optional[Union[float, int]],
        dispatcher,
        handleDisconnect,
    ) -> None:
        """Keep the collaborators and register the dispatcher's abort handler."""
        self.dispatcher = dispatcher
        self.handleDisconnect = handleDisconnect
        self.ping_timeout = ping_timeout
        self.app = app
        dispatcher.signal(2, dispatcher.abort)  # keyboard interrupt

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Hand ``sock`` and ``read_callback`` to the wrapped dispatcher, then
        schedule ``check_callback`` after ``ping_timeout`` when one is set.
        """
        self.dispatcher.read(sock, read_callback)
        interval = self.ping_timeout
        if not interval:
            # No (or zero) ping timeout: nothing to schedule.
            return
        self.timeout(interval, check_callback)

    def send(self, sock: socket.socket, data: Union[str, bytes]) -> int:
        """
        Queue ``data`` through the wrapped dispatcher's ``buffwrite`` and
        return ``len(data)``.
        """
        wrapped = self.dispatcher
        wrapped.buffwrite(sock, data, send, self.handleDisconnect)
        return len(data)

    def timeout(self, seconds: float, callback: Callable, *args) -> None:
        """Forward the timeout request, with any extra ``args``, to the wrapped dispatcher."""
        self.dispatcher.timeout(seconds, callback, *args)

    def reconnect(self, seconds: int, reconnector: Callable) -> None:
        """Schedule ``reconnector`` via ``timeout`` with ``True`` as its single argument."""
        reconnecting = True
        self.timeout(seconds, reconnector, reconnecting)