1import time
2import socket
3import inspect
4import selectors
5from typing import Any, Callable, Optional, Union
6from . import _logging
7from ._socket import send
8
9
class DispatcherBase:
    """
    DispatcherBase

    Common behaviour shared by the blocking dispatchers: it stores the owning
    app and the ping timeout, and offers sleep-based ``timeout`` and
    ``reconnect`` helpers plus a thin ``send`` wrapper.
    """

    def __init__(self, app: Any, ping_timeout: Union[float, int, None]) -> None:
        self.app = app
        self.ping_timeout = ping_timeout

    def timeout(self, seconds: Union[float, int, None], callback: Callable) -> None:
        """
        Block the calling thread for ``seconds`` and then invoke ``callback``.

        ``None`` is allowed by the signature, but ``time.sleep(None)`` raises
        ``TypeError``; it is therefore treated as "no delay" and the callback
        is invoked immediately.
        """
        if seconds is not None:
            time.sleep(seconds)
        callback()

    def reconnect(self, seconds: int, reconnector: Callable) -> None:
        """
        Log the retry, sleep for ``seconds`` and call
        ``reconnector(reconnecting=True)``.

        A ``KeyboardInterrupt`` raised while waiting or reconnecting is logged
        and then re-raised to the caller.
        """
        try:
            # context=0: only the number of frames is logged, so there is no
            # need to load source lines for every frame on the stack.
            _logging.info(
                f"reconnect() - retrying in {seconds} seconds [{len(inspect.stack(0))} frames in stack]"
            )
            time.sleep(seconds)
            reconnector(reconnecting=True)
        except KeyboardInterrupt as e:
            _logging.info(f"User exited {e}")
            raise

    def send(self, sock: socket.socket, data: Union[str, bytes]) -> None:
        """
        Write ``data`` to ``sock`` via the module-level ``send`` helper and
        return whatever that helper returns.
        """
        # NOTE(review): annotated as returning None although the helper's
        # result is passed through - confirm the helper's return type.
        return send(sock, data)
36
37
class Dispatcher(DispatcherBase):
    """
    Dispatcher

    Blocking read loop driven by a selector on the app's current socket.
    """

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Wait for readability and dispatch callbacks while the app is running.

        The ``sock`` argument is not used; the socket that is watched is
        ``self.app.sock.sock``. Each iteration waits up to
        ``self.ping_timeout`` for the socket to become readable. When it is
        readable, ``read_callback`` is called and a falsy result ends the
        loop. ``check_callback`` runs after every iteration that did not end
        the loop, whether or not data was available.
        """
        # The context manager closes the selector on every exit path,
        # including a failure while registering the socket.
        with selectors.DefaultSelector() as selector:
            selector.register(self.app.sock.sock, selectors.EVENT_READ)
            while self.app.keep_running:
                readable = selector.select(self.ping_timeout)
                if readable and not read_callback():
                    break
                check_callback()
59
60
class SSLDispatcher(DispatcherBase):
    """
    SSLDispatcher

    Blocking read loop that checks the socket's ``pending()`` data before
    falling back to a selector wait.
    """

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Wait for readable data and dispatch callbacks while the app is running.

        The ``sock`` argument is not used; the socket that is watched is
        ``self.app.sock.sock``. When ``self.select`` reports data,
        ``read_callback`` is called and a falsy result ends the loop.
        ``check_callback`` runs after every iteration that did not end the
        loop.
        """
        sock = self.app.sock.sock
        # The context manager closes the selector on every exit path,
        # including a failure while registering the socket.
        with selectors.DefaultSelector() as selector:
            selector.register(sock, selectors.EVENT_READ)
            while self.app.keep_running:
                if self.select(sock, selector) and not read_callback():
                    break
                check_callback()

    def select(self, sock, sel: selectors.DefaultSelector):
        """
        Report whether the app's socket has data to read.

        The ``sock`` argument is replaced by ``self.app.sock.sock``. Returns
        ``[sock]`` when ``sock.pending()`` is truthy, otherwise the first
        ready selector key from ``sel.select(self.ping_timeout)``, or
        ``None`` when the wait timed out with nothing ready.
        """
        sock = self.app.sock.sock
        # Presumably an ssl.SSLSocket: data it has already buffered would not
        # make the underlying descriptor readable, so check it first.
        if sock.pending():
            return [sock]

        ready = sel.select(self.ping_timeout)
        if ready:
            # Each entry is a (key, events) pair; hand back the key.
            return ready[0][0]
        return None
95
96
class WrappedDispatcher:
    """
    WrappedDispatcher

    Adapter that forwards reads, writes and timers to an externally supplied
    ``dispatcher`` object. The dispatcher must provide ``signal``, ``abort``,
    ``read``, ``buffwrite`` and ``timeout``.
    """

    def __init__(
        self, app, ping_timeout: Union[float, int, None], dispatcher, handleDisconnect
    ) -> None:
        self.app = app
        self.ping_timeout = ping_timeout
        self.dispatcher = dispatcher
        self.handleDisconnect = handleDisconnect
        dispatcher.signal(2, dispatcher.abort)  # keyboard interrupt

    def read(
        self,
        sock: socket.socket,
        read_callback: Callable,
        check_callback: Callable,
    ) -> None:
        """
        Register ``read_callback`` for ``sock`` with the wrapped dispatcher.

        When ``self.ping_timeout`` is truthy, ``check_callback`` is also
        scheduled through ``self.timeout`` with that delay.
        """
        self.dispatcher.read(sock, read_callback)
        if self.ping_timeout:
            self.timeout(self.ping_timeout, check_callback)

    def send(self, sock: socket.socket, data: Union[str, bytes]) -> int:
        """
        Hand ``data`` to the dispatcher's ``buffwrite`` together with the
        module-level ``send`` helper and the disconnect handler.

        Returns ``len(data)``.
        """
        self.dispatcher.buffwrite(sock, data, send, self.handleDisconnect)
        return len(data)

    def timeout(self, seconds: float, callback: Callable, *args) -> None:
        """Forward a timer request, with any extra ``args``, to the dispatcher."""
        self.dispatcher.timeout(seconds, callback, *args)

    def reconnect(self, seconds: int, reconnector: Callable) -> None:
        """
        Schedule ``reconnector`` through ``self.timeout`` after ``seconds``.

        ``True`` is passed as an extra positional argument; presumably the
        dispatcher forwards it to ``reconnector`` - confirm against the
        dispatcher implementation.
        """
        self.timeout(seconds, reconnector, True)