Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/blinker/base.py: 30%
218 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1"""Signals and events.
3A small implementation of signals, inspired by a snippet of Django signal
4API client code seen in a blog post. Signals are first-class objects and
5each manages its own receivers and message emission.
7The :func:`signal` function provides singleton behavior for named signals.
9"""
10from __future__ import annotations
12import typing as t
13from collections import defaultdict
14from contextlib import contextmanager
15from warnings import warn
16from weakref import WeakValueDictionary
18from blinker._utilities import annotatable_weakref
19from blinker._utilities import hashable_identity
20from blinker._utilities import IdentityType
21from blinker._utilities import is_coroutine_function
22from blinker._utilities import lazy_property
23from blinker._utilities import reference
24from blinker._utilities import symbol
25from blinker._utilities import WeakTypes
if t.TYPE_CHECKING:
    # Typing-only names: this branch is skipped at runtime, so
    # ``typing_extensions`` is only needed by static type checkers.
    import typing_extensions as te

    T = t.TypeVar("T")
    P = te.ParamSpec("P")
    T_callable = t.TypeVar("T_callable", bound=t.Callable[..., t.Any])

    # Shapes of the adapters accepted by ``Signal.send(_async_wrapper=...)``
    # and ``Signal.send_async(_sync_wrapper=...)`` respectively.
    AsyncWrapperType = t.Callable[[t.Callable[P, T]], t.Callable[P, t.Awaitable[T]]]
    SyncWrapperType = t.Callable[[t.Callable[P, t.Awaitable[T]]], t.Callable[P, T]]

# Sentinel meaning "no sender filter"; compared by identity (``sender is ANY``).
ANY = symbol("ANY")
ANY.__doc__ = 'Token for "any sender".'
# Bookkeeping key under which receivers connected to ANY are filed.
ANY_ID = 0
43class Signal:
44 """A notification emitter."""
46 #: An :obj:`ANY` convenience synonym, allows ``Signal.ANY``
47 #: without an additional import.
48 ANY = ANY
@lazy_property
def receiver_connected(self) -> Signal:
    """Per-signal notification emitted after every :meth:`connect`.

    It is sent with this signal instance as the sender, and the
    :meth:`connect` arguments are forwarded as keywords: *receiver*,
    *sender*, and *weak*.

    .. versionadded:: 1.2

    """
    connected = Signal(doc="Emitted after a receiver connects.")
    return connected
@lazy_property
def receiver_disconnected(self) -> Signal:
    """Per-signal notification emitted after :meth:`disconnect`.

    It is sent with this signal instance as the sender, and the
    :meth:`disconnect` arguments are forwarded as keywords: *receiver*
    and *sender*.

    Note that this signal is emitted **only** when :meth:`disconnect` is
    called explicitly.

    An automatic disconnect (a weakly referenced receiver or sender going
    out of scope) cannot emit it, because the receiver and/or sender
    instances no longer exist at the time the signal would be sent.

    As an alternative, subscribe to :attr:`receiver_connected` and set up
    a custom weakref cleanup callback on weak receivers and senders.

    .. versionadded:: 1.2

    """
    disconnected = Signal(doc="Emitted after a receiver disconnects.")
    return disconnected
86 def __init__(self, doc: str | None = None) -> None:
87 """
88 :param doc: optional. If provided, will be assigned to the signal's
89 __doc__ attribute.
91 """
92 if doc:
93 self.__doc__ = doc
94 #: A mapping of connected receivers.
95 #:
96 #: The values of this mapping are not meaningful outside of the
97 #: internal :class:`Signal` implementation, however the boolean value
98 #: of the mapping is useful as an extremely efficient check to see if
99 #: any receivers are connected to the signal.
100 self.receivers: dict[IdentityType, t.Callable | annotatable_weakref] = {}
101 self.is_muted = False
102 self._by_receiver: dict[IdentityType, set[IdentityType]] = defaultdict(set)
103 self._by_sender: dict[IdentityType, set[IdentityType]] = defaultdict(set)
104 self._weak_senders: dict[IdentityType, annotatable_weakref] = {}
def connect(
    self, receiver: T_callable, sender: t.Any = ANY, weak: bool = True
) -> T_callable:
    """Connect *receiver* to signal events sent by *sender*.

    :param receiver: A callable. Will be invoked by :meth:`send` with
      `sender=` as a single positional argument and any ``kwargs`` that
      were provided to a call to :meth:`send`.

    :param sender: Any object or :obj:`ANY`, defaults to ``ANY``.
      Restricts notifications delivered to *receiver* to only those
      :meth:`send` emissions sent by *sender*. If ``ANY``, the receiver
      will always be notified. A *receiver* may be connected to
      multiple *sender* values on the same Signal through multiple calls
      to :meth:`connect`.

    :param weak: If true, the Signal will hold a weakref to *receiver*
      and automatically disconnect when *receiver* goes out of scope or
      is garbage collected. Defaults to True.

    :return: *receiver*, unchanged.
    """
    receiver_id = hashable_identity(receiver)
    stored: T_callable | annotatable_weakref

    if weak:
        # The weakref callback removes the receiver once it is collected.
        stored = reference(receiver, self._cleanup_receiver)
        stored.receiver_id = receiver_id
    else:
        stored = receiver

    any_sender = sender is ANY
    sender_id: IdentityType = ANY_ID if any_sender else hashable_identity(sender)

    # setdefault: a receiver that is already registered keeps its first
    # stored reference.
    self.receivers.setdefault(receiver_id, stored)
    self._by_sender[sender_id].add(receiver_id)
    self._by_receiver[receiver_id].add(sender_id)
    del stored

    if not any_sender and sender_id not in self._weak_senders:
        # wire together a cleanup for weakref-able senders
        try:
            weak_sender = reference(sender, self._cleanup_sender)
            weak_sender.sender_id = sender_id
        except TypeError:
            # TypeError here is taken to mean the sender cannot be weakly
            # referenced; it then simply gets no automatic cleanup.
            pass
        else:
            self._weak_senders.setdefault(sender_id, weak_sender)
            del weak_sender

    # broadcast this connection. if receivers raise, disconnect.
    # The __dict__ test avoids instantiating the lazy per-signal hook
    # merely to discover that nobody listens to it.
    if "receiver_connected" in self.__dict__ and self.receiver_connected.receivers:
        try:
            self.receiver_connected.send(
                self, receiver=receiver, sender=sender, weak=weak
            )
        except TypeError:
            self.disconnect(receiver, sender)
            raise

    # Module-level hook; skipped when this signal *is* that hook.
    if receiver_connected.receivers and self is not receiver_connected:
        try:
            receiver_connected.send(
                self, receiver_arg=receiver, sender_arg=sender, weak_arg=weak
            )
        except TypeError:
            self.disconnect(receiver, sender)
            raise

    return receiver
def connect_via(
    self, sender: t.Any, weak: bool = False
) -> t.Callable[[T_callable], T_callable]:
    """Return a decorator that connects a function as a receiver for *sender*.

    :param sender: Any object or :obj:`ANY`. The decorated function
      will only receive :meth:`send` emissions sent by *sender*. If
      ``ANY``, the receiver will always be notified. A function may be
      decorated multiple times with differing *sender* values.

    :param weak: If true, the Signal will hold a weakref to the
      decorated function and automatically disconnect when *receiver*
      goes out of scope or is garbage collected. Unlike
      :meth:`connect`, this defaults to False.

    The decorated function will be invoked by :meth:`send` with
    `sender=` as a single positional argument and any ``kwargs`` that
    were provided to the call to :meth:`send`.


    .. versionadded:: 1.1

    """

    def decorator(func: T_callable) -> T_callable:
        # Register, then hand the function back untouched so the decorated
        # name still refers to the original callable.
        self.connect(func, sender, weak)
        return func

    return decorator
@contextmanager
def connected_to(
    self, receiver: t.Callable, sender: t.Any = ANY
) -> t.Generator[None, None, None]:
    """Execute a block with the signal temporarily connected to *receiver*.

    :param receiver: a receiver callable
    :param sender: optional, a sender to filter on

    This is a context manager for use in the ``with`` statement. It can
    be useful in unit tests. *receiver* is connected to the signal for
    the duration of the ``with`` block, and will be disconnected
    automatically when exiting the block:

    .. code-block:: python

      with on_ready.connected_to(receiver):
         # do stuff
         on_ready.send(123)

    The connection is strong (``weak=False``). On exit *receiver* is
    disconnected from every sender of this signal, not only from *sender*.

    .. versionadded:: 1.1

    """
    self.connect(receiver, sender=sender, weak=False)
    try:
        yield None
    finally:
        # ``finally`` also covers BaseException exits (KeyboardInterrupt,
        # GeneratorExit, SystemExit), which ``except Exception`` missed and
        # which would otherwise leave the strong receiver connected.
        self.disconnect(receiver)
@contextmanager
def muted(self) -> t.Generator[None, None, None]:
    """Context manager for temporarily disabling signal.

    Useful for test purposes. ``is_muted`` is set to ``True`` for the
    duration of the ``with`` block and reset to ``False`` on exit, even
    when the block raises; the exception then propagates unchanged.
    """
    self.is_muted = True
    try:
        yield None
    finally:
        # No ``except`` clause: catching only to re-raise added nothing.
        self.is_muted = False
def temporarily_connected_to(
    self, receiver: t.Callable, sender: t.Any = ANY
) -> t.ContextManager[None]:
    """An alias for :meth:`connected_to`.

    :param receiver: a receiver callable
    :param sender: optional, a sender to filter on

    Emits a :class:`DeprecationWarning` on every call.

    .. versionadded:: 0.9

    .. versionchanged:: 1.1
      Renamed to :meth:`connected_to`. ``temporarily_connected_to`` was
      deprecated in 1.2 and will be removed in a subsequent version.

    """
    warn(
        "temporarily_connected_to is deprecated; use connected_to instead.",
        DeprecationWarning,
        # Attribute the warning to the caller's line, not to this module,
        # so the report shows where the deprecated call is made.
        stacklevel=2,
    )
    return self.connected_to(receiver, sender)
def send(
    self,
    *sender: t.Any,
    _async_wrapper: AsyncWrapperType | None = None,
    **kwargs: t.Any,
) -> list[tuple[t.Callable, t.Any]]:
    """Emit this signal on behalf of *sender*, passing on ``kwargs``.

    Returns a list of 2-tuples, pairing receivers with their return
    value. The ordering of receiver notification is undefined. A muted
    signal returns an empty list without calling anything.

    :param sender: Any object or ``None``. If omitted, synonymous
      with ``None``. Only accepts one positional argument.
    :param _async_wrapper: A callable that should wrap a coroutine
      receiver and run it when called synchronously.

    :param kwargs: Data to be sent to receivers.
    :raises RuntimeError: if a coroutine receiver is encountered and no
      *_async_wrapper* was supplied.
    """
    if self.is_muted:
        return []

    actual_sender = self._extract_sender(sender)
    outcomes = []
    for target in self.receivers_for(actual_sender):
        if is_coroutine_function(target):
            if _async_wrapper is None:
                raise RuntimeError("Cannot send to a coroutine function")
            # From here on the wrapped callable is both what gets called
            # and what is reported in the result pair.
            target = _async_wrapper(target)
        outcomes.append(
            (target, target(actual_sender, **kwargs))  # type: ignore[call-arg]
        )
    return outcomes
async def send_async(
    self,
    *sender: t.Any,
    _sync_wrapper: SyncWrapperType | None = None,
    **kwargs: t.Any,
) -> list[tuple[t.Callable, t.Any]]:
    """Emit this signal on behalf of *sender*, passing on ``kwargs``.

    Returns a list of 2-tuples, pairing receivers with their return
    value. The ordering of receiver notification is undefined. Receivers
    are awaited one after another. A muted signal returns an empty list
    without calling anything.

    :param sender: Any object or ``None``. If omitted, synonymous
      with ``None``. Only accepts one positional argument.
    :param _sync_wrapper: A callable that should wrap a synchronous
      receiver and run it when awaited.

    :param kwargs: Data to be sent to receivers.
    :raises RuntimeError: if a non-coroutine receiver is encountered and
      no *_sync_wrapper* was supplied.
    """
    if self.is_muted:
        return []

    actual_sender = self._extract_sender(sender)
    outcomes = []
    for target in self.receivers_for(actual_sender):
        if not is_coroutine_function(target):
            if _sync_wrapper is None:
                raise RuntimeError("Cannot send to a non-coroutine function")
            # From here on the wrapped callable is both what gets awaited
            # and what is reported in the result pair.
            target = _sync_wrapper(target)  # type: ignore[arg-type]
        value = await target(actual_sender, **kwargs)  # type: ignore[call-arg, misc]
        outcomes.append((target, value))
    return outcomes
336 def _extract_sender(self, sender: t.Any) -> t.Any:
337 if not self.receivers:
338 # Ensure correct signature even on no-op sends, disable with -O
339 # for lowest possible cost.
340 if __debug__ and sender and len(sender) > 1:
341 raise TypeError(
342 f"send() accepts only one positional argument, {len(sender)} given"
343 )
344 return []
346 # Using '*sender' rather than 'sender=None' allows 'sender' to be
347 # used as a keyword argument- i.e. it's an invisible name in the
348 # function signature.
349 if len(sender) == 0:
350 sender = None
351 elif len(sender) > 1:
352 raise TypeError(
353 f"send() accepts only one positional argument, {len(sender)} given"
354 )
355 else:
356 sender = sender[0]
357 return sender
def has_receivers_for(self, sender: t.Any) -> bool:
    """True if there is probably a receiver for *sender*.

    Performs an optimistic check only. Does not guarantee that all
    weakly referenced receivers are still alive. See
    :meth:`receivers_for` for a stronger search.

    :param sender: the sender to test, or :obj:`ANY` to ask only about
      receivers connected to any sender.
    """
    if not self.receivers:
        return False
    if self._by_sender[ANY_ID]:
        return True
    if sender is ANY:
        return False
    # Disconnecting leaves empty buckets behind in the bookkeeping, so the
    # bucket must be non-empty; the mere presence of the key is not enough.
    # ``.get`` also avoids creating a new empty bucket for unknown senders.
    return bool(self._by_sender.get(hashable_identity(sender)))
def receivers_for(
    self, sender: t.Any
) -> t.Generator[t.Callable | annotatable_weakref, None, None]:
    """Iterate all live receivers listening for *sender*.

    Yields the receivers connected to *sender* together with those
    connected to any sender. Weakly held receivers are dereferenced
    first; ones that turn out to be dead are disconnected and skipped.
    """
    # TODO: test receivers_for(ANY)
    if not self.receivers:
        return

    sender_id = hashable_identity(sender)
    if sender_id in self._by_sender:
        candidate_ids = self._by_sender[ANY_ID] | self._by_sender[sender_id]
    else:
        # Copy so that disconnects during iteration cannot mutate the set
        # being iterated.
        candidate_ids = self._by_sender[ANY_ID].copy()

    for receiver_id in candidate_ids:
        entry = self.receivers.get(receiver_id)
        if entry is None:
            continue
        if not isinstance(entry, WeakTypes):
            yield entry  # type: ignore[misc]
            continue
        live = entry()
        if live is None:
            # The referent is gone: drop it from every sender.
            self._disconnect(receiver_id, ANY_ID)
        else:
            yield live  # type: ignore[misc]
def disconnect(self, receiver: t.Callable, sender: t.Any = ANY) -> None:
    """Disconnect *receiver* from this signal's events.

    :param receiver: a previously :meth:`connected<connect>` callable

    :param sender: a specific sender to disconnect from, or :obj:`ANY`
      to disconnect from all senders. Defaults to ``ANY``.

    Afterwards :attr:`receiver_disconnected` is sent, but only if that
    per-signal hook has already been created and has receivers.

    """
    sender_id: IdentityType = ANY_ID if sender is ANY else hashable_identity(sender)
    receiver_id = hashable_identity(receiver)
    self._disconnect(receiver_id, sender_id)

    # Checking __dict__ first avoids creating the lazy hook signal just to
    # find out that nobody listens to it.
    if "receiver_disconnected" not in self.__dict__:
        return
    hook = self.receiver_disconnected
    if hook.receivers:
        hook.send(self, receiver=receiver, sender=sender)
def _disconnect(self, receiver_id: IdentityType, sender_id: IdentityType) -> None:
    """Remove the bookkeeping that links *receiver_id* to *sender_id*.

    With ``ANY_ID`` as *sender_id* the receiver is removed from every
    sender bucket and from :attr:`receivers`. Otherwise only the single
    receiver/sender pairing is dropped and :attr:`receivers` is untouched.
    """
    if sender_id != ANY_ID:
        self._by_sender[sender_id].discard(receiver_id)
        self._by_receiver[receiver_id].discard(sender_id)
        return

    # An unknown receiver, or one with no senders left, has nothing to
    # discard from the sender buckets.
    had_senders = self._by_receiver.pop(receiver_id, False)
    if had_senders:
        for members in self._by_sender.values():
            members.discard(receiver_id)
    self.receivers.pop(receiver_id, None)
def _cleanup_receiver(self, receiver_ref: annotatable_weakref) -> None:
    """Disconnect a receiver from all senders.

    Passed as the weakref callback in :meth:`connect`; the id is read from
    the ``receiver_id`` attribute that :meth:`connect` set on the reference.
    """
    dead_id = t.cast(IdentityType, receiver_ref.receiver_id)
    self._disconnect(dead_id, ANY_ID)
def _cleanup_sender(self, sender_ref: annotatable_weakref) -> None:
    """Disconnect all receivers from a sender.

    Passed as the weakref callback for senders in :meth:`connect`; the id
    is read from the ``sender_id`` attribute set on the reference there.
    """
    sender_id = t.cast(IdentityType, sender_ref.sender_id)
    assert sender_id != ANY_ID
    self._weak_senders.pop(sender_id, None)
    # Take the sender's bucket out, then unlink it from each receiver.
    orphaned = self._by_sender.pop(sender_id, ())
    for receiver_id in orphaned:
        self._by_receiver[receiver_id].discard(sender_id)
443 def _cleanup_bookkeeping(self) -> None:
444 """Prune unused sender/receiver bookkeeping. Not threadsafe.
446 Connecting & disconnecting leave behind a small amount of bookkeeping
447 for the receiver and sender values. Typical workloads using Blinker,
448 for example in most web apps, Flask, CLI scripts, etc., are not
449 adversely affected by this bookkeeping.
451 With a long-running Python process performing dynamic signal routing
452 with high volume- e.g. connecting to function closures, "senders" are
453 all unique object instances, and doing all of this over and over- you
454 may see memory usage will grow due to extraneous bookkeeping. (An empty
455 set() for each stale sender/receiver pair.)
457 This method will prune that bookkeeping away, with the caveat that such
458 pruning is not threadsafe. The risk is that cleanup of a fully
459 disconnected receiver/sender pair occurs while another thread is
460 connecting that same pair. If you are in the highly dynamic, unique
461 receiver/sender situation that has lead you to this method, that
462 failure mode is perhaps not a big deal for you.
463 """
464 for mapping in (self._by_sender, self._by_receiver):
465 for _id, bucket in list(mapping.items()):
466 if not bucket:
467 mapping.pop(_id, None)
469 def _clear_state(self) -> None:
470 """Throw away all signal state. Useful for unit tests."""
471 self._weak_senders.clear()
472 self.receivers.clear()
473 self._by_sender.clear()
474 self._by_receiver.clear()
# Deprecated module-level hook. Signal.connect() sends it with the
# ``*_arg`` keyword names described in the docstring below.
receiver_connected = Signal(
    doc="""\
Sent by a :class:`Signal` after a receiver connects.

:argument: the Signal that was connected to
:keyword receiver_arg: the connected receiver
:keyword sender_arg: the sender to connect to
:keyword weak_arg: true if the connection to receiver_arg is a weak reference

.. deprecated:: 1.2

As of 1.2, individual signals have their own private
:attr:`~Signal.receiver_connected` and
:attr:`~Signal.receiver_disconnected` signals with a slightly simplified
call signature. This global signal is planned to be removed in 1.6.

"""
)
class NamedSignal(Signal):
    """A named generic notification emitter."""

    def __init__(self, name: str, doc: str | None = None) -> None:
        """
        :param name: the name of this signal, stored as :attr:`name`.
        :param doc: optional. Passed on to :class:`Signal`.
        """
        Signal.__init__(self, doc)

        #: The name of this signal.
        self.name = name

    def __repr__(self) -> str:
        """Return the inherited repr with ``; <name>`` added before the ``>``."""
        # Strip the closing ">" so the name can be appended inside it.
        head = Signal.__repr__(self)[:-1]
        return f"{head}; {self.name!r}>"
class Namespace(dict):
    """A mapping of signal names to signals."""

    def signal(self, name: str, doc: str | None = None) -> NamedSignal:
        """Return the :class:`NamedSignal` *name*, creating it if required.

        Repeated calls to this function will return the same signal object.
        *doc* is only used when the signal has to be created.

        """
        try:
            existing = self[name]
        except KeyError:
            # setdefault returns whichever value ends up stored under
            # *name*, so callers always get the registered signal.
            return self.setdefault(  # type: ignore[no-any-return]
                name, NamedSignal(name, doc)
            )
        else:
            return existing  # type: ignore[no-any-return]
class WeakNamespace(WeakValueDictionary):
    """A weak mapping of signal names to signals.

    Automatically cleans up unused Signals when the last reference goes out
    of scope. This namespace implementation exists for a measure of legacy
    compatibility with Blinker <= 1.2, and may be dropped in the future.

    .. versionadded:: 1.3

    """

    def signal(self, name: str, doc: str | None = None) -> NamedSignal:
        """Return the :class:`NamedSignal` *name*, creating it if required.

        Repeated calls to this function will return the same signal object.
        *doc* is only used when the signal has to be created.

        """
        try:
            existing = self[name]
        except KeyError:
            # setdefault returns whichever value ends up stored under
            # *name*, so callers always get the registered signal.
            return self.setdefault(  # type: ignore[no-any-return]
                name, NamedSignal(name, doc)
            )
        else:
            return existing  # type: ignore[no-any-return]
# Module-level shortcut: the bound ``signal`` method of a single Namespace
# created here, so every caller of ``signal(name)`` shares that one registry.
signal = Namespace().signal