Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/event.py: 47%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import threading
3from abc import ABC, abstractmethod
4from enum import Enum
5from typing import List, Optional, Union
7from redis.auth.token import TokenInterface
8from redis.credentials import CredentialProvider, StreamingCredentialProvider
class EventListenerInterface(ABC):
    """
    Contract for a synchronous event listener.

    Concrete listeners must implement :meth:`listen`, which receives the
    event object to react to.
    """

    @abstractmethod
    def listen(self, event: object):
        """
        React to ``event``. The base implementation does nothing.
        """
class AsyncEventListenerInterface(ABC):
    """
    Contract for an asynchronous event listener.

    Concrete listeners must implement :meth:`listen` as a coroutine that
    receives the event object to react to.
    """

    @abstractmethod
    async def listen(self, event: object):
        """
        React to ``event``. The base implementation does nothing.
        """
class EventDispatcherInterface(ABC):
    """
    Contract for an object that forwards an event to the listeners
    associated with that event.

    Implementations provide both a synchronous and an asynchronous
    entry point.
    """

    @abstractmethod
    def dispatch(self, event: object):
        """
        Deliver ``event`` synchronously. The base implementation does nothing.
        """

    @abstractmethod
    async def dispatch_async(self, event: object):
        """
        Deliver ``event`` from a coroutine. The base implementation does
        nothing.
        """
class EventException(Exception):
    """
    Exception that pairs an underlying error with the event it relates to.

    The wrapped error is available as ``exception`` and the event as
    ``event``. The wrapped error is also handed to ``Exception.__init__``,
    so it is the sole entry of ``args``.
    """

    def __init__(self, exception: Exception, event: object):
        super().__init__(exception)
        self.event = event
        self.exception = exception
class EventDispatcher(EventDispatcherInterface):
    """
    Dispatches an event to the listeners registered for its exact type.

    Listeners are looked up by ``type(event)``, so an instance of a subclass
    does not reach listeners registered for its base class. Event types with
    no registered listeners are ignored.
    """

    # TODO: Make dispatcher to accept external mappings.
    def __init__(self):
        """
        Mapping should be extended for any new events or listeners to be added.
        """
        self._event_listeners_mapping = {
            AfterConnectionReleasedEvent: [
                ReAuthConnectionListener(),
            ],
            AfterPooledConnectionsInstantiationEvent: [
                RegisterReAuthForPooledConnections()
            ],
            AfterSingleConnectionInstantiationEvent: [
                RegisterReAuthForSingleConnection()
            ],
            AfterPubSubConnectionInstantiationEvent: [RegisterReAuthForPubSub()],
            AfterAsyncClusterInstantiationEvent: [RegisterReAuthForAsyncClusterNodes()],
            AsyncAfterConnectionReleasedEvent: [
                AsyncReAuthConnectionListener(),
            ],
        }

    def dispatch(self, event: object):
        """
        Call ``listen(event)`` on every listener registered for the type of
        ``event``, in registration order.

        :param event: Event instance to deliver.
        """
        # Default to an empty list: an unregistered event type means
        # "no listeners", not a TypeError from iterating over None.
        listeners = self._event_listeners_mapping.get(type(event), [])

        for listener in listeners:
            listener.listen(event)

    async def dispatch_async(self, event: object):
        """
        Await ``listen(event)`` on every listener registered for the type of
        ``event``, one after another in registration order.

        :param event: Event instance to deliver.
        """
        # Same default as in dispatch(): unknown event types are a no-op.
        listeners = self._event_listeners_mapping.get(type(event), [])

        for listener in listeners:
            await listener.listen(event)
class AfterConnectionReleasedEvent:
    """
    Event carrying the connection it concerns.

    The original documentation states that this event is fired before each
    command execution.
    NOTE(review): the class name suggests it is fired after a connection is
    released - confirm which description is accurate.
    """

    def __init__(self, connection):
        self._connection = connection

    @property
    def connection(self):
        """
        The connection object this event was created with (read-only).
        """
        return self._connection
class AsyncAfterConnectionReleasedEvent(AfterConnectionReleasedEvent):
    """
    Separate event type with the same payload as
    :class:`AfterConnectionReleasedEvent`; it adds no members of its own.
    """
class ClientType(Enum):
    """
    Distinguishes synchronous from asynchronous clients.

    Because of the trailing commas each member's value is a one-element
    tuple, e.g. ``ClientType.SYNC.value == ("sync",)``.
    """

    SYNC = ("sync",)
    ASYNC = ("async",)
class AfterPooledConnectionsInstantiationEvent:
    """
    Event describing connection pools that have just been created.

    :param connection_pools: The connection pools the event refers to.
    :param client_type: Whether the owning client is sync or async.
    :param credential_provider: Optional credential provider associated
        with the pools; defaults to ``None``.
    """

    def __init__(
        self,
        connection_pools: List,
        client_type: ClientType,
        credential_provider: Optional[CredentialProvider] = None,
    ):
        self._credential_provider = credential_provider
        self._client_type = client_type
        self._connection_pools = connection_pools

    @property
    def connection_pools(self):
        """
        The connection pools passed at construction.
        """
        return self._connection_pools

    @property
    def client_type(self) -> ClientType:
        """
        The client type passed at construction.
        """
        return self._client_type

    @property
    def credential_provider(self) -> Optional[CredentialProvider]:
        """
        The credential provider passed at construction, or ``None``.
        """
        return self._credential_provider
class AfterSingleConnectionInstantiationEvent:
    """
    Event describing a single connection instance that has just been created.

    :param connection: The connection the event refers to.
    :param client_type: Whether the owning client is sync or async.
    :param connection_lock: For sync client thread-lock should be provided,
        for async asyncio.Lock
    """

    def __init__(
        self,
        connection,
        client_type: ClientType,
        connection_lock: Union[threading.RLock, asyncio.Lock],
    ):
        self._connection_lock = connection_lock
        self._client_type = client_type
        self._connection = connection

    @property
    def connection(self):
        """
        The connection passed at construction.
        """
        return self._connection

    @property
    def client_type(self) -> ClientType:
        """
        The client type passed at construction.
        """
        return self._client_type

    @property
    def connection_lock(self) -> Union[threading.RLock, asyncio.Lock]:
        """
        The lock passed at construction.
        """
        return self._connection_lock
class AfterPubSubConnectionInstantiationEvent:
    """
    Event carrying a pub/sub connection together with its connection pool,
    the client type and a lock for the connection.

    :param pubsub_connection: The pub/sub connection the event refers to.
    :param connection_pool: The connection pool associated with it.
    :param client_type: Whether the owning client is sync or async.
    :param connection_lock: A ``threading.RLock`` or an ``asyncio.Lock``.
    """

    def __init__(
        self,
        pubsub_connection,
        connection_pool,
        client_type: ClientType,
        connection_lock: Union[threading.RLock, asyncio.Lock],
    ):
        self._connection_lock = connection_lock
        self._client_type = client_type
        self._connection_pool = connection_pool
        self._pubsub_connection = pubsub_connection

    @property
    def pubsub_connection(self):
        """
        The pub/sub connection passed at construction.
        """
        return self._pubsub_connection

    @property
    def connection_pool(self):
        """
        The connection pool passed at construction.
        """
        return self._connection_pool

    @property
    def client_type(self) -> ClientType:
        """
        The client type passed at construction.
        """
        return self._client_type

    @property
    def connection_lock(self) -> Union[threading.RLock, asyncio.Lock]:
        """
        The lock passed at construction.
        """
        return self._connection_lock
class AfterAsyncClusterInstantiationEvent:
    """
    Event describing an async cluster instance that has just been created.

    Async cluster doesn't use connection pools,
    instead ClusterNode object manages connections.

    :param nodes: Dictionary of the cluster's nodes.
    :param credential_provider: Optional credential provider; defaults to
        ``None``.
    """

    def __init__(
        self,
        nodes: dict,
        credential_provider: Optional[CredentialProvider] = None,
    ):
        self._credential_provider = credential_provider
        self._nodes = nodes

    @property
    def nodes(self) -> dict:
        """
        The nodes dictionary passed at construction.
        """
        return self._nodes

    @property
    def credential_provider(self) -> Optional[CredentialProvider]:
        """
        The credential provider passed at construction, or ``None``.
        """
        return self._credential_provider
class ReAuthConnectionListener(EventListenerInterface):
    """
    Synchronous listener that re-authenticates the connection carried by
    the event.
    """

    def listen(self, event: AfterConnectionReleasedEvent):
        """
        Call ``re_auth()`` on ``event.connection``.
        """
        connection = event.connection
        connection.re_auth()
class AsyncReAuthConnectionListener(AsyncEventListenerInterface):
    """
    Asynchronous listener that re-authenticates the connection carried by
    the event.
    """

    async def listen(self, event: AsyncAfterConnectionReleasedEvent):
        """
        Await ``re_auth()`` on ``event.connection``.
        """
        connection = event.connection
        await connection.re_auth()
class RegisterReAuthForPooledConnections(EventListenerInterface):
    """
    Registers re-authentication callbacks for pooled connections on a
    :class:`StreamingCredentialProvider`.

    Events whose credential provider is not a streaming provider are ignored.
    """

    def __init__(self):
        # Most recent event accepted by listen(); read by the callbacks.
        self._event = None

    def listen(self, event: AfterPooledConnectionsInstantiationEvent):
        """
        Remember ``event`` and subscribe to its credential provider.

        Sync clients get the blocking callbacks, every other client type
        gets the coroutine callbacks.
        """
        provider = event.credential_provider
        if not isinstance(provider, StreamingCredentialProvider):
            return

        self._event = event

        if event.client_type == ClientType.SYNC:
            on_next, on_error = self._re_auth, self._raise_on_error
        else:
            on_next, on_error = self._re_auth_async, self._raise_on_error_async

        provider.on_next(on_next)
        provider.on_error(on_error)

    def _re_auth(self, token):
        """
        Pass ``token`` to ``re_auth_callback`` of every pool of the event.
        """
        pools = self._event.connection_pools
        for connection_pool in pools:
            connection_pool.re_auth_callback(token)

    async def _re_auth_async(self, token):
        """
        Await ``re_auth_callback(token)`` on every pool of the event, in order.
        """
        pools = self._event.connection_pools
        for connection_pool in pools:
            await connection_pool.re_auth_callback(token)

    def _raise_on_error(self, error: Exception):
        """
        Re-raise ``error`` wrapped in an :class:`EventException` that also
        carries the stored event.
        """
        wrapped = EventException(error, self._event)
        raise wrapped

    async def _raise_on_error_async(self, error: Exception):
        """
        Coroutine variant of :meth:`_raise_on_error`.
        """
        wrapped = EventException(error, self._event)
        raise wrapped
class RegisterReAuthForSingleConnection(EventListenerInterface):
    """
    Registers re-authentication callbacks for a single connection on a
    :class:`StreamingCredentialProvider`.

    Events whose connection does not use a streaming provider are ignored.
    """

    def __init__(self):
        # Most recent event accepted by listen(); read by the callbacks.
        self._event = None

    def listen(self, event: AfterSingleConnectionInstantiationEvent):
        """
        Remember ``event`` and subscribe to the credential provider of its
        connection.

        Sync clients get the blocking callbacks, every other client type
        gets the coroutine callbacks.
        """
        provider = event.connection.credential_provider
        if not isinstance(provider, StreamingCredentialProvider):
            return

        self._event = event

        if event.client_type == ClientType.SYNC:
            on_next, on_error = self._re_auth, self._raise_on_error
        else:
            on_next, on_error = self._re_auth_async, self._raise_on_error_async

        provider.on_next(on_next)
        provider.on_error(on_error)

    def _re_auth(self, token):
        """
        Send ``AUTH`` with the token's ``oid`` and value over the stored
        connection and read the reply, all while holding the event's lock.
        """
        with self._event.connection_lock:
            connection = self._event.connection
            connection.send_command("AUTH", token.try_get("oid"), token.get_value())
            connection.read_response()

    async def _re_auth_async(self, token):
        """
        Coroutine variant of :meth:`_re_auth`; the lock is acquired with
        ``async with`` and both connection calls are awaited.
        """
        async with self._event.connection_lock:
            connection = self._event.connection
            await connection.send_command(
                "AUTH", token.try_get("oid"), token.get_value()
            )
            await connection.read_response()

    def _raise_on_error(self, error: Exception):
        """
        Re-raise ``error`` wrapped in an :class:`EventException` that also
        carries the stored event.
        """
        wrapped = EventException(error, self._event)
        raise wrapped

    async def _raise_on_error_async(self, error: Exception):
        """
        Coroutine variant of :meth:`_raise_on_error`.
        """
        wrapped = EventException(error, self._event)
        raise wrapped
class RegisterReAuthForAsyncClusterNodes(EventListenerInterface):
    """
    Registers coroutine re-authentication callbacks for the nodes of an
    async cluster on a :class:`StreamingCredentialProvider`.

    Events whose credential provider is not a streaming provider are ignored.
    """

    def __init__(self):
        # Most recent event accepted by listen(); read by the callbacks.
        self._event = None

    def listen(self, event: AfterAsyncClusterInstantiationEvent):
        """
        Remember ``event`` and subscribe to its credential provider.
        """
        provider = event.credential_provider
        if not isinstance(provider, StreamingCredentialProvider):
            return

        self._event = event
        provider.on_next(self._re_auth)
        provider.on_error(self._raise_on_error)

    async def _re_auth(self, token: TokenInterface):
        """
        Await ``re_auth_callback(token)`` on every node of the stored event,
        one node at a time.
        """
        for name in self._event.nodes:
            node = self._event.nodes[name]
            await node.re_auth_callback(token)

    async def _raise_on_error(self, error: Exception):
        """
        Re-raise ``error`` wrapped in an :class:`EventException` that also
        carries the stored event.
        """
        wrapped = EventException(error, self._event)
        raise wrapped
class RegisterReAuthForPubSub(EventListenerInterface):
    """
    Registers re-authentication callbacks for a pub/sub connection on a
    :class:`StreamingCredentialProvider`.

    Registration only happens when the connection uses a streaming provider
    and reports protocol ``3`` (as int or str); other events are ignored.
    """

    def __init__(self):
        # All fields are filled in by listen() once an event is accepted.
        self._event = None
        self._connection_lock = None
        self._client_type = None
        self._connection_pool = None
        self._connection = None

    def listen(self, event: AfterPubSubConnectionInstantiationEvent):
        """
        Store the event's parts and subscribe to the credential provider of
        its pub/sub connection.

        Sync clients get the blocking callbacks, every other client type
        gets the coroutine callbacks.
        """
        pubsub_connection = event.pubsub_connection
        if not isinstance(
            pubsub_connection.credential_provider, StreamingCredentialProvider
        ):
            return
        # The protocol is only queried once the provider check has passed.
        if pubsub_connection.get_protocol() not in [3, "3"]:
            return

        self._event = event
        self._connection = event.pubsub_connection
        self._connection_pool = event.connection_pool
        self._client_type = event.client_type
        self._connection_lock = event.connection_lock

        provider = self._connection.credential_provider
        if self._client_type == ClientType.SYNC:
            on_next, on_error = self._re_auth, self._raise_on_error
        else:
            on_next, on_error = self._re_auth_async, self._raise_on_error_async

        provider.on_next(on_next)
        provider.on_error(on_error)

    def _re_auth(self, token: TokenInterface):
        """
        Send ``AUTH`` with the token's ``oid`` and value over the pub/sub
        connection and read the reply while holding the lock, then pass the
        token to the pool's ``re_auth_callback``.
        """
        with self._connection_lock:
            oid = token.try_get("oid")
            value = token.get_value()
            self._connection.send_command("AUTH", oid, value)
            self._connection.read_response()

        # The pool callback runs after the connection lock is released.
        self._connection_pool.re_auth_callback(token)

    async def _re_auth_async(self, token: TokenInterface):
        """
        Coroutine variant of :meth:`_re_auth`; the lock is acquired with
        ``async with`` and every connection and pool call is awaited.
        """
        async with self._connection_lock:
            oid = token.try_get("oid")
            value = token.get_value()
            await self._connection.send_command("AUTH", oid, value)
            await self._connection.read_response()

        # The pool callback runs after the connection lock is released.
        await self._connection_pool.re_auth_callback(token)

    def _raise_on_error(self, error: Exception):
        """
        Re-raise ``error`` wrapped in an :class:`EventException` that also
        carries the stored event.
        """
        wrapped = EventException(error, self._event)
        raise wrapped

    async def _raise_on_error_async(self, error: Exception):
        """
        Coroutine variant of :meth:`_raise_on_error`.
        """
        wrapped = EventException(error, self._event)
        raise wrapped