1import time
2import socket
3import inspect
4import selectors
5from typing import TYPE_CHECKING, Callable, Optional, Union
6
7if TYPE_CHECKING:
8 from ._app import WebSocketApp
9from . import _logging
10from ._socket import send
11
12"""
13_dispatcher.py
14websocket - WebSocket client library for Python
15
16Copyright 2025 engn33r
17
18Licensed under the Apache License, Version 2.0 (the "License");
19you may not use this file except in compliance with the License.
20You may obtain a copy of the License at
21
22 http://www.apache.org/licenses/LICENSE-2.0
23
24Unless required by applicable law or agreed to in writing, software
25distributed under the License is distributed on an "AS IS" BASIS,
26WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
27See the License for the specific language governing permissions and
28limitations under the License.
29"""
30
class DispatcherBase:
    """
    DispatcherBase

    Common state and helpers shared by the dispatchers in this module: it
    keeps a reference to the owning app and the ping timeout, and offers
    blocking ``timeout``/``reconnect`` helpers plus a ``send`` wrapper.
    """

    def __init__(
        self, app: "WebSocketApp", ping_timeout: Optional[Union[float, int]]
    ) -> None:
        """
        Store the owning app and the ping timeout.

        Parameters
        ----------
        app: WebSocketApp
            The application object this dispatcher serves.
        ping_timeout: float or int or None
            Timeout value kept on the instance for use by subclasses.
        """
        self.app = app
        self.ping_timeout = ping_timeout

    def timeout(self, seconds: Optional[Union[float, int]], callback: Callable) -> None:
        """
        Sleep for ``seconds`` and then invoke ``callback``.

        Parameters
        ----------
        seconds: float or int or None
            Delay before the callback runs; ``None`` skips the sleep.
        callback: Callable
            Called with no arguments once the delay has elapsed.
        """
        if seconds is not None:
            time.sleep(seconds)
        callback()

    def reconnect(self, seconds: int, reconnector: Callable) -> None:
        """
        Log the retry, sleep for ``seconds`` and call ``reconnector``.

        Parameters
        ----------
        seconds: int
            Delay before the reconnection attempt.
        reconnector: Callable
            Called as ``reconnector(reconnecting=True)`` after the delay.

        Raises
        ------
        KeyboardInterrupt
            Logged and re-raised if it occurs while waiting or reconnecting.
        """
        try:
            # Only the number of frames is reported, so request zero lines of
            # source context instead of the default one line per frame.
            depth = len(inspect.stack(0))
            _logging.info(
                f"reconnect() - retrying in {seconds} seconds [{depth} frames in stack]"
            )
            time.sleep(seconds)
            reconnector(reconnecting=True)
        except KeyboardInterrupt as e:
            _logging.info(f"User exited {e}")
            # Bare raise re-raises the active exception unchanged.
            raise

    def send(self, sock: socket.socket, data: Union[str, bytes]) -> int:
        """
        Send ``data`` on ``sock`` through the module-level ``send`` helper.

        Returns
        -------
        int
            Whatever the underlying ``send`` helper returns.
        """
        return send(sock, data)
60
61
class Dispatcher(DispatcherBase):
    """
    Dispatcher

    Runs a blocking read loop over the app's socket, waiting for readability
    with a selector.
    """

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Loop while the app is running, dispatching reads and periodic checks.

        Parameters
        ----------
        sock: socket.socket
            Not used here; the socket is taken from ``self.app.sock.sock``.
        read_callback: Callable
            Called when the socket is readable; a falsy result ends the loop.
        check_callback: Callable
            Called after every wait that did not end the loop, whether or not
            the socket was readable.

        Returns immediately when the app has no socket.
        """
        if self.app.sock is None or self.app.sock.sock is None:
            return
        # The context manager closes the selector on every exit path,
        # including a register() call that raises.
        with selectors.DefaultSelector() as sel:
            sel.register(self.app.sock.sock, selectors.EVENT_READ)
            while self.app.keep_running:
                # select() waits up to ping_timeout; an empty result means
                # nothing was readable, so only the check runs.
                if sel.select(self.ping_timeout) and not read_callback():
                    break
                check_callback()
85
86
class SSLDispatcher(DispatcherBase):
    """
    SSLDispatcher

    Read loop variant that consults the socket's ``pending()`` method before
    waiting on the selector.
    """

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Loop while the app is running, dispatching reads and periodic checks.

        Parameters
        ----------
        sock: socket.socket
            Ignored on entry; rebound to ``self.app.sock.sock``.
        read_callback: Callable
            Called when ``select`` reports data; a falsy result ends the loop.
        check_callback: Callable
            Called after every wait that did not end the loop.

        Returns immediately when the app has no socket.
        """
        if self.app.sock is None or self.app.sock.sock is None:
            return
        sock = self.app.sock.sock
        # The context manager closes the selector on every exit path,
        # including a register() call that raises.
        with selectors.DefaultSelector() as sel:
            sel.register(sock, selectors.EVENT_READ)
            while self.app.keep_running:
                if self.select(sock, sel) and not read_callback():
                    break
                check_callback()

    def select(self, sock, sel: selectors.DefaultSelector):
        """
        Report whether the app's socket has data to read.

        Parameters
        ----------
        sock:
            Ignored; the socket is re-read from ``self.app.sock.sock``.
        sel: selectors.DefaultSelector
            Selector to wait on for up to ``self.ping_timeout``.

        Returns
        -------
        A one-element list holding the socket when ``pending()`` is truthy,
        the first ready selector key when the selector reports readiness,
        or ``None`` when the app has no socket or nothing became ready.
        """
        if self.app.sock is None:
            return None
        sock = self.app.sock.sock
        # NOTE(review): presumably an ssl.SSLSocket, where pending() reports
        # bytes already buffered that the selector would not signal - confirm.
        if sock.pending():
            return [sock]

        ready = sel.select(self.ping_timeout)
        if ready:
            # Each entry is a (key, events) pair; hand back the first key.
            return ready[0][0]
        return None
126
127
class WrappedDispatcher:
    """
    WrappedDispatcher

    Adapter that forwards read, send and timer requests to an externally
    supplied dispatcher object.
    """

    def __init__(
        self,
        app: "WebSocketApp",
        ping_timeout: Optional[Union[float, int]],
        dispatcher,
        handleDisconnect,
    ) -> None:
        """
        Keep the collaborators and hook signal 2 up to the dispatcher's abort.

        Parameters
        ----------
        app: WebSocketApp
            The application object this dispatcher serves.
        ping_timeout: float or int or None
            When truthy, ``read`` schedules the check callback after this delay.
        dispatcher:
            Object providing ``signal``, ``abort``, ``read``, ``buffwrite``
            and ``timeout``.
        handleDisconnect:
            Passed to ``dispatcher.buffwrite`` on every ``send``.
        """
        self.dispatcher = dispatcher
        self.handleDisconnect = handleDisconnect
        self.ping_timeout = ping_timeout
        self.app = app
        # Signal number 2 is the keyboard interrupt.
        self.dispatcher.signal(2, self.dispatcher.abort)

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Register ``read_callback`` for ``sock`` with the wrapped dispatcher and,
        when a ping timeout is configured, schedule ``check_callback`` after it.
        """
        self.dispatcher.read(sock, read_callback)
        if not self.ping_timeout:
            return
        self.timeout(self.ping_timeout, check_callback)

    def send(self, sock: socket.socket, data: Union[str, bytes]) -> int:
        """
        Hand ``data`` to the wrapped dispatcher's ``buffwrite`` and return
        ``len(data)``.
        """
        self.dispatcher.buffwrite(sock, data, send, self.handleDisconnect)
        return len(data)

    def timeout(self, seconds: float, callback: Callable, *args) -> None:
        """
        Forward a timer request, with any extra positional arguments, to the
        wrapped dispatcher.
        """
        self.dispatcher.timeout(seconds, callback, *args)

    def reconnect(self, seconds: int, reconnector: Callable) -> None:
        """
        Schedule ``reconnector`` after ``seconds``; it is given ``True`` as its
        single positional argument.
        """
        self.timeout(seconds, reconnector, True)