Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/event.py: 46%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import threading
3from abc import ABC, abstractmethod
4from enum import Enum
5from typing import Dict, List, Optional, Type, Union
7from redis.auth.token import TokenInterface
8from redis.credentials import CredentialProvider, StreamingCredentialProvider
9from redis.utils import check_protocol_version
class EventListenerInterface(ABC):
    """
    Contract for synchronous listeners of an event object.
    """

    @abstractmethod
    def listen(self, event: object):
        """Handle ``event``. Concrete listeners must override this method."""
class AsyncEventListenerInterface(ABC):
    """
    Contract for asynchronous listeners of an event object.
    """

    @abstractmethod
    async def listen(self, event: object):
        """Handle ``event`` in a coroutine. Concrete listeners must override this."""
class EventDispatcherInterface(ABC):
    """
    Contract for a dispatcher that hands an event to the listeners
    associated with that event.
    """

    @abstractmethod
    def dispatch(self, event: object):
        """Dispatch ``event`` synchronously."""

    @abstractmethod
    async def dispatch_async(self, event: object):
        """Dispatch ``event`` from a coroutine."""

    @abstractmethod
    def register_listeners(
        self,
        mappings: Dict[
            Type[object],
            List[Union[EventListenerInterface, AsyncEventListenerInterface]],
        ],
    ):
        """Register additional listeners."""
class EventException(Exception):
    """
    Exception wrapper that keeps the related event object next to the
    exception it wraps.
    """

    def __init__(self, exception: Exception, event: object):
        # The wrapped exception is the only positional argument, so
        # ``str(self)`` renders the same text as the wrapped exception.
        super().__init__(exception)
        self.event = event
        self.exception = exception
class EventDispatcher(EventDispatcherInterface):
    """
    Dispatcher that dispatches events to listeners associated with given event.

    A built-in mapping wires the re-authentication listeners to their events.
    More listeners can be supplied through the constructor or
    :meth:`register_listeners`. Lookup uses the exact ``type(event)``, so
    listeners registered for a base class do not receive subclass events.
    """

    # TODO: Make the built-in mapping replaceable; external mappings passed
    # in today can only extend it.
    def __init__(
        self,
        event_listeners: Optional[
            Dict[Type[object], List[EventListenerInterface]]
        ] = None,
    ):
        """
        Build the dispatcher with its default listeners.

        :param event_listeners: Optional extra ``{event type: [listeners]}``
            mapping, merged into the defaults via :meth:`register_listeners`.
        """
        self._event_listeners_mapping: Dict[
            Type[object], List[EventListenerInterface]
        ] = {
            AfterConnectionReleasedEvent: [
                ReAuthConnectionListener(),
            ],
            AfterPooledConnectionsInstantiationEvent: [
                RegisterReAuthForPooledConnections()
            ],
            AfterSingleConnectionInstantiationEvent: [
                RegisterReAuthForSingleConnection()
            ],
            AfterPubSubConnectionInstantiationEvent: [RegisterReAuthForPubSub()],
            AfterAsyncClusterInstantiationEvent: [RegisterReAuthForAsyncClusterNodes()],
            AsyncAfterConnectionReleasedEvent: [
                AsyncReAuthConnectionListener(),
            ],
        }

        self._lock = threading.Lock()
        # Created on first use in dispatch_async() rather than here.
        self._async_lock = None

        if event_listeners:
            self.register_listeners(event_listeners)

    def dispatch(self, event: object):
        """
        Synchronously pass ``event`` to every listener registered for its type.

        Listeners run while the dispatcher lock is held. The lock is a plain
        ``threading.Lock`` (not re-entrant), so a listener must not call
        :meth:`dispatch` or :meth:`register_listeners` on this dispatcher.
        """
        with self._lock:
            listeners = self._event_listeners_mapping.get(type(event), [])

            for listener in listeners:
                listener.listen(event)

    async def dispatch_async(self, event: object):
        """
        Await every listener registered for the type of ``event``, in order,
        while holding the dispatcher's asyncio lock.
        """
        if self._async_lock is None:
            self._async_lock = asyncio.Lock()

        async with self._async_lock:
            listeners = self._event_listeners_mapping.get(type(event), [])

            for listener in listeners:
                await listener.listen(event)

    def register_listeners(
        self,
        mappings: Dict[
            Type[object],
            List[Union[EventListenerInterface, AsyncEventListenerInterface]],
        ],
    ):
        """
        Add listeners to the dispatcher.

        For an event type that already has listeners, the new ones are
        appended after the existing ones and duplicates are dropped, keeping
        the first occurrence; listeners must therefore be hashable, as
        before. Registration order is the invocation order.

        For an event type seen for the first time, a copy of the supplied
        list is stored, so later changes to the caller's list do not affect
        the dispatcher.

        :param mappings: ``{event type: [listeners]}`` to register.
        """
        with self._lock:
            for event_type, new_listeners in mappings.items():
                if event_type in self._event_listeners_mapping:
                    existing = self._event_listeners_mapping[event_type]
                    # dict.fromkeys de-duplicates like a set but keeps
                    # insertion order, so invocation order stays stable.
                    merged = list(dict.fromkeys([*existing, *new_listeners]))
                else:
                    # Copy so the dispatcher does not alias the caller's list.
                    merged = list(new_listeners)

                # Always rebind to a fresh list instead of mutating in place:
                # a dispatch that already fetched the old list keeps
                # iterating an unchanged snapshot.
                self._event_listeners_mapping[event_type] = merged
class AfterConnectionReleasedEvent:
    """
    Event that carries a single connection object.

    NOTE(review): the previous docstring said this is "fired before each
    command execution", which disagrees with the class name (after a
    connection is released). The dispatch site is outside this module;
    confirm which description is right.
    """

    def __init__(self, connection):
        self._connection = connection

    @property
    def connection(self):
        """The connection this event was created with."""
        return self._connection
class AsyncAfterConnectionReleasedEvent(AfterConnectionReleasedEvent):
    """
    Distinct event type for the async flow; adds nothing to its parent.

    Dispatch looks listeners up by exact type, so this subclass gets its own
    listener list (``AsyncReAuthConnectionListener`` by default).
    """
class ClientType(Enum):
    """
    Kind of client an event originates from.

    The member values are one-element tuples, because of the trailing comma
    in each definition, not plain strings. They are kept exactly as they are
    so that ``ClientType.SYNC.value`` stays ``("sync",)``.
    """

    SYNC = ("sync",)
    ASYNC = ("async",)
class AfterPooledConnectionsInstantiationEvent:
    """
    Event that will be fired after pooled connection instances were created.

    Carries the connection pools, the client type and an optional
    credential provider.
    """

    def __init__(
        self,
        connection_pools: List,
        client_type: ClientType,
        credential_provider: Optional[CredentialProvider] = None,
    ):
        self._credential_provider = credential_provider
        self._client_type = client_type
        self._connection_pools = connection_pools

    @property
    def connection_pools(self):
        """The connection pools passed to the constructor."""
        return self._connection_pools

    @property
    def client_type(self) -> ClientType:
        """Whether the originating client is sync or async."""
        return self._client_type

    @property
    def credential_provider(self) -> Union[CredentialProvider, None]:
        """The credential provider, or ``None`` when none was given."""
        return self._credential_provider
class AfterSingleConnectionInstantiationEvent:
    """
    Event that will be fired after a single connection instance was created.

    :param connection_lock: For sync client thread-lock should be provided,
        for async asyncio.Lock
    """

    def __init__(
        self,
        connection,
        client_type: ClientType,
        connection_lock: Union[threading.RLock, asyncio.Lock],
    ):
        self._connection_lock = connection_lock
        self._client_type = client_type
        self._connection = connection

    @property
    def connection(self):
        """The connection passed to the constructor."""
        return self._connection

    @property
    def client_type(self) -> ClientType:
        """Whether the originating client is sync or async."""
        return self._client_type

    @property
    def connection_lock(self) -> Union[threading.RLock, asyncio.Lock]:
        """The lock passed to the constructor."""
        return self._connection_lock
class AfterPubSubConnectionInstantiationEvent:
    """
    Event carrying a Pub/Sub connection, its connection pool, the client
    type and the lock that is held while commands are sent on the
    connection.
    """

    def __init__(
        self,
        pubsub_connection,
        connection_pool,
        client_type: ClientType,
        connection_lock: Union[threading.RLock, asyncio.Lock],
    ):
        self._connection_lock = connection_lock
        self._client_type = client_type
        self._connection_pool = connection_pool
        self._pubsub_connection = pubsub_connection

    @property
    def pubsub_connection(self):
        """The Pub/Sub connection passed to the constructor."""
        return self._pubsub_connection

    @property
    def connection_pool(self):
        """The connection pool passed to the constructor."""
        return self._connection_pool

    @property
    def client_type(self) -> ClientType:
        """Whether the originating client is sync or async."""
        return self._client_type

    @property
    def connection_lock(self) -> Union[threading.RLock, asyncio.Lock]:
        """The lock passed to the constructor."""
        return self._connection_lock
class AfterAsyncClusterInstantiationEvent:
    """
    Event that will be fired after an async cluster instance was created.

    Async cluster doesn't use connection pools,
    instead ClusterNode object manages connections.
    """

    def __init__(
        self,
        nodes: dict,
        credential_provider: Optional[CredentialProvider] = None,
    ):
        self._credential_provider = credential_provider
        self._nodes = nodes

    @property
    def nodes(self) -> dict:
        """The node dictionary passed to the constructor."""
        return self._nodes

    @property
    def credential_provider(self) -> Union[CredentialProvider, None]:
        """The credential provider, or ``None`` when none was given."""
        return self._credential_provider
class OnCommandsFailEvent:
    """
    Event fired whenever a command fails during the execution.

    Holds the commands involved and the exception that was raised.
    """

    def __init__(
        self,
        commands: tuple,
        exception: Exception,
    ):
        self._exception = exception
        self._commands = commands

    @property
    def commands(self) -> tuple:
        """The commands passed to the constructor."""
        return self._commands

    @property
    def exception(self) -> Exception:
        """The exception passed to the constructor."""
        return self._exception
class AsyncOnCommandsFailEvent(OnCommandsFailEvent):
    """
    Distinct event type for the async flow; adds nothing to its parent.
    """
class ReAuthConnectionListener(EventListenerInterface):
    """
    Listener that re-authenticates the connection carried by the event.
    """

    def listen(self, event: AfterConnectionReleasedEvent):
        """Call ``re_auth()`` on the event's connection."""
        released_connection = event.connection
        released_connection.re_auth()
class AsyncReAuthConnectionListener(AsyncEventListenerInterface):
    """
    Async listener that re-authenticates the connection carried by the event.
    """

    async def listen(self, event: AsyncAfterConnectionReleasedEvent):
        """Await ``re_auth()`` on the event's connection."""
        released_connection = event.connection
        await released_connection.re_auth()
class RegisterReAuthForPooledConnections(EventListenerInterface):
    """
    Listener that registers a re-authentication callback for pooled connections.
    Required by :class:`StreamingCredentialProvider`.

    NOTE: only the most recently accepted event is remembered. Callbacks
    registered by an earlier ``listen`` call read the same ``_event``
    attribute and therefore act on the latest event's pools.
    """

    def __init__(self):
        self._event = None

    def listen(self, event: AfterPooledConnectionsInstantiationEvent):
        """
        Subscribe to token updates and errors when the event's credential
        provider is a streaming one; otherwise do nothing.
        """
        provider = event.credential_provider
        if not isinstance(provider, StreamingCredentialProvider):
            return

        self._event = event

        # Sync clients get plain callables, everything else gets coroutines.
        use_sync = event.client_type == ClientType.SYNC
        provider.on_next(self._re_auth if use_sync else self._re_auth_async)
        provider.on_error(
            self._raise_on_error if use_sync else self._raise_on_error_async
        )

    def _re_auth(self, token):
        """Forward ``token`` to every remembered pool's re-auth callback."""
        for connection_pool in self._event.connection_pools:
            connection_pool.re_auth_callback(token)

    async def _re_auth_async(self, token):
        """Coroutine variant of :meth:`_re_auth`; pools are awaited in turn."""
        for connection_pool in self._event.connection_pools:
            await connection_pool.re_auth_callback(token)

    def _raise_on_error(self, error: Exception):
        """Re-raise ``error`` wrapped together with the remembered event."""
        raise EventException(error, self._event)

    async def _raise_on_error_async(self, error: Exception):
        """Coroutine variant of :meth:`_raise_on_error`."""
        raise EventException(error, self._event)
class RegisterReAuthForSingleConnection(EventListenerInterface):
    """
    Listener that registers a re-authentication callback for single connection.
    Required by :class:`StreamingCredentialProvider`.

    NOTE: only the most recently accepted event is remembered. Callbacks
    registered by an earlier ``listen`` call read the same ``_event``
    attribute and therefore act on the latest event's connection.
    """

    def __init__(self):
        self._event = None

    def listen(self, event: AfterSingleConnectionInstantiationEvent):
        """
        Subscribe to token updates and errors when the connection's
        credential provider is a streaming one; otherwise do nothing.
        """
        provider = event.connection.credential_provider
        if not isinstance(provider, StreamingCredentialProvider):
            return

        self._event = event

        # Sync clients get plain callables, everything else gets coroutines.
        use_sync = event.client_type == ClientType.SYNC
        provider.on_next(self._re_auth if use_sync else self._re_auth_async)
        provider.on_error(
            self._raise_on_error if use_sync else self._raise_on_error_async
        )

    def _re_auth(self, token):
        """Send ``AUTH`` with the new token and read the reply, under the lock."""
        remembered = self._event
        with remembered.connection_lock:
            remembered.connection.send_command(
                "AUTH", token.try_get("oid"), token.get_value()
            )
            remembered.connection.read_response()

    async def _re_auth_async(self, token):
        """Coroutine variant of :meth:`_re_auth` using the async lock."""
        remembered = self._event
        async with remembered.connection_lock:
            await remembered.connection.send_command(
                "AUTH", token.try_get("oid"), token.get_value()
            )
            await remembered.connection.read_response()

    def _raise_on_error(self, error: Exception):
        """Re-raise ``error`` wrapped together with the remembered event."""
        raise EventException(error, self._event)

    async def _raise_on_error_async(self, error: Exception):
        """Coroutine variant of :meth:`_raise_on_error`."""
        raise EventException(error, self._event)
class RegisterReAuthForAsyncClusterNodes(EventListenerInterface):
    """
    Listener that registers re-authentication callbacks for the nodes of an
    async cluster when a streaming credential provider is in use.
    """

    def __init__(self):
        self._event = None

    def listen(self, event: AfterAsyncClusterInstantiationEvent):
        """
        Subscribe to token updates and errors when the event's credential
        provider is a streaming one; otherwise do nothing.
        """
        provider = event.credential_provider
        if not isinstance(provider, StreamingCredentialProvider):
            return

        self._event = event
        provider.on_next(self._re_auth)
        provider.on_error(self._raise_on_error)

    async def _re_auth(self, token: TokenInterface):
        """Await each remembered node's re-auth callback with ``token``."""
        cluster_nodes = self._event.nodes
        for node_key in cluster_nodes:
            await cluster_nodes[node_key].re_auth_callback(token)

    async def _raise_on_error(self, error: Exception):
        """Re-raise ``error`` wrapped together with the remembered event."""
        raise EventException(error, self._event)
class RegisterReAuthForPubSub(EventListenerInterface):
    """
    Listener that registers re-authentication callbacks for a Pub/Sub
    connection and its pool.

    It only acts when the connection uses a streaming credential provider
    and ``check_protocol_version(<connection protocol>, 3)`` is truthy.
    """

    def __init__(self):
        self._event = None
        self._connection_lock = None
        self._client_type = None
        self._connection_pool = None
        self._connection = None

    def listen(self, event: AfterPubSubConnectionInstantiationEvent):
        """
        Remember the event's parts and subscribe to token updates and errors
        when both preconditions hold; otherwise do nothing.
        """
        pubsub_connection = event.pubsub_connection
        if not isinstance(
            pubsub_connection.credential_provider, StreamingCredentialProvider
        ):
            return
        if not check_protocol_version(event.pubsub_connection.get_protocol(), 3):
            return

        self._event = event
        self._connection = event.pubsub_connection
        self._connection_pool = event.connection_pool
        self._client_type = event.client_type
        self._connection_lock = event.connection_lock

        # Sync clients get plain callables, everything else gets coroutines.
        use_sync = self._client_type == ClientType.SYNC
        self._connection.credential_provider.on_next(
            self._re_auth if use_sync else self._re_auth_async
        )
        self._connection.credential_provider.on_error(
            self._raise_on_error if use_sync else self._raise_on_error_async
        )

    def _re_auth(self, token: TokenInterface):
        """
        Send ``AUTH`` on the Pub/Sub connection under the lock, then forward
        the token to the pool's re-auth callback.
        """
        with self._connection_lock:
            self._connection.send_command(
                "AUTH", token.try_get("oid"), token.get_value()
            )
            self._connection.read_response()

        # The pool callback runs after the connection lock is released.
        self._connection_pool.re_auth_callback(token)

    async def _re_auth_async(self, token: TokenInterface):
        """Coroutine variant of :meth:`_re_auth` using the async lock."""
        async with self._connection_lock:
            await self._connection.send_command(
                "AUTH", token.try_get("oid"), token.get_value()
            )
            await self._connection.read_response()

        # The pool callback runs after the connection lock is released.
        await self._connection_pool.re_auth_callback(token)

    def _raise_on_error(self, error: Exception):
        """Re-raise ``error`` wrapped together with the remembered event."""
        raise EventException(error, self._event)

    async def _raise_on_error_async(self, error: Exception):
        """Coroutine variant of :meth:`_raise_on_error`."""
        raise EventException(error, self._event)