1import time
2import socket
3import inspect
4import selectors
5from typing import Any, TYPE_CHECKING, Callable, Optional, Union
6
7if TYPE_CHECKING:
8 from ._app import WebSocketApp
9from ._logging import info
10from ._socket import send
11
12"""
13_dispatcher.py
14websocket - WebSocket client library for Python
15
16Copyright 2025 engn33r
17
18Licensed under the Apache License, Version 2.0 (the "License");
19you may not use this file except in compliance with the License.
20You may obtain a copy of the License at
21
22 http://www.apache.org/licenses/LICENSE-2.0
23
24Unless required by applicable law or agreed to in writing, software
25distributed under the License is distributed on an "AS IS" BASIS,
26WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
27See the License for the specific language governing permissions and
28limitations under the License.
29"""
30
31
class DispatcherBase:
    """
    DispatcherBase

    Holds the state shared by the dispatcher classes (the owning app and the
    ping timeout) and provides blocking helpers for delayed callbacks,
    delayed reconnects and sending data.
    """

    def __init__(
        self, app: "WebSocketApp", ping_timeout: Optional[Union[float, int]]
    ) -> None:
        """
        Store ``app`` and ``ping_timeout`` on the instance.
        """
        self.app = app
        self.ping_timeout = ping_timeout

    def timeout(self, seconds: Optional[Union[float, int]], callback: Callable) -> None:
        """
        Block for ``seconds`` and then invoke ``callback()``.

        When ``seconds`` is None the sleep is skipped and the callback is
        invoked immediately.
        """
        if seconds is not None:
            time.sleep(seconds)
        callback()

    def reconnect(self, seconds: int, reconnector: Callable) -> None:
        """
        Log the retry, block for ``seconds`` and then call
        ``reconnector(reconnecting=True)``.

        A KeyboardInterrupt raised while logging, sleeping or reconnecting is
        logged and re-raised to the caller.
        """
        try:
            # Only the number of frames is logged, so ask for context=0 to
            # avoid loading source lines for every frame on the stack.
            frame_count = len(inspect.stack(0))
            info(
                f"reconnect() - retrying in {seconds} seconds [{frame_count} frames in stack]"
            )
            time.sleep(seconds)
            reconnector(reconnecting=True)
        except KeyboardInterrupt as e:
            info(f"User exited {e}")
            # Bare raise re-raises the interrupt currently being handled.
            raise

    def send(self, sock: socket.socket, data: Union[str, bytes]) -> int:
        """
        Pass ``sock`` and ``data`` to the module-level ``send`` helper and
        return its result.
        """
        return send(sock, data)
61
62
class Dispatcher(DispatcherBase):
    """
    Dispatcher

    Read loop that uses a selector to wait until the app's socket is
    readable.
    """

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Run the read loop until the app stops or ``read_callback`` returns a
        falsy value.

        Returns immediately when ``self.app.sock`` or ``self.app.sock.sock``
        is None. Otherwise, while ``self.app.keep_running`` is truthy, waits
        up to ``self.ping_timeout`` for the socket to become readable; when
        it is, ``read_callback()`` is called and a falsy result ends the
        loop. ``check_callback()`` is called at the end of every iteration
        that does not end the loop.

        The ``sock`` argument is not used; the socket is always taken from
        ``self.app.sock.sock``.
        """
        if self.app.sock is None or self.app.sock.sock is None:
            return
        # Using the selector as a context manager guarantees it is closed
        # even when register() itself raises.
        with selectors.DefaultSelector() as sel:
            sel.register(self.app.sock.sock, selectors.EVENT_READ)
            while self.app.keep_running:
                if sel.select(self.ping_timeout):
                    if not read_callback():
                        break
                check_callback()
86
87
class SSLDispatcher(DispatcherBase):
    """
    SSLDispatcher

    Read loop that checks the socket's ``pending()`` data before waiting on
    a selector.
    """

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Run the read loop until the app stops or ``read_callback`` returns a
        falsy value.

        Returns immediately when ``self.app.sock`` or ``self.app.sock.sock``
        is None. Otherwise, while ``self.app.keep_running`` is truthy,
        ``self.select`` decides whether data is available; when it is,
        ``read_callback()`` is called and a falsy result ends the loop.
        ``check_callback()`` is called at the end of every iteration that
        does not end the loop.

        The ``sock`` argument is ignored; it is rebound to
        ``self.app.sock.sock``.
        """
        if self.app.sock is None or self.app.sock.sock is None:
            return
        sock = self.app.sock.sock
        # Using the selector as a context manager guarantees it is closed
        # even when register() itself raises.
        with selectors.DefaultSelector() as sel:
            sel.register(sock, selectors.EVENT_READ)
            while self.app.keep_running:
                if self.select(sock, sel):
                    if not read_callback():
                        break
                check_callback()

    def select(self, sock: Any, sel: selectors.DefaultSelector) -> Any:
        """
        Report whether the app's socket has data to read.

        Returns None when ``self.app.sock`` is None, a one-element list
        holding the socket when ``sock.pending()`` is truthy, and otherwise
        the first key returned by ``sel.select(self.ping_timeout)`` or None
        when the selector reports nothing. Callers in this class only use
        the truthiness of the result.

        The ``sock`` argument is ignored; it is rebound to
        ``self.app.sock.sock``.
        """
        if self.app.sock is None:
            return None
        sock = self.app.sock.sock
        # NOTE(review): the inner socket is not re-checked for None here; if
        # it is None, the pending() call below raises AttributeError.
        # pending() is consulted before the selector; presumably this is an
        # SSL socket with already-buffered data - confirm against caller.
        if sock.pending():
            return [
                sock,
            ]

        r = sel.select(self.ping_timeout)

        if len(r) > 0:
            return r[0][0]
        return None
127
128
class WrappedDispatcher:
    """
    WrappedDispatcher

    Adapter that forwards read, send, timeout and reconnect requests to an
    externally supplied dispatcher object.
    """

    def __init__(
        self,
        app: "WebSocketApp",
        ping_timeout: Optional[Union[float, int]],
        dispatcher: Any,
        handleDisconnect: Any,
    ) -> None:
        """
        Store the collaborators and register ``dispatcher.abort`` as the
        handler for signal number 2 on the wrapped dispatcher.
        """
        self.app = app
        self.ping_timeout = ping_timeout
        self.dispatcher = dispatcher
        self.handleDisconnect = handleDisconnect
        dispatcher.signal(2, dispatcher.abort)  # keyboard interrupt

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Hand ``sock`` and ``read_callback`` to the wrapped dispatcher's
        ``read`` and, when a ping timeout is configured, schedule
        ``check_callback`` through ``self.timeout`` after that interval.
        """
        self.dispatcher.read(sock, read_callback)
        interval = self.ping_timeout
        if not interval:
            # No ping timeout configured: nothing to schedule.
            return
        self.timeout(interval, check_callback)

    def send(self, sock: socket.socket, data: Union[str, bytes]) -> int:
        """
        Queue ``data`` through the wrapped dispatcher's ``buffwrite`` and
        return ``len(data)``.
        """
        self.dispatcher.buffwrite(sock, data, send, self.handleDisconnect)
        size = len(data)
        return size

    def timeout(self, seconds: float, callback: Callable, *args: Any) -> None:
        """
        Forward ``seconds``, ``callback`` and any extra positional arguments
        to the wrapped dispatcher's ``timeout``.
        """
        self.dispatcher.timeout(seconds, callback, *args)

    def reconnect(self, seconds: int, reconnector: Callable) -> None:
        """
        Schedule ``reconnector`` via ``self.timeout`` after ``seconds``,
        passing ``True`` as its single extra positional argument.
        """
        self.timeout(seconds, reconnector, True)